From 608af8191354e34f75936cf7c495954b9d0dc046 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Fri, 4 Jul 2025 14:05:35 +0200 Subject: [PATCH] Switch to the raw nats consumer instead of the go-micro events This allows more direct control of the nats events. As a first step we'll now acknowledge events manually, but in the future we'll add metrics etc. to gain more insight into the queues. --- services/search/pkg/search/debouncer.go | 61 +++++++-- services/search/pkg/search/debouncer_test.go | 125 +++++++++++++++--- services/search/pkg/search/events.go | 42 +++--- services/search/pkg/search/events_test.go | 4 +- .../search/pkg/service/grpc/v0/service.go | 16 +-- 5 files changed, 192 insertions(+), 56 deletions(-) diff --git a/services/search/pkg/search/debouncer.go b/services/search/pkg/search/debouncer.go index 49cf9cbdd9..e7accc1f2b 100644 --- a/services/search/pkg/search/debouncer.go +++ b/services/search/pkg/search/debouncer.go @@ -5,48 +5,91 @@ import ( "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/opencloud-eu/opencloud/pkg/log" ) // SpaceDebouncer debounces operations on spaces for a configurable amount of time type SpaceDebouncer struct { after time.Duration + timeout time.Duration f func(id *provider.StorageSpaceId) - pending map[string]*time.Timer + pending map[string]*workItem inProgress sync.Map mutex sync.Mutex + log log.Logger } +type workItem struct { + t *time.Timer + timeout *time.Timer + + trigger func() +} + +type AckFunc func() error + // NewSpaceDebouncer returns a new SpaceDebouncer instance -func NewSpaceDebouncer(d time.Duration, f func(id *provider.StorageSpaceId)) *SpaceDebouncer { +func NewSpaceDebouncer(d time.Duration, timeout time.Duration, f func(id *provider.StorageSpaceId), logger log.Logger) *SpaceDebouncer { return &SpaceDebouncer{ after: d, + timeout: timeout, f: f, - pending: map[string]*time.Timer{}, + pending: map[string]*workItem{}, inProgress: sync.Map{}, + log: logger, } } // Debounce restars the debounce timer for the given space -func (d *SpaceDebouncer) Debounce(id *provider.StorageSpaceId) { +func (d *SpaceDebouncer) Debounce(id *provider.StorageSpaceId, ack AckFunc) { d.mutex.Lock() defer d.mutex.Unlock() - if t := d.pending[id.OpaqueId]; t != nil { - t.Stop() + if wi := d.pending[id.OpaqueId]; wi != nil { + if ack != nil { + go ack() // Acknowledge the event immediately, the according space is already scheduled for indexing + } + wi.t.Reset(d.after) + return } - d.pending[id.OpaqueId] = time.AfterFunc(d.after, func() { + trigger := func() { if _, ok := d.inProgress.Load(id.OpaqueId); ok { // Reschedule this run for when the previous run has finished d.mutex.Lock() - d.pending[id.OpaqueId].Reset(d.after) + if wi := d.pending[id.OpaqueId]; wi != nil { + wi.t.Reset(d.after) + } d.mutex.Unlock() return } + d.mutex.Lock() + delete(d.pending, id.OpaqueId) d.inProgress.Store(id.OpaqueId, true) defer d.inProgress.Delete(id.OpaqueId) + d.mutex.Unlock() // release the lock early to allow other goroutines to debounce + d.f(id) - }) + go func() { + if ack != nil { + if err := ack(); err != nil { + d.log.Error().Err(err).Msg("error while acknowledging event") + } + } + }() + } + t := time.AfterFunc(d.after, trigger) + + d.pending[id.OpaqueId] = &workItem{ + trigger: trigger, + t: t, + timeout: time.AfterFunc(d.timeout, func() { + d.log.Debug().Msg("timeout while waiting for space debouncer to finish") + t.Stop() + trigger() + }), + } + } diff --git a/services/search/pkg/search/debouncer_test.go b/services/search/pkg/search/debouncer_test.go index b4714c01a5..829ebdb5e9 100644 --- a/services/search/pkg/search/debouncer_test.go +++ b/services/search/pkg/search/debouncer_test.go @@ -7,6 +7,7 @@ import ( sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/search" ) @@ -23,30 +24,30 @@ var _ = Describe("SpaceDebouncer", func() { BeforeEach(func() { callCount = atomic.Int32{} - debouncer = search.NewSpaceDebouncer(50*time.Millisecond, func(id *sprovider.StorageSpaceId) { + debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 10*time.Second, func(id *sprovider.StorageSpaceId) { if id.OpaqueId == "spaceid" { callCount.Add(1) } - }) + }, log.NewLogger()) }) It("debounces", func() { - debouncer.Debounce(spaceid) - debouncer.Debounce(spaceid) - debouncer.Debounce(spaceid) + debouncer.Debounce(spaceid, nil) + debouncer.Debounce(spaceid, nil) + debouncer.Debounce(spaceid, nil) Eventually(func() int { return int(callCount.Load()) }, "200ms").Should(Equal(1)) }) It("works multiple times", func() { - debouncer.Debounce(spaceid) - debouncer.Debounce(spaceid) - debouncer.Debounce(spaceid) + debouncer.Debounce(spaceid, nil) + debouncer.Debounce(spaceid, nil) + debouncer.Debounce(spaceid, nil) time.Sleep(100 * time.Millisecond) - debouncer.Debounce(spaceid) - debouncer.Debounce(spaceid) + debouncer.Debounce(spaceid, nil) + debouncer.Debounce(spaceid, nil) Eventually(func() int { return int(callCount.Load()) @@ -54,21 +55,115 @@ var _ = Describe("SpaceDebouncer", func() { }) It("doesn't trigger twice simultaneously", func() { - debouncer = search.NewSpaceDebouncer(50*time.Millisecond, func(id *sprovider.StorageSpaceId) { + debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 5*time.Second, func(id *sprovider.StorageSpaceId) { if id.OpaqueId == "spaceid" { callCount.Add(1) } time.Sleep(300 * time.Millisecond) - }) - debouncer.Debounce(spaceid) + }, log.NewLogger()) + debouncer.Debounce(spaceid, nil) time.Sleep(100 * time.Millisecond) // Let it trigger once - debouncer.Debounce(spaceid) + debouncer.Debounce(spaceid, nil) time.Sleep(100 * time.Millisecond) // shouldn't trigger as the other run is still in progress Expect(int(callCount.Load())).To(Equal(1)) Eventually(func() int { return int(callCount.Load()) - }, "500ms").Should(Equal(2)) + }, "2000ms").Should(Equal(2)) + }) + + It("fires at the timeout even when continuously debounced", func() { + debouncer = search.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) { + if id.OpaqueId == "spaceid" { + callCount.Add(1) + } + }, log.NewLogger()) + + // Initial call to start the timers + debouncer.Debounce(spaceid, nil) + + // Continuously reset the debounce timer using a ticker, at an interval + // shorter than the debounce time. + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + + done := make(chan bool) + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + debouncer.Debounce(spaceid, nil) + } + } + }() + + // The debounce timer (100ms) should be reset every 50ms and thus never fire. + // The timeout timer (250ms) should fire regardless. + Eventually(func() int { + return int(callCount.Load()) + }, "300ms").Should(Equal(1)) + + // Stop the ticker goroutine + close(done) + + // And it should not fire again + Consistently(func() int { + return int(callCount.Load()) + }, "300ms").Should(Equal(1)) + }) + + It("calls the ack function when the debounce fires", func() { + var ackCalled atomic.Bool + ackFunc := func() error { + ackCalled.Store(true) + return nil + } + + debouncer.Debounce(spaceid, ackFunc) + + Eventually(func() int { + return int(callCount.Load()) + }, "200ms").Should(Equal(1)) + Eventually(func() bool { + return ackCalled.Load() + }, "200ms").Should(BeTrue()) + }) + + It("calls the ack function immediately for subsequent calls", func() { + var firstAckCalled atomic.Bool + firstAckFunc := func() error { + firstAckCalled.Store(true) + return nil + } + + var secondAckCalled atomic.Bool + secondAckFunc := func() error { + secondAckCalled.Store(true) + return nil + } + + // First call, sets up the trigger + debouncer.Debounce(spaceid, firstAckFunc) + Expect(firstAckCalled.Load()).To(BeFalse()) + Expect(secondAckCalled.Load()).To(BeFalse()) + + // Second call, should call its ack immediately + debouncer.Debounce(spaceid, secondAckFunc) + Eventually(func() bool { + return secondAckCalled.Load() + }, "50ms").Should(BeTrue()) + // The first ack is not yet called. + Expect(firstAckCalled.Load()).To(BeFalse()) + + // After the debounce period, the trigger fires, calling the main function and the first ack. + Eventually(func() int { + return int(callCount.Load()) + }, "200ms").Should(Equal(1)) + Eventually(func() bool { + return firstAckCalled.Load() + }, "200ms").Should(BeTrue()) }) }) diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go index 4f1b004404..481878a0e8 100644 --- a/services/search/pkg/search/events.go +++ b/services/search/pkg/search/events.go @@ -1,18 +1,20 @@ package search import ( + "context" "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/config" "github.com/opencloud-eu/reva/v2/pkg/events" + "github.com/opencloud-eu/reva/v2/pkg/events/raw" "github.com/opencloud-eu/reva/v2/pkg/storagespace" ) // HandleEvents listens to the needed events, // it handles the whole resource indexing livecycle. -func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *config.Config) error { +func HandleEvents(s Searcher, cfg *config.Config, logger log.Logger) error { evts := []events.Unmarshaller{ events.ItemTrashed{}, events.ItemRestored{}, @@ -31,10 +33,19 @@ func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *confi evts = append(evts, events.FileUploaded{}) } - ch, err := events.Consume(bus, "search", evts...) + stream, err := raw.FromConfig(context.Background(), cfg.Service.Name, raw.Config{ + Endpoint: cfg.Events.Endpoint, + Cluster: cfg.Events.Cluster, + EnableTLS: cfg.Events.EnableTLS, + TLSInsecure: cfg.Events.TLSInsecure, + TLSRootCACertificate: cfg.Events.TLSRootCACertificate, + AuthUsername: cfg.Events.AuthUsername, + AuthPassword: cfg.Events.AuthPassword, + }) if err != nil { return err } + ch, err := stream.Consume("search-pull", evts...) if cfg.Events.NumConsumers == 0 { cfg.Events.NumConsumers = 1 @@ -51,45 +62,46 @@ func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *confi } } - indexSpaceDebouncer := NewSpaceDebouncer(time.Duration(cfg.Events.DebounceDuration)*time.Millisecond, func(id *provider.StorageSpaceId) { + indexSpaceDebouncer := NewSpaceDebouncer(time.Duration(cfg.Events.DebounceDuration)*time.Millisecond, 30*time.Second, func(id *provider.StorageSpaceId) { if err := s.IndexSpace(id); err != nil { logger.Error().Err(err).Interface("spaceID", id).Msg("error while indexing a space") } - }) + }, logger) for i := 0; i < cfg.Events.NumConsumers; i++ { - go func(s Searcher, ch <-chan events.Event) { + go func(s Searcher, ch <-chan raw.Event) { for event := range ch { e := event go func() { + e.InProgress() // let nats know that we are processing this event logger.Debug().Interface("event", e).Msg("updating index") - switch ev := e.Event.(type) { + switch ev := e.Event.Event.(type) { case events.ItemTrashed: s.TrashItem(ev.ID) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) case events.ItemMoved: s.MoveItem(ev.Ref) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) case events.ItemRestored: s.RestoreItem(ev.Ref) - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) case events.ContainerCreated: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) case events.FileTouched: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) case events.FileVersionRestored: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) case events.TagsAdded: s.UpsertItem(ev.Ref) case events.TagsRemoved: s.UpsertItem(ev.Ref) case events.FileUploaded: - indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack) case events.UploadReady: - indexSpaceDebouncer.Debounce(getSpaceID(ev.FileRef)) + indexSpaceDebouncer.Debounce(getSpaceID(ev.FileRef), e.Ack) case events.SpaceRenamed: - indexSpaceDebouncer.Debounce(ev.ID) + indexSpaceDebouncer.Debounce(ev.ID, e.Ack) } }() } diff --git a/services/search/pkg/search/events_test.go b/services/search/pkg/search/events_test.go index 5959158f2d..c02b206297 100644 --- a/services/search/pkg/search/events_test.go +++ b/services/search/pkg/search/events_test.go @@ -25,11 +25,11 @@ var _ = DescribeTable("events", bus, _ := mEvents.NewStream() - search.HandleEvents(s, bus, log.NewLogger(), &config.Config{ + search.HandleEvents(s, &config.Config{ Events: config.Events{ AsyncUploads: asyncUploads, }, - }) + }, log.NewLogger()) for _, mck := range mcks { s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index e28209aca2..d4c39e5505 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -13,7 +13,6 @@ import ( "github.com/jellydator/ttlcache/v2" revactx "github.com/opencloud-eu/reva/v2/pkg/ctx" "github.com/opencloud-eu/reva/v2/pkg/errtypes" - "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" "github.com/opencloud-eu/reva/v2/pkg/token" "github.com/opencloud-eu/reva/v2/pkg/token/manager/jwt" @@ -79,23 +78,10 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type) } - bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ - Endpoint: cfg.Events.Endpoint, - Cluster: cfg.Events.Cluster, - EnableTLS: cfg.Events.EnableTLS, - TLSInsecure: cfg.Events.TLSInsecure, - TLSRootCACertificate: cfg.Events.TLSRootCACertificate, - AuthUsername: cfg.Events.AuthUsername, - AuthPassword: cfg.Events.AuthPassword, - }) - if err != nil { - return nil, teardown, err - } - ss := search.NewService(selector, eng, extractor, logger, cfg) // setup event handling - if err := search.HandleEvents(ss, bus, logger, cfg); err != nil { + if err := search.HandleEvents(ss, cfg, logger); err != nil { return nil, teardown, err }