diff --git a/changelog/unreleased/fix-postprocessing-restart.md b/changelog/unreleased/fix-postprocessing-restart.md new file mode 100644 index 000000000..66d810b30 --- /dev/null +++ b/changelog/unreleased/fix-postprocessing-restart.md @@ -0,0 +1,5 @@ +Bugfix: Fix restarting of postprocessing + +When an upload is not found, the logic to restart postprocessing was bunked + +https://github.com/owncloud/ocis/pull/8782 diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go index 11cf3f02a..51a9bb611 100644 --- a/services/postprocessing/pkg/config/defaults/defaultconfig.go +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -35,9 +35,10 @@ func DefaultConfig() *config.Config { MaxRetries: 14, }, Store: config.Store{ - Store: "memory", + Store: "nats-js-kv", + Nodes: []string{"127.0.0.1:9233"}, Database: "postprocessing", - Table: "postprocessing", + Table: "", }, } } diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index afdc15f70..e462401a5 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -40,7 +40,7 @@ func New(config config.Postprocessing) *Postprocessing { } // Init is the first step of the postprocessing -func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} { +func (pp *Postprocessing) Init(_ events.BytesReceived) interface{} { if len(pp.Steps) == 0 { return pp.finished(events.PPOutcomeContinue) } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 3d1a81f07..2be92d6fc 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -30,10 +30,12 @@ type PostprocessingService struct { } var ( - // errFatal is returned when a fatal error occurs and we want to exit. - errFatal = errors.New("fatal error") + // ErrFatal is returned when a fatal error occurs and we want to exit. + ErrFatal = errors.New("fatal error") // ErrEvent is returned when something went wrong with a specific event. - errEvent = errors.New("event error") + ErrEvent = errors.New("event error") + // ErrNotFound is returned when a postprocessing is not found in the store. + ErrNotFound = errors.New("postprocessing not found") ) // NewPostprocessingService returns a new instance of a postprocessing service @@ -67,9 +69,9 @@ func (pps *PostprocessingService) Run() error { err := pps.processEvent(e) if err != nil { switch { - case errors.Is(err, errFatal): + case errors.Is(err, ErrFatal): return err - case errors.Is(err, errEvent): + case errors.Is(err, ErrEvent): continue default: pps.log.Fatal().Err(err).Msg("unknown error - exiting") @@ -111,7 +113,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - return fmt.Errorf("%w: cannot get upload", errEvent) + return fmt.Errorf("%w: cannot get upload", ErrEvent) } next = pp.NextStep(ev) @@ -143,7 +145,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - return fmt.Errorf("%w: cannot get upload", errEvent) + return fmt.Errorf("%w: cannot get upload", ErrEvent) } next = pp.Delay() case events.UploadReady: @@ -155,7 +157,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { // the storage provider thinks the upload is done - so no need to keep it any more if err := pps.store.Delete(ev.UploadID); err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") - return fmt.Errorf("%w: cannot delete upload", errEvent) + return fmt.Errorf("%w: cannot delete upload", ErrEvent) } case events.ResumePostprocessing: return pps.handleResumePPEvent(ctx, ev) @@ -166,14 +168,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { if err := storePP(pps.store, pp); err != nil { pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") - return fmt.Errorf("%w: cannot store upload", errEvent) + return fmt.Errorf("%w: cannot store upload", ErrEvent) } } if next != nil { if err := events.Publish(ctx, pps.pub, next); err != nil { pps.log.Error().Err(err).Msg("unable to publish event") - return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed + return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed } } return nil @@ -182,10 +184,17 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) { recs, err := sto.Read(uploadID) if err != nil { + if err == store.ErrNotFound { + return nil, ErrNotFound + } return nil, err } - if len(recs) != 1 { + if len(recs) == 0 { + return nil, ErrNotFound + } + + if len(recs) > 1 { return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs)) } @@ -231,7 +240,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev for _, id := range ids { if err := pps.resumePP(ctx, id); err != nil { pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload") - return fmt.Errorf("%w: cannot resume upload", errEvent) + return fmt.Errorf("cannot resume upload: %w", err) } } return nil @@ -240,7 +249,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error { pp, err := pps.getPP(pps.store, uploadID) if err != nil { - if err == store.ErrNotFound { + if err == ErrNotFound { if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{ UploadID: uploadID, Timestamp: utils.TSNow(), @@ -249,7 +258,7 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) } return nil } - return fmt.Errorf("%w: cannot get upload", errEvent) + return fmt.Errorf("cannot get upload: %w", err) } return events.Publish(ctx, pps.pub, pp.CurrentStep()) diff --git a/services/storage-users/pkg/command/uploads.go b/services/storage-users/pkg/command/uploads.go index 1c651cc9f..99d93e533 100644 --- a/services/storage-users/pkg/command/uploads.go +++ b/services/storage-users/pkg/command/uploads.go @@ -69,7 +69,7 @@ func ListUploads(cfg *config.Config) *cli.Command { fmt.Println("Incomplete uploads:") for _, u := range uploads { ref := u.Reference() - fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) + fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", u.ID(), ref.GetResourceId().GetSpaceId(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) } return nil },