Switch to the raw nats consumer instead of the go-micro events

This allows more direct control of the nats events. As a first step
we'll now acknowledge events manually, but in the future we'll add
metrics etc. to gain more insight into the queues.
This commit is contained in:
André Duffeck
2025-07-04 14:05:35 +02:00
parent 1213908eb2
commit 608af81913
5 changed files with 192 additions and 56 deletions

View File

@@ -5,48 +5,91 @@ import (
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/log"
)
// SpaceDebouncer debounces operations on spaces for a configurable amount of time
type SpaceDebouncer struct {
after time.Duration
timeout time.Duration
f func(id *provider.StorageSpaceId)
pending map[string]*time.Timer
pending map[string]*workItem
inProgress sync.Map
mutex sync.Mutex
log log.Logger
}
type workItem struct {
t *time.Timer
timeout *time.Timer
trigger func()
}
type AckFunc func() error
// NewSpaceDebouncer returns a new SpaceDebouncer instance
func NewSpaceDebouncer(d time.Duration, f func(id *provider.StorageSpaceId)) *SpaceDebouncer {
func NewSpaceDebouncer(d time.Duration, timeout time.Duration, f func(id *provider.StorageSpaceId), logger log.Logger) *SpaceDebouncer {
return &SpaceDebouncer{
after: d,
timeout: timeout,
f: f,
pending: map[string]*time.Timer{},
pending: map[string]*workItem{},
inProgress: sync.Map{},
log: logger,
}
}
// Debounce restars the debounce timer for the given space
func (d *SpaceDebouncer) Debounce(id *provider.StorageSpaceId) {
func (d *SpaceDebouncer) Debounce(id *provider.StorageSpaceId, ack AckFunc) {
d.mutex.Lock()
defer d.mutex.Unlock()
if t := d.pending[id.OpaqueId]; t != nil {
t.Stop()
if wi := d.pending[id.OpaqueId]; wi != nil {
if ack != nil {
go ack() // Acknowledge the event immediately, the according space is already scheduled for indexing
}
wi.t.Reset(d.after)
return
}
d.pending[id.OpaqueId] = time.AfterFunc(d.after, func() {
trigger := func() {
if _, ok := d.inProgress.Load(id.OpaqueId); ok {
// Reschedule this run for when the previous run has finished
d.mutex.Lock()
d.pending[id.OpaqueId].Reset(d.after)
if wi := d.pending[id.OpaqueId]; wi != nil {
wi.t.Reset(d.after)
}
d.mutex.Unlock()
return
}
d.mutex.Lock()
delete(d.pending, id.OpaqueId)
d.inProgress.Store(id.OpaqueId, true)
defer d.inProgress.Delete(id.OpaqueId)
d.mutex.Unlock() // release the lock early to allow other goroutines to debounce
d.f(id)
})
go func() {
if ack != nil {
if err := ack(); err != nil {
d.log.Error().Err(err).Msg("error while acknowledging event")
}
}
}()
}
t := time.AfterFunc(d.after, trigger)
d.pending[id.OpaqueId] = &workItem{
trigger: trigger,
t: t,
timeout: time.AfterFunc(d.timeout, func() {
d.log.Debug().Msg("timeout while waiting for space debouncer to finish")
t.Stop()
trigger()
}),
}
}

View File

@@ -7,6 +7,7 @@ import (
sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
)
@@ -23,30 +24,30 @@ var _ = Describe("SpaceDebouncer", func() {
BeforeEach(func() {
callCount = atomic.Int32{}
debouncer = search.NewSpaceDebouncer(50*time.Millisecond, func(id *sprovider.StorageSpaceId) {
debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 10*time.Second, func(id *sprovider.StorageSpaceId) {
if id.OpaqueId == "spaceid" {
callCount.Add(1)
}
})
}, log.NewLogger())
})
It("debounces", func() {
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid, nil)
debouncer.Debounce(spaceid, nil)
debouncer.Debounce(spaceid, nil)
Eventually(func() int {
return int(callCount.Load())
}, "200ms").Should(Equal(1))
})
It("works multiple times", func() {
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid, nil)
debouncer.Debounce(spaceid, nil)
debouncer.Debounce(spaceid, nil)
time.Sleep(100 * time.Millisecond)
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid, nil)
debouncer.Debounce(spaceid, nil)
Eventually(func() int {
return int(callCount.Load())
@@ -54,21 +55,115 @@ var _ = Describe("SpaceDebouncer", func() {
})
It("doesn't trigger twice simultaneously", func() {
debouncer = search.NewSpaceDebouncer(50*time.Millisecond, func(id *sprovider.StorageSpaceId) {
debouncer = search.NewSpaceDebouncer(50*time.Millisecond, 5*time.Second, func(id *sprovider.StorageSpaceId) {
if id.OpaqueId == "spaceid" {
callCount.Add(1)
}
time.Sleep(300 * time.Millisecond)
})
debouncer.Debounce(spaceid)
}, log.NewLogger())
debouncer.Debounce(spaceid, nil)
time.Sleep(100 * time.Millisecond) // Let it trigger once
debouncer.Debounce(spaceid)
debouncer.Debounce(spaceid, nil)
time.Sleep(100 * time.Millisecond) // shouldn't trigger as the other run is still in progress
Expect(int(callCount.Load())).To(Equal(1))
Eventually(func() int {
return int(callCount.Load())
}, "500ms").Should(Equal(2))
}, "2000ms").Should(Equal(2))
})
It("fires at the timeout even when continuously debounced", func() {
debouncer = search.NewSpaceDebouncer(100*time.Millisecond, 250*time.Millisecond, func(id *sprovider.StorageSpaceId) {
if id.OpaqueId == "spaceid" {
callCount.Add(1)
}
}, log.NewLogger())
// Initial call to start the timers
debouncer.Debounce(spaceid, nil)
// Continuously reset the debounce timer using a ticker, at an interval
// shorter than the debounce time.
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
debouncer.Debounce(spaceid, nil)
}
}
}()
// The debounce timer (100ms) should be reset every 50ms and thus never fire.
// The timeout timer (250ms) should fire regardless.
Eventually(func() int {
return int(callCount.Load())
}, "300ms").Should(Equal(1))
// Stop the ticker goroutine
close(done)
// And it should not fire again
Consistently(func() int {
return int(callCount.Load())
}, "300ms").Should(Equal(1))
})
It("calls the ack function when the debounce fires", func() {
var ackCalled atomic.Bool
ackFunc := func() error {
ackCalled.Store(true)
return nil
}
debouncer.Debounce(spaceid, ackFunc)
Eventually(func() int {
return int(callCount.Load())
}, "200ms").Should(Equal(1))
Eventually(func() bool {
return ackCalled.Load()
}, "200ms").Should(BeTrue())
})
It("calls the ack function immediately for subsequent calls", func() {
var firstAckCalled atomic.Bool
firstAckFunc := func() error {
firstAckCalled.Store(true)
return nil
}
var secondAckCalled atomic.Bool
secondAckFunc := func() error {
secondAckCalled.Store(true)
return nil
}
// First call, sets up the trigger
debouncer.Debounce(spaceid, firstAckFunc)
Expect(firstAckCalled.Load()).To(BeFalse())
Expect(secondAckCalled.Load()).To(BeFalse())
// Second call, should call its ack immediately
debouncer.Debounce(spaceid, secondAckFunc)
Eventually(func() bool {
return secondAckCalled.Load()
}, "50ms").Should(BeTrue())
// The first ack is not yet called.
Expect(firstAckCalled.Load()).To(BeFalse())
// After the debounce period, the trigger fires, calling the main function and the first ack.
Eventually(func() int {
return int(callCount.Load())
}, "200ms").Should(Equal(1))
Eventually(func() bool {
return firstAckCalled.Load()
}, "200ms").Should(BeTrue())
})
})

