diff --git a/changelog/unreleased/concurrent-autoaccept.md b/changelog/unreleased/concurrent-autoaccept.md index 188f47eac..f3738acc2 100644 --- a/changelog/unreleased/concurrent-autoaccept.md +++ b/changelog/unreleased/concurrent-autoaccept.md @@ -2,4 +2,5 @@ Enhancement: Concurrent autoaccept for shares Shares for groups are now concurrently accepted. Tha default of 25 goroutinges can be changed with the new `FRONTEND_MAX_CONCURRENCY` environment variable. +https://github.com/owncloud/ocis/pull/10505 https://github.com/owncloud/ocis/pull/10476 diff --git a/services/frontend/pkg/command/events.go b/services/frontend/pkg/command/events.go index 16701429b..8875d73ef 100644 --- a/services/frontend/pkg/command/events.go +++ b/services/frontend/pkg/command/events.go @@ -82,20 +82,31 @@ func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) erro valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient) - for { - select { - case e := <-evChannel: - switch ev := e.Event.(type) { - default: - l.Error().Interface("event", e).Msg("unhandled event") - case events.ShareCreated: - AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount, cfg.MaxConcurrency) + wg := sync.WaitGroup{} + for i := 0; i < cfg.MaxConcurrency; i++ { + wg.Add(1) + go func(ch <-chan events.Event) { + defer wg.Done() + for { + select { + case e := <-ch: + switch ev := e.Event.(type) { + default: + l.Error().Interface("event", e).Msg("unhandled event") + case events.ShareCreated: + AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount, cfg.MaxConcurrency) + } + case <-ctx.Done(): + l.Info().Msg("context cancelled") + return + } } - case <-ctx.Done(): - l.Info().Msg("context cancelled") - return nil - } + }(evChannel) } + // Wait for all goroutines to finish + wg.Wait() + + return nil } // AutoAcceptShares automatically accepts shares if configured by the admin or user