From 0911c11fcab2c1135c28029a3bf7b17bd155f1eb Mon Sep 17 00:00:00 2001 From: matt Date: Tue, 5 May 2026 16:02:34 -0400 Subject: [PATCH] Fix: Add back task event tmp dual write (#3828) * fix: add back tmp event write * fix: handle error --- pkg/repository/olap.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/repository/olap.go b/pkg/repository/olap.go index 144bd4389..7c33fba0e 100644 --- a/pkg/repository/olap.go +++ b/pkg/repository/olap.go @@ -1714,6 +1714,7 @@ func (r *OLAPRepositoryImpl) acquireAdvisoryLocksForWorkflowRuns(ctx context.Con func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId uuid.UUID, events []sqlcv1.CreateTaskEventsOLAPParams, workflowRunIds []uuid.UUID) (*StatusUpdateResult, error) { eventsToWrite := make([]sqlcv1.CreateTaskEventsOLAPParams, 0) eventsForStatusUpdate := make([]sqlcv1.CreateTaskEventsOLAPParams, 0, len(events)) + tmpEventsToWrite := make([]sqlcv1.CreateTaskEventsOLAPTmpParams, 0) payloadsToWrite := make([]StoreOLAPPayloadOpts, 0) for _, event := range events { @@ -1726,6 +1727,16 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId u } eventsToWrite = append(eventsToWrite, event) + + tmpEventsToWrite = append(tmpEventsToWrite, sqlcv1.CreateTaskEventsOLAPTmpParams{ + TenantID: event.TenantID, + TaskID: event.TaskID, + TaskInsertedAt: event.TaskInsertedAt, + EventType: event.EventType, + RetryCount: event.RetryCount, + ReadableStatus: event.ReadableStatus, + WorkerID: event.WorkerID, + }) } eventsForStatusUpdate = append(eventsForStatusUpdate, event) @@ -1762,6 +1773,13 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId u } } + if len(tmpEventsToWrite) > 0 { + _, err = r.queries.CreateTaskEventsOLAPTmp(ctx, tx, tmpEventsToWrite) + if err != nil { + return nil, err + } + } + statusUpdates := r.prepareStatusUpdateBatch(ctx, tenantId, eventsForStatusUpdate) result := &StatusUpdateResult{}