diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index bb4eaf512..29188e7dc 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -17,9 +17,16 @@ type Postprocessing struct { Filesize uint64 ResourceID *provider.ResourceId Steps []events.Postprocessingstep + Status Status PPDelay time.Duration } +// Status is helper struct to show current postprocessing status +type Status struct { + CurrentStep events.Postprocessingstep + Outcome events.PostprocessingOutcome +} + // New returns a new postprocessing instance func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing { return &Postprocessing{ @@ -40,7 +47,7 @@ func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} { return pp.finished(events.PPOutcomeContinue) } - return pp.nextStep(pp.Steps[0]) + return pp.step(pp.Steps[0]) } // NextStep returns the next postprocessing step @@ -54,6 +61,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa } } +// CurrentStep returns the current postprocessing step +func (pp *Postprocessing) CurrentStep() interface{} { + if pp.Status.Outcome != "" { + return pp.finished(pp.Status.Outcome) + } + return pp.step(pp.Status.CurrentStep) +} + // Delay will sleep the configured time then continue func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} { time.Sleep(pp.PPDelay) @@ -64,13 +79,14 @@ func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} { l := len(pp.Steps) for i, s := range pp.Steps { if s == current && i+1 < l { - return pp.nextStep(pp.Steps[i+1]) + return pp.step(pp.Steps[i+1]) } } return pp.finished(events.PPOutcomeContinue) } -func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep { +func (pp *Postprocessing) step(next events.Postprocessingstep) events.StartPostprocessingStep { + pp.Status.CurrentStep = next return events.StartPostprocessingStep{ UploadID: pp.ID, URL: pp.URL, @@ -83,6 +99,7 @@ func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartP } func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished { + pp.Status.Outcome = outcome return events.PostprocessingFinished{ UploadID: pp.ID, ExecutingUser: pp.User, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index b6ce3fba5..b68f06e7c 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -28,6 +28,7 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store events.StartPostprocessingStep{}, events.UploadReady{}, events.PostprocessingStepFinished{}, + events.ResumePostprocessing{}, ) if err != nil { return nil, err @@ -46,10 +47,15 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { for e := range pps.events { - var next interface{} + var ( + next interface{} + pp *postprocessing.Postprocessing + err error + ) + switch ev := e.Event.(type) { case events.BytesReceived: - pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) + pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing) if err := storePP(pps.store, pp); err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot store upload") continue @@ -61,7 +67,7 @@ func (pps *PostprocessingService) Run() error { // no current upload - this was an on demand scan continue } - pp, err := getPP(pps.store, ev.UploadID) + pp, err = getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") continue @@ -71,7 +77,7 @@ func (pps *PostprocessingService) Run() error { if ev.StepToStart != events.PPStepDelay { continue } - pp, err := getPP(pps.store, ev.UploadID) + pp, err = getPP(pps.store, ev.UploadID) if err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") continue @@ -83,8 +89,21 @@ func (pps *PostprocessingService) Run() error { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") continue } + case events.ResumePostprocessing: + pp, err = getPP(pps.store, ev.UploadID) + if err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") + continue + } + next = pp.CurrentStep() } + if pp != nil { + if err := storePP(pps.store, pp); err != nil { + pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") + // TODO: should we continue here? + } + } if next != nil { if err := events.Publish(pps.pub, next); err != nil { pps.log.Error().Err(err).Msg("unable to publish event")