mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-06 20:29:54 -06:00
feat: add runners to startup the ocis' services
This commit is contained in:
130
ocis-pkg/runner/grouprunner.go
Normal file
130
ocis-pkg/runner/grouprunner.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// GroupRunner represent a group of tasks that need to run together.
|
||||
// The expectation is that all the tasks will run at the same time, and when
|
||||
// one of them stops, the rest will also stop.
|
||||
//
|
||||
// The GroupRunner is intended to be used to run multiple services, which are
|
||||
// more or less independent from eachother, but at the same time it doesn't
|
||||
// make sense to have any of them stopped while the rest are running.
|
||||
// Basically, either all of them run, or none of them.
|
||||
// For example, you can have a GRPC and HTTP servers running, each of them
|
||||
// providing a piece of functionality, however, if any of them fails, the
|
||||
// feature provided by them would be incomplete or broken.
|
||||
//
|
||||
// Note that, as services, the task aren't expected to stop by default.
|
||||
// This means that, if a task finishes naturally, the rest of the task will
|
||||
// asked to stop as well.
|
||||
type GroupRunner struct {
|
||||
runners []*Runner
|
||||
}
|
||||
|
||||
// NewGroupRunner will create a GroupRunner
|
||||
func NewGroupRunner() *GroupRunner {
|
||||
return &GroupRunner{
|
||||
runners: []*Runner{},
|
||||
}
|
||||
}
|
||||
|
||||
// Add will add a runner to the group.
|
||||
//
|
||||
// It's mandatory that each runner in the group has an unique id, otherwise
|
||||
// there will be issues
|
||||
func (gr *GroupRunner) Add(r *Runner) {
|
||||
gr.runners = append(gr.runners, r)
|
||||
}
|
||||
|
||||
// Run will execute all the tasks in the group at the same time.
|
||||
//
|
||||
// Similarly to the "regular" runner's `Run` method, the execution thread
|
||||
// will be blocked here until all tasks are completed, and their results
|
||||
// will be available (each result will have the runner's id so it's easy to
|
||||
// find which one failed). Note that there is no guarantee about the result's
|
||||
// order, so the first result in the slice might or might not be the first
|
||||
// result to be obtained.
|
||||
//
|
||||
// When the context is marked as done, the groupRunner will call all the
|
||||
// stoppers for each runner to notify each task to stop. Note that the tasks
|
||||
// might still take a while to complete.
|
||||
//
|
||||
// If a task finishes naturally (with the context still "alive"), it will also
|
||||
// cause the groupRunner to call the stoppers of the rest of the tasks. So if
|
||||
// a task finishes, the rest will also finish.
|
||||
// Note that it is NOT expected for the finished task's stopper to be called
|
||||
// in this case.
|
||||
func (gr *GroupRunner) Run(ctx context.Context) []*Result {
|
||||
results := make(map[string]*Result)
|
||||
|
||||
ch := make(chan *Result, len(gr.runners)) // no need to block writing results
|
||||
for _, runner := range gr.runners {
|
||||
runner.RunNoContext(ch)
|
||||
}
|
||||
|
||||
// wait for a result or for the context to be done
|
||||
select {
|
||||
case result := <-ch:
|
||||
results[result.RunnerID] = result
|
||||
case <-ctx.Done():
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
// interrupt the rest of the runners
|
||||
for _, runner := range gr.runners {
|
||||
if _, ok := results[runner.ID]; !ok {
|
||||
// there might still be race conditions because the result might not have
|
||||
// been made available even though the runner has finished
|
||||
runner.Interrupt()
|
||||
}
|
||||
}
|
||||
|
||||
// Having notified that the context has been finished, we still need to
|
||||
// wait for the rest of the results
|
||||
for i := len(results); i < len(gr.runners); i++ {
|
||||
result := <-ch
|
||||
results[result.RunnerID] = result
|
||||
}
|
||||
|
||||
close(ch)
|
||||
|
||||
values := make([]*Result, 0, len(gr.runners))
|
||||
for _, val := range results {
|
||||
values = append(values, val)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// RunNoContext will execute the tasks in the group asynchronously.
|
||||
// Each tasks will run a separated goroutine (one goroutine per task), and then
|
||||
// this method will finish.
|
||||
// The tasks' result will be available in the provided channel when it's
|
||||
// available, so you can wait for it if needed. It's up to you to decide
|
||||
// to use a blocking or non-blocking channel, but the task will always finish
|
||||
// before writing in the channel.
|
||||
//
|
||||
// This method guarantees that there will be the same number of results as the
|
||||
// number of provided tasks (one result per task), but it won't guarantee
|
||||
// any order
|
||||
func (gr *GroupRunner) RunNoContext(ch chan<- *Result) {
|
||||
for _, runner := range gr.runners {
|
||||
runner.RunNoContext(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// Interrupt will execute the stopper function of ALL the tasks, which should
|
||||
// notify the tasks in order for them to finish.
|
||||
// The stoppers will be called immediately but sequentially. This means that
|
||||
// the second stopper won't be called until the first one has returned. This
|
||||
// usually isn't a problem because the service `Stop`'s methods either don't
|
||||
// take a long time to return, or they run asynchronously in another goroutine.
|
||||
//
|
||||
// As said, this will affect ALL the tasks in the group. It isn't possible to
|
||||
// try to stop just one task.
|
||||
func (gr *GroupRunner) Interrupt() {
|
||||
for _, runner := range gr.runners {
|
||||
runner.Interrupt()
|
||||
}
|
||||
}
|
||||
104
ocis-pkg/runner/runner.go
Normal file
104
ocis-pkg/runner/runner.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// Runner represents the one executing a long running task, such as a server
|
||||
// or a service.
|
||||
// The ID of the runner is public to make identification easier, and the
|
||||
// Result that it will generated will contain the same ID, so we can
|
||||
// know which runner provided which result.
|
||||
type Runner struct {
|
||||
ID string
|
||||
fn Runable
|
||||
interrupt Stopper
|
||||
}
|
||||
|
||||
// NewRunner will create a new runner.
|
||||
// The runner will be created with the provided id (the id must be unique,
|
||||
// otherwise undefined behavior might occur), and will run the provided
|
||||
// runable task, using the "interrupt" function to stop that task if needed.
|
||||
//
|
||||
// Note that it's your responsibility to provide a proper stopper for the task.
|
||||
// The runner will just call that method assuming it will be enough to
|
||||
// eventually stop the task at some point.
|
||||
func NewRunner(id string, fn Runable, interrupt Stopper) *Runner {
|
||||
return &Runner{
|
||||
ID: id,
|
||||
fn: fn,
|
||||
interrupt: interrupt,
|
||||
}
|
||||
}
|
||||
|
||||
// Run will execute the task associated to this runner in a synchronous way.
|
||||
// The task will be spawned in a new goroutine, and the current thread will
|
||||
// wait until the task finishes.
|
||||
//
|
||||
// The task will finish "naturally". The stopper will be called in the
|
||||
// following ways:
|
||||
// - Manually calling this runner's `Interrupt` method
|
||||
// - When the provided context is done
|
||||
// As said, it's expected that calling the provided stopper will be enough to
|
||||
// make the task to eventually complete.
|
||||
//
|
||||
// Once the task finishes, the result will be returned.
|
||||
//
|
||||
// Some nice things you can do:
|
||||
// - Use signal.NotifyContext(...) to call the stopper and provide a clean
|
||||
// shutdown procedure when an OS signal is received
|
||||
// - Use context.WithDeadline(...) or context.WithTimeout(...) to run the task
|
||||
// for a limited time
|
||||
func (r *Runner) Run(ctx context.Context) *Result {
|
||||
ch := make(chan *Result)
|
||||
|
||||
go func(ch chan<- *Result) {
|
||||
err := r.fn()
|
||||
|
||||
result := &Result{
|
||||
RunnerID: r.ID,
|
||||
RunnerError: err,
|
||||
}
|
||||
ch <- result
|
||||
close(ch)
|
||||
}(ch)
|
||||
|
||||
select {
|
||||
case result := <-ch:
|
||||
return result
|
||||
case <-ctx.Done():
|
||||
r.interrupt()
|
||||
}
|
||||
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// RunNoContext will execute the task associated to this runner asynchronously.
|
||||
// The task will be spawned in a new goroutine and this method will finish.
|
||||
// The task's result will be written in the provided channel when it's
|
||||
// available, so you can wait for it if needed. It's up to you to decide
|
||||
// to use a blocking or non-blocking channel, but the task will always finish
|
||||
// before writing in the channel.
|
||||
//
|
||||
// To interrupt the running task, the only option is to call the `Interrupt`
|
||||
// method at some point.
|
||||
func (r *Runner) RunNoContext(ch chan<- *Result) {
|
||||
go func(ch chan<- *Result) {
|
||||
err := r.fn()
|
||||
|
||||
result := &Result{
|
||||
RunnerID: r.ID,
|
||||
RunnerError: err,
|
||||
}
|
||||
ch <- result
|
||||
// Do not close the channel here
|
||||
}(ch)
|
||||
}
|
||||
|
||||
// Interrupt will execute the stopper function, which should notify the task
|
||||
// in order for it to finish.
|
||||
// The stopper will be called immediately, although it's expected the
|
||||
// consequences to take a while (task might need a while to stop)
|
||||
func (r *Runner) Interrupt() {
|
||||
r.interrupt()
|
||||
}
|
||||
35
ocis-pkg/runner/types.go
Normal file
35
ocis-pkg/runner/types.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package runner
|
||||
|
||||
// Runable represent a task that can be executed by the Runner.
|
||||
// It expected to be a long running task with an indefinite execution time,
|
||||
// so it's suitable for servers or services.
|
||||
// The task can eventually return an error, or nil if the execution finishes
|
||||
// without errors
|
||||
type Runable func() error
|
||||
|
||||
// Stopper represent a function that will stop the Runable.
|
||||
// The stopper acts as a notification to the runable to know that the task
|
||||
// needs to be finished now.
|
||||
//
|
||||
// The stopper won't need to crash the runable or force the runable to stop,
|
||||
// instead, it will let the runable to know it has to stop and let it finish.
|
||||
// This means that the runable might still run for a while.
|
||||
//
|
||||
// It's recommended the stopper to run asynchronously. This means that the
|
||||
// stopper might need to spawn a goroutine. The intention is avoid blocking
|
||||
// the running thread.
|
||||
//
|
||||
// Usually, the stoppers are the servers's `Shutdown()` or `Close()` methods,
|
||||
// that will cause the server to start its shutdown procedure. As said, there
|
||||
// is no need to force the shutdown, so graceful shutdowns are preferred if
|
||||
// they're available
|
||||
type Stopper func()
|
||||
|
||||
// Result represents the result of a runner.
|
||||
// The result contains the provided runner's id (for easier identification
|
||||
// in case of multiple results) and the runner's error, which is the result
|
||||
// of the Runable function (might be nil if no error happened)
|
||||
type Result struct {
|
||||
RunnerID string
|
||||
RunnerError error
|
||||
}
|
||||
Reference in New Issue
Block a user