mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-06 18:09:49 -05:00
Fix: Add back task event tmp dual write (#3828)
* fix: add back tmp event write * fix: handle error
This commit is contained in:
@@ -1714,6 +1714,7 @@ func (r *OLAPRepositoryImpl) acquireAdvisoryLocksForWorkflowRuns(ctx context.Con
|
||||
func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId uuid.UUID, events []sqlcv1.CreateTaskEventsOLAPParams, workflowRunIds []uuid.UUID) (*StatusUpdateResult, error) {
|
||||
eventsToWrite := make([]sqlcv1.CreateTaskEventsOLAPParams, 0)
|
||||
eventsForStatusUpdate := make([]sqlcv1.CreateTaskEventsOLAPParams, 0, len(events))
|
||||
tmpEventsToWrite := make([]sqlcv1.CreateTaskEventsOLAPTmpParams, 0)
|
||||
payloadsToWrite := make([]StoreOLAPPayloadOpts, 0)
|
||||
|
||||
for _, event := range events {
|
||||
@@ -1726,6 +1727,16 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId u
|
||||
}
|
||||
|
||||
eventsToWrite = append(eventsToWrite, event)
|
||||
|
||||
tmpEventsToWrite = append(tmpEventsToWrite, sqlcv1.CreateTaskEventsOLAPTmpParams{
|
||||
TenantID: event.TenantID,
|
||||
TaskID: event.TaskID,
|
||||
TaskInsertedAt: event.TaskInsertedAt,
|
||||
EventType: event.EventType,
|
||||
RetryCount: event.RetryCount,
|
||||
ReadableStatus: event.ReadableStatus,
|
||||
WorkerID: event.WorkerID,
|
||||
})
|
||||
}
|
||||
|
||||
eventsForStatusUpdate = append(eventsForStatusUpdate, event)
|
||||
@@ -1762,6 +1773,13 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId u
|
||||
}
|
||||
}
|
||||
|
||||
if len(tmpEventsToWrite) > 0 {
|
||||
_, err = r.queries.CreateTaskEventsOLAPTmp(ctx, tx, tmpEventsToWrite)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
statusUpdates := r.prepareStatusUpdateBatch(ctx, tenantId, eventsForStatusUpdate)
|
||||
result := &StatusUpdateResult{}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user