Merge pull request #8802 from owncloud/servers_startup

feat: add runners to startup the ocis' services
This commit is contained in:
kobergj
2024-04-29 13:52:31 +02:00
committed by GitHub
8 changed files with 1219 additions and 0 deletions
@@ -0,0 +1,6 @@
Enhancement: Prepare runners to start the services
The runners will improve service startup and make it easier. The runners'
behavior is more predictable, with clear expectations.
https://github.com/owncloud/ocis/pull/8802
+262
View File
@@ -0,0 +1,262 @@
package runner
import (
"context"
"sync"
"sync/atomic"
"time"
)
// GroupRunner represents a group of tasks that need to run together.
// The expectation is that all the tasks will run at the same time, and when
// one of them stops, the rest will also stop.
//
// The GroupRunner is intended to be used to run multiple services, which are
// more or less independent from each other, but at the same time it doesn't
// make sense to have any of them stopped while the rest are running.
// Basically, either all of them run, or none of them.
// For example, you can have GRPC and HTTP servers running, each of them
// providing a piece of functionality, however, if any of them fails, the
// feature provided by them would be incomplete or broken.
//
// The interrupt duration for the group can be set through the
// `WithInterruptDuration` option. If the option isn't supplied, the default
// value (15 secs) will be used.
//
// It's recommended that the timeouts are handled by each runner individually,
// meaning that each runner's timeout should be less than the group runner's
// timeout. This way, we can know which runner timed out.
// If the group timeout is reached, the remaining results will have the
// runner's id as "_unknown_".
//
// Note that, as services, the tasks aren't expected to stop by default.
// This means that, if a task finishes naturally, the rest of the tasks will
// be asked to stop as well.
type GroupRunner struct {
// runners holds the runners of the group, keyed by the runner's ID.
runners sync.Map
// runnersCount is the number of runners stored through `Add`.
runnersCount int
// isRunning is set once `Run` or `RunAsync` is called; `Add` panics
// afterwards. It's read and written while holding runningMutex.
isRunning bool
// interruptDur is the group timeout that starts when the group is
// interrupted.
interruptDur time.Duration
// interrupted ensures the body of `Interrupt` is executed only once.
interrupted atomic.Bool
// interruptedCh receives the interrupt duration when the group timeout is
// reached, and it's closed right after.
interruptedCh chan time.Duration
// runningMutex guards isRunning (and the bookkeeping done in `Add`).
runningMutex sync.Mutex
}
// NewGroup builds an empty GroupRunner.
//
// The group's interrupt duration starts as DefaultGroupInterruptDuration and
// can be overridden with the `WithInterruptDuration` option. The options are
// applied in the order they're provided.
func NewGroup(opts ...Option) *GroupRunner {
	cfg := Options{InterruptDuration: DefaultGroupInterruptDuration}
	for _, apply := range opts {
		apply(&cfg)
	}

	// The zero values of the map and the mutex are ready to use, so only
	// the remaining fields need to be initialized explicitly.
	return &GroupRunner{
		interruptDur:  cfg.InterruptDuration,
		interruptedCh: make(chan time.Duration, 1),
	}
}
// Add registers a runner in the group.
//
// Each runner in the group must have a unique id: adding a runner whose id
// is already present in the group will panic.
// Adding runners once the group has started (`Run` or `RunAsync` has been
// called) will also panic.
func (gr *GroupRunner) Add(r *Runner) {
	gr.runningMutex.Lock()
	defer gr.runningMutex.Unlock()

	if gr.isRunning {
		panic("Adding a new runner after the group starts is forbidden")
	}

	// LoadOrStore only stores the runner if the id isn't taken yet
	_, duplicated := gr.runners.LoadOrStore(r.ID, r)
	if duplicated {
		// there is already a runner with the same id, which is forbidden
		panic("Trying to add a runner with an existing Id in the group")
	}

	// We reach this point only if the runner has been stored, so the
	// count always matches the number of stored runners.
	gr.runnersCount++
}
// Run will execute all the tasks in the group at the same time.
//
// Similarly to the "regular" runner's `Run` method, the execution thread
// will be blocked here until all tasks are completed, and their results
// will be available (each result will have the runner's id so it's easy to
// find which one failed). Note that there is no guarantee about the result's
// order, so the first result in the slice might or might not be the first
// result to be obtained.
//
// When the context is marked as done, the groupRunner will call all the
// stoppers for each runner to notify each task to stop. Note that the tasks
// might still take a while to complete.
//
// If a task finishes naturally (with the context still "alive"), it will also
// cause the groupRunner to call the stoppers of the rest of the tasks. So if
// a task finishes, the rest will also finish.
// Note that it is NOT expected for the finished task's stopper to be called
// in this case.
//
// If the group timeout is reached, the results that are still missing will
// be reported with the "_unknown_" runner id and a group timeout error.
// Results that have already been delivered by the runners are always
// preferred over those placeholders.
func (gr *GroupRunner) Run(ctx context.Context) []*Result {
	// Set the flag inside the runningMutex to ensure we don't read the old value
	// in the `Add` method and add a new runner when this method is being executed
	// Note that if multiple `Run` or `RunAsync` happens, the underlying runners
	// will panic
	gr.runningMutex.Lock()
	gr.isRunning = true
	gr.runningMutex.Unlock()

	results := make([]*Result, 0, gr.runnersCount)
	ch := make(chan *Result, gr.runnersCount) // no need to block writing results

	gr.runners.Range(func(_, value any) bool {
		r := value.(*Runner)
		r.RunAsync(ch)
		return true
	})

	var d time.Duration

	// afterTimeout provides the result to use once the group timeout has
	// been signaled. A `select` with several ready cases picks one at
	// random, so a result could be waiting in the channel while the timeout
	// case is chosen. We take that result if it's there, and fall back to
	// the "_unknown_" placeholder only if nothing has been delivered.
	afterTimeout := func() *Result {
		select {
		case result := <-ch:
			return result
		default:
			return &Result{
				RunnerID:    "_unknown_",
				RunnerError: NewGroupTimeoutError(d),
			}
		}
	}

	// wait for a result or for the context to be done
	select {
	case result := <-ch:
		results = append(results, result)
	case d = <-gr.interruptedCh:
		results = append(results, afterTimeout())
	case <-ctx.Done():
		// Do nothing
	}

	// interrupt the rest of the runners
	gr.Interrupt()

	// Having notified that the context has been finished, we still need to
	// wait for the rest of the results
	for i := len(results); i < gr.runnersCount; i++ {
		select {
		case result := <-ch:
			results = append(results, result)
		case d2, ok := <-gr.interruptedCh:
			// if ok is true, d2 will have a good value; if false, the channel
			// is closed and we keep the duration we got previously
			if ok {
				d = d2
			}
			results = append(results, afterTimeout())
		}
	}

	// Even if we reach the group time out and bail out early, tasks might
	// be running and eventually deliver the result through the channel.
	// We'll rely on the buffered channel so the tasks won't block and the
	// data can be eventually garbage-collected along with the unused
	// channel, so we won't close the channel here.
	return results
}
// RunAsync will execute the tasks in the group asynchronously.
// The result of each task will be placed in the provided channel as soon
// as it's available.
// Note that this method will finish as soon as all the tasks are running.
//
// As soon as the first result is available (or the group is interrupted and
// its timeout is reached), the rest of the tasks will be interrupted.
// If the group timeout is reached, the results that are still missing will
// be reported with the "_unknown_" runner id and a group timeout error.
// Results that have already been delivered by the runners are always
// preferred over those placeholders.
func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
	// Set the flag inside the runningMutex to ensure we don't read the old value
	// in the `Add` method and add a new runner when this method is being executed
	// Note that if multiple `Run` or `RunAsync` happens, the underlying runners
	// will panic
	gr.runningMutex.Lock()
	gr.isRunning = true
	gr.runningMutex.Unlock()

	// we need a secondary channel to receive the first result so we can
	// interrupt the rest of the tasks
	interCh := make(chan *Result, gr.runnersCount)
	gr.runners.Range(func(_, value any) bool {
		r := value.(*Runner)
		r.RunAsync(interCh)
		return true
	})

	go func() {
		var result *Result
		var d time.Duration

		// afterTimeout provides the result to use once the group timeout
		// has been signaled. A `select` with several ready cases picks one
		// at random, so a result could be waiting in the channel while the
		// timeout case is chosen. We take that result if it's there.
		// Otherwise, we aren't tracking which runners have finished and
		// which are still running, so we'll use "_unknown_" as runner id
		afterTimeout := func() *Result {
			select {
			case delivered := <-interCh:
				return delivered
			default:
				return &Result{
					RunnerID:    "_unknown_",
					RunnerError: NewGroupTimeoutError(d),
				}
			}
		}

		select {
		case result = <-interCh:
			// result already assigned, so do nothing
		case d = <-gr.interruptedCh:
			result = afterTimeout()
		}
		gr.Interrupt()

		ch <- result
		for i := 1; i < gr.runnersCount; i++ {
			select {
			case result = <-interCh:
				// result already assigned, so do nothing
			case d2, ok := <-gr.interruptedCh:
				// if ok is true, d2 will have a good value; if false, the channel
				// is closed and we keep the duration we got previously
				if ok {
					d = d2
				}
				result = afterTimeout()
			}
			ch <- result
		}
	}()
}
// Interrupt notifies ALL the tasks in the group that they have to stop, by
// calling the stopper function of each runner.
// The stoppers are called immediately but sequentially: the second stopper
// won't be called until the first one has returned. This usually isn't a
// problem because the service `Stop`'s methods either don't take a long time
// to return, or they run asynchronously in another goroutine.
//
// It isn't possible to stop just one task: the whole group is affected.
// If a task has already finished, its stopper won't be called.
//
// The interrupt timeout for the group starts after all the runners in the
// group have been notified. If a stopper takes a lot of time to return, it
// will delay the timeout's start, so it's advised that the stoppers either
// return fast or run asynchronously.
//
// Only the first call has any effect; further calls return immediately.
func (gr *GroupRunner) Interrupt() {
	if !gr.interrupted.CompareAndSwap(false, true) {
		// the group has already been interrupted
		return
	}

	notify := func(_, value any) bool {
		r := value.(*Runner)
		select {
		case <-r.Finished():
			// Nothing is ever sent through this channel, so receiving
			// from it means it's closed: the task has finished and
			// there is no need to interrupt it.
		default:
			r.Interrupt()
		}
		return true
	}
	gr.runners.Range(notify)

	// Start the group timeout. Once it's reached, the duration is sent
	// through the channel so whoever is waiting for results can abort.
	time.AfterFunc(gr.interruptDur, func() {
		gr.interruptedCh <- gr.interruptDur
		close(gr.interruptedCh)
	})
}
+293
View File
@@ -0,0 +1,293 @@
package runner_test
import (
"context"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
)
// Specs for the GroupRunner. Every spec starts with a group holding two
// long-running timed tasks ("task1" and "task2"), created in BeforeEach.
// Some specs shadow `gr` with their own group in order to use a custom
// group interrupt duration.
var _ = Describe("GroupRunner", func() {
var (
gr *runner.GroupRunner
)
BeforeEach(func() {
gr = runner.NewGroup()
// task1 finishes after 30 secs unless its stopper is called
task1Ch := make(chan error)
task1 := TimedTask(task1Ch, 30*time.Second)
gr.Add(runner.New("task1", task1, func() {
task1Ch <- nil
close(task1Ch)
}))
// task2 finishes after 20 secs unless its stopper is called
task2Ch := make(chan error)
task2 := TimedTask(task2Ch, 20*time.Second)
gr.Add(runner.New("task2", task2, func() {
task2Ch <- nil
close(task2Ch)
}))
})
Describe("Add", func() {
It("Duplicated runner id panics", func() {
// "task1" has already been added in BeforeEach
Expect(func() {
gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
}).To(Panic())
})
It("Add after run panics", func(ctx SpecContext) {
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
// context is done in 1 sec, so all tasks should be interrupted and finish
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "task1", RunnerError: nil},
&runner.Result{RunnerID: "task2", RunnerError: nil},
)))
// the group has already run, so adding a new runner must panic
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 6*time.Second)
Expect(func() {
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
}))
}).To(Panic())
}, SpecTimeout(5*time.Second))
It("Add after runAsync panics", func(ctx SpecContext) {
// note that nothing is read from ch2 in this spec
ch2 := make(chan *runner.Result)
gr.RunAsync(ch2)
Expect(func() {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
}))
}).To(Panic())
}, SpecTimeout(5*time.Second))
})
Describe("Run", func() {
It("Context is done", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
}))
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
// context is done in 1 sec, so all tasks should be interrupted and finish
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "task1", RunnerError: nil},
&runner.Result{RunnerID: "task2", RunnerError: nil},
&runner.Result{RunnerID: "task3", RunnerError: nil},
)))
}, SpecTimeout(5*time.Second))
It("One task finishes early", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 1*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
}))
// context will be done in 10 seconds
myCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
// task3 finishes in 1 sec, so the rest should also be interrupted
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "task1", RunnerError: nil},
&runner.Result{RunnerID: "task2", RunnerError: nil},
&runner.Result{RunnerID: "task3", RunnerError: nil},
)))
}, SpecTimeout(5*time.Second))
It("Context done and group timeout reached", func(ctx SpecContext) {
// this group shadows the one from BeforeEach; its tasks ignore the
// stopper (it's a no-op) and sleep longer than the group timeout
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))
gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
// context finishes in 1 sec, tasks will be interrupted
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
)))
}, SpecTimeout(5*time.Second))
It("Interrupted and group timeout reached", func(ctx SpecContext) {
// this group shadows the one from BeforeEach; its tasks ignore the
// stopper (it's a no-op) and sleep longer than the group timeout
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))
gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
// context will be done in 10 seconds
myCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
gr.Interrupt()
// tasks will be interrupted
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
)))
}, SpecTimeout(5*time.Second))
It("Doble run panics", func(ctx SpecContext) {
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// the second call tries to run the same runners again, which panics
Expect(func() {
gr.Run(myCtx)
gr.Run(myCtx)
}).To(Panic())
}, SpecTimeout(5*time.Second))
})
Describe("RunAsync", func() {
It("Wait in channel", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 1*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
}))
ch2 := make(chan *runner.Result)
gr.RunAsync(ch2)
// task3 finishes in 1 sec, so the rest should also be interrupted
// one result per runner is expected
Eventually(ctx, ch2).Should(Receive())
Eventually(ctx, ch2).Should(Receive())
Eventually(ctx, ch2).Should(Receive())
}, SpecTimeout(5*time.Second))
It("Double runAsync panics", func(ctx SpecContext) {
ch2 := make(chan *runner.Result)
Expect(func() {
gr.RunAsync(ch2)
gr.RunAsync(ch2)
}).To(Panic())
}, SpecTimeout(5*time.Second))
It("Interrupt async", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
}))
ch2 := make(chan *runner.Result)
gr.RunAsync(ch2)
gr.Interrupt()
// tasks will be interrupted
// one result per runner is expected
Eventually(ctx, ch2).Should(Receive())
Eventually(ctx, ch2).Should(Receive())
Eventually(ctx, ch2).Should(Receive())
}, SpecTimeout(5*time.Second))
It("Interrupt async group timeout reached", func(ctx SpecContext) {
// this group shadows the one from BeforeEach; its tasks ignore the
// stopper (it's a no-op) and sleep longer than the group timeout
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))
gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
ch2 := make(chan *runner.Result)
gr.RunAsync(ch2)
gr.Interrupt()
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)})))
Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)})))
}, SpecTimeout(5*time.Second))
})
})
+30
View File
@@ -0,0 +1,30 @@
package runner
import (
"time"
)
// DefaultInterruptDuration is the interrupt duration the "regular" runners
// use when the `WithInterruptDuration` option isn't provided. This global
// value can be adjusted if needed.
var DefaultInterruptDuration = 10 * time.Second

