diff --git a/ocis-pkg/runner/grouprunner.go b/ocis-pkg/runner/grouprunner.go index 8aa727e72f..e44f72d4c7 100644 --- a/ocis-pkg/runner/grouprunner.go +++ b/ocis-pkg/runner/grouprunner.go @@ -3,6 +3,8 @@ package runner import ( "context" "sync" + "sync/atomic" + "time" ) // GroupRunner represent a group of tasks that need to run together. @@ -17,21 +19,44 @@ import ( // providing a piece of functionality, however, if any of them fails, the // feature provided by them would be incomplete or broken. // +// The interrupt duration for the group can be set through the +// `WithInterruptDuration` option. If the option isn't supplied, the default +// value (15 secs) will be used. +// +// It's recommended that the timeouts are handled by each runner individually, +// meaning that each runner's timeout should be less than the group runner's +// timeout. This way, we can know which runner timed out. +// If the group timeout is reached, the remaining results will have the +// runner's id as "_unknown_". +// // Note that, as services, the task aren't expected to stop by default. // This means that, if a task finishes naturally, the rest of the task will // asked to stop as well. type GroupRunner struct { - runners sync.Map - runnersCount int - isRunning bool - runningMutex sync.Mutex + runners sync.Map + runnersCount int + isRunning bool + interruptDur time.Duration + interrupted atomic.Bool + interruptedCh chan time.Duration + runningMutex sync.Mutex } // NewGroup will create a GroupRunner -func NewGroup() *GroupRunner { +func NewGroup(opts ...Option) *GroupRunner { + options := Options{ + InterruptDuration: DefaultGroupInterruptDuration, + } + + for _, o := range opts { + o(&options) + } + return &GroupRunner{ - runners: sync.Map{}, - runningMutex: sync.Mutex{}, + runners: sync.Map{}, + runningMutex: sync.Mutex{}, + interruptDur: options.InterruptDuration, + interruptedCh: make(chan time.Duration, 1), } } @@ -85,7 +110,7 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result { gr.isRunning = true gr.runningMutex.Unlock() - results := make(map[string]*Result) + results := make([]*Result, 0, gr.runnersCount) ch := make(chan *Result, gr.runnersCount) // no need to block writing results gr.runners.Range(func(_, value any) bool { @@ -94,45 +119,46 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result { return true }) + var d time.Duration // wait for a result or for the context to be done select { case result := <-ch: - results[result.RunnerID] = result + results = append(results, result) + case d = <-gr.interruptedCh: + results = append(results, &Result{ + RunnerID: "_unknown_", + RunnerError: NewGroupTimeoutError(d), + }) case <-ctx.Done(): // Do nothing } // interrupt the rest of the runners - gr.runners.Range(func(_, value any) bool { - r := value.(*Runner) - if _, ok := results[r.ID]; !ok { - select { - case <-r.Finished(): - // No data should be sent through the channel, so we'd be - // here only if the channel is closed. This means the task - // has finished and we don't need to interrupt. We do - // nothing in this case - default: - r.Interrupt() - } - } - return true - }) + gr.Interrupt() // Having notified that the context has been finished, we still need to // wait for the rest of the results for i := len(results); i < gr.runnersCount; i++ { - result := <-ch - results[result.RunnerID] = result + select { + case result := <-ch: + results = append(results, result) + case d2, ok := <-gr.interruptedCh: + if ok { + d = d2 + } + results = append(results, &Result{ + RunnerID: "_unknown_", + RunnerError: NewGroupTimeoutError(d), + }) + } } - close(ch) - - values := make([]*Result, 0, gr.runnersCount) - for _, val := range results { - values = append(values, val) - } - return values + // Even if we reach the group time out and bail out early, tasks might + // be running and eventually deliver the result through the channel. + // We'll rely on the buffered channel so the tasks won't block and the + // data can be eventually garbage-collected along with the unused + // channel, so we won't close the channel here. + return results } // RunAsync will execute the tasks in the group asynchronously. @@ -158,12 +184,38 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) { }) go func() { - result := <-interCh + var result *Result + var d time.Duration + + select { + case result = <-interCh: + // result already assigned, so do nothing + case d = <-gr.interruptedCh: + // we aren't tracking which runners have finished and which are still + // running, so we'll use "_unknown_" as runner id + result = &Result{ + RunnerID: "_unknown_", + RunnerError: NewGroupTimeoutError(d), + } + } gr.Interrupt() ch <- result for i := 1; i < gr.runnersCount; i++ { - result = <-interCh + select { + case result = <-interCh: + // result already assigned, so do nothing + case d2, ok := <-gr.interruptedCh: + // if ok is true, d2 will have a good value; if false, the channel + // is closed and we get a default value + if ok { + d = d2 + } + result = &Result{ + RunnerID: "_unknown_", + RunnerError: NewGroupTimeoutError(d), + } + } ch <- result } }() @@ -179,14 +231,32 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) { // As said, this will affect ALL the tasks in the group. It isn't possible to // try to stop just one task. // If a task has finished, the corresponding stopper won't be called +// +// The interrupt timeout for the group will start after all the runners in the +// group have been notified. Note that, if the task's stopper for a runner +// takes a lot of time to return, it will delay the timeout's start, so it's +// advised that the stopper either returns fast or is run asynchronously. func (gr *GroupRunner) Interrupt() { - gr.runners.Range(func(_, value any) bool { - r := value.(*Runner) - select { - case <-r.Finished(): - default: - r.Interrupt() - } - return true - }) + if gr.interrupted.CompareAndSwap(false, true) { + gr.runners.Range(func(_, value any) bool { + r := value.(*Runner) + select { + case <-r.Finished(): + // No data should be sent through the channel, so we'd be + // here only if the channel is closed. This means the task + // has finished and we don't need to interrupt. We do + // nothing in this case + default: + r.Interrupt() + } + return true + }) + + _ = time.AfterFunc(gr.interruptDur, func() { + // timeout reached -> send it through the channel so our runner + // can abort + gr.interruptedCh <- gr.interruptDur + close(gr.interruptedCh) + }) + } } diff --git a/ocis-pkg/runner/grouprunner_test.go b/ocis-pkg/runner/grouprunner_test.go index 110960a5bf..4a5984ba8b 100644 --- a/ocis-pkg/runner/grouprunner_test.go +++ b/ocis-pkg/runner/grouprunner_test.go @@ -62,7 +62,7 @@ var _ = Describe("GroupRunner", func() { ))) task3Ch := make(chan error) - task3 := TimedTask(task3Ch, 15*time.Second) + task3 := TimedTask(task3Ch, 6*time.Second) Expect(func() { gr.Add(runner.New("task3", task3, func() { task3Ch <- nil @@ -77,7 +77,7 @@ var _ = Describe("GroupRunner", func() { Expect(func() { task3Ch := make(chan error) - task3 := TimedTask(task3Ch, 15*time.Second) + task3 := TimedTask(task3Ch, 6*time.Second) gr.Add(runner.New("task3", task3, func() { task3Ch <- nil close(task3Ch) @@ -89,7 +89,7 @@ var _ = Describe("GroupRunner", func() { Describe("Run", func() { It("Context is done", func(ctx SpecContext) { task3Ch := make(chan error) - task3 := TimedTask(task3Ch, 15*time.Second) + task3 := TimedTask(task3Ch, 6*time.Second) gr.Add(runner.New("task3", task3, func() { task3Ch <- nil close(task3Ch) @@ -141,6 +141,75 @@ var _ = Describe("GroupRunner", func() { ))) }, SpecTimeout(5*time.Second)) + It("Context done and group timeout reached", func(ctx SpecContext) { + gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second)) + + gr.Add(runner.New("task1", func() error { + time.Sleep(6 * time.Second) + return nil + }, func() { + })) + + gr.Add(runner.New("task2", func() error { + time.Sleep(6 * time.Second) + return nil + }, func() { + })) + + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan []*runner.Result) + go func(ch2 chan []*runner.Result) { + ch2 <- gr.Run(myCtx) + close(ch2) + }(ch2) + + // context finishes in 1 sec, tasks will be interrupted + // group timeout will be reached after 2 extra seconds + Eventually(ctx, ch2).Should(Receive(ContainElements( + &runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}, + &runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}, + ))) + }, SpecTimeout(5*time.Second)) + + It("Interrupted and group timeout reached", func(ctx SpecContext) { + gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second)) + + gr.Add(runner.New("task1", func() error { + time.Sleep(6 * time.Second) + return nil + }, func() { + })) + + gr.Add(runner.New("task2", func() error { + time.Sleep(6 * time.Second) + return nil + }, func() { + })) + + // context will be done in 10 second + myCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan []*runner.Result) + go func(ch2 chan []*runner.Result) { + ch2 <- gr.Run(myCtx) + close(ch2) + }(ch2) + gr.Interrupt() + + // tasks will be interrupted + // group timeout will be reached after 2 extra seconds + Eventually(ctx, ch2).Should(Receive(ContainElements( + &runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}, + &runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}, + ))) + }, SpecTimeout(5*time.Second)) + It("Doble run panics", func(ctx SpecContext) { // context will be done in 1 second myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -181,7 +250,7 @@ var _ = Describe("GroupRunner", func() { It("Interrupt async", func(ctx SpecContext) { task3Ch := make(chan error) - task3 := TimedTask(task3Ch, 15*time.Second) + task3 := TimedTask(task3Ch, 6*time.Second) gr.Add(runner.New("task3", task3, func() { task3Ch <- nil close(task3Ch) @@ -196,5 +265,29 @@ var _ = Describe("GroupRunner", func() { Eventually(ctx, ch2).Should(Receive()) Eventually(ctx, ch2).Should(Receive()) }, SpecTimeout(5*time.Second)) + + It("Interrupt async group timeout reached", func(ctx SpecContext) { + gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second)) + + gr.Add(runner.New("task1", func() error { + time.Sleep(6 * time.Second) + return nil + }, func() { + })) + + gr.Add(runner.New("task2", func() error { + time.Sleep(6 * time.Second) + return nil + }, func() { + })) + + ch2 := make(chan *runner.Result) + gr.RunAsync(ch2) + gr.Interrupt() + + // group timeout will be reached after 2 extra seconds + Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}))) + Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)}))) + }, SpecTimeout(5*time.Second)) }) }) diff --git a/ocis-pkg/runner/option.go b/ocis-pkg/runner/option.go index 2bef541379..af1dca0455 100644 --- a/ocis-pkg/runner/option.go +++ b/ocis-pkg/runner/option.go @@ -6,8 +6,11 @@ import ( var ( // DefaultInterruptDuration is the default value for the `WithInterruptDuration` - // This global value can be adjusted if needed. + // for the "regular" runners. This global value can be adjusted if needed. DefaultInterruptDuration = 10 * time.Second + // DefaultGroupInterruptDuration is the default value for the `WithInterruptDuration` + // for the group runners. This global value can be adjusted if needed. + DefaultGroupInterruptDuration = 15 * time.Second ) // Option defines a single option function. diff --git a/ocis-pkg/runner/runner.go b/ocis-pkg/runner/runner.go index ced99cc8b0..6733a745f1 100644 --- a/ocis-pkg/runner/runner.go +++ b/ocis-pkg/runner/runner.go @@ -32,7 +32,7 @@ type Runner struct { // // The interrupt duration, which can be set through the `WithInterruptDuration` // option, will be used to ensure the runner doesn't block forever. If the -// option isn't suplied, the default value will be used. +// option isn't supplied, the default value (10 secs) will be used. // The interrupt duration will be used to start a timeout when the // runner gets interrupted (either the context of the `Run` method is done // or this runner's `Interrupt` method is called). If the timeout is reached, @@ -56,7 +56,7 @@ func New(id string, fn Runable, interrupt Stopper, opts ...Option) *Runner { interruptDur: options.InterruptDuration, fn: fn, interrupt: interrupt, - interruptedCh: make(chan time.Duration), + interruptedCh: make(chan time.Duration, 1), finished: make(chan struct{}), } } @@ -163,7 +163,7 @@ func (r *Runner) Finished() <-chan struct{} { // A result will be provided when either the task finishes naturally or we // reach the timeout after being interrupted func (r *Runner) doTask(ch chan<- *Result, closeChan bool) { - tmpCh := make(chan *Result) + tmpCh := make(chan *Result, 1) // spawn the task and return the result in a temporary channel go func(tmpCh chan *Result) { diff --git a/ocis-pkg/runner/types.go b/ocis-pkg/runner/types.go index 2789838c65..c4914e3263 100644 --- a/ocis-pkg/runner/types.go +++ b/ocis-pkg/runner/types.go @@ -55,6 +55,17 @@ func NewTimeoutError(runnerID string, duration time.Duration) *TimeoutError { } } +// NewGroupTimeoutError creates a new timeout error. This is intended to be +// used for group runners when the timeout of the group is reached. +// The runner id will be set to "_unknown_" because we don't know which is +// the id of the missing runner. +func NewGroupTimeoutError(duration time.Duration) *TimeoutError { + return &TimeoutError{ + RunnerID: "_unknown_", + Duration: duration, + } +} + // Error generates the message for this particular error. func (te *TimeoutError) Error() string { var sb strings.Builder