From f54582ddc4c30074d9e96ab01480cf6d786cffed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 15 Sep 2025 13:49:41 +0200 Subject: [PATCH] fix event consumers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- services/search/pkg/config/defaults/defaultconfig.go | 1 + services/search/pkg/config/search.go | 2 +- services/search/pkg/service/event/service.go | 8 +++++++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/services/search/pkg/config/defaults/defaultconfig.go b/services/search/pkg/config/defaults/defaultconfig.go index 222eb273d0..1a17faee72 100644 --- a/services/search/pkg/config/defaults/defaultconfig.go +++ b/services/search/pkg/config/defaults/defaultconfig.go @@ -58,6 +58,7 @@ func DefaultConfig() *config.Config { Cluster: "opencloud-cluster", DebounceDuration: 1000, AsyncUploads: true, + NumConsumers: 1, EnableTLS: false, MaxAckPending: 1000, AckWait: 1 * time.Minute, diff --git a/services/search/pkg/config/search.go b/services/search/pkg/config/search.go index 8ff0399270..b2d3b26cbd 100644 --- a/services/search/pkg/config/search.go +++ b/services/search/pkg/config/search.go @@ -8,7 +8,7 @@ type Events struct { Endpoint string `yaml:"endpoint" env:"OC_EVENTS_ENDPOINT;SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"1.0.0"` Cluster string `yaml:"cluster" env:"OC_EVENTS_CLUSTER;SEARCH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"1.0.0"` AsyncUploads bool `yaml:"async_uploads" env:"OC_ASYNC_UPLOADS;SEARCH_EVENTS_ASYNC_UPLOADS" desc:"Enable asynchronous file uploads." introductionVersion:"1.0.0"` - NumConsumers int `yaml:"num_consumers" env:"SEARCH_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for searching files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands. The default value is 0." introductionVersion:"1.0.0"` + NumConsumers int `yaml:"num_consumers" env:"SEARCH_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for searching files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands." introductionVersion:"1.0.0"` DebounceDuration int `yaml:"debounce_duration" env:"SEARCH_EVENTS_REINDEX_DEBOUNCE_DURATION" desc:"The duration in milliseconds the reindex debouncer waits before triggering a reindex of a space that was modified." introductionVersion:"1.0.0"` TLSInsecure bool `yaml:"tls_insecure" env:"OC_INSECURE;SEARCH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates." introductionVersion:"1.0.0"` diff --git a/services/search/pkg/service/event/service.go b/services/search/pkg/service/event/service.go index 237f11e9a3..843717908b 100644 --- a/services/search/pkg/service/event/service.go +++ b/services/search/pkg/service/event/service.go @@ -95,6 +95,12 @@ func (s Service) Run() error { ctx, cancel := context.WithCancel(s.ctx) defer cancel() + s.log.Debug().Int("worker.count", s.numConsumers). + Str("messaging.consumer.group.name", "search-pull"). + Str("messaging.system", "nats"). + Str("messaging.operation.name", "receive"). + Msg("starting event processing workers") + // start workers for i := 0; i < s.numConsumers; i++ { wg.Add(1) @@ -152,7 +158,7 @@ func getSpaceID(ref *provider.Reference) *provider.StorageSpaceId { func (s Service) processEvent(e raw.Event) error { ctx := e.GetTraceContext(s.ctx) - ctx, span := tracer.Start(ctx, "processEvent") + _, span := tracer.Start(ctx, "processEvent") defer span.End() e.InProgress() // let nats know that we are processing this event