refactor: reuse functions and name changes

This commit is contained in:
Juan Pablo Villafáñez
2024-04-09 17:04:34 +02:00
parent ef32af6402
commit da71059a16
2 changed files with 34 additions and 41 deletions

View File

@@ -23,8 +23,8 @@ type GroupRunner struct {
runners []*Runner
}
// NewGroupRunner will create a GroupRunner
func NewGroupRunner() *GroupRunner {
// NewGroup will create a GroupRunner
func NewGroup() *GroupRunner {
return &GroupRunner{
runners: []*Runner{},
}
@@ -61,7 +61,7 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
ch := make(chan *Result, len(gr.runners)) // no need to block writing results
for _, runner := range gr.runners {
runner.RunNoContext(ch)
runner.RunAsync(ch)
}
// wait for a result or for the context to be done
@@ -76,7 +76,9 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
for _, runner := range gr.runners {
if _, ok := results[runner.ID]; !ok {
// there might still be race conditions because the result might not have
// been made available even though the runner has finished
// been made available even though the runner has finished. We assume
// that calling the `Interrupt` method multiple times and / or calling
// the `Interrupt` method when the task has finished is safe
runner.Interrupt()
}
}
@@ -97,20 +99,13 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
return values
}
// RunNoContext will execute the tasks in the group asynchronously.
// Each tasks will run a separated goroutine (one goroutine per task), and then
// this method will finish.
// The tasks' result will be available in the provided channel when it's
// available, so you can wait for it if needed. It's up to you to decide
// to use a blocking or non-blocking channel, but the task will always finish
// before writing in the channel.
//
// This method guarantees that there will be the same number of results as the
// number of provided tasks (one result per task), but it won't guarantee
// any order
func (gr *GroupRunner) RunNoContext(ch chan<- *Result) {
// RunAsync will execute the tasks in the group asynchronously.
// The result of each task will be placed in the provided channel as soon
// as it's available.
// Note that this method will finish as soon as all the tasks are running.
func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
for _, runner := range gr.runners {
runner.RunNoContext(ch)
runner.RunAsync(ch)
}
}

View File

@@ -15,7 +15,7 @@ type Runner struct {
interrupt Stopper
}
// NewRunner will create a new runner.
// New will create a new runner.
// The runner will be created with the provided id (the id must be unique,
// otherwise undefined behavior might occur), and will run the provided
// runable task, using the "interrupt" function to stop that task if needed.
@@ -23,7 +23,7 @@ type Runner struct {
// Note that it's your responsibility to provide a proper stopper for the task.
// The runner will just call that method assuming it will be enough to
// eventually stop the task at some point.
func NewRunner(id string, fn Runable, interrupt Stopper) *Runner {
func New(id string, fn Runable, interrupt Stopper) *Runner {
return &Runner{
ID: id,
fn: fn,
@@ -52,16 +52,7 @@ func NewRunner(id string, fn Runable, interrupt Stopper) *Runner {
func (r *Runner) Run(ctx context.Context) *Result {
ch := make(chan *Result)
go func(ch chan<- *Result) {
err := r.fn()
result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
ch <- result
close(ch)
}(ch)
go r.doTask(ch, true)
select {
case result := <-ch:
@@ -73,7 +64,7 @@ func (r *Runner) Run(ctx context.Context) *Result {
return <-ch
}
// RunNoContext will execute the task associated to this runner asynchronously.
// RunAsync will execute the task associated to this runner asynchronously.
// The task will be spawned in a new goroutine and this method will finish.
// The task's result will be written in the provided channel when it's
// available, so you can wait for it if needed. It's up to you to decide
@@ -82,17 +73,8 @@ func (r *Runner) Run(ctx context.Context) *Result {
//
// To interrupt the running task, the only option is to call the `Interrupt`
// method at some point.
func (r *Runner) RunNoContext(ch chan<- *Result) {
go func(ch chan<- *Result) {
err := r.fn()
result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
ch <- result
// Do not close the channel here
}(ch)
func (r *Runner) RunAsync(ch chan<- *Result) {
go r.doTask(ch, false)
}
// Interrupt will execute the stopper function, which should notify the task
@@ -102,3 +84,19 @@ func (r *Runner) RunNoContext(ch chan<- *Result) {
func (r *Runner) Interrupt() {
r.interrupt()
}
// doTask will perform this runner's task and write the result in the provided
// channel. The channel will be closed if requested.
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
err := r.fn()
result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
ch <- result
if closeChan {
close(ch)
}
}