fix postprocessing events

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2024-10-22 15:44:18 +02:00
parent 2ae6728210
commit 139a8a5bd4
2 changed files with 11 additions and 5 deletions

View File

@@ -2,4 +2,5 @@ Bugfix: increase event processing workers
We increased the number of go routines that pull events from the queue to three and made the number off workers configurable. Furthermore, the postprocessing delay no longer introduces a sleep that slows down pulling of events, but asynchronously triggers the next step.
https://github.com/owncloud/ocis/pull/10385
https://github.com/owncloud/ocis/pull/10368

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
@@ -65,12 +66,14 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
// Spawn workers that'll concurrently work the queue
wg := sync.WaitGroup{}
for i := 0; i < pps.c.Workers; i++ {
go (func() {
wg.Add(1)
go func() {
defer wg.Done()
for e := range pps.events {
err := pps.processEvent(e)
if err != nil {
if err := pps.processEvent(e); err != nil {
switch {
case errors.Is(err, ErrFatal):
pps.log.Fatal().Err(err).Msg("fatal error - exiting")
@@ -81,8 +84,10 @@ func (pps *PostprocessingService) Run() error {
}
}
}
})()
}()
}
wg.Wait()
return nil
}