mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-20 16:52:08 -05:00
Attempt to fix deadlock in PollScheduledWorkflows by scoping FOR UPDATE lock (#3261)
* make sure skip locked semantics is meaningful * fix: order by cond * fix: refs * chore: decrease diff size * fix: run triggered by not exists instead of left join * feat: add index for polling * chore: schema * fix: make cte doing the filtering also do the locking * fix: one more cte fix * fix: rm dupe for update lock * fix: order by --------- Co-authored-by: mrkaye97 <mrkaye97@gmail.com>
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
-- +goose Up
|
||||
-- +goose NO TRANSACTION
|
||||
-- +goose StatementBegin
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_WorkflowTriggerScheduledRef_triggerAt_tickerId" ON "WorkflowTriggerScheduledRef" ("triggerAt", "tickerId");
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
-- +goose StatementBegin
|
||||
DROP INDEX CONCURRENTLY IF EXISTS "ix_WorkflowTriggerScheduledRef_triggerAt_tickerId";
|
||||
-- +goose StatementEnd
|
||||
@@ -182,11 +182,8 @@ WITH latest_workflow_versions AS (
|
||||
"Workflow" AS workflow ON workflow."id" = versions."workflowId"
|
||||
JOIN
|
||||
latest_workflow_versions AS latestVersions ON latestVersions."workflowId" = workflow."id"
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" AS runTriggeredBy ON runTriggeredBy."scheduledId" = scheduledWorkflow."id"
|
||||
WHERE
|
||||
"triggerAt" <= NOW() + INTERVAL '5 seconds'
|
||||
AND runTriggeredBy IS NULL
|
||||
AND versions."deletedAt" IS NULL
|
||||
AND workflow."deletedAt" IS NULL
|
||||
AND (
|
||||
@@ -196,23 +193,34 @@ WITH latest_workflow_versions AS (
|
||||
)
|
||||
OR "tickerId" = @tickerId::uuid
|
||||
)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM "WorkflowRunTriggeredBy" AS runTriggeredBy
|
||||
WHERE runTriggeredBy."scheduledId" = scheduledWorkflow."id"
|
||||
)
|
||||
ORDER BY scheduledWorkflow."triggerAt" ASC, scheduledWorkflow."id" ASC
|
||||
),
|
||||
active_scheduled_workflows AS (
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
not_run_scheduled_workflows
|
||||
"WorkflowTriggerScheduledRef"
|
||||
WHERE "id" IN (SELECT "id" FROM not_run_scheduled_workflows)
|
||||
ORDER BY "triggerAt" ASC, "id" ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
|
||||
UPDATE
|
||||
"WorkflowTriggerScheduledRef" as scheduledWorkflows
|
||||
SET
|
||||
"tickerId" = @tickerId::uuid
|
||||
FROM
|
||||
active_scheduled_workflows
|
||||
JOIN "WorkflowVersion" as versions ON versions."id" = active_scheduled_workflows."parentId"
|
||||
JOIN "Workflow" as workflow ON workflow."id" = versions."workflowId"
|
||||
WHERE
|
||||
scheduledWorkflows."id" = active_scheduled_workflows."id"
|
||||
RETURNING scheduledWorkflows.*, active_scheduled_workflows."workflowVersionId", active_scheduled_workflows."tenantId";
|
||||
RETURNING scheduledWorkflows.*, versions."id" AS "workflowVersionId", workflow."tenantId";
|
||||
|
||||
-- name: PollTenantAlerts :many
|
||||
-- Finds tenant alerts which haven't alerted since their frequency and assigns them to a ticker
|
||||
|
||||
@@ -430,11 +430,8 @@ WITH latest_workflow_versions AS (
|
||||
"Workflow" AS workflow ON workflow."id" = versions."workflowId"
|
||||
JOIN
|
||||
latest_workflow_versions AS latestVersions ON latestVersions."workflowId" = workflow."id"
|
||||
LEFT JOIN
|
||||
"WorkflowRunTriggeredBy" AS runTriggeredBy ON runTriggeredBy."scheduledId" = scheduledWorkflow."id"
|
||||
WHERE
|
||||
"triggerAt" <= NOW() + INTERVAL '5 seconds'
|
||||
AND runTriggeredBy IS NULL
|
||||
AND versions."deletedAt" IS NULL
|
||||
AND workflow."deletedAt" IS NULL
|
||||
AND (
|
||||
@@ -444,23 +441,34 @@ WITH latest_workflow_versions AS (
|
||||
)
|
||||
OR "tickerId" = $1::uuid
|
||||
)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM "WorkflowRunTriggeredBy" AS runTriggeredBy
|
||||
WHERE runTriggeredBy."scheduledId" = scheduledWorkflow."id"
|
||||
)
|
||||
ORDER BY scheduledWorkflow."triggerAt" ASC, scheduledWorkflow."id" ASC
|
||||
),
|
||||
active_scheduled_workflows AS (
|
||||
SELECT
|
||||
id, "workflowVersionId", "tenantId", "additionalMetadata"
|
||||
id, "parentId", "triggerAt", "tickerId", input, "childIndex", "childKey", "parentStepRunId", "parentWorkflowRunId", "additionalMetadata", "createdAt", "deletedAt", "updatedAt", method, priority
|
||||
FROM
|
||||
not_run_scheduled_workflows
|
||||
"WorkflowTriggerScheduledRef"
|
||||
WHERE "id" IN (SELECT "id" FROM not_run_scheduled_workflows)
|
||||
ORDER BY "triggerAt" ASC, "id" ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
|
||||
UPDATE
|
||||
"WorkflowTriggerScheduledRef" as scheduledWorkflows
|
||||
SET
|
||||
"tickerId" = $1::uuid
|
||||
FROM
|
||||
active_scheduled_workflows
|
||||
JOIN "WorkflowVersion" as versions ON versions."id" = active_scheduled_workflows."parentId"
|
||||
JOIN "Workflow" as workflow ON workflow."id" = versions."workflowId"
|
||||
WHERE
|
||||
scheduledWorkflows."id" = active_scheduled_workflows."id"
|
||||
RETURNING scheduledworkflows.id, scheduledworkflows."parentId", scheduledworkflows."triggerAt", scheduledworkflows."tickerId", scheduledworkflows.input, scheduledworkflows."childIndex", scheduledworkflows."childKey", scheduledworkflows."parentStepRunId", scheduledworkflows."parentWorkflowRunId", scheduledworkflows."additionalMetadata", scheduledworkflows."createdAt", scheduledworkflows."deletedAt", scheduledworkflows."updatedAt", scheduledworkflows.method, scheduledworkflows.priority, active_scheduled_workflows."workflowVersionId", active_scheduled_workflows."tenantId"
|
||||
RETURNING scheduledworkflows.id, scheduledworkflows."parentId", scheduledworkflows."triggerAt", scheduledworkflows."tickerId", scheduledworkflows.input, scheduledworkflows."childIndex", scheduledworkflows."childKey", scheduledworkflows."parentStepRunId", scheduledworkflows."parentWorkflowRunId", scheduledworkflows."additionalMetadata", scheduledworkflows."createdAt", scheduledworkflows."deletedAt", scheduledworkflows."updatedAt", scheduledworkflows.method, scheduledworkflows.priority, versions."id" AS "workflowVersionId", workflow."tenantId"
|
||||
`
|
||||
|
||||
type PollScheduledWorkflowsRow struct {
|
||||
|
||||
@@ -1537,6 +1537,9 @@ CREATE UNIQUE INDEX "WorkflowTriggerEventRef_parentId_eventKey_key" ON "Workflow
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "WorkflowTriggerScheduledRef_id_key" ON "WorkflowTriggerScheduledRef" ("id" ASC);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_WorkflowTriggerScheduledRef_triggerAt_tickerId" ON "WorkflowTriggerScheduledRef" ("triggerAt", "tickerId");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "WorkflowTriggerScheduledRef_parentId_parentStepRunId_childK_key" ON "WorkflowTriggerScheduledRef" (
|
||||
"parentId" ASC,
|
||||
|
||||
Reference in New Issue
Block a user