Merge pull request #10504 from owncloud/userlog-concurrency

concurrent userlog
This commit is contained in:
Michael Barz
2024-11-07 16:17:26 +01:00
committed by GitHub
6 changed files with 32 additions and 15 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Concurrent userlog processing
We now start multiple go routines that process events. The default of 5 goroutines can be changed with the new `USERLOG_MAX_CONCURRENCY` environment variable.
https://github.com/owncloud/ocis/pull/10504

View File

@@ -26,6 +26,7 @@ type Config struct {
TranslationPath string `yaml:"translation_path" env:"OCIS_TRANSLATION_PATH;USERLOG_TRANSLATION_PATH" desc:"(optional) Set this to a path with custom translations to overwrite the builtin translations. Note that file and folder naming rules apply, see the documentation for more details." introductionVersion:"pre5.0"`
DefaultLanguage string `yaml:"default_language" env:"OCIS_DEFAULT_LANGUAGE" desc:"The default language used by services and the WebUI. If not defined, English will be used as default. See the documentation for more details." introductionVersion:"5.0"`
Events Events `yaml:"events"`
MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;USERLOG_MAX_CONCURRENCY" desc:"Maximum number of concurrent go-routines. Higher values can potentially get work done faster but will also cause more load on the system. Values of 0 or below will be ignored and the default value will be used." introductionVersion:"7.0.0"`
Persistence Persistence `yaml:"persistence"`
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications." introductionVersion:"pre5.0"`

View File

@@ -34,6 +34,7 @@ func DefaultConfig() *config.Config {
Cluster: "ocis-cluster",
EnableTLS: false,
},
MaxConcurrency: 5,
Persistence: config.Persistence{
Store: "memory",
Database: "userlog",
@@ -104,4 +105,7 @@ func Sanitize(cfg *config.Config) {
if cfg.HTTP.Root != "/" {
cfg.HTTP.Root = strings.TrimSuffix(cfg.HTTP.Root, "/")
}
if cfg.MaxConcurrency < 1 {
cfg.MaxConcurrency = 5
}
}

View File

@@ -10,13 +10,13 @@ import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-chi/chi/v5"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/l10n"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/roles"
@@ -96,8 +96,12 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
// MemorizeEvents stores eventIDs a user wants to receive
func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) {
for event := range ch {
ul.processEvent(event)
for i := 0; i < ul.cfg.MaxConcurrency; i++ {
go func(ch <-chan events.Event) {
for event := range ch {
ul.processEvent(event)
}
}(ch)
}
}

View File

@@ -3,11 +3,11 @@ package service_test
import (
"testing"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
mRegistry "go-micro.dev/v4/registry"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
mRegistry "go-micro.dev/v4/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
)
func init() {

View File

@@ -19,6 +19,13 @@ import (
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
"go-micro.dev/v4/client"
microevents "go-micro.dev/v4/events"
microstore "go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
@@ -26,17 +33,13 @@ import (
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/service"
"github.com/stretchr/testify/mock"
"go-micro.dev/v4/client"
microevents "go-micro.dev/v4/events"
microstore "go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)
var _ = Describe("UserlogService", func() {
var (
cfg = &config.Config{}
cfg = &config.Config{
MaxConcurrency: 5,
}
ul *service.UserlogService
bus testBus