From ef32af6402362dd7b2836c25ce2b60d2f79c7772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Villaf=C3=A1=C3=B1ez?= Date: Mon, 8 Apr 2024 14:22:17 +0200 Subject: [PATCH] feat: add runners to startup the ocis' services --- ocis-pkg/runner/grouprunner.go | 130 +++++++++++++++++++++++++++++++++ ocis-pkg/runner/runner.go | 104 ++++++++++++++++++++++++++ ocis-pkg/runner/types.go | 35 +++++++++ 3 files changed, 269 insertions(+) create mode 100644 ocis-pkg/runner/grouprunner.go create mode 100644 ocis-pkg/runner/runner.go create mode 100644 ocis-pkg/runner/types.go diff --git a/ocis-pkg/runner/grouprunner.go b/ocis-pkg/runner/grouprunner.go new file mode 100644 index 000000000..614a027e4 --- /dev/null +++ b/ocis-pkg/runner/grouprunner.go @@ -0,0 +1,130 @@ +package runner + +import ( + "context" +) + +// GroupRunner represent a group of tasks that need to run together. +// The expectation is that all the tasks will run at the same time, and when +// one of them stops, the rest will also stop. +// +// The GroupRunner is intended to be used to run multiple services, which are +// more or less independent from eachother, but at the same time it doesn't +// make sense to have any of them stopped while the rest are running. +// Basically, either all of them run, or none of them. +// For example, you can have a GRPC and HTTP servers running, each of them +// providing a piece of functionality, however, if any of them fails, the +// feature provided by them would be incomplete or broken. +// +// Note that, as services, the task aren't expected to stop by default. +// This means that, if a task finishes naturally, the rest of the task will +// asked to stop as well. +type GroupRunner struct { + runners []*Runner +} + +// NewGroupRunner will create a GroupRunner +func NewGroupRunner() *GroupRunner { + return &GroupRunner{ + runners: []*Runner{}, + } +} + +// Add will add a runner to the group. +// +// It's mandatory that each runner in the group has an unique id, otherwise +// there will be issues +func (gr *GroupRunner) Add(r *Runner) { + gr.runners = append(gr.runners, r) +} + +// Run will execute all the tasks in the group at the same time. +// +// Similarly to the "regular" runner's `Run` method, the execution thread +// will be blocked here until all tasks are completed, and their results +// will be available (each result will have the runner's id so it's easy to +// find which one failed). Note that there is no guarantee about the result's +// order, so the first result in the slice might or might not be the first +// result to be obtained. +// +// When the context is marked as done, the groupRunner will call all the +// stoppers for each runner to notify each task to stop. Note that the tasks +// might still take a while to complete. +// +// If a task finishes naturally (with the context still "alive"), it will also +// cause the groupRunner to call the stoppers of the rest of the tasks. So if +// a task finishes, the rest will also finish. +// Note that it is NOT expected for the finished task's stopper to be called +// in this case. +func (gr *GroupRunner) Run(ctx context.Context) []*Result { + results := make(map[string]*Result) + + ch := make(chan *Result, len(gr.runners)) // no need to block writing results + for _, runner := range gr.runners { + runner.RunNoContext(ch) + } + + // wait for a result or for the context to be done + select { + case result := <-ch: + results[result.RunnerID] = result + case <-ctx.Done(): + // Do nothing + } + + // interrupt the rest of the runners + for _, runner := range gr.runners { + if _, ok := results[runner.ID]; !ok { + // there might still be race conditions because the result might not have + // been made available even though the runner has finished + runner.Interrupt() + } + } + + // Having notified that the context has been finished, we still need to + // wait for the rest of the results + for i := len(results); i < len(gr.runners); i++ { + result := <-ch + results[result.RunnerID] = result + } + + close(ch) + + values := make([]*Result, 0, len(gr.runners)) + for _, val := range results { + values = append(values, val) + } + return values +} + +// RunNoContext will execute the tasks in the group asynchronously. +// Each tasks will run a separated goroutine (one goroutine per task), and then +// this method will finish. +// The tasks' result will be available in the provided channel when it's +// available, so you can wait for it if needed. It's up to you to decide +// to use a blocking or non-blocking channel, but the task will always finish +// before writing in the channel. +// +// This method guarantees that there will be the same number of results as the +// number of provided tasks (one result per task), but it won't guarantee +// any order +func (gr *GroupRunner) RunNoContext(ch chan<- *Result) { + for _, runner := range gr.runners { + runner.RunNoContext(ch) + } +} + +// Interrupt will execute the stopper function of ALL the tasks, which should +// notify the tasks in order for them to finish. +// The stoppers will be called immediately but sequentially. This means that +// the second stopper won't be called until the first one has returned. This +// usually isn't a problem because the service `Stop`'s methods either don't +// take a long time to return, or they run asynchronously in another goroutine. +// +// As said, this will affect ALL the tasks in the group. It isn't possible to +// try to stop just one task. +func (gr *GroupRunner) Interrupt() { + for _, runner := range gr.runners { + runner.Interrupt() + } +} diff --git a/ocis-pkg/runner/runner.go b/ocis-pkg/runner/runner.go new file mode 100644 index 000000000..d52edf6cf --- /dev/null +++ b/ocis-pkg/runner/runner.go @@ -0,0 +1,104 @@ +package runner + +import ( + "context" +) + +// Runner represents the one executing a long running task, such as a server +// or a service. +// The ID of the runner is public to make identification easier, and the +// Result that it will generated will contain the same ID, so we can +// know which runner provided which result. +type Runner struct { + ID string + fn Runable + interrupt Stopper +} + +// NewRunner will create a new runner. +// The runner will be created with the provided id (the id must be unique, +// otherwise undefined behavior might occur), and will run the provided +// runable task, using the "interrupt" function to stop that task if needed. +// +// Note that it's your responsibility to provide a proper stopper for the task. +// The runner will just call that method assuming it will be enough to +// eventually stop the task at some point. +func NewRunner(id string, fn Runable, interrupt Stopper) *Runner { + return &Runner{ + ID: id, + fn: fn, + interrupt: interrupt, + } +} + +// Run will execute the task associated to this runner in a synchronous way. +// The task will be spawned in a new goroutine, and the current thread will +// wait until the task finishes. +// +// The task will finish "naturally". The stopper will be called in the +// following ways: +// - Manually calling this runner's `Interrupt` method +// - When the provided context is done +// As said, it's expected that calling the provided stopper will be enough to +// make the task to eventually complete. +// +// Once the task finishes, the result will be returned. +// +// Some nice things you can do: +// - Use signal.NotifyContext(...) to call the stopper and provide a clean +// shutdown procedure when an OS signal is received +// - Use context.WithDeadline(...) or context.WithTimeout(...) to run the task +// for a limited time +func (r *Runner) Run(ctx context.Context) *Result { + ch := make(chan *Result) + + go func(ch chan<- *Result) { + err := r.fn() + + result := &Result{ + RunnerID: r.ID, + RunnerError: err, + } + ch <- result + close(ch) + }(ch) + + select { + case result := <-ch: + return result + case <-ctx.Done(): + r.interrupt() + } + + return <-ch +} + +// RunNoContext will execute the task associated to this runner asynchronously. +// The task will be spawned in a new goroutine and this method will finish. +// The task's result will be written in the provided channel when it's +// available, so you can wait for it if needed. It's up to you to decide +// to use a blocking or non-blocking channel, but the task will always finish +// before writing in the channel. +// +// To interrupt the running task, the only option is to call the `Interrupt` +// method at some point. +func (r *Runner) RunNoContext(ch chan<- *Result) { + go func(ch chan<- *Result) { + err := r.fn() + + result := &Result{ + RunnerID: r.ID, + RunnerError: err, + } + ch <- result + // Do not close the channel here + }(ch) +} + +// Interrupt will execute the stopper function, which should notify the task +// in order for it to finish. +// The stopper will be called immediately, although it's expected the +// consequences to take a while (task might need a while to stop) +func (r *Runner) Interrupt() { + r.interrupt() +} diff --git a/ocis-pkg/runner/types.go b/ocis-pkg/runner/types.go new file mode 100644 index 000000000..4cce8d6bd --- /dev/null +++ b/ocis-pkg/runner/types.go @@ -0,0 +1,35 @@ +package runner + +// Runable represent a task that can be executed by the Runner. +// It expected to be a long running task with an indefinite execution time, +// so it's suitable for servers or services. +// The task can eventually return an error, or nil if the execution finishes +// without errors +type Runable func() error + +// Stopper represent a function that will stop the Runable. +// The stopper acts as a notification to the runable to know that the task +// needs to be finished now. +// +// The stopper won't need to crash the runable or force the runable to stop, +// instead, it will let the runable to know it has to stop and let it finish. +// This means that the runable might still run for a while. +// +// It's recommended the stopper to run asynchronously. This means that the +// stopper might need to spawn a goroutine. The intention is avoid blocking +// the running thread. +// +// Usually, the stoppers are the servers's `Shutdown()` or `Close()` methods, +// that will cause the server to start its shutdown procedure. As said, there +// is no need to force the shutdown, so graceful shutdowns are preferred if +// they're available +type Stopper func() + +// Result represents the result of a runner. +// The result contains the provided runner's id (for easier identification +// in case of multiple results) and the runner's error, which is the result +// of the Runable function (might be nil if no error happened) +type Result struct { + RunnerID string + RunnerError error +}