fix: LockSignalCreatedEvents

This commit is contained in:
gabriel ruttner
2025-04-02 11:18:59 -04:00
parent b6c912eb6b
commit cce5af242f
3 changed files with 54 additions and 34 deletions
+28 -14
View File
@@ -99,7 +99,7 @@ func (s *sharedRepository) PopulateExternalIdsForWorkflow(ctx context.Context, t
err := s.generateExternalIdsForChildWorkflows(ctx, tenantId, optsWithParents)
if err != nil {
return err
return fmt.Errorf("error generating external ids for child workflows: %w", err)
}
}
@@ -110,7 +110,7 @@ func (s *sharedRepository) generateExternalIdsForChildWorkflows(ctx context.Cont
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, s.pool, s.l, 5000)
if err != nil {
return err
return fmt.Errorf("error preparing tx: %w", err)
}
defer rollback()
@@ -130,7 +130,7 @@ func (s *sharedRepository) generateExternalIdsForChildWorkflows(ctx context.Cont
})
if err != nil {
return err
return fmt.Errorf("error looking up external ids: %w", err)
}
externalIdToLookupRow := make(map[string]*sqlcv1.V1LookupTable)
@@ -154,18 +154,32 @@ func (s *sharedRepository) generateExternalIdsForChildWorkflows(ctx context.Cont
eventTaskInsertedAts = append(eventTaskInsertedAts, lookupRow.InsertedAt)
eventKeys = append(eventKeys, getChildSignalEventKey(*opt.ParentExternalId, 0, *opt.ChildIndex, opt.ChildKey))
}
var allLockedEvents []*sqlcv1.LockSignalCreatedEventsRow
batchSize := 100000
lockedEvents, err := s.queries.LockSignalCreatedEvents(ctx, tx, sqlcv1.LockSignalCreatedEventsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Taskids: eventTaskIds,
Taskinsertedats: eventTaskInsertedAts,
Eventkeys: eventKeys,
})
// Process in batches of 100
for i := 0; i < len(eventTaskIds); i += batchSize {
end := i + batchSize
if end > len(eventTaskIds) {
end = len(eventTaskIds)
}
if err != nil {
return err
lockedEvents, err := s.queries.LockSignalCreatedEvents(ctx, tx, sqlcv1.LockSignalCreatedEventsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Taskids: eventTaskIds[i:end],
Taskinsertedats: eventTaskInsertedAts[i:end],
Eventkeys: eventKeys[i:end],
})
if err != nil {
return fmt.Errorf("error locking signal created events: %w", err)
}
allLockedEvents = append(allLockedEvents, lockedEvents...)
}
lockedEvents := allLockedEvents
// for each locked event, write the correct external id to the opt
for _, lockedEvent := range lockedEvents {
opt := spawnKeyToOpt[lockedEvent.EventKey.String]
@@ -173,7 +187,7 @@ func (s *sharedRepository) generateExternalIdsForChildWorkflows(ctx context.Cont
c, err := newChildWorkflowSignalCreatedDataFromBytes(lockedEvent.Data)
if err != nil {
return err
return fmt.Errorf("error parsing child workflow signal created data: %w", err)
}
opt.ExternalId = c.ChildExternalId
@@ -227,11 +241,11 @@ func (s *sharedRepository) generateExternalIdsForChildWorkflows(ctx context.Cont
)
if err != nil {
return err
return fmt.Errorf("error creating task events: %w", err)
}
if err := commit(ctx); err != nil {
return err
return fmt.Errorf("error committing tx: %w", err)
}
return nil
+13 -10
View File
@@ -469,17 +469,20 @@ SELECT
e.id,
e.event_key,
e.data
FROM
v1_task_event e
WHERE
(e.tenant_id, e.task_id, e.task_inserted_at, e.event_type, e.event_key) IN (
SELECT
@tenantId::uuid, task_id, task_inserted_at, 'SIGNAL_CREATED'::v1_task_event_type, event_key
FROM
input
)
FROM input i
JOIN LATERAL (
SELECT id, event_key, data
FROM ONLY v1_task_event
WHERE
tenant_id = @tenantId::uuid AND
task_id = i.task_id AND
task_inserted_at = i.task_inserted_at AND
event_type = 'SIGNAL_CREATED'::v1_task_event_type AND
event_key = i.event_key
LIMIT 1
) e ON true
ORDER BY
e.tenant_id, e.task_id, e.task_inserted_at, e.event_type, e.event_key
i.task_id, i.task_inserted_at, i.event_key
FOR UPDATE;
-- name: ListMatchingSignalEvents :many
+13 -10
View File
@@ -1361,17 +1361,20 @@ SELECT
e.id,
e.event_key,
e.data
FROM
v1_task_event e
WHERE
(e.tenant_id, e.task_id, e.task_inserted_at, e.event_type, e.event_key) IN (
SELECT
$1::uuid, task_id, task_inserted_at, 'SIGNAL_CREATED'::v1_task_event_type, event_key
FROM
input
)
FROM input i
JOIN LATERAL (
SELECT id, event_key, data
FROM ONLY v1_task_event
WHERE
tenant_id = $1::uuid AND
task_id = i.task_id AND
task_inserted_at = i.task_inserted_at AND
event_type = 'SIGNAL_CREATED'::v1_task_event_type AND
event_key = i.event_key
LIMIT 1
) e ON true
ORDER BY
e.tenant_id, e.task_id, e.task_inserted_at, e.event_type, e.event_key
i.task_id, i.task_inserted_at, i.event_key
FOR UPDATE
`