Merge pull request #10372 from owncloud/postprocessing-events

Postprocessing events
This commit is contained in:
Jörn Friedrich Dreyer
2024-10-22 14:14:34 +02:00
committed by GitHub
5 changed files with 36 additions and 16 deletions
+3 -1
View File
@@ -25,7 +25,9 @@ type Config struct {
// Postprocessing defines the config options for the postprocessing service.
type Postprocessing struct {
Events Events `yaml:"events"`
Events Events `yaml:"events"`
Workers int `yaml:"workers" env:"POSTPROCESSING_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"6.7"`
Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"`
Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for developing purposes only. If a duration is set but the keyword 'delay' is not explicitely added to 'POSTPROCESSING_STEPS', the delay step will be processed as last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"`
@@ -31,6 +31,7 @@ func DefaultConfig() *config.Config {
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
Workers: 3,
RetryBackoffDuration: 5 * time.Second,
MaxRetries: 14,
},
@@ -75,9 +75,12 @@ func (pp *Postprocessing) CurrentStep() interface{} {
}
// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay() interface{} {
time.Sleep(pp.config.Delayprocessing)
return pp.next(events.PPStepDelay)
func (pp *Postprocessing) Delay(f func(next interface{})) {
next := pp.next(events.PPStepDelay)
go func() {
time.Sleep(pp.config.Delayprocessing)
f(next)
}()
}
// BackoffDuration calculates the duration for exponential backoff based on the number of failures.
+21 -12
View File
@@ -65,18 +65,23 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
for e := range pps.events {
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, ErrEvent):
continue
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
// Spawn workers that'll concurrently work the queue
for i := 0; i < pps.c.Workers; i++ {
go (func() {
for e := range pps.events {
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, ErrFatal):
pps.log.Fatal().Err(err).Msg("fatal error - exiting")
case errors.Is(err, ErrEvent):
pps.log.Error().Err(err).Msg("continuing")
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
}
}
}
}
})()
}
return nil
}
@@ -149,7 +154,11 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.Delay()
pp.Delay(func(next interface{}) {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("cannot publish event")
}
})
case events.UploadReady:
if ev.Failed {
// the upload failed - let's keep it around for a while - but mark it as finished