diff --git a/api/v1/server/run/run.go b/api/v1/server/run/run.go index 659fadec7..f782ce53c 100644 --- a/api/v1/server/run/run.go +++ b/api/v1/server/run/run.go @@ -401,10 +401,16 @@ func (t *APIServer) registerSpec(g *echo.Group, spec *openapi3.T) (*populator.Po return nil, "", err } + payload, err := t.config.V1.OLAP().ReadPayload(timeoutCtx, v1Event.TenantID.String(), v1Event.ExternalID) + + if err != nil { + return nil, "", err + } + event = &dbsqlc.Event{ ID: v1Event.ExternalID, TenantId: v1Event.TenantID, - Data: v1Event.Payload, + Data: payload, CreatedAt: pgtype.Timestamp(v1Event.SeenAt), AdditionalMetadata: v1Event.AdditionalMetadata, Key: v1Event.Key, diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index 10b7e8b05..0684f8f5c 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -1976,9 +1976,14 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev defer rollback() - // todo: remove this when we remove dual writes eventsToInsert := events + eventExternalIdToPayload := make(map[pgtype.UUID][]byte) + for i, payload := range eventsToInsert.Payloads { + eventExternalIdToPayload[eventsToInsert.Externalids[i]] = payload + } + + // todo: remove this when we remove dual writes if !r.payloadStore.OLAPDualWritesEnabled() { payloads := make([][]byte, len(eventsToInsert.Payloads)) @@ -2066,6 +2071,7 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev ID: id, InsertedAt: insertedAt, }] = event.ExternalID + payload := eventExternalIdToPayload[event.ExternalID] offloadToExternalOpts = append(offloadToExternalOpts, OffloadToExternalStoreOpts{ StorePayloadOpts: &StorePayloadOpts{ @@ -2073,7 +2079,7 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev InsertedAt: event.SeenAt, ExternalId: event.ExternalID, Type: sqlcv1.V1PayloadTypeTASKINPUT, - Payload: event.Payload, + Payload: payload, TenantId: event.TenantID.String(), }, OffloadAt: time.Now(),