diff --git a/changelog/unreleased/postprocessing-events.md b/changelog/unreleased/postprocessing-events.md index 7b32efcc3..86394e314 100644 --- a/changelog/unreleased/postprocessing-events.md +++ b/changelog/unreleased/postprocessing-events.md @@ -2,4 +2,5 @@ Bugfix: increase event processing workers We increased the number of go routines that pull events from the queue to three and made the number off workers configurable. Furthermore, the postprocessing delay no longer introduces a sleep that slows down pulling of events, but asynchronously triggers the next step. +https://github.com/owncloud/ocis/pull/10385 https://github.com/owncloud/ocis/pull/10368 diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 70918e810..0628fd73f 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" @@ -65,12 +66,14 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger // Run to fulfil Runner interface func (pps *PostprocessingService) Run() error { - // Spawn workers that'll concurrently work the queue + wg := sync.WaitGroup{} + for i := 0; i < pps.c.Workers; i++ { - go (func() { + wg.Add(1) + go func() { + defer wg.Done() for e := range pps.events { - err := pps.processEvent(e) - if err != nil { + if err := pps.processEvent(e); err != nil { switch { case errors.Is(err, ErrFatal): pps.log.Fatal().Err(err).Msg("fatal error - exiting") @@ -81,8 +84,10 @@ func (pps *PostprocessingService) Run() error { } } } - })() + }() } + wg.Wait() + return nil }