Merge pull request #7874 from aduffeck/retry-pp

[full-ci] Retry postprocessing steps
This commit is contained in:
Andre Duffeck
2023-12-12 14:56:40 +01:00
committed by GitHub
11 changed files with 114 additions and 37 deletions

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.9.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8
github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e

4
go.sum
View File

@@ -1017,8 +1017,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8 h1:Z1i5VmeHNc6n0jIl/Iljfs+gt7bhdcVT/5cNxn1XIs4=
github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY=
github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28 h1:IhBjtl4F/aAUdbpfjWOy1jwzrh1wLOH50UToPPOqJy8=
github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -170,7 +170,10 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
outcome = av.o
case !res.Infected && err == nil:
outcome = events.PPOutcomeContinue
case err != nil:
outcome = events.PPOutcomeRetry
default:
// Not sure what this is about. abort.
outcome = events.PPOutcomeAbort
}

View File

@@ -55,9 +55,16 @@ By using the envvar `POSTPROCESSING_STEPS`, custom postprocessing steps can be a
For using custom postprocessing steps you need a custom service listening to the configured event system (see `General Prerequisites`)
#### Workflow
When setting a custom postprocessing step (eg. `"customstep"`) the postprocessing service will eventually sent an event during postprocessing. The event will be of type `StartPostprocessingStep` with its field `StepToStart` set to `"customstep"`. When the custom service receives this event it can safely execute its actions, postprocessing service will wait until it has finished its work. The event contains further information (filename, executing user, size, ...) and also required tokens and urls to download the file in case byte inspection is necessary.
When defining a custom postprocessing step (eg. `"customstep"`), the postprocessing service will eventually send an event during postprocessing. The event will be of type `StartPostprocessingStep` with its field `StepToStart` set to `"customstep"`. When the service defined as custom step receives this event, it can safely execute its actions. The postprocessing service will wait until it has finished its work. The event contains further information (filename, executing user, size, ...) and also requires tokens and URLs to download the file in case byte inspection is necessary.
Once the custom service has finished its work, it should sent an event of type `PostprocessingFinished` via the configured events system. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of "delete" (abort postprocessing, delete the file), "abort" (abort postprocessing, keep the file) and "continue" (continue postprocessing, this is the success case).
Once the service defined as custom step has finished its work, it should send an event of type `PostprocessingFinished` via the configured events system back to the postprocessing service. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of the following:
- `delete`: Abort postprocessing, delete the file.
- `abort`: Abort postprocessing, keep the file.
- `retry`: There was a problem that was most likely temporary and may be solved by trying again after some backoff duration. Retry runs automatically and is defined by the backoff behavior as described below.
- `continue`: Continue postprocessing, this is the success case.
The backoff behavior as mentioned in the `retry` outcome can be configured using the `POSTPROCESSING_RETRY_BACKOFF_DURATION` and `POSTPROCESSING_MAX_RETRIES` environment variables. The backoff duration is calculated using the following formula after each failure: `backoff_duration = POSTPROCESSING_RETRY_BACKOFF_DURATION * 2^(number of failures - 1)`. This means that the time between the next round grows exponentially limited by the number of retries. Steps that still don't succeed after the maximum number of retries will be automatically moved to the `abort` state.
See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postprocessing.go) for up-to-date information of reserved step names and event definitions.

View File

@@ -28,6 +28,9 @@ type Postprocessing struct {
Events Events `yaml:"events"`
Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details."`
Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for developing purposes only. If a duration is set but the keyword 'delay' is not explicitely added to 'POSTPROCESSING_STEPS', the delay step will be processed as last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details."`
RetryBackoffDuration time.Duration `yaml:"retry_backoff_duration" env:"POSTPROCESSING_RETRY_BACKOFF_DURATION" desc:"The base for the exponential backoff duration before retrying a failed postprocessing step. See the Environment Variable Types description for more details."`
MaxRetries int `yaml:"max_retries" env:"POSTPROCESSING_MAX_RETRIES" desc:"The maximum number of retries for a failed postprocessing step."`
}
// Events combines the configuration options for the event bus.

View File

