diff --git a/services/activitylog/pkg/service/migrations.go b/services/activitylog/pkg/service/migrations.go index 95356424dc..02e9f2c64c 100644 --- a/services/activitylog/pkg/service/migrations.go +++ b/services/activitylog/pkg/service/migrations.go @@ -10,39 +10,37 @@ import ( "github.com/vmihailenco/msgpack/v5" ) -const activitylogVersionKey = "activilog.version" +const activitylogVersionKey = "activitylog.version" const currentMigrationVersion = "1" -// RunMigrations checks the activilog data version and runs migrations if necessary. +// RunMigrations checks the activitylog data version and runs migrations if necessary. // It should be called during service startup, after the NATS KeyValue store is initialized. -func runMigrations(ctx context.Context, kv nats.KeyValue) error { +func (a *ActivitylogService) runMigrations(ctx context.Context, kv nats.KeyValue) error { entry, err := kv.Get(activitylogVersionKey) if err == nats.ErrKeyNotFound { - // Key doesn't exist, version is implicitly pre-V1. Run migration to V1. - log.Println("Activilog version key not found. Running migration to V1...") - return migrateToV1(ctx, kv) + a.log.Info().Msg("activitylog version key not found. Running migration to V1...") + return a.migrateToV1(ctx, kv) } else if err != nil { - return fmt.Errorf("failed to get activilog version from NATS KV store: %w", err) + return fmt.Errorf("failed to get activitylog version from NATS KV store: %w", err) } version := string(entry.Value()) if version == currentMigrationVersion { - log.Printf("Activilog data version is '%s'. No migration needed.", version) + a.log.Debug().Str("currentVersion", version).Msg("No migration needed") return nil } // If version is something else, it might indicate a future version or an unexpected state. // Add logic here if more complex version handling is needed. - log.Printf("Activilog data version is '%s', expected '%s'. Migration logic for this scenario is not implemented.", version, currentMigrationVersion) - return fmt.Errorf("unexpected activilog version: %s, expected %s", version, currentMigrationVersion) + return fmt.Errorf("unexpected activitylog version: %s, expected %s or older", version, currentMigrationVersion) } // migrateToV1 performs the data migration to version 1. // It iterates over all keys, expecting their values to be JSON arrays of strings. // For each such key, it creates a new key in the format "originalKey:count:timestamp" // and stores the original list of strings (re-marshalled to JSON) as its value. -// Finally, it sets the activilog.version key to "1". -func migrateToV1(_ context.Context, kv nats.KeyValue) error { +// Finally, it sets the activitylog.version key to "1". +func (a *ActivitylogService) migrateToV1(_ context.Context, kv nats.KeyValue) error { lister, err := kv.ListKeys() if err != nil { return fmt.Errorf("migrateToV1: failed to list keys from NATS KV store: %w", err) @@ -69,12 +67,8 @@ func migrateToV1(_ context.Context, kv nats.KeyValue) error { // Get the original value entry, err := kv.Get(key) - if err == nats.ErrKeyNotFound { - log.Printf("migrateToV1: Key '%s' disappeared during migration, skipping.", key) - skippedCount++ - continue - } else if err != nil { - log.Printf("migrateToV1: Failed to get value for key '%s': %v. Skipping.", key, err) + if err != nil { + a.log.Error().Err(err).Str("key", key).Msg("migrateToV1: Failed to get value for key. Skipping.") skippedCount++ continue } @@ -83,7 +77,7 @@ func migrateToV1(_ context.Context, kv nats.KeyValue) error { val := keyValueEnvelope{} // Unmarshal the value into the keyValueEnvelope structure if err := json.Unmarshal(valBytes, &val); err != nil { - log.Printf("migrateToV1: Value for key '%s' is not a keyValueEnvelope: %v. Skipping.", key, err) + a.log.Error().Err(err).Str("key", key).Msg("migrateToV1: Value for key ss not a keyValueEnvelope. Skipping.") skippedCount++ } @@ -92,7 +86,7 @@ func migrateToV1(_ context.Context, kv nats.KeyValue) error { if err := msgpack.Unmarshal(val.Data, &activities); err != nil { if err := json.Unmarshal(val.Data, &activities); err != nil { // This key's value is not a JSON array of strings. Skip it. - log.Printf("migrateToV1: Value for key '%s' is not a msgback or JSON array of strings: %v. Skipping.", key, err) + a.log.Error().Err(err).Str("key", key).Msg("migrateToV1: Value for key is not a msgback or JSON array of strings. Skipping.") skippedCount++ continue } @@ -102,14 +96,14 @@ func migrateToV1(_ context.Context, kv nats.KeyValue) error { newKey := natsKey(val.Key, len(activities)) newValue, err := msgpack.Marshal(activities) if err != nil { - log.Printf("migrateToV1: Failed to marshal activities for key '%s': %v. Skipping.", key, err) + a.log.Error().Err(err).Str("key", key).Msg("migrateToV1: Failed to marshal activities. Skipping.") skippedCount++ continue } // Write the value (the list of strings, marshalled as JSON) under the new key if _, err := kv.Put(newKey, newValue); err != nil { - log.Printf("migrateToV1: Failed to put new key '%s' (original key '%s') in NATS KV store: %v. Skipping.", newKey, key, err) + a.log.Error().Err(err).Str("newKey", newKey).Str("key", key).Msg("migrateToV1: Failed to put new key. Skipping.") skippedCount++ continue } @@ -121,15 +115,14 @@ func migrateToV1(_ context.Context, kv nats.KeyValue) error { continue } - log.Printf("migrateToV1: Migrated key '%s' to '%s' with %d elements.", key, newKey, len(activities)) migratedCount++ } - // Set the activilog version to "1" after migration + // Set the activitylog version to "1" after migration if _, err := kv.PutString(activitylogVersionKey, currentMigrationVersion); err != nil { - return fmt.Errorf("migrateToV1: failed to set activilog version key to '%s' in NATS KV store: %w", currentMigrationVersion, err) + return fmt.Errorf("migrateToV1: failed to set activitylog version key to '%s' in NATS KV store: %w", currentMigrationVersion, err) } - log.Printf("Migration to V1 complete. Migrated %d keys, skipped %d keys. Activilog version set to '%s'.", migratedCount, skippedCount, currentMigrationVersion) + a.log.Info().Int("migrated", migratedCount).Int("skipped", skippedCount).Msg("Migration to V1 complete") return nil } diff --git a/services/activitylog/pkg/service/service.go b/services/activitylog/pkg/service/service.go index 75889ba299..b9488a84ba 100644 --- a/services/activitylog/pkg/service/service.go +++ b/services/activitylog/pkg/service/service.go @@ -193,12 +193,6 @@ func New(opts ...Option) (*ActivitylogService, error) { return nil, err } - // run migrations - err = runMigrations(context.Background(), kv) - if err != nil { - return nil, err - } - s := &ActivitylogService{ log: o.Logger, cfg: o.Config, @@ -217,6 +211,12 @@ func New(opts ...Option) (*ActivitylogService, error) { } s.debouncer = NewDebouncer(o.Config.WriteBufferDuration, s.storeActivity) + // run migrations + err = s.runMigrations(context.Background(), kv) + if err != nil { + return nil, err + } + s.mux.Get("/graph/v1beta1/extensions/org.libregraph/activities", s.HandleGetItemActivities) for _, e := range o.RegisteredEvents {