From da71059a16161d40e3023f756a4d6eab6623f0bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Tue, 9 Apr 2024 17:04:34 +0200 Subject: [PATCH] refactor: reuse functions and name changes --- ocis-pkg/runner/grouprunner.go | 29 +++++++++------------ ocis-pkg/runner/runner.go | 46 ++++++++++++++++------------------ 2 files changed, 34 insertions(+), 41 deletions(-) diff --git a/ocis-pkg/runner/grouprunner.go b/ocis-pkg/runner/grouprunner.go index 614a027e48..d2c9d38895 100644 --- a/ocis-pkg/runner/grouprunner.go +++ b/ocis-pkg/runner/grouprunner.go @@ -23,8 +23,8 @@ type GroupRunner struct { runners []*Runner } -// NewGroupRunner will create a GroupRunner -func NewGroupRunner() *GroupRunner { +// NewGroup will create a GroupRunner +func NewGroup() *GroupRunner { return &GroupRunner{ runners: []*Runner{}, } @@ -61,7 +61,7 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result { ch := make(chan *Result, len(gr.runners)) // no need to block writing results for _, runner := range gr.runners { - runner.RunNoContext(ch) + runner.RunAsync(ch) } // wait for a result or for the context to be done @@ -76,7 +76,9 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result { for _, runner := range gr.runners { if _, ok := results[runner.ID]; !ok { // there might still be race conditions because the result might not have - // been made available even though the runner has finished + // been made available even though the runner has finished. We assume + // that calling the `Interrupt` method multiple times and / or calling + // the `Interrupt` method when the task has finished is safe runner.Interrupt() } } @@ -97,20 +99,13 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result { return values } -// RunNoContext will execute the tasks in the group asynchronously. -// Each tasks will run a separated goroutine (one goroutine per task), and then -// this method will finish. -// The tasks' result will be available in the provided channel when it's -// available, so you can wait for it if needed. It's up to you to decide -// to use a blocking or non-blocking channel, but the task will always finish -// before writing in the channel. -// -// This method guarantees that there will be the same number of results as the -// number of provided tasks (one result per task), but it won't guarantee -// any order -func (gr *GroupRunner) RunNoContext(ch chan<- *Result) { +// RunAsync will execute the tasks in the group asynchronously. +// The result of each task will be placed in the provided channel as soon +// as it's available. +// Note that this method will finish as soon as all the tasks are running. +func (gr *GroupRunner) RunAsync(ch chan<- *Result) { for _, runner := range gr.runners { - runner.RunNoContext(ch) + runner.RunAsync(ch) } } diff --git a/ocis-pkg/runner/runner.go b/ocis-pkg/runner/runner.go index d52edf6cf8..6b1bab6aed 100644 --- a/ocis-pkg/runner/runner.go +++ b/ocis-pkg/runner/runner.go @@ -15,7 +15,7 @@ type Runner struct { interrupt Stopper } -// NewRunner will create a new runner. +// New will create a new runner. // The runner will be created with the provided id (the id must be unique, // otherwise undefined behavior might occur), and will run the provided // runable task, using the "interrupt" function to stop that task if needed. @@ -23,7 +23,7 @@ type Runner struct { // Note that it's your responsibility to provide a proper stopper for the task. // The runner will just call that method assuming it will be enough to // eventually stop the task at some point. -func NewRunner(id string, fn Runable, interrupt Stopper) *Runner { +func New(id string, fn Runable, interrupt Stopper) *Runner { return &Runner{ ID: id, fn: fn, @@ -52,16 +52,7 @@ func NewRunner(id string, fn Runable, interrupt Stopper) *Runner { func (r *Runner) Run(ctx context.Context) *Result { ch := make(chan *Result) - go func(ch chan<- *Result) { - err := r.fn() - - result := &Result{ - RunnerID: r.ID, - RunnerError: err, - } - ch <- result - close(ch) - }(ch) + go r.doTask(ch, true) select { case result := <-ch: @@ -73,7 +64,7 @@ func (r *Runner) Run(ctx context.Context) *Result { return <-ch } -// RunNoContext will execute the task associated to this runner asynchronously. +// RunAsync will execute the task associated to this runner asynchronously. // The task will be spawned in a new goroutine and this method will finish. // The task's result will be written in the provided channel when it's // available, so you can wait for it if needed. It's up to you to decide @@ -82,17 +73,8 @@ func (r *Runner) Run(ctx context.Context) *Result { // // To interrupt the running task, the only option is to call the `Interrupt` // method at some point. -func (r *Runner) RunNoContext(ch chan<- *Result) { - go func(ch chan<- *Result) { - err := r.fn() - - result := &Result{ - RunnerID: r.ID, - RunnerError: err, - } - ch <- result - // Do not close the channel here - }(ch) +func (r *Runner) RunAsync(ch chan<- *Result) { + go r.doTask(ch, false) } // Interrupt will execute the stopper function, which should notify the task @@ -102,3 +84,19 @@ func (r *Runner) RunNoContext(ch chan<- *Result) { func (r *Runner) Interrupt() { r.interrupt() } + +// doTask will perform this runner's task and write the result in the provided +// channel. The channel will be closed if requested. +func (r *Runner) doTask(ch chan<- *Result, closeChan bool) { + err := r.fn() + + result := &Result{ + RunnerID: r.ID, + RunnerError: err, + } + ch <- result + + if closeChan { + close(ch) + } +}