diff --git a/changelog/unreleased/concurrent-userlog.md b/changelog/unreleased/concurrent-userlog.md new file mode 100644 index 000000000..7efe6f1ec --- /dev/null +++ b/changelog/unreleased/concurrent-userlog.md @@ -0,0 +1,5 @@ +Enhancement: Concurrent userlog processing + +We now start multiple go routines that process events. The default of 5 goroutines can be changed with the new `USERLOG_MAX_CONCURRENCY` environment variable. + +https://github.com/owncloud/ocis/pull/10504 diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go index b7b6279c8..3d2b8ab88 100644 --- a/services/userlog/pkg/config/config.go +++ b/services/userlog/pkg/config/config.go @@ -26,6 +26,7 @@ type Config struct { TranslationPath string `yaml:"translation_path" env:"OCIS_TRANSLATION_PATH;USERLOG_TRANSLATION_PATH" desc:"(optional) Set this to a path with custom translations to overwrite the builtin translations. Note that file and folder naming rules apply, see the documentation for more details." introductionVersion:"pre5.0"` DefaultLanguage string `yaml:"default_language" env:"OCIS_DEFAULT_LANGUAGE" desc:"The default language used by services and the WebUI. If not defined, English will be used as default. See the documentation for more details." introductionVersion:"5.0"` Events Events `yaml:"events"` + MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;USERLOG_MAX_CONCURRENCY" desc:"Maximum number of concurrent go-routines. Higher values can potentially get work done faster but will also cause more load on the system. Values of 0 or below will be ignored and the default value will be used." introductionVersion:"7.0.0"` Persistence Persistence `yaml:"persistence"` DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications." introductionVersion:"pre5.0"` diff --git a/services/userlog/pkg/config/defaults/defaultconfig.go b/services/userlog/pkg/config/defaults/defaultconfig.go index 69dc1b589..1d960161a 100644 --- a/services/userlog/pkg/config/defaults/defaultconfig.go +++ b/services/userlog/pkg/config/defaults/defaultconfig.go @@ -34,6 +34,7 @@ func DefaultConfig() *config.Config { Cluster: "ocis-cluster", EnableTLS: false, }, + MaxConcurrency: 5, Persistence: config.Persistence{ Store: "memory", Database: "userlog", @@ -104,4 +105,7 @@ func Sanitize(cfg *config.Config) { if cfg.HTTP.Root != "/" { cfg.HTTP.Root = strings.TrimSuffix(cfg.HTTP.Root, "/") } + if cfg.MaxConcurrency < 1 { + cfg.MaxConcurrency = 5 + } } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index f18e4c18c..89efc523f 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -10,13 +10,13 @@ import ( gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/go-chi/chi/v5" "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" - "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" - "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/l10n" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/roles" @@ -96,8 +96,12 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { // MemorizeEvents stores eventIDs a user wants to receive func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) { - for event := range ch { - ul.processEvent(event) + for i := 0; i < ul.cfg.MaxConcurrency; i++ { + go func(ch <-chan events.Event) { + for event := range ch { + ul.processEvent(event) + } + }(ch) } } diff --git a/services/userlog/pkg/service/service_suit_test.go b/services/userlog/pkg/service/service_suit_test.go index 9da9654d4..96b389be9 100644 --- a/services/userlog/pkg/service/service_suit_test.go +++ b/services/userlog/pkg/service/service_suit_test.go @@ -3,11 +3,11 @@ package service_test import ( "testing" - "github.com/owncloud/ocis/v2/ocis-pkg/registry" - mRegistry "go-micro.dev/v4/registry" - . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + mRegistry "go-micro.dev/v4/registry" + + "github.com/owncloud/ocis/v2/ocis-pkg/registry" ) func init() { diff --git a/services/userlog/pkg/service/service_test.go b/services/userlog/pkg/service/service_test.go index bbef09594..1203aa654 100644 --- a/services/userlog/pkg/service/service_test.go +++ b/services/userlog/pkg/service/service_test.go @@ -19,6 +19,13 @@ import ( "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/stretchr/testify/mock" + "go-micro.dev/v4/client" + microevents "go-micro.dev/v4/events" + microstore "go-micro.dev/v4/store" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "github.com/owncloud/ocis/v2/ocis-pkg/log" ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" @@ -26,17 +33,13 @@ import ( settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/service" - "github.com/stretchr/testify/mock" - "go-micro.dev/v4/client" - microevents "go-micro.dev/v4/events" - microstore "go-micro.dev/v4/store" - "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc" ) var _ = Describe("UserlogService", func() { var ( - cfg = &config.Config{} + cfg = &config.Config{ + MaxConcurrency: 5, + } ul *service.UserlogService bus testBus