diff --git a/ocis-pkg/runner/grouprunner.go b/ocis-pkg/runner/grouprunner.go index 1ed23855f6..f77f769b6b 100644 --- a/ocis-pkg/runner/grouprunner.go +++ b/ocis-pkg/runner/grouprunner.go @@ -79,11 +79,15 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result { // interrupt the rest of the runners for _, runner := range gr.runners { if _, ok := results[runner.ID]; !ok { - // there might still be race conditions because the result might not have - // been made available even though the runner has finished. We assume - // that calling the `Interrupt` method multiple times and / or calling - // the `Interrupt` method when the task has finished is safe - runner.Interrupt() + select { + case <-runner.Finished(): + // No data should be sent through the channel, so we'd be + // here only if the channel is closed. This means the task + // has finished and we don't need to interrupt. We do + // nothing in this case + default: + runner.Interrupt() + } } } @@ -122,8 +126,13 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) { // // As said, this will affect ALL the tasks in the group. It isn't possible to // try to stop just one task. +// If a task has finished, the corresponding stopper won't be called func (gr *GroupRunner) Interrupt() { for _, runner := range gr.runners { - runner.Interrupt() + select { + case <-runner.Finished(): + default: + runner.Interrupt() + } } } diff --git a/ocis-pkg/runner/runner.go b/ocis-pkg/runner/runner.go index 6b1bab6aed..0f4084cf6b 100644 --- a/ocis-pkg/runner/runner.go +++ b/ocis-pkg/runner/runner.go @@ -13,6 +13,7 @@ type Runner struct { ID string fn Runable interrupt Stopper + finished chan struct{} } // New will create a new runner. @@ -28,6 +29,7 @@ func New(id string, fn Runable, interrupt Stopper) *Runner { ID: id, fn: fn, interrupt: interrupt, + finished: make(chan struct{}), } } @@ -85,11 +87,22 @@ func (r *Runner) Interrupt() { r.interrupt() } +// Finished will return a receive-only channel that can be used to know when +// the task has finished but the result hasn't been made available yet. The +// channel will be closed (without sending any message) when the task has finished. +// This can be used specially with the `RunAsync` method when multiple runners +// use the same channel: results could be waiting on your side of the channel +func (r *Runner) Finished() <-chan struct{} { + return r.finished +} + // doTask will perform this runner's task and write the result in the provided // channel. The channel will be closed if requested. func (r *Runner) doTask(ch chan<- *Result, closeChan bool) { err := r.fn() + close(r.finished) + result := &Result{ RunnerID: r.ID, RunnerError: err,