fix: ensure runners provide a result after being interrupted

This commit is contained in:
Juan Pablo Villafáñez
2024-04-19 10:54:38 +02:00
parent cb2e8e0ef2
commit df3c496cba
6 changed files with 146 additions and 179 deletions
+9 -9
View File
@@ -19,14 +19,14 @@ var _ = Describe("GroupRunner", func() {
task1Ch := make(chan error)
task1 := TimedTask(task1Ch, 30*time.Second)
gr.Add(runner.New("task1", task1, func() {
gr.Add(runner.New("task1", 30*time.Second, task1, func() {
task1Ch <- nil
close(task1Ch)
}))
task2Ch := make(chan error)
task2 := TimedTask(task2Ch, 20*time.Second)
gr.Add(runner.New("task2", task2, func() {
gr.Add(runner.New("task2", 30*time.Second, task2, func() {
task2Ch <- nil
close(task2Ch)
}))
@@ -35,7 +35,7 @@ var _ = Describe("GroupRunner", func() {
Describe("Add", func() {
It("Duplicated runner id panics", func() {
Expect(func() {
gr.Add(runner.New("task1", func() error {
gr.Add(runner.New("task1", 30*time.Second, func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
@@ -64,7 +64,7 @@ var _ = Describe("GroupRunner", func() {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
Expect(func() {
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
@@ -78,7 +78,7 @@ var _ = Describe("GroupRunner", func() {
Expect(func() {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
@@ -90,7 +90,7 @@ var _ = Describe("GroupRunner", func() {
It("Context is done", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
@@ -117,7 +117,7 @@ var _ = Describe("GroupRunner", func() {
It("One task finishes early", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 1*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
@@ -157,7 +157,7 @@ var _ = Describe("GroupRunner", func() {
It("Wait in channel", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 1*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
@@ -182,7 +182,7 @@ var _ = Describe("GroupRunner", func() {
It("Interrupt async", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
-55
View File
@@ -1,55 +0,0 @@
package runner
import (
"fmt"
"time"
)
// InterruptedTimeoutRunner will create a new runner (R2) based on an original
// runner (R1).
// The new runner (R2) will monitor the original (R1). Once the `Interrupt`
// method is called in the new (R2), the interruption will be delivered to
// the original (R1), but a timeout will start. If we reach the timeout
// before the original runner (R1) is finished, the new runner (R2) will
// return an error.
//
// Any valid duration can be provided for the timeout, but you should give
// enough time for the task to finish in order to get the error from the
// original task (R1) and not the timeout one from the new (R2).
// Depending on the task, 5s, 10s or 30s might be reasonable timeout values.
//
// The timeout will start once the new (R2) runner is interrupted, either
// manually or via context.
//
// Note that R2 can't stop R1 in any way. Even if R2 returns a "timeout" error,
// R1 might still be running and consuming resources.
// This method is intended to provide a way to ensure that the main thread
// won't be blocked forever.
//
// The returned runner keeps the ID of the original runner (R1).
func InterruptedTimeoutRunner(r *Runner, d time.Duration) *Runner {
	// Buffered (size 1) so the watchdog goroutine spawned by the stopper can
	// always hand over the timeout and exit, even if the task function below
	// has already returned with R1's result and nobody is receiving anymore.
	timeoutCh := make(chan time.Time, 1)

	return New(r.ID, func() error {
		// Buffered (size 1) so the sender of R1's result never blocks, even
		// if we stopped listening because the timeout was reached first.
		ch := make(chan *Result, 1)
		r.RunAsync(ch)

		select {
		case result := <-ch:
			return result.RunnerError // forward the runner error
		case t := <-timeoutCh:
			// timeout reached. We can't stop the task, but we'll return
			// an error instead to prevent blocking the thread.
			return fmt.Errorf("Timeout reached at %s after waiting for %s after being interrupted", t.String(), d.String())
		}
	}, func() {
		go func() {
			// Use an explicit timer so it can be released as soon as R1
			// finishes instead of lingering until it fires.
			timer := time.NewTimer(d)
			defer timer.Stop()

			select {
			case <-r.Finished():
				// Task finished -> runner should be delivering the result
			case t := <-timer.C:
				// timeout reached -> send it through the channel so our runner
				// can abort
				timeoutCh <- t
			}
		}()
		r.Interrupt()
	})
}
-87
View File
@@ -1,87 +0,0 @@
package runner_test
import (
"context"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
)
// Specs for InterruptedTimeoutRunner: the wrapping runner must forward the
// original result when the task ends in time, and must report a timeout
// error when the task outlives the timeout after an interruption.
var _ = Describe("Helper", func() {
	// sleepingRunner builds a runner with ID "task" whose task only sleeps for
	// the given duration and whose stopper does nothing, so an interruption
	// never shortens the task.
	sleepingRunner := func(nap time.Duration) *runner.Runner {
		return runner.New("task", func() error {
			time.Sleep(nap)
			return nil
		}, func() {
		})
	}

	// runInBackground runs r with the given context in a new goroutine. The
	// single result is delivered through the returned channel, which is
	// closed right after.
	runInBackground := func(runCtx context.Context, r *runner.Runner) chan *runner.Result {
		resCh := make(chan *runner.Result)
		go func() {
			resCh <- r.Run(runCtx)
			close(resCh)
		}()
		return resCh
	}

	Describe("InterruptedTimeoutRunner", func() {
		It("Context done, no timeout", func(ctx SpecContext) {
			wrapped := runner.InterruptedTimeoutRunner(sleepingRunner(10*time.Millisecond), 2*time.Second)

			// The context is done after 1 second; the 10ms task ends well
			// before that.
			runCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
			defer cancel()

			resCh := runInBackground(runCtx, wrapped)

			want := &runner.Result{
				RunnerID:    "task",
				RunnerError: nil,
			}

			Eventually(ctx, resCh).Should(Receive(Equal(want)))
		}, SpecTimeout(5*time.Second))

		It("Context done, timeout reached", func(ctx SpecContext) {
			wrapped := runner.InterruptedTimeoutRunner(sleepingRunner(10*time.Second), 2*time.Second)

			// The context is done after 1 second while the task is still
			// sleeping (10 seconds), so the 2 second timeout is expected to
			// be reached first.
			runCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
			defer cancel()

			resCh := runInBackground(runCtx, wrapped)

			var got *runner.Result
			Eventually(ctx, resCh).Should(Receive(&got))
			Expect(got.RunnerID).To(Equal("task"))
			Expect(got.RunnerError.Error()).To(ContainSubstring("Timeout reached"))
		}, SpecTimeout(5*time.Second))

		It("Interrupted, timeout reached", func(ctx SpecContext) {
			wrapped := runner.InterruptedTimeoutRunner(sleepingRunner(10*time.Second), 2*time.Second)

			// Start the runner and interrupt it manually right away; the
			// task ignores the interruption and keeps sleeping.
			resCh := make(chan *runner.Result)
			wrapped.RunAsync(resCh)
			wrapped.Interrupt()

			var got *runner.Result
			Eventually(ctx, resCh).Should(Receive(&got))
			Expect(got.RunnerID).To(Equal("task"))
			Expect(got.RunnerError.Error()).To(ContainSubstring("Timeout reached"))
		}, SpecTimeout(5*time.Second))
	})
})
+77 -17
View File
@@ -2,7 +2,9 @@ package runner
import (
"context"
"fmt"
"sync/atomic"
"time"
)
// Runner represents the one executing a long running task, such as a server
@@ -10,13 +12,18 @@ import (
// The ID of the runner is public to make identification easier, and the
// Result that it will generate will contain the same ID, so we can
// know which runner provided which result.
//
// Runners are intended to be used only once. Reusing them isn't possible.
// You'd need to create a new runner if you want to rerun the same task.
type Runner struct {
ID string
fn Runable
interrupt Stopper
running atomic.Bool
interrupted atomic.Bool
finished chan struct{}
ID string
interruptDur time.Duration
fn Runable
interrupt Stopper
running atomic.Bool
interrupted atomic.Bool
interruptedCh chan time.Duration
finished chan struct{}
}
// New will create a new runner.
@@ -24,15 +31,24 @@ type Runner struct {
// otherwise undefined behavior might occur), and will run the provided
// runable task, using the "interrupt" function to stop that task if needed.
//
// The interrupt duration will be used to ensure the runner doesn't block
// forever. The interrupt duration will be used to start a timeout when the
// runner gets interrupted (either the context of the `Run` method is done
// or this runner's `Interrupt` method is called). If the timeout is reached,
// a timeout result will be returned instead of whatever result the task should
// be returning.
//
// Note that it's your responsibility to provide a proper stopper for the task.
// The runner will just call that method assuming it will be enough to
// eventually stop the task at some point.
func New(id string, fn Runable, interrupt Stopper) *Runner {
func New(id string, interruptDur time.Duration, fn Runable, interrupt Stopper) *Runner {
return &Runner{
ID: id,
fn: fn,
interrupt: interrupt,
finished: make(chan struct{}),
ID: id,
interruptDur: interruptDur,
fn: fn,
interrupt: interrupt,
interruptedCh: make(chan time.Duration),
finished: make(chan struct{}),
}
}
@@ -48,6 +64,11 @@ func New(id string, fn Runable, interrupt Stopper) *Runner {
// make the task to eventually complete.
//
// Once the task finishes, the result will be returned.
// When the context is done, or if the runner is interrupted, a timeout will
// start using the provided "interrupt duration". If this timeout is reached,
// a timeout result will be returned instead of the one from the task. This is
// intended to prevent blocking the main thread indefinitely. A suitable
// duration should be used depending on the task, usually 5, 10 or 30 secs
//
// Some nice things you can do:
// - Use signal.NotifyContext(...) to call the stopper and provide a clean
@@ -97,9 +118,24 @@ func (r *Runner) RunAsync(ch chan<- *Result) {
// in order for it to finish.
// The stopper will be called immediately, although its effects are expected
// to take a while (the task might need some time to stop).
// A timeout will start using the provided "interrupt duration". Once that
// timeout is reached, the runner must provide a result with a timeout error.
// Note that, even after returning the timeout result, the task could still
// be running and consuming resources.
// Only the first call has any effect. Further calls won't do anything.
func (r *Runner) Interrupt() {
	if !r.interrupted.CompareAndSwap(false, true) {
		// already interrupted before, nothing else to do
		return
	}

	go func() {
		// Use an explicit timer so it can be released as soon as the task
		// finishes instead of lingering until it fires.
		timer := time.NewTimer(r.interruptDur)
		defer timer.Stop()

		select {
		case <-r.Finished():
			// Task finished -> runner should be delivering the result
			return
		case <-timer.C:
			// timeout reached -> notify through the channel below
		}

		// Send the timeout through the channel so our runner can abort.
		// Keep watching Finished() as well: if the task completes at the
		// same moment and nobody receives from interruptedCh anymore, this
		// goroutine must exit instead of blocking forever on the send.
		// The channel is closed only after a successful send, so a receiver
		// never observes a zero duration from a closed channel.
		select {
		case r.interruptedCh <- r.interruptDur:
			close(r.interruptedCh)
		case <-r.Finished():
		}
	}()
	r.interrupt()
}
@@ -115,17 +151,41 @@ func (r *Runner) Finished() <-chan struct{} {
// doTask will perform this runner's task and write the result in the provided
// channel. The channel will be closed if requested.
// A result will be provided when either the task finishes naturally or we
// reach the timeout after being interrupted
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
err := r.fn()
tmpCh := make(chan *Result)
close(r.finished)
// spawn the task and return the result in a temporary channel
go func(tmpCh chan *Result) {
err := r.fn()
result := &Result{
RunnerID: r.ID,
RunnerError: err,
close(r.finished)
result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
tmpCh <- result
close(tmpCh)
}(tmpCh)
// wait for the result in the temporary channel or until we get the
// interrupted signal
var result *Result
select {
case d := <-r.interruptedCh:
result = &Result{
RunnerID: r.ID,
RunnerError: fmt.Errorf("runner %s timed out after waiting for %s", r.ID, d.String()),
}
case result = <-tmpCh:
// Just assign the received value, nothing else to do
}
ch <- result
// send the result
ch <- result
if closeChan {
close(ch)
}
+1 -1
View File
@@ -7,7 +7,7 @@ import (
. "github.com/onsi/gomega"
)
func TestGraph(t *testing.T) {
func TestRunner(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Runner Suite")
}
+59 -10
View File
@@ -44,7 +44,7 @@ var _ = Describe("Runner", func() {
// channel, so the task can finish
// Worst case, the task will finish after 15 secs
ch := make(chan error)
r := runner.New("run001", TimedTask(ch, 15*time.Second), func() {
r := runner.New("run001", 30*time.Second, TimedTask(ch, 15*time.Second), func() {
ch <- nil
close(ch)
})
@@ -76,7 +76,7 @@ var _ = Describe("Runner", func() {
// channel, so the task can finish
// Worst case, the task will finish after 15 secs
ch := make(chan error)
r := runner.New("run001", TimedTask(ch, 15*time.Second), func() {
r := runner.New("run001", 30*time.Second, TimedTask(ch, 15*time.Second), func() {
ch <- nil
close(ch)
})
@@ -106,7 +106,7 @@ var _ = Describe("Runner", func() {
It("Task finishes naturally", func(ctx SpecContext) {
e := errors.New("overslept!")
r := runner.New("run002", func() error {
r := runner.New("run002", 30*time.Second, func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
@@ -134,7 +134,7 @@ var _ = Describe("Runner", func() {
}, SpecTimeout(5*time.Second))
It("Task doesn't finish", func(ctx SpecContext) {
r := runner.New("run003", func() error {
r := runner.New("run003", 30*time.Second, func() error {
time.Sleep(20 * time.Second)
return nil
}, func() {
@@ -156,9 +156,37 @@ var _ = Describe("Runner", func() {
Consistently(ctx, ch2).WithTimeout(4500 * time.Millisecond).ShouldNot(Receive())
}, SpecTimeout(5*time.Second))
It("Task doesn't finish and times out", func(ctx SpecContext) {
r := runner.New("run003", 3*time.Second, func() error {
time.Sleep(20 * time.Second)
return nil
}, func() {
})
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
ch2 := make(chan *runner.Result)
go func(ch2 chan *runner.Result) {
ch2 <- r.Run(myCtx)
close(ch2)
}(ch2)
var expectedResult *runner.Result
// Task will finish naturally in 20 secs
// Task's context will finish in 1 sec, but task won't receive
// the notification and it will keep going
// Task will time out in 3 seconds after being interrupted (when
// context is done), so test should finish in 4 seconds
Eventually(ctx, ch2).Should(Receive(&expectedResult))
Expect(expectedResult.RunnerID).To(Equal("run003"))
Expect(expectedResult.RunnerError.Error()).To(ContainSubstring("timed out"))
}, SpecTimeout(5*time.Second))
It("Run mutiple times panics", func(ctx SpecContext) {
e := errors.New("overslept!")
r := runner.New("run002", func() error {
r := runner.New("run002", 30*time.Second, func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
@@ -180,7 +208,7 @@ var _ = Describe("Runner", func() {
ch := make(chan *runner.Result)
e := errors.New("Task has finished")
r := runner.New("run004", func() error {
r := runner.New("run004", 30*time.Second, func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
@@ -199,7 +227,7 @@ var _ = Describe("Runner", func() {
ch := make(chan *runner.Result)
e := errors.New("Task has finished")
r := runner.New("run004", func() error {
r := runner.New("run004", 30*time.Second, func() error {
time.Sleep(50 * time.Millisecond)
return e
}, func() {
@@ -217,7 +245,7 @@ var _ = Describe("Runner", func() {
e := errors.New("Task interrupted")
taskCh := make(chan error)
r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() {
r := runner.New("run005", 30*time.Second, TimedTask(taskCh, 20*time.Second), func() {
taskCh <- e
close(taskCh)
})
@@ -233,12 +261,33 @@ var _ = Describe("Runner", func() {
Eventually(ctx, ch).Should(Receive(Equal(expectedResult)))
}, SpecTimeout(5*time.Second))
It("Interrupt async times out", func(ctx SpecContext) {
ch := make(chan *runner.Result)
e := errors.New("Task interrupted")
r := runner.New("run005", 3*time.Second, func() error {
time.Sleep(30 * time.Second)
return e
}, func() {
})
r.RunAsync(ch)
r.Interrupt()
var expectedResult *runner.Result
// Task will time out 3 seconds after receiving the interruption
Eventually(ctx, ch).Should(Receive(&expectedResult))
Expect(expectedResult.RunnerID).To(Equal("run005"))
Expect(expectedResult.RunnerError.Error()).To(ContainSubstring("timed out"))
}, SpecTimeout(5*time.Second))
It("Interrupt async multiple times", func(ctx SpecContext) {
ch := make(chan *runner.Result)
e := errors.New("Task interrupted")
taskCh := make(chan error)
r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() {
r := runner.New("run005", 30*time.Second, TimedTask(taskCh, 20*time.Second), func() {
taskCh <- e
close(taskCh)
})
@@ -260,7 +309,7 @@ var _ = Describe("Runner", func() {
Describe("Finished", func() {
It("Finish channel closes", func(ctx SpecContext) {
r := runner.New("run006", func() error {
r := runner.New("run006", 30*time.Second, func() error {
time.Sleep(50 * time.Millisecond)
return nil
}, func() {