diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql b/pkg/repository/prisma/dbsqlc/step_runs.sql index c2faf414a..1801e473f 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql @@ -501,8 +501,7 @@ WITH RECURSIVE currStepRun AS ( SELECT "id", "status", "cancelledReason" FROM "StepRun" WHERE - "id" = @stepRunId::uuid AND - "tenantId" = @tenantId::uuid + "id" = @stepRunId::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -537,8 +536,7 @@ SET "status" = CASE FROM childStepRuns csr WHERE - sr."id" = csr."id" AND - sr."tenantId" = @tenantId::uuid + sr."id" = csr."id" RETURNING sr.*; -- name: UpdateStepRunOverridesData :one @@ -1240,8 +1238,7 @@ WITH RECURSIVE currStepRun AS ( SELECT * FROM "StepRun" WHERE - "id" = @stepRunId::uuid AND - "tenantId" = @tenantId::uuid + "id" = @stepRunId::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -1260,17 +1257,14 @@ SELECT FROM "StepRun" sr JOIN - childStepRuns csr ON sr."id" = csr."id" -WHERE - sr."tenantId" = @tenantId::uuid; + childStepRuns csr ON sr."id" = csr."id"; -- name: ReplayStepRunResetStepRuns :many WITH RECURSIVE currStepRun AS ( SELECT * FROM "StepRun" WHERE - "id" = @stepRunId::uuid AND - "tenantId" = @tenantId::uuid + "id" = @stepRunId::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -1303,11 +1297,8 @@ SET FROM childStepRuns csr WHERE - sr."tenantId" = @tenantId::uuid AND - ( - sr."id" = csr."id" OR - sr."id" = @stepRunId::uuid - ) + sr."id" = csr."id" OR + sr."id" = @stepRunId::uuid RETURNING sr.*; -- name: ResetStepRunsByIds :many @@ -1334,8 +1325,7 @@ WITH RECURSIVE currStepRun AS ( SELECT * FROM "StepRun" WHERE - "id" = @stepRunId::uuid AND - "tenantId" = @tenantId::uuid + "id" = @stepRunId::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -1358,7 +1348,6 @@ FROM JOIN childStepRuns csr ON sr."id" = csr."id" WHERE - sr."tenantId" = @tenantId::uuid AND sr."deletedAt" IS NULL AND sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED'); diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql.go b/pkg/repository/prisma/dbsqlc/step_runs.sql.go index 508f7ec85..164c7b534 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql.go +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql.go @@ -761,8 +761,7 @@ WITH RECURSIVE currStepRun AS ( SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased", queue, priority FROM "StepRun" WHERE - "id" = $2::uuid AND - "tenantId" = $1::uuid + "id" = $1::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -782,17 +781,10 @@ FROM "StepRun" sr JOIN childStepRuns csr ON sr."id" = csr."id" -WHERE - sr."tenantId" = $1::uuid ` -type GetLaterStepRunsParams struct { - Tenantid pgtype.UUID `json:"tenantid"` - Steprunid pgtype.UUID `json:"steprunid"` -} - -func (q *Queries) GetLaterStepRuns(ctx context.Context, db DBTX, arg GetLaterStepRunsParams) ([]*StepRun, error) { - rows, err := db.Query(ctx, getLaterStepRuns, arg.Tenantid, arg.Steprunid) +func (q *Queries) GetLaterStepRuns(ctx context.Context, db DBTX, steprunid pgtype.UUID) ([]*StepRun, error) { + rows, err := db.Query(ctx, getLaterStepRuns, steprunid) if err != nil { return nil, err } @@ -1579,8 +1571,7 @@ WITH RECURSIVE currStepRun AS ( SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased", queue, priority FROM "StepRun" WHERE - "id" = $2::uuid AND - "tenantId" = $1::uuid + "id" = $1::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -1602,19 +1593,13 @@ FROM JOIN childStepRuns csr ON sr."id" = csr."id" WHERE - sr."tenantId" = $1::uuid AND sr."deletedAt" IS NULL AND sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED') ` -type ListNonFinalChildStepRunsParams struct { - Tenantid pgtype.UUID `json:"tenantid"` - Steprunid pgtype.UUID `json:"steprunid"` -} - // Select all child step runs that are not in a final state -func (q *Queries) ListNonFinalChildStepRuns(ctx context.Context, db DBTX, arg ListNonFinalChildStepRunsParams) ([]*StepRun, error) { - rows, err := db.Query(ctx, listNonFinalChildStepRuns, arg.Tenantid, arg.Steprunid) +func (q *Queries) ListNonFinalChildStepRuns(ctx context.Context, db DBTX, steprunid pgtype.UUID) ([]*StepRun, error) { + rows, err := db.Query(ctx, listNonFinalChildStepRuns, steprunid) if err != nil { return nil, err } @@ -2415,8 +2400,7 @@ WITH RECURSIVE currStepRun AS ( SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased", queue, priority FROM "StepRun" WHERE - "id" = $1::uuid AND - "tenantId" = $3::uuid + "id" = $1::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -2449,22 +2433,18 @@ SET FROM childStepRuns csr WHERE - sr."tenantId" = $3::uuid AND - ( - sr."id" = csr."id" OR - sr."id" = $1::uuid - ) + sr."id" = csr."id" OR + sr."id" = $1::uuid RETURNING sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased", sr.queue, sr.priority ` type ReplayStepRunResetStepRunsParams struct { Steprunid pgtype.UUID `json:"steprunid"` Input []byte `json:"input"` - Tenantid pgtype.UUID `json:"tenantid"` } func (q *Queries) ReplayStepRunResetStepRuns(ctx context.Context, db DBTX, arg ReplayStepRunResetStepRunsParams) ([]*StepRun, error) { - rows, err := db.Query(ctx, replayStepRunResetStepRuns, arg.Steprunid, arg.Input, arg.Tenantid) + rows, err := db.Query(ctx, replayStepRunResetStepRuns, arg.Steprunid, arg.Input) if err != nil { return nil, err } @@ -2684,8 +2664,7 @@ WITH RECURSIVE currStepRun AS ( SELECT "id", "status", "cancelledReason" FROM "StepRun" WHERE - "id" = $3::uuid AND - "tenantId" = $2::uuid + "id" = $2::uuid ), childStepRuns AS ( SELECT sr."id", sr."status" FROM "StepRun" sr @@ -2720,19 +2699,17 @@ SET "status" = CASE FROM childStepRuns csr WHERE - sr."id" = csr."id" AND - sr."tenantId" = $2::uuid + sr."id" = csr."id" RETURNING sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased", sr.queue, sr.priority ` type ResolveLaterStepRunsParams struct { Status StepRunStatus `json:"status"` - Tenantid pgtype.UUID `json:"tenantid"` Steprunid pgtype.UUID `json:"steprunid"` } func (q *Queries) ResolveLaterStepRuns(ctx context.Context, db DBTX, arg ResolveLaterStepRunsParams) ([]*StepRun, error) { - rows, err := db.Query(ctx, resolveLaterStepRuns, arg.Status, arg.Tenantid, arg.Steprunid) + rows, err := db.Query(ctx, resolveLaterStepRuns, arg.Status, arg.Steprunid) if err != nil { return nil, err } diff --git a/pkg/repository/prisma/dbsqlc/workflow_runs.sql b/pkg/repository/prisma/dbsqlc/workflow_runs.sql index 339a589ea..6f6fa34c1 100644 --- a/pkg/repository/prisma/dbsqlc/workflow_runs.sql +++ b/pkg/repository/prisma/dbsqlc/workflow_runs.sql @@ -1084,42 +1084,32 @@ WHERE ); -- name: GetChildWorkflowRunsByIndex :many -WITH input_data AS ( - SELECT - UNNEST(@parentIds::uuid[]) AS parentId, - UNNEST(@parentStepRunIds::uuid[]) AS parentStepRunId, - UNNEST(@childIndexes::int[]) AS childIndex -) SELECT wr.* FROM "WorkflowRun" wr -JOIN - input_data i ON - wr."parentId" = i.parentId AND - wr."parentStepRunId" = i.parentStepRunId AND - wr."childIndex" = i.childIndex WHERE - wr."deletedAt" IS NULL; + (wr."parentId", wr."parentStepRunId", wr."childIndex") IN ( + SELECT + UNNEST(@parentIds::uuid[]), + UNNEST(@parentStepRunIds::uuid[]), + UNNEST(@childIndexes::int[]) + ) + AND wr."deletedAt" IS NULL; -- name: GetChildWorkflowRunsByKey :many -WITH input_data AS ( - SELECT - UNNEST(@parentIds::uuid[]) AS parentId, - UNNEST(@parentStepRunIds::uuid[]) AS parentStepRunId, - UNNEST(@childKeys::text[]) AS childKey -) SELECT wr.* FROM "WorkflowRun" wr -JOIN - input_data i ON - wr."parentId" = i.parentId AND - wr."parentStepRunId" = i.parentStepRunId AND - wr."childKey" = i.childKey WHERE - wr."deletedAt" IS NULL; + (wr."parentId", wr."parentStepRunId", wr."childKey") IN ( + SELECT + UNNEST(@parentIds::uuid[]), + UNNEST(@parentStepRunIds::uuid[]), + UNNEST(@childKeys::text[]) + ) + AND wr."deletedAt" IS NULL; -- name: GetScheduledChildWorkflowRun :one SELECT diff --git a/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go b/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go index ff9d53e70..bd453b7da 100644 --- a/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go +++ b/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go @@ -1092,23 +1092,18 @@ func (q *Queries) GetChildWorkflowRun(ctx context.Context, db DBTX, arg GetChild } const getChildWorkflowRunsByIndex = `-- name: GetChildWorkflowRunsByIndex :many -WITH input_data AS ( - SELECT - UNNEST($1::uuid[]) AS parentId, - UNNEST($2::uuid[]) AS parentStepRunId, - UNNEST($3::int[]) AS childIndex -) SELECT wr."createdAt", wr."updatedAt", wr."deletedAt", wr."tenantId", wr."workflowVersionId", wr.status, wr.error, wr."startedAt", wr."finishedAt", wr."concurrencyGroupId", wr."displayName", wr.id, wr."childIndex", wr."childKey", wr."parentId", wr."parentStepRunId", wr."additionalMetadata", wr.duration, wr.priority, wr."insertOrder" FROM "WorkflowRun" wr -JOIN - input_data i ON - wr."parentId" = i.parentId AND - wr."parentStepRunId" = i.parentStepRunId AND - wr."childIndex" = i.childIndex WHERE - wr."deletedAt" IS NULL + (wr."parentId", wr."parentStepRunId", wr."childIndex") IN ( + SELECT + UNNEST($1::uuid[]), + UNNEST($2::uuid[]), + UNNEST($3::int[]) + ) + AND wr."deletedAt" IS NULL ` type GetChildWorkflowRunsByIndexParams struct { @@ -1159,23 +1154,18 @@ func (q *Queries) GetChildWorkflowRunsByIndex(ctx context.Context, db DBTX, arg } const getChildWorkflowRunsByKey = `-- name: GetChildWorkflowRunsByKey :many -WITH input_data AS ( - SELECT - UNNEST($1::uuid[]) AS parentId, - UNNEST($2::uuid[]) AS parentStepRunId, - UNNEST($3::text[]) AS childKey -) SELECT wr."createdAt", wr."updatedAt", wr."deletedAt", wr."tenantId", wr."workflowVersionId", wr.status, wr.error, wr."startedAt", wr."finishedAt", wr."concurrencyGroupId", wr."displayName", wr.id, wr."childIndex", wr."childKey", wr."parentId", wr."parentStepRunId", wr."additionalMetadata", wr.duration, wr.priority, wr."insertOrder" FROM "WorkflowRun" wr -JOIN - input_data i ON - wr."parentId" = i.parentId AND - wr."parentStepRunId" = i.parentStepRunId AND - wr."childKey" = i.childKey WHERE - wr."deletedAt" IS NULL + (wr."parentId", wr."parentStepRunId", wr."childKey") IN ( + SELECT + UNNEST($1::uuid[]), + UNNEST($2::uuid[]), + UNNEST($3::text[]) + ) + AND wr."deletedAt" IS NULL ` type GetChildWorkflowRunsByKeyParams struct { diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index 51b9b6fd9..a615ae63a 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -2487,10 +2487,7 @@ func (s *stepRunEngineRepository) StepRunCancelled(ctx context.Context, tenantId return fmt.Errorf("could not buffer step run succeeded: %w", err) } - laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, dbsqlc.GetLaterStepRunsParams{ - Tenantid: sqlchelpers.UUIDFromStr(tenantId), - Steprunid: sqlchelpers.UUIDFromStr(stepRunId), - }) + laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, sqlchelpers.UUIDFromStr(stepRunId)) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("could not get later step runs: %w", err) @@ -2549,10 +2546,7 @@ func (s *stepRunEngineRepository) StepRunFailed(ctx context.Context, tenantId, w return fmt.Errorf("could not buffer step run succeeded: %w", err) } - laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, dbsqlc.GetLaterStepRunsParams{ - Tenantid: sqlchelpers.UUIDFromStr(tenantId), - Steprunid: sqlchelpers.UUIDFromStr(stepRunId), - }) + laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, s.pool, sqlchelpers.UUIDFromStr(stepRunId)) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("could not get later step runs: %w", err) @@ -2636,10 +2630,7 @@ func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, s return nil, err } - laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, tx, dbsqlc.GetLaterStepRunsParams{ - Tenantid: sqlchelpers.UUIDFromStr(tenantId), - Steprunid: sqlchelpers.UUIDFromStr(stepRunId), - }) + laterStepRuns, err := s.queries.GetLaterStepRuns(ctx, tx, sqlchelpers.UUIDFromStr(stepRunId)) if err != nil { return nil, err @@ -2686,7 +2677,6 @@ func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, s // reset all later step runs to a pending state _, err = s.queries.ReplayStepRunResetStepRuns(ctx, tx, dbsqlc.ReplayStepRunResetStepRunsParams{ - Tenantid: sqlchelpers.UUIDFromStr(tenantId), Steprunid: sqlchelpers.UUIDFromStr(stepRunId), Input: input, }) @@ -2723,10 +2713,7 @@ func (s *stepRunEngineRepository) PreflightCheckReplayStepRun(ctx context.Contex } // verify that child step runs are in a final state - childStepRuns, err := s.queries.ListNonFinalChildStepRuns(ctx, s.pool, dbsqlc.ListNonFinalChildStepRunsParams{ - Steprunid: sqlchelpers.UUIDFromStr(stepRunId), - Tenantid: sqlchelpers.UUIDFromStr(tenantId), - }) + childStepRuns, err := s.queries.ListNonFinalChildStepRuns(ctx, s.pool, sqlchelpers.UUIDFromStr(stepRunId)) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("could not list non-final child step runs: %w", err) diff --git a/sql/constraints.sql b/sql/constraints.sql index a6d39e8a4..38d55fcc0 100644 --- a/sql/constraints.sql +++ b/sql/constraints.sql @@ -32,3 +32,8 @@ ON "WorkflowTriggers" ("workflowVersionId"); -- Additional indexes on WorkflowTriggerEventRef CREATE INDEX idx_workflow_trigger_event_ref_event_key_parent_id ON "WorkflowTriggerEventRef" ("eventKey", "parentId"); + +-- Additional indexes on WorkflowRun +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_parentId_parentStepRunId_childIndex_key" +ON "WorkflowRun"("parentId", "parentStepRunId", "childIndex") +WHERE "deletedAt" IS NULL; diff --git a/sql/migrations/20241023223039_v0.50.4.sql b/sql/migrations/20241023223039_v0.50.4.sql new file mode 100644 index 000000000..c8e6f2d4b --- /dev/null +++ b/sql/migrations/20241023223039_v0.50.4.sql @@ -0,0 +1,6 @@ +-- atlas:txmode none + +-- Create index "WorkflowRun_parentId_parentStepRunId_childIndex_key" to table: "WorkflowRun" +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_parentId_parentStepRunId_childIndex_key" +ON "WorkflowRun" ("parentId", "parentStepRunId", "childIndex") +WHERE ("deletedAt" IS NULL); diff --git a/sql/migrations/atlas.sum b/sql/migrations/atlas.sum index 972c2761b..5290e4e9c 100644 --- a/sql/migrations/atlas.sum +++ b/sql/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:lNjyxVX0eSBxZnBeGyc4VtrjBez24XAQ9K779KQzVNE= +h1:CIM7NAv26L+YhyBC6tFMerwS5zoQM4Nxj2gERfvn1i0= 20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k= 20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo= 20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs= @@ -70,3 +70,4 @@ h1:lNjyxVX0eSBxZnBeGyc4VtrjBez24XAQ9K779KQzVNE= 20241018142125_v0.50.1.sql h1:j0fNH72m40gU3v5e8XolQIALBm4Te0/I8uvezcZ5EbU= 20241022124210_v0.50.2.sql h1:UerCQiYQTz9dC4EvuOiPH2afsL/ORa8fZt71K9uGDyU= 20241023112235_v0.50.3.sql h1:aW7fMNTT9o3gg8TJhDM3juB/tWho3j9M6VYsbMqSNcw= +20241023223039_v0.50.4.sql h1:eXJLlkM6ZzqzZ4RbAZtoTiW7WjJPIJ5L6UkIKy6w9Uk= diff --git a/sql/schema/schema.sql b/sql/schema/schema.sql index dbdf9be87..d9e31b46d 100644 --- a/sql/schema/schema.sql +++ b/sql/schema/schema.sql @@ -1666,3 +1666,8 @@ ON "WorkflowTriggers" ("workflowVersionId"); -- Additional indexes on WorkflowTriggerEventRef CREATE INDEX idx_workflow_trigger_event_ref_event_key_parent_id ON "WorkflowTriggerEventRef" ("eventKey", "parentId"); + +-- Additional indexes on WorkflowRun +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_parentId_parentStepRunId_childIndex_key" +ON "WorkflowRun"("parentId", "parentStepRunId", "childIndex") +WHERE "deletedAt" IS NULL;