mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-06 04:09:40 -06:00
fix(postprocessing): repair restart logic
Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
@@ -30,10 +30,12 @@ type PostprocessingService struct {
|
||||
}
|
||||
|
||||
var (
|
||||
// errFatal is returned when a fatal error occurs and we want to exit.
|
||||
errFatal = errors.New("fatal error")
|
||||
// ErrFatal is returned when a fatal error occurs and we want to exit.
|
||||
ErrFatal = errors.New("fatal error")
|
||||
// ErrEvent is returned when something went wrong with a specific event.
|
||||
errEvent = errors.New("event error")
|
||||
ErrEvent = errors.New("event error")
|
||||
// ErrNotFound is returned when a postprocessing is not found in the store.
|
||||
ErrNotFound = errors.New("postprocessing not found")
|
||||
)
|
||||
|
||||
// NewPostprocessingService returns a new instance of a postprocessing service
|
||||
@@ -67,9 +69,9 @@ func (pps *PostprocessingService) Run() error {
|
||||
err := pps.processEvent(e)
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, errFatal):
|
||||
case errors.Is(err, ErrFatal):
|
||||
return err
|
||||
case errors.Is(err, errEvent):
|
||||
case errors.Is(err, ErrEvent):
|
||||
continue
|
||||
default:
|
||||
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
|
||||
@@ -111,7 +113,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
pp, err = pps.getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
||||
return fmt.Errorf("%w: cannot get upload", errEvent)
|
||||
return fmt.Errorf("%w: cannot get upload", ErrEvent)
|
||||
}
|
||||
next = pp.NextStep(ev)
|
||||
|
||||
@@ -143,7 +145,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
pp, err = pps.getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
||||
return fmt.Errorf("%w: cannot get upload", errEvent)
|
||||
return fmt.Errorf("%w: cannot get upload", ErrEvent)
|
||||
}
|
||||
next = pp.Delay()
|
||||
case events.UploadReady:
|
||||
@@ -155,7 +157,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
// the storage provider thinks the upload is done - so no need to keep it any more
|
||||
if err := pps.store.Delete(ev.UploadID); err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
|
||||
return fmt.Errorf("%w: cannot delete upload", errEvent)
|
||||
return fmt.Errorf("%w: cannot delete upload", ErrEvent)
|
||||
}
|
||||
case events.ResumePostprocessing:
|
||||
return pps.handleResumePPEvent(ctx, ev)
|
||||
@@ -166,14 +168,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
|
||||
if err := storePP(pps.store, pp); err != nil {
|
||||
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
|
||||
return fmt.Errorf("%w: cannot store upload", errEvent)
|
||||
return fmt.Errorf("%w: cannot store upload", ErrEvent)
|
||||
}
|
||||
}
|
||||
|
||||
if next != nil {
|
||||
if err := events.Publish(ctx, pps.pub, next); err != nil {
|
||||
pps.log.Error().Err(err).Msg("unable to publish event")
|
||||
return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed
|
||||
return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -182,10 +184,17 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
|
||||
recs, err := sto.Read(uploadID)
|
||||
if err != nil {
|
||||
if err == store.ErrNotFound {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(recs) != 1 {
|
||||
if len(recs) == 0 {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
if len(recs) > 1 {
|
||||
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
|
||||
}
|
||||
|
||||
@@ -231,7 +240,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
|
||||
for _, id := range ids {
|
||||
if err := pps.resumePP(ctx, id); err != nil {
|
||||
pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload")
|
||||
return fmt.Errorf("%w: cannot resume upload", errEvent)
|
||||
return fmt.Errorf("cannot resume upload: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -240,7 +249,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
|
||||
func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error {
|
||||
pp, err := pps.getPP(pps.store, uploadID)
|
||||
if err != nil {
|
||||
if err == store.ErrNotFound {
|
||||
if err == ErrNotFound {
|
||||
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
|
||||
UploadID: uploadID,
|
||||
Timestamp: utils.TSNow(),
|
||||
@@ -249,7 +258,7 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("%w: cannot get upload", errEvent)
|
||||
return fmt.Errorf("cannot get upload: %w", err)
|
||||
}
|
||||
|
||||
return events.Publish(ctx, pps.pub, pp.CurrentStep())
|
||||
|
||||
Reference in New Issue
Block a user