From f71f17f5f78fb3a044e6775c19bd7a74da7ca695 Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Wed, 1 May 2024 14:05:39 -0400 Subject: [PATCH] fix: semaphores increasing on manual replays (#441) * fix: semaphores increasing on manual replays * chore: remove metrics queries --- api/v1/server/handlers/step-runs/rerun.go | 9 ++++ .../repository/prisma/dbsqlc/step_runs.sql | 10 ++++ .../repository/prisma/dbsqlc/step_runs.sql.go | 50 +++++++++++++++++++ internal/repository/prisma/step_run.go | 13 +++++ internal/repository/step_run.go | 2 + 5 files changed, 84 insertions(+) diff --git a/api/v1/server/handlers/step-runs/rerun.go b/api/v1/server/handlers/step-runs/rerun.go index 40cbd8d87..9e3013eba 100644 --- a/api/v1/server/handlers/step-runs/rerun.go +++ b/api/v1/server/handlers/step-runs/rerun.go @@ -76,6 +76,15 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu return nil, fmt.Errorf("could not get step run for engine: %w", err) } + // Unlink the step run from its existing worker. This is necessary because automatic retries increment the + // worker semaphore on failure/cancellation, but in this case we don't want to increment the semaphore. + // FIXME: this is very far decoupled from the actual worker logic, and should be refactored. + err = t.config.EngineRepository.StepRun().UnlinkStepRunFromWorker(ctx.Request().Context(), tenant.ID, stepRun.ID) + + if err != nil { + return nil, fmt.Errorf("could not unlink step run from worker: %w", err) + } + // send a task to the taskqueue err = t.config.MessageQueue.AddMessage( ctx.Request().Context(), diff --git a/internal/repository/prisma/dbsqlc/step_runs.sql b/internal/repository/prisma/dbsqlc/step_runs.sql index fe6f860df..5da0a0861 100644 --- a/internal/repository/prisma/dbsqlc/step_runs.sql +++ b/internal/repository/prisma/dbsqlc/step_runs.sql @@ -163,6 +163,16 @@ WHERE "tenantId" = @tenantId::uuid RETURNING "StepRun".*; +-- name: UnlinkStepRunFromWorker :one +UPDATE + "StepRun" +SET + "workerId" = NULL +WHERE + "id" = @stepRunId::uuid AND + "tenantId" = @tenantId::uuid +RETURNING *; + -- name: ResolveLaterStepRuns :many WITH currStepRun AS ( SELECT * diff --git a/internal/repository/prisma/dbsqlc/step_runs.sql.go b/internal/repository/prisma/dbsqlc/step_runs.sql.go index 1dc3148ce..5f7414c84 100644 --- a/internal/repository/prisma/dbsqlc/step_runs.sql.go +++ b/internal/repository/prisma/dbsqlc/step_runs.sql.go @@ -867,6 +867,56 @@ func (q *Queries) ResolveLaterStepRuns(ctx context.Context, db DBTX, arg Resolve return items, nil } +const unlinkStepRunFromWorker = `-- name: UnlinkStepRunFromWorker :one +UPDATE + "StepRun" +SET + "workerId" = NULL +WHERE + "id" = $1::uuid AND + "tenantId" = $2::uuid +RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount" +` + +type UnlinkStepRunFromWorkerParams struct { + Steprunid pgtype.UUID `json:"steprunid"` + Tenantid pgtype.UUID `json:"tenantid"` +} + +func (q *Queries) UnlinkStepRunFromWorker(ctx context.Context, db DBTX, arg UnlinkStepRunFromWorkerParams) (*StepRun, error) { + row := db.QueryRow(ctx, unlinkStepRunFromWorker, arg.Steprunid, arg.Tenantid) + var i StepRun + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeletedAt, + &i.TenantId, + &i.JobRunId, + &i.StepId, + &i.Order, + &i.WorkerId, + &i.TickerId, + &i.Status, + &i.Input, + &i.Output, + &i.RequeueAfter, + &i.ScheduleTimeoutAt, + &i.Error, + &i.StartedAt, + &i.FinishedAt, + &i.TimeoutAt, + &i.CancelledAt, + &i.CancelledReason, + &i.CancelledError, + &i.InputSchema, + &i.CallerFiles, + &i.GitRepoBranch, + &i.RetryCount, + ) + return &i, err +} + const updateStepRateLimits = `-- name: UpdateStepRateLimits :many WITH step_rate_limits AS ( SELECT diff --git a/internal/repository/prisma/step_run.go b/internal/repository/prisma/step_run.go index 6fd1d8268..fc25d70b5 100644 --- a/internal/repository/prisma/step_run.go +++ b/internal/repository/prisma/step_run.go @@ -541,6 +541,19 @@ func (s *stepRunEngineRepository) UpdateStepRun(ctx context.Context, tenantId, s return stepRun, updateInfo, nil } +func (s *stepRunEngineRepository) UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error { + _, err := s.queries.UnlinkStepRunFromWorker(ctx, s.pool, dbsqlc.UnlinkStepRunFromWorkerParams{ + Steprunid: sqlchelpers.UUIDFromStr(stepRunId), + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + }) + + if err != nil { + return fmt.Errorf("could not unlink step run from worker: %w", err) + } + + return nil +} + func (s *stepRunEngineRepository) UpdateStepRunOverridesData(ctx context.Context, tenantId, stepRunId string, opts *repository.UpdateStepRunOverridesDataOpts) ([]byte, error) { if err := s.v.Validate(opts); err != nil { return nil, err diff --git a/internal/repository/step_run.go b/internal/repository/step_run.go index f3a23f1a8..7f1c1fcf9 100644 --- a/internal/repository/step_run.go +++ b/internal/repository/step_run.go @@ -82,6 +82,8 @@ type StepRunEngineRepository interface { UpdateStepRun(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, *StepRunUpdateInfo, error) + UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error + // UpdateStepRunOverridesData updates the overrides data field in the input for a step run. This returns the input // bytes. UpdateStepRunOverridesData(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOverridesDataOpts) ([]byte, error)