diff --git a/services/activitylog/pkg/service/options.go b/services/activitylog/pkg/service/options.go index 3427f326f..0a34bd9a3 100644 --- a/services/activitylog/pkg/service/options.go +++ b/services/activitylog/pkg/service/options.go @@ -31,6 +31,7 @@ type Options struct { HistoryClient ehsvc.EventHistoryService ValueClient settingssvc.ValueService WriteBufferDuration time.Duration + MaxActivities int } // Logger configures a logger for the activitylog service @@ -109,3 +110,10 @@ func WriteBufferDuration(d time.Duration) Option { o.WriteBufferDuration = d } } + +// MaxActivities sets the maximum number of activities to store +func MaxActivities(max int) Option { + return func(o *Options) { + o.MaxActivities = max + } +} diff --git a/services/activitylog/pkg/service/service.go b/services/activitylog/pkg/service/service.go index ab05efc2d..0f73e4b68 100644 --- a/services/activitylog/pkg/service/service.go +++ b/services/activitylog/pkg/service/service.go @@ -2,11 +2,14 @@ package service import ( "context" + "encoding/base32" "encoding/json" - "errors" "fmt" "path/filepath" "reflect" + "sort" + "strconv" + "strings" "sync" "time" @@ -14,12 +17,13 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/go-chi/chi/v5" "github.com/jellydator/ttlcache/v2" + "github.com/nats-io/nats.go" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" + "github.com/pkg/errors" "github.com/vmihailenco/msgpack/v5" - microstore "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" "github.com/opencloud-eu/opencloud/pkg/log" @@ -29,7 +33,7 @@ import ( ) // Nats runs into max payload exceeded errors at around 7k activities. Let's keep a buffer. -var _maxActivities = 6000 +var _maxActivitiesDefault = 6000 // RawActivity represents an activity as it is stored in the activitylog store type RawActivity struct { @@ -43,7 +47,6 @@ type ActivitylogService struct { cfg *config.Config log log.Logger events <-chan events.Event - store microstore.Store gws pool.Selectable[gateway.GatewayAPIClient] mux *chi.Mux evHistory ehsvc.EventHistoryService @@ -53,6 +56,9 @@ type ActivitylogService struct { tracer trace.Tracer debouncer *Debouncer parentIdCache *ttlcache.Cache + natskv nats.KeyValue + + maxActivities int registeredEvents map[string]events.Unmarshaller } @@ -71,6 +77,12 @@ type queueItem struct { timer *time.Timer } +type batchInfo struct { + key string + count int + timestamp time.Time +} + // NewDebouncer returns a new Debouncer instance func NewDebouncer(d time.Duration, f func(id string, ra []RawActivity) error) *Debouncer { return &Debouncer{ @@ -128,7 +140,9 @@ func (d *Debouncer) Debounce(id string, ra RawActivity) { // New creates a new ActivitylogService func New(opts ...Option) (*ActivitylogService, error) { - o := &Options{} + o := &Options{ + MaxActivities: _maxActivitiesDefault, + } for _, opt := range opts { opt(o) } @@ -137,10 +151,6 @@ func New(opts ...Option) (*ActivitylogService, error) { return nil, errors.New("stream is required") } - if o.Store == nil { - return nil, errors.New("store is required") - } - ch, err := events.Consume(o.Stream, o.Config.Service.Name, o.RegisteredEvents...) if err != nil { return nil, err @@ -152,11 +162,41 @@ func New(opts ...Option) (*ActivitylogService, error) { return nil, err } + // Connect to NATS servers + natsOptions := nats.Options{ + Servers: o.Config.Store.Nodes, + } + conn, err := natsOptions.Connect() + if err != nil { + return nil, err + } + + js, err := conn.JetStream() + if err != nil { + return nil, err + } + + kv, err := js.KeyValue(o.Config.Store.Database) + if err != nil { + if !errors.Is(err, nats.ErrBucketNotFound) { + return nil, errors.Wrapf(err, "Failed to get bucket (%s)", o.Config.Store.Database) + } + + kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: o.Config.Store.Database, + }) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create bucket (%s)", o.Config.Store.Database) + } + } + if err != nil { + return nil, err + } + s := &ActivitylogService{ log: o.Logger, cfg: o.Config, events: ch, - store: o.Store, gws: o.GatewaySelector, mux: o.Mux, evHistory: o.HistoryClient, @@ -166,6 +206,8 @@ func New(opts ...Option) (*ActivitylogService, error) { tp: o.TraceProvider, tracer: o.TraceProvider.Tracer("github.com/opencloud-eu/opencloud/services/activitylog/pkg/service"), parentIdCache: cache, + maxActivities: o.MaxActivities, + natskv: kv, } s.debouncer = NewDebouncer(o.WriteBufferDuration, s.storeActivity) @@ -250,7 +292,7 @@ func (a *ActivitylogService) AddActivity(initRef *provider.Reference, parentId * ctx, span = a.tracer.Start(ctx, "AddActivity") defer span.End() - return a.addActivity(ctx, initRef, parentId, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return a.addActivity(ctx, initRef, parentId, eventID, timestamp, func(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, error) { return utils.GetResource(ctx, ref, gwc) }) } @@ -285,10 +327,10 @@ func (a *ActivitylogService) AddActivityTrashed(resourceID *provider.ResourceId, } var span trace.Span - ctx, span = a.tracer.Start(ctx, "AddActivity") + ctx, span = a.tracer.Start(ctx, "AddActivityTrashed") defer span.End() - return a.addActivity(ctx, ref, parentId, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return a.addActivity(ctx, ref, parentId, eventID, timestamp, func(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, error) { return utils.GetResource(ctx, ref, gwc) }) } @@ -343,10 +385,8 @@ func (a *ActivitylogService) RemoveActivities(rid *provider.ResourceId, toDelete return err } - return a.store.Write(µstore.Record{ - Key: storagespace.FormatResourceID(rid), - Value: b, - }) + _, err = a.natskv.Put(storagespace.FormatResourceID(rid), b) + return err } // RemoveResource removes the resource from the store @@ -358,45 +398,52 @@ func (a *ActivitylogService) RemoveResource(rid *provider.ResourceId) error { a.lock.Lock() defer a.lock.Unlock() - return a.store.Delete(storagespace.FormatResourceID(rid)) + return a.natskv.Delete(storagespace.FormatResourceID(rid)) } func (a *ActivitylogService) activities(rid *provider.ResourceId) ([]RawActivity, error) { resourceID := storagespace.FormatResourceID(rid) - records, err := a.store.Read(resourceID) - if err != nil && err != microstore.ErrNotFound { - return nil, fmt.Errorf("could not read activities: %w", err) - } + glob := fmt.Sprintf("%s.>", base32.StdEncoding.EncodeToString([]byte(resourceID))) - if len(records) == 0 { - return []RawActivity{}, nil + watcher, err := a.natskv.Watch(glob, nats.IgnoreDeletes()) + if err != nil { + return nil, err } var activities []RawActivity - if err := msgpack.Unmarshal(records[0].Value, &activities); err != nil { - a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") - if err := json.Unmarshal(records[0].Value, &activities); err != nil { - return nil, fmt.Errorf("could not unmarshal activities: %w", err) + for update := range watcher.Updates() { + if update == nil { + break } + + var batchActivities []RawActivity + if err := msgpack.Unmarshal(update.Value(), &batchActivities); err != nil { + a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") + } + activities = append(activities, batchActivities...) } return activities, nil } // note: getResource is abstracted to allow unit testing, in general this will just be utils.GetResource -func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.Reference, parentId *provider.ResourceId, eventID string, timestamp time.Time, getResource func(*provider.Reference) (*provider.ResourceInfo, error)) error { +func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.Reference, parentId *provider.ResourceId, eventID string, timestamp time.Time, getResource func(context.Context, *provider.Reference) (*provider.ResourceInfo, error)) error { var ( err error depth int ref = initRef ) + ctx, span := a.tracer.Start(ctx, "addActivity") + defer span.End() for { var info *provider.ResourceInfo id := ref.GetResourceId() if ref.Path != "" { // Path based reference, we need to resolve the resource id - info, err = getResource(ref) + ctx, span = a.tracer.Start(ctx, "addActivity.getResource") + info, err = getResource(ctx, ref) + span.End() if err != nil { return fmt.Errorf("could not get resource info: %w", err) } @@ -407,17 +454,15 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. } key := storagespace.FormatResourceID(id) - _, span := a.tracer.Start(ctx, "queueStoreActivity") a.debouncer.Debounce(key, RawActivity{ EventID: eventID, Depth: depth, Timestamp: timestamp, }) - span.End() if id.OpaqueId == id.SpaceId { // we are at the root of the space, no need to go further - return nil + break } // check if parent id is cached @@ -426,8 +471,8 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. if parentId == nil { if v, err := a.parentIdCache.Get(key); err != nil { if info == nil { - _, span = a.tracer.Start(ctx, "getResource") - info, err = getResource(ref) + ctx, span := a.tracer.Start(ctx, "addActivity.getResource parent") + info, err = getResource(ctx, ref) span.End() if err != nil || info.GetParentId() == nil || info.GetParentId().GetOpaqueId() == "" { return fmt.Errorf("could not get parent id: %w", err) @@ -446,6 +491,8 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. ref = &provider.Reference{ResourceId: parentId} parentId = nil // reset parent id so it's not reused in the next iteration } + + return nil } func (a *ActivitylogService) storeActivity(resourceID string, activities []RawActivity) error { @@ -454,43 +501,116 @@ func (a *ActivitylogService) storeActivity(resourceID string, activities []RawAc ctx, span := a.tracer.Start(context.Background(), "storeActivity") defer span.End() - _, subspan := a.tracer.Start(ctx, "store.Read") - records, err := a.store.Read(resourceID) - if err != nil && err != microstore.ErrNotFound { - return err - } - subspan.End() - _, subspan = a.tracer.Start(ctx, "Unmarshal") - var existingActivities []RawActivity - if len(records) > 0 { - if err := msgpack.Unmarshal(records[0].Value, &existingActivities); err != nil { - a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") - if err := json.Unmarshal(records[0].Value, &existingActivities); err != nil { - return err - } - } - } - subspan.End() - - if l := len(existingActivities) + len(activities); l >= _maxActivities { - start := min(len(existingActivities), l-_maxActivities+1) - existingActivities = existingActivities[start:] - } - - activities = append(existingActivities, activities...) - - _, subspan = a.tracer.Start(ctx, "Unmarshal") + _, subspan := a.tracer.Start(ctx, "storeActivity.Marshal") b, err := msgpack.Marshal(activities) if err != nil { return err } subspan.End() - return a.store.Write(µstore.Record{ - Key: resourceID, - Value: b, + _, subspan = a.tracer.Start(ctx, "storeActivity.natskv.Put") + key := a.natsKey(resourceID, len(activities)) + _, err = a.natskv.Put(key, b) + if err != nil { + return err + } + subspan.End() + + ctx, subspan = a.tracer.Start(ctx, "storeActivity.enforceMaxActivities") + a.enforceMaxActivities(ctx, resourceID) + subspan.End() + return nil +} + +func (a *ActivitylogService) natsKey(resourceID string, activitiesCount int) string { + return fmt.Sprintf("%s.%d.%d", + base32.StdEncoding.EncodeToString([]byte(resourceID)), + activitiesCount, + time.Now().UnixNano()) +} + +func (a *ActivitylogService) enforceMaxActivities(ctx context.Context, resourceID string) { + if a.maxActivities <= 0 { + return + } + + key := fmt.Sprintf("%s.>", base32.StdEncoding.EncodeToString([]byte(resourceID))) + + _, subspan := a.tracer.Start(ctx, "enforceMaxActivities.watch") + watcher, err := a.natskv.Watch(key, nats.IgnoreDeletes()) + if err != nil { + a.log.Error().Err(err).Str("resourceID", resourceID).Msg("could not watch") + return + } + + var keys []string + for update := range watcher.Updates() { + if update == nil { + break + } + + var batchActivities []RawActivity + if err := msgpack.Unmarshal(update.Value(), &batchActivities); err != nil { + a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") + } + keys = append(keys, update.Key()) + } + subspan.End() + + _, subspan = a.tracer.Start(ctx, "enforceMaxActivities.compile") + // Parse keys into batches + batches := make([]batchInfo, 0) + var activitiesCount int + for _, k := range keys { + parts := strings.SplitN(k, ".", 3) + if len(parts) < 3 { + a.log.Warn().Str("key", k).Msg("skipping key, not enough parts") + continue + } + + c, err := strconv.Atoi(parts[1]) + if err != nil { + a.log.Warn().Str("key", k).Msg("skipping key, can not parse count") + continue + } + + // parse timestamp + nano, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + a.log.Warn().Str("key", k).Msg("skipping key, can not parse timestamp") + continue + } + + batches = append(batches, batchInfo{ + key: k, + count: c, + timestamp: time.Unix(0, nano), + }) + activitiesCount += c + } + + // sort batches by timestamp + sort.Slice(batches, func(i, j int) bool { + return batches[i].timestamp.Before(batches[j].timestamp) }) + subspan.End() + + _, subspan = a.tracer.Start(ctx, "enforceMaxActivities.delete") + // remove oldest keys until we are at max activities + for _, b := range batches { + if activitiesCount-b.count < a.maxActivities { + break + } + + activitiesCount -= b.count + err = a.natskv.Delete(b.key) + if err != nil { + a.log.Error().Err(err).Str("key", b.key).Msg("could not delete key") + break + } + } + subspan.End() } func toRef(r *provider.ResourceId) *provider.Reference { diff --git a/services/activitylog/pkg/service/service_test.go b/services/activitylog/pkg/service/service_test.go index 49f14461f..7e48e0c65 100644 --- a/services/activitylog/pkg/service/service_test.go +++ b/services/activitylog/pkg/service/service_test.go @@ -2,30 +2,101 @@ package service import ( "context" + "net" + "os" + "path/filepath" "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/jellydator/ttlcache/v2" + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + nserver "github.com/nats-io/nats-server/v2/server" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/opencloud-eu/reva/v2/pkg/store" + "github.com/opencloud-eu/opencloud/services/activitylog/pkg/config" + eventsmocks "github.com/opencloud-eu/reva/v2/pkg/events/mocks" + "github.com/test-go/testify/mock" "go.opentelemetry.io/otel/trace/noop" ) +var ( + server *nserver.Server + tmpdir string +) + +func getFreeLocalhostPort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return -1, err + } + + port := l.Addr().(*net.TCPAddr).Port + _ = l.Close() // Close the listener immediately to free the port + return port, nil +} + +// Spawn a nats server and a JetStream instance for the duration of the test suite. +// The different tests need to make sure to use different databases to avoid conflicts. +var _ = SynchronizedBeforeSuite(func() { + port, err := getFreeLocalhostPort() + server, err = nserver.NewServer(&nserver.Options{ + Port: port, + }) + Expect(err).ToNot(HaveOccurred()) + + tmpdir, err = os.MkdirTemp("", "activitylog-test") + natsdir := filepath.Join(tmpdir, "nats-js") + jsConf := &nserver.JetStreamConfig{ + StoreDir: natsdir, + } + // first start NATS + go server.Start() + time.Sleep(time.Second) + + // second start JetStream + err = server.EnableJetStream(jsConf) + Expect(err).ToNot(HaveOccurred()) +}, func() {}) + +var _ = SynchronizedAfterSuite(func() { + server.Shutdown() + _ = os.RemoveAll(tmpdir) +}, func() {}) + var _ = Describe("ActivitylogService", func() { var ( - alog *ActivitylogService - getResource func(ref *provider.Reference) (*provider.ResourceInfo, error) + alog *ActivitylogService + getResource func(ref *provider.Reference) (*provider.ResourceInfo, error) + writebufferduration = 100 * time.Millisecond ) + JustBeforeEach(func() { + var err error + stream := &eventsmocks.Stream{} + stream.EXPECT().Consume(mock.Anything, mock.Anything).Return(nil, nil) + alog, err = New( + Config(&config.Config{ + Service: config.Service{ + Name: "activitylog-test", + }, + Store: config.Store{ + Store: "nats-js-kv", + Nodes: []string{server.Addr().String()}, + Database: "activitylog-test-" + uuid.New().String(), + }, + }), + MaxActivities(4), + WriteBufferDuration(writebufferduration), + Stream(stream), + TraceProvider(noop.NewTracerProvider()), + Mux(chi.NewMux()), + ) + Expect(err).ToNot(HaveOccurred()) + }) + Context("with a noop debouncer", func() { BeforeEach(func() { - alog = &ActivitylogService{ - store: store.Create(), - tracer: noop.NewTracerProvider().Tracer("test"), - parentIdCache: ttlcache.NewCache(), - } - alog.debouncer = NewDebouncer(0, alog.storeActivity) + writebufferduration = 0 }) Describe("AddActivity", func() { @@ -76,7 +147,7 @@ var _ = Describe("ActivitylogService", func() { for _, tc := range testCases { tc := tc // capture range variable Context(tc.Name, func() { - BeforeEach(func() { + JustBeforeEach(func() { getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { return tc.Tree[ref.GetResourceId().GetOpaqueId()], nil } @@ -107,30 +178,88 @@ var _ = Describe("ActivitylogService", func() { "spaceid": resourceInfo("spaceid", "spaceid"), } ) - BeforeEach(func() { - alog = &ActivitylogService{ - store: store.Create(), - tracer: noop.NewTracerProvider().Tracer("test"), - parentIdCache: ttlcache.NewCache(), - } - alog.debouncer = NewDebouncer(100*time.Millisecond, alog.storeActivity) + + Describe("addActivity", func() { + var ( + getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return tree[ref.GetResourceId().GetOpaqueId()], nil + } + ) + + It("debounces activities", func() { + + err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0))) + }).Should(Succeed()) + }) + + It("adheres to the MaxActivities setting", func() { + err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(activities)).To(Equal(1)) + }).Should(Succeed()) + + err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(activities)).To(Equal(2)) + }).Should(Succeed()) + + err = alog.addActivity(context.Background(), reference("base"), nil, "activity3", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity4", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity5", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity2", 0, "activity3", 0, "activity4", 0, "activity5", 0))) + }).Should(Succeed()) + }) }) - It("should debounce activities", func() { - getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { - return tree[ref.GetResourceId().GetOpaqueId()], nil - } + Describe("Activities", func() { + It("combines multiple batches", func() { + getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return tree[ref.GetResourceId().GetOpaqueId()], nil + } - err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) - Expect(err).NotTo(HaveOccurred()) - err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) - Expect(err).NotTo(HaveOccurred()) + err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) - Eventually(func(g Gomega) { - activities, err := alog.Activities(resourceID("base")) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0))) - }).Should(Succeed()) + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0))) + }).Should(Succeed()) + + err = alog.addActivity(context.Background(), reference("base"), nil, "activity3", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity4", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0, "activity3", 0, "activity4", 0))) + }).Should(Succeed()) + }) }) }) })