Switch activitylog servie to a native nats store

Also split the list of activities in different chunks, one per buffered
write. That allows for scaling the service horizontally and also helps
reduce the load caused by the write operations.
This commit is contained in:
André Duffeck
2025-05-26 11:07:46 +02:00
parent 54f934cbb0
commit f9d4a038b3
3 changed files with 352 additions and 95 deletions

View File

@@ -31,6 +31,7 @@ type Options struct {
HistoryClient ehsvc.EventHistoryService
ValueClient settingssvc.ValueService
WriteBufferDuration time.Duration
MaxActivities int
}
// Logger configures a logger for the activitylog service
@@ -109,3 +110,10 @@ func WriteBufferDuration(d time.Duration) Option {
o.WriteBufferDuration = d
}
}
// MaxActivities sets the maximum number of activities to store
func MaxActivities(max int) Option {
return func(o *Options) {
o.MaxActivities = max
}
}

View File

@@ -2,11 +2,14 @@ package service
import (
"context"
"encoding/base32"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"
@@ -14,12 +17,13 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/go-chi/chi/v5"
"github.com/jellydator/ttlcache/v2"
"github.com/nats-io/nats.go"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"github.com/opencloud-eu/reva/v2/pkg/utils"
"github.com/pkg/errors"
"github.com/vmihailenco/msgpack/v5"
microstore "go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
"github.com/opencloud-eu/opencloud/pkg/log"
@@ -29,7 +33,7 @@ import (
)
// Nats runs into max payload exceeded errors at around 7k activities. Let's keep a buffer.
var _maxActivities = 6000
var _maxActivitiesDefault = 6000
// RawActivity represents an activity as it is stored in the activitylog store
type RawActivity struct {
@@ -43,7 +47,6 @@ type ActivitylogService struct {
cfg *config.Config
log log.Logger
events <-chan events.Event
store microstore.Store
gws pool.Selectable[gateway.GatewayAPIClient]
mux *chi.Mux
evHistory ehsvc.EventHistoryService
@@ -53,6 +56,9 @@ type ActivitylogService struct {
tracer trace.Tracer
debouncer *Debouncer
parentIdCache *ttlcache.Cache
natskv nats.KeyValue
maxActivities int
registeredEvents map[string]events.Unmarshaller
}
@@ -71,6 +77,12 @@ type queueItem struct {
timer *time.Timer
}
type batchInfo struct {
key string
count int
timestamp time.Time
}
// NewDebouncer returns a new Debouncer instance
func NewDebouncer(d time.Duration, f func(id string, ra []RawActivity) error) *Debouncer {
return &Debouncer{
@@ -128,7 +140,9 @@ func (d *Debouncer) Debounce(id string, ra RawActivity) {
// New creates a new ActivitylogService
func New(opts ...Option) (*ActivitylogService, error) {
o := &Options{}
o := &Options{
MaxActivities: _maxActivitiesDefault,
}
for _, opt := range opts {
opt(o)
}
@@ -137,10 +151,6 @@ func New(opts ...Option) (*ActivitylogService, error) {
return nil, errors.New("stream is required")
}
if o.Store == nil {
return nil, errors.New("store is required")
}
ch, err := events.Consume(o.Stream, o.Config.Service.Name, o.RegisteredEvents...)
if err != nil {
return nil, err
@@ -152,11 +162,41 @@ func New(opts ...Option) (*ActivitylogService, error) {
return nil, err
}
// Connect to NATS servers
natsOptions := nats.Options{
Servers: o.Config.Store.Nodes,
}
conn, err := natsOptions.Connect()
if err != nil {
return nil, err
}
js, err := conn.JetStream()
if err != nil {
return nil, err
}
kv, err := js.KeyValue(o.Config.Store.Database)
if err != nil {
if !errors.Is(err, nats.ErrBucketNotFound) {
return nil, errors.Wrapf(err, "Failed to get bucket (%s)", o.Config.Store.Database)
}
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: o.Config.Store.Database,
})
if err != nil {
return nil, errors.Wrapf(err, "Failed to create bucket (%s)", o.Config.Store.Database)
}
}
if err != nil {
return nil, err
}
s := &ActivitylogService{
log: o.Logger,
cfg: o.Config,
events: ch,
store: o.Store,
gws: o.GatewaySelector,
mux: o.Mux,
evHistory: o.HistoryClient,
@@ -166,6 +206,8 @@ func New(opts ...Option) (*ActivitylogService, error) {
tp: o.TraceProvider,
tracer: o.TraceProvider.Tracer("github.com/opencloud-eu/opencloud/services/activitylog/pkg/service"),
parentIdCache: cache,
maxActivities: o.MaxActivities,
natskv: kv,
}
s.debouncer = NewDebouncer(o.WriteBufferDuration, s.storeActivity)
@@ -250,7 +292,7 @@ func (a *ActivitylogService) AddActivity(initRef *provider.Reference, parentId *
ctx, span = a.tracer.Start(ctx, "AddActivity")
defer span.End()
return a.addActivity(ctx, initRef, parentId, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return a.addActivity(ctx, initRef, parentId, eventID, timestamp, func(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, error) {
return utils.GetResource(ctx, ref, gwc)
})
}
@@ -285,10 +327,10 @@ func (a *ActivitylogService) AddActivityTrashed(resourceID *provider.ResourceId,
}
var span trace.Span
ctx, span = a.tracer.Start(ctx, "AddActivity")
ctx, span = a.tracer.Start(ctx, "AddActivityTrashed")
defer span.End()
return a.addActivity(ctx, ref, parentId, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return a.addActivity(ctx, ref, parentId, eventID, timestamp, func(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, error) {
return utils.GetResource(ctx, ref, gwc)
})
}
@@ -343,10 +385,8 @@ func (a *ActivitylogService) RemoveActivities(rid *provider.ResourceId, toDelete
return err
}
return a.store.Write(&microstore.Record{
Key: storagespace.FormatResourceID(rid),
Value: b,
})
_, err = a.natskv.Put(storagespace.FormatResourceID(rid), b)
return err
}
// RemoveResource removes the resource from the store
@@ -358,45 +398,52 @@ func (a *ActivitylogService) RemoveResource(rid *provider.ResourceId) error {
a.lock.Lock()
defer a.lock.Unlock()
return a.store.Delete(storagespace.FormatResourceID(rid))
return a.natskv.Delete(storagespace.FormatResourceID(rid))
}
func (a *ActivitylogService) activities(rid *provider.ResourceId) ([]RawActivity, error) {
resourceID := storagespace.FormatResourceID(rid)
records, err := a.store.Read(resourceID)
if err != nil && err != microstore.ErrNotFound {
return nil, fmt.Errorf("could not read activities: %w", err)
}
glob := fmt.Sprintf("%s.>", base32.StdEncoding.EncodeToString([]byte(resourceID)))
if len(records) == 0 {
return []RawActivity{}, nil
watcher, err := a.natskv.Watch(glob, nats.IgnoreDeletes())
if err != nil {
return nil, err
}
var activities []RawActivity
if err := msgpack.Unmarshal(records[0].Value, &activities); err != nil {
a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json")
if err := json.Unmarshal(records[0].Value, &activities); err != nil {
return nil, fmt.Errorf("could not unmarshal activities: %w", err)
for update := range watcher.Updates() {
if update == nil {
break
}
var batchActivities []RawActivity
if err := msgpack.Unmarshal(update.Value(), &batchActivities); err != nil {
a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json")
}
activities = append(activities, batchActivities...)
}
return activities, nil
}
// note: getResource is abstracted to allow unit testing, in general this will just be utils.GetResource
func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.Reference, parentId *provider.ResourceId, eventID string, timestamp time.Time, getResource func(*provider.Reference) (*provider.ResourceInfo, error)) error {
func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.Reference, parentId *provider.ResourceId, eventID string, timestamp time.Time, getResource func(context.Context, *provider.Reference) (*provider.ResourceInfo, error)) error {
var (
err error
depth int
ref = initRef
)
ctx, span := a.tracer.Start(ctx, "addActivity")
defer span.End()
for {
var info *provider.ResourceInfo
id := ref.GetResourceId()
if ref.Path != "" {
// Path based reference, we need to resolve the resource id
info, err = getResource(ref)
ctx, span = a.tracer.Start(ctx, "addActivity.getResource")
info, err = getResource(ctx, ref)
span.End()
if err != nil {
return fmt.Errorf("could not get resource info: %w", err)
}
@@ -407,17 +454,15 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.
}
key := storagespace.FormatResourceID(id)
_, span := a.tracer.Start(ctx, "queueStoreActivity")
a.debouncer.Debounce(key, RawActivity{
EventID: eventID,
Depth: depth,
Timestamp: timestamp,
})
span.End()
if id.OpaqueId == id.SpaceId {
// we are at the root of the space, no need to go further
return nil
break
}
// check if parent id is cached
@@ -426,8 +471,8 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.
if parentId == nil {
if v, err := a.parentIdCache.Get(key); err != nil {
if info == nil {
_, span = a.tracer.Start(ctx, "getResource")
info, err = getResource(ref)
ctx, span := a.tracer.Start(ctx, "addActivity.getResource parent")
info, err = getResource(ctx, ref)
span.End()
if err != nil || info.GetParentId() == nil || info.GetParentId().GetOpaqueId() == "" {
return fmt.Errorf("could not get parent id: %w", err)
@@ -446,6 +491,8 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.
ref = &provider.Reference{ResourceId: parentId}
parentId = nil // reset parent id so it's not reused in the next iteration
}
return nil
}
func (a *ActivitylogService) storeActivity(resourceID string, activities []RawActivity) error {
@@ -454,43 +501,116 @@ func (a *ActivitylogService) storeActivity(resourceID string, activities []RawAc
ctx, span := a.tracer.Start(context.Background(), "storeActivity")
defer span.End()
_, subspan := a.tracer.Start(ctx, "store.Read")
records, err := a.store.Read(resourceID)
if err != nil && err != microstore.ErrNotFound {
return err
}
subspan.End()
_, subspan = a.tracer.Start(ctx, "Unmarshal")
var existingActivities []RawActivity
if len(records) > 0 {
if err := msgpack.Unmarshal(records[0].Value, &existingActivities); err != nil {
a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json")
if err := json.Unmarshal(records[0].Value, &existingActivities); err != nil {
return err
}
}
}
subspan.End()
if l := len(existingActivities) + len(activities); l >= _maxActivities {
start := min(len(existingActivities), l-_maxActivities+1)
existingActivities = existingActivities[start:]
}
activities = append(existingActivities, activities...)
_, subspan = a.tracer.Start(ctx, "Unmarshal")
_, subspan := a.tracer.Start(ctx, "storeActivity.Marshal")
b, err := msgpack.Marshal(activities)
if err != nil {
return err
}
subspan.End()
return a.store.Write(&microstore.Record{
Key: resourceID,
Value: b,
_, subspan = a.tracer.Start(ctx, "storeActivity.natskv.Put")
key := a.natsKey(resourceID, len(activities))
_, err = a.natskv.Put(key, b)
if err != nil {
return err
}
subspan.End()
ctx, subspan = a.tracer.Start(ctx, "storeActivity.enforceMaxActivities")
a.enforceMaxActivities(ctx, resourceID)
subspan.End()
return nil
}
func (a *ActivitylogService) natsKey(resourceID string, activitiesCount int) string {
return fmt.Sprintf("%s.%d.%d",
base32.StdEncoding.EncodeToString([]byte(resourceID)),
activitiesCount,
time.Now().UnixNano())
}
func (a *ActivitylogService) enforceMaxActivities(ctx context.Context, resourceID string) {
if a.maxActivities <= 0 {
return
}
key := fmt.Sprintf("%s.>", base32.StdEncoding.EncodeToString([]byte(resourceID)))
_, subspan := a.tracer.Start(ctx, "enforceMaxActivities.watch")
watcher, err := a.natskv.Watch(key, nats.IgnoreDeletes())
if err != nil {
a.log.Error().Err(err).Str("resourceID", resourceID).Msg("could not watch")
return
}
var keys []string
for update := range watcher.Updates() {
if update == nil {
break
}
var batchActivities []RawActivity
if err := msgpack.Unmarshal(update.Value(), &batchActivities); err != nil {
a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json")
}
keys = append(keys, update.Key())
}
subspan.End()
_, subspan = a.tracer.Start(ctx, "enforceMaxActivities.compile")
// Parse keys into batches
batches := make([]batchInfo, 0)
var activitiesCount int
for _, k := range keys {
parts := strings.SplitN(k, ".", 3)
if len(parts) < 3 {
a.log.Warn().Str("key", k).Msg("skipping key, not enough parts")
continue
}
c, err := strconv.Atoi(parts[1])
if err != nil {
a.log.Warn().Str("key", k).Msg("skipping key, can not parse count")
continue
}
// parse timestamp
nano, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
a.log.Warn().Str("key", k).Msg("skipping key, can not parse timestamp")
continue
}
batches = append(batches, batchInfo{
key: k,
count: c,
timestamp: time.Unix(0, nano),
})
activitiesCount += c
}
// sort batches by timestamp
sort.Slice(batches, func(i, j int) bool {
return batches[i].timestamp.Before(batches[j].timestamp)
})
subspan.End()
_, subspan = a.tracer.Start(ctx, "enforceMaxActivities.delete")
// remove oldest keys until we are at max activities
for _, b := range batches {
if activitiesCount-b.count < a.maxActivities {
break
}
activitiesCount -= b.count
err = a.natskv.Delete(b.key)
if err != nil {
a.log.Error().Err(err).Str("key", b.key).Msg("could not delete key")
break
}
}
subspan.End()
}
func toRef(r *provider.ResourceId) *provider.Reference {

View File

@@ -2,30 +2,101 @@ package service
import (
"context"
"net"
"os"
"path/filepath"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/jellydator/ttlcache/v2"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
nserver "github.com/nats-io/nats-server/v2/server"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/opencloud-eu/reva/v2/pkg/store"
"github.com/opencloud-eu/opencloud/services/activitylog/pkg/config"
eventsmocks "github.com/opencloud-eu/reva/v2/pkg/events/mocks"
"github.com/test-go/testify/mock"
"go.opentelemetry.io/otel/trace/noop"
)
var (
server *nserver.Server
tmpdir string
)
func getFreeLocalhostPort() (int, error) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return -1, err
}
port := l.Addr().(*net.TCPAddr).Port
_ = l.Close() // Close the listener immediately to free the port
return port, nil
}
// Spawn a nats server and a JetStream instance for the duration of the test suite.
// The different tests need to make sure to use different databases to avoid conflicts.
var _ = SynchronizedBeforeSuite(func() {
port, err := getFreeLocalhostPort()
server, err = nserver.NewServer(&nserver.Options{
Port: port,
})
Expect(err).ToNot(HaveOccurred())
tmpdir, err = os.MkdirTemp("", "activitylog-test")
natsdir := filepath.Join(tmpdir, "nats-js")
jsConf := &nserver.JetStreamConfig{
StoreDir: natsdir,
}
// first start NATS
go server.Start()
time.Sleep(time.Second)
// second start JetStream
err = server.EnableJetStream(jsConf)
Expect(err).ToNot(HaveOccurred())
}, func() {})
var _ = SynchronizedAfterSuite(func() {
server.Shutdown()
_ = os.RemoveAll(tmpdir)
}, func() {})
var _ = Describe("ActivitylogService", func() {
var (
alog *ActivitylogService
getResource func(ref *provider.Reference) (*provider.ResourceInfo, error)
alog *ActivitylogService
getResource func(ref *provider.Reference) (*provider.ResourceInfo, error)
writebufferduration = 100 * time.Millisecond
)
JustBeforeEach(func() {
var err error
stream := &eventsmocks.Stream{}
stream.EXPECT().Consume(mock.Anything, mock.Anything).Return(nil, nil)
alog, err = New(
Config(&config.Config{
Service: config.Service{
Name: "activitylog-test",
},
Store: config.Store{
Store: "nats-js-kv",
Nodes: []string{server.Addr().String()},
Database: "activitylog-test-" + uuid.New().String(),
},
}),
MaxActivities(4),
WriteBufferDuration(writebufferduration),
Stream(stream),
TraceProvider(noop.NewTracerProvider()),
Mux(chi.NewMux()),
)
Expect(err).ToNot(HaveOccurred())
})
Context("with a noop debouncer", func() {
BeforeEach(func() {
alog = &ActivitylogService{
store: store.Create(),
tracer: noop.NewTracerProvider().Tracer("test"),
parentIdCache: ttlcache.NewCache(),
}
alog.debouncer = NewDebouncer(0, alog.storeActivity)
writebufferduration = 0
})
Describe("AddActivity", func() {
@@ -76,7 +147,7 @@ var _ = Describe("ActivitylogService", func() {
for _, tc := range testCases {
tc := tc // capture range variable
Context(tc.Name, func() {
BeforeEach(func() {
JustBeforeEach(func() {
getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return tc.Tree[ref.GetResourceId().GetOpaqueId()], nil
}
@@ -107,30 +178,88 @@ var _ = Describe("ActivitylogService", func() {
"spaceid": resourceInfo("spaceid", "spaceid"),
}
)
BeforeEach(func() {
alog = &ActivitylogService{
store: store.Create(),
tracer: noop.NewTracerProvider().Tracer("test"),
parentIdCache: ttlcache.NewCache(),
}
alog.debouncer = NewDebouncer(100*time.Millisecond, alog.storeActivity)
Describe("addActivity", func() {
var (
getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return tree[ref.GetResourceId().GetOpaqueId()], nil
}
)
It("debounces activities", func() {
err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
Eventually(func(g Gomega) {
activities, err := alog.Activities(resourceID("base"))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0)))
}).Should(Succeed())
})
It("adheres to the MaxActivities setting", func() {
err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
Eventually(func(g Gomega) {
activities, err := alog.Activities(resourceID("base"))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(len(activities)).To(Equal(1))
}).Should(Succeed())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
Eventually(func(g Gomega) {
activities, err := alog.Activities(resourceID("base"))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(len(activities)).To(Equal(2))
}).Should(Succeed())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity3", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity4", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity5", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
Eventually(func(g Gomega) {
activities, err := alog.Activities(resourceID("base"))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(activities).To(ConsistOf(activitites("activity2", 0, "activity3", 0, "activity4", 0, "activity5", 0)))
}).Should(Succeed())
})
})
It("should debounce activities", func() {
getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return tree[ref.GetResourceId().GetOpaqueId()], nil
}
Describe("Activities", func() {
It("combines multiple batches", func() {
getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return tree[ref.GetResourceId().GetOpaqueId()], nil
}
err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
Eventually(func(g Gomega) {
activities, err := alog.Activities(resourceID("base"))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0)))
}).Should(Succeed())
Eventually(func(g Gomega) {
activities, err := alog.Activities(resourceID("base"))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0)))
}).Should(Succeed())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity3", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
err = alog.addActivity(context.Background(), reference("base"), nil, "activity4", time.Time{}, getResource)
Expect(err).NotTo(HaveOccurred())
Eventually(func(g Gomega) {
activities, err := alog.Activities(resourceID("base"))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0, "activity3", 0, "activity4", 0)))
}).Should(Succeed())
})
})
})
})