@@ -1,6 +1,8 @@
package defaults
import (
"time"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
@@ -29,6 +31,8 @@ func DefaultConfig() *config.Config {
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
RetryBackoffDuration: 5 * time.Second,
MaxRetries: 14,
},
Store: config.Store{
Store: "memory",

View File

@@ -1,11 +1,13 @@
package postprocessing
import (
"math"
"time"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
// Postprocessing handles postprocessing of a file
@@ -18,7 +20,9 @@ type Postprocessing struct {
ResourceID *provider.ResourceId
Steps []events.Postprocessingstep
Status Status
PPDelay time.Duration
Failures int
config config.Postprocessing
}
// Status is helper struct to show current postprocessing status
@@ -28,16 +32,9 @@ type Status struct {
}
// New returns a new postprocessing instance
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing {
func New(config config.Postprocessing) *Postprocessing {
return &Postprocessing{
ID: uploadID,
URL: uploadURL,
User: user,
Filename: filename,
Filesize: filesize,
ResourceID: resourceID,
Steps: steps,
PPDelay: delay,
config: config,
}
}
@@ -55,9 +52,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa
switch ev.Outcome {
case events.PPOutcomeContinue:
return pp.next(ev.FinishedStep)
case events.PPOutcomeRetry:
pp.Failures++
if pp.Failures > pp.config.MaxRetries {
return pp.finished(events.PPOutcomeAbort)
}
return pp.retry()
default:
return pp.finished(ev.Outcome)
}
}
@@ -71,10 +73,15 @@ func (pp *Postprocessing) CurrentStep() interface{} {
// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
time.Sleep(pp.PPDelay)
time.Sleep(pp.config.Delayprocessing)
return pp.next(events.PPStepDelay)
}
// BackoffDuration calculates the duration for exponential backoff based on the number of failures.
func (pp *Postprocessing) BackoffDuration() time.Duration {
return pp.config.RetryBackoffDuration * time.Duration(math.Pow(2, float64(pp.Failures-1)))
}
func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
l := len(pp.Steps)
for i, s := range pp.Steps {
@@ -107,3 +114,14 @@ func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.
Outcome: outcome,
}
}
func (pp *Postprocessing) retry() events.PostprocessingRetry {
pp.Status.Outcome = events.PPOutcomeRetry
return events.PostprocessingRetry{
UploadID: pp.ID,
ExecutingUser: pp.User,
Filename: pp.Filename,
Failures: pp.Failures,
BackoffDuration: pp.BackoffDuration(),
}
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
@@ -89,24 +90,54 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
switch ev := e.Event.(type) {
case events.BytesReceived:
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
pp = &postprocessing.Postprocessing{
ID: ev.UploadID,
URL: ev.URL,
User: ev.ExecutingUser,
Filename: ev.Filename,
Filesize: ev.Filesize,
ResourceID: ev.ResourceID,
Steps: pps.steps,
}
next = pp.Init(ev)
case events.PostprocessingStepFinished:
if ev.UploadID == "" {
// no current upload - this was an on demand scan
return nil
}
pp, err = getPP(pps.store, ev.UploadID)
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
}
next = pp.NextStep(ev)
switch pp.Status.Outcome {
case events.PPOutcomeRetry:
// schedule retry
backoff := pp.BackoffDuration()
go func() {
time.Sleep(backoff)
retryEvent := events.StartPostprocessingStep{
UploadID: pp.ID,
URL: pp.URL,
ExecutingUser: pp.User,
Filename: pp.Filename,
Filesize: pp.Filesize,
ResourceID: pp.ResourceID,
StepToStart: pp.Status.CurrentStep,
}
err := events.Publish(ctx, pps.pub, retryEvent)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event")
}
}()
}
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
return nil
}
pp, err = getPP(pps.store, ev.UploadID)
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
@@ -119,7 +150,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
return fmt.Errorf("%w: cannot delete upload", errEvent)
}
case events.ResumePostprocessing:
pp, err = getPP(pps.store, ev.UploadID)
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
if err == store.ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
@@ -175,7 +206,7 @@ func storePP(sto store.Store, pp *postprocessing.Postprocessing) error {
})
}
func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
recs, err := sto.Read(uploadID)
if err != nil {
return nil, err
@@ -185,6 +216,11 @@ func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, er
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
}
var pp postprocessing.Postprocessing
return &pp, json.Unmarshal(recs[0].Value, &pp)
pp := postprocessing.New(pps.c)
err = json.Unmarshal(recs[0].Value, pp)
if err != nil {
return nil, err
}
return pp, nil
}

View File

@@ -49,6 +49,8 @@ var (
PPOutcomeAbort PostprocessingOutcome = "abort"
// PPOutcomeContinue means that the upload is moved to its final destination (eventually being marked with pp results)
PPOutcomeContinue PostprocessingOutcome = "continue"
// PPOutcomeRetry means that there was a temporary issue and the postprocessing should be retried at a later point in time
PPOutcomeRetry PostprocessingOutcome = "retry"
)
// BytesReceived is emitted by the server when it received all bytes of an upload
@@ -153,6 +155,22 @@ func (PostprocessingFinished) Unmarshal(v []byte) (interface{}, error) {
return e, err
}
// PostprocessingRetry is emitted by *some* service which can decide that
type PostprocessingRetry struct {
UploadID string
Filename string
ExecutingUser *user.User
Failures int
BackoffDuration time.Duration
}
// Unmarshal to fulfill umarshaller interface
func (PostprocessingRetry) Unmarshal(v []byte) (interface{}, error) {
e := PostprocessingRetry{}
err := json.Unmarshal(v, &e)
return e, err
}
// UploadReady is emitted by the storage provider when postprocessing is finished
type UploadReady struct {
UploadID string

View File

@@ -174,12 +174,6 @@ func (c *Client) ReadDir(path string) ([]os.FileInfo, error) {
err := c.propfind(path, false,
`<d:propfind xmlns:d='DAV:'>
<d:prop>
<d:displayname/>
<d:resourcetype/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getetag/>
<d:getlastmodified/>
</d:prop>
</d:propfind>`,
&response{},
@@ -226,12 +220,6 @@ func (c *Client) Stat(path string) (os.FileInfo, error) {
err := c.propfind(path, true,
`<d:propfind xmlns:d='DAV:'>
<d:prop>
<d:displayname/>
<d:resourcetype/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getetag/>
<d:getlastmodified/>
</d:prop>
</d:propfind>`,
&response{},

2
vendor/modules.txt vendored
View File

@@ -359,7 +359,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8
# github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime