From c43b9d9bc6d648f37f1abf2af0177da73124096b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 4 Dec 2023 08:59:59 +0100 Subject: [PATCH] Allow for retrying postprocessing steps with an exponential backoff --- services/antivirus/pkg/service/service.go | 3 ++ services/postprocessing/README.md | 10 +++- services/postprocessing/pkg/config/config.go | 3 ++ .../pkg/config/defaults/defaultconfig.go | 4 ++ .../pkg/postprocessing/postprocessing.go | 42 +++++++++++----- .../postprocessing/pkg/service/service.go | 50 ++++++++++++++++--- 6 files changed, 92 insertions(+), 20 deletions(-) diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 948be0c77..352e3390f 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -170,7 +170,10 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { outcome = av.o case !res.Infected && err == nil: outcome = events.PPOutcomeContinue + case err != nil: + outcome = events.PPOutcomeRetry default: + // Not sure what this is about. abort. outcome = events.PPOutcomeAbort } diff --git a/services/postprocessing/README.md b/services/postprocessing/README.md index 115240fcc..5c7f82082 100644 --- a/services/postprocessing/README.md +++ b/services/postprocessing/README.md @@ -57,7 +57,15 @@ For using custom postprocessing steps you need a custom service listening to the #### Workflow When setting a custom postprocessing step (eg. `"customstep"`) the postprocessing service will eventually sent an event during postprocessing. The event will be of type `StartPostprocessingStep` with its field `StepToStart` set to `"customstep"`. When the custom service receives this event it can safely execute its actions, postprocessing service will wait until it has finished its work. The event contains further information (filename, executing user, size, ...) 
and also required tokens and urls to download the file in case byte inspection is necessary. -Once the custom service has finished its work, it should sent an event of type `PostprocessingFinished` via the configured events system. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of "delete" (abort postprocessing, delete the file), "abort" (abort postprocessing, keep the file) and "continue" (continue postprocessing, this is the success case). +Once the custom service has finished its work, it should send an event of type `PostprocessingFinished` via the configured events system. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of the following: + +- "delete": abort postprocessing, delete the file +- "abort": abort postprocessing, keep the file +- "retry": there was a temporary issue that will likely be resolved soon. Retry the current step after some backoff duration +- "continue": continue postprocessing, this is the success case + +The backoff behavior can be configured using the `POSTPROCESSING_RETRY_BACKOFF_DURATION` and `POSTPROCESSING_MAX_RETRIES` environment variables. The backoff duration is calculated using the following formula after each failure: `backoff_duration = POSTPROCESSING_RETRY_BACKOFF_DURATION * 2^(number of failures - 1)`. +Steps that still don't succeed after the maximum number of retries will be moved into the `abort` state. See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postprocessing.go) for up-to-date information of reserved step names and event definitions. 
diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go index 9d4d1ca18..66bc772da 100644 --- a/services/postprocessing/pkg/config/config.go +++ b/services/postprocessing/pkg/config/config.go @@ -28,6 +28,9 @@ type Postprocessing struct { Events Events `yaml:"events"` Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details."` Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for developing purposes only. If a duration is set but the keyword 'delay' is not explicitely added to 'POSTPROCESSING_STEPS', the delay step will be processed as last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details."` + + RetryBackoffDuration time.Duration `yaml:"retry_backoff_duration" env:"POSTPROCESSING_RETRY_BACKOFF_DURATION" desc:"The base for the exponential backoff duration before retrying a failed postprocessing step. See the Environment Variable Types description for more details."` + MaxRetries int `yaml:"max_retries" env:"POSTPROCESSING_MAX_RETRIES" desc:"The maximum number of retries for a failed postprocessing step. See the Environment Variable Types description for more details."` } // Events combines the configuration options for the event bus. 
diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go index a1e76b555..11cf3f02a 100644 --- a/services/postprocessing/pkg/config/defaults/defaultconfig.go +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -1,6 +1,8 @@ package defaults import ( + "time" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" ) @@ -29,6 +31,8 @@ func DefaultConfig() *config.Config { Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, + RetryBackoffDuration: 5 * time.Second, + MaxRetries: 14, }, Store: config.Store{ Store: "memory", diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index 29188e7dc..b2764f43f 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -1,11 +1,13 @@ package postprocessing import ( + "math" "time" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" ) // Postprocessing handles postprocessing of a file @@ -18,7 +20,9 @@ type Postprocessing struct { ResourceID *provider.ResourceId Steps []events.Postprocessingstep Status Status - PPDelay time.Duration + Failures int + + config config.Postprocessing } // Status is helper struct to show current postprocessing status @@ -28,16 +32,9 @@ type Status struct { } // New returns a new postprocessing instance -func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing { +func New(config config.Postprocessing) *Postprocessing { return &Postprocessing{ - ID: uploadID, - URL: uploadURL, - User: user, - Filename: 
filename, - Filesize: filesize, - ResourceID: resourceID, - Steps: steps, - PPDelay: delay, + config: config, } } @@ -55,9 +52,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa switch ev.Outcome { case events.PPOutcomeContinue: return pp.next(ev.FinishedStep) + case events.PPOutcomeRetry: + pp.Failures++ + if pp.Failures > pp.config.MaxRetries { + return pp.finished(events.PPOutcomeAbort) + } + return pp.retry() default: return pp.finished(ev.Outcome) - } } @@ -71,10 +73,15 @@ func (pp *Postprocessing) CurrentStep() interface{} { // Delay will sleep the configured time then continue func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} { - time.Sleep(pp.PPDelay) + time.Sleep(pp.config.Delayprocessing) return pp.next(events.PPStepDelay) } +// BackoffDuration calculates the duration for exponential backoff based on the number of failures. +func (pp *Postprocessing) BackoffDuration() time.Duration { + return pp.config.RetryBackoffDuration * time.Duration(math.Pow(2, float64(pp.Failures-1))) +} + func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} { l := len(pp.Steps) for i, s := range pp.Steps { @@ -107,3 +114,14 @@ func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events. 
Outcome: outcome, } } + +func (pp *Postprocessing) retry() events.PostprocessingRetry { + pp.Status.Outcome = events.PPOutcomeRetry + return events.PostprocessingRetry{ + UploadID: pp.ID, + ExecutingUser: pp.User, + Filename: pp.Filename, + Failures: pp.Failures, + BackoffDuration: pp.BackoffDuration(), + } +} diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 032d2c8b1..50355001b 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "time" "github.com/cs3org/reva/v2/pkg/events" "github.com/owncloud/ocis/v2/ocis-pkg/log" @@ -89,24 +90,54 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { switch ev := e.Event.(type) { case events.BytesReceived: - pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) + pp = &postprocessing.Postprocessing{ + ID: ev.UploadID, + URL: ev.URL, + User: ev.ExecutingUser, + Filename: ev.Filename, + Filesize: ev.Filesize, + ResourceID: ev.ResourceID, + Steps: pps.steps, + } next = pp.Init(ev) case events.PostprocessingStepFinished: if ev.UploadID == "" { // no current upload - this was an on demand scan return nil } - pp, err = getPP(pps.store, ev.UploadID) + pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") return fmt.Errorf("%w: cannot get upload", errEvent) } next = pp.NextStep(ev) + + switch pp.Status.Outcome { + case events.PPOutcomeRetry: + // schedule retry + backoff := pp.BackoffDuration() + go func() { + time.Sleep(backoff) + retryEvent := events.StartPostprocessingStep{ + UploadID: pp.ID, + URL: pp.URL, + ExecutingUser: pp.User, + Filename: pp.Filename, + Filesize: pp.Filesize, + ResourceID: pp.ResourceID, + StepToStart: pp.Status.CurrentStep, + } + err := 
events.Publish(ctx, pps.pub, retryEvent) + if err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event") + } + }() + } case events.StartPostprocessingStep: if ev.StepToStart != events.PPStepDelay { return nil } - pp, err = getPP(pps.store, ev.UploadID) + pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") return fmt.Errorf("%w: cannot get upload", errEvent) @@ -119,7 +150,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { return fmt.Errorf("%w: cannot delete upload", errEvent) } case events.ResumePostprocessing: - pp, err = getPP(pps.store, ev.UploadID) + pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { if err == store.ErrNotFound { if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{ @@ -175,7 +206,7 @@ func storePP(sto store.Store, pp *postprocessing.Postprocessing) error { }) } -func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) { +func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) { recs, err := sto.Read(uploadID) if err != nil { return nil, err @@ -185,6 +216,11 @@ func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, er return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs)) } - var pp postprocessing.Postprocessing - return &pp, json.Unmarshal(recs[0].Value, &pp) + pp := postprocessing.New(pps.c) + err = json.Unmarshal(recs[0].Value, pp) + if err != nil { + return nil, err + } + + return pp, nil }