Fix: read payloads from payload store for event API (#2471)

* fix: read payloads from payload store

* debug: add log

* debug: more log lines

* fix: bug

* fix: rm debug lines

* fix: comment loc
This commit is contained in:
matt
2025-10-31 00:57:36 +01:00
committed by GitHub
parent b19dc230ed
commit 99544bbd4e
2 changed files with 15 additions and 3 deletions

View File

@@ -401,10 +401,16 @@ func (t *APIServer) registerSpec(g *echo.Group, spec *openapi3.T) (*populator.Po
return nil, "", err
}
payload, err := t.config.V1.OLAP().ReadPayload(timeoutCtx, v1Event.TenantID.String(), v1Event.ExternalID)
if err != nil {
return nil, "", err
}
event = &dbsqlc.Event{
ID: v1Event.ExternalID,
TenantId: v1Event.TenantID,
Data: v1Event.Payload,
Data: payload,
CreatedAt: pgtype.Timestamp(v1Event.SeenAt),
AdditionalMetadata: v1Event.AdditionalMetadata,
Key: v1Event.Key,

View File

@@ -1976,9 +1976,14 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev
defer rollback()
// todo: remove this when we remove dual writes
eventsToInsert := events
eventExternalIdToPayload := make(map[pgtype.UUID][]byte)
for i, payload := range eventsToInsert.Payloads {
eventExternalIdToPayload[eventsToInsert.Externalids[i]] = payload
}
// todo: remove this when we remove dual writes
if !r.payloadStore.OLAPDualWritesEnabled() {
payloads := make([][]byte, len(eventsToInsert.Payloads))
@@ -2066,6 +2071,7 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev
ID: id,
InsertedAt: insertedAt,
}] = event.ExternalID
payload := eventExternalIdToPayload[event.ExternalID]
offloadToExternalOpts = append(offloadToExternalOpts, OffloadToExternalStoreOpts{
StorePayloadOpts: &StorePayloadOpts{
@@ -2073,7 +2079,7 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev
InsertedAt: event.SeenAt,
ExternalId: event.ExternalID,
Type: sqlcv1.V1PayloadTypeTASKINPUT,
Payload: event.Payload,
Payload: payload,
TenantId: event.TenantID.String(),
},
OffloadAt: time.Now(),