diff --git a/changelog/unreleased/retry-postprocessing.md b/changelog/unreleased/retry-postprocessing.md new file mode 100644 index 0000000000..47e9c6bab1 --- /dev/null +++ b/changelog/unreleased/retry-postprocessing.md @@ -0,0 +1,5 @@ +Enhancement: Add functionality to retry postprocessing + +Adds a ctl command to manually retry failed postprocessing on uploads + +https://github.com/owncloud/ocis/pull/6500 diff --git a/go.mod b/go.mod index f6c42fb317..2c1d47c5a5 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.6.0 github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d - github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1 + github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e github.com/egirna/icap-client v0.1.1 diff --git a/go.sum b/go.sum index 04528336e6..392aafad59 100644 --- a/go.sum +++ b/go.sum @@ -625,10 +625,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4= github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc= github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA= -github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93 h1:yRhkp28pdpSbEDX+XQtq5ZiZ8jLMRnmuEKwFj9AlzfY= -github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0= -github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1 h1:LN4ADWFL8SbuVDCN5d5b63swaEA8D7Ojt39AgUv46qA= -github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0= +github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c h1:Pc5WFgjMoJrp757siaxbYr3FogI1QKgkHT64y+G+3Cc= +github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 09952d9033..5b3fac1462 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -121,6 +121,7 @@ func (av Antivirus) Run() error { av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting") return err } + continue } av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.") diff --git a/services/postprocessing/README.md b/services/postprocessing/README.md index dd13d92d51..c8cd1182f5 100644 --- a/services/postprocessing/README.md +++ b/services/postprocessing/README.md @@ -60,3 +60,19 @@ When setting a custom postprocessing step (eg. `"customstep"`) the postprocessin Once the custom service has finished its work, it should sent an event of type `PostprocessingFinished` via the configured events system. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of "delete" (abort postprocessing, delete the file), "abort" (abort postprocessing, keep the file) and "continue" (continue postprocessing, this is the success case). See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postprocessing.go) for up-to-date information of reserved step names and event definitions. + +## CLI Commands + +### Resume Postprocessing + +If postprocessing fails in one step due to an unforseen error, current uploads will not be retried automatically. A system admin can instead run a CLI command to retry the failed upload which is a two step process: + +- First find the upload ID of the failed upload. +```bash +ocis storage-users uploads list +``` + +- Then use the restart command to resume postprocessing of the ID selected. +```bash +ocis postprocessing restart -u +``` diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go new file mode 100644 index 0000000000..999727f8cb --- /dev/null +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -0,0 +1,56 @@ +package command + +import ( + "time" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser" + "github.com/urfave/cli/v2" +) + +// RestartPostprocessing cli command to restart postprocessing +func RestartPostprocessing(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "restart", + Usage: "restart postprocessing for an uploadID", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "upload-id", + Aliases: []string{"u"}, + Required: true, + Usage: "the uploadid to restart", + }, + }, + Before: func(c *cli.Context) error { + return configlog.ReturnFatal(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + stream, err := getEventBus(cfg.Postprocessing.Events) + if err != nil { + return err + } + + ev := events.ResumePostprocessing{ + UploadID: c.String("upload-id"), + Timestamp: utils.TSNow(), + } + + if err := events.Publish(stream, ev); err != nil { + return err + } + + // go-micro nats implementation uses async publishing, + // therefore we need to manually wait. + // + // FIXME: upstream pr + // + // https://github.com/go-micro/plugins/blob/3e77393890683be4bacfb613bc5751867d584692/v4/events/natsjs/nats.go#L115 + time.Sleep(5 * time.Second) + + return nil + }, + } +} diff --git a/services/postprocessing/pkg/command/root.go b/services/postprocessing/pkg/command/root.go index 4718a58bdf..f4a2086320 100644 --- a/services/postprocessing/pkg/command/root.go +++ b/services/postprocessing/pkg/command/root.go @@ -15,6 +15,7 @@ func GetCommands(cfg *config.Config) cli.Commands { Server(cfg), // interaction with this service + RestartPostprocessing(cfg), // infos about this service Health(cfg), diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index 8824a630f1..fb81fc7147 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -7,6 +7,7 @@ import ( "fmt" "os" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/store" "github.com/go-micro/plugins/v4/events/natsjs" @@ -42,9 +43,6 @@ func Server(cfg *config.Config) *cli.Command { gr = run.Group{} logger = logging.Configure(cfg.Service.Name, cfg.Log) - evtsCfg = cfg.Postprocessing.Events - tlsConf *tls.Config - ctx, cancel = func() (context.Context, context.CancelFunc) { if cfg.Context == nil { return context.WithCancel(context.Background()) @@ -55,31 +53,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() { - if evtsCfg.EnableTLS { - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return err - } - evtsCfg.TLSInsecure = false - } - - tlsConf = &tls.Config{ - RootCAs: rootCAPool, - } - } - - bus, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(evtsCfg.Endpoint), - natsjs.ClusterID(evtsCfg.Cluster), - ) + bus, err := getEventBus(cfg.Postprocessing.Events) if err != nil { return err } @@ -136,3 +110,32 @@ func Server(cfg *config.Config) *cli.Command { }, } } + +func getEventBus(evtsCfg config.Events) (events.Stream, error) { + var tlsConf *tls.Config + if evtsCfg.EnableTLS { + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return nil, err + } + + rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) + if err != nil { + return nil, err + } + evtsCfg.TLSInsecure = false + } + + tlsConf = &tls.Config{ + RootCAs: rootCAPool, + } + } + + return stream.Nats( + natsjs.TLSConfig(tlsConf), + natsjs.Address(evtsCfg.Endpoint), + natsjs.ClusterID(evtsCfg.Cluster), + ) +} diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index bb4eaf5127..29188e7dcc 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -17,9 +17,16 @@ type Postprocessing struct { Filesize uint64 ResourceID *provider.ResourceId Steps []events.Postprocessingstep + Status Status PPDelay time.Duration } +// Status is helper struct to show current postprocessing status +type Status struct { + CurrentStep events.Postprocessingstep + Outcome events.PostprocessingOutcome +} + // New returns a new postprocessing instance func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing { return &Postprocessing{ @@ -40,7 +47,7 @@ func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} { return pp.finished(events.PPOutcomeContinue) } - return pp.nextStep(pp.Steps[0]) + return pp.step(pp.Steps[0]) } // NextStep returns the next postprocessing step @@ -54,6 +61,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa } } +// CurrentStep returns the current postprocessing step +func (pp *Postprocessing) CurrentStep() interface{} { + if pp.Status.Outcome != "" { + return pp.finished(pp.Status.Outcome) + } + return pp.step(pp.Status.CurrentStep) +} + // Delay will sleep the configured time then continue func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} { time.Sleep(pp.PPDelay) @@ -64,13 +79,14 @@ func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} { l := len(pp.Steps) for i, s := range pp.Steps { if s == current && i+1 < l { - return pp.nextStep(pp.Steps[i+1]) + return pp.step(pp.Steps[i+1]) } } return pp.finished(events.PPOutcomeContinue) } -func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep { +func (pp *Postprocessing) step(next events.Postprocessingstep) events.StartPostprocessingStep { + pp.Status.CurrentStep = next return events.StartPostprocessingStep{ UploadID: pp.ID, URL: pp.URL, @@ -83,6 +99,7 @@ func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartP } func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished { + pp.Status.Outcome = outcome return events.PostprocessingFinished{ UploadID: pp.ID, ExecutingUser: pp.User, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index b6ce3fba51..3e2afc077b 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -28,6 +28,7 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store events.StartPostprocessingStep{}, events.UploadReady{}, events.PostprocessingStepFinished{}, + events.ResumePostprocessing{}, ) if err != nil { return nil, err @@ -46,22 +47,22 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { for e := range pps.events { - var next interface{} + var ( + next interface{} + pp *postprocessing.Postprocessing + err error + ) + switch ev := e.Event.(type) { case events.BytesReceived: - pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) - if err := storePP(pps.store, pp); err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot store upload") - continue - } - + pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) next = pp.Init(ev) case events.PostprocessingStepFinished: if ev.UploadID == "" { // no current upload - this was an on demand scan continue } - pp, err := getPP(pps.store, ev.UploadID) + pp, err = getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") continue @@ -71,7 +72,7 @@ func (pps *PostprocessingService) Run() error { if ev.StepToStart != events.PPStepDelay { continue } - pp, err := getPP(pps.store, ev.UploadID) + pp, err = getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") continue @@ -83,8 +84,21 @@ func (pps *PostprocessingService) Run() error { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") continue } + case events.ResumePostprocessing: + pp, err = getPP(pps.store, ev.UploadID) + if err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") + continue + } + next = pp.CurrentStep() } + if pp != nil { + if err := storePP(pps.store, pp); err != nil { + pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") + continue // TODO: should we really continue here? + } + } if next != nil { if err := events.Publish(pps.pub, next); err != nil { pps.log.Error().Err(err).Msg("unable to publish event") diff --git a/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go b/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go index 74968e28e3..0b01050112 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go @@ -171,3 +171,16 @@ func (UploadReady) Unmarshal(v []byte) (interface{}, error) { err := json.Unmarshal(v, &e) return e, err } + +// ResumePostprocessing can be emitted to repair broken postprocessing +type ResumePostprocessing struct { + UploadID string + Timestamp *types.Timestamp +} + +// Unmarshal to fulfill umarshaller interface +func (ResumePostprocessing) Unmarshal(v []byte) (interface{}, error) { + e := ResumePostprocessing{} + err := json.Unmarshal(v, &e) + return e, err +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3be7021b07..e527cadc0b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1 +# github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c ## explicit; go 1.20 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime