mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-15 21:29:46 -06:00
fix: semaphores increasing on manual replays (#441)
* fix: semaphores increasing on manual replays * chore: remove metrics queries
This commit is contained in:
@@ -76,6 +76,15 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
|
||||
return nil, fmt.Errorf("could not get step run for engine: %w", err)
|
||||
}
|
||||
|
||||
// Unlink the step run from its existing worker. This is necessary because automatic retries increment the
|
||||
// worker semaphore on failure/cancellation, but in this case we don't want to increment the semaphore.
|
||||
// FIXME: this is very far decoupled from the actual worker logic, and should be refactored.
|
||||
err = t.config.EngineRepository.StepRun().UnlinkStepRunFromWorker(ctx.Request().Context(), tenant.ID, stepRun.ID)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unlink step run from worker: %w", err)
|
||||
}
|
||||
|
||||
// send a task to the taskqueue
|
||||
err = t.config.MessageQueue.AddMessage(
|
||||
ctx.Request().Context(),
|
||||
|
||||
@@ -163,6 +163,16 @@ WHERE
|
||||
"tenantId" = @tenantId::uuid
|
||||
RETURNING "StepRun".*;
|
||||
|
||||
-- name: UnlinkStepRunFromWorker :one
-- Sets "workerId" to NULL on the "StepRun" row matching both the given step
-- run id and tenant id, and returns the updated row.
|
||||
UPDATE
|
||||
"StepRun"
|
||||
SET
|
||||
"workerId" = NULL
|
||||
WHERE
|
||||
"id" = @stepRunId::uuid AND
|
||||
"tenantId" = @tenantId::uuid
|
||||
RETURNING *;
|
||||
|
||||
-- name: ResolveLaterStepRuns :many
|
||||
WITH currStepRun AS (
|
||||
SELECT *
|
||||
|
||||
@@ -867,6 +867,56 @@ func (q *Queries) ResolveLaterStepRuns(ctx context.Context, db DBTX, arg Resolve
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// unlinkStepRunFromWorker is the SQL text executed by
// (*Queries).UnlinkStepRunFromWorker. It sets "workerId" to NULL on the
// "StepRun" row whose "id" is $1 and whose "tenantId" is $2, and returns the
// listed columns of the updated row.
const unlinkStepRunFromWorker = `-- name: UnlinkStepRunFromWorker :one
|
||||
UPDATE
|
||||
"StepRun"
|
||||
SET
|
||||
"workerId" = NULL
|
||||
WHERE
|
||||
"id" = $1::uuid AND
|
||||
"tenantId" = $2::uuid
|
||||
RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount"
|
||||
`
|
||||
|
||||
// UnlinkStepRunFromWorkerParams carries the arguments for
// (*Queries).UnlinkStepRunFromWorker.
type UnlinkStepRunFromWorkerParams struct {
|
||||
// Steprunid is passed as the first query argument ($1), matched against "id".
Steprunid pgtype.UUID `json:"steprunid"`
|
||||
// Tenantid is passed as the second query argument ($2), matched against "tenantId".
Tenantid pgtype.UUID `json:"tenantid"`
|
||||
}
|
||||
|
||||
func (q *Queries) UnlinkStepRunFromWorker(ctx context.Context, db DBTX, arg UnlinkStepRunFromWorkerParams) (*StepRun, error) {
|
||||
row := db.QueryRow(ctx, unlinkStepRunFromWorker, arg.Steprunid, arg.Tenantid)
|
||||
var i StepRun
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.DeletedAt,
|
||||
&i.TenantId,
|
||||
&i.JobRunId,
|
||||
&i.StepId,
|
||||
&i.Order,
|
||||
&i.WorkerId,
|
||||
&i.TickerId,
|
||||
&i.Status,
|
||||
&i.Input,
|
||||
&i.Output,
|
||||
&i.RequeueAfter,
|
||||
&i.ScheduleTimeoutAt,
|
||||
&i.Error,
|
||||
&i.StartedAt,
|
||||
&i.FinishedAt,
|
||||
&i.TimeoutAt,
|
||||
&i.CancelledAt,
|
||||
&i.CancelledReason,
|
||||
&i.CancelledError,
|
||||
&i.InputSchema,
|
||||
&i.CallerFiles,
|
||||
&i.GitRepoBranch,
|
||||
&i.RetryCount,
|
||||
)
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const updateStepRateLimits = `-- name: UpdateStepRateLimits :many
|
||||
WITH step_rate_limits AS (
|
||||
SELECT
|
||||
|
||||
@@ -541,6 +541,19 @@ func (s *stepRunEngineRepository) UpdateStepRun(ctx context.Context, tenantId, s
|
||||
return stepRun, updateInfo, nil
|
||||
}
|
||||
|
||||
func (s *stepRunEngineRepository) UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error {
|
||||
_, err := s.queries.UnlinkStepRunFromWorker(ctx, s.pool, dbsqlc.UnlinkStepRunFromWorkerParams{
|
||||
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not unlink step run from worker: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stepRunEngineRepository) UpdateStepRunOverridesData(ctx context.Context, tenantId, stepRunId string, opts *repository.UpdateStepRunOverridesDataOpts) ([]byte, error) {
|
||||
if err := s.v.Validate(opts); err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -82,6 +82,8 @@ type StepRunEngineRepository interface {
|
||||
|
||||
UpdateStepRun(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, *StepRunUpdateInfo, error)
|
||||
|
||||
// UnlinkStepRunFromWorker clears the worker assignment of the given step run
// within the tenant, returning an error if the update fails.
UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error
|
||||
|
||||
// UpdateStepRunOverridesData updates the overrides data field in the input for a step run. This returns the input
|
||||
// bytes.
|
||||
UpdateStepRunOverridesData(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOverridesDataOpts) ([]byte, error)
|
||||
|
||||
Reference in New Issue
Block a user