Files
opencloud/services/postprocessing/pkg/postprocessing/postprocessing.go
Jörn Friedrich Dreyer 72e3e54fe0 make delay only affect the step
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
2024-10-22 10:04:56 +02:00

137 lines
3.7 KiB
Go

package postprocessing
import (
"math"
"time"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
// Postprocessing handles postprocessing of a file
type Postprocessing struct {
ID string
URL string
User *user.User
ImpersonatingUser *user.User
Filename string
Filesize uint64
ResourceID *provider.ResourceId
Steps []events.Postprocessingstep
Status Status
Failures int
InitiatorID string
Finished bool
config config.Postprocessing
}
// Status is helper struct to show current postprocessing status
type Status struct {
CurrentStep events.Postprocessingstep
Outcome events.PostprocessingOutcome
}
// New returns a new postprocessing instance
func New(config config.Postprocessing) *Postprocessing {
return &Postprocessing{
config: config,
}
}
// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(_ events.BytesReceived) interface{} {
if len(pp.Steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}
return pp.step(pp.Steps[0])
}
// NextStep returns the next postprocessing step
func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interface{} {
switch ev.Outcome {
case events.PPOutcomeContinue:
return pp.next(ev.FinishedStep)
case events.PPOutcomeRetry:
pp.Failures++
if pp.Failures > pp.config.MaxRetries {
return pp.finished(events.PPOutcomeAbort)
}
return pp.retry()
default:
return pp.finished(ev.Outcome)
}
}
// CurrentStep returns the current postprocessing step
func (pp *Postprocessing) CurrentStep() interface{} {
if pp.Status.CurrentStep == events.PPStepFinished {
return pp.finished(pp.Status.Outcome)
}
return pp.step(pp.Status.CurrentStep)
}
// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay(f func(next interface{})) {
next := pp.next(events.PPStepDelay)
go func() {
time.Sleep(pp.config.Delayprocessing)
f(next)
}()
}
// BackoffDuration calculates the duration for exponential backoff based on the number of failures.
func (pp *Postprocessing) BackoffDuration() time.Duration {
return pp.config.RetryBackoffDuration * time.Duration(math.Pow(2, float64(pp.Failures-1)))
}
func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
l := len(pp.Steps)
for i, s := range pp.Steps {
if s == current && i+1 < l {
return pp.step(pp.Steps[i+1])
}
}
return pp.finished(events.PPOutcomeContinue)
}
func (pp *Postprocessing) step(next events.Postprocessingstep) events.StartPostprocessingStep {
pp.Status.CurrentStep = next
return events.StartPostprocessingStep{
UploadID: pp.ID,
URL: pp.URL,
ExecutingUser: pp.User,
Filename: pp.Filename,
Filesize: pp.Filesize,
ResourceID: pp.ResourceID,
StepToStart: next,
ImpersonatingUser: pp.ImpersonatingUser,
}
}
func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished {
pp.Status.CurrentStep = events.PPStepFinished
pp.Status.Outcome = outcome
return events.PostprocessingFinished{
UploadID: pp.ID,
ExecutingUser: pp.User,
Filename: pp.Filename,
Outcome: outcome,
ImpersonatingUser: pp.ImpersonatingUser,
}
}
func (pp *Postprocessing) retry() events.PostprocessingRetry {
pp.Status.Outcome = events.PPOutcomeRetry
return events.PostprocessingRetry{
UploadID: pp.ID,
ExecutingUser: pp.User,
Filename: pp.Filename,
Failures: pp.Failures,
BackoffDuration: pp.BackoffDuration(),
}
}