From 76141a492fcbcb9330a77111b0872efc9abcb4ad Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 12 Jun 2023 12:01:00 +0200 Subject: [PATCH 1/4] add functionality for resending pp events Signed-off-by: jkoberg --- .../pkg/postprocessing/postprocessing.go | 23 +++++++++++++--- .../postprocessing/pkg/service/service.go | 27 ++++++++++++++++--- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index bb4eaf5127..29188e7dcc 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -17,9 +17,16 @@ type Postprocessing struct { Filesize uint64 ResourceID *provider.ResourceId Steps []events.Postprocessingstep + Status Status PPDelay time.Duration } +// Status is helper struct to show current postprocessing status +type Status struct { + CurrentStep events.Postprocessingstep + Outcome events.PostprocessingOutcome +} + // New returns a new postprocessing instance func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing { return &Postprocessing{ @@ -40,7 +47,7 @@ func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} { return pp.finished(events.PPOutcomeContinue) } - return pp.nextStep(pp.Steps[0]) + return pp.step(pp.Steps[0]) } // NextStep returns the next postprocessing step @@ -54,6 +61,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa } } +// CurrentStep returns the current postprocessing step +func (pp *Postprocessing) CurrentStep() interface{} { + if pp.Status.Outcome != "" { + return pp.finished(pp.Status.Outcome) + } + return pp.step(pp.Status.CurrentStep) +} + // Delay will sleep the configured time then continue func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} { time.Sleep(pp.PPDelay) @@ -64,13 +79,14 @@ func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} { l := len(pp.Steps) for i, s := range pp.Steps { if s == current && i+1 < l { - return pp.nextStep(pp.Steps[i+1]) + return pp.step(pp.Steps[i+1]) } } return pp.finished(events.PPOutcomeContinue) } -func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep { +func (pp *Postprocessing) step(next events.Postprocessingstep) events.StartPostprocessingStep { + pp.Status.CurrentStep = next return events.StartPostprocessingStep{ UploadID: pp.ID, URL: pp.URL, @@ -83,6 +99,7 @@ func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartP } func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished { + pp.Status.Outcome = outcome return events.PostprocessingFinished{ UploadID: pp.ID, ExecutingUser: pp.User, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index b6ce3fba51..b68f06e7cb 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -28,6 +28,7 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store events.StartPostprocessingStep{}, events.UploadReady{}, events.PostprocessingStepFinished{}, + events.ResumePostprocessing{}, ) if err != nil { return nil, err @@ -46,10 +47,15 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { for e := range pps.events { - var next interface{} + var ( + next interface{} + pp *postprocessing.Postprocessing + err error + ) + switch ev := e.Event.(type) { case events.BytesReceived: - pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) + pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) if err := storePP(pps.store, pp); err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot store upload") continue @@ -61,7 +67,7 @@ func (pps *PostprocessingService) Run() error { // no current upload - this was an on demand scan continue } - pp, err := getPP(pps.store, ev.UploadID) + pp, err = getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") continue @@ -71,7 +77,7 @@ func (pps *PostprocessingService) Run() error { if ev.StepToStart != events.PPStepDelay { continue } - pp, err := getPP(pps.store, ev.UploadID) + pp, err = getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") continue @@ -83,8 +89,21 @@ func (pps *PostprocessingService) Run() error { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") continue } + case events.ResumePostprocessing: + pp, err = getPP(pps.store, ev.UploadID) + if err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") + continue + } + next = pp.CurrentStep() } + if pp != nil { + if err := storePP(pps.store, pp); err != nil { + pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") + // TODO: should we continue here? + } + } if next != nil { if err := events.Publish(pps.pub, next); err != nil { pps.log.Error().Err(err).Msg("unable to publish event") From aac793a4004b0307cd1b4df19ab6f1e0532660bc Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 12 Jun 2023 15:13:57 +0200 Subject: [PATCH 2/4] add ctl command to resume postprocessing Signed-off-by: jkoberg --- services/antivirus/pkg/service/service.go | 1 + .../pkg/command/postprocessing.go | 58 ++++++++++++++++++ services/postprocessing/pkg/command/root.go | 1 + services/postprocessing/pkg/command/server.go | 59 ++++++++++--------- 4 files changed, 91 insertions(+), 28 deletions(-) create mode 100644 services/postprocessing/pkg/command/postprocessing.go diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 09952d9033..5b3fac1462 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -121,6 +121,7 @@ func (av Antivirus) Run() error { av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting") return err } + continue } av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.") diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go new file mode 100644 index 0000000000..b49d4b9ef1 --- /dev/null +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -0,0 +1,58 @@ +package command + +import ( + "fmt" + "time" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser" + "github.com/urfave/cli/v2" +) + +// RestartPostprocessing cli command to restart postprocessing +func RestartPostprocessing(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "restart", + Usage: "restart postprocessing for an uploadID", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "upload-id", + Aliases: []string{"u"}, + Required: true, + Usage: "the uploadid to restart", + }, + }, + Before: func(c *cli.Context) error { + return configlog.ReturnFatal(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + stream, err := getEventBus(cfg.Postprocessing.Events) + if err != nil { + return err + } + + ev := events.ResumePostprocessing{ + UploadID: c.String("upload-id"), + Timestamp: utils.TSNow(), + } + + if err := events.Publish(stream, ev); err != nil { + fmt.Println(err) + return err + } + + // go-micro nats implementation uses async publishing, + // therefore we need to manually wait. + // + // FIXME: upstream pr + // + // https://github.com/go-micro/plugins/blob/3e77393890683be4bacfb613bc5751867d584692/v4/events/natsjs/nats.go#L115 + time.Sleep(5 * time.Second) + + return nil + }, + } +} diff --git a/services/postprocessing/pkg/command/root.go b/services/postprocessing/pkg/command/root.go index 4718a58bdf..f4a2086320 100644 --- a/services/postprocessing/pkg/command/root.go +++ b/services/postprocessing/pkg/command/root.go @@ -15,6 +15,7 @@ func GetCommands(cfg *config.Config) cli.Commands { Server(cfg), // interaction with this service + RestartPostprocessing(cfg), // infos about this service Health(cfg), diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index 8824a630f1..fb81fc7147 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -7,6 +7,7 @@ import ( "fmt" "os" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/store" "github.com/go-micro/plugins/v4/events/natsjs" @@ -42,9 +43,6 @@ func Server(cfg *config.Config) *cli.Command { gr = run.Group{} logger = logging.Configure(cfg.Service.Name, cfg.Log) - evtsCfg = cfg.Postprocessing.Events - tlsConf *tls.Config - ctx, cancel = func() (context.Context, context.CancelFunc) { if cfg.Context == nil { return context.WithCancel(context.Background()) @@ -55,31 +53,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() { - if evtsCfg.EnableTLS { - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return err - } - evtsCfg.TLSInsecure = false - } - - tlsConf = &tls.Config{ - RootCAs: rootCAPool, - } - } - - bus, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(evtsCfg.Endpoint), - natsjs.ClusterID(evtsCfg.Cluster), - ) + bus, err := getEventBus(cfg.Postprocessing.Events) if err != nil { return err } @@ -136,3 +110,32 @@ func Server(cfg *config.Config) *cli.Command { }, } } + +func getEventBus(evtsCfg config.Events) (events.Stream, error) { + var tlsConf *tls.Config + if evtsCfg.EnableTLS { + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return nil, err + } + + rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) + if err != nil { + return nil, err + } + evtsCfg.TLSInsecure = false + } + + tlsConf = &tls.Config{ + RootCAs: rootCAPool, + } + } + + return stream.Nats( + natsjs.TLSConfig(tlsConf), + natsjs.Address(evtsCfg.Endpoint), + natsjs.ClusterID(evtsCfg.Cluster), + ) +} From f23f35e227b1290376b9fa306d2f72d63c41d536 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 12 Jun 2023 15:27:25 +0200 Subject: [PATCH 3/4] improve readme Signed-off-by: jkoberg --- services/postprocessing/README.md | 16 ++++++++++++++++ .../postprocessing/pkg/command/postprocessing.go | 2 -- services/postprocessing/pkg/service/service.go | 7 +------ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/services/postprocessing/README.md b/services/postprocessing/README.md index dd13d92d51..c8cd1182f5 100644 --- a/services/postprocessing/README.md +++ b/services/postprocessing/README.md @@ -60,3 +60,19 @@ When setting a custom postprocessing step (eg. `"customstep"`) the postprocessin Once the custom service has finished its work, it should sent an event of type `PostprocessingFinished` via the configured events system. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of "delete" (abort postprocessing, delete the file), "abort" (abort postprocessing, keep the file) and "continue" (continue postprocessing, this is the success case). See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postprocessing.go) for up-to-date information of reserved step names and event definitions. + +## CLI Commands + +### Resume Postprocessing + +If postprocessing fails in one step due to an unforseen error, current uploads will not be retried automatically. A system admin can instead run a CLI command to retry the failed upload which is a two step process: + +- First find the upload ID of the failed upload. +```bash +ocis storage-users uploads list +``` + +- Then use the restart command to resume postprocessing of the ID selected. +```bash +ocis postprocessing restart -u +``` diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go index b49d4b9ef1..999727f8cb 100644 --- a/services/postprocessing/pkg/command/postprocessing.go +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -1,7 +1,6 @@ package command import ( - "fmt" "time" "github.com/cs3org/reva/v2/pkg/events" @@ -40,7 +39,6 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { } if err := events.Publish(stream, ev); err != nil { - fmt.Println(err) return err } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index b68f06e7cb..3e2afc077b 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -56,11 +56,6 @@ func (pps *PostprocessingService) Run() error { switch ev := e.Event.(type) { case events.BytesReceived: pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) - if err := storePP(pps.store, pp); err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot store upload") - continue - } - next = pp.Init(ev) case events.PostprocessingStepFinished: if ev.UploadID == "" { @@ -101,7 +96,7 @@ func (pps *PostprocessingService) Run() error { if pp != nil { if err := storePP(pps.store, pp); err != nil { pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") - // TODO: should we continue here? + continue // TODO: should we really continue here? } } if next != nil { From 86356707d24f55d21590d2d183e9e0ad89617806 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 12 Jun 2023 15:16:35 +0200 Subject: [PATCH 4/4] bump reva Signed-off-by: jkoberg --- changelog/unreleased/retry-postprocessing.md | 5 +++++ go.mod | 2 +- go.sum | 6 ++---- .../cs3org/reva/v2/pkg/events/postprocessing.go | 13 +++++++++++++ vendor/modules.txt | 2 +- 5 files changed, 22 insertions(+), 6 deletions(-) create mode 100644 changelog/unreleased/retry-postprocessing.md diff --git a/changelog/unreleased/retry-postprocessing.md b/changelog/unreleased/retry-postprocessing.md new file mode 100644 index 0000000000..47e9c6bab1 --- /dev/null +++ b/changelog/unreleased/retry-postprocessing.md @@ -0,0 +1,5 @@ +Enhancement: Add functionality to retry postprocessing + +Adds a ctl command to manually retry failed postprocessing on uploads + +https://github.com/owncloud/ocis/pull/6500 diff --git a/go.mod b/go.mod index 26f3dc3e91..930b7cad3c 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.6.0 github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d - github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1 + github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e github.com/egirna/icap-client v0.1.1 diff --git a/go.sum b/go.sum index c575bd8517..e328a33598 100644 --- a/go.sum +++ b/go.sum @@ -629,10 +629,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4= github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc= github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA= -github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93 h1:yRhkp28pdpSbEDX+XQtq5ZiZ8jLMRnmuEKwFj9AlzfY= -github.com/cs3org/reva/v2 v2.14.1-0.20230608155229-cf1aa9641f93/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0= -github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1 h1:LN4ADWFL8SbuVDCN5d5b63swaEA8D7Ojt39AgUv46qA= -github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0= +github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c h1:Pc5WFgjMoJrp757siaxbYr3FogI1QKgkHT64y+G+3Cc= +github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c/go.mod h1:E32krZG159YflDSjDWfx/QGIC2529PS5LiPnGNHu3d0= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= diff --git a/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go b/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go index 74968e28e3..0b01050112 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go @@ -171,3 +171,16 @@ func (UploadReady) Unmarshal(v []byte) (interface{}, error) { err := json.Unmarshal(v, &e) return e, err } + +// ResumePostprocessing can be emitted to repair broken postprocessing +type ResumePostprocessing struct { + UploadID string + Timestamp *types.Timestamp +} + +// Unmarshal to fulfill umarshaller interface +func (ResumePostprocessing) Unmarshal(v []byte) (interface{}, error) { + e := ResumePostprocessing{} + err := json.Unmarshal(v, &e) + return e, err +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f483caad5e..5388ecfeb5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.14.1-0.20230612154151-5fbd21b664e1 +# github.com/cs3org/reva/v2 v2.14.1-0.20230613083627-c7aeac2d3a3c ## explicit; go 1.20 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime