diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index ddd5f123f..953a474ab 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -525,21 +525,7 @@ func (ec *JobsControllerImpl) runStepRunRequeueTenant(ctx context.Context, tenan isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now) if isTimedOut { - var updateInfo *repository.StepRunUpdateInfo - - stepRunCp, updateInfo, err = ec.repo.StepRun().UpdateStepRun(ctx, tenantId, stepRunId, &repository.UpdateStepRunOpts{ - CancelledAt: &now, - CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"), - Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled), - }) - - if err != nil { - return fmt.Errorf("could not update step run %s: %w", stepRunId, err) - } - - defer ec.handleStepRunUpdateInfo(stepRunCp, updateInfo) - - return nil + return ec.cancelStepRun(ctx, tenantId, stepRunId, "SCHEDULING_TIMED_OUT") } return ec.scheduleStepRun(ctx, tenantId, stepRunCp) @@ -619,21 +605,7 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now) if isTimedOut { - var updateInfo *repository.StepRunUpdateInfo - - stepRunCp, updateInfo, err = ec.repo.StepRun().UpdateStepRun(ctx, tenantId, stepRunId, &repository.UpdateStepRunOpts{ - CancelledAt: &now, - CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"), - Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled), - }) - - if err != nil { - return fmt.Errorf("could not update step run %s: %w", stepRunId, err) - } - - defer ec.handleStepRunUpdateInfo(stepRunCp, updateInfo) - - return nil + return ec.cancelStepRun(ctx, tenantId, stepRunId, "SCHEDULING_TIMED_OUT") } return ec.scheduleStepRun(ctx, tenantId, stepRunCp) diff --git a/internal/services/controllers/workflows/controller.go 
b/internal/services/controllers/workflows/controller.go index 801bae6ba..f65b28dd7 100644 --- a/internal/services/controllers/workflows/controller.go +++ b/internal/services/controllers/workflows/controller.go @@ -376,7 +376,7 @@ func (wc *WorkflowsControllerImpl) cancelGetGroupKeyRun(ctx context.Context, ten // cancel current step run now := time.Now().UTC() - _, err := wc.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{ + groupKeyRun, err := wc.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{ CancelledAt: &now, CancelledReason: repository.StringPtr(reason), Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled), @@ -386,7 +386,14 @@ func (wc *WorkflowsControllerImpl) cancelGetGroupKeyRun(ctx context.Context, ten return fmt.Errorf("could not update step run: %w", err) } - // FIXME: eventually send the cancellation to the worker + // cancel all existing jobs on the workflow run + workflowRunId := sqlchelpers.UUIDToStr(groupKeyRun.WorkflowRunId) - return nil + workflowRun, err := wc.repo.WorkflowRun().GetWorkflowRunById(ctx, tenantId, workflowRunId) + + if err != nil { + return fmt.Errorf("could not get workflow run: %w", err) + } + + return wc.cancelWorkflowRunJobs(ctx, workflowRun) } diff --git a/internal/services/controllers/workflows/queue.go b/internal/services/controllers/workflows/queue.go index 8c9eadfd9..aa98dca83 100644 --- a/internal/services/controllers/workflows/queue.go +++ b/internal/services/controllers/workflows/queue.go @@ -255,7 +255,7 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction( } func (wc *WorkflowsControllerImpl) queueWorkflowRunJobs(ctx context.Context, workflowRun *dbsqlc.GetWorkflowRunRow) error { - ctx, span := telemetry.NewSpan(ctx, "process-event") // nolint:ineffassign + ctx, span := telemetry.NewSpan(ctx, "queue-workflow-run-jobs") // nolint:ineffassign defer span.End() tenantId := 
sqlchelpers.UUIDToStr(workflowRun.WorkflowRun.TenantId) @@ -291,6 +291,47 @@ func (wc *WorkflowsControllerImpl) queueWorkflowRunJobs(ctx context.Context, wor return returnErr } +// cancelWorkflowRunJobs enqueues a job-run-cancelled task on the job processing queue for +// every job run of the workflow run, skipping job runs whose job is the workflow version's +// on-failure job. Enqueue failures are accumulated and returned after all job runs are visited. +func (wc *WorkflowsControllerImpl) cancelWorkflowRunJobs(ctx context.Context, workflowRun *dbsqlc.GetWorkflowRunRow) error { + ctx, span := telemetry.NewSpan(ctx, "cancel-workflow-run-jobs") // nolint:ineffassign + defer span.End() + + tenantId := sqlchelpers.UUIDToStr(workflowRun.WorkflowRun.TenantId) + workflowRunId := sqlchelpers.UUIDToStr(workflowRun.WorkflowRun.ID) + + jobRuns, err := wc.repo.JobRun().ListJobRunsForWorkflowRun(ctx, tenantId, workflowRunId) + + if err != nil { + return fmt.Errorf("could not list job runs: %w", err) + } + + var returnErr error + + for i := range jobRuns { + // don't cancel job runs that are onFailure + if workflowRun.WorkflowVersion.OnFailureJobId.Valid && jobRuns[i].JobId == workflowRun.WorkflowVersion.OnFailureJobId { + continue + } + + jobRunId := sqlchelpers.UUIDToStr(jobRuns[i].ID) + + err := wc.mq.AddMessage( + context.Background(), + msgqueue.JOB_PROCESSING_QUEUE, + tasktypes.JobRunCancelledToTask(tenantId, jobRunId), + ) + + if err != nil { + // append to returnErr (not err) so failures from earlier iterations are kept + returnErr = multierror.Append(returnErr, fmt.Errorf("could not add job run to task queue: %w", err)) + } + } + + return returnErr +} + func (wc *WorkflowsControllerImpl) runGetGroupKeyRunRequeue(ctx context.Context) func() { return func() { wc.l.Debug().Msgf("workflows controller: checking get group key run requeue") @@ -359,17 +400,7 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunRequeueTenant(ctx context.Co isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now) if isTimedOut { - _, err := ec.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{ - CancelledAt: &now, - CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"), - Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled), - }) - - if err != nil { -
return fmt.Errorf("could not update get group key run %s: %w", getGroupKeyRunId, err) - } - - return nil + return ec.cancelGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, "SCHEDULING_TIMED_OUT") } requeueAfter := time.Now().UTC().Add(time.Second * 4)