From f9d4a038b37da20aa38e174b91234e2eca893b07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 26 May 2025 11:07:46 +0200 Subject: [PATCH] Switch activitylog servie to a native nats store Also split the list of activities in different chunks, one per buffered write. That allows for scaling the service horizontally and also helps reduce the load caused by the write operations. --- services/activitylog/pkg/service/options.go | 8 + services/activitylog/pkg/service/service.go | 248 +++++++++++++----- .../activitylog/pkg/service/service_test.go | 191 +++++++++++--- 3 files changed, 352 insertions(+), 95 deletions(-) diff --git a/services/activitylog/pkg/service/options.go b/services/activitylog/pkg/service/options.go index 3427f326f..0a34bd9a3 100644 --- a/services/activitylog/pkg/service/options.go +++ b/services/activitylog/pkg/service/options.go @@ -31,6 +31,7 @@ type Options struct { HistoryClient ehsvc.EventHistoryService ValueClient settingssvc.ValueService WriteBufferDuration time.Duration + MaxActivities int } // Logger configures a logger for the activitylog service @@ -109,3 +110,10 @@ func WriteBufferDuration(d time.Duration) Option { o.WriteBufferDuration = d } } + +// MaxActivities sets the maximum number of activities to store +func MaxActivities(max int) Option { + return func(o *Options) { + o.MaxActivities = max + } +} diff --git a/services/activitylog/pkg/service/service.go b/services/activitylog/pkg/service/service.go index ab05efc2d..0f73e4b68 100644 --- a/services/activitylog/pkg/service/service.go +++ b/services/activitylog/pkg/service/service.go @@ -2,11 +2,14 @@ package service import ( "context" + "encoding/base32" "encoding/json" - "errors" "fmt" "path/filepath" "reflect" + "sort" + "strconv" + "strings" "sync" "time" @@ -14,12 +17,13 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/go-chi/chi/v5" "github.com/jellydator/ttlcache/v2" + "github.com/nats-io/nats.go" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" + "github.com/pkg/errors" "github.com/vmihailenco/msgpack/v5" - microstore "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" "github.com/opencloud-eu/opencloud/pkg/log" @@ -29,7 +33,7 @@ import ( ) // Nats runs into max payload exceeded errors at around 7k activities. Let's keep a buffer. -var _maxActivities = 6000 +var _maxActivitiesDefault = 6000 // RawActivity represents an activity as it is stored in the activitylog store type RawActivity struct { @@ -43,7 +47,6 @@ type ActivitylogService struct { cfg *config.Config log log.Logger events <-chan events.Event - store microstore.Store gws pool.Selectable[gateway.GatewayAPIClient] mux *chi.Mux evHistory ehsvc.EventHistoryService @@ -53,6 +56,9 @@ type ActivitylogService struct { tracer trace.Tracer debouncer *Debouncer parentIdCache *ttlcache.Cache + natskv nats.KeyValue + + maxActivities int registeredEvents map[string]events.Unmarshaller } @@ -71,6 +77,12 @@ type queueItem struct { timer *time.Timer } +type batchInfo struct { + key string + count int + timestamp time.Time +} + // NewDebouncer returns a new Debouncer instance func NewDebouncer(d time.Duration, f func(id string, ra []RawActivity) error) *Debouncer { return &Debouncer{ @@ -128,7 +140,9 @@ func (d *Debouncer) Debounce(id string, ra RawActivity) { // New creates a new ActivitylogService func New(opts ...Option) (*ActivitylogService, error) { - o := &Options{} + o := &Options{ + MaxActivities: _maxActivitiesDefault, + } for _, opt := range opts { opt(o) } @@ -137,10 +151,6 @@ func New(opts ...Option) (*ActivitylogService, error) { return nil, errors.New("stream is required") } - if o.Store == nil { - return nil, errors.New("store is required") - } - ch, err := events.Consume(o.Stream, o.Config.Service.Name, o.RegisteredEvents...) if err != nil { return nil, err @@ -152,11 +162,41 @@ func New(opts ...Option) (*ActivitylogService, error) { return nil, err } + // Connect to NATS servers + natsOptions := nats.Options{ + Servers: o.Config.Store.Nodes, + } + conn, err := natsOptions.Connect() + if err != nil { + return nil, err + } + + js, err := conn.JetStream() + if err != nil { + return nil, err + } + + kv, err := js.KeyValue(o.Config.Store.Database) + if err != nil { + if !errors.Is(err, nats.ErrBucketNotFound) { + return nil, errors.Wrapf(err, "Failed to get bucket (%s)", o.Config.Store.Database) + } + + kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: o.Config.Store.Database, + }) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create bucket (%s)", o.Config.Store.Database) + } + } + if err != nil { + return nil, err + } + s := &ActivitylogService{ log: o.Logger, cfg: o.Config, events: ch, - store: o.Store, gws: o.GatewaySelector, mux: o.Mux, evHistory: o.HistoryClient, @@ -166,6 +206,8 @@ func New(opts ...Option) (*ActivitylogService, error) { tp: o.TraceProvider, tracer: o.TraceProvider.Tracer("github.com/opencloud-eu/opencloud/services/activitylog/pkg/service"), parentIdCache: cache, + maxActivities: o.MaxActivities, + natskv: kv, } s.debouncer = NewDebouncer(o.WriteBufferDuration, s.storeActivity) @@ -250,7 +292,7 @@ func (a *ActivitylogService) AddActivity(initRef *provider.Reference, parentId * ctx, span = a.tracer.Start(ctx, "AddActivity") defer span.End() - return a.addActivity(ctx, initRef, parentId, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return a.addActivity(ctx, initRef, parentId, eventID, timestamp, func(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, error) { return utils.GetResource(ctx, ref, gwc) }) } @@ -285,10 +327,10 @@ func (a *ActivitylogService) AddActivityTrashed(resourceID *provider.ResourceId, } var span trace.Span - ctx, span = a.tracer.Start(ctx, "AddActivity") + ctx, span = a.tracer.Start(ctx, "AddActivityTrashed") defer span.End() - return a.addActivity(ctx, ref, parentId, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return a.addActivity(ctx, ref, parentId, eventID, timestamp, func(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, error) { return utils.GetResource(ctx, ref, gwc) }) } @@ -343,10 +385,8 @@ func (a *ActivitylogService) RemoveActivities(rid *provider.ResourceId, toDelete return err } - return a.store.Write(µstore.Record{ - Key: storagespace.FormatResourceID(rid), - Value: b, - }) + _, err = a.natskv.Put(storagespace.FormatResourceID(rid), b) + return err } // RemoveResource removes the resource from the store @@ -358,45 +398,52 @@ func (a *ActivitylogService) RemoveResource(rid *provider.ResourceId) error { a.lock.Lock() defer a.lock.Unlock() - return a.store.Delete(storagespace.FormatResourceID(rid)) + return a.natskv.Delete(storagespace.FormatResourceID(rid)) } func (a *ActivitylogService) activities(rid *provider.ResourceId) ([]RawActivity, error) { resourceID := storagespace.FormatResourceID(rid) - records, err := a.store.Read(resourceID) - if err != nil && err != microstore.ErrNotFound { - return nil, fmt.Errorf("could not read activities: %w", err) - } + glob := fmt.Sprintf("%s.>", base32.StdEncoding.EncodeToString([]byte(resourceID))) - if len(records) == 0 { - return []RawActivity{}, nil + watcher, err := a.natskv.Watch(glob, nats.IgnoreDeletes()) + if err != nil { + return nil, err } var activities []RawActivity - if err := msgpack.Unmarshal(records[0].Value, &activities); err != nil { - a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") - if err := json.Unmarshal(records[0].Value, &activities); err != nil { - return nil, fmt.Errorf("could not unmarshal activities: %w", err) + for update := range watcher.Updates() { + if update == nil { + break } + + var batchActivities []RawActivity + if err := msgpack.Unmarshal(update.Value(), &batchActivities); err != nil { + a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") + } + activities = append(activities, batchActivities...) } return activities, nil } // note: getResource is abstracted to allow unit testing, in general this will just be utils.GetResource -func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.Reference, parentId *provider.ResourceId, eventID string, timestamp time.Time, getResource func(*provider.Reference) (*provider.ResourceInfo, error)) error { +func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider.Reference, parentId *provider.ResourceId, eventID string, timestamp time.Time, getResource func(context.Context, *provider.Reference) (*provider.ResourceInfo, error)) error { var ( err error depth int ref = initRef ) + ctx, span := a.tracer.Start(ctx, "addActivity") + defer span.End() for { var info *provider.ResourceInfo id := ref.GetResourceId() if ref.Path != "" { // Path based reference, we need to resolve the resource id - info, err = getResource(ref) + ctx, span = a.tracer.Start(ctx, "addActivity.getResource") + info, err = getResource(ctx, ref) + span.End() if err != nil { return fmt.Errorf("could not get resource info: %w", err) } @@ -407,17 +454,15 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. } key := storagespace.FormatResourceID(id) - _, span := a.tracer.Start(ctx, "queueStoreActivity") a.debouncer.Debounce(key, RawActivity{ EventID: eventID, Depth: depth, Timestamp: timestamp, }) - span.End() if id.OpaqueId == id.SpaceId { // we are at the root of the space, no need to go further - return nil + break } // check if parent id is cached @@ -426,8 +471,8 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. if parentId == nil { if v, err := a.parentIdCache.Get(key); err != nil { if info == nil { - _, span = a.tracer.Start(ctx, "getResource") - info, err = getResource(ref) + ctx, span := a.tracer.Start(ctx, "addActivity.getResource parent") + info, err = getResource(ctx, ref) span.End() if err != nil || info.GetParentId() == nil || info.GetParentId().GetOpaqueId() == "" { return fmt.Errorf("could not get parent id: %w", err) @@ -446,6 +491,8 @@ func (a *ActivitylogService) addActivity(ctx context.Context, initRef *provider. ref = &provider.Reference{ResourceId: parentId} parentId = nil // reset parent id so it's not reused in the next iteration } + + return nil } func (a *ActivitylogService) storeActivity(resourceID string, activities []RawActivity) error { @@ -454,43 +501,116 @@ func (a *ActivitylogService) storeActivity(resourceID string, activities []RawAc ctx, span := a.tracer.Start(context.Background(), "storeActivity") defer span.End() - _, subspan := a.tracer.Start(ctx, "store.Read") - records, err := a.store.Read(resourceID) - if err != nil && err != microstore.ErrNotFound { - return err - } - subspan.End() - _, subspan = a.tracer.Start(ctx, "Unmarshal") - var existingActivities []RawActivity - if len(records) > 0 { - if err := msgpack.Unmarshal(records[0].Value, &existingActivities); err != nil { - a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") - if err := json.Unmarshal(records[0].Value, &existingActivities); err != nil { - return err - } - } - } - subspan.End() - - if l := len(existingActivities) + len(activities); l >= _maxActivities { - start := min(len(existingActivities), l-_maxActivities+1) - existingActivities = existingActivities[start:] - } - - activities = append(existingActivities, activities...) - - _, subspan = a.tracer.Start(ctx, "Unmarshal") + _, subspan := a.tracer.Start(ctx, "storeActivity.Marshal") b, err := msgpack.Marshal(activities) if err != nil { return err } subspan.End() - return a.store.Write(µstore.Record{ - Key: resourceID, - Value: b, + _, subspan = a.tracer.Start(ctx, "storeActivity.natskv.Put") + key := a.natsKey(resourceID, len(activities)) + _, err = a.natskv.Put(key, b) + if err != nil { + return err + } + subspan.End() + + ctx, subspan = a.tracer.Start(ctx, "storeActivity.enforceMaxActivities") + a.enforceMaxActivities(ctx, resourceID) + subspan.End() + return nil +} + +func (a *ActivitylogService) natsKey(resourceID string, activitiesCount int) string { + return fmt.Sprintf("%s.%d.%d", + base32.StdEncoding.EncodeToString([]byte(resourceID)), + activitiesCount, + time.Now().UnixNano()) +} + +func (a *ActivitylogService) enforceMaxActivities(ctx context.Context, resourceID string) { + if a.maxActivities <= 0 { + return + } + + key := fmt.Sprintf("%s.>", base32.StdEncoding.EncodeToString([]byte(resourceID))) + + _, subspan := a.tracer.Start(ctx, "enforceMaxActivities.watch") + watcher, err := a.natskv.Watch(key, nats.IgnoreDeletes()) + if err != nil { + a.log.Error().Err(err).Str("resourceID", resourceID).Msg("could not watch") + return + } + + var keys []string + for update := range watcher.Updates() { + if update == nil { + break + } + + var batchActivities []RawActivity + if err := msgpack.Unmarshal(update.Value(), &batchActivities); err != nil { + a.log.Debug().Err(err).Str("resourceID", resourceID).Msg("could not unmarshal messagepack, trying json") + } + keys = append(keys, update.Key()) + } + subspan.End() + + _, subspan = a.tracer.Start(ctx, "enforceMaxActivities.compile") + // Parse keys into batches + batches := make([]batchInfo, 0) + var activitiesCount int + for _, k := range keys { + parts := strings.SplitN(k, ".", 3) + if len(parts) < 3 { + a.log.Warn().Str("key", k).Msg("skipping key, not enough parts") + continue + } + + c, err := strconv.Atoi(parts[1]) + if err != nil { + a.log.Warn().Str("key", k).Msg("skipping key, can not parse count") + continue + } + + // parse timestamp + nano, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + a.log.Warn().Str("key", k).Msg("skipping key, can not parse timestamp") + continue + } + + batches = append(batches, batchInfo{ + key: k, + count: c, + timestamp: time.Unix(0, nano), + }) + activitiesCount += c + } + + // sort batches by timestamp + sort.Slice(batches, func(i, j int) bool { + return batches[i].timestamp.Before(batches[j].timestamp) }) + subspan.End() + + _, subspan = a.tracer.Start(ctx, "enforceMaxActivities.delete") + // remove oldest keys until we are at max activities + for _, b := range batches { + if activitiesCount-b.count < a.maxActivities { + break + } + + activitiesCount -= b.count + err = a.natskv.Delete(b.key) + if err != nil { + a.log.Error().Err(err).Str("key", b.key).Msg("could not delete key") + break + } + } + subspan.End() } func toRef(r *provider.ResourceId) *provider.Reference { diff --git a/services/activitylog/pkg/service/service_test.go b/services/activitylog/pkg/service/service_test.go index 49f14461f..7e48e0c65 100644 --- a/services/activitylog/pkg/service/service_test.go +++ b/services/activitylog/pkg/service/service_test.go @@ -2,30 +2,101 @@ package service import ( "context" + "net" + "os" + "path/filepath" "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/jellydator/ttlcache/v2" + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + nserver "github.com/nats-io/nats-server/v2/server" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/opencloud-eu/reva/v2/pkg/store" + "github.com/opencloud-eu/opencloud/services/activitylog/pkg/config" + eventsmocks "github.com/opencloud-eu/reva/v2/pkg/events/mocks" + "github.com/test-go/testify/mock" "go.opentelemetry.io/otel/trace/noop" ) +var ( + server *nserver.Server + tmpdir string +) + +func getFreeLocalhostPort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return -1, err + } + + port := l.Addr().(*net.TCPAddr).Port + _ = l.Close() // Close the listener immediately to free the port + return port, nil +} + +// Spawn a nats server and a JetStream instance for the duration of the test suite. +// The different tests need to make sure to use different databases to avoid conflicts. +var _ = SynchronizedBeforeSuite(func() { + port, err := getFreeLocalhostPort() + server, err = nserver.NewServer(&nserver.Options{ + Port: port, + }) + Expect(err).ToNot(HaveOccurred()) + + tmpdir, err = os.MkdirTemp("", "activitylog-test") + natsdir := filepath.Join(tmpdir, "nats-js") + jsConf := &nserver.JetStreamConfig{ + StoreDir: natsdir, + } + // first start NATS + go server.Start() + time.Sleep(time.Second) + + // second start JetStream + err = server.EnableJetStream(jsConf) + Expect(err).ToNot(HaveOccurred()) +}, func() {}) + +var _ = SynchronizedAfterSuite(func() { + server.Shutdown() + _ = os.RemoveAll(tmpdir) +}, func() {}) + var _ = Describe("ActivitylogService", func() { var ( - alog *ActivitylogService - getResource func(ref *provider.Reference) (*provider.ResourceInfo, error) + alog *ActivitylogService + getResource func(ref *provider.Reference) (*provider.ResourceInfo, error) + writebufferduration = 100 * time.Millisecond ) + JustBeforeEach(func() { + var err error + stream := &eventsmocks.Stream{} + stream.EXPECT().Consume(mock.Anything, mock.Anything).Return(nil, nil) + alog, err = New( + Config(&config.Config{ + Service: config.Service{ + Name: "activitylog-test", + }, + Store: config.Store{ + Store: "nats-js-kv", + Nodes: []string{server.Addr().String()}, + Database: "activitylog-test-" + uuid.New().String(), + }, + }), + MaxActivities(4), + WriteBufferDuration(writebufferduration), + Stream(stream), + TraceProvider(noop.NewTracerProvider()), + Mux(chi.NewMux()), + ) + Expect(err).ToNot(HaveOccurred()) + }) + Context("with a noop debouncer", func() { BeforeEach(func() { - alog = &ActivitylogService{ - store: store.Create(), - tracer: noop.NewTracerProvider().Tracer("test"), - parentIdCache: ttlcache.NewCache(), - } - alog.debouncer = NewDebouncer(0, alog.storeActivity) + writebufferduration = 0 }) Describe("AddActivity", func() { @@ -76,7 +147,7 @@ var _ = Describe("ActivitylogService", func() { for _, tc := range testCases { tc := tc // capture range variable Context(tc.Name, func() { - BeforeEach(func() { + JustBeforeEach(func() { getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { return tc.Tree[ref.GetResourceId().GetOpaqueId()], nil } @@ -107,30 +178,88 @@ var _ = Describe("ActivitylogService", func() { "spaceid": resourceInfo("spaceid", "spaceid"), } ) - BeforeEach(func() { - alog = &ActivitylogService{ - store: store.Create(), - tracer: noop.NewTracerProvider().Tracer("test"), - parentIdCache: ttlcache.NewCache(), - } - alog.debouncer = NewDebouncer(100*time.Millisecond, alog.storeActivity) + + Describe("addActivity", func() { + var ( + getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return tree[ref.GetResourceId().GetOpaqueId()], nil + } + ) + + It("debounces activities", func() { + + err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0))) + }).Should(Succeed()) + }) + + It("adheres to the MaxActivities setting", func() { + err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(activities)).To(Equal(1)) + }).Should(Succeed()) + + err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(activities)).To(Equal(2)) + }).Should(Succeed()) + + err = alog.addActivity(context.Background(), reference("base"), nil, "activity3", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity4", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity5", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity2", 0, "activity3", 0, "activity4", 0, "activity5", 0))) + }).Should(Succeed()) + }) }) - It("should debounce activities", func() { - getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { - return tree[ref.GetResourceId().GetOpaqueId()], nil - } + Describe("Activities", func() { + It("combines multiple batches", func() { + getResource = func(ref *provider.Reference) (*provider.ResourceInfo, error) { + return tree[ref.GetResourceId().GetOpaqueId()], nil + } - err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) - Expect(err).NotTo(HaveOccurred()) - err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) - Expect(err).NotTo(HaveOccurred()) + err := alog.addActivity(context.Background(), reference("base"), nil, "activity1", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity2", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) - Eventually(func(g Gomega) { - activities, err := alog.Activities(resourceID("base")) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0))) - }).Should(Succeed()) + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0))) + }).Should(Succeed()) + + err = alog.addActivity(context.Background(), reference("base"), nil, "activity3", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + err = alog.addActivity(context.Background(), reference("base"), nil, "activity4", time.Time{}, getResource) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func(g Gomega) { + activities, err := alog.Activities(resourceID("base")) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(activities).To(ConsistOf(activitites("activity1", 0, "activity2", 0, "activity3", 0, "activity4", 0))) + }).Should(Succeed()) + }) }) }) })