mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-08 01:39:46 -06:00
fix: get group key run cancellations should cancel job runs, clean up cancellation logic (#469)
This commit is contained in:
@@ -525,21 +525,7 @@ func (ec *JobsControllerImpl) runStepRunRequeueTenant(ctx context.Context, tenan
|
||||
isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now)
|
||||
|
||||
if isTimedOut {
|
||||
var updateInfo *repository.StepRunUpdateInfo
|
||||
|
||||
stepRunCp, updateInfo, err = ec.repo.StepRun().UpdateStepRun(ctx, tenantId, stepRunId, &repository.UpdateStepRunOpts{
|
||||
CancelledAt: &now,
|
||||
CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"),
|
||||
Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update step run %s: %w", stepRunId, err)
|
||||
}
|
||||
|
||||
defer ec.handleStepRunUpdateInfo(stepRunCp, updateInfo)
|
||||
|
||||
return nil
|
||||
return ec.cancelStepRun(ctx, tenantId, stepRunId, "SCHEDULING_TIMED_OUT")
|
||||
}
|
||||
|
||||
return ec.scheduleStepRun(ctx, tenantId, stepRunCp)
|
||||
@@ -619,21 +605,7 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena
|
||||
isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now)
|
||||
|
||||
if isTimedOut {
|
||||
var updateInfo *repository.StepRunUpdateInfo
|
||||
|
||||
stepRunCp, updateInfo, err = ec.repo.StepRun().UpdateStepRun(ctx, tenantId, stepRunId, &repository.UpdateStepRunOpts{
|
||||
CancelledAt: &now,
|
||||
CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"),
|
||||
Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update step run %s: %w", stepRunId, err)
|
||||
}
|
||||
|
||||
defer ec.handleStepRunUpdateInfo(stepRunCp, updateInfo)
|
||||
|
||||
return nil
|
||||
return ec.cancelStepRun(ctx, tenantId, stepRunId, "SCHEDULING_TIMED_OUT")
|
||||
}
|
||||
|
||||
return ec.scheduleStepRun(ctx, tenantId, stepRunCp)
|
||||
|
||||
@@ -376,7 +376,7 @@ func (wc *WorkflowsControllerImpl) cancelGetGroupKeyRun(ctx context.Context, ten
|
||||
// cancel current step run
|
||||
now := time.Now().UTC()
|
||||
|
||||
_, err := wc.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{
|
||||
groupKeyRun, err := wc.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{
|
||||
CancelledAt: &now,
|
||||
CancelledReason: repository.StringPtr(reason),
|
||||
Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled),
|
||||
@@ -386,7 +386,14 @@ func (wc *WorkflowsControllerImpl) cancelGetGroupKeyRun(ctx context.Context, ten
|
||||
return fmt.Errorf("could not update step run: %w", err)
|
||||
}
|
||||
|
||||
// FIXME: eventually send the cancellation to the worker
|
||||
// cancel all existing jobs on the workflow run
|
||||
workflowRunId := sqlchelpers.UUIDToStr(groupKeyRun.WorkflowRunId)
|
||||
|
||||
return nil
|
||||
workflowRun, err := wc.repo.WorkflowRun().GetWorkflowRunById(ctx, tenantId, workflowRunId)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get workflow run: %w", err)
|
||||
}
|
||||
|
||||
return wc.cancelWorkflowRunJobs(ctx, workflowRun)
|
||||
}
|
||||
|
||||
@@ -255,7 +255,7 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction(
|
||||
}
|
||||
|
||||
func (wc *WorkflowsControllerImpl) queueWorkflowRunJobs(ctx context.Context, workflowRun *dbsqlc.GetWorkflowRunRow) error {
|
||||
ctx, span := telemetry.NewSpan(ctx, "process-event") // nolint:ineffassign
|
||||
ctx, span := telemetry.NewSpan(ctx, "queue-workflow-run-jobs") // nolint:ineffassign
|
||||
defer span.End()
|
||||
|
||||
tenantId := sqlchelpers.UUIDToStr(workflowRun.WorkflowRun.TenantId)
|
||||
@@ -291,6 +291,43 @@ func (wc *WorkflowsControllerImpl) queueWorkflowRunJobs(ctx context.Context, wor
|
||||
return returnErr
|
||||
}
|
||||
|
||||
func (wc *WorkflowsControllerImpl) cancelWorkflowRunJobs(ctx context.Context, workflowRun *dbsqlc.GetWorkflowRunRow) error {
|
||||
ctx, span := telemetry.NewSpan(ctx, "cancel-workflow-run-jobs") // nolint:ineffassign
|
||||
defer span.End()
|
||||
|
||||
tenantId := sqlchelpers.UUIDToStr(workflowRun.WorkflowRun.TenantId)
|
||||
workflowRunId := sqlchelpers.UUIDToStr(workflowRun.WorkflowRun.ID)
|
||||
|
||||
jobRuns, err := wc.repo.JobRun().ListJobRunsForWorkflowRun(ctx, tenantId, workflowRunId)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not list job runs: %w", err)
|
||||
}
|
||||
|
||||
var returnErr error
|
||||
|
||||
for i := range jobRuns {
|
||||
// don't cancel job runs that are onFailure
|
||||
if workflowRun.WorkflowVersion.OnFailureJobId.Valid && jobRuns[i].JobId == workflowRun.WorkflowVersion.OnFailureJobId {
|
||||
continue
|
||||
}
|
||||
|
||||
jobRunId := sqlchelpers.UUIDToStr(jobRuns[i].ID)
|
||||
|
||||
err := wc.mq.AddMessage(
|
||||
context.Background(),
|
||||
msgqueue.JOB_PROCESSING_QUEUE,
|
||||
tasktypes.JobRunCancelledToTask(tenantId, jobRunId),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
returnErr = multierror.Append(err, fmt.Errorf("could not add job run to task queue: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
return returnErr
|
||||
}
|
||||
|
||||
func (wc *WorkflowsControllerImpl) runGetGroupKeyRunRequeue(ctx context.Context) func() {
|
||||
return func() {
|
||||
wc.l.Debug().Msgf("workflows controller: checking get group key run requeue")
|
||||
@@ -359,17 +396,7 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunRequeueTenant(ctx context.Co
|
||||
isTimedOut := !scheduleTimeoutAt.IsZero() && scheduleTimeoutAt.Before(now)
|
||||
|
||||
if isTimedOut {
|
||||
_, err := ec.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{
|
||||
CancelledAt: &now,
|
||||
CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"),
|
||||
Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update get group key run %s: %w", getGroupKeyRunId, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return ec.cancelGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, "SCHEDULING_TIMED_OUT")
|
||||
}
|
||||
|
||||
requeueAfter := time.Now().UTC().Add(time.Second * 4)
|
||||
|
||||
Reference in New Issue
Block a user