diff --git a/changelog/unreleased/fix-postprocessing-restart.md b/changelog/unreleased/fix-postprocessing-restart.md new file mode 100644 index 0000000000..6183701f16 --- /dev/null +++ b/changelog/unreleased/fix-postprocessing-restart.md @@ -0,0 +1,8 @@ +Bugfix: Fix restarting of postprocessing + +When an upload is not found, the logic to restart postprocessing was broken. Additionally, we extended the upload sessions +command to be able to restart the uploads without using a second command. + +NOTE: This also includes a breaking fix for the deprecated `ocis storage-users uploads list` command + +https://github.com/owncloud/ocis/pull/8782 diff --git a/services/postprocessing/README.md b/services/postprocessing/README.md index 9e08ba2180..5b58319318 100644 --- a/services/postprocessing/README.md +++ b/services/postprocessing/README.md @@ -81,12 +81,17 @@ See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postproces If postprocessing fails in one step due to an unforseen error, current uploads will not be retried automatically. A system admin can instead run a CLI command to retry the failed upload which is a two step process: -- First find the upload ID of the failed upload. +- First list ongoing upload sessions ```bash -ocis storage-users uploads list +ocis storage-users uploads sessions ``` -- Then use the restart command to resume postprocessing of the ID selected. 
+- If you want to restart all uploads just rerun the command with the `--restart` flag +```bash +ocis storage-users uploads sessions --restart +``` + +- If you want to restart only one upload use the postprocessing restart command ```bash ocis postprocessing restart -u ``` diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go index 11cf3f02af..51a9bb6112 100644 --- a/services/postprocessing/pkg/config/defaults/defaultconfig.go +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -35,9 +35,10 @@ func DefaultConfig() *config.Config { MaxRetries: 14, }, Store: config.Store{ - Store: "memory", + Store: "nats-js-kv", + Nodes: []string{"127.0.0.1:9233"}, Database: "postprocessing", - Table: "postprocessing", + Table: "", }, } } diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index e3c9ca6bff..40b94d56f8 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -39,7 +39,7 @@ func New(config config.Postprocessing) *Postprocessing { } // Init is the first step of the postprocessing -func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} { +func (pp *Postprocessing) Init(_ events.BytesReceived) interface{} { if len(pp.Steps) == 0 { return pp.finished(events.PPOutcomeContinue) } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index f2a3c122e2..70e1221a57 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -29,10 +29,12 @@ type PostprocessingService struct { } var ( - // errFatal is returned when a fatal error occurs and we want to exit. - errFatal = errors.New("fatal error") + // ErrFatal is returned when a fatal error occurs and we want to exit. 
+ ErrFatal = errors.New("fatal error") // ErrEvent is returned when something went wrong with a specific event. - errEvent = errors.New("event error") + ErrEvent = errors.New("event error") + // ErrNotFound is returned when a postprocessing is not found in the store. + ErrNotFound = errors.New("postprocessing not found") ) // NewPostprocessingService returns a new instance of a postprocessing service @@ -66,9 +68,9 @@ func (pps *PostprocessingService) Run() error { err := pps.processEvent(e) if err != nil { switch { - case errors.Is(err, errFatal): + case errors.Is(err, ErrFatal): return err - case errors.Is(err, errEvent): + case errors.Is(err, ErrEvent): continue default: pps.log.Fatal().Err(err).Msg("unknown error - exiting") @@ -109,7 +111,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - return fmt.Errorf("%w: cannot get upload", errEvent) + return fmt.Errorf("%w: cannot get upload", ErrEvent) } next = pp.NextStep(ev) @@ -141,7 +143,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - return fmt.Errorf("%w: cannot get upload", errEvent) + return fmt.Errorf("%w: cannot get upload", ErrEvent) } next = pp.Delay() case events.UploadReady: @@ -153,7 +155,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { // the storage provider thinks the upload is done - so no need to keep it any more if err := pps.store.Delete(ev.UploadID); err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") - return fmt.Errorf("%w: cannot delete upload", errEvent) + return fmt.Errorf("%w: cannot delete upload", ErrEvent) } case events.ResumePostprocessing: return pps.handleResumePPEvent(ctx, ev) @@ 
-162,13 +164,13 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { if pp != nil { if err := storePP(pps.store, pp); err != nil { pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") - return fmt.Errorf("%w: cannot store upload", errEvent) + return fmt.Errorf("%w: cannot store upload", ErrEvent) } } if next != nil { if err := events.Publish(ctx, pps.pub, next); err != nil { pps.log.Error().Err(err).Msg("unable to publish event") - return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed + return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed } } return nil @@ -177,10 +179,17 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) { recs, err := sto.Read(uploadID) if err != nil { + if err == store.ErrNotFound { + return nil, ErrNotFound + } return nil, err } - if len(recs) != 1 { + if len(recs) == 0 { + return nil, ErrNotFound + } + + if len(recs) > 1 { return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs)) } @@ -226,7 +235,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev for _, id := range ids { if err := pps.resumePP(ctx, id); err != nil { pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload") - return fmt.Errorf("%w: cannot resume upload", errEvent) + return fmt.Errorf("cannot resume upload: %w", err) } } return nil @@ -235,7 +244,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error { pp, err := pps.getPP(pps.store, uploadID) if err != nil { - if err == store.ErrNotFound { + if err == ErrNotFound { if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{ UploadID: uploadID, Timestamp: 
utils.TSNow(), @@ -244,7 +253,7 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) } return nil } - return fmt.Errorf("%w: cannot get upload", errEvent) + return fmt.Errorf("cannot get upload: %w", err) } return events.Publish(ctx, pps.pub, pp.CurrentStep()) diff --git a/services/storage-users/pkg/command/uploads.go b/services/storage-users/pkg/command/uploads.go index 1c651cc9f1..e5ae4f2561 100644 --- a/services/storage-users/pkg/command/uploads.go +++ b/services/storage-users/pkg/command/uploads.go @@ -1,6 +1,7 @@ package command import ( + "context" "encoding/json" "fmt" "os" @@ -13,11 +14,14 @@ import ( "github.com/urfave/cli/v2" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/services/storage-users/pkg/config" "github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser" + "github.com/owncloud/ocis/v2/services/storage-users/pkg/event" "github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig" ) @@ -69,7 +73,7 @@ func ListUploads(cfg *config.Config) *cli.Command { fmt.Println("Incomplete uploads:") for _, u := range uploads { ref := u.Reference() - fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) + fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", u.ID(), ref.GetResourceId().GetSpaceId(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) } return nil }, @@ -101,6 +105,10 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { Name: "json", Usage: "output as json", }, + &cli.BoolFlag{ + Name: "restart", + Usage: "send restart event for all 
listed sessions", + }, }, Before: func(c *cli.Context) error { return configlog.ReturnFatal(parser.ParseConfig(cfg)) @@ -124,6 +132,15 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { os.Exit(1) } + var stream events.Stream + if c.Bool("restart") { + stream, err = event.NewStream(cfg) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create event stream: %v\n", err) + os.Exit(1) + } + } + var b strings.Builder filter := storage.UploadSessionFilter{} if c.IsSet("processing") { @@ -200,6 +217,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { fmt.Println(err) } fmt.Println(string(j)) + + if c.Bool("restart") { + if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{ + UploadID: u.ID(), + Timestamp: utils.TSNow(), + }); err != nil { + fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID()) + // if publishing fails there is no need to try publishing other events - they will fail too. + os.Exit(1) + } + } } } else { @@ -223,6 +251,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { u.Expires().Format(time.RFC3339), strconv.FormatBool(u.IsProcessing()), }) + + if c.Bool("restart") { + if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{ + UploadID: u.ID(), + Timestamp: utils.TSNow(), + }); err != nil { + fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID()) + // if publishing fails there is no need to try publishing other events - they will fail too. + os.Exit(1) + } + } } table.Render() }