Merge pull request #6500 from kobergj/PostprocessingRetries

[full-ci] Retry Postprocessing
This commit is contained in:
kobergj
2023-06-14 10:45:27 +02:00
committed by GitHub
12 changed files with 170 additions and 46 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Add functionality to retry postprocessing
Adds a ctl command to manually retry failed postprocessing on uploads
https://github.com/owncloud/ocis/pull/6500

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.6.0
github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d
github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1
github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1

6
go.sum
View File

@@ -625,10 +625,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc=
github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA=
github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93 h1:yRhkp28pdpSbEDX+XQtq5ZiZ8jLMRnmuEKwFj9AlzfY=
github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0=
github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1 h1:LN4ADWFL8SbuVDCN5d5b63swaEA8D7Ojt39AgUv46qA=
github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0=
github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c h1:Pc5WFgjMoJrp757siaxbYr3FogI1QKgkHT64y+G+3Cc=
github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=

View File

@@ -121,6 +121,7 @@ func (av Antivirus) Run() error {
av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting")
return err
}
continue
}
av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.")

View File

@@ -60,3 +60,19 @@ When setting a custom postprocessing step (eg. `"customstep"`) the postprocessin
Once the custom service has finished its work, it should sent an event of type `PostprocessingFinished` via the configured events system. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of "delete" (abort postprocessing, delete the file), "abort" (abort postprocessing, keep the file) and "continue" (continue postprocessing, this is the success case).
See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postprocessing.go) for up-to-date information of reserved step names and event definitions.
## CLI Commands
### Resume Postprocessing
If postprocessing fails in one step due to an unforseen error, current uploads will not be retried automatically. A system admin can instead run a CLI command to retry the failed upload which is a two step process:
- First find the upload ID of the failed upload.
```bash
ocis storage-users uploads list
```
- Then use the restart command to resume postprocessing of the ID selected.
```bash
ocis postprocessing restart -u <uploadID>
```

View File

@@ -0,0 +1,56 @@
package command
import (
"time"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser"
"github.com/urfave/cli/v2"
)
// RestartPostprocessing cli command to restart postprocessing
func RestartPostprocessing(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "restart",
Usage: "restart postprocessing for an uploadID",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "upload-id",
Aliases: []string{"u"},
Required: true,
Usage: "the uploadid to restart",
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := getEventBus(cfg.Postprocessing.Events)
if err != nil {
return err
}
ev := events.ResumePostprocessing{
UploadID: c.String("upload-id"),
Timestamp: utils.TSNow(),
}
if err := events.Publish(stream, ev); err != nil {
return err
}
// go-micro nats implementation uses async publishing,
// therefore we need to manually wait.
//
// FIXME: upstream pr
//
// https://github.com/go-micro/plugins/blob/3e77393890683be4bacfb613bc5751867d584692/v4/events/natsjs/nats.go#L115
time.Sleep(5 * time.Second)
return nil
},
}
}

View File

@@ -15,6 +15,7 @@ func GetCommands(cfg *config.Config) cli.Commands {
Server(cfg),
// interaction with this service
RestartPostprocessing(cfg),
// infos about this service
Health(cfg),

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/store"
"github.com/go-micro/plugins/v4/events/natsjs"
@@ -42,9 +43,6 @@ func Server(cfg *config.Config) *cli.Command {
gr = run.Group{}
logger = logging.Configure(cfg.Service.Name, cfg.Log)
evtsCfg = cfg.Postprocessing.Events
tlsConf *tls.Config
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
@@ -55,31 +53,7 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
{
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
RootCAs: rootCAPool,
}
}
bus, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
bus, err := getEventBus(cfg.Postprocessing.Events)
if err != nil {
return err
}
@@ -136,3 +110,32 @@ func Server(cfg *config.Config) *cli.Command {
},
}
}
func getEventBus(evtsCfg config.Events) (events.Stream, error) {
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return nil, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
RootCAs: rootCAPool,
}
}
return stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
}

View File

@@ -17,9 +17,16 @@ type Postprocessing struct {
Filesize uint64
ResourceID *provider.ResourceId
Steps []events.Postprocessingstep
Status Status
PPDelay time.Duration
}
// Status is helper struct to show current postprocessing status
type Status struct {
CurrentStep events.Postprocessingstep
Outcome events.PostprocessingOutcome
}
// New returns a new postprocessing instance
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing {
return &Postprocessing{
@@ -40,7 +47,7 @@ func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
return pp.finished(events.PPOutcomeContinue)
}
return pp.nextStep(pp.Steps[0])
return pp.step(pp.Steps[0])
}
// NextStep returns the next postprocessing step
@@ -54,6 +61,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa
}
}
// CurrentStep returns the current postprocessing step
func (pp *Postprocessing) CurrentStep() interface{} {
if pp.Status.Outcome != "" {
return pp.finished(pp.Status.Outcome)
}
return pp.step(pp.Status.CurrentStep)
}
// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
time.Sleep(pp.PPDelay)
@@ -64,13 +79,14 @@ func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
l := len(pp.Steps)
for i, s := range pp.Steps {
if s == current && i+1 < l {
return pp.nextStep(pp.Steps[i+1])
return pp.step(pp.Steps[i+1])
}
}
return pp.finished(events.PPOutcomeContinue)
}
func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep {
func (pp *Postprocessing) step(next events.Postprocessingstep) events.StartPostprocessingStep {
pp.Status.CurrentStep = next
return events.StartPostprocessingStep{
UploadID: pp.ID,
URL: pp.URL,
@@ -83,6 +99,7 @@ func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartP
}
func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished {
pp.Status.Outcome = outcome
return events.PostprocessingFinished{
UploadID: pp.ID,
ExecutingUser: pp.User,

View File

@@ -28,6 +28,7 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store
events.StartPostprocessingStep{},
events.UploadReady{},
events.PostprocessingStepFinished{},
events.ResumePostprocessing{},
)
if err != nil {
return nil, err
@@ -46,22 +47,22 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
for e := range pps.events {
var next interface{}
var (
next interface{}
pp *postprocessing.Postprocessing
err error
)
switch ev := e.Event.(type) {
case events.BytesReceived:
pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot store upload")
continue
}
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
next = pp.Init(ev)
case events.PostprocessingStepFinished:
if ev.UploadID == "" {
// no current upload - this was an on demand scan
continue
}
pp, err := getPP(pps.store, ev.UploadID)
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
@@ -71,7 +72,7 @@ func (pps *PostprocessingService) Run() error {
if ev.StepToStart != events.PPStepDelay {
continue
}
pp, err := getPP(pps.store, ev.UploadID)
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
@@ -83,8 +84,21 @@ func (pps *PostprocessingService) Run() error {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
continue
}
case events.ResumePostprocessing:
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
}
next = pp.CurrentStep()
}
if pp != nil {
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
continue // TODO: should we really continue here?
}
}
if next != nil {
if err := events.Publish(pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")

View File

@@ -171,3 +171,16 @@ func (UploadReady) Unmarshal(v []byte) (interface{}, error) {
err := json.Unmarshal(v, &e)
return e, err
}
// ResumePostprocessing can be emitted to repair broken postprocessing
type ResumePostprocessing struct {
UploadID string
Timestamp *types.Timestamp
}
// Unmarshal to fulfill umarshaller interface
func (ResumePostprocessing) Unmarshal(v []byte) (interface{}, error) {
e := ResumePostprocessing{}
err := json.Unmarshal(v, &e)
return e, err
}

2
vendor/modules.txt vendored
View File

@@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1
# github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime