allow disabling search grpc/event servers

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2025-09-11 16:23:20 +02:00
parent 25ae8a3fc2
commit 99dee5ae77
13 changed files with 492 additions and 315 deletions

View File

@@ -5,18 +5,31 @@ import (
"fmt"
"os/signal"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
opensearchgo "github.com/opensearch-project/opensearch-go/v4"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/urfave/cli/v2"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
"github.com/opencloud-eu/opencloud/pkg/runner"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/search/pkg/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/search/pkg/content"
"github.com/opencloud-eu/opencloud/services/search/pkg/logging"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
"github.com/opencloud-eu/opencloud/services/search/pkg/server/debug"
"github.com/opencloud-eu/opencloud/services/search/pkg/server/grpc"
"github.com/urfave/cli/v2"
svcEvent "github.com/opencloud-eu/opencloud/services/search/pkg/service/event"
)
// Server is the entrypoint for the server command.
@@ -52,36 +65,147 @@ func Server(cfg *config.Config) *cli.Command {
mtrcs := metrics.New()
mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1)
// initialize search engine
var eng search.Engine
switch cfg.Engine.Type {
case "bleve":
idx, err := bleve.NewIndex(cfg.Engine.Bleve.Datapath)
if err != nil {
return err
}
defer func() {
if err = idx.Close(); err != nil {
logger.Error().Err(err).Msg("could not close bleve index")
}
}()
eng = bleve.NewBackend(idx, bleveQuery.DefaultCreator, logger)
case "open-search":
client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{
Client: opensearchgo.Config{
Addresses: cfg.Engine.OpenSearch.Client.Addresses,
Username: cfg.Engine.OpenSearch.Client.Username,
Password: cfg.Engine.OpenSearch.Client.Password,
Header: cfg.Engine.OpenSearch.Client.Header,
CACert: cfg.Engine.OpenSearch.Client.CACert,
RetryOnStatus: cfg.Engine.OpenSearch.Client.RetryOnStatus,
DisableRetry: cfg.Engine.OpenSearch.Client.DisableRetry,
EnableRetryOnTimeout: cfg.Engine.OpenSearch.Client.EnableRetryOnTimeout,
MaxRetries: cfg.Engine.OpenSearch.Client.MaxRetries,
CompressRequestBody: cfg.Engine.OpenSearch.Client.CompressRequestBody,
DiscoverNodesOnStart: cfg.Engine.OpenSearch.Client.DiscoverNodesOnStart,
DiscoverNodesInterval: cfg.Engine.OpenSearch.Client.DiscoverNodesInterval,
EnableMetrics: cfg.Engine.OpenSearch.Client.EnableMetrics,
EnableDebugLogger: cfg.Engine.OpenSearch.Client.EnableDebugLogger,
},
})
if err != nil {
return fmt.Errorf("failed to create OpenSearch client: %w", err)
}
openSearchBackend, err := opensearch.NewBackend(cfg.Engine.OpenSearch.ResourceIndex.Name, client)
if err != nil {
return fmt.Errorf("failed to create OpenSearch backend: %w", err)
}
eng = openSearchBackend
default:
return fmt.Errorf("unknown search engine: %s", cfg.Engine.Type)
}
// initialize gateway selector
selector, err := pool.GatewaySelector(cfg.Reva.Address, pool.WithRegistry(registry.GetRegistry()), pool.WithTracerProvider(traceProvider))
if err != nil {
logger.Fatal().Err(err).Msg("could not get reva gateway selector")
return err
}
// initialize search content extractor
var extractor content.Extractor
switch cfg.Extractor.Type {
case "basic":
if extractor, err = content.NewBasicExtractor(logger); err != nil {
return err
}
case "tika":
if extractor, err = content.NewTikaExtractor(selector, logger, cfg); err != nil {
return err
}
default:
return fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}
ss := search.NewService(selector, eng, extractor, mtrcs, logger, cfg)
// setup the servers
gr := runner.NewGroup()
grpcServer, teardown, err := grpc.Server(
grpc.Config(cfg),
grpc.Logger(logger),
grpc.Name(cfg.Service.Name),
grpc.Context(ctx),
grpc.Metrics(mtrcs),
grpc.JWTSecret(cfg.TokenManager.JWTSecret),
grpc.TraceProvider(traceProvider),
)
defer teardown()
if err != nil {
logger.Info().Err(err).Str("transport", "grpc").Msg("Failed to initialize server")
return err
if !cfg.GRPC.Disabled {
grpcServer, err := grpc.Server(
grpc.Config(cfg),
grpc.Logger(logger),
grpc.Name(cfg.Service.Name),
grpc.Context(ctx),
grpc.Metrics(mtrcs),
grpc.JWTSecret(cfg.TokenManager.JWTSecret),
grpc.TraceProvider(traceProvider),
grpc.GatewaySelector(selector),
grpc.Searcher(ss),
)
if err != nil {
logger.Error().Err(err).Str("transport", "grpc").Msg("Failed to initialize server")
return err
}
gr.Add(runner.NewGoMicroGrpcServerRunner(cfg.Service.Name+".grpc", grpcServer))
} else {
logger.Info().Msg("gRPC server disabled, not starting gRPC service")
}
gr.Add(runner.NewGoMicroGrpcServerRunner(cfg.Service.Name+".grpc", grpcServer))
if !cfg.Events.Disabled {
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
bus, err := raw.FromConfig(context.Background(), connName, raw.Config{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
AuthUsername: cfg.Events.AuthUsername,
AuthPassword: cfg.Events.AuthPassword,
MaxAckPending: cfg.Events.MaxAckPending,
AckWait: cfg.Events.AckWait,
})
debugServer, err := debug.Server(
debug.Logger(logger),
debug.Context(ctx),
debug.Config(cfg),
)
if err != nil {
logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server")
return err
eventSvc, err := svcEvent.New(ctx, bus, logger, traceProvider, mtrcs, ss, cfg.Events.DebounceDuration, cfg.Events.NumConsumers, cfg.Events.AsyncUploads)
if err != nil {
logger.Error().Err(err).Str("transport", "event").Msg("Failed to initialize server")
return err
}
gr.Add(runner.New(cfg.Service.Name+".svc", func() error {
return eventSvc.Run()
}, func() {
eventSvc.Close()
}))
} else {
logger.Info().Msg("event listening disabled, not starting event service")
}
gr.Add(runner.NewGolangHttpServerRunner(cfg.Service.Name+".debug", debugServer))
// always start a debug server
{
debugServer, err := debug.Server(
debug.Logger(logger),
debug.Context(ctx),
debug.Config(cfg),
)
if err != nil {
logger.Error().Err(err).Str("transport", "debug").Msg("Failed to initialize server")
return err
}
gr.Add(runner.NewGolangHttpServerRunner(cfg.Service.Name+".debug", debugServer))
}
grResults := gr.Run(ctx)

View File

@@ -4,6 +4,7 @@ import "github.com/opencloud-eu/opencloud/pkg/shared"
// GRPCConfig defines the available grpc configuration.
type GRPCConfig struct {
Disabled bool `yaml:"disabled" env:"SEARCH_GRPC_DISABLED" desc:"Disables the GRPC service. Set this to true if the service should only handle events." introductionVersion:"%%NEXT%%"`
Addr string `yaml:"addr" env:"SEARCH_GRPC_ADDR" desc:"The bind address of the GRPC service." introductionVersion:"1.0.0"`
Namespace string `yaml:"-"`
TLS *shared.GRPCServiceTLS `yaml:"tls"`

View File

@@ -4,6 +4,7 @@ import "time"
// Events combines the configuration options for the event bus.
type Events struct {
Disabled bool `yaml:"disabled" env:"SEARCH_EVENTS_DISABLED" desc:"Disables listening for events. Set this to true if the service should only handle GRPC requests." introductionVersion:"%%NEXT%%"`
Endpoint string `yaml:"endpoint" env:"OC_EVENTS_ENDPOINT;SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"1.0.0"`
Cluster string `yaml:"cluster" env:"OC_EVENTS_CLUSTER;SEARCH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"1.0.0"`
AsyncUploads bool `yaml:"async_uploads" env:"OC_ASYNC_UPLOADS;SEARCH_EVENTS_ASYNC_UPLOADS" desc:"Enable asynchronous file uploads." introductionVersion:"1.0.0"`

View File

@@ -1,144 +0,0 @@
package search
import (
"context"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
)
// HandleEvents listens to the needed events,
// it handles the whole resource indexing lifecycle.
//
// It subscribes to the "search-pull" consumer on stream, optionally starts
// the consumer metrics monitor (when m is non-nil) and then spawns
// cfg.Events.NumConsumers goroutines that read from the event channel. It does
// not block: after starting the goroutines it returns nil. The only error
// returned is the one from stream.Consume.
//
// Note that a NumConsumers of 0 is replaced by 1 by writing into cfg, i.e. the
// caller's configuration is modified.
func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, m *metrics.Metrics, logger log.Logger) error {
// event types that are always consumed
evts := []events.Unmarshaller{
events.ItemTrashed{},
events.ItemPurged{},
events.ItemRestored{},
events.ItemMoved{},
events.TrashbinPurged{},
events.ContainerCreated{},
events.FileTouched{},
events.FileVersionRestored{},
events.TagsAdded{},
events.TagsRemoved{},
events.SpaceRenamed{},
}
// exactly one of the two upload events is consumed, depending on the
// async uploads setting
if cfg.Events.AsyncUploads {
evts = append(evts, events.UploadReady{})
} else {
evts = append(evts, events.FileUploaded{})
}
ch, err := stream.Consume("search-pull", evts...)
if err != nil {
return err
}
// metrics are optional, a nil m disables the consumer monitoring
if m != nil {
monitorMetrics(stream, "search-pull", m, logger)
}
// default to a single consumer; this mutates the passed in config
if cfg.Events.NumConsumers == 0 {
cfg.Events.NumConsumers = 1
}
// getSpaceID derives the storage space id from the storage id and space id
// of the resource id of a reference
getSpaceID := func(ref *provider.Reference) *provider.StorageSpaceId {
return &provider.StorageSpaceId{
OpaqueId: storagespace.FormatResourceID(
&provider.ResourceId{
StorageId: ref.GetResourceId().GetStorageId(),
SpaceId: ref.GetResourceId().GetSpaceId(),
},
),
}
}
// the debounce duration is configured in milliseconds; the callback
// reindexes the space and only logs a failure
indexSpaceDebouncer := NewSpaceDebouncer(time.Duration(cfg.Events.DebounceDuration)*time.Millisecond, 30*time.Second, func(id *provider.StorageSpaceId) {
if err := s.IndexSpace(id); err != nil {
logger.Error().Err(err).Interface("spaceID", id).Msg("error while indexing a space")
}
}, logger)
for i := 0; i < cfg.Events.NumConsumers; i++ {
go func(s Searcher, ch <-chan raw.Event) {
// the consumer runs until ch is closed
for event := range ch {
e := event
// every event is handled in its own goroutine, so the number of
// events processed concurrently is not limited by NumConsumers
go func() {
e.InProgress() // let nats know that we are processing this event
logger.Debug().Interface("event", e).Msg("updating index")
// events either ack directly or hand e.Ack to the debouncer
// together with the space that should be reindexed
switch ev := e.Event.Event.(type) {
case events.ItemTrashed:
s.TrashItem(ev.ID)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.ItemPurged:
s.PurgeItem(ev.Ref)
e.Ack()
case events.TrashbinPurged:
s.PurgeDeleted(getSpaceID(ev.Ref))
e.Ack()
case events.ItemMoved:
s.MoveItem(ev.Ref)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.ItemRestored:
s.RestoreItem(ev.Ref)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.ContainerCreated:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.FileTouched:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.FileVersionRestored:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.TagsAdded:
s.UpsertItem(ev.Ref)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.TagsRemoved:
s.UpsertItem(ev.Ref)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.FileUploaded:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.UploadReady:
indexSpaceDebouncer.Debounce(getSpaceID(ev.FileRef), e.Ack)
case events.SpaceRenamed:
indexSpaceDebouncer.Debounce(ev.ID, e.Ack)
}
}()
}
}(
s,
ch,
)
}
return nil
}
// monitorMetrics looks up the named JetStream consumer on stream and starts a
// goroutine that polls its info every 5 seconds, publishing the number of
// outstanding acks, pending and redelivered messages to m.
//
// If the consumer cannot be looked up the error is logged and no monitoring
// is started. A failing info request is logged and retried on the next tick.
// The polling goroutine is never stopped; it lives as long as the process.
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
	ctx := context.Background()
	consumer, err := stream.JetStream().Consumer(ctx, name)
	if err != nil {
		logger.Error().Err(err).Msg("failed to get consumer")
		// without a consumer there is nothing to poll; continuing would call
		// Info on a nil consumer and panic inside the goroutine
		return
	}

	ticker := time.NewTicker(5 * time.Second)
	go func() {
		for range ticker.C {
			info, err := consumer.Info(ctx)
			if err != nil {
				logger.Error().Err(err).Msg("failed to get consumer")
				continue
			}

			m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
			m.EventsUnprocessed.Set(float64(info.NumPending))
			m.EventsRedelivered.Set(float64(info.NumRedelivered))
			logger.Trace().Msg("updated search event metrics")
		}
	}()
}

View File

@@ -3,11 +3,15 @@ package grpc
import (
"context"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"go.opentelemetry.io/otel/trace"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
svc "github.com/opencloud-eu/opencloud/services/search/pkg/service/grpc/v0"
"go.opentelemetry.io/otel/trace"
)
// Option defines a single option function.
@@ -15,14 +19,16 @@ type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
Name string
Logger log.Logger
Context context.Context
Config *config.Config
Metrics *metrics.Metrics
Handler *svc.Service
JWTSecret string
TraceProvider trace.TracerProvider
Name string
Logger log.Logger
Context context.Context
Config *config.Config
Metrics *metrics.Metrics
Handler *svc.Service
JWTSecret string
TraceProvider trace.TracerProvider
GatewaySelector *pool.Selector[gateway.GatewayAPIClient]
Searcher search.Searcher
}
// newOptions initializes the available default options.
@@ -91,3 +97,17 @@ func TraceProvider(val trace.TracerProvider) Option {
o.TraceProvider = val
}
}
// GatewaySelector returns an Option that stores val as the gateway selector
// in the Options it is applied to.
func GatewaySelector(val *pool.Selector[gateway.GatewayAPIClient]) Option {
	setSelector := func(opts *Options) {
		opts.GatewaySelector = val
	}
	return setSelector
}
// Searcher returns an Option that stores val as the searcher in the Options
// it is applied to.
func Searcher(val search.Searcher) Option {
	setSearcher := func(opts *Options) {
		opts.Searcher = val
	}
	return setSearcher
}

View File

@@ -8,7 +8,7 @@ import (
)
// Server initializes a new go-micro service ready to run
func Server(opts ...Option) (grpc.Service, func(), error) {
func Server(opts ...Option) (grpc.Service, error) {
options := newOptions(opts...)
service, err := grpc.NewServiceWithClient(
@@ -28,21 +28,23 @@ func Server(opts ...Option) (grpc.Service, func(), error) {
)
if err != nil {
options.Logger.Fatal().Err(err).Msg("Error creating search service")
return grpc.Service{}, func() {}, err
return grpc.Service{}, err
}
handle, teardown, err := svc.NewHandler(
handle, err := svc.NewHandler(
svc.Config(options.Config),
svc.Logger(options.Logger),
svc.JWTSecret(options.JWTSecret),
svc.TracerProvider(options.TraceProvider),
svc.Metrics(options.Metrics),
svc.GatewaySelector(options.GatewaySelector),
svc.Searcher(options.Searcher),
)
if err != nil {
options.Logger.Error().
Err(err).
Msg("Error initializing search service")
return grpc.Service{}, teardown, err
return grpc.Service{}, err
}
if err := searchsvc.RegisterSearchProviderHandler(
@@ -52,8 +54,8 @@ func Server(opts ...Option) (grpc.Service, func(), error) {
options.Logger.Error().
Err(err).
Msg("Error registering search provider handler")
return grpc.Service{}, teardown, err
return grpc.Service{}, err
}
return service, teardown, nil
return service, nil
}

View File

@@ -1,10 +1,11 @@
package search
package event
import (
"sync"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/log"
)

View File

@@ -1,4 +1,4 @@
package search_test
package event_test
import (
"sync/atomic"
@@ -7,13 +7,14 @@ import (
sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
"github.com/opencloud-eu/opencloud/services/search/pkg/service/event"
)
var _ = Describe("SpaceDebouncer", func() {
var (
debouncer *search.SpaceDebouncer
debouncer *event.SpaceDebouncer
callCount atomic.Int32
@@ -24,7 +25,7 @@ var _ = Describe("SpaceDebouncer", func() {
BeforeEach(func() {
callCount = atomic.Int32{}
debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 10*time.Second, func(id *sprovider.StorageSpaceId) {
debouncer = event.NewSpaceDebouncer(50*time.Millisecond, 10*time.Second, func(id *sprovider.StorageSpaceId) {
if id.OpaqueId == "spaceid" {
callCount.Add(1)
}
@@ -55,7 +56,7 @@ var _ = Describe("SpaceDebouncer", func() {
})
It("doesn't trigger twice simultaneously", func() {
debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 5*time.Second, func(id *sprovider.StorageSpaceId) {
debouncer = event.NewSpaceDebouncer(50*time.Millisecond, 5*time.Second, func(id *sprovider.StorageSpaceId) {
if id.OpaqueId == "spaceid" {
callCount.Add(1)
}
@@ -74,7 +75,7 @@ var _ = Describe("SpaceDebouncer", func() {
})
It("fires at the timeout even when continuously debounced", func() {
debouncer = search.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) {
debouncer = event.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) {
if id.OpaqueId == "spaceid" {
callCount.Add(1)
}
@@ -116,7 +117,7 @@ var _ = Describe("SpaceDebouncer", func() {
})
It("doesn't run the timeout function if the work function has been called", func() {
debouncer = search.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) {
debouncer = event.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) {
if id.OpaqueId == "spaceid" {
callCount.Add(1)
}

View File

@@ -0,0 +1,25 @@
package event_test
import (
"testing"
"github.com/opencloud-eu/opencloud/pkg/registry"
mRegistry "go-micro.dev/v4/registry"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// init registers a gateway service entry with a single dummy node in the
// in-memory registry before any spec runs.
// NOTE(review): presumably this lets code that resolves the gateway through
// the registry find a node during the tests - confirm against the specs.
func init() {
	reg := registry.GetRegistry(registry.Inmemory())

	gatewaySvc := registry.BuildGRPCService("eu.opencloud.api.gateway", "", "", "")
	dummyNode := &mRegistry.Node{Address: "any"}
	gatewaySvc.Nodes = []*mRegistry.Node{dummyNode}

	// the registration result is deliberately ignored
	_ = reg.Register(gatewaySvc)
}
// TestEvent is the `go test` entry point of this package: it registers Fail as
// the Gomega fail handler and runs the Ginkgo specs as the "Event Suite".
func TestEvent(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Event Suite")
}

View File

@@ -0,0 +1,220 @@
package event
import (
"context"
"sync"
"sync/atomic"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
var tracer trace.Tracer
func init() {
tracer = otel.Tracer("github.com/opencloud-eu/opencloud/services/search/pkg/service/event")
}
// Service defines the service handlers.
//
// It consumes events from a stream and translates them into calls on a
// search.Searcher. All methods use value receivers, so the fields that carry
// shared state (stopCh, stopped, indexSpaceDebouncer) are reference types and
// stay shared between copies of a Service.
type Service struct {
// ctx is the parent context of the worker context created in Run and the
// base context for the trace context of every processed event.
ctx context.Context
// log is the logger used by the service and its debouncer.
log log.Logger
// tp is the tracer provider passed to New. It is only stored; the code in
// this file creates spans with the package level tracer.
tp trace.TracerProvider
// m receives the consumer metrics. A nil value disables the monitoring.
m *metrics.Metrics
// index is the searcher that is updated when events are processed.
index search.Searcher
// events lists the event types Run subscribes to.
events []events.Unmarshaller
// stream is the event stream Run consumes from.
stream raw.Stream
// indexSpaceDebouncer debounces reindexing of a space and receives the ack
// function of the events that triggered it.
indexSpaceDebouncer *SpaceDebouncer
// numConsumers is the number of worker goroutines started by Run.
numConsumers int
// stopCh is closed by Close to make Run stop the workers and return.
stopCh chan struct{}
// stopped makes sure stopCh is closed only once.
stopped *atomic.Bool
}
// New returns a service implementation for Service.
//
// The returned Service is configured but idle: nothing is consumed from
// stream until Run is called.
//
//   - ctx is the parent context for the workers and for event trace contexts.
//   - stream is the event stream to consume from.
//   - tp is stored on the service.
//   - m receives consumer metrics; pass nil to disable the monitoring.
//   - index is the searcher that is updated for every event.
//   - debounceDuration is the debounce time for reindexing a space, in
//     milliseconds. 30 seconds are passed as the second duration of the
//     debouncer (presumably its upper bound, see NewSpaceDebouncer).
//   - numConsumers is the number of workers processing events. Values below 1
//     are replaced by 1, otherwise Run would not start any worker and events
//     would never be processed.
//   - asyncUploads selects the upload event to listen for: UploadReady when
//     true, FileUploaded when false.
//
// The returned error is currently always nil.
func New(ctx context.Context, stream raw.Stream, logger log.Logger, tp trace.TracerProvider, m *metrics.Metrics, index search.Searcher, debounceDuration int, numConsumers int, asyncUploads bool) (Service, error) {
	// always run at least one consumer
	if numConsumers < 1 {
		numConsumers = 1
	}

	svc := Service{
		ctx:     ctx,
		log:     logger,
		tp:      tp,
		m:       m,
		index:   index,
		stream:  stream,
		stopCh:  make(chan struct{}, 1),
		stopped: new(atomic.Bool),
		events: []events.Unmarshaller{
			events.ItemTrashed{},
			events.ItemPurged{},
			events.ItemRestored{},
			events.ItemMoved{},
			events.TrashbinPurged{},
			events.ContainerCreated{},
			events.FileTouched{},
			events.FileVersionRestored{},
			events.TagsAdded{},
			events.TagsRemoved{},
			events.SpaceRenamed{},
		},
		numConsumers: numConsumers,
	}

	// exactly one of the two upload events is consumed
	if asyncUploads {
		svc.events = append(svc.events, events.UploadReady{})
	} else {
		svc.events = append(svc.events, events.FileUploaded{})
	}

	// a failing reindex is only logged, there is nobody to return it to
	svc.indexSpaceDebouncer = NewSpaceDebouncer(time.Duration(debounceDuration)*time.Millisecond, 30*time.Second, func(id *provider.StorageSpaceId) {
		if err := index.IndexSpace(id); err != nil {
			logger.Error().Err(err).Interface("spaceID", id).Msg("error while indexing a space")
		}
	}, logger)

	return svc, nil
}
// Run to fulfil Runner interface
//
// It subscribes to the "search-pull" consumer, starts the metrics monitoring
// when metrics are configured and spawns numConsumers workers that process
// the incoming events one at a time. Run blocks until Close is called or the
// service context is cancelled, then stops the workers, waits for them to
// finish their current event and returns nil. The only error returned is the
// one from subscribing to the stream. Errors from processing a single event
// are logged and do not stop the worker.
func (s Service) Run() error {
	ch, err := s.stream.Consume("search-pull", s.events...)
	if err != nil {
		return err
	}

	if s.m != nil {
		monitorMetrics(s.stream, "search-pull", s.m, s.log)
	}

	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()

	// start workers
	for i := 0; i < s.numConsumers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case e, ok := <-ch:
					if !ok {
						// the stream closed the channel, nothing left to do
						return
					}
					if err := s.processEvent(e); err != nil {
						s.log.Error().Err(err).
							Int("worker", workerID).
							Interface("event", e).
							Msg("failed to process event")
					}
				}
			}
		}(i)
	}

	// wait for the stop signal. The workers also exit when the parent context
	// is cancelled, so return in that case as well instead of blocking on
	// stopCh while nothing is processing events anymore.
	select {
	case <-s.stopCh:
	case <-ctx.Done():
	}

	cancel() // signal workers to stop
	wg.Wait()

	return nil
}
// Close tells the service to stop processing so that `Run` can return. Only
// the first call closes the stop channel, further calls do nothing.
// TODO: Underlying services can't be stopped. This means that some goroutines
// will get stuck trying to push events through a channel nobody is reading
// from, so resources won't be freed and there will be memory leaks. For now,
// if the service is stopped, you should close the app soon after.
func (s Service) Close() {
	if alreadyStopped := !s.stopped.CompareAndSwap(false, true); alreadyStopped {
		return
	}
	close(s.stopCh)
}
// getSpaceID returns the storage space id for ref. Only the storage id and
// the space id of the reference's resource id are used; they are formatted
// with storagespace.FormatResourceID into the opaque id of the result.
func getSpaceID(ref *provider.Reference) *provider.StorageSpaceId {
	rid := ref.GetResourceId()
	spaceRoot := &provider.ResourceId{
		StorageId: rid.GetStorageId(),
		SpaceId:   rid.GetSpaceId(),
	}
	return &provider.StorageSpaceId{OpaqueId: storagespace.FormatResourceID(spaceRoot)}
}
// processEvent applies a single event to the search index inside a
// "processEvent" span that continues the trace context carried by the event.
//
// Purge events are applied and acknowledged right away. All other known
// events apply their item level change (if any) and then hand the event's ack
// function to the space debouncer together with the space to reindex. Events
// of any other type only get marked as in progress. The results of the
// searcher and ack calls are not inspected; processEvent always returns nil.
func (s Service) processEvent(e raw.Event) error {
	_, span := tracer.Start(e.GetTraceContext(s.ctx), "processEvent")
	defer span.End()

	e.InProgress() // let nats know that we are processing this event
	s.log.Debug().Interface("event", e).Msg("updating index")

	// reindex schedules a debounced reindex of the space and passes the ack
	// function of this event along
	reindex := func(spaceID *provider.StorageSpaceId) {
		s.indexSpaceDebouncer.Debounce(spaceID, e.Ack)
	}

	switch ev := e.Event.Event.(type) {
	// purges do not need a reindex, ack immediately
	case events.ItemPurged:
		s.index.PurgeItem(ev.Ref)
		e.Ack()
	case events.TrashbinPurged:
		s.index.PurgeDeleted(getSpaceID(ev.Ref))
		e.Ack()

	// item level change followed by a debounced reindex
	case events.ItemTrashed:
		s.index.TrashItem(ev.ID)
		reindex(getSpaceID(ev.Ref))
	case events.ItemMoved:
		s.index.MoveItem(ev.Ref)
		reindex(getSpaceID(ev.Ref))
	case events.ItemRestored:
		s.index.RestoreItem(ev.Ref)
		reindex(getSpaceID(ev.Ref))
	case events.TagsAdded:
		s.index.UpsertItem(ev.Ref)
		reindex(getSpaceID(ev.Ref))
	case events.TagsRemoved:
		s.index.UpsertItem(ev.Ref)
		reindex(getSpaceID(ev.Ref))

	// debounced reindex only
	case events.ContainerCreated:
		reindex(getSpaceID(ev.Ref))
	case events.FileTouched:
		reindex(getSpaceID(ev.Ref))
	case events.FileVersionRestored:
		reindex(getSpaceID(ev.Ref))
	case events.FileUploaded:
		reindex(getSpaceID(ev.Ref))
	case events.UploadReady:
		reindex(getSpaceID(ev.FileRef))
	case events.SpaceRenamed:
		reindex(ev.ID)
	}

	return nil
}
// monitorMetrics looks up the named JetStream consumer on stream and starts a
// goroutine that polls its info every 5 seconds, publishing the number of
// outstanding acks, pending and redelivered messages to m.
//
// If the consumer cannot be looked up the error is logged and no monitoring
// is started. A failing info request is logged and retried on the next tick.
// The polling goroutine is never stopped; it lives as long as the process.
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
	ctx := context.Background()
	consumer, err := stream.JetStream().Consumer(ctx, name)
	if err != nil {
		logger.Error().Err(err).Msg("failed to get consumer")
		// without a consumer there is nothing to poll; continuing would call
		// Info on a nil consumer and panic inside the goroutine
		return
	}

	ticker := time.NewTicker(5 * time.Second)
	go func() {
		for range ticker.C {
			info, err := consumer.Info(ctx)
			if err != nil {
				logger.Error().Err(err).Msg("failed to get consumer")
				continue
			}

			m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
			m.EventsUnprocessed.Set(float64(info.NumPending))
			m.EventsRedelivered.Set(float64(info.NumRedelivered))
			logger.Trace().Msg("updated search event metrics")
		}
	}()
}

View File

@@ -1,22 +1,22 @@
package search_test
package event_test
import (
"context"
"sync/atomic"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
searchMocks "github.com/opencloud-eu/opencloud/services/search/pkg/search/mocks"
"github.com/opencloud-eu/opencloud/services/search/pkg/service/event"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
rawMocks "github.com/opencloud-eu/reva/v2/pkg/events/raw/mocks"
"github.com/stretchr/testify/mock"
)
var _ = DescribeTable("events",
var _ = DescribeTable("event",
func(mcks []string, e any, asyncUploads bool) {
var (
s = &searchMocks.Searcher{}
@@ -27,11 +27,14 @@ var _ = DescribeTable("events",
ch := make(chan raw.Event, 1)
stream.EXPECT().Consume(mock.Anything, mock.Anything).Return((<-chan raw.Event)(ch), nil)
search.HandleEvents(s, stream, &config.Config{
Events: config.Events{
AsyncUploads: asyncUploads,
},
}, nil, log.NewLogger())
event, err := event.New(context.Background(), stream, log.NewLogger(), nil, nil, s, 50, 1, asyncUploads)
Expect(err).NotTo(HaveOccurred())
go func() {
err := event.Run()
Expect(err).NotTo(HaveOccurred())
}()
defer event.Close()
for _, mck := range mcks {
s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {

View File

@@ -1,10 +1,14 @@
package service
import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"go.opentelemetry.io/otel/trace"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"go.opentelemetry.io/otel/trace"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
// Option defines a single option function.
@@ -12,11 +16,13 @@ type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
Logger log.Logger
Config *config.Config
JWTSecret string
TracerProvider trace.TracerProvider
Metrics *metrics.Metrics
Logger log.Logger
Config *config.Config
JWTSecret string
TracerProvider trace.TracerProvider
Metrics *metrics.Metrics
GatewaySelector *pool.Selector[gateway.GatewayAPIClient]
Searcher search.Searcher
}
func newOptions(opts ...Option) Options {
@@ -65,3 +71,17 @@ func Metrics(val *metrics.Metrics) Option {
}
}
}
// GatewaySelector returns an Option that stores val as the gateway selector
// in the Options it is applied to.
func GatewaySelector(val *pool.Selector[gateway.GatewayAPIClient]) Option {
	setSelector := func(opts *Options) {
		opts.GatewaySelector = val
	}
	return setSelector
}
// Searcher returns an Option that stores val as the searcher in the Options
// it is applied to.
func Searcher(val search.Searcher) Option {
	setSearcher := func(opts *Options) {
		opts.Searcher = val
	}
	return setSearcher
}

View File

@@ -13,132 +13,35 @@ import (
"github.com/jellydator/ttlcache/v2"
revactx "github.com/opencloud-eu/reva/v2/pkg/ctx"
"github.com/opencloud-eu/reva/v2/pkg/errtypes"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"github.com/opencloud-eu/reva/v2/pkg/token"
"github.com/opencloud-eu/reva/v2/pkg/token/manager/jwt"
"github.com/opencloud-eu/reva/v2/pkg/utils"
opensearchgo "github.com/opensearch-project/opensearch-go/v4"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
merrors "go-micro.dev/v4/errors"
"go-micro.dev/v4/metadata"
grpcmetadata "google.golang.org/grpc/metadata"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/registry"
v0 "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
searchsvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/search/v0"
"github.com/opencloud-eu/opencloud/services/search/pkg/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/content"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
// NewHandler returns a service implementation for Service.
func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) {
teardown := func() {}
func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, error) {
options := newOptions(opts...)
logger := options.Logger
cfg := options.Config
// initialize search engine
var eng search.Engine
switch cfg.Engine.Type {
case "bleve":
idx, err := bleve.NewIndex(cfg.Engine.Bleve.Datapath)
if err != nil {
return nil, teardown, err
}
teardown = func() {
_ = idx.Close()
}
eng = bleve.NewBackend(idx, bleveQuery.DefaultCreator, logger)
case "open-search":
client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{
Client: opensearchgo.Config{
Addresses: cfg.Engine.OpenSearch.Client.Addresses,
Username: cfg.Engine.OpenSearch.Client.Username,
Password: cfg.Engine.OpenSearch.Client.Password,
Header: cfg.Engine.OpenSearch.Client.Header,
CACert: cfg.Engine.OpenSearch.Client.CACert,
RetryOnStatus: cfg.Engine.OpenSearch.Client.RetryOnStatus,
DisableRetry: cfg.Engine.OpenSearch.Client.DisableRetry,
EnableRetryOnTimeout: cfg.Engine.OpenSearch.Client.EnableRetryOnTimeout,
MaxRetries: cfg.Engine.OpenSearch.Client.MaxRetries,
CompressRequestBody: cfg.Engine.OpenSearch.Client.CompressRequestBody,
DiscoverNodesOnStart: cfg.Engine.OpenSearch.Client.DiscoverNodesOnStart,
DiscoverNodesInterval: cfg.Engine.OpenSearch.Client.DiscoverNodesInterval,
EnableMetrics: cfg.Engine.OpenSearch.Client.EnableMetrics,
EnableDebugLogger: cfg.Engine.OpenSearch.Client.EnableDebugLogger,
},
})
if err != nil {
return nil, teardown, fmt.Errorf("failed to create OpenSearch client: %w", err)
}
openSearchBackend, err := opensearch.NewBackend(cfg.Engine.OpenSearch.ResourceIndex.Name, client)
if err != nil {
return nil, teardown, fmt.Errorf("failed to create OpenSearch backend: %w", err)
}
eng = openSearchBackend
default:
return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type)
if options.GatewaySelector == nil {
return nil, errors.New("no GatewaySelector provided")
}
// initialize gateway
selector, err := pool.GatewaySelector(cfg.Reva.Address, pool.WithRegistry(registry.GetRegistry()), pool.WithTracerProvider(options.TracerProvider))
if err != nil {
logger.Fatal().Err(err).Msg("could not get reva gateway selector")
return nil, teardown, err
}
// initialize search content extractor
var extractor content.Extractor
switch cfg.Extractor.Type {
case "basic":
if extractor, err = content.NewBasicExtractor(logger); err != nil {
return nil, teardown, err
}
case "tika":
if extractor, err = content.NewTikaExtractor(selector, logger, cfg); err != nil {
return nil, teardown, err
}
default:
return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}
ss := search.NewService(selector, eng, extractor, options.Metrics, logger, cfg)
// setup event handling
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
stream, err := raw.FromConfig(context.Background(), connName, raw.Config{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
AuthUsername: cfg.Events.AuthUsername,
AuthPassword: cfg.Events.AuthPassword,
MaxAckPending: cfg.Events.MaxAckPending,
AckWait: cfg.Events.AckWait,
})
if err != nil {
return nil, teardown, err
}
if err := search.HandleEvents(ss, stream, cfg, options.Metrics, logger); err != nil {
return nil, teardown, err
if options.Searcher == nil {
return nil, errors.New("no Searcher provided")
}
cache := ttlcache.NewCache()
if err := cache.SetTTL(time.Second); err != nil {
return nil, teardown, err
return nil, err
}
tokenManager, err := jwt.New(map[string]interface{}{
@@ -146,24 +49,24 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
"expires": int64(24 * 60 * 60),
})
if err != nil {
return nil, teardown, err
return nil, err
}
return &Service{
id: cfg.GRPC.Namespace + "." + cfg.Service.Name,
log: logger,
searcher: ss,
log: &options.Logger,
searcher: options.Searcher,
cache: cache,
tokenManager: tokenManager,
gws: selector,
gws: options.GatewaySelector,
cfg: cfg,
}, teardown, nil
}, nil
}
// Service implements the searchServiceHandler interface
type Service struct {
id string
log log.Logger
log *log.Logger
searcher search.Searcher
cache *ttlcache.Cache
tokenManager token.Manager