fix: ensure the task hasn't finished before interrupt it

This commit is contained in:
Juan Pablo Villafáñez
2024-04-10 14:30:00 +02:00
parent 0da6810cb1
commit 6ddc0addd3
2 changed files with 28 additions and 6 deletions

View File

@@ -79,11 +79,15 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
// interrupt the rest of the runners
for _, runner := range gr.runners {
if _, ok := results[runner.ID]; !ok {
// there might still be race conditions because the result might not have
// been made available even though the runner has finished. We assume
// that calling the `Interrupt` method multiple times and / or calling
// the `Interrupt` method when the task has finished is safe
runner.Interrupt()
select {
case <-runner.Finished():
// No data should be sent through the channel, so we'd be
// here only if the channel is closed. This means the task
// has finished and we don't need to interrupt. We do
// nothing in this case
default:
runner.Interrupt()
}
}
}
@@ -122,8 +126,13 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
//
// As said, this will affect ALL the tasks in the group. It isn't possible to
// try to stop just one task.
// If a task has finished, the corresponding stopper won't be called
func (gr *GroupRunner) Interrupt() {
for _, runner := range gr.runners {
runner.Interrupt()
select {
case <-runner.Finished():
default:
runner.Interrupt()
}
}
}

View File

@@ -13,6 +13,7 @@ type Runner struct {
ID string
fn Runable
interrupt Stopper
finished chan struct{}
}
// New will create a new runner.
@@ -28,6 +29,7 @@ func New(id string, fn Runable, interrupt Stopper) *Runner {
ID: id,
fn: fn,
interrupt: interrupt,
finished: make(chan struct{}),
}
}
@@ -85,11 +87,22 @@ func (r *Runner) Interrupt() {
r.interrupt()
}
// Finished will return a receive-only channel that can be used to know when
// the task has finished but the result hasn't been made available yet. The
// channel will be closed (without sending any message) when the task has finished.
// This can be used specially with the `RunAsync` method when multiple runners
// use the same channel: results could be waiting on your side of the channel
func (r *Runner) Finished() <-chan struct{} {
return r.finished
}
// doTask will perform this runner's task and write the result in the provided
// channel. The channel will be closed if requested.
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
err := r.fn()
close(r.finished)
result := &Result{
RunnerID: r.ID,
RunnerError: err,