// DefaultGroupInterruptDuration is the interrupt duration the group runners
// use when the `WithInterruptDuration` option isn't provided. This global
// value can be adjusted if needed.
var DefaultGroupInterruptDuration = 15 * time.Second
// Option is a function that adjusts a single setting in the Options.
type Option func(o *Options)

// Options holds the settings that can be configured in this package.
type Options struct {
	// InterruptDuration is how long to wait for a task to finish after
	// it has been interrupted.
	InterruptDuration time.Duration
}

// WithInterruptDuration returns an Option that sets the interrupt duration
// to the provided value.
func WithInterruptDuration(val time.Duration) Option {
	setDuration := func(target *Options) {
		target.InterruptDuration = val
	}
	return setDuration
}
+201
View File
@@ -0,0 +1,201 @@
package runner
import (
"context"
"sync/atomic"
"time"
)
// Runner represents the one executing a long running task, such as a server
// or a service.
// The ID of the runner is public to make identification easier, and the
// Result that it will generate will contain the same ID, so we can
// know which runner provided which result.
//
// Runners are intended to be used only once. Reusing them isn't possible.
// You'd need to create a new runner if you want to rerun the same task.
type Runner struct {
// ID identifies the runner; it's copied into the Result of the task.
ID string
// interruptDur is the timeout that starts when the runner is interrupted.
interruptDur time.Duration
// fn is the task to be executed.
fn Runable
// interrupt is the stopper that will be called to notify the task to stop.
interrupt Stopper
// running ensures the task is started only once (`Run` and `RunAsync`
// panic otherwise).
running atomic.Bool
// interrupted ensures the body of `Interrupt` is executed only once.
interrupted atomic.Bool
// interruptedCh receives the interrupt duration when the timeout is
// reached after an interruption, and it's closed right after.
interruptedCh chan time.Duration
// finished is closed when the task returns. Nothing is sent through it.
finished chan struct{}
}
// New builds a runner for the provided task.
// The runner is identified by the provided id (ids must be unique, otherwise
// undefined behavior might occur). It will execute the runable task `fn`,
// and it will call the `interrupt` function whenever the task needs to stop.
//
// The interrupt duration prevents the runner from blocking forever. It can
// be set through the `WithInterruptDuration` option; if the option isn't
// supplied, the default value (10 secs) will be used.
// A timeout with that duration starts when the runner gets interrupted
// (either the context of the `Run` method is done or this runner's
// `Interrupt` method is called). If the timeout is reached, a timeout result
// will be returned instead of whatever result the task should be returning.
//
// Note that it's your responsibility to provide a proper stopper for the task.
// The runner will just call that function assuming it will be enough to
// eventually stop the task at some point.
func New(id string, fn Runable, interrupt Stopper, opts ...Option) *Runner {
	cfg := Options{InterruptDuration: DefaultInterruptDuration}
	for _, apply := range opts {
		apply(&cfg)
	}

	r := &Runner{
		ID:        id,
		fn:        fn,
		interrupt: interrupt,
	}
	r.interruptDur = cfg.InterruptDuration
	// buffered, so the timeout can be notified even if nobody is listening
	r.interruptedCh = make(chan time.Duration, 1)
	r.finished = make(chan struct{})
	return r
}
// Run executes the task associated to this runner synchronously.
// The task is spawned in a new goroutine, and the current thread waits until
// the task finishes.
//
// The task is expected to finish "naturally". The stopper will be called in
// the following cases:
// - This runner's `Interrupt` method is called manually
// - The provided context is done
// It's expected that calling the provided stopper will be enough to make the
// task eventually complete.
//
// Once the task finishes, its result is returned.
// When the context is done, or if the runner is interrupted, a timeout
// starts using the configured "interrupt duration". If this timeout is
// reached, a timeout result will be returned instead of the one from the
// task. This is intended to prevent blocking the main thread indefinitely.
// A suitable duration should be used depending on the task, usually 5, 10 or
// 30 secs.
//
// Running the same runner more than once is a bug, and it will panic.
//
// Some nice things you can do:
// - Use signal.NotifyContext(...) to call the stopper and provide a clean
// shutdown procedure when an OS signal is received
// - Use context.WithDeadline(...) or context.WithTimeout(...) to run the task
// for a limited time
func (r *Runner) Run(ctx context.Context) *Result {
	if started := r.running.CompareAndSwap(false, true); !started {
		// The flag was already set, so the task is already running.
		// Running the same task multiple times is a bug, so we panic
		panic("Runner with id " + r.ID + " was running twice")
	}

	resultCh := make(chan *Result)
	go r.doTask(resultCh, true)

	select {
	case <-ctx.Done():
		// notify the task and wait for either its result or the timeout
		r.Interrupt()
		return <-resultCh
	case res := <-resultCh:
		return res
	}
}
// RunAsync executes the task associated to this runner asynchronously.
// The task is spawned in a new goroutine and this method returns right away.
// The task's result will be written in the provided channel when it's
// available, so you can wait for it if needed. It's up to you to decide
// to use a blocking or non-blocking channel, but the task will always finish
// before writing in the channel.
//
// The only way to interrupt the running task is to call the `Interrupt`
// method at some point.
//
// Running the same runner more than once is a bug, and it will panic.
func (r *Runner) RunAsync(ch chan<- *Result) {
	if started := r.running.CompareAndSwap(false, true); !started {
		// The flag was already set, so the task is already running.
		// Running the same task multiple times is a bug, so we panic
		panic("Runner with id " + r.ID + " was running twice")
	}

	// the provided channel belongs to the caller, so we won't close it
	go r.doTask(ch, false)
}
// Interrupt calls the stopper function, which should notify the task in
// order for it to finish.
// The stopper is called immediately, although the consequences are expected
// to take a while (the task might need a while to stop).
// A timeout starts using the configured "interrupt duration". Once that
// timeout is reached, the runner will provide a result with a timeout error.
// Note that, even after returning the timeout result, the task could still
// be running and consuming resources.
// Only the first call has any effect. Further calls won't do anything.
func (r *Runner) Interrupt() {
	if !r.interrupted.CompareAndSwap(false, true) {
		// the runner has already been interrupted
		return
	}

	watchTimeout := func() {
		select {
		case <-time.After(r.interruptDur):
			// timeout reached -> send it through the channel so our runner
			// can abort
			r.interruptedCh <- r.interruptDur
			close(r.interruptedCh)
		case <-r.Finished():
			// Task finished -> runner should be delivering the result
		}
	}

	// the timeout starts counting before the stopper is called
	go watchTimeout()
	r.interrupt()
}
// Finished returns a receive-only channel that can be used to know when
// the task has finished, even if the result hasn't been made available yet.
// The channel is closed (without sending any message) when the task has
// finished.
// This is especially useful with the `RunAsync` method when multiple runners
// use the same channel: results could be waiting on your side of the channel.
func (r *Runner) Finished() <-chan struct{} {
return r.finished
}
// doTask executes this runner's task and writes the result in the provided
// channel. The channel will be closed afterwards if `closeChan` is true.
// The result is provided when either the task finishes naturally or the
// timeout is reached after being interrupted, whatever happens first.
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
	// Buffered, so the task's goroutine won't block delivering its result
	// if the timeout result has been used instead.
	taskDone := make(chan *Result, 1)

	// spawn the task and get its result through the temporary channel
	go func() {
		taskErr := r.fn()
		// mark the task as finished before delivering the result
		close(r.finished)

		taskDone <- &Result{
			RunnerID:    r.ID,
			RunnerError: taskErr,
		}
		close(taskDone)
	}()

	// wait for the task's result or until we get the interrupted signal
	var outcome *Result
	select {
	case outcome = <-taskDone:
		// the task delivered its own result, nothing else to do
	case waited := <-r.interruptedCh:
		outcome = &Result{
			RunnerID:    r.ID,
			RunnerError: NewTimeoutError(r.ID, waited),
		}
	}

	// send the result
	ch <- outcome
	if closeChan {
		close(ch)
	}
}
+13
View File
@@ -0,0 +1,13 @@
package runner_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// TestRunner is the `go test` entry point for the "Runner Suite": it
// registers Ginkgo's Fail as Gomega's fail handler and runs the specs.
func TestRunner(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Runner Suite")
}
+337
View File
@@ -0,0 +1,337 @@
package runner_test
import (
"context"
"errors"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
)
// TimedTask creates a task that runs, at most, for the specified duration.
// The task finishes naturally (returning nil) once the duration has elapsed,
// or earlier if it receives from the provided channel, in which case it
// returns whatever has been received.
//
// For the related stopper, just reuse the same channel:
//
//	func() {
//		ch <- nil
//		close(ch)
//	}
func TimedTask(ch chan error, dur time.Duration) runner.Runable {
	return func() error {
		deadline := time.NewTimer(dur)
		defer deadline.Stop()

		select {
		case received := <-ch:
			// finish when we receive from the channel
			return received
		case <-deadline.C:
			// or finish once the given duration has elapsed
			return nil
		}
	}
}
// Specs for the "regular" Runner: synchronous runs (`Run`), asynchronous
// runs (`RunAsync`) and the `Finished` channel.
var _ = Describe("Runner", func() {
Describe("Run", func() {
It("Context is done", func(ctx SpecContext) {
// task will wait until it receives from the channel
// stopper will just send something through the
// channel, so the task can finish
// Worst case, the task will finish after 15 secs
ch := make(chan error)
r := runner.New("run001", TimedTask(ch, 15*time.Second), func() {
ch <- nil
close(ch)
})
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan *runner.Result)
go func(ch2 chan *runner.Result) {
ch2 <- r.Run(myCtx)
close(ch2)
}(ch2)
expectedResult := &runner.Result{
RunnerID: "run001",
RunnerError: nil,
}
// a result should be available in ch2 within the 5 secs spec
// (task's context finishes in 1 sec so we expect a 1 sec delay)
Eventually(ctx, ch2).Should(Receive(Equal(expectedResult)))
}, SpecTimeout(5*time.Second))
It("Context is done and interrupt after", func(ctx SpecContext) {
// task will wait until it receives from the channel
// stopper will just send something through the
// channel, so the task can finish
// Worst case, the task will finish after 15 secs
ch := make(chan error)
r := runner.New("run001", TimedTask(ch, 15*time.Second), func() {
ch <- nil
close(ch)
})
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan *runner.Result)
go func(ch2 chan *runner.Result) {
ch2 <- r.Run(myCtx)
close(ch2)
}(ch2)
expectedResult := &runner.Result{
RunnerID: "run001",
RunnerError: nil,
}
// a result should be available in ch2 within the 5 secs spec
// (task's context finishes in 1 sec so we expect a 1 sec delay)
Eventually(ctx, ch2).Should(Receive(Equal(expectedResult)))
r.Interrupt() // this shouldn't do anything
}, SpecTimeout(5*time.Second))
It("Task finishes naturally", func(ctx SpecContext) {
e := errors.New("overslept!")
r := runner.New("run002", func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
})
// context will be done in 1 second (the task will finish before)
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan *runner.Result)
go func(ch2 chan *runner.Result) {
ch2 <- r.Run(myCtx)
close(ch2)
}(ch2)
expectedResult := &runner.Result{
RunnerID: "run002",
RunnerError: e,
}
// a result should be available in ch2 within the 5 secs spec
// (task finishes naturally in 50 msec)
Eventually(ctx, ch2).Should(Receive(Equal(expectedResult)))
}, SpecTimeout(5*time.Second))
It("Task doesn't finish", func(ctx SpecContext) {
r := runner.New("run003", func() error {
time.Sleep(20 * time.Second)
return nil
}, func() {
})
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
ch2 := make(chan *runner.Result)
go func(ch2 chan *runner.Result) {
ch2 <- r.Run(myCtx)
close(ch2)
}(ch2)
// Task will finish naturally in 20 secs
// Task's context will finish in 1 sec, but task won't receive
// the notification (the stopper is a no-op) and it will keep going
// The runner uses the default interrupt duration, so no result is
// expected during the time we're checking
Consistently(ctx, ch2).WithTimeout(4500 * time.Millisecond).ShouldNot(Receive())
}, SpecTimeout(5*time.Second))
It("Task doesn't finish and times out", func(ctx SpecContext) {
r := runner.New("run003", func() error {
time.Sleep(20 * time.Second)
return nil
}, func() {
}, runner.WithInterruptDuration(3*time.Second))
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
ch2 := make(chan *runner.Result)
go func(ch2 chan *runner.Result) {
ch2 <- r.Run(myCtx)
close(ch2)
}(ch2)
var expectedResult *runner.Result
// Task will finish naturally in 20 secs
// Task's context will finish in 1 sec, but task won't receive
// the notification (the stopper is a no-op) and it will keep going
// Task will time out in 3 seconds after being interrupted (when
// context is done), so test should finish in 4 seconds
Eventually(ctx, ch2).Should(Receive(&expectedResult))
Expect(expectedResult.RunnerID).To(Equal("run003"))
var timeoutError *runner.TimeoutError
Expect(errors.As(expectedResult.RunnerError, &timeoutError)).To(BeTrue())
Expect(timeoutError.RunnerID).To(Equal("run003"))
Expect(timeoutError.Duration).To(Equal(3 * time.Second))
}, SpecTimeout(5*time.Second))
It("Run mutiple times panics", func(ctx SpecContext) {
e := errors.New("overslept!")
r := runner.New("run002", func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
})
// context will be done in 1 second (the task will finish before)
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// the first run completes; the second one must panic
Expect(func() {
r.Run(myCtx)
r.Run(myCtx)
}).To(Panic())
}, SpecTimeout(5*time.Second))
})
Describe("RunAsync", func() {
It("Wait in channel", func(ctx SpecContext) {
ch := make(chan *runner.Result)
e := errors.New("Task has finished")
r := runner.New("run004", func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
})
r.RunAsync(ch)
expectedResult := &runner.Result{
RunnerID: "run004",
RunnerError: e,
}
Eventually(ctx, ch).Should(Receive(Equal(expectedResult)))
}, SpecTimeout(5*time.Second))
It("Run multiple times panics", func(ctx SpecContext) {
ch := make(chan *runner.Result)
e := errors.New("Task has finished")
r := runner.New("run004", func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
})
r.RunAsync(ch)
Expect(func() {
r.RunAsync(ch)
}).To(Panic())
}, SpecTimeout(5*time.Second))
It("Interrupt async", func(ctx SpecContext) {
ch := make(chan *runner.Result)
e := errors.New("Task interrupted")
// the stopper sends the error through the channel, and the task
// returns it as its result
taskCh := make(chan error)
r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() {
taskCh <- e
close(taskCh)
})
r.RunAsync(ch)
r.Interrupt()
expectedResult := &runner.Result{
RunnerID: "run005",
RunnerError: e,
}
Eventually(ctx, ch).Should(Receive(Equal(expectedResult)))
}, SpecTimeout(5*time.Second))
It("Interrupt async times out", func(ctx SpecContext) {
ch := make(chan *runner.Result)
e := errors.New("Task interrupted")
r := runner.New("run005", func() error {
time.Sleep(30 * time.Second)
return e
}, func() {
}, runner.WithInterruptDuration(3*time.Second))
r.RunAsync(ch)
r.Interrupt()
var expectedResult *runner.Result
// Task will time out 3 seconds after receiving the interruption
Eventually(ctx, ch).Should(Receive(&expectedResult))
Expect(expectedResult.RunnerID).To(Equal("run005"))
Expect(expectedResult.RunnerError.Error()).To(ContainSubstring("timed out"))
}, SpecTimeout(5*time.Second))
It("Interrupt async multiple times", func(ctx SpecContext) {
ch := make(chan *runner.Result)
e := errors.New("Task interrupted")
taskCh := make(chan error)
r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() {
taskCh <- e
close(taskCh)
})
r.RunAsync(ch)
// only the first interruption should have any effect
r.Interrupt()
r.Interrupt()
r.Interrupt()
expectedResult := &runner.Result{
RunnerID: "run005",
RunnerError: e,
}
Eventually(ctx, ch).Should(Receive(Equal(expectedResult)))
}, SpecTimeout(5*time.Second))
})
Describe("Finished", func() {
It("Finish channel closes", func(ctx SpecContext) {
r := runner.New("run006", func() error {
time.Sleep(50 * time.Millisecond)
return nil
}, func() {
})
// context will be done in 1 second (the task will finish before)
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
ch2 := make(chan *runner.Result)
go func(ch2 chan *runner.Result) {
ch2 <- r.Run(myCtx)
close(ch2)
}(ch2)
finishedCh := r.Finished()
Eventually(ctx, finishedCh).Should(BeClosed())
}, SpecTimeout(5*time.Second))
})
})
+77
View File
@@ -0,0 +1,77 @@
package runner
import (
"strings"
"time"
)
// Runable represents a task that can be executed by the Runner.
// It's expected to be a long running task with an indefinite execution time,
// so it's suitable for servers or services.
// The task can eventually return an error, or nil if the execution finishes
// without errors.
type Runable func() error
// Stopper represents a function that will stop the Runable.
// The stopper acts as a notification to the runable to know that the task
// needs to be finished now.
//
// The stopper doesn't need to crash the runable or force the runable to stop.
// Instead, it will let the runable know it has to stop and let it finish.
// This means that the runable might still run for a while.
//
// It's recommended that the stopper runs asynchronously. This means that the
// stopper might need to spawn a goroutine. The intention is to avoid blocking
// the running thread.
//
// Usually, the stoppers are the servers' `Shutdown()` or `Close()` methods,
// which will cause the server to start its shutdown procedure. As said, there
// is no need to force the shutdown, so graceful shutdowns are preferred if
// they're available.
type Stopper func()
// Result represents the result of a runner.
// The result contains the provided runner's id (for easier identification
// in case of multiple results) and the runner's error, which is the result
// of the Runable function (might be nil if no error happened).
type Result struct {
// RunnerID is the id of the runner that produced this result.
RunnerID string
// RunnerError is the error reported for the runner, or nil.
RunnerError error
}
// TimeoutError is the error to be used when a runner times out.
// It implements the `error` interface.
type TimeoutError struct {
	// RunnerID is the id of the runner that timed out.
	RunnerID string
	// Duration is how long the runner was waited for.
	Duration time.Duration
}

// NewTimeoutError creates a new timeout error for the given runner. Both the
// runnerID and the duration will be part of the error message.
func NewTimeoutError(runnerID string, duration time.Duration) *TimeoutError {
	te := new(TimeoutError)
	te.RunnerID = runnerID
	te.Duration = duration
	return te
}

// NewGroupTimeoutError creates a new timeout error intended for the group
// runners, to be used when the timeout of the group is reached.
// The runner id will be set to "_unknown_" because we don't know which is
// the id of the missing runner.
func NewGroupTimeoutError(duration time.Duration) *TimeoutError {
	return NewTimeoutError("_unknown_", duration)
}

// Error generates the message for this particular error, including the
// runner's id and the duration of the timeout.
func (te *TimeoutError) Error() string {
	pieces := []string{
		"Runner ",
		te.RunnerID,
		" timed out after waiting for ",
		te.Duration.String(),
	}
	return strings.Join(pieces, "")
}