diff --git a/services/search/pkg/config/defaults/defaultconfig.go b/services/search/pkg/config/defaults/defaultconfig.go index 222eb273d..1a17faee7 100644 --- a/services/search/pkg/config/defaults/defaultconfig.go +++ b/services/search/pkg/config/defaults/defaultconfig.go @@ -58,6 +58,7 @@ func DefaultConfig() *config.Config { Cluster: "opencloud-cluster", DebounceDuration: 1000, AsyncUploads: true, + NumConsumers: 1, EnableTLS: false, MaxAckPending: 1000, AckWait: 1 * time.Minute, diff --git a/services/search/pkg/config/search.go b/services/search/pkg/config/search.go index 8ff039927..b2d3b26cb 100644 --- a/services/search/pkg/config/search.go +++ b/services/search/pkg/config/search.go @@ -8,7 +8,7 @@ type Events struct { Endpoint string `yaml:"endpoint" env:"OC_EVENTS_ENDPOINT;SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"1.0.0"` Cluster string `yaml:"cluster" env:"OC_EVENTS_CLUSTER;SEARCH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"1.0.0"` AsyncUploads bool `yaml:"async_uploads" env:"OC_ASYNC_UPLOADS;SEARCH_EVENTS_ASYNC_UPLOADS" desc:"Enable asynchronous file uploads." introductionVersion:"1.0.0"` - NumConsumers int `yaml:"num_consumers" env:"SEARCH_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for searching files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands. The default value is 0." introductionVersion:"1.0.0"` + NumConsumers int `yaml:"num_consumers" env:"SEARCH_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for searching files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands." introductionVersion:"1.0.0"` DebounceDuration int `yaml:"debounce_duration" env:"SEARCH_EVENTS_REINDEX_DEBOUNCE_DURATION" desc:"The duration in milliseconds the reindex debouncer waits before triggering a reindex of a space that was modified." introductionVersion:"1.0.0"` TLSInsecure bool `yaml:"tls_insecure" env:"OC_INSECURE;SEARCH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates." introductionVersion:"1.0.0"` diff --git a/services/search/pkg/service/event/service.go b/services/search/pkg/service/event/service.go index 237f11e9a..843717908 100644 --- a/services/search/pkg/service/event/service.go +++ b/services/search/pkg/service/event/service.go @@ -95,6 +95,12 @@ func (s Service) Run() error { ctx, cancel := context.WithCancel(s.ctx) defer cancel() + s.log.Debug().Int("worker.count", s.numConsumers). + Str("messaging.consumer.group.name", "search-pull"). + Str("messaging.system", "nats"). + Str("messaging.operation.name", "receive"). + Msg("starting event processing workers") + // start workers for i := 0; i < s.numConsumers; i++ { wg.Add(1) @@ -152,7 +158,7 @@ func getSpaceID(ref *provider.Reference) *provider.StorageSpaceId { func (s Service) processEvent(e raw.Event) error { ctx := e.GetTraceContext(s.ctx) - ctx, span := tracer.Start(ctx, "processEvent") + _, span := tracer.Start(ctx, "processEvent") defer span.End() e.InProgress() // let nats know that we are processing this event