mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-31 01:10:20 -06:00
add functionality for resending pp events
Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
@@ -17,9 +17,16 @@ type Postprocessing struct {
|
||||
Filesize uint64
|
||||
ResourceID *provider.ResourceId
|
||||
Steps []events.Postprocessingstep
|
||||
Status Status
|
||||
PPDelay time.Duration
|
||||
}
|
||||
|
||||
// Status is helper struct to show current postprocessing status
|
||||
type Status struct {
|
||||
CurrentStep events.Postprocessingstep
|
||||
Outcome events.PostprocessingOutcome
|
||||
}
|
||||
|
||||
// New returns a new postprocessing instance
|
||||
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing {
|
||||
return &Postprocessing{
|
||||
@@ -40,7 +47,7 @@ func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
|
||||
return pp.finished(events.PPOutcomeContinue)
|
||||
}
|
||||
|
||||
return pp.nextStep(pp.Steps[0])
|
||||
return pp.step(pp.Steps[0])
|
||||
}
|
||||
|
||||
// NextStep returns the next postprocessing step
|
||||
@@ -54,6 +61,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa
|
||||
}
|
||||
}
|
||||
|
||||
// CurrentStep returns the current postprocessing step
|
||||
func (pp *Postprocessing) CurrentStep() interface{} {
|
||||
if pp.Status.Outcome != "" {
|
||||
return pp.finished(pp.Status.Outcome)
|
||||
}
|
||||
return pp.step(pp.Status.CurrentStep)
|
||||
}
|
||||
|
||||
// Delay will sleep the configured time then continue
|
||||
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
|
||||
time.Sleep(pp.PPDelay)
|
||||
@@ -64,13 +79,14 @@ func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
|
||||
l := len(pp.Steps)
|
||||
for i, s := range pp.Steps {
|
||||
if s == current && i+1 < l {
|
||||
return pp.nextStep(pp.Steps[i+1])
|
||||
return pp.step(pp.Steps[i+1])
|
||||
}
|
||||
}
|
||||
return pp.finished(events.PPOutcomeContinue)
|
||||
}
|
||||
|
||||
func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep {
|
||||
func (pp *Postprocessing) step(next events.Postprocessingstep) events.StartPostprocessingStep {
|
||||
pp.Status.CurrentStep = next
|
||||
return events.StartPostprocessingStep{
|
||||
UploadID: pp.ID,
|
||||
URL: pp.URL,
|
||||
@@ -83,6 +99,7 @@ func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartP
|
||||
}
|
||||
|
||||
func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished {
|
||||
pp.Status.Outcome = outcome
|
||||
return events.PostprocessingFinished{
|
||||
UploadID: pp.ID,
|
||||
ExecutingUser: pp.User,
|
||||
|
||||
@@ -28,6 +28,7 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store
|
||||
events.StartPostprocessingStep{},
|
||||
events.UploadReady{},
|
||||
events.PostprocessingStepFinished{},
|
||||
events.ResumePostprocessing{},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -46,10 +47,15 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store
|
||||
// Run to fulfil Runner interface
|
||||
func (pps *PostprocessingService) Run() error {
|
||||
for e := range pps.events {
|
||||
var next interface{}
|
||||
var (
|
||||
next interface{}
|
||||
pp *postprocessing.Postprocessing
|
||||
err error
|
||||
)
|
||||
|
||||
switch ev := e.Event.(type) {
|
||||
case events.BytesReceived:
|
||||
pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
|
||||
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
|
||||
if err := storePP(pps.store, pp); err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot store upload")
|
||||
continue
|
||||
@@ -61,7 +67,7 @@ func (pps *PostprocessingService) Run() error {
|
||||
// no current upload - this was an on demand scan
|
||||
continue
|
||||
}
|
||||
pp, err := getPP(pps.store, ev.UploadID)
|
||||
pp, err = getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
||||
continue
|
||||
@@ -71,7 +77,7 @@ func (pps *PostprocessingService) Run() error {
|
||||
if ev.StepToStart != events.PPStepDelay {
|
||||
continue
|
||||
}
|
||||
pp, err := getPP(pps.store, ev.UploadID)
|
||||
pp, err = getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
||||
continue
|
||||
@@ -83,8 +89,21 @@ func (pps *PostprocessingService) Run() error {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
|
||||
continue
|
||||
}
|
||||
case events.ResumePostprocessing:
|
||||
pp, err = getPP(pps.store, ev.UploadID)
|
||||
if err != nil {
|
||||
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
||||
continue
|
||||
}
|
||||
next = pp.CurrentStep()
|
||||
}
|
||||
|
||||
if pp != nil {
|
||||
if err := storePP(pps.store, pp); err != nil {
|
||||
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
|
||||
// TODO: should we continue here?
|
||||
}
|
||||
}
|
||||
if next != nil {
|
||||
if err := events.Publish(pps.pub, next); err != nil {
|
||||
pps.log.Error().Err(err).Msg("unable to publish event")
|
||||
|
||||
Reference in New Issue
Block a user