mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-25 18:18:45 -06:00
fix: replay without group keys and status updates (#883)
This commit is contained in:
@@ -972,6 +972,30 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m
|
||||
return fmt.Errorf("could not update step run: %w", err)
|
||||
}
|
||||
|
||||
stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get step run: %w", err)
|
||||
}
|
||||
|
||||
nextStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.JobRunId), &payload.StepRunId)
|
||||
|
||||
if err != nil {
|
||||
ec.l.Error().Err(err).Msg("could not list startable step runs")
|
||||
} else {
|
||||
for _, nextStepRun := range nextStepRuns {
|
||||
err := ec.mq.AddMessage(
|
||||
context.Background(),
|
||||
msgqueue.JOB_PROCESSING_QUEUE,
|
||||
tasktypes.StepRunQueuedToTask(nextStepRun),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
ec.l.Error().Err(err).Msg("could not queue next step run")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// recheck the tenant queue
|
||||
ec.checkTenantQueue(ctx, metadata.TenantId)
|
||||
|
||||
|
||||
@@ -417,6 +417,7 @@ func (q *queue) processStepRunUpdates(ctx context.Context, tenantId string) (boo
|
||||
|
||||
if err != nil {
|
||||
q.l.Error().Err(err).Msg("could not list startable step runs")
|
||||
continue
|
||||
}
|
||||
|
||||
for _, nextStepRun := range nextStepRuns {
|
||||
|
||||
@@ -838,8 +838,8 @@ WHERE NOT EXISTS (
|
||||
-- name: BulkCreateStepRunEvent :exec
|
||||
WITH input_values AS (
|
||||
SELECT
|
||||
CURRENT_TIMESTAMP AS "timeFirstSeen",
|
||||
CURRENT_TIMESTAMP AS "timeLastSeen",
|
||||
unnest(@timeSeen::timestamp[]) AS "timeFirstSeen",
|
||||
unnest(@timeSeen::timestamp[]) AS "timeLastSeen",
|
||||
unnest(@stepRunIds::uuid[]) AS "stepRunId",
|
||||
unnest(cast(@reasons::text[] as"StepRunEventReason"[])) AS "reason",
|
||||
unnest(cast(@severities::text[] as "StepRunEventSeverity"[])) AS "severity",
|
||||
@@ -850,7 +850,7 @@ WITH input_values AS (
|
||||
updated AS (
|
||||
UPDATE "StepRunEvent"
|
||||
SET
|
||||
"timeLastSeen" = CURRENT_TIMESTAMP,
|
||||
"timeLastSeen" = input_values."timeLastSeen",
|
||||
"message" = input_values."message",
|
||||
"count" = "StepRunEvent"."count" + 1,
|
||||
"data" = input_values."data"
|
||||
|
||||
@@ -155,19 +155,19 @@ func (q *Queries) BulkCancelStepRun(ctx context.Context, db DBTX, arg BulkCancel
|
||||
const bulkCreateStepRunEvent = `-- name: BulkCreateStepRunEvent :exec
|
||||
WITH input_values AS (
|
||||
SELECT
|
||||
CURRENT_TIMESTAMP AS "timeFirstSeen",
|
||||
CURRENT_TIMESTAMP AS "timeLastSeen",
|
||||
unnest($1::uuid[]) AS "stepRunId",
|
||||
unnest(cast($2::text[] as"StepRunEventReason"[])) AS "reason",
|
||||
unnest(cast($3::text[] as "StepRunEventSeverity"[])) AS "severity",
|
||||
unnest($4::text[]) AS "message",
|
||||
unnest($1::timestamp[]) AS "timeFirstSeen",
|
||||
unnest($1::timestamp[]) AS "timeLastSeen",
|
||||
unnest($2::uuid[]) AS "stepRunId",
|
||||
unnest(cast($3::text[] as"StepRunEventReason"[])) AS "reason",
|
||||
unnest(cast($4::text[] as "StepRunEventSeverity"[])) AS "severity",
|
||||
unnest($5::text[]) AS "message",
|
||||
1 AS "count",
|
||||
unnest($5::jsonb[]) AS "data"
|
||||
unnest($6::jsonb[]) AS "data"
|
||||
),
|
||||
updated AS (
|
||||
UPDATE "StepRunEvent"
|
||||
SET
|
||||
"timeLastSeen" = CURRENT_TIMESTAMP,
|
||||
"timeLastSeen" = input_values."timeLastSeen",
|
||||
"message" = input_values."message",
|
||||
"count" = "StepRunEvent"."count" + 1,
|
||||
"data" = input_values."data"
|
||||
@@ -211,15 +211,17 @@ WHERE NOT EXISTS (
|
||||
`
|
||||
|
||||
type BulkCreateStepRunEventParams struct {
|
||||
Steprunids []pgtype.UUID `json:"steprunids"`
|
||||
Reasons []string `json:"reasons"`
|
||||
Severities []string `json:"severities"`
|
||||
Messages []string `json:"messages"`
|
||||
Data [][]byte `json:"data"`
|
||||
Timeseen []pgtype.Timestamp `json:"timeseen"`
|
||||
Steprunids []pgtype.UUID `json:"steprunids"`
|
||||
Reasons []string `json:"reasons"`
|
||||
Severities []string `json:"severities"`
|
||||
Messages []string `json:"messages"`
|
||||
Data [][]byte `json:"data"`
|
||||
}
|
||||
|
||||
func (q *Queries) BulkCreateStepRunEvent(ctx context.Context, db DBTX, arg BulkCreateStepRunEventParams) error {
|
||||
_, err := db.Exec(ctx, bulkCreateStepRunEvent,
|
||||
arg.Timeseen,
|
||||
arg.Steprunids,
|
||||
arg.Reasons,
|
||||
arg.Severities,
|
||||
|
||||
@@ -398,6 +398,7 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te
|
||||
}
|
||||
|
||||
messages := make([]string, len(stepRunIds))
|
||||
timeSeen := make([]pgtype.Timestamp, len(stepRunIds))
|
||||
reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds))
|
||||
severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds))
|
||||
data := make([]map[string]interface{}, len(stepRunIds))
|
||||
@@ -407,6 +408,7 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te
|
||||
messages[i] = "Worker has become inactive"
|
||||
reasons[i] = dbsqlc.StepRunEventReasonREASSIGNED
|
||||
severities[i] = dbsqlc.StepRunEventSeverityCRITICAL
|
||||
timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC())
|
||||
data[i] = map[string]interface{}{"worker_id": workerId}
|
||||
}
|
||||
|
||||
@@ -416,6 +418,7 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te
|
||||
s.pool,
|
||||
s.queries,
|
||||
stepRunIds,
|
||||
timeSeen,
|
||||
reasons,
|
||||
severities,
|
||||
messages,
|
||||
@@ -634,6 +637,7 @@ func (s *stepRunEngineRepository) bulkStepRunsAssigned(
|
||||
|
||||
workerIdToStepRunIds := make(map[string][]string)
|
||||
messages := make([]string, len(stepRunIds))
|
||||
timeSeen := make([]pgtype.Timestamp, len(stepRunIds))
|
||||
reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds))
|
||||
severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds))
|
||||
data := make([]map[string]interface{}, len(stepRunIds))
|
||||
@@ -647,6 +651,7 @@ func (s *stepRunEngineRepository) bulkStepRunsAssigned(
|
||||
|
||||
workerIdToStepRunIds[workerId] = append(workerIdToStepRunIds[workerId], sqlchelpers.UUIDToStr(stepRunIds[i]))
|
||||
messages[i] = fmt.Sprintf("Assigned to worker %s", workerId)
|
||||
timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC())
|
||||
reasons[i] = dbsqlc.StepRunEventReasonASSIGNED
|
||||
severities[i] = dbsqlc.StepRunEventSeverityINFO
|
||||
data[i] = map[string]interface{}{"worker_id": workerId}
|
||||
@@ -676,6 +681,7 @@ func (s *stepRunEngineRepository) bulkStepRunsAssigned(
|
||||
s.pool,
|
||||
s.queries,
|
||||
stepRunIds,
|
||||
timeSeen,
|
||||
reasons,
|
||||
severities,
|
||||
messages,
|
||||
@@ -690,6 +696,7 @@ func (s *stepRunEngineRepository) bulkStepRunsUnassigned(
|
||||
defer cancel()
|
||||
|
||||
messages := make([]string, len(stepRunIds))
|
||||
timeSeen := make([]pgtype.Timestamp, len(stepRunIds))
|
||||
reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds))
|
||||
severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds))
|
||||
data := make([]map[string]interface{}, len(stepRunIds))
|
||||
@@ -698,6 +705,7 @@ func (s *stepRunEngineRepository) bulkStepRunsUnassigned(
|
||||
messages[i] = "No worker available"
|
||||
reasons[i] = dbsqlc.StepRunEventReasonREQUEUEDNOWORKER
|
||||
severities[i] = dbsqlc.StepRunEventSeverityWARNING
|
||||
timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC())
|
||||
// TODO: semaphore extra data
|
||||
data[i] = map[string]interface{}{}
|
||||
}
|
||||
@@ -708,6 +716,7 @@ func (s *stepRunEngineRepository) bulkStepRunsUnassigned(
|
||||
s.pool,
|
||||
s.queries,
|
||||
stepRunIds,
|
||||
timeSeen,
|
||||
reasons,
|
||||
severities,
|
||||
messages,
|
||||
@@ -722,6 +731,7 @@ func (s *stepRunEngineRepository) bulkStepRunsRateLimited(
|
||||
defer cancel()
|
||||
|
||||
messages := make([]string, len(stepRunIds))
|
||||
timeSeen := make([]pgtype.Timestamp, len(stepRunIds))
|
||||
reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds))
|
||||
severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds))
|
||||
data := make([]map[string]interface{}, len(stepRunIds))
|
||||
@@ -730,6 +740,7 @@ func (s *stepRunEngineRepository) bulkStepRunsRateLimited(
|
||||
messages[i] = "Rate limit exceeded"
|
||||
reasons[i] = dbsqlc.StepRunEventReasonREQUEUEDRATELIMIT
|
||||
severities[i] = dbsqlc.StepRunEventSeverityWARNING
|
||||
timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC())
|
||||
// TODO: semaphore extra data
|
||||
data[i] = map[string]interface{}{}
|
||||
}
|
||||
@@ -740,6 +751,7 @@ func (s *stepRunEngineRepository) bulkStepRunsRateLimited(
|
||||
s.pool,
|
||||
s.queries,
|
||||
stepRunIds,
|
||||
timeSeen,
|
||||
reasons,
|
||||
severities,
|
||||
messages,
|
||||
@@ -753,6 +765,7 @@ func deferredBulkStepRunEvents(
|
||||
dbtx dbsqlc.DBTX,
|
||||
queries *dbsqlc.Queries,
|
||||
stepRunIds []pgtype.UUID,
|
||||
timeSeen []pgtype.Timestamp,
|
||||
reasons []dbsqlc.StepRunEventReason,
|
||||
severities []dbsqlc.StepRunEventSeverity,
|
||||
messages []string,
|
||||
@@ -787,6 +800,7 @@ func deferredBulkStepRunEvents(
|
||||
Severities: inputSeverities,
|
||||
Messages: messages,
|
||||
Data: inputData,
|
||||
Timeseen: timeSeen,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
@@ -1374,6 +1388,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp
|
||||
finishParams := dbsqlc.BulkFinishStepRunParams{}
|
||||
|
||||
stepRunIds := make([]pgtype.UUID, 0, len(data))
|
||||
eventTimeSeen := make([]pgtype.Timestamp, 0, len(data))
|
||||
eventReasons := make([]dbsqlc.StepRunEventReason, 0, len(data))
|
||||
eventStepRunIds := make([]pgtype.UUID, 0, len(data))
|
||||
eventSeverities := make([]dbsqlc.StepRunEventSeverity, 0, len(data))
|
||||
@@ -1413,6 +1428,12 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp
|
||||
eventData = append(eventData, map[string]interface{}{})
|
||||
}
|
||||
|
||||
if item.Event.Timestamp != nil {
|
||||
eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.Event.Timestamp))
|
||||
} else {
|
||||
eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(time.Now().UTC()))
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1427,6 +1448,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp
|
||||
startParams.Steprunids = append(startParams.Steprunids, stepRunId)
|
||||
startParams.Startedats = append(startParams.Startedats, sqlchelpers.TimestampFromTime(*item.StartedAt))
|
||||
eventStepRunIds = append(eventStepRunIds, stepRunId)
|
||||
eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.StartedAt))
|
||||
eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonSTARTED)
|
||||
eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityINFO)
|
||||
eventMessages = append(eventMessages, fmt.Sprintf("Step run started at %s", item.StartedAt.Format(time.RFC1123)))
|
||||
@@ -1434,6 +1456,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp
|
||||
case dbsqlc.StepRunStatusFAILED:
|
||||
failParams.Steprunids = append(failParams.Steprunids, stepRunId)
|
||||
failParams.Finishedats = append(failParams.Finishedats, sqlchelpers.TimestampFromTime(*item.FinishedAt))
|
||||
eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt))
|
||||
failParams.Errors = append(failParams.Errors, *item.Error)
|
||||
|
||||
eventStepRunIds = append(eventStepRunIds, stepRunId)
|
||||
@@ -1454,6 +1477,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp
|
||||
case dbsqlc.StepRunStatusCANCELLED:
|
||||
cancelParams.Steprunids = append(cancelParams.Steprunids, stepRunId)
|
||||
cancelParams.Cancelledats = append(cancelParams.Cancelledats, sqlchelpers.TimestampFromTime(*item.CancelledAt))
|
||||
eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.CancelledAt))
|
||||
cancelParams.Cancelledreasons = append(cancelParams.Cancelledreasons, *item.CancelledReason)
|
||||
eventStepRunIds = append(eventStepRunIds, stepRunId)
|
||||
eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonCANCELLED)
|
||||
@@ -1463,6 +1487,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp
|
||||
case dbsqlc.StepRunStatusSUCCEEDED:
|
||||
finishParams.Steprunids = append(finishParams.Steprunids, stepRunId)
|
||||
finishParams.Finishedats = append(finishParams.Finishedats, sqlchelpers.TimestampFromTime(*item.FinishedAt))
|
||||
eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt))
|
||||
finishParams.Outputs = append(finishParams.Outputs, item.Output)
|
||||
eventStepRunIds = append(eventStepRunIds, stepRunId)
|
||||
eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonFINISHED)
|
||||
@@ -1562,7 +1587,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp
|
||||
startRunEvents := time.Now()
|
||||
|
||||
// NOTE: actually not deferred
|
||||
deferredBulkStepRunEvents(ctx, s.l, tx, s.queries, eventStepRunIds, eventReasons, eventSeverities, eventMessages, eventData)
|
||||
deferredBulkStepRunEvents(ctx, s.l, tx, s.queries, eventStepRunIds, eventTimeSeen, eventReasons, eventSeverities, eventMessages, eventData)
|
||||
|
||||
durationRunEvents := time.Since(startRunEvents)
|
||||
|
||||
|
||||
@@ -400,7 +400,7 @@ func (s *workflowRunEngineRepository) ReplayWorkflowRun(ctx context.Context, ten
|
||||
// reset concurrency key
|
||||
_, err = s.queries.ReplayWorkflowRunResetGetGroupKeyRun(ctx, tx, pgWorkflowRunId)
|
||||
|
||||
if err != nil {
|
||||
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
return fmt.Errorf("error resetting get group key run: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -44,6 +44,8 @@ type CreateStepRunEventOpts struct {
|
||||
|
||||
EventSeverity *dbsqlc.StepRunEventSeverity
|
||||
|
||||
Timestamp *time.Time
|
||||
|
||||
EventData map[string]interface{}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user