From 99dee5ae77ea3b92e6b1129de85d96933ba6e180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 11 Sep 2025 16:23:20 +0200 Subject: [PATCH] allow disabling search grpc/event servers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- services/search/pkg/command/server.go | 172 ++++++++++++-- services/search/pkg/config/grpc.go | 1 + services/search/pkg/config/search.go | 1 + services/search/pkg/search/events.go | 144 ------------ services/search/pkg/server/grpc/option.go | 38 ++- services/search/pkg/server/grpc/server.go | 14 +- .../{search => service/event}/debouncer.go | 3 +- .../event}/debouncer_test.go | 15 +- .../pkg/service/event/search_suite_test.go | 25 ++ services/search/pkg/service/event/service.go | 220 ++++++++++++++++++ .../event/service_test.go} | 21 +- services/search/pkg/service/grpc/v0/option.go | 32 ++- .../search/pkg/service/grpc/v0/service.go | 121 +--------- 13 files changed, 492 insertions(+), 315 deletions(-) delete mode 100644 services/search/pkg/search/events.go rename services/search/pkg/{search => service/event}/debouncer.go (99%) rename services/search/pkg/{search => service/event}/debouncer_test.go (89%) create mode 100644 services/search/pkg/service/event/search_suite_test.go create mode 100644 services/search/pkg/service/event/service.go rename services/search/pkg/{search/events_test.go => service/event/service_test.go} (83%) diff --git a/services/search/pkg/command/server.go b/services/search/pkg/command/server.go index 490e2d6729..b85811268b 100644 --- a/services/search/pkg/command/server.go +++ b/services/search/pkg/command/server.go @@ -5,18 +5,31 @@ import ( "fmt" "os/signal" + "github.com/opencloud-eu/reva/v2/pkg/events/raw" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + opensearchgo "github.com/opensearch-project/opensearch-go/v4" + opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi" + "github.com/urfave/cli/v2" + "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" + "github.com/opencloud-eu/opencloud/pkg/registry" "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" + "github.com/opencloud-eu/opencloud/services/search/pkg/bleve" "github.com/opencloud-eu/opencloud/services/search/pkg/config" "github.com/opencloud-eu/opencloud/services/search/pkg/config/parser" + "github.com/opencloud-eu/opencloud/services/search/pkg/content" "github.com/opencloud-eu/opencloud/services/search/pkg/logging" "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" + "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch" + bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve" + "github.com/opencloud-eu/opencloud/services/search/pkg/search" "github.com/opencloud-eu/opencloud/services/search/pkg/server/debug" "github.com/opencloud-eu/opencloud/services/search/pkg/server/grpc" - "github.com/urfave/cli/v2" + svcEvent "github.com/opencloud-eu/opencloud/services/search/pkg/service/event" ) // Server is the entrypoint for the server command. @@ -52,36 +65,147 @@ func Server(cfg *config.Config) *cli.Command { mtrcs := metrics.New() mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) + // initialize search engine + var eng search.Engine + switch cfg.Engine.Type { + case "bleve": + idx, err := bleve.NewIndex(cfg.Engine.Bleve.Datapath) + if err != nil { + return err + } + + defer func() { + if err = idx.Close(); err != nil { + logger.Error().Err(err).Msg("could not close bleve index") + } + }() + + eng = bleve.NewBackend(idx, bleveQuery.DefaultCreator, logger) + case "open-search": + client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{ + Client: opensearchgo.Config{ + Addresses: cfg.Engine.OpenSearch.Client.Addresses, + Username: cfg.Engine.OpenSearch.Client.Username, + Password: cfg.Engine.OpenSearch.Client.Password, + Header: cfg.Engine.OpenSearch.Client.Header, + CACert: cfg.Engine.OpenSearch.Client.CACert, + RetryOnStatus: cfg.Engine.OpenSearch.Client.RetryOnStatus, + DisableRetry: cfg.Engine.OpenSearch.Client.DisableRetry, + EnableRetryOnTimeout: cfg.Engine.OpenSearch.Client.EnableRetryOnTimeout, + MaxRetries: cfg.Engine.OpenSearch.Client.MaxRetries, + CompressRequestBody: cfg.Engine.OpenSearch.Client.CompressRequestBody, + DiscoverNodesOnStart: cfg.Engine.OpenSearch.Client.DiscoverNodesOnStart, + DiscoverNodesInterval: cfg.Engine.OpenSearch.Client.DiscoverNodesInterval, + EnableMetrics: cfg.Engine.OpenSearch.Client.EnableMetrics, + EnableDebugLogger: cfg.Engine.OpenSearch.Client.EnableDebugLogger, + }, + }) + if err != nil { + return fmt.Errorf("failed to create OpenSearch client: %w", err) + } + + openSearchBackend, err := opensearch.NewBackend(cfg.Engine.OpenSearch.ResourceIndex.Name, client) + if err != nil { + return fmt.Errorf("failed to create OpenSearch backend: %w", err) + } + + eng = openSearchBackend + default: + return fmt.Errorf("unknown search engine: %s", cfg.Engine.Type) + } + + // initialize gateway selector + selector, err := pool.GatewaySelector(cfg.Reva.Address, pool.WithRegistry(registry.GetRegistry()), pool.WithTracerProvider(traceProvider)) + if err != nil { + logger.Fatal().Err(err).Msg("could not get reva gateway selector") + return err + } + + // initialize search content extractor + var extractor content.Extractor + switch cfg.Extractor.Type { + case "basic": + if extractor, err = content.NewBasicExtractor(logger); err != nil { + return err + } + case "tika": + if extractor, err = content.NewTikaExtractor(selector, logger, cfg); err != nil { + return err + } + default: + return fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type) + } + + ss := search.NewService(selector, eng, extractor, mtrcs, logger, cfg) + + // setup the servers gr := runner.NewGroup() - grpcServer, teardown, err := grpc.Server( - grpc.Config(cfg), - grpc.Logger(logger), - grpc.Name(cfg.Service.Name), - grpc.Context(ctx), - grpc.Metrics(mtrcs), - grpc.JWTSecret(cfg.TokenManager.JWTSecret), - grpc.TraceProvider(traceProvider), - ) - defer teardown() - if err != nil { - logger.Info().Err(err).Str("transport", "grpc").Msg("Failed to initialize server") - return err + if !cfg.GRPC.Disabled { + grpcServer, err := grpc.Server( + grpc.Config(cfg), + grpc.Logger(logger), + grpc.Name(cfg.Service.Name), + grpc.Context(ctx), + grpc.Metrics(mtrcs), + grpc.JWTSecret(cfg.TokenManager.JWTSecret), + grpc.TraceProvider(traceProvider), + grpc.GatewaySelector(selector), + grpc.Searcher(ss), + ) + if err != nil { + logger.Error().Err(err).Str("transport", "grpc").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGoMicroGrpcServerRunner(cfg.Service.Name+".grpc", grpcServer)) + } else { + logger.Info().Msg("gRPC server disabled, not starting gRPC service") } - gr.Add(runner.NewGoMicroGrpcServerRunner(cfg.Service.Name+".grpc", grpcServer)) + if !cfg.Events.Disabled { + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus) + bus, err := raw.FromConfig(context.Background(), connName, raw.Config{ + Endpoint: cfg.Events.Endpoint, + Cluster: cfg.Events.Cluster, + EnableTLS: cfg.Events.EnableTLS, + TLSInsecure: cfg.Events.TLSInsecure, + TLSRootCACertificate: cfg.Events.TLSRootCACertificate, + AuthUsername: cfg.Events.AuthUsername, + AuthPassword: cfg.Events.AuthPassword, + MaxAckPending: cfg.Events.MaxAckPending, + AckWait: cfg.Events.AckWait, + }) - debugServer, err := debug.Server( - debug.Logger(logger), - debug.Context(ctx), - debug.Config(cfg), - ) - if err != nil { - logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server") - return err + eventSvc, err := svcEvent.New(ctx, bus, logger, traceProvider, mtrcs, ss, cfg.Events.DebounceDuration, cfg.Events.NumConsumers, cfg.Events.AsyncUploads) + if err != nil { + logger.Error().Err(err).Str("transport", "event").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.New(cfg.Service.Name+".svc", func() error { + return eventSvc.Run() + }, func() { + eventSvc.Close() + })) + } else { + logger.Info().Msg("event listening disabled, not starting event service") } - gr.Add(runner.NewGolangHttpServerRunner(cfg.Service.Name+".debug", debugServer)) + // always start a debug server + { + debugServer, err := debug.Server( + debug.Logger(logger), + debug.Context(ctx), + debug.Config(cfg), + ) + if err != nil { + logger.Error().Err(err).Str("transport", "debug").Msg("Failed to initialize server") + return err + } + + gr.Add(runner.NewGolangHttpServerRunner(cfg.Service.Name+".debug", debugServer)) + } grResults := gr.Run(ctx) diff --git a/services/search/pkg/config/grpc.go b/services/search/pkg/config/grpc.go index bbd26a1f8f..9bdfac4cda 100644 --- a/services/search/pkg/config/grpc.go +++ b/services/search/pkg/config/grpc.go @@ -4,6 +4,7 @@ import "github.com/opencloud-eu/opencloud/pkg/shared" // GRPCConfig defines the available grpc configuration. type GRPCConfig struct { + Disabled bool `yaml:"disabled" env:"SEARCH_GRPC_DISABLED" desc:"Disables the GRPC service. Set this to true if the service should only handle events." introductionVersion:"%%NEXT%%"` Addr string `yaml:"addr" env:"SEARCH_GRPC_ADDR" desc:"The bind address of the GRPC service." introductionVersion:"1.0.0"` Namespace string `yaml:"-"` TLS *shared.GRPCServiceTLS `yaml:"tls"` diff --git a/services/search/pkg/config/search.go b/services/search/pkg/config/search.go index b08dea984d..8ff0399270 100644 --- a/services/search/pkg/config/search.go +++ b/services/search/pkg/config/search.go @@ -4,6 +4,7 @@ import "time" // Events combines the configuration options for the event bus. type Events struct { + Disabled bool `yaml:"disabled" env:"SEARCH_EVENTS_DISABLED" desc:"Disables listening for events. Set this to true if the service should only handle GRPC requests." introductionVersion:"%%NEXT%%"` Endpoint string `yaml:"endpoint" env:"OC_EVENTS_ENDPOINT;SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"1.0.0"` Cluster string `yaml:"cluster" env:"OC_EVENTS_CLUSTER;SEARCH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"1.0.0"` AsyncUploads bool `yaml:"async_uploads" env:"OC_ASYNC_UPLOADS;SEARCH_EVENTS_ASYNC_UPLOADS" desc:"Enable asynchronous file uploads." introductionVersion:"1.0.0"` diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go deleted file mode 100644 index 64a825da63..0000000000 --- a/services/search/pkg/search/events.go +++ /dev/null @@ -1,144 +0,0 @@ -package search - -import ( - "context" - "time" - - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/opencloud-eu/opencloud/pkg/log" - "github.com/opencloud-eu/opencloud/services/search/pkg/config" - "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" - "github.com/opencloud-eu/reva/v2/pkg/events" - "github.com/opencloud-eu/reva/v2/pkg/events/raw" - "github.com/opencloud-eu/reva/v2/pkg/storagespace" -) - -// HandleEvents listens to the needed events, -// it handles the whole resource indexing livecycle. -func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, m *metrics.Metrics, logger log.Logger) error { - evts := []events.Unmarshaller{ - events.ItemTrashed{}, - events.ItemPurged{}, - events.ItemRestored{}, - events.ItemMoved{}, - events.TrashbinPurged{}, - events.ContainerCreated{}, - events.FileTouched{}, - events.FileVersionRestored{}, - events.TagsAdded{}, - events.TagsRemoved{}, - events.SpaceRenamed{}, - } - - if cfg.Events.AsyncUploads { - evts = append(evts, events.UploadReady{}) - } else { - evts = append(evts, events.FileUploaded{}) - } - - ch, err := stream.Consume("search-pull", evts...) - if err != nil { - return err - } - - if m != nil { - monitorMetrics(stream, "search-pull", m, logger) - } - - if cfg.Events.NumConsumers == 0 { - cfg.Events.NumConsumers = 1 - } - - getSpaceID := func(ref *provider.Reference) *provider.StorageSpaceId { - return &provider.StorageSpaceId{ - OpaqueId: storagespace.FormatResourceID( - &provider.ResourceId{ - StorageId: ref.GetResourceId().GetStorageId(), - SpaceId: ref.GetResourceId().GetSpaceId(), - }, - ), - } - } - - indexSpaceDebouncer := NewSpaceDebouncer(time.Duration(cfg.Events.DebounceDuration)*time.Millisecond, 30*time.Second, func(id *provider.StorageSpaceId) { - if err := s.IndexSpace(id); err != nil { - logger.Error().Err(err).Interface("spaceID", id).Msg("error while indexing a space") - } - }, logger) - - for i := 0; i < cfg.Events.NumConsumers; i++ { - go func(s Searcher, ch <-chan raw.Event) { - for event := range ch { - e := event - go func() { - e.InProgress() // let nats know that we are processing this event - logger.Debug().Interface("event", e).Msg("updating index") - - switch ev := e.Event.Event.(type) { - case events.ItemTrashed: - s.TrashItem(ev.ID) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.ItemPurged: - s.PurgeItem(ev.Ref) - e.Ack() - case events.TrashbinPurged: - s.PurgeDeleted(getSpaceID(ev.Ref)) - e.Ack() - case events.ItemMoved: - s.MoveItem(ev.Ref) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.ItemRestored: - s.RestoreItem(ev.Ref) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.ContainerCreated: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.FileTouched: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.FileVersionRestored: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.TagsAdded: - s.UpsertItem(ev.Ref) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.TagsRemoved: - s.UpsertItem(ev.Ref) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.FileUploaded: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) - case events.UploadReady: - indexSpaceDebouncer.Debounce(getSpaceID(ev.FileRef), e.Ack) - case events.SpaceRenamed: - indexSpaceDebouncer.Debounce(ev.ID, e.Ack) - } - }() - } - }( - s, - ch, - ) - } - - return nil -} - -func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { - ctx := context.Background() - consumer, err := stream.JetStream().Consumer(ctx, name) - if err != nil { - logger.Error().Err(err).Msg("failed to get consumer") - } - ticker := time.NewTicker(5 * time.Second) - go func() { - for range ticker.C { - info, err := consumer.Info(ctx) - if err != nil { - logger.Error().Err(err).Msg("failed to get consumer") - continue - } - - m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) - m.EventsUnprocessed.Set(float64(info.NumPending)) - m.EventsRedelivered.Set(float64(info.NumRedelivered)) - logger.Trace().Msg("updated search event metrics") - } - }() -} diff --git a/services/search/pkg/server/grpc/option.go b/services/search/pkg/server/grpc/option.go index bc27a4ecc7..1686dce219 100644 --- a/services/search/pkg/server/grpc/option.go +++ b/services/search/pkg/server/grpc/option.go @@ -3,11 +3,15 @@ package grpc import ( "context" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + "go.opentelemetry.io/otel/trace" + "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/config" "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" + "github.com/opencloud-eu/opencloud/services/search/pkg/search" svc "github.com/opencloud-eu/opencloud/services/search/pkg/service/grpc/v0" - "go.opentelemetry.io/otel/trace" ) // Option defines a single option function. @@ -15,14 +19,16 @@ type Option func(o *Options) // Options defines the available options for this package. type Options struct { - Name string - Logger log.Logger - Context context.Context - Config *config.Config - Metrics *metrics.Metrics - Handler *svc.Service - JWTSecret string - TraceProvider trace.TracerProvider + Name string + Logger log.Logger + Context context.Context + Config *config.Config + Metrics *metrics.Metrics + Handler *svc.Service + JWTSecret string + TraceProvider trace.TracerProvider + GatewaySelector *pool.Selector[gateway.GatewayAPIClient] + Searcher search.Searcher } // newOptions initializes the available default options. @@ -91,3 +97,17 @@ func TraceProvider(val trace.TracerProvider) Option { o.TraceProvider = val } } + +// GatewaySelector provides a function to set the GatewaySelector option. +func GatewaySelector(val *pool.Selector[gateway.GatewayAPIClient]) Option { + return func(o *Options) { + o.GatewaySelector = val + } +} + +// Searcher provides a function to set the Searcher option. +func Searcher(val search.Searcher) Option { + return func(o *Options) { + o.Searcher = val + } +} diff --git a/services/search/pkg/server/grpc/server.go b/services/search/pkg/server/grpc/server.go index d3a012ec89..b6e8f1868e 100644 --- a/services/search/pkg/server/grpc/server.go +++ b/services/search/pkg/server/grpc/server.go @@ -8,7 +8,7 @@ import ( ) // Server initializes a new go-micro service ready to run -func Server(opts ...Option) (grpc.Service, func(), error) { +func Server(opts ...Option) (grpc.Service, error) { options := newOptions(opts...) service, err := grpc.NewServiceWithClient( @@ -28,21 +28,23 @@ func Server(opts ...Option) (grpc.Service, func(), error) { ) if err != nil { options.Logger.Fatal().Err(err).Msg("Error creating search service") - return grpc.Service{}, func() {}, err + return grpc.Service{}, err } - handle, teardown, err := svc.NewHandler( + handle, err := svc.NewHandler( svc.Config(options.Config), svc.Logger(options.Logger), svc.JWTSecret(options.JWTSecret), svc.TracerProvider(options.TraceProvider), svc.Metrics(options.Metrics), + svc.GatewaySelector(options.GatewaySelector), + svc.Searcher(options.Searcher), ) if err != nil { options.Logger.Error(). Err(err). Msg("Error initializing search service") - return grpc.Service{}, teardown, err + return grpc.Service{}, err } if err := searchsvc.RegisterSearchProviderHandler( @@ -52,8 +54,8 @@ func Server(opts ...Option) (grpc.Service, func(), error) { options.Logger.Error(). Err(err). Msg("Error registering search provider handler") - return grpc.Service{}, teardown, err + return grpc.Service{}, err } - return service, teardown, nil + return service, nil } diff --git a/services/search/pkg/search/debouncer.go b/services/search/pkg/service/event/debouncer.go similarity index 99% rename from services/search/pkg/search/debouncer.go rename to services/search/pkg/service/event/debouncer.go index 2849c66a77..1444b9dbd7 100644 --- a/services/search/pkg/search/debouncer.go +++ b/services/search/pkg/service/event/debouncer.go @@ -1,10 +1,11 @@ -package search +package event import ( "sync" "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/opencloud-eu/opencloud/pkg/log" ) diff --git a/services/search/pkg/search/debouncer_test.go b/services/search/pkg/service/event/debouncer_test.go similarity index 89% rename from services/search/pkg/search/debouncer_test.go rename to services/search/pkg/service/event/debouncer_test.go index 08c26f3936..6e2c419155 100644 --- a/services/search/pkg/search/debouncer_test.go +++ b/services/search/pkg/service/event/debouncer_test.go @@ -1,4 +1,4 @@ -package search_test +package event_test import ( "sync/atomic" @@ -7,13 +7,14 @@ import ( sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/opencloud-eu/opencloud/pkg/log" - "github.com/opencloud-eu/opencloud/services/search/pkg/search" + "github.com/opencloud-eu/opencloud/services/search/pkg/service/event" ) var _ = Describe("SpaceDebouncer", func() { var ( - debouncer *search.SpaceDebouncer + debouncer *event.SpaceDebouncer callCount atomic.Int32 @@ -24,7 +25,7 @@ var _ = Describe("SpaceDebouncer", func() { BeforeEach(func() { callCount = atomic.Int32{} - debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 10*time.Second, func(id *sprovider.StorageSpaceId) { + debouncer = event.NewSpaceDebouncer(50*time.Millisecond, 10*time.Second, func(id *sprovider.StorageSpaceId) { if id.OpaqueId == "spaceid" { callCount.Add(1) } @@ -55,7 +56,7 @@ var _ = Describe("SpaceDebouncer", func() { }) It("doesn't trigger twice simultaneously", func() { - debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 5*time.Second, func(id *sprovider.StorageSpaceId) { + debouncer = event.NewSpaceDebouncer(50*time.Millisecond, 5*time.Second, func(id *sprovider.StorageSpaceId) { if id.OpaqueId == "spaceid" { callCount.Add(1) } @@ -74,7 +75,7 @@ var _ = Describe("SpaceDebouncer", func() { }) It("fires at the timeout even when continuously debounced", func() { - debouncer = search.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) { + debouncer = event.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) { if id.OpaqueId == "spaceid" { callCount.Add(1) } @@ -116,7 +117,7 @@ var _ = Describe("SpaceDebouncer", func() { }) It("doesn't run the timeout function if the work function has been called", func() { - debouncer = search.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) { + debouncer = event.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) { if id.OpaqueId == "spaceid" { callCount.Add(1) } diff --git a/services/search/pkg/service/event/search_suite_test.go b/services/search/pkg/service/event/search_suite_test.go new file mode 100644 index 0000000000..c3336dce5f --- /dev/null +++ b/services/search/pkg/service/event/search_suite_test.go @@ -0,0 +1,25 @@ +package event_test + +import ( + "testing" + + "github.com/opencloud-eu/opencloud/pkg/registry" + mRegistry "go-micro.dev/v4/registry" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func init() { + r := registry.GetRegistry(registry.Inmemory()) + service := registry.BuildGRPCService("eu.opencloud.api.gateway", "", "", "") + service.Nodes = []*mRegistry.Node{{ + Address: "any", + }} + + _ = r.Register(service) +} +func TestEvent(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Event Suite") +} diff --git a/services/search/pkg/service/event/service.go b/services/search/pkg/service/event/service.go new file mode 100644 index 0000000000..237f11e9a3 --- /dev/null +++ b/services/search/pkg/service/event/service.go @@ -0,0 +1,220 @@ +package event + +import ( + "context" + "sync" + "sync/atomic" + "time" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" + "github.com/opencloud-eu/opencloud/services/search/pkg/search" + "github.com/opencloud-eu/reva/v2/pkg/events" + "github.com/opencloud-eu/reva/v2/pkg/events/raw" + "github.com/opencloud-eu/reva/v2/pkg/storagespace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +var tracer trace.Tracer + +func init() { + tracer = otel.Tracer("github.com/opencloud-eu/opencloud/services/search/pkg/service/event") +} + +// Service defines the service handlers. +type Service struct { + ctx context.Context + log log.Logger + tp trace.TracerProvider + m *metrics.Metrics + index search.Searcher + events []events.Unmarshaller + stream raw.Stream + indexSpaceDebouncer *SpaceDebouncer + numConsumers int + stopCh chan struct{} + stopped *atomic.Bool +} + +// New returns a service implementation for Service. +func New(ctx context.Context, stream raw.Stream, logger log.Logger, tp trace.TracerProvider, m *metrics.Metrics, index search.Searcher, debounceDuration int, numConsumers int, asyncUploads bool) (Service, error) { + svc := Service{ + ctx: ctx, + log: logger, + tp: tp, + m: m, + index: index, + stream: stream, + stopCh: make(chan struct{}, 1), + stopped: new(atomic.Bool), + events: []events.Unmarshaller{ + events.ItemTrashed{}, + events.ItemPurged{}, + events.ItemRestored{}, + events.ItemMoved{}, + events.TrashbinPurged{}, + events.ContainerCreated{}, + events.FileTouched{}, + events.FileVersionRestored{}, + events.TagsAdded{}, + events.TagsRemoved{}, + events.SpaceRenamed{}, + }, + numConsumers: numConsumers, + } + + if asyncUploads { + svc.events = append(svc.events, events.UploadReady{}) + } else { + svc.events = append(svc.events, events.FileUploaded{}) + } + + svc.indexSpaceDebouncer = NewSpaceDebouncer(time.Duration(debounceDuration)*time.Millisecond, 30*time.Second, func(id *provider.StorageSpaceId) { + if err := svc.index.IndexSpace(id); err != nil { + svc.log.Error().Err(err).Interface("spaceID", id).Msg("error while indexing a space") + } + }, svc.log) + + return svc, nil +} + +// Run to fulfil Runner interface +func (s Service) Run() error { + ch, err := s.stream.Consume("search-pull", s.events...) + if err != nil { + return err + } + + if s.m != nil { + monitorMetrics(s.stream, "search-pull", s.m, s.log) + } + + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + + // start workers + for i := 0; i < s.numConsumers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case e, ok := <-ch: + if !ok { + return + } + if err := s.processEvent(e); err != nil { + s.log.Error().Err(err). + Int("worker", workerID). + Interface("event", e). + Msg("failed to process event") + } + } + } + }(i) + } + + // wait for stop signal + <-s.stopCh + cancel() // signal workers to stop + wg.Wait() + + return nil +} + +// Close will make the service to stop processing, so the `Run` +// method can finish. +// TODO: Underlying services can't be stopped. This means that some goroutines +// will get stuck trying to push events through a channel nobody is reading +// from, so resources won't be freed and there will be memory leaks. For now, +// if the service is stopped, you should close the app soon after. +func (s Service) Close() { + if s.stopped.CompareAndSwap(false, true) { + close(s.stopCh) + } +} + +func getSpaceID(ref *provider.Reference) *provider.StorageSpaceId { + return &provider.StorageSpaceId{ + OpaqueId: storagespace.FormatResourceID( + &provider.ResourceId{ + StorageId: ref.GetResourceId().GetStorageId(), + SpaceId: ref.GetResourceId().GetSpaceId(), + }, + ), + } +} + +func (s Service) processEvent(e raw.Event) error { + ctx := e.GetTraceContext(s.ctx) + ctx, span := tracer.Start(ctx, "processEvent") + defer span.End() + + e.InProgress() // let nats know that we are processing this event + s.log.Debug().Interface("event", e).Msg("updating index") + + switch ev := e.Event.Event.(type) { + case events.ItemTrashed: + s.index.TrashItem(ev.ID) + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.ItemPurged: + s.index.PurgeItem(ev.Ref) + e.Ack() + case events.TrashbinPurged: + s.index.PurgeDeleted(getSpaceID(ev.Ref)) + e.Ack() + case events.ItemMoved: + s.index.MoveItem(ev.Ref) + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.ItemRestored: + s.index.RestoreItem(ev.Ref) + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.ContainerCreated: + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.FileTouched: + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.FileVersionRestored: + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.TagsAdded: + s.index.UpsertItem(ev.Ref) + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.TagsRemoved: + s.index.UpsertItem(ev.Ref) + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.FileUploaded: + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) + case events.UploadReady: + s.indexSpaceDebouncer.Debounce(getSpaceID(ev.FileRef), e.Ack) + case events.SpaceRenamed: + s.indexSpaceDebouncer.Debounce(ev.ID, e.Ack) + } + return nil +} + +func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { + ctx := context.Background() + consumer, err := stream.JetStream().Consumer(ctx, name) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + } + ticker := time.NewTicker(5 * time.Second) + go func() { + for range ticker.C { + info, err := consumer.Info(ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + continue + } + + m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) + m.EventsUnprocessed.Set(float64(info.NumPending)) + m.EventsRedelivered.Set(float64(info.NumRedelivered)) + logger.Trace().Msg("updated search event metrics") + } + }() +} diff --git a/services/search/pkg/search/events_test.go b/services/search/pkg/service/event/service_test.go similarity index 83% rename from services/search/pkg/search/events_test.go rename to services/search/pkg/service/event/service_test.go index 3eeb499245..cb707af6e1 100644 --- a/services/search/pkg/search/events_test.go +++ b/services/search/pkg/service/event/service_test.go @@ -1,22 +1,22 @@ -package search_test +package event_test import ( + "context" "sync/atomic" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/opencloud-eu/opencloud/pkg/log" - "github.com/opencloud-eu/opencloud/services/search/pkg/config" - "github.com/opencloud-eu/opencloud/services/search/pkg/search" searchMocks "github.com/opencloud-eu/opencloud/services/search/pkg/search/mocks" + "github.com/opencloud-eu/opencloud/services/search/pkg/service/event" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/raw" rawMocks "github.com/opencloud-eu/reva/v2/pkg/events/raw/mocks" "github.com/stretchr/testify/mock" ) -var _ = DescribeTable("events", +var _ = DescribeTable("event", func(mcks []string, e any, asyncUploads bool) { var ( s = &searchMocks.Searcher{} @@ -27,11 +27,14 @@ var _ = DescribeTable("events", ch := make(chan raw.Event, 1) stream.EXPECT().Consume(mock.Anything, mock.Anything).Return((<-chan raw.Event)(ch), nil) - search.HandleEvents(s, stream, &config.Config{ - Events: config.Events{ - AsyncUploads: asyncUploads, - }, - }, nil, log.NewLogger()) + event, err := event.New(context.Background(), stream, log.NewLogger(), nil, nil, s, 50, 1, asyncUploads) + Expect(err).NotTo(HaveOccurred()) + + go func() { + err := event.Run() + Expect(err).NotTo(HaveOccurred()) + }() + defer event.Close() for _, mck := range mcks { s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { diff --git a/services/search/pkg/service/grpc/v0/option.go b/services/search/pkg/service/grpc/v0/option.go index b7ab82250b..e284651895 100644 --- a/services/search/pkg/service/grpc/v0/option.go +++ b/services/search/pkg/service/grpc/v0/option.go @@ -1,10 +1,14 @@ package service import ( + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + "go.opentelemetry.io/otel/trace" + "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/config" "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" - "go.opentelemetry.io/otel/trace" + "github.com/opencloud-eu/opencloud/services/search/pkg/search" ) // Option defines a single option function. @@ -12,11 +16,13 @@ type Option func(o *Options) // Options defines the available options for this package. type Options struct { - Logger log.Logger - Config *config.Config - JWTSecret string - TracerProvider trace.TracerProvider - Metrics *metrics.Metrics + Logger log.Logger + Config *config.Config + JWTSecret string + TracerProvider trace.TracerProvider + Metrics *metrics.Metrics + GatewaySelector *pool.Selector[gateway.GatewayAPIClient] + Searcher search.Searcher } func newOptions(opts ...Option) Options { @@ -65,3 +71,17 @@ func Metrics(val *metrics.Metrics) Option { } } } + +// GatewaySelector provides a function to set the GatewaySelector option. +func GatewaySelector(val *pool.Selector[gateway.GatewayAPIClient]) Option { + return func(o *Options) { + o.GatewaySelector = val + } +} + +// Searcher provides a function to set the Searcher option. +func Searcher(val search.Searcher) Option { + return func(o *Options) { + o.Searcher = val + } +} diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 32a76843e8..9817626756 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -13,132 +13,35 @@ import ( "github.com/jellydator/ttlcache/v2" revactx "github.com/opencloud-eu/reva/v2/pkg/ctx" "github.com/opencloud-eu/reva/v2/pkg/errtypes" - "github.com/opencloud-eu/reva/v2/pkg/events/raw" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" "github.com/opencloud-eu/reva/v2/pkg/token" "github.com/opencloud-eu/reva/v2/pkg/token/manager/jwt" "github.com/opencloud-eu/reva/v2/pkg/utils" - opensearchgo "github.com/opensearch-project/opensearch-go/v4" - opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi" merrors "go-micro.dev/v4/errors" "go-micro.dev/v4/metadata" grpcmetadata "google.golang.org/grpc/metadata" - "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" - "github.com/opencloud-eu/opencloud/pkg/registry" v0 "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0" searchsvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0" - "github.com/opencloud-eu/opencloud/services/search/pkg/bleve" "github.com/opencloud-eu/opencloud/services/search/pkg/config" - "github.com/opencloud-eu/opencloud/services/search/pkg/content" - "github.com/opencloud-eu/opencloud/services/search/pkg/opensearch" - bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve" "github.com/opencloud-eu/opencloud/services/search/pkg/search" ) // NewHandler returns a service implementation for Service. -func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) { - teardown := func() {} +func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, error) { options := newOptions(opts...) - logger := options.Logger cfg := options.Config - - // initialize search engine - var eng search.Engine - switch cfg.Engine.Type { - case "bleve": - idx, err := bleve.NewIndex(cfg.Engine.Bleve.Datapath) - if err != nil { - return nil, teardown, err - } - - teardown = func() { - _ = idx.Close() - } - - eng = bleve.NewBackend(idx, bleveQuery.DefaultCreator, logger) - case "open-search": - client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{ - Client: opensearchgo.Config{ - Addresses: cfg.Engine.OpenSearch.Client.Addresses, - Username: cfg.Engine.OpenSearch.Client.Username, - Password: cfg.Engine.OpenSearch.Client.Password, - Header: cfg.Engine.OpenSearch.Client.Header, - CACert: cfg.Engine.OpenSearch.Client.CACert, - RetryOnStatus: cfg.Engine.OpenSearch.Client.RetryOnStatus, - DisableRetry: cfg.Engine.OpenSearch.Client.DisableRetry, - EnableRetryOnTimeout: cfg.Engine.OpenSearch.Client.EnableRetryOnTimeout, - MaxRetries: cfg.Engine.OpenSearch.Client.MaxRetries, - CompressRequestBody: cfg.Engine.OpenSearch.Client.CompressRequestBody, - DiscoverNodesOnStart: cfg.Engine.OpenSearch.Client.DiscoverNodesOnStart, - DiscoverNodesInterval: cfg.Engine.OpenSearch.Client.DiscoverNodesInterval, - EnableMetrics: cfg.Engine.OpenSearch.Client.EnableMetrics, - EnableDebugLogger: cfg.Engine.OpenSearch.Client.EnableDebugLogger, - }, - }) - if err != nil { - return nil, teardown, fmt.Errorf("failed to create OpenSearch client: %w", err) - } - - openSearchBackend, err := opensearch.NewBackend(cfg.Engine.OpenSearch.ResourceIndex.Name, client) - if err != nil { - return nil, teardown, fmt.Errorf("failed to create OpenSearch backend: %w", err) - } - - eng = openSearchBackend - default: - return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type) + if options.GatewaySelector == nil { + return nil, errors.New("no GatewaySelector provided") } - - // initialize gateway - selector, err := pool.GatewaySelector(cfg.Reva.Address, pool.WithRegistry(registry.GetRegistry()), pool.WithTracerProvider(options.TracerProvider)) - if err != nil { - logger.Fatal().Err(err).Msg("could not get reva gateway selector") - return nil, teardown, err - } - // initialize search content extractor - var extractor content.Extractor - switch cfg.Extractor.Type { - case "basic": - if extractor, err = content.NewBasicExtractor(logger); err != nil { - return nil, teardown, err - } - case "tika": - if extractor, err = content.NewTikaExtractor(selector, logger, cfg); err != nil { - return nil, teardown, err - } - default: - return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type) - } - - ss := search.NewService(selector, eng, extractor, options.Metrics, logger, cfg) - - // setup event handling - - connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus) - stream, err := raw.FromConfig(context.Background(), connName, raw.Config{ - Endpoint: cfg.Events.Endpoint, - Cluster: cfg.Events.Cluster, - EnableTLS: cfg.Events.EnableTLS, - TLSInsecure: cfg.Events.TLSInsecure, - TLSRootCACertificate: cfg.Events.TLSRootCACertificate, - AuthUsername: cfg.Events.AuthUsername, - AuthPassword: cfg.Events.AuthPassword, - MaxAckPending: cfg.Events.MaxAckPending, - AckWait: cfg.Events.AckWait, - }) - if err != nil { - return nil, teardown, err - } - - if err := search.HandleEvents(ss, stream, cfg, options.Metrics, logger); err != nil { - return nil, teardown, err + if options.Searcher == nil { + return nil, errors.New("no Searcher provided") } cache := ttlcache.NewCache() if err := cache.SetTTL(time.Second); err != nil { - return nil, teardown, err + return nil, err } tokenManager, err := jwt.New(map[string]interface{}{ @@ -146,24 +49,24 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) "expires": int64(24 * 60 * 60), }) if err != nil { - return nil, teardown, err + return nil, err } return &Service{ id: cfg.GRPC.Namespace + "." + cfg.Service.Name, - log: logger, - searcher: ss, + log: &options.Logger, + searcher: options.Searcher, cache: cache, tokenManager: tokenManager, - gws: selector, + gws: options.GatewaySelector, cfg: cfg, - }, teardown, nil + }, nil } // Service implements the searchServiceHandler interface type Service struct { id string - log log.Logger + log *log.Logger searcher search.Searcher cache *ttlcache.Cache tokenManager token.Manager