fix(postprocessing): repair restart logic

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2024-04-04 17:02:28 +02:00
parent 9b795c085b
commit 2ab941dea2
5 changed files with 33 additions and 18 deletions

View File

@@ -0,0 +1,5 @@
Bugfix: Fix restarting of postprocessing
When an upload is not found, the logic to restart postprocessing was broken.
https://github.com/owncloud/ocis/pull/8782

View File

@@ -35,9 +35,10 @@ func DefaultConfig() *config.Config {
MaxRetries: 14,
},
Store: config.Store{
Store: "memory",
Store: "nats-js-kv",
Nodes: []string{"127.0.0.1:9233"},
Database: "postprocessing",
Table: "postprocessing",
Table: "",
},
}
}

View File

@@ -39,7 +39,7 @@ func New(config config.Postprocessing) *Postprocessing {
}
// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
func (pp *Postprocessing) Init(_ events.BytesReceived) interface{} {
if len(pp.Steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}

View File

@@ -29,10 +29,12 @@ type PostprocessingService struct {
}
var (
// errFatal is returned when a fatal error occurs and we want to exit.
errFatal = errors.New("fatal error")
// ErrFatal is returned when a fatal error occurs and we want to exit.
ErrFatal = errors.New("fatal error")
// ErrEvent is returned when something went wrong with a specific event.
errEvent = errors.New("event error")
ErrEvent = errors.New("event error")
// ErrNotFound is returned when a postprocessing is not found in the store.
ErrNotFound = errors.New("postprocessing not found")
)
// NewPostprocessingService returns a new instance of a postprocessing service
@@ -66,9 +68,9 @@ func (pps *PostprocessingService) Run() error {
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, errFatal):
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, errEvent):
case errors.Is(err, ErrEvent):
continue
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
@@ -109,7 +111,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.NextStep(ev)
@@ -141,7 +143,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.Delay()
case events.UploadReady:
@@ -153,7 +155,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
// the storage provider thinks the upload is done - so no need to keep it any more
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
return fmt.Errorf("%w: cannot delete upload", errEvent)
return fmt.Errorf("%w: cannot delete upload", ErrEvent)
}
case events.ResumePostprocessing:
return pps.handleResumePPEvent(ctx, ev)
@@ -162,13 +164,13 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
if pp != nil {
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
return fmt.Errorf("%w: cannot store upload", errEvent)
return fmt.Errorf("%w: cannot store upload", ErrEvent)
}
}
if next != nil {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed
return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed
}
}
return nil
@@ -177,10 +179,17 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
recs, err := sto.Read(uploadID)
if err != nil {
if err == store.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
if len(recs) != 1 {
if len(recs) == 0 {
return nil, ErrNotFound
}
if len(recs) > 1 {
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
}
@@ -226,7 +235,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
for _, id := range ids {
if err := pps.resumePP(ctx, id); err != nil {
pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload")
return fmt.Errorf("%w: cannot resume upload", errEvent)
return fmt.Errorf("cannot resume upload: %w", err)
}
}
return nil
@@ -235,7 +244,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error {
pp, err := pps.getPP(pps.store, uploadID)
if err != nil {
if err == store.ErrNotFound {
if err == ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
UploadID: uploadID,
Timestamp: utils.TSNow(),
@@ -244,7 +253,7 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string)
}
return nil
}
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("cannot get upload: %w", err)
}
return events.Publish(ctx, pps.pub, pp.CurrentStep())

View File

@@ -69,7 +69,7 @@ func ListUploads(cfg *config.Config) *cli.Command {
fmt.Println("Incomplete uploads:")
for _, u := range uploads {
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", u.ID(), ref.GetResourceId().GetSpaceId(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
}
return nil
},