From b6a6b6114a4ae2884677f932691d07d96194c4e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Fri, 12 Apr 2024 10:31:50 +0200 Subject: [PATCH] test: add unit tests --- ocis-pkg/runner/grouprunner_test.go | 200 +++++++++++++++++++ ocis-pkg/runner/runner_suite_test.go | 13 ++ ocis-pkg/runner/runner_test.go | 284 +++++++++++++++++++++++++++ 3 files changed, 497 insertions(+) create mode 100644 ocis-pkg/runner/grouprunner_test.go create mode 100644 ocis-pkg/runner/runner_suite_test.go create mode 100644 ocis-pkg/runner/runner_test.go diff --git a/ocis-pkg/runner/grouprunner_test.go b/ocis-pkg/runner/grouprunner_test.go new file mode 100644 index 0000000000..110960a5bf --- /dev/null +++ b/ocis-pkg/runner/grouprunner_test.go @@ -0,0 +1,200 @@ +package runner_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" +) + +var _ = Describe("GroupRunner", func() { + var ( + gr *runner.GroupRunner + ) + + BeforeEach(func() { + gr = runner.NewGroup() + + task1Ch := make(chan error) + task1 := TimedTask(task1Ch, 30*time.Second) + gr.Add(runner.New("task1", task1, func() { + task1Ch <- nil + close(task1Ch) + })) + + task2Ch := make(chan error) + task2 := TimedTask(task2Ch, 20*time.Second) + gr.Add(runner.New("task2", task2, func() { + task2Ch <- nil + close(task2Ch) + })) + }) + + Describe("Add", func() { + It("Duplicated runner id panics", func() { + Expect(func() { + gr.Add(runner.New("task1", func() error { + time.Sleep(6 * time.Second) + return nil + }, func() { + })) + }).To(Panic()) + }) + + It("Add after run panics", func(ctx SpecContext) { + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan []*runner.Result) + go func(ch2 chan []*runner.Result) { + ch2 <- gr.Run(myCtx) + close(ch2) + }(ch2) + + // context is done in 1 sec, so all task should be interrupted and finish + Eventually(ctx, ch2).Should(Receive(ContainElements( + &runner.Result{RunnerID: "task1", RunnerError: nil}, + &runner.Result{RunnerID: "task2", RunnerError: nil}, + ))) + + task3Ch := make(chan error) + task3 := TimedTask(task3Ch, 15*time.Second) + Expect(func() { + gr.Add(runner.New("task3", task3, func() { + task3Ch <- nil + close(task3Ch) + })) + }).To(Panic()) + }, SpecTimeout(5*time.Second)) + + It("Add after runAsync panics", func(ctx SpecContext) { + ch2 := make(chan *runner.Result) + gr.RunAsync(ch2) + + Expect(func() { + task3Ch := make(chan error) + task3 := TimedTask(task3Ch, 15*time.Second) + gr.Add(runner.New("task3", task3, func() { + task3Ch <- nil + close(task3Ch) + })) + }).To(Panic()) + }, SpecTimeout(5*time.Second)) + }) + + Describe("Run", func() { + It("Context is done", func(ctx SpecContext) { + task3Ch := make(chan error) + task3 := TimedTask(task3Ch, 15*time.Second) + gr.Add(runner.New("task3", task3, func() { + task3Ch <- nil + close(task3Ch) + })) + + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan []*runner.Result) + go func(ch2 chan []*runner.Result) { + ch2 <- gr.Run(myCtx) + close(ch2) + }(ch2) + + // context is done in 1 sec, so all task should be interrupted and finish + Eventually(ctx, ch2).Should(Receive(ContainElements( + &runner.Result{RunnerID: "task1", RunnerError: nil}, + &runner.Result{RunnerID: "task2", RunnerError: nil}, + &runner.Result{RunnerID: "task3", RunnerError: nil}, + ))) + }, SpecTimeout(5*time.Second)) + + It("One task finishes early", func(ctx SpecContext) { + task3Ch := make(chan error) + task3 := TimedTask(task3Ch, 1*time.Second) + gr.Add(runner.New("task3", task3, func() { + task3Ch <- nil + close(task3Ch) + })) + + // context will be done in 10 second + myCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan []*runner.Result) + go func(ch2 chan []*runner.Result) { + ch2 <- gr.Run(myCtx) + close(ch2) + }(ch2) + + // task3 finishes in 1 sec, so the rest should also be interrupted + Eventually(ctx, ch2).Should(Receive(ContainElements( + &runner.Result{RunnerID: "task1", RunnerError: nil}, + &runner.Result{RunnerID: "task2", RunnerError: nil}, + &runner.Result{RunnerID: "task3", RunnerError: nil}, + ))) + }, SpecTimeout(5*time.Second)) + + It("Doble run panics", func(ctx SpecContext) { + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + Expect(func() { + gr.Run(myCtx) + gr.Run(myCtx) + }).To(Panic()) + }, SpecTimeout(5*time.Second)) + }) + + Describe("RunAsync", func() { + It("Wait in channel", func(ctx SpecContext) { + task3Ch := make(chan error) + task3 := TimedTask(task3Ch, 1*time.Second) + gr.Add(runner.New("task3", task3, func() { + task3Ch <- nil + close(task3Ch) + })) + + ch2 := make(chan *runner.Result) + gr.RunAsync(ch2) + + // task3 finishes in 1 sec, so the rest should also be interrupted + Eventually(ctx, ch2).Should(Receive()) + Eventually(ctx, ch2).Should(Receive()) + Eventually(ctx, ch2).Should(Receive()) + }, SpecTimeout(5*time.Second)) + + It("Double runAsync panics", func(ctx SpecContext) { + ch2 := make(chan *runner.Result) + Expect(func() { + gr.RunAsync(ch2) + gr.RunAsync(ch2) + }).To(Panic()) + }, SpecTimeout(5*time.Second)) + + It("Interrupt async", func(ctx SpecContext) { + task3Ch := make(chan error) + task3 := TimedTask(task3Ch, 15*time.Second) + gr.Add(runner.New("task3", task3, func() { + task3Ch <- nil + close(task3Ch) + })) + + ch2 := make(chan *runner.Result) + gr.RunAsync(ch2) + gr.Interrupt() + + // tasks will be interrupted + Eventually(ctx, ch2).Should(Receive()) + Eventually(ctx, ch2).Should(Receive()) + Eventually(ctx, ch2).Should(Receive()) + }, SpecTimeout(5*time.Second)) + }) +}) diff --git a/ocis-pkg/runner/runner_suite_test.go b/ocis-pkg/runner/runner_suite_test.go new file mode 100644 index 0000000000..0d45944ed7 --- /dev/null +++ b/ocis-pkg/runner/runner_suite_test.go @@ -0,0 +1,13 @@ +package runner_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestGraph(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Runner Suite") +} diff --git a/ocis-pkg/runner/runner_test.go b/ocis-pkg/runner/runner_test.go new file mode 100644 index 0000000000..bf44fdf798 --- /dev/null +++ b/ocis-pkg/runner/runner_test.go @@ -0,0 +1,284 @@ +package runner_test + +import ( + "context" + "errors" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/owncloud/ocis/v2/ocis-pkg/runner" +) + +// TimedTask will create a task with the specified duration +// The task will finish naturally after the given duration, or +// when it receives from the provided channel +// +// For the related stopper, just reuse the same channel: +// +// func() { +// ch <- nil +// close(ch) +// } +func TimedTask(ch chan error, dur time.Duration) runner.Runable { + return func() error { + timer := time.NewTimer(dur) + defer timer.Stop() + + var result error + select { + case <-timer.C: + // finish the task in 15 secs + case result = <-ch: + // or finish when we receive from the channel + } + return result + } +} + +var _ = Describe("Runner", func() { + Describe("Run", func() { + It("Context is done", func(ctx SpecContext) { + // task will wait until it receives from the channel + // stopper will just send something through the + // channel, so the task can finish + // Worst case, the task will finish after 15 secs + ch := make(chan error) + r := runner.New("run001", TimedTask(ch, 15*time.Second), func() { + ch <- nil + close(ch) + }) + + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan *runner.Result) + go func(ch2 chan *runner.Result) { + ch2 <- r.Run(myCtx) + close(ch2) + }(ch2) + + expectedResult := &runner.Result{ + RunnerID: "run001", + RunnerError: nil, + } + + // a result should be available in ch2 within the 5 secs spec + // (task's context finishes in 1 sec so we expect a 1 sec delay) + Eventually(ctx, ch2).Should(Receive(Equal(expectedResult))) + }, SpecTimeout(5*time.Second)) + + It("Context is done and interrupt after", func(ctx SpecContext) { + // task will wait until it receives from the channel + // stopper will just send something through the + // channel, so the task can finish + // Worst case, the task will finish after 15 secs + ch := make(chan error) + r := runner.New("run001", TimedTask(ch, 15*time.Second), func() { + ch <- nil + close(ch) + }) + + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan *runner.Result) + go func(ch2 chan *runner.Result) { + ch2 <- r.Run(myCtx) + close(ch2) + }(ch2) + + expectedResult := &runner.Result{ + RunnerID: "run001", + RunnerError: nil, + } + + // a result should be available in ch2 within the 5 secs spec + // (task's context finishes in 1 sec so we expect a 1 sec delay) + Eventually(ctx, ch2).Should(Receive(Equal(expectedResult))) + + r.Interrupt() // this shouldn't do anything + }, SpecTimeout(5*time.Second)) + + It("Task finishes naturally", func(ctx SpecContext) { + e := errors.New("overslept!") + r := runner.New("run002", func() error { + time.Sleep(50 * time.Millisecond) + return e + }, func() { + }) + + // context will be done in 1 second (task will finishes before) + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + // spawn a new goroutine and return the result in the channel + ch2 := make(chan *runner.Result) + go func(ch2 chan *runner.Result) { + ch2 <- r.Run(myCtx) + close(ch2) + }(ch2) + + expectedResult := &runner.Result{ + RunnerID: "run002", + RunnerError: e, + } + + // a result should be available in ch2 within the 5 secs spec + // (task finish naturally in 50 msec) + Eventually(ctx, ch2).Should(Receive(Equal(expectedResult))) + }, SpecTimeout(5*time.Second)) + + It("Task doesn't finish", func(ctx SpecContext) { + r := runner.New("run003", func() error { + time.Sleep(20 * time.Second) + return nil + }, func() { + }) + + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + ch2 := make(chan *runner.Result) + go func(ch2 chan *runner.Result) { + ch2 <- r.Run(myCtx) + close(ch2) + }(ch2) + + // Task will finish naturally in 60 secs + // Task's context will finish in 1 sec, but task won't receive + // the notification and it will keep going + Consistently(ctx, ch2).WithTimeout(4500 * time.Millisecond).ShouldNot(Receive()) + }, SpecTimeout(5*time.Second)) + + It("Run mutiple times panics", func(ctx SpecContext) { + e := errors.New("overslept!") + r := runner.New("run002", func() error { + time.Sleep(50 * time.Millisecond) + return e + }, func() { + }) + + // context will be done in 1 second (task will finishes before) + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + Expect(func() { + r.Run(myCtx) + r.Run(myCtx) + }).To(Panic()) + }, SpecTimeout(5*time.Second)) + }) + + Describe("RunAsync", func() { + It("Wait in channel", func(ctx SpecContext) { + ch := make(chan *runner.Result) + e := errors.New("Task has finished") + + r := runner.New("run004", func() error { + time.Sleep(50 * time.Millisecond) + return e + }, func() { + }) + + r.RunAsync(ch) + expectedResult := &runner.Result{ + RunnerID: "run004", + RunnerError: e, + } + + Eventually(ctx, ch).Should(Receive(Equal(expectedResult))) + }, SpecTimeout(5*time.Second)) + + It("Run multiple times panics", func(ctx SpecContext) { + ch := make(chan *runner.Result) + e := errors.New("Task has finished") + + r := runner.New("run004", func() error { + time.Sleep(50 * time.Millisecond) + return e + }, func() { + }) + + r.RunAsync(ch) + + Expect(func() { + r.RunAsync(ch) + }).To(Panic()) + }, SpecTimeout(5*time.Second)) + + It("Interrupt async", func(ctx SpecContext) { + ch := make(chan *runner.Result) + e := errors.New("Task interrupted") + + taskCh := make(chan error) + r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() { + taskCh <- e + close(taskCh) + }) + + r.RunAsync(ch) + r.Interrupt() + + expectedResult := &runner.Result{ + RunnerID: "run005", + RunnerError: e, + } + + Eventually(ctx, ch).Should(Receive(Equal(expectedResult))) + }, SpecTimeout(5*time.Second)) + + It("Interrupt async multiple times", func(ctx SpecContext) { + ch := make(chan *runner.Result) + e := errors.New("Task interrupted") + + taskCh := make(chan error) + r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() { + taskCh <- e + close(taskCh) + }) + + r.RunAsync(ch) + r.Interrupt() + r.Interrupt() + r.Interrupt() + + expectedResult := &runner.Result{ + RunnerID: "run005", + RunnerError: e, + } + + Eventually(ctx, ch).Should(Receive(Equal(expectedResult))) + }, SpecTimeout(5*time.Second)) + }) + + Describe("Finished", func() { + It("Finish channel closes", func(ctx SpecContext) { + + r := runner.New("run006", func() error { + time.Sleep(50 * time.Millisecond) + return nil + }, func() { + }) + + // context will be done in 1 second (task will finishes before) + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + ch2 := make(chan *runner.Result) + go func(ch2 chan *runner.Result) { + ch2 <- r.Run(myCtx) + close(ch2) + }(ch2) + + finishedCh := r.Finished() + + Eventually(ctx, finishedCh).Should(BeClosed()) + }, SpecTimeout(5*time.Second)) + }) +})