diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 422eea76a..46d595339 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -972,6 +972,30 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m return fmt.Errorf("could not update step run: %w", err) } + stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not get step run: %w", err) + } + + nextStepRuns, err := ec.repo.StepRun().ListStartableStepRuns(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.JobRunId), &payload.StepRunId) + + if err != nil { + ec.l.Error().Err(err).Msg("could not list startable step runs") + } else { + for _, nextStepRun := range nextStepRuns { + err := ec.mq.AddMessage( + context.Background(), + msgqueue.JOB_PROCESSING_QUEUE, + tasktypes.StepRunQueuedToTask(nextStepRun), + ) + + if err != nil { + ec.l.Error().Err(err).Msg("could not queue next step run") + } + } + } + // recheck the tenant queue ec.checkTenantQueue(ctx, metadata.TenantId) diff --git a/internal/services/controllers/jobs/queue.go b/internal/services/controllers/jobs/queue.go index 4b4a5c8ac..1edd22056 100644 --- a/internal/services/controllers/jobs/queue.go +++ b/internal/services/controllers/jobs/queue.go @@ -417,6 +417,7 @@ func (q *queue) processStepRunUpdates(ctx context.Context, tenantId string) (boo if err != nil { q.l.Error().Err(err).Msg("could not list startable step runs") + continue } for _, nextStepRun := range nextStepRuns { diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql b/pkg/repository/prisma/dbsqlc/step_runs.sql index 6edc47989..c1c7dedd8 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql @@ -838,8 +838,8 @@ WHERE NOT EXISTS ( -- name: BulkCreateStepRunEvent :exec WITH input_values AS ( SELECT - CURRENT_TIMESTAMP AS "timeFirstSeen", - CURRENT_TIMESTAMP AS "timeLastSeen", + unnest(@timeSeen::timestamp[]) AS "timeFirstSeen", + unnest(@timeSeen::timestamp[]) AS "timeLastSeen", unnest(@stepRunIds::uuid[]) AS "stepRunId", unnest(cast(@reasons::text[] as"StepRunEventReason"[])) AS "reason", unnest(cast(@severities::text[] as "StepRunEventSeverity"[])) AS "severity", @@ -850,7 +850,7 @@ WITH input_values AS ( updated AS ( UPDATE "StepRunEvent" SET - "timeLastSeen" = CURRENT_TIMESTAMP, + "timeLastSeen" = input_values."timeLastSeen", "message" = input_values."message", "count" = "StepRunEvent"."count" + 1, "data" = input_values."data" diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql.go b/pkg/repository/prisma/dbsqlc/step_runs.sql.go index f6d6159a2..bd10d7c4b 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql.go +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql.go @@ -155,19 +155,19 @@ func (q *Queries) BulkCancelStepRun(ctx context.Context, db DBTX, arg BulkCancel const bulkCreateStepRunEvent = `-- name: BulkCreateStepRunEvent :exec WITH input_values AS ( SELECT - CURRENT_TIMESTAMP AS "timeFirstSeen", - CURRENT_TIMESTAMP AS "timeLastSeen", - unnest($1::uuid[]) AS "stepRunId", - unnest(cast($2::text[] as"StepRunEventReason"[])) AS "reason", - unnest(cast($3::text[] as "StepRunEventSeverity"[])) AS "severity", - unnest($4::text[]) AS "message", + unnest($1::timestamp[]) AS "timeFirstSeen", + unnest($1::timestamp[]) AS "timeLastSeen", + unnest($2::uuid[]) AS "stepRunId", + unnest(cast($3::text[] as"StepRunEventReason"[])) AS "reason", + unnest(cast($4::text[] as "StepRunEventSeverity"[])) AS "severity", + unnest($5::text[]) AS "message", 1 AS "count", - unnest($5::jsonb[]) AS "data" + unnest($6::jsonb[]) AS "data" ), updated AS ( UPDATE "StepRunEvent" SET - "timeLastSeen" = CURRENT_TIMESTAMP, + "timeLastSeen" = input_values."timeLastSeen", "message" = input_values."message", "count" = "StepRunEvent"."count" + 1, "data" = input_values."data" @@ -211,15 +211,17 @@ WHERE NOT EXISTS ( ` type BulkCreateStepRunEventParams struct { - Steprunids []pgtype.UUID `json:"steprunids"` - Reasons []string `json:"reasons"` - Severities []string `json:"severities"` - Messages []string `json:"messages"` - Data [][]byte `json:"data"` + Timeseen []pgtype.Timestamp `json:"timeseen"` + Steprunids []pgtype.UUID `json:"steprunids"` + Reasons []string `json:"reasons"` + Severities []string `json:"severities"` + Messages []string `json:"messages"` + Data [][]byte `json:"data"` } func (q *Queries) BulkCreateStepRunEvent(ctx context.Context, db DBTX, arg BulkCreateStepRunEventParams) error { _, err := db.Exec(ctx, bulkCreateStepRunEvent, + arg.Timeseen, arg.Steprunids, arg.Reasons, arg.Severities, diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index b4722466a..db5b2f1c9 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -398,6 +398,7 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te } messages := make([]string, len(stepRunIds)) + timeSeen := make([]pgtype.Timestamp, len(stepRunIds)) reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds)) severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds)) data := make([]map[string]interface{}, len(stepRunIds)) @@ -407,6 +408,7 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te messages[i] = "Worker has become inactive" reasons[i] = dbsqlc.StepRunEventReasonREASSIGNED severities[i] = dbsqlc.StepRunEventSeverityCRITICAL + timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC()) data[i] = map[string]interface{}{"worker_id": workerId} } @@ -416,6 +418,7 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te s.pool, s.queries, stepRunIds, + timeSeen, reasons, severities, messages, @@ -634,6 +637,7 @@ func (s *stepRunEngineRepository) bulkStepRunsAssigned( workerIdToStepRunIds := make(map[string][]string) messages := make([]string, len(stepRunIds)) + timeSeen := make([]pgtype.Timestamp, len(stepRunIds)) reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds)) severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds)) data := make([]map[string]interface{}, len(stepRunIds)) @@ -647,6 +651,7 @@ func (s *stepRunEngineRepository) bulkStepRunsAssigned( workerIdToStepRunIds[workerId] = append(workerIdToStepRunIds[workerId], sqlchelpers.UUIDToStr(stepRunIds[i])) messages[i] = fmt.Sprintf("Assigned to worker %s", workerId) + timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC()) reasons[i] = dbsqlc.StepRunEventReasonASSIGNED severities[i] = dbsqlc.StepRunEventSeverityINFO data[i] = map[string]interface{}{"worker_id": workerId} @@ -676,6 +681,7 @@ func (s *stepRunEngineRepository) bulkStepRunsAssigned( s.pool, s.queries, stepRunIds, + timeSeen, reasons, severities, messages, @@ -690,6 +696,7 @@ func (s *stepRunEngineRepository) bulkStepRunsUnassigned( defer cancel() messages := make([]string, len(stepRunIds)) + timeSeen := make([]pgtype.Timestamp, len(stepRunIds)) reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds)) severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds)) data := make([]map[string]interface{}, len(stepRunIds)) @@ -698,6 +705,7 @@ func (s *stepRunEngineRepository) bulkStepRunsUnassigned( messages[i] = "No worker available" reasons[i] = dbsqlc.StepRunEventReasonREQUEUEDNOWORKER severities[i] = dbsqlc.StepRunEventSeverityWARNING + timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC()) // TODO: semaphore extra data data[i] = map[string]interface{}{} } @@ -708,6 +716,7 @@ func (s *stepRunEngineRepository) bulkStepRunsUnassigned( s.pool, s.queries, stepRunIds, + timeSeen, reasons, severities, messages, @@ -722,6 +731,7 @@ func (s *stepRunEngineRepository) bulkStepRunsRateLimited( defer cancel() messages := make([]string, len(stepRunIds)) + timeSeen := make([]pgtype.Timestamp, len(stepRunIds)) reasons := make([]dbsqlc.StepRunEventReason, len(stepRunIds)) severities := make([]dbsqlc.StepRunEventSeverity, len(stepRunIds)) data := make([]map[string]interface{}, len(stepRunIds)) @@ -730,6 +740,7 @@ func (s *stepRunEngineRepository) bulkStepRunsRateLimited( messages[i] = "Rate limit exceeded" reasons[i] = dbsqlc.StepRunEventReasonREQUEUEDRATELIMIT severities[i] = dbsqlc.StepRunEventSeverityWARNING + timeSeen[i] = sqlchelpers.TimestampFromTime(time.Now().UTC()) // TODO: semaphore extra data data[i] = map[string]interface{}{} } @@ -740,6 +751,7 @@ func (s *stepRunEngineRepository) bulkStepRunsRateLimited( s.pool, s.queries, stepRunIds, + timeSeen, reasons, severities, messages, @@ -753,6 +765,7 @@ func deferredBulkStepRunEvents( dbtx dbsqlc.DBTX, queries *dbsqlc.Queries, stepRunIds []pgtype.UUID, + timeSeen []pgtype.Timestamp, reasons []dbsqlc.StepRunEventReason, severities []dbsqlc.StepRunEventSeverity, messages []string, @@ -787,6 +800,7 @@ func deferredBulkStepRunEvents( Severities: inputSeverities, Messages: messages, Data: inputData, + Timeseen: timeSeen, }) if err != nil { @@ -1374,6 +1388,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp finishParams := dbsqlc.BulkFinishStepRunParams{} stepRunIds := make([]pgtype.UUID, 0, len(data)) + eventTimeSeen := make([]pgtype.Timestamp, 0, len(data)) eventReasons := make([]dbsqlc.StepRunEventReason, 0, len(data)) eventStepRunIds := make([]pgtype.UUID, 0, len(data)) eventSeverities := make([]dbsqlc.StepRunEventSeverity, 0, len(data)) @@ -1413,6 +1428,12 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp eventData = append(eventData, map[string]interface{}{}) } + if item.Event.Timestamp != nil { + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.Event.Timestamp)) + } else { + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(time.Now().UTC())) + } + continue } @@ -1427,6 +1448,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp startParams.Steprunids = append(startParams.Steprunids, stepRunId) startParams.Startedats = append(startParams.Startedats, sqlchelpers.TimestampFromTime(*item.StartedAt)) eventStepRunIds = append(eventStepRunIds, stepRunId) + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.StartedAt)) eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonSTARTED) eventSeverities = append(eventSeverities, dbsqlc.StepRunEventSeverityINFO) eventMessages = append(eventMessages, fmt.Sprintf("Step run started at %s", item.StartedAt.Format(time.RFC1123))) @@ -1434,6 +1456,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp case dbsqlc.StepRunStatusFAILED: failParams.Steprunids = append(failParams.Steprunids, stepRunId) failParams.Finishedats = append(failParams.Finishedats, sqlchelpers.TimestampFromTime(*item.FinishedAt)) + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt)) failParams.Errors = append(failParams.Errors, *item.Error) eventStepRunIds = append(eventStepRunIds, stepRunId) @@ -1454,6 +1477,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp case dbsqlc.StepRunStatusCANCELLED: cancelParams.Steprunids = append(cancelParams.Steprunids, stepRunId) cancelParams.Cancelledats = append(cancelParams.Cancelledats, sqlchelpers.TimestampFromTime(*item.CancelledAt)) + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.CancelledAt)) cancelParams.Cancelledreasons = append(cancelParams.Cancelledreasons, *item.CancelledReason) eventStepRunIds = append(eventStepRunIds, stepRunId) eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonCANCELLED) @@ -1463,6 +1487,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp case dbsqlc.StepRunStatusSUCCEEDED: finishParams.Steprunids = append(finishParams.Steprunids, stepRunId) finishParams.Finishedats = append(finishParams.Finishedats, sqlchelpers.TimestampFromTime(*item.FinishedAt)) + eventTimeSeen = append(eventTimeSeen, sqlchelpers.TimestampFromTime(*item.FinishedAt)) finishParams.Outputs = append(finishParams.Outputs, item.Output) eventStepRunIds = append(eventStepRunIds, stepRunId) eventReasons = append(eventReasons, dbsqlc.StepRunEventReasonFINISHED) @@ -1562,7 +1587,7 @@ func (s *stepRunEngineRepository) ProcessStepRunUpdates(ctx context.Context, qlp startRunEvents := time.Now() // NOTE: actually not deferred - deferredBulkStepRunEvents(ctx, s.l, tx, s.queries, eventStepRunIds, eventReasons, eventSeverities, eventMessages, eventData) + deferredBulkStepRunEvents(ctx, s.l, tx, s.queries, eventStepRunIds, eventTimeSeen, eventReasons, eventSeverities, eventMessages, eventData) durationRunEvents := time.Since(startRunEvents) diff --git a/pkg/repository/prisma/workflow_run.go b/pkg/repository/prisma/workflow_run.go index 77f300bbe..4522c7f8d 100644 --- a/pkg/repository/prisma/workflow_run.go +++ b/pkg/repository/prisma/workflow_run.go @@ -400,7 +400,7 @@ func (s *workflowRunEngineRepository) ReplayWorkflowRun(ctx context.Context, ten // reset concurrency key _, err = s.queries.ReplayWorkflowRunResetGetGroupKeyRun(ctx, tx, pgWorkflowRunId) - if err != nil { + if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("error resetting get group key run: %w", err) } diff --git a/pkg/repository/step_run.go b/pkg/repository/step_run.go index 4e5ba3f92..45c4fb56d 100644 --- a/pkg/repository/step_run.go +++ b/pkg/repository/step_run.go @@ -44,6 +44,8 @@ type CreateStepRunEventOpts struct { EventSeverity *dbsqlc.StepRunEventSeverity + Timestamp *time.Time + EventData map[string]interface{} }