mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-06 04:09:40 -06:00
Allow for retrying postprocessing steps with an exponential backoff
This commit is contained in:
@@ -28,6 +28,9 @@ type Postprocessing struct {
|
||||
Events Events `yaml:"events"`
|
||||
Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details."`
|
||||
Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for developing purposes only. If a duration is set but the keyword 'delay' is not explicitely added to 'POSTPROCESSING_STEPS', the delay step will be processed as last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details."`
|
||||
|
||||
RetryBackoffDuration time.Duration `yaml:"retry_backoff_duration" env:"POSTPROCESSING_RETRY_BACKOFF_DURATION" desc:"The base for the exponential backoff duration before retrying a failed postprocessing step. See the Environment Variable Types description for more details."`
|
||||
MaxRetries int `yaml:"max_retries" env:"POSTPROCESSING_MAX_RETRIES" desc:"The maximum number of retries for a failed postprocessing step. See the Environment Variable Types description for more details."`
|
||||
}
|
||||
|
||||
// Events combines the configuration options for the event bus.
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package defaults
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
|
||||
)
|
||||
|
||||
@@ -29,6 +31,8 @@ func DefaultConfig() *config.Config {
|
||||
Endpoint: "127.0.0.1:9233",
|
||||
Cluster: "ocis-cluster",
|
||||
},
|
||||
RetryBackoffDuration: 5 * time.Second,
|
||||
MaxRetries: 14,
|
||||
},
|
||||
Store: config.Store{
|
||||
Store: "memory",
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package postprocessing
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
||||
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
|
||||
)
|
||||
|
||||
// Postprocessing handles postprocessing of a file
|
||||
@@ -18,7 +20,9 @@ type Postprocessing struct {
|
||||
ResourceID *provider.ResourceId
|
||||
Steps []events.Postprocessingstep
|
||||
Status Status
|
||||
PPDelay time.Duration
|
||||
Failures int
|
||||
|
||||
config config.Postprocessing
|
||||
}
|
||||
|
||||
// Status is helper struct to show current postprocessing status
|
||||
@@ -28,16 +32,9 @@ type Status struct {
|
||||
}
|
||||
|
||||
// New returns a new postprocessing instance
|
||||
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing {
|
||||
func New(config config.Postprocessing) *Postprocessing {
|
||||
return &Postprocessing{
|
||||
ID: uploadID,
|
||||
URL: uploadURL,
|
||||
User: user,
|
||||
Filename: filename,
|
||||
Filesize: filesize,
|
||||
ResourceID: resourceID,
|
||||
Steps: steps,
|
||||
PPDelay: delay,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,9 +52,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa
|
||||
switch ev.Outcome {
|
||||
case events.PPOutcomeContinue:
|
||||
return pp.next(ev.FinishedStep)
|
||||
case events.PPOutcomeRetry:
|
||||
pp.Failures++
|
||||
if pp.Failures > pp.config.MaxRetries {
|
||||
return pp.finished(events.PPOutcomeAbort)
|
||||
}
|
||||
return pp.retry()
|
||||
default:
|
||||
return pp.finished(ev.Outcome)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,10 +73,15 @@ func (pp *Postprocessing) CurrentStep() interface{} {
|
||||
|
||||
// Delay will sleep the configured time then continue
|
||||
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
|
||||
time.Sleep(pp.PPDelay)
|
||||
time.Sleep(pp.config.Delayprocessing)
|
||||
return pp.next(events.PPStepDelay)
|
||||
}
|
||||
|
||||
// BackoffDuration calculates the duration for exponential backoff based on the number of failures.
|
||||
func (pp *Postprocessing) BackoffDuration() time.Duration {
|
||||
return pp.config.RetryBackoffDuration * time.Duration(math.Pow(2, float64(pp.Failures-1)))
|
||||
}
|
||||
|
||||
func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
|
||||
l := len(pp.Steps)
|
||||
for i, s := range pp.Steps {
|
||||
@@ -107,3 +114,14 @@ func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.
|
||||
Outcome: outcome,
|
||||
}
|
||||
}
|
||||
|
||||
func (pp *Postprocessing) retry() events.PostprocessingRetry {
|
||||
pp.Status.Outcome = events.PPOutcomeRetry
|
||||
return events.PostprocessingRetry{
|
||||
UploadID: pp.ID,
|
||||
ExecutingUser: pp.User,
|
||||
Filename: pp.Filename,
|
||||
Failures: pp.Failures,
|
||||
BackoffDuration: pp.BackoffDuration(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
@@ -89,24 +90,54 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
|
||||
switch ev := e.Event.(type) {
|
||||
case events.BytesReceived:
|
||||
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
|
||||
pp = &postprocessing.Postprocessing{
|
||||
ID: ev.UploadID,
|
||||
URL: ev.URL,
|
||||
User: ev.ExecutingUser,
|
||||
Filename: ev.Filename,
|
||||
Filesize: ev.Filesize,
|
||||
ResourceID: ev.ResourceID,
|
||||
Steps: pps.steps,
|
||||
}
|
||||
next = pp.Init(ev)
|
||||
case events.PostprocessingStepFinished:
|
||||
if ev.UploadID == "" {
|
||||
// no current upload - this was an on demand scan
|
||||
return nil
|
||||
}
|
||||
pp, err = getPP(pps.store, ev.UploadID)
|
||||
pp, err = pps.getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
||||
return fmt.Errorf("%w: cannot get upload", errEvent)
|
||||
}
|
||||
next = pp.NextStep(ev)
|
||||
|
||||
switch pp.Status.Outcome {
|
||||
case events.PPOutcomeRetry:
|
||||
// schedule retry
|
||||
backoff := pp.BackoffDuration()
|
||||
go func() {
|
||||
time.Sleep(backoff)
|
||||
retryEvent := events.StartPostprocessingStep{
|
||||
UploadID: pp.ID,
|
||||
URL: pp.URL,
|
||||
ExecutingUser: pp.User,
|
||||
Filename: pp.Filename,
|
||||
Filesize: pp.Filesize,
|
||||
ResourceID: pp.ResourceID,
|
||||
StepToStart: pp.Status.CurrentStep,
|
||||
}
|
||||
err := events.Publish(ctx, pps.pub, retryEvent)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event")
|
||||
}
|
||||
}()
|
||||
}
|
||||
case events.StartPostprocessingStep:
|
||||
if ev.StepToStart != events.PPStepDelay {
|
||||
return nil
|
||||
}
|
||||
pp, err = getPP(pps.store, ev.UploadID)
|
||||
pp, err = pps.getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
||||
return fmt.Errorf("%w: cannot get upload", errEvent)
|
||||
@@ -119,7 +150,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
return fmt.Errorf("%w: cannot delete upload", errEvent)
|
||||
}
|
||||
case events.ResumePostprocessing:
|
||||
pp, err = getPP(pps.store, ev.UploadID)
|
||||
pp, err = pps.getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
if err == store.ErrNotFound {
|
||||
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
|
||||
@@ -175,7 +206,7 @@ func storePP(sto store.Store, pp *postprocessing.Postprocessing) error {
|
||||
})
|
||||
}
|
||||
|
||||
func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
|
||||
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
|
||||
recs, err := sto.Read(uploadID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -185,6 +216,11 @@ func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, er
|
||||
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
|
||||
}
|
||||
|
||||
var pp postprocessing.Postprocessing
|
||||
return &pp, json.Unmarshal(recs[0].Value, &pp)
|
||||
pp := postprocessing.New(pps.c)
|
||||
err = json.Unmarshal(recs[0].Value, pp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pp, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user