View File

@@ -1,18 +1,20 @@
package search
import (
"context"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
)
// HandleEvents listens to the needed events,
// it handles the whole resource indexing livecycle.
func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *config.Config) error {
func HandleEvents(s Searcher, cfg *config.Config, logger log.Logger) error {
evts := []events.Unmarshaller{
events.ItemTrashed{},
events.ItemRestored{},
@@ -31,10 +33,19 @@ func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *confi
evts = append(evts, events.FileUploaded{})
}
ch, err := events.Consume(bus, "search", evts...)
stream, err := raw.FromConfig(context.Background(), cfg.Service.Name, raw.Config{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
AuthUsername: cfg.Events.AuthUsername,
AuthPassword: cfg.Events.AuthPassword,
})
if err != nil {
return err
}
ch, err := stream.Consume("search-pull", evts...)
if cfg.Events.NumConsumers == 0 {
cfg.Events.NumConsumers = 1
@@ -51,45 +62,46 @@ func HandleEvents(s Searcher, bus events.Consumer, logger log.Logger, cfg *confi
}
}
indexSpaceDebouncer := NewSpaceDebouncer(time.Duration(cfg.Events.DebounceDuration)*time.Millisecond, func(id *provider.StorageSpaceId) {
indexSpaceDebouncer := NewSpaceDebouncer(time.Duration(cfg.Events.DebounceDuration)*time.Millisecond, 30*time.Second, func(id *provider.StorageSpaceId) {
if err := s.IndexSpace(id); err != nil {
logger.Error().Err(err).Interface("spaceID", id).Msg("error while indexing a space")
}
})
}, logger)
for i := 0; i < cfg.Events.NumConsumers; i++ {
go func(s Searcher, ch <-chan events.Event) {
go func(s Searcher, ch <-chan raw.Event) {
for event := range ch {
e := event
go func() {
e.InProgress() // let nats know that we are processing this event
logger.Debug().Interface("event", e).Msg("updating index")
switch ev := e.Event.(type) {
switch ev := e.Event.Event.(type) {
case events.ItemTrashed:
s.TrashItem(ev.ID)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref))
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.ItemMoved:
s.MoveItem(ev.Ref)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref))
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.ItemRestored:
s.RestoreItem(ev.Ref)
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref))
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.ContainerCreated:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref))
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.FileTouched:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref))
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.FileVersionRestored:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref))
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.TagsAdded:
s.UpsertItem(ev.Ref)
case events.TagsRemoved:
s.UpsertItem(ev.Ref)
case events.FileUploaded:
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref))
indexSpaceDebouncer.Debounce(getSpaceID(ev.Ref), e.Ack)
case events.UploadReady:
indexSpaceDebouncer.Debounce(getSpaceID(ev.FileRef))
indexSpaceDebouncer.Debounce(getSpaceID(ev.FileRef), e.Ack)
case events.SpaceRenamed:
indexSpaceDebouncer.Debounce(ev.ID)
indexSpaceDebouncer.Debounce(ev.ID, e.Ack)
}
}()
}

View File

@@ -25,11 +25,11 @@ var _ = DescribeTable("events",
bus, _ := mEvents.NewStream()
search.HandleEvents(s, bus, log.NewLogger(), &config.Config{
search.HandleEvents(s, &config.Config{
Events: config.Events{
AsyncUploads: asyncUploads,
},
})
}, log.NewLogger())
for _, mck := range mcks {
s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {

View File

@@ -13,7 +13,6 @@ import (
"github.com/jellydator/ttlcache/v2"
revactx "github.com/opencloud-eu/reva/v2/pkg/ctx"
"github.com/opencloud-eu/reva/v2/pkg/errtypes"
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"github.com/opencloud-eu/reva/v2/pkg/token"
"github.com/opencloud-eu/reva/v2/pkg/token/manager/jwt"
@@ -79,23 +78,10 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
AuthUsername: cfg.Events.AuthUsername,
AuthPassword: cfg.Events.AuthPassword,
})
if err != nil {
return nil, teardown, err
}
ss := search.NewService(selector, eng, extractor, logger, cfg)
// setup event handling
if err := search.HandleEvents(ss, bus, logger, cfg); err != nil {
if err := search.HandleEvents(ss, cfg, logger); err != nil {
return nil, teardown, err
}