mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-06 08:49:53 -06:00
2424 lines
72 KiB
Go
2424 lines
72 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.24.0
|
|
// source: workflow_runs.sql
|
|
|
|
package dbsqlc
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const bulkCreateWorkflowRunEvent = `-- name: BulkCreateWorkflowRunEvent :exec
|
|
WITH input_values AS (
|
|
SELECT
|
|
unnest($1::timestamp[]) AS "timeFirstSeen",
|
|
unnest($1::timestamp[]) AS "timeLastSeen",
|
|
unnest($2::uuid[]) AS "workflowRunId",
|
|
unnest(cast($3::text[] as"StepRunEventReason"[])) AS "reason",
|
|
unnest(cast($4::text[] as "StepRunEventSeverity"[])) AS "severity",
|
|
unnest($5::text[]) AS "message",
|
|
1 AS "count",
|
|
unnest($6::jsonb[]) AS "data"
|
|
),
|
|
updated AS (
|
|
UPDATE "StepRunEvent"
|
|
SET
|
|
"timeLastSeen" = input_values."timeLastSeen",
|
|
"message" = input_values."message",
|
|
"count" = "StepRunEvent"."count" + 1,
|
|
"data" = input_values."data"
|
|
FROM input_values
|
|
WHERE
|
|
"StepRunEvent"."workflowRunId" = input_values."workflowRunId"
|
|
AND "StepRunEvent"."reason" = input_values."reason"
|
|
AND "StepRunEvent"."severity" = input_values."severity"
|
|
AND "StepRunEvent"."id" = (
|
|
SELECT "id"
|
|
FROM "StepRunEvent"
|
|
WHERE "workflowRunId" = input_values."workflowRunId"
|
|
ORDER BY "id" DESC
|
|
LIMIT 1
|
|
)
|
|
RETURNING "StepRunEvent".id, "StepRunEvent"."timeFirstSeen", "StepRunEvent"."timeLastSeen", "StepRunEvent"."stepRunId", "StepRunEvent".reason, "StepRunEvent".severity, "StepRunEvent".message, "StepRunEvent".count, "StepRunEvent".data, "StepRunEvent"."workflowRunId"
|
|
)
|
|
INSERT INTO "StepRunEvent" (
|
|
"timeFirstSeen",
|
|
"timeLastSeen",
|
|
"workflowRunId",
|
|
"reason",
|
|
"severity",
|
|
"message",
|
|
"count",
|
|
"data"
|
|
)
|
|
SELECT
|
|
"timeFirstSeen",
|
|
"timeLastSeen",
|
|
"workflowRunId",
|
|
"reason",
|
|
"severity",
|
|
"message",
|
|
"count",
|
|
"data"
|
|
FROM input_values
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM updated WHERE "workflowRunId" = input_values."workflowRunId"
|
|
)
|
|
`
|
|
|
|
type BulkCreateWorkflowRunEventParams struct {
|
|
Timeseen []pgtype.Timestamp `json:"timeseen"`
|
|
Workflowrunids []pgtype.UUID `json:"workflowrunids"`
|
|
Reasons []string `json:"reasons"`
|
|
Severities []string `json:"severities"`
|
|
Messages []string `json:"messages"`
|
|
Data [][]byte `json:"data"`
|
|
}
|
|
|
|
func (q *Queries) BulkCreateWorkflowRunEvent(ctx context.Context, db DBTX, arg BulkCreateWorkflowRunEventParams) error {
|
|
_, err := db.Exec(ctx, bulkCreateWorkflowRunEvent,
|
|
arg.Timeseen,
|
|
arg.Workflowrunids,
|
|
arg.Reasons,
|
|
arg.Severities,
|
|
arg.Messages,
|
|
arg.Data,
|
|
)
|
|
return err
|
|
}
|
|
|
|
const countWorkflowRuns = `-- name: CountWorkflowRuns :one
|
|
WITH runs AS (
|
|
SELECT runs."id", runs."createdAt"
|
|
FROM
|
|
"WorkflowRun" as runs
|
|
LEFT JOIN
|
|
"WorkflowRunTriggeredBy" as runTriggers ON runTriggers."parentId" = runs."id"
|
|
LEFT JOIN
|
|
"Event" as events ON
|
|
runTriggers."eventId" = events."id"
|
|
AND (
|
|
$2::uuid IS NULL OR
|
|
events."id" = $2::uuid
|
|
)
|
|
LEFT JOIN
|
|
"WorkflowVersion" as workflowVersion ON
|
|
runs."workflowVersionId" = workflowVersion."id"
|
|
AND (
|
|
$3::uuid IS NULL OR
|
|
workflowVersion."id" = $3::uuid
|
|
)
|
|
AND (
|
|
$4::text[] IS NULL OR
|
|
workflowVersion."kind" = ANY(cast($4::text[] as "WorkflowKind"[]))
|
|
)
|
|
LEFT JOIN
|
|
"Workflow" as workflow ON
|
|
workflowVersion."workflowId" = workflow."id"
|
|
AND (
|
|
$5::uuid IS NULL OR
|
|
workflow."id" = $5::uuid
|
|
)
|
|
WHERE
|
|
runs."tenantId" = $1 AND
|
|
runs."deletedAt" IS NULL AND
|
|
workflowVersion."deletedAt" IS NULL AND
|
|
workflow."deletedAt" IS NULL AND
|
|
(
|
|
$6::uuid[] IS NULL OR
|
|
runs."id" = ANY($6::uuid[])
|
|
) AND
|
|
(
|
|
$7::jsonb IS NULL OR
|
|
runs."additionalMetadata" @> $7::jsonb
|
|
) AND
|
|
(
|
|
$8::uuid IS NULL OR
|
|
runs."parentId" = $8::uuid
|
|
) AND
|
|
(
|
|
$9::uuid IS NULL OR
|
|
runs."parentStepRunId" = $9::uuid
|
|
) AND
|
|
(
|
|
$10::text IS NULL OR
|
|
runs."concurrencyGroupId" = $10::text
|
|
) AND
|
|
(
|
|
$11::text[] IS NULL OR
|
|
"status" = ANY(cast($11::text[] as "WorkflowRunStatus"[]))
|
|
) AND
|
|
(
|
|
$12::timestamp IS NULL OR
|
|
runs."createdAt" > $12::timestamp
|
|
) AND
|
|
(
|
|
$13::timestamp IS NULL OR
|
|
runs."createdAt" < $13::timestamp
|
|
) AND
|
|
(
|
|
$14::timestamp IS NULL OR
|
|
runs."finishedAt" > $14::timestamp
|
|
)
|
|
ORDER BY
|
|
case when $15 = 'createdAt ASC' THEN runs."createdAt" END ASC ,
|
|
case when $15 = 'createdAt DESC' then runs."createdAt" END DESC,
|
|
runs."id" ASC
|
|
LIMIT 10000
|
|
)
|
|
SELECT
|
|
count(runs) AS total
|
|
FROM
|
|
runs
|
|
`
|
|
|
|
type CountWorkflowRunsParams struct {
|
|
TenantId pgtype.UUID `json:"tenantId"`
|
|
EventId pgtype.UUID `json:"eventId"`
|
|
WorkflowVersionId pgtype.UUID `json:"workflowVersionId"`
|
|
Kinds []string `json:"kinds"`
|
|
WorkflowId pgtype.UUID `json:"workflowId"`
|
|
Ids []pgtype.UUID `json:"ids"`
|
|
AdditionalMetadata []byte `json:"additionalMetadata"`
|
|
ParentId pgtype.UUID `json:"parentId"`
|
|
ParentStepRunId pgtype.UUID `json:"parentStepRunId"`
|
|
GroupKey pgtype.Text `json:"groupKey"`
|
|
Statuses []string `json:"statuses"`
|
|
CreatedAfter pgtype.Timestamp `json:"createdAfter"`
|
|
CreatedBefore pgtype.Timestamp `json:"createdBefore"`
|
|
FinishedAfter pgtype.Timestamp `json:"finishedAfter"`
|
|
Orderby interface{} `json:"orderby"`
|
|
}
|
|
|
|
func (q *Queries) CountWorkflowRuns(ctx context.Context, db DBTX, arg CountWorkflowRunsParams) (int64, error) {
|
|
row := db.QueryRow(ctx, countWorkflowRuns,
|
|
arg.TenantId,
|
|
arg.EventId,
|
|
arg.WorkflowVersionId,
|
|
arg.Kinds,
|
|
arg.WorkflowId,
|
|
arg.Ids,
|
|
arg.AdditionalMetadata,
|
|
arg.ParentId,
|
|
arg.ParentStepRunId,
|
|
arg.GroupKey,
|
|
arg.Statuses,
|
|
arg.CreatedAfter,
|
|
arg.CreatedBefore,
|
|
arg.FinishedAfter,
|
|
arg.Orderby,
|
|
)
|
|
var total int64
|
|
err := row.Scan(&total)
|
|
return total, err
|
|
}
|
|
|
|
const createGetGroupKeyRun = `-- name: CreateGetGroupKeyRun :one
|
|
INSERT INTO "GetGroupKeyRun" (
|
|
"id",
|
|
"createdAt",
|
|
"updatedAt",
|
|
"deletedAt",
|
|
"tenantId",
|
|
"workflowRunId",
|
|
"workerId",
|
|
"tickerId",
|
|
"status",
|
|
"input",
|
|
"output",
|
|
"requeueAfter",
|
|
"scheduleTimeoutAt",
|
|
"error",
|
|
"startedAt",
|
|
"finishedAt",
|
|
"timeoutAt",
|
|
"cancelledAt",
|
|
"cancelledReason",
|
|
"cancelledError"
|
|
) VALUES (
|
|
COALESCE($1::uuid, gen_random_uuid()),
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
NULL,
|
|
$2::uuid,
|
|
$3::uuid,
|
|
NULL,
|
|
NULL,
|
|
'PENDING', -- default status
|
|
$4::jsonb,
|
|
NULL,
|
|
$5::timestamp,
|
|
$6::timestamp,
|
|
NULL,
|
|
NULL,
|
|
NULL,
|
|
NULL,
|
|
NULL,
|
|
NULL,
|
|
NULL
|
|
) RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "workerId", "tickerId", status, input, output, "requeueAfter", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "workflowRunId", "scheduleTimeoutAt"
|
|
`
|
|
|
|
type CreateGetGroupKeyRunParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
Input []byte `json:"input"`
|
|
Requeueafter pgtype.Timestamp `json:"requeueafter"`
|
|
Scheduletimeoutat pgtype.Timestamp `json:"scheduletimeoutat"`
|
|
}
|
|
|
|
func (q *Queries) CreateGetGroupKeyRun(ctx context.Context, db DBTX, arg CreateGetGroupKeyRunParams) (*GetGroupKeyRun, error) {
|
|
row := db.QueryRow(ctx, createGetGroupKeyRun,
|
|
arg.ID,
|
|
arg.Tenantid,
|
|
arg.Workflowrunid,
|
|
arg.Input,
|
|
arg.Requeueafter,
|
|
arg.Scheduletimeoutat,
|
|
)
|
|
var i GetGroupKeyRun
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkerId,
|
|
&i.TickerId,
|
|
&i.Status,
|
|
&i.Input,
|
|
&i.Output,
|
|
&i.RequeueAfter,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.TimeoutAt,
|
|
&i.CancelledAt,
|
|
&i.CancelledReason,
|
|
&i.CancelledError,
|
|
&i.WorkflowRunId,
|
|
&i.ScheduleTimeoutAt,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const createJobRunLookupData = `-- name: CreateJobRunLookupData :one
|
|
INSERT INTO "JobRunLookupData" (
|
|
"id",
|
|
"createdAt",
|
|
"updatedAt",
|
|
"deletedAt",
|
|
"jobRunId",
|
|
"tenantId",
|
|
"data"
|
|
) VALUES (
|
|
COALESCE($1::uuid, gen_random_uuid()),
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
NULL,
|
|
$2::uuid,
|
|
$3::uuid,
|
|
jsonb_build_object(
|
|
'input', COALESCE($4::jsonb, '{}'::jsonb),
|
|
'triggered_by', $5::text,
|
|
'steps', '{}'::jsonb
|
|
)
|
|
) RETURNING id, "createdAt", "updatedAt", "deletedAt", "jobRunId", "tenantId", data
|
|
`
|
|
|
|
type CreateJobRunLookupDataParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Jobrunid pgtype.UUID `json:"jobrunid"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Input []byte `json:"input"`
|
|
Triggeredby string `json:"triggeredby"`
|
|
}
|
|
|
|
func (q *Queries) CreateJobRunLookupData(ctx context.Context, db DBTX, arg CreateJobRunLookupDataParams) (*JobRunLookupData, error) {
|
|
row := db.QueryRow(ctx, createJobRunLookupData,
|
|
arg.ID,
|
|
arg.Jobrunid,
|
|
arg.Tenantid,
|
|
arg.Input,
|
|
arg.Triggeredby,
|
|
)
|
|
var i JobRunLookupData
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.JobRunId,
|
|
&i.TenantId,
|
|
&i.Data,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const createJobRuns = `-- name: CreateJobRuns :many
|
|
INSERT INTO "JobRun" (
|
|
"id",
|
|
"createdAt",
|
|
"updatedAt",
|
|
"tenantId",
|
|
"workflowRunId",
|
|
"jobId",
|
|
"status"
|
|
)
|
|
SELECT
|
|
gen_random_uuid(),
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
$1::uuid,
|
|
$2::uuid,
|
|
"id",
|
|
'PENDING' -- default status
|
|
FROM
|
|
"Job"
|
|
WHERE
|
|
"workflowVersionId" = $3::uuid
|
|
RETURNING "id"
|
|
`
|
|
|
|
type CreateJobRunsParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
Workflowversionid pgtype.UUID `json:"workflowversionid"`
|
|
}
|
|
|
|
func (q *Queries) CreateJobRuns(ctx context.Context, db DBTX, arg CreateJobRunsParams) ([]pgtype.UUID, error) {
|
|
rows, err := db.Query(ctx, createJobRuns, arg.Tenantid, arg.Workflowrunid, arg.Workflowversionid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []pgtype.UUID
|
|
for rows.Next() {
|
|
var id pgtype.UUID
|
|
if err := rows.Scan(&id); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, id)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const createStepRun = `-- name: CreateStepRun :one
|
|
INSERT INTO "StepRun" (
|
|
"id",
|
|
"createdAt",
|
|
"updatedAt",
|
|
"tenantId",
|
|
"jobRunId",
|
|
"stepId",
|
|
"status",
|
|
"requeueAfter",
|
|
"queue",
|
|
"priority"
|
|
)
|
|
SELECT
|
|
gen_random_uuid(),
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
$1::uuid,
|
|
$2::uuid,
|
|
$3::uuid,
|
|
'PENDING', -- default status
|
|
CURRENT_TIMESTAMP + INTERVAL '5 seconds',
|
|
$4::text,
|
|
$5::int
|
|
RETURNING "id"
|
|
`
|
|
|
|
type CreateStepRunParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Jobrunid pgtype.UUID `json:"jobrunid"`
|
|
Stepid pgtype.UUID `json:"stepid"`
|
|
Queue pgtype.Text `json:"queue"`
|
|
Priority pgtype.Int4 `json:"priority"`
|
|
}
|
|
|
|
func (q *Queries) CreateStepRun(ctx context.Context, db DBTX, arg CreateStepRunParams) (pgtype.UUID, error) {
|
|
row := db.QueryRow(ctx, createStepRun,
|
|
arg.Tenantid,
|
|
arg.Jobrunid,
|
|
arg.Stepid,
|
|
arg.Queue,
|
|
arg.Priority,
|
|
)
|
|
var id pgtype.UUID
|
|
err := row.Scan(&id)
|
|
return id, err
|
|
}
|
|
|
|
const createWorkflowRun = `-- name: CreateWorkflowRun :one
|
|
INSERT INTO "WorkflowRun" (
|
|
"id",
|
|
"createdAt",
|
|
"updatedAt",
|
|
"deletedAt",
|
|
"displayName",
|
|
"tenantId",
|
|
"workflowVersionId",
|
|
"status",
|
|
"error",
|
|
"startedAt",
|
|
"finishedAt",
|
|
"childIndex",
|
|
"childKey",
|
|
"parentId",
|
|
"parentStepRunId",
|
|
"additionalMetadata",
|
|
"priority"
|
|
) VALUES (
|
|
COALESCE($1::uuid, gen_random_uuid()),
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
NULL, -- assuming deletedAt is not set on creation
|
|
$2::text,
|
|
$3::uuid,
|
|
$4::uuid,
|
|
'PENDING', -- default status
|
|
NULL, -- assuming error is not set on creation
|
|
NULL, -- assuming startedAt is not set on creation
|
|
NULL, -- assuming finishedAt is not set on creation
|
|
$5::int,
|
|
$6::text,
|
|
$7::uuid,
|
|
$8::uuid,
|
|
$9::jsonb,
|
|
$10::int
|
|
) RETURNING "createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", status, error, "startedAt", "finishedAt", "concurrencyGroupId", "displayName", id, "childIndex", "childKey", "parentId", "parentStepRunId", "additionalMetadata", duration, priority
|
|
`
|
|
|
|
type CreateWorkflowRunParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
DisplayName pgtype.Text `json:"displayName"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Workflowversionid pgtype.UUID `json:"workflowversionid"`
|
|
ChildIndex pgtype.Int4 `json:"childIndex"`
|
|
ChildKey pgtype.Text `json:"childKey"`
|
|
ParentId pgtype.UUID `json:"parentId"`
|
|
ParentStepRunId pgtype.UUID `json:"parentStepRunId"`
|
|
Additionalmetadata []byte `json:"additionalmetadata"`
|
|
Priority pgtype.Int4 `json:"priority"`
|
|
}
|
|
|
|
func (q *Queries) CreateWorkflowRun(ctx context.Context, db DBTX, arg CreateWorkflowRunParams) (*WorkflowRun, error) {
|
|
row := db.QueryRow(ctx, createWorkflowRun,
|
|
arg.ID,
|
|
arg.DisplayName,
|
|
arg.Tenantid,
|
|
arg.Workflowversionid,
|
|
arg.ChildIndex,
|
|
arg.ChildKey,
|
|
arg.ParentId,
|
|
arg.ParentStepRunId,
|
|
arg.Additionalmetadata,
|
|
arg.Priority,
|
|
)
|
|
var i WorkflowRun
|
|
err := row.Scan(
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowVersionId,
|
|
&i.Status,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.ConcurrencyGroupId,
|
|
&i.DisplayName,
|
|
&i.ID,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentId,
|
|
&i.ParentStepRunId,
|
|
&i.AdditionalMetadata,
|
|
&i.Duration,
|
|
&i.Priority,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const createWorkflowRunDedupe = `-- name: CreateWorkflowRunDedupe :one
|
|
WITH workflow_id AS (
|
|
SELECT w."id" FROM "Workflow" w
|
|
JOIN "WorkflowVersion" wv ON wv."workflowId" = w."id"
|
|
WHERE wv."id" = $4::uuid
|
|
)
|
|
INSERT INTO "WorkflowRunDedupe" (
|
|
"createdAt",
|
|
"updatedAt",
|
|
"tenantId",
|
|
"workflowId",
|
|
"workflowRunId",
|
|
"value"
|
|
) VALUES (
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
$1::uuid,
|
|
(SELECT "id" FROM workflow_id),
|
|
$2::uuid,
|
|
$3::text
|
|
) RETURNING id, "createdAt", "updatedAt", "tenantId", "workflowId", "workflowRunId", value
|
|
`
|
|
|
|
type CreateWorkflowRunDedupeParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
Value pgtype.Text `json:"value"`
|
|
Workflowversionid pgtype.UUID `json:"workflowversionid"`
|
|
}
|
|
|
|
func (q *Queries) CreateWorkflowRunDedupe(ctx context.Context, db DBTX, arg CreateWorkflowRunDedupeParams) (*WorkflowRunDedupe, error) {
|
|
row := db.QueryRow(ctx, createWorkflowRunDedupe,
|
|
arg.Tenantid,
|
|
arg.Workflowrunid,
|
|
arg.Value,
|
|
arg.Workflowversionid,
|
|
)
|
|
var i WorkflowRunDedupe
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowId,
|
|
&i.WorkflowRunId,
|
|
&i.Value,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const createWorkflowRunStickyState = `-- name: CreateWorkflowRunStickyState :one
|
|
WITH workflow_version AS (
|
|
SELECT "sticky"
|
|
FROM "WorkflowVersion"
|
|
WHERE "id" = $4::uuid
|
|
)
|
|
INSERT INTO "WorkflowRunStickyState" (
|
|
"createdAt",
|
|
"updatedAt",
|
|
"tenantId",
|
|
"workflowRunId",
|
|
"desiredWorkerId",
|
|
"strategy"
|
|
)
|
|
SELECT
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
$1::uuid,
|
|
$2::uuid,
|
|
$3::uuid,
|
|
workflow_version."sticky"
|
|
FROM workflow_version
|
|
WHERE workflow_version."sticky" IS NOT NULL
|
|
RETURNING id, "createdAt", "updatedAt", "tenantId", "workflowRunId", "desiredWorkerId", strategy
|
|
`
|
|
|
|
type CreateWorkflowRunStickyStateParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
DesiredWorkerId pgtype.UUID `json:"desiredWorkerId"`
|
|
Workflowversionid pgtype.UUID `json:"workflowversionid"`
|
|
}
|
|
|
|
func (q *Queries) CreateWorkflowRunStickyState(ctx context.Context, db DBTX, arg CreateWorkflowRunStickyStateParams) (*WorkflowRunStickyState, error) {
|
|
row := db.QueryRow(ctx, createWorkflowRunStickyState,
|
|
arg.Tenantid,
|
|
arg.Workflowrunid,
|
|
arg.DesiredWorkerId,
|
|
arg.Workflowversionid,
|
|
)
|
|
var i WorkflowRunStickyState
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowRunId,
|
|
&i.DesiredWorkerId,
|
|
&i.Strategy,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const createWorkflowRunTriggeredBy = `-- name: CreateWorkflowRunTriggeredBy :one
|
|
INSERT INTO "WorkflowRunTriggeredBy" (
|
|
"id",
|
|
"createdAt",
|
|
"updatedAt",
|
|
"deletedAt",
|
|
"tenantId",
|
|
"parentId",
|
|
"eventId",
|
|
"cronParentId",
|
|
"cronSchedule",
|
|
"scheduledId"
|
|
) VALUES (
|
|
gen_random_uuid(), -- Generates a new UUID for id
|
|
CURRENT_TIMESTAMP,
|
|
CURRENT_TIMESTAMP,
|
|
NULL, -- assuming deletedAt is not set on creation
|
|
$1::uuid,
|
|
$2::uuid, -- assuming parentId is the workflowRunId
|
|
$3::uuid, -- NULL if not provided
|
|
$4::uuid, -- NULL if not provided
|
|
$5::text, -- NULL if not provided
|
|
$6::uuid -- NULL if not provided
|
|
) RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "eventId", "cronParentId", "cronSchedule", "scheduledId", input, "parentId"
|
|
`
|
|
|
|
type CreateWorkflowRunTriggeredByParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
EventId pgtype.UUID `json:"eventId"`
|
|
CronParentId pgtype.UUID `json:"cronParentId"`
|
|
Cron pgtype.Text `json:"cron"`
|
|
ScheduledId pgtype.UUID `json:"scheduledId"`
|
|
}
|
|
|
|
func (q *Queries) CreateWorkflowRunTriggeredBy(ctx context.Context, db DBTX, arg CreateWorkflowRunTriggeredByParams) (*WorkflowRunTriggeredBy, error) {
|
|
row := db.QueryRow(ctx, createWorkflowRunTriggeredBy,
|
|
arg.Tenantid,
|
|
arg.Workflowrunid,
|
|
arg.EventId,
|
|
arg.CronParentId,
|
|
arg.Cron,
|
|
arg.ScheduledId,
|
|
)
|
|
var i WorkflowRunTriggeredBy
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.EventId,
|
|
&i.CronParentId,
|
|
&i.CronSchedule,
|
|
&i.ScheduledId,
|
|
&i.Input,
|
|
&i.ParentId,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const getChildWorkflowRun = `-- name: GetChildWorkflowRun :one
|
|
SELECT
|
|
"createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", status, error, "startedAt", "finishedAt", "concurrencyGroupId", "displayName", id, "childIndex", "childKey", "parentId", "parentStepRunId", "additionalMetadata", duration, priority
|
|
FROM
|
|
"WorkflowRun"
|
|
WHERE
|
|
"parentId" = $1::uuid AND
|
|
"deletedAt" IS NULL AND
|
|
"parentStepRunId" = $2::uuid AND
|
|
(
|
|
-- if childKey is set, use that
|
|
($3::text IS NULL AND "childIndex" = $4) OR
|
|
($3::text IS NOT NULL AND "childKey" = $3::text)
|
|
)
|
|
`
|
|
|
|
type GetChildWorkflowRunParams struct {
|
|
Parentid pgtype.UUID `json:"parentid"`
|
|
Parentsteprunid pgtype.UUID `json:"parentsteprunid"`
|
|
ChildKey pgtype.Text `json:"childKey"`
|
|
Childindex pgtype.Int4 `json:"childindex"`
|
|
}
|
|
|
|
func (q *Queries) GetChildWorkflowRun(ctx context.Context, db DBTX, arg GetChildWorkflowRunParams) (*WorkflowRun, error) {
|
|
row := db.QueryRow(ctx, getChildWorkflowRun,
|
|
arg.Parentid,
|
|
arg.Parentsteprunid,
|
|
arg.ChildKey,
|
|
arg.Childindex,
|
|
)
|
|
var i WorkflowRun
|
|
err := row.Scan(
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowVersionId,
|
|
&i.Status,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.ConcurrencyGroupId,
|
|
&i.DisplayName,
|
|
&i.ID,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentId,
|
|
&i.ParentStepRunId,
|
|
&i.AdditionalMetadata,
|
|
&i.Duration,
|
|
&i.Priority,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const getScheduledChildWorkflowRun = `-- name: GetScheduledChildWorkflowRun :one
|
|
SELECT
|
|
id, "parentId", "triggerAt", "tickerId", input, "childIndex", "childKey", "parentStepRunId", "parentWorkflowRunId"
|
|
FROM
|
|
"WorkflowTriggerScheduledRef"
|
|
WHERE
|
|
"parentId" = $1::uuid AND
|
|
"parentStepRunId" = $2::uuid AND
|
|
(
|
|
-- if childKey is set, use that
|
|
($3::text IS NULL AND "childIndex" = $4) OR
|
|
($3::text IS NOT NULL AND "childKey" = $3::text)
|
|
)
|
|
`
|
|
|
|
type GetScheduledChildWorkflowRunParams struct {
|
|
Parentid pgtype.UUID `json:"parentid"`
|
|
Parentsteprunid pgtype.UUID `json:"parentsteprunid"`
|
|
ChildKey pgtype.Text `json:"childKey"`
|
|
Childindex pgtype.Int4 `json:"childindex"`
|
|
}
|
|
|
|
func (q *Queries) GetScheduledChildWorkflowRun(ctx context.Context, db DBTX, arg GetScheduledChildWorkflowRunParams) (*WorkflowTriggerScheduledRef, error) {
|
|
row := db.QueryRow(ctx, getScheduledChildWorkflowRun,
|
|
arg.Parentid,
|
|
arg.Parentsteprunid,
|
|
arg.ChildKey,
|
|
arg.Childindex,
|
|
)
|
|
var i WorkflowTriggerScheduledRef
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.ParentId,
|
|
&i.TriggerAt,
|
|
&i.TickerId,
|
|
&i.Input,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentStepRunId,
|
|
&i.ParentWorkflowRunId,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const getStepRunsForJobRuns = `-- name: GetStepRunsForJobRuns :many
|
|
SELECT
|
|
sr."id",
|
|
sr."createdAt",
|
|
sr."updatedAt",
|
|
sr."status",
|
|
sr."jobRunId",
|
|
sr."stepId",
|
|
sr."tenantId",
|
|
sr."startedAt",
|
|
sr."finishedAt",
|
|
sr."cancelledAt",
|
|
sr."cancelledError",
|
|
sr."cancelledReason",
|
|
sr."timeoutAt",
|
|
sr."error",
|
|
sr."workerId"
|
|
FROM "StepRun" sr
|
|
WHERE
|
|
sr."jobRunId" = ANY($1::uuid[])
|
|
AND sr."tenantId" = $2::uuid
|
|
AND sr."deletedAt" IS NULL
|
|
ORDER BY sr."order" DESC
|
|
`
|
|
|
|
type GetStepRunsForJobRunsParams struct {
|
|
Jobids []pgtype.UUID `json:"jobids"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
type GetStepRunsForJobRunsRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
CreatedAt pgtype.Timestamp `json:"createdAt"`
|
|
UpdatedAt pgtype.Timestamp `json:"updatedAt"`
|
|
Status StepRunStatus `json:"status"`
|
|
JobRunId pgtype.UUID `json:"jobRunId"`
|
|
StepId pgtype.UUID `json:"stepId"`
|
|
TenantId pgtype.UUID `json:"tenantId"`
|
|
StartedAt pgtype.Timestamp `json:"startedAt"`
|
|
FinishedAt pgtype.Timestamp `json:"finishedAt"`
|
|
CancelledAt pgtype.Timestamp `json:"cancelledAt"`
|
|
CancelledError pgtype.Text `json:"cancelledError"`
|
|
CancelledReason pgtype.Text `json:"cancelledReason"`
|
|
TimeoutAt pgtype.Timestamp `json:"timeoutAt"`
|
|
Error pgtype.Text `json:"error"`
|
|
WorkerId pgtype.UUID `json:"workerId"`
|
|
}
|
|
|
|
func (q *Queries) GetStepRunsForJobRuns(ctx context.Context, db DBTX, arg GetStepRunsForJobRunsParams) ([]*GetStepRunsForJobRunsRow, error) {
|
|
rows, err := db.Query(ctx, getStepRunsForJobRuns, arg.Jobids, arg.Tenantid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*GetStepRunsForJobRunsRow
|
|
for rows.Next() {
|
|
var i GetStepRunsForJobRunsRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Status,
|
|
&i.JobRunId,
|
|
&i.StepId,
|
|
&i.TenantId,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.CancelledAt,
|
|
&i.CancelledError,
|
|
&i.CancelledReason,
|
|
&i.TimeoutAt,
|
|
&i.Error,
|
|
&i.WorkerId,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getStepsForJobs = `-- name: GetStepsForJobs :many
|
|
SELECT
|
|
j."id" as "jobId",
|
|
s.id, s."createdAt", s."updatedAt", s."deletedAt", s."readableId", s."tenantId", s."jobId", s."actionId", s.timeout, s."customUserData", s.retries, s."scheduleTimeout",
|
|
(
|
|
SELECT array_agg(so."A")::uuid[] -- Casting the array_agg result to uuid[]
|
|
FROM "_StepOrder" so
|
|
WHERE so."B" = s."id"
|
|
) AS "parents"
|
|
FROM "Job" j
|
|
JOIN "Step" s ON s."jobId" = j."id"
|
|
WHERE
|
|
j."id" = ANY($1::uuid[])
|
|
AND j."tenantId" = $2::uuid
|
|
AND j."deletedAt" IS NULL
|
|
`
|
|
|
|
type GetStepsForJobsParams struct {
|
|
Jobids []pgtype.UUID `json:"jobids"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
type GetStepsForJobsRow struct {
|
|
JobId pgtype.UUID `json:"jobId"`
|
|
Step Step `json:"step"`
|
|
Parents []pgtype.UUID `json:"parents"`
|
|
}
|
|
|
|
func (q *Queries) GetStepsForJobs(ctx context.Context, db DBTX, arg GetStepsForJobsParams) ([]*GetStepsForJobsRow, error) {
|
|
rows, err := db.Query(ctx, getStepsForJobs, arg.Jobids, arg.Tenantid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*GetStepsForJobsRow
|
|
for rows.Next() {
|
|
var i GetStepsForJobsRow
|
|
if err := rows.Scan(
|
|
&i.JobId,
|
|
&i.Step.ID,
|
|
&i.Step.CreatedAt,
|
|
&i.Step.UpdatedAt,
|
|
&i.Step.DeletedAt,
|
|
&i.Step.ReadableId,
|
|
&i.Step.TenantId,
|
|
&i.Step.JobId,
|
|
&i.Step.ActionId,
|
|
&i.Step.Timeout,
|
|
&i.Step.CustomUserData,
|
|
&i.Step.Retries,
|
|
&i.Step.ScheduleTimeout,
|
|
&i.Parents,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getWorkflowRun = `-- name: GetWorkflowRun :many
|
|
SELECT
|
|
runs."createdAt", runs."updatedAt", runs."deletedAt", runs."tenantId", runs."workflowVersionId", runs.status, runs.error, runs."startedAt", runs."finishedAt", runs."concurrencyGroupId", runs."displayName", runs.id, runs."childIndex", runs."childKey", runs."parentId", runs."parentStepRunId", runs."additionalMetadata", runs.duration, runs.priority,
|
|
runtriggers.id, runtriggers."createdAt", runtriggers."updatedAt", runtriggers."deletedAt", runtriggers."tenantId", runtriggers."eventId", runtriggers."cronParentId", runtriggers."cronSchedule", runtriggers."scheduledId", runtriggers.input, runtriggers."parentId",
|
|
workflowversion.id, workflowversion."createdAt", workflowversion."updatedAt", workflowversion."deletedAt", workflowversion.version, workflowversion."order", workflowversion."workflowId", workflowversion.checksum, workflowversion."scheduleTimeout", workflowversion."onFailureJobId", workflowversion.sticky, workflowversion.kind, workflowversion."defaultPriority",
|
|
workflow."name" as "workflowName",
|
|
-- waiting on https://github.com/sqlc-dev/sqlc/pull/2858 for nullable fields
|
|
wc."limitStrategy" as "concurrencyLimitStrategy",
|
|
wc."maxRuns" as "concurrencyMaxRuns",
|
|
wc."concurrencyGroupExpression" as "concurrencyGroupExpression",
|
|
groupKeyRun."id" as "getGroupKeyRunId"
|
|
FROM
|
|
"WorkflowRun" as runs
|
|
LEFT JOIN
|
|
"WorkflowRunTriggeredBy" as runTriggers ON runTriggers."parentId" = runs."id"
|
|
LEFT JOIN
|
|
"WorkflowVersion" as workflowVersion ON runs."workflowVersionId" = workflowVersion."id"
|
|
LEFT JOIN
|
|
"Workflow" as workflow ON workflowVersion."workflowId" = workflow."id"
|
|
LEFT JOIN
|
|
"WorkflowConcurrency" as wc ON wc."workflowVersionId" = workflowVersion."id"
|
|
LEFT JOIN
|
|
"GetGroupKeyRun" as groupKeyRun ON groupKeyRun."workflowRunId" = runs."id"
|
|
WHERE
|
|
runs."deletedAt" IS NULL AND
|
|
workflowVersion."deletedAt" IS NULL AND
|
|
workflow."deletedAt" IS NULL AND
|
|
runs."id" = ANY($1::uuid[]) AND
|
|
runs."tenantId" = $2::uuid
|
|
`
|
|
|
|
type GetWorkflowRunParams struct {
|
|
Ids []pgtype.UUID `json:"ids"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
type GetWorkflowRunRow struct {
|
|
WorkflowRun WorkflowRun `json:"workflow_run"`
|
|
WorkflowRunTriggeredBy WorkflowRunTriggeredBy `json:"workflow_run_triggered_by"`
|
|
WorkflowVersion WorkflowVersion `json:"workflow_version"`
|
|
WorkflowName pgtype.Text `json:"workflowName"`
|
|
ConcurrencyLimitStrategy NullConcurrencyLimitStrategy `json:"concurrencyLimitStrategy"`
|
|
ConcurrencyMaxRuns pgtype.Int4 `json:"concurrencyMaxRuns"`
|
|
ConcurrencyGroupExpression pgtype.Text `json:"concurrencyGroupExpression"`
|
|
GetGroupKeyRunId pgtype.UUID `json:"getGroupKeyRunId"`
|
|
}
|
|
|
|
func (q *Queries) GetWorkflowRun(ctx context.Context, db DBTX, arg GetWorkflowRunParams) ([]*GetWorkflowRunRow, error) {
|
|
rows, err := db.Query(ctx, getWorkflowRun, arg.Ids, arg.Tenantid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*GetWorkflowRunRow
|
|
for rows.Next() {
|
|
var i GetWorkflowRunRow
|
|
if err := rows.Scan(
|
|
&i.WorkflowRun.CreatedAt,
|
|
&i.WorkflowRun.UpdatedAt,
|
|
&i.WorkflowRun.DeletedAt,
|
|
&i.WorkflowRun.TenantId,
|
|
&i.WorkflowRun.WorkflowVersionId,
|
|
&i.WorkflowRun.Status,
|
|
&i.WorkflowRun.Error,
|
|
&i.WorkflowRun.StartedAt,
|
|
&i.WorkflowRun.FinishedAt,
|
|
&i.WorkflowRun.ConcurrencyGroupId,
|
|
&i.WorkflowRun.DisplayName,
|
|
&i.WorkflowRun.ID,
|
|
&i.WorkflowRun.ChildIndex,
|
|
&i.WorkflowRun.ChildKey,
|
|
&i.WorkflowRun.ParentId,
|
|
&i.WorkflowRun.ParentStepRunId,
|
|
&i.WorkflowRun.AdditionalMetadata,
|
|
&i.WorkflowRun.Duration,
|
|
&i.WorkflowRun.Priority,
|
|
&i.WorkflowRunTriggeredBy.ID,
|
|
&i.WorkflowRunTriggeredBy.CreatedAt,
|
|
&i.WorkflowRunTriggeredBy.UpdatedAt,
|
|
&i.WorkflowRunTriggeredBy.DeletedAt,
|
|
&i.WorkflowRunTriggeredBy.TenantId,
|
|
&i.WorkflowRunTriggeredBy.EventId,
|
|
&i.WorkflowRunTriggeredBy.CronParentId,
|
|
&i.WorkflowRunTriggeredBy.CronSchedule,
|
|
&i.WorkflowRunTriggeredBy.ScheduledId,
|
|
&i.WorkflowRunTriggeredBy.Input,
|
|
&i.WorkflowRunTriggeredBy.ParentId,
|
|
&i.WorkflowVersion.ID,
|
|
&i.WorkflowVersion.CreatedAt,
|
|
&i.WorkflowVersion.UpdatedAt,
|
|
&i.WorkflowVersion.DeletedAt,
|
|
&i.WorkflowVersion.Version,
|
|
&i.WorkflowVersion.Order,
|
|
&i.WorkflowVersion.WorkflowId,
|
|
&i.WorkflowVersion.Checksum,
|
|
&i.WorkflowVersion.ScheduleTimeout,
|
|
&i.WorkflowVersion.OnFailureJobId,
|
|
&i.WorkflowVersion.Sticky,
|
|
&i.WorkflowVersion.Kind,
|
|
&i.WorkflowVersion.DefaultPriority,
|
|
&i.WorkflowName,
|
|
&i.ConcurrencyLimitStrategy,
|
|
&i.ConcurrencyMaxRuns,
|
|
&i.ConcurrencyGroupExpression,
|
|
&i.GetGroupKeyRunId,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getWorkflowRunAdditionalMeta = `-- name: GetWorkflowRunAdditionalMeta :one
|
|
SELECT
|
|
"additionalMetadata",
|
|
"id"
|
|
FROM
|
|
"WorkflowRun"
|
|
WHERE
|
|
"id" = $1::uuid AND
|
|
"tenantId" = $2::uuid
|
|
`
|
|
|
|
type GetWorkflowRunAdditionalMetaParams struct {
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
type GetWorkflowRunAdditionalMetaRow struct {
|
|
AdditionalMetadata []byte `json:"additionalMetadata"`
|
|
ID pgtype.UUID `json:"id"`
|
|
}
|
|
|
|
func (q *Queries) GetWorkflowRunAdditionalMeta(ctx context.Context, db DBTX, arg GetWorkflowRunAdditionalMetaParams) (*GetWorkflowRunAdditionalMetaRow, error) {
|
|
row := db.QueryRow(ctx, getWorkflowRunAdditionalMeta, arg.Workflowrunid, arg.Tenantid)
|
|
var i GetWorkflowRunAdditionalMetaRow
|
|
err := row.Scan(&i.AdditionalMetadata, &i.ID)
|
|
return &i, err
|
|
}
|
|
|
|
const getWorkflowRunById = `-- name: GetWorkflowRunById :one
|
|
SELECT
|
|
r."createdAt", r."updatedAt", r."deletedAt", r."tenantId", r."workflowVersionId", r.status, r.error, r."startedAt", r."finishedAt", r."concurrencyGroupId", r."displayName", r.id, r."childIndex", r."childKey", r."parentId", r."parentStepRunId", r."additionalMetadata", r.duration, r.priority,
|
|
wv.id, wv."createdAt", wv."updatedAt", wv."deletedAt", wv.version, wv."order", wv."workflowId", wv.checksum, wv."scheduleTimeout", wv."onFailureJobId", wv.sticky, wv.kind, wv."defaultPriority",
|
|
w.id, w."createdAt", w."updatedAt", w."deletedAt", w."tenantId", w.name, w.description,
|
|
tb.id, tb."createdAt", tb."updatedAt", tb."deletedAt", tb."tenantId", tb."eventId", tb."cronParentId", tb."cronSchedule", tb."scheduledId", tb.input, tb."parentId"
|
|
FROM
|
|
"WorkflowRun" r
|
|
JOIN
|
|
"WorkflowVersion" as wv ON
|
|
r."workflowVersionId" = wv."id"
|
|
JOIN "Workflow" as w ON
|
|
wv."workflowId" = w."id"
|
|
JOIN "WorkflowRunTriggeredBy" as tb ON
|
|
r."id" = tb."parentId"
|
|
WHERE
|
|
r."id" = $1::uuid AND
|
|
r."tenantId" = $2::uuid
|
|
`
|
|
|
|
type GetWorkflowRunByIdParams struct {
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
type GetWorkflowRunByIdRow struct {
|
|
CreatedAt pgtype.Timestamp `json:"createdAt"`
|
|
UpdatedAt pgtype.Timestamp `json:"updatedAt"`
|
|
DeletedAt pgtype.Timestamp `json:"deletedAt"`
|
|
TenantId pgtype.UUID `json:"tenantId"`
|
|
WorkflowVersionId pgtype.UUID `json:"workflowVersionId"`
|
|
Status WorkflowRunStatus `json:"status"`
|
|
Error pgtype.Text `json:"error"`
|
|
StartedAt pgtype.Timestamp `json:"startedAt"`
|
|
FinishedAt pgtype.Timestamp `json:"finishedAt"`
|
|
ConcurrencyGroupId pgtype.Text `json:"concurrencyGroupId"`
|
|
DisplayName pgtype.Text `json:"displayName"`
|
|
ID pgtype.UUID `json:"id"`
|
|
ChildIndex pgtype.Int4 `json:"childIndex"`
|
|
ChildKey pgtype.Text `json:"childKey"`
|
|
ParentId pgtype.UUID `json:"parentId"`
|
|
ParentStepRunId pgtype.UUID `json:"parentStepRunId"`
|
|
AdditionalMetadata []byte `json:"additionalMetadata"`
|
|
Duration pgtype.Int8 `json:"duration"`
|
|
Priority pgtype.Int4 `json:"priority"`
|
|
WorkflowVersion WorkflowVersion `json:"workflow_version"`
|
|
Workflow Workflow `json:"workflow"`
|
|
WorkflowRunTriggeredBy WorkflowRunTriggeredBy `json:"workflow_run_triggered_by"`
|
|
}
|
|
|
|
func (q *Queries) GetWorkflowRunById(ctx context.Context, db DBTX, arg GetWorkflowRunByIdParams) (*GetWorkflowRunByIdRow, error) {
|
|
row := db.QueryRow(ctx, getWorkflowRunById, arg.Workflowrunid, arg.Tenantid)
|
|
var i GetWorkflowRunByIdRow
|
|
err := row.Scan(
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowVersionId,
|
|
&i.Status,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.ConcurrencyGroupId,
|
|
&i.DisplayName,
|
|
&i.ID,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentId,
|
|
&i.ParentStepRunId,
|
|
&i.AdditionalMetadata,
|
|
&i.Duration,
|
|
&i.Priority,
|
|
&i.WorkflowVersion.ID,
|
|
&i.WorkflowVersion.CreatedAt,
|
|
&i.WorkflowVersion.UpdatedAt,
|
|
&i.WorkflowVersion.DeletedAt,
|
|
&i.WorkflowVersion.Version,
|
|
&i.WorkflowVersion.Order,
|
|
&i.WorkflowVersion.WorkflowId,
|
|
&i.WorkflowVersion.Checksum,
|
|
&i.WorkflowVersion.ScheduleTimeout,
|
|
&i.WorkflowVersion.OnFailureJobId,
|
|
&i.WorkflowVersion.Sticky,
|
|
&i.WorkflowVersion.Kind,
|
|
&i.WorkflowVersion.DefaultPriority,
|
|
&i.Workflow.ID,
|
|
&i.Workflow.CreatedAt,
|
|
&i.Workflow.UpdatedAt,
|
|
&i.Workflow.DeletedAt,
|
|
&i.Workflow.TenantId,
|
|
&i.Workflow.Name,
|
|
&i.Workflow.Description,
|
|
&i.WorkflowRunTriggeredBy.ID,
|
|
&i.WorkflowRunTriggeredBy.CreatedAt,
|
|
&i.WorkflowRunTriggeredBy.UpdatedAt,
|
|
&i.WorkflowRunTriggeredBy.DeletedAt,
|
|
&i.WorkflowRunTriggeredBy.TenantId,
|
|
&i.WorkflowRunTriggeredBy.EventId,
|
|
&i.WorkflowRunTriggeredBy.CronParentId,
|
|
&i.WorkflowRunTriggeredBy.CronSchedule,
|
|
&i.WorkflowRunTriggeredBy.ScheduledId,
|
|
&i.WorkflowRunTriggeredBy.Input,
|
|
&i.WorkflowRunTriggeredBy.ParentId,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const getWorkflowRunInput = `-- name: GetWorkflowRunInput :one
|
|
SELECT jld."data" AS lookupData
|
|
FROM "JobRun" jr
|
|
JOIN "JobRunLookupData" jld ON jr."id" = jld."jobRunId"
|
|
WHERE jld."data" ? 'input' AND jr."workflowRunId" = $1::uuid
|
|
LIMIT 1
|
|
`
|
|
|
|
func (q *Queries) GetWorkflowRunInput(ctx context.Context, db DBTX, workflowrunid pgtype.UUID) ([]byte, error) {
|
|
row := db.QueryRow(ctx, getWorkflowRunInput, workflowrunid)
|
|
var lookupdata []byte
|
|
err := row.Scan(&lookupdata)
|
|
return lookupdata, err
|
|
}
|
|
|
|
const getWorkflowRunStickyStateForUpdate = `-- name: GetWorkflowRunStickyStateForUpdate :one
|
|
SELECT
|
|
id, "createdAt", "updatedAt", "tenantId", "workflowRunId", "desiredWorkerId", strategy
|
|
FROM
|
|
"WorkflowRunStickyState"
|
|
WHERE
|
|
"workflowRunId" = $1::uuid AND
|
|
"tenantId" = $2::uuid
|
|
FOR UPDATE
|
|
`
|
|
|
|
type GetWorkflowRunStickyStateForUpdateParams struct {
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
func (q *Queries) GetWorkflowRunStickyStateForUpdate(ctx context.Context, db DBTX, arg GetWorkflowRunStickyStateForUpdateParams) (*WorkflowRunStickyState, error) {
|
|
row := db.QueryRow(ctx, getWorkflowRunStickyStateForUpdate, arg.Workflowrunid, arg.Tenantid)
|
|
var i WorkflowRunStickyState
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowRunId,
|
|
&i.DesiredWorkerId,
|
|
&i.Strategy,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const getWorkflowRunTrigger = `-- name: GetWorkflowRunTrigger :one
|
|
SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "eventId", "cronParentId", "cronSchedule", "scheduledId", input, "parentId"
|
|
FROM
|
|
"WorkflowRunTriggeredBy"
|
|
WHERE
|
|
"parentId" = $1::uuid AND
|
|
"tenantId" = $2::uuid
|
|
`
|
|
|
|
type GetWorkflowRunTriggerParams struct {
|
|
Runid pgtype.UUID `json:"runid"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
func (q *Queries) GetWorkflowRunTrigger(ctx context.Context, db DBTX, arg GetWorkflowRunTriggerParams) (*WorkflowRunTriggeredBy, error) {
|
|
row := db.QueryRow(ctx, getWorkflowRunTrigger, arg.Runid, arg.Tenantid)
|
|
var i WorkflowRunTriggeredBy
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.EventId,
|
|
&i.CronParentId,
|
|
&i.CronSchedule,
|
|
&i.ScheduledId,
|
|
&i.Input,
|
|
&i.ParentId,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const linkStepRunParents = `-- name: LinkStepRunParents :exec
|
|
WITH step_runs AS (
|
|
SELECT "id", "stepId"
|
|
FROM "StepRun"
|
|
WHERE "id" = ANY($1::uuid[])
|
|
)
|
|
INSERT INTO "_StepRunOrder" ("A", "B")
|
|
SELECT
|
|
parent_run."id" AS "A",
|
|
child_run."id" AS "B"
|
|
FROM
|
|
"_StepOrder" AS step_order
|
|
JOIN
|
|
step_runs AS parent_run ON parent_run."stepId" = step_order."A"
|
|
JOIN
|
|
step_runs AS child_run ON child_run."stepId" = step_order."B"
|
|
`
|
|
|
|
func (q *Queries) LinkStepRunParents(ctx context.Context, db DBTX, steprunids []pgtype.UUID) error {
|
|
_, err := db.Exec(ctx, linkStepRunParents, steprunids)
|
|
return err
|
|
}
|
|
|
|
const listActiveQueuedWorkflowVersions = `-- name: ListActiveQueuedWorkflowVersions :many
|
|
WITH QueuedRuns AS (
|
|
SELECT DISTINCT ON (wr."workflowVersionId")
|
|
wr."workflowVersionId",
|
|
w."tenantId",
|
|
wr."status",
|
|
wr."id",
|
|
wr."concurrencyGroupId"
|
|
FROM "WorkflowRun" wr
|
|
JOIN "WorkflowVersion" wv ON wv."id" = wr."workflowVersionId"
|
|
JOIN "Workflow" w ON w."id" = wv."workflowId"
|
|
WHERE
|
|
wr."tenantId" = $1::uuid
|
|
AND wr."status" = 'QUEUED'
|
|
AND wr."concurrencyGroupId" IS NOT NULL
|
|
ORDER BY wr."workflowVersionId"
|
|
)
|
|
SELECT
|
|
q."workflowVersionId",
|
|
q."tenantId",
|
|
q."status",
|
|
q."id",
|
|
q."concurrencyGroupId"
|
|
FROM QueuedRuns q
|
|
GROUP BY q."workflowVersionId", q."tenantId", q."concurrencyGroupId", q."status", q."id"
|
|
`
|
|
|
|
type ListActiveQueuedWorkflowVersionsRow struct {
|
|
WorkflowVersionId pgtype.UUID `json:"workflowVersionId"`
|
|
TenantId pgtype.UUID `json:"tenantId"`
|
|
Status WorkflowRunStatus `json:"status"`
|
|
ID pgtype.UUID `json:"id"`
|
|
ConcurrencyGroupId pgtype.Text `json:"concurrencyGroupId"`
|
|
}
|
|
|
|
func (q *Queries) ListActiveQueuedWorkflowVersions(ctx context.Context, db DBTX, tenantid pgtype.UUID) ([]*ListActiveQueuedWorkflowVersionsRow, error) {
|
|
rows, err := db.Query(ctx, listActiveQueuedWorkflowVersions, tenantid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*ListActiveQueuedWorkflowVersionsRow
|
|
for rows.Next() {
|
|
var i ListActiveQueuedWorkflowVersionsRow
|
|
if err := rows.Scan(
|
|
&i.WorkflowVersionId,
|
|
&i.TenantId,
|
|
&i.Status,
|
|
&i.ID,
|
|
&i.ConcurrencyGroupId,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listChildWorkflowRunCounts = `-- name: ListChildWorkflowRunCounts :many
|
|
SELECT
|
|
wr."parentStepRunId",
|
|
COUNT(wr."id") as "count"
|
|
FROM
|
|
"WorkflowRun" wr
|
|
WHERE
|
|
wr."parentStepRunId" = ANY($1::uuid[])
|
|
GROUP BY
|
|
wr."parentStepRunId"
|
|
`
|
|
|
|
type ListChildWorkflowRunCountsRow struct {
|
|
ParentStepRunId pgtype.UUID `json:"parentStepRunId"`
|
|
Count int64 `json:"count"`
|
|
}
|
|
|
|
func (q *Queries) ListChildWorkflowRunCounts(ctx context.Context, db DBTX, steprunids []pgtype.UUID) ([]*ListChildWorkflowRunCountsRow, error) {
|
|
rows, err := db.Query(ctx, listChildWorkflowRunCounts, steprunids)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*ListChildWorkflowRunCountsRow
|
|
for rows.Next() {
|
|
var i ListChildWorkflowRunCountsRow
|
|
if err := rows.Scan(&i.ParentStepRunId, &i.Count); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listStepsForJob = `-- name: ListStepsForJob :many
|
|
WITH job_id AS (
|
|
SELECT "jobId"
|
|
FROM "JobRun"
|
|
WHERE "id" = $1::uuid
|
|
)
|
|
SELECT
|
|
s."id",
|
|
s."actionId"
|
|
FROM
|
|
"Step" s, job_id
|
|
WHERE
|
|
s."jobId" = job_id."jobId"
|
|
`
|
|
|
|
type ListStepsForJobRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
ActionId string `json:"actionId"`
|
|
}
|
|
|
|
func (q *Queries) ListStepsForJob(ctx context.Context, db DBTX, jobrunid pgtype.UUID) ([]*ListStepsForJobRow, error) {
|
|
rows, err := db.Query(ctx, listStepsForJob, jobrunid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*ListStepsForJobRow
|
|
for rows.Next() {
|
|
var i ListStepsForJobRow
|
|
if err := rows.Scan(&i.ID, &i.ActionId); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listWorkflowRunEventsByWorkflowRunId = `-- name: ListWorkflowRunEventsByWorkflowRunId :many
|
|
SELECT
|
|
sre.id, sre."timeFirstSeen", sre."timeLastSeen", sre."stepRunId", sre.reason, sre.severity, sre.message, sre.count, sre.data, sre."workflowRunId"
|
|
FROM
|
|
"StepRunEvent" sre
|
|
WHERE
|
|
sre."workflowRunId" = $1::uuid
|
|
ORDER BY
|
|
sre."id" DESC
|
|
`
|
|
|
|
func (q *Queries) ListWorkflowRunEventsByWorkflowRunId(ctx context.Context, db DBTX, workflowrunid pgtype.UUID) ([]*StepRunEvent, error) {
|
|
rows, err := db.Query(ctx, listWorkflowRunEventsByWorkflowRunId, workflowrunid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*StepRunEvent
|
|
for rows.Next() {
|
|
var i StepRunEvent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.TimeFirstSeen,
|
|
&i.TimeLastSeen,
|
|
&i.StepRunId,
|
|
&i.Reason,
|
|
&i.Severity,
|
|
&i.Message,
|
|
&i.Count,
|
|
&i.Data,
|
|
&i.WorkflowRunId,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listWorkflowRuns = `-- name: ListWorkflowRuns :many
|
|
SELECT
|
|
runs."createdAt", runs."updatedAt", runs."deletedAt", runs."tenantId", runs."workflowVersionId", runs.status, runs.error, runs."startedAt", runs."finishedAt", runs."concurrencyGroupId", runs."displayName", runs.id, runs."childIndex", runs."childKey", runs."parentId", runs."parentStepRunId", runs."additionalMetadata", runs.duration, runs.priority,
|
|
workflow.id, workflow."createdAt", workflow."updatedAt", workflow."deletedAt", workflow."tenantId", workflow.name, workflow.description,
|
|
runtriggers.id, runtriggers."createdAt", runtriggers."updatedAt", runtriggers."deletedAt", runtriggers."tenantId", runtriggers."eventId", runtriggers."cronParentId", runtriggers."cronSchedule", runtriggers."scheduledId", runtriggers.input, runtriggers."parentId",
|
|
workflowversion.id, workflowversion."createdAt", workflowversion."updatedAt", workflowversion."deletedAt", workflowversion.version, workflowversion."order", workflowversion."workflowId", workflowversion.checksum, workflowversion."scheduleTimeout", workflowversion."onFailureJobId", workflowversion.sticky, workflowversion.kind, workflowversion."defaultPriority",
|
|
-- waiting on https://github.com/sqlc-dev/sqlc/pull/2858 for nullable events field
|
|
events.id, events.key, events."createdAt", events."updatedAt"
|
|
FROM
|
|
"WorkflowRun" as runs
|
|
LEFT JOIN
|
|
"WorkflowRunTriggeredBy" as runTriggers ON runTriggers."parentId" = runs."id"
|
|
LEFT JOIN
|
|
"Event" as events ON
|
|
runTriggers."eventId" = events."id"
|
|
AND (
|
|
$2::uuid IS NULL OR
|
|
events."id" = $2::uuid
|
|
)
|
|
JOIN
|
|
"WorkflowVersion" as workflowVersion ON
|
|
runs."workflowVersionId" = workflowVersion."id"
|
|
AND (
|
|
$3::uuid IS NULL OR
|
|
workflowVersion."id" = $3::uuid
|
|
)
|
|
AND (
|
|
$4::text[] IS NULL OR
|
|
workflowVersion."kind" = ANY(cast($4::text[] as "WorkflowKind"[]))
|
|
)
|
|
JOIN
|
|
"Workflow" as workflow ON
|
|
workflowVersion."workflowId" = workflow."id"
|
|
AND (
|
|
$5::uuid IS NULL OR
|
|
workflow."id" = $5::uuid
|
|
)
|
|
WHERE
|
|
runs."tenantId" = $1 AND
|
|
runs."deletedAt" IS NULL AND
|
|
workflowVersion."deletedAt" IS NULL AND
|
|
workflow."deletedAt" IS NULL AND
|
|
(
|
|
$2::uuid IS NULL OR
|
|
events."id" = $2::uuid
|
|
) AND
|
|
(
|
|
$6::uuid[] IS NULL OR
|
|
runs."id" = ANY($6::uuid[])
|
|
) AND
|
|
(
|
|
$7::jsonb IS NULL OR
|
|
runs."additionalMetadata" @> $7::jsonb
|
|
) AND
|
|
(
|
|
$8::uuid IS NULL OR
|
|
runs."parentId" = $8::uuid
|
|
) AND
|
|
(
|
|
$9::uuid IS NULL OR
|
|
runs."parentStepRunId" = $9::uuid
|
|
) AND
|
|
(
|
|
$10::text IS NULL OR
|
|
runs."concurrencyGroupId" = $10::text
|
|
) AND
|
|
(
|
|
$11::text[] IS NULL OR
|
|
"status" = ANY(cast($11::text[] as "WorkflowRunStatus"[]))
|
|
) AND
|
|
(
|
|
$12::timestamp IS NULL OR
|
|
runs."createdAt" > $12::timestamp
|
|
) AND
|
|
(
|
|
$13::timestamp IS NULL OR
|
|
runs."createdAt" < $13::timestamp
|
|
) AND
|
|
(
|
|
$14::timestamp IS NULL OR
|
|
runs."finishedAt" > $14::timestamp
|
|
)
|
|
ORDER BY
|
|
case when $15 = 'createdAt ASC' THEN runs."createdAt" END ASC ,
|
|
case when $15 = 'createdAt DESC' THEN runs."createdAt" END DESC,
|
|
case when $15 = 'finishedAt ASC' THEN runs."finishedAt" END ASC ,
|
|
case when $15 = 'finishedAt DESC' THEN runs."finishedAt" END DESC,
|
|
case when $15 = 'startedAt ASC' THEN runs."startedAt" END ASC ,
|
|
case when $15 = 'startedAt DESC' THEN runs."startedAt" END DESC,
|
|
case when $15 = 'duration ASC' THEN runs."duration" END ASC NULLS FIRST,
|
|
case when $15 = 'duration DESC' THEN runs."duration" END DESC NULLS LAST,
|
|
runs."id" ASC
|
|
OFFSET
|
|
COALESCE($16, 0)
|
|
LIMIT
|
|
COALESCE($17, 50)
|
|
`
|
|
|
|
type ListWorkflowRunsParams struct {
|
|
TenantId pgtype.UUID `json:"tenantId"`
|
|
EventId pgtype.UUID `json:"eventId"`
|
|
WorkflowVersionId pgtype.UUID `json:"workflowVersionId"`
|
|
Kinds []string `json:"kinds"`
|
|
WorkflowId pgtype.UUID `json:"workflowId"`
|
|
Ids []pgtype.UUID `json:"ids"`
|
|
AdditionalMetadata []byte `json:"additionalMetadata"`
|
|
ParentId pgtype.UUID `json:"parentId"`
|
|
ParentStepRunId pgtype.UUID `json:"parentStepRunId"`
|
|
GroupKey pgtype.Text `json:"groupKey"`
|
|
Statuses []string `json:"statuses"`
|
|
CreatedAfter pgtype.Timestamp `json:"createdAfter"`
|
|
CreatedBefore pgtype.Timestamp `json:"createdBefore"`
|
|
FinishedAfter pgtype.Timestamp `json:"finishedAfter"`
|
|
Orderby interface{} `json:"orderby"`
|
|
Offset interface{} `json:"offset"`
|
|
Limit interface{} `json:"limit"`
|
|
}
|
|
|
|
type ListWorkflowRunsRow struct {
|
|
WorkflowRun WorkflowRun `json:"workflow_run"`
|
|
Workflow Workflow `json:"workflow"`
|
|
WorkflowRunTriggeredBy WorkflowRunTriggeredBy `json:"workflow_run_triggered_by"`
|
|
WorkflowVersion WorkflowVersion `json:"workflow_version"`
|
|
ID pgtype.UUID `json:"id"`
|
|
Key pgtype.Text `json:"key"`
|
|
CreatedAt pgtype.Timestamp `json:"createdAt"`
|
|
UpdatedAt pgtype.Timestamp `json:"updatedAt"`
|
|
}
|
|
|
|
func (q *Queries) ListWorkflowRuns(ctx context.Context, db DBTX, arg ListWorkflowRunsParams) ([]*ListWorkflowRunsRow, error) {
|
|
rows, err := db.Query(ctx, listWorkflowRuns,
|
|
arg.TenantId,
|
|
arg.EventId,
|
|
arg.WorkflowVersionId,
|
|
arg.Kinds,
|
|
arg.WorkflowId,
|
|
arg.Ids,
|
|
arg.AdditionalMetadata,
|
|
arg.ParentId,
|
|
arg.ParentStepRunId,
|
|
arg.GroupKey,
|
|
arg.Statuses,
|
|
arg.CreatedAfter,
|
|
arg.CreatedBefore,
|
|
arg.FinishedAfter,
|
|
arg.Orderby,
|
|
arg.Offset,
|
|
arg.Limit,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*ListWorkflowRunsRow
|
|
for rows.Next() {
|
|
var i ListWorkflowRunsRow
|
|
if err := rows.Scan(
|
|
&i.WorkflowRun.CreatedAt,
|
|
&i.WorkflowRun.UpdatedAt,
|
|
&i.WorkflowRun.DeletedAt,
|
|
&i.WorkflowRun.TenantId,
|
|
&i.WorkflowRun.WorkflowVersionId,
|
|
&i.WorkflowRun.Status,
|
|
&i.WorkflowRun.Error,
|
|
&i.WorkflowRun.StartedAt,
|
|
&i.WorkflowRun.FinishedAt,
|
|
&i.WorkflowRun.ConcurrencyGroupId,
|
|
&i.WorkflowRun.DisplayName,
|
|
&i.WorkflowRun.ID,
|
|
&i.WorkflowRun.ChildIndex,
|
|
&i.WorkflowRun.ChildKey,
|
|
&i.WorkflowRun.ParentId,
|
|
&i.WorkflowRun.ParentStepRunId,
|
|
&i.WorkflowRun.AdditionalMetadata,
|
|
&i.WorkflowRun.Duration,
|
|
&i.WorkflowRun.Priority,
|
|
&i.Workflow.ID,
|
|
&i.Workflow.CreatedAt,
|
|
&i.Workflow.UpdatedAt,
|
|
&i.Workflow.DeletedAt,
|
|
&i.Workflow.TenantId,
|
|
&i.Workflow.Name,
|
|
&i.Workflow.Description,
|
|
&i.WorkflowRunTriggeredBy.ID,
|
|
&i.WorkflowRunTriggeredBy.CreatedAt,
|
|
&i.WorkflowRunTriggeredBy.UpdatedAt,
|
|
&i.WorkflowRunTriggeredBy.DeletedAt,
|
|
&i.WorkflowRunTriggeredBy.TenantId,
|
|
&i.WorkflowRunTriggeredBy.EventId,
|
|
&i.WorkflowRunTriggeredBy.CronParentId,
|
|
&i.WorkflowRunTriggeredBy.CronSchedule,
|
|
&i.WorkflowRunTriggeredBy.ScheduledId,
|
|
&i.WorkflowRunTriggeredBy.Input,
|
|
&i.WorkflowRunTriggeredBy.ParentId,
|
|
&i.WorkflowVersion.ID,
|
|
&i.WorkflowVersion.CreatedAt,
|
|
&i.WorkflowVersion.UpdatedAt,
|
|
&i.WorkflowVersion.DeletedAt,
|
|
&i.WorkflowVersion.Version,
|
|
&i.WorkflowVersion.Order,
|
|
&i.WorkflowVersion.WorkflowId,
|
|
&i.WorkflowVersion.Checksum,
|
|
&i.WorkflowVersion.ScheduleTimeout,
|
|
&i.WorkflowVersion.OnFailureJobId,
|
|
&i.WorkflowVersion.Sticky,
|
|
&i.WorkflowVersion.Kind,
|
|
&i.WorkflowVersion.DefaultPriority,
|
|
&i.ID,
|
|
&i.Key,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const popWorkflowRunsRoundRobin = `-- name: PopWorkflowRunsRoundRobin :many
|
|
WITH workflow_runs AS (
|
|
SELECT
|
|
r2.id,
|
|
r2."status",
|
|
row_number() OVER (PARTITION BY r2."concurrencyGroupId" ORDER BY r2."createdAt") AS rn,
|
|
row_number() over (order by r2."createdAt" ASC) as seqnum
|
|
FROM
|
|
"WorkflowRun" r2
|
|
LEFT JOIN
|
|
"WorkflowVersion" workflowVersion ON r2."workflowVersionId" = workflowVersion."id"
|
|
WHERE
|
|
r2."tenantId" = $1::uuid AND
|
|
r2."deletedAt" IS NULL AND
|
|
workflowVersion."deletedAt" IS NULL AND
|
|
(r2."status" = 'QUEUED' OR r2."status" = 'RUNNING') AND
|
|
workflowVersion."workflowId" = $2::uuid
|
|
ORDER BY
|
|
rn, seqnum ASC
|
|
), min_rn AS (
|
|
SELECT
|
|
MIN(rn) as min_rn
|
|
FROM
|
|
workflow_runs
|
|
), total_group_count AS ( -- counts the number of groups
|
|
SELECT
|
|
COUNT(*) as count
|
|
FROM
|
|
workflow_runs
|
|
WHERE
|
|
rn = (SELECT min_rn FROM min_rn)
|
|
), eligible_runs AS (
|
|
SELECT
|
|
id
|
|
FROM
|
|
"WorkflowRun" wr
|
|
WHERE
|
|
wr."id" IN (
|
|
SELECT
|
|
id
|
|
FROM
|
|
workflow_runs
|
|
ORDER BY
|
|
rn, seqnum ASC
|
|
LIMIT
|
|
-- We can run up to maxRuns per group, so we multiple max runs by the number of groups, then subtract the
|
|
-- total number of running workflows.
|
|
($3::int) * (SELECT count FROM total_group_count)
|
|
) AND
|
|
wr."status" = 'QUEUED'
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
UPDATE "WorkflowRun"
|
|
SET
|
|
"status" = 'RUNNING'
|
|
FROM
|
|
eligible_runs
|
|
WHERE
|
|
"WorkflowRun".id = eligible_runs.id AND
|
|
"WorkflowRun"."status" = 'QUEUED'
|
|
RETURNING
|
|
"WorkflowRun"."createdAt", "WorkflowRun"."updatedAt", "WorkflowRun"."deletedAt", "WorkflowRun"."tenantId", "WorkflowRun"."workflowVersionId", "WorkflowRun".status, "WorkflowRun".error, "WorkflowRun"."startedAt", "WorkflowRun"."finishedAt", "WorkflowRun"."concurrencyGroupId", "WorkflowRun"."displayName", "WorkflowRun".id, "WorkflowRun"."childIndex", "WorkflowRun"."childKey", "WorkflowRun"."parentId", "WorkflowRun"."parentStepRunId", "WorkflowRun"."additionalMetadata", "WorkflowRun".duration, "WorkflowRun".priority
|
|
`
|
|
|
|
type PopWorkflowRunsRoundRobinParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Workflowid pgtype.UUID `json:"workflowid"`
|
|
Maxruns int32 `json:"maxruns"`
|
|
}
|
|
|
|
func (q *Queries) PopWorkflowRunsRoundRobin(ctx context.Context, db DBTX, arg PopWorkflowRunsRoundRobinParams) ([]*WorkflowRun, error) {
|
|
rows, err := db.Query(ctx, popWorkflowRunsRoundRobin, arg.Tenantid, arg.Workflowid, arg.Maxruns)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*WorkflowRun
|
|
for rows.Next() {
|
|
var i WorkflowRun
|
|
if err := rows.Scan(
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowVersionId,
|
|
&i.Status,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.ConcurrencyGroupId,
|
|
&i.DisplayName,
|
|
&i.ID,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentId,
|
|
&i.ParentStepRunId,
|
|
&i.AdditionalMetadata,
|
|
&i.Duration,
|
|
&i.Priority,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const replayWorkflowRunResetJobRun = `-- name: ReplayWorkflowRunResetJobRun :one
|
|
UPDATE
|
|
"JobRun"
|
|
SET
|
|
-- We set this to pending so that the entire workflow starts fresh, and we
|
|
-- don't accidentally trigger on failure jobs
|
|
"status" = 'PENDING',
|
|
"updatedAt" = CURRENT_TIMESTAMP,
|
|
"startedAt" = NULL,
|
|
"finishedAt" = NULL,
|
|
"timeoutAt" = NULL,
|
|
"cancelledAt" = NULL,
|
|
"cancelledReason" = NULL,
|
|
"cancelledError" = NULL
|
|
WHERE
|
|
"id" = $1::uuid
|
|
RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobId", "tickerId", status, result, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "workflowRunId"
|
|
`
|
|
|
|
func (q *Queries) ReplayWorkflowRunResetJobRun(ctx context.Context, db DBTX, jobrunid pgtype.UUID) (*JobRun, error) {
|
|
row := db.QueryRow(ctx, replayWorkflowRunResetJobRun, jobrunid)
|
|
var i JobRun
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.JobId,
|
|
&i.TickerId,
|
|
&i.Status,
|
|
&i.Result,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.TimeoutAt,
|
|
&i.CancelledAt,
|
|
&i.CancelledReason,
|
|
&i.CancelledError,
|
|
&i.WorkflowRunId,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const resolveWorkflowRunStatus = `-- name: ResolveWorkflowRunStatus :many
|
|
WITH jobRuns AS (
|
|
SELECT
|
|
runs."workflowRunId",
|
|
sum(case when runs."status" = 'PENDING' then 1 else 0 end) AS pendingRuns,
|
|
sum(case when runs."status" = 'RUNNING' then 1 else 0 end) AS runningRuns,
|
|
sum(case when runs."status" = 'SUCCEEDED' then 1 else 0 end) AS succeededRuns,
|
|
sum(case when runs."status" = 'FAILED' then 1 else 0 end) AS failedRuns,
|
|
sum(case when runs."status" = 'CANCELLED' then 1 else 0 end) AS cancelledRuns
|
|
FROM "JobRun" as runs
|
|
JOIN "Job" as job ON runs."jobId" = job."id"
|
|
WHERE
|
|
runs."workflowRunId" = ANY(
|
|
SELECT "workflowRunId"
|
|
FROM "JobRun"
|
|
WHERE "id" = ANY($1::uuid[])
|
|
) AND
|
|
runs."deletedAt" IS NULL AND
|
|
runs."tenantId" = $2::uuid AND
|
|
-- we should not include onFailure jobs in the calculation
|
|
job."kind" = 'DEFAULT'
|
|
GROUP BY runs."workflowRunId"
|
|
), updated_workflow_runs AS (
|
|
UPDATE "WorkflowRun" wr
|
|
SET "status" = CASE
|
|
-- Final states are final, cannot be updated
|
|
WHEN "status" IN ('SUCCEEDED', 'FAILED') THEN "status"
|
|
-- We check for running first, because if a job run is running, then the workflow is running
|
|
WHEN j.runningRuns > 0 THEN 'RUNNING'
|
|
-- When at least one job run has failed or been cancelled, then the workflow is failed
|
|
WHEN j.failedRuns > 0 OR j.cancelledRuns > 0 THEN 'FAILED'
|
|
-- When all job runs have succeeded, then the workflow is succeeded
|
|
WHEN j.succeededRuns > 0 AND j.pendingRuns = 0 AND j.runningRuns = 0 AND j.failedRuns = 0 AND j.cancelledRuns = 0 THEN 'SUCCEEDED'
|
|
ELSE "status"
|
|
END,
|
|
"finishedAt" = CASE
|
|
-- Final states are final, cannot be updated
|
|
WHEN "finishedAt" IS NOT NULL THEN "finishedAt"
|
|
-- We check for running first, because if a job run is running, then the workflow is not finished
|
|
WHEN j.runningRuns > 0 THEN NULL
|
|
-- When one job run has failed or been cancelled, then the workflow is failed
|
|
WHEN j.failedRuns > 0 OR j.cancelledRuns > 0 OR j.succeededRuns > 0 THEN NOW()
|
|
ELSE "finishedAt"
|
|
END,
|
|
"startedAt" = CASE
|
|
-- Started at is final, cannot be changed
|
|
WHEN "startedAt" IS NOT NULL THEN "startedAt"
|
|
-- If a job is running or in a final state, then the workflow has started
|
|
WHEN j.runningRuns > 0 OR j.succeededRuns > 0 OR j.failedRuns > 0 OR j.cancelledRuns > 0 THEN NOW()
|
|
ELSE "startedAt"
|
|
END,
|
|
"duration" = CASE
|
|
-- duration is final, cannot be changed
|
|
WHEN "duration" IS NOT NULL THEN "duration"
|
|
-- We check for running first, because if a job run is running, then the workflow is not finished
|
|
WHEN j.runningRuns > 0 THEN NULL
|
|
-- When one job run has failed or been cancelled, then the workflow is failed
|
|
WHEN j.failedRuns > 0 OR j.cancelledRuns > 0 OR j.succeededRuns > 0 THEN
|
|
EXTRACT(EPOCH FROM (NOW() - "startedAt")) * 1000
|
|
ELSE "duration"
|
|
END
|
|
FROM
|
|
jobRuns j
|
|
WHERE
|
|
wr."id" = j."workflowRunId"
|
|
AND "tenantId" = $2::uuid
|
|
RETURNING wr."id", wr."status"
|
|
)
|
|
SELECT DISTINCT "id", "status"
|
|
FROM updated_workflow_runs
|
|
WHERE "status" IN ('SUCCEEDED', 'FAILED')
|
|
`
|
|
|
|
type ResolveWorkflowRunStatusParams struct {
|
|
Jobrunids []pgtype.UUID `json:"jobrunids"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
type ResolveWorkflowRunStatusRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Status WorkflowRunStatus `json:"status"`
|
|
}
|
|
|
|
// Return distinct workflow run ids in a final state
|
|
func (q *Queries) ResolveWorkflowRunStatus(ctx context.Context, db DBTX, arg ResolveWorkflowRunStatusParams) ([]*ResolveWorkflowRunStatusRow, error) {
|
|
rows, err := db.Query(ctx, resolveWorkflowRunStatus, arg.Jobrunids, arg.Tenantid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*ResolveWorkflowRunStatusRow
|
|
for rows.Next() {
|
|
var i ResolveWorkflowRunStatusRow
|
|
if err := rows.Scan(&i.ID, &i.Status); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const softDeleteExpiredWorkflowRunsWithDependencies = `-- name: SoftDeleteExpiredWorkflowRunsWithDependencies :one
|
|
WITH for_delete AS (
|
|
SELECT
|
|
"id"
|
|
FROM "WorkflowRun" wr2
|
|
WHERE
|
|
wr2."tenantId" = $1::uuid AND
|
|
wr2."status" = ANY(cast($2::text[] as "WorkflowRunStatus"[])) AND
|
|
wr2."createdAt" < $3::timestamp AND
|
|
"deletedAt" IS NULL
|
|
ORDER BY "createdAt" ASC
|
|
LIMIT $4 +1
|
|
FOR UPDATE SKIP LOCKED
|
|
),
|
|
expired_with_limit AS (
|
|
SELECT
|
|
for_delete."id" as "id"
|
|
FROM for_delete
|
|
LIMIT $4
|
|
),
|
|
has_more AS (
|
|
SELECT
|
|
CASE
|
|
WHEN COUNT(*) > $4 THEN TRUE
|
|
ELSE FALSE
|
|
END as has_more
|
|
FROM for_delete
|
|
),
|
|
job_runs_to_delete AS (
|
|
SELECT
|
|
"id"
|
|
FROM
|
|
"JobRun"
|
|
WHERE
|
|
"workflowRunId" IN (SELECT "id" FROM expired_with_limit)
|
|
AND "deletedAt" IS NULL
|
|
), step_runs_to_delete AS (
|
|
SELECT
|
|
"id"
|
|
FROM
|
|
"StepRun"
|
|
WHERE
|
|
"jobRunId" IN (SELECT "id" FROM job_runs_to_delete)
|
|
AND "deletedAt" IS NULL
|
|
), update_step_runs AS (
|
|
UPDATE
|
|
"StepRun"
|
|
SET
|
|
"deletedAt" = CURRENT_TIMESTAMP
|
|
WHERE
|
|
"id" IN (SELECT "id" FROM step_runs_to_delete)
|
|
), update_job_runs AS (
|
|
UPDATE
|
|
"JobRun" jr
|
|
SET
|
|
"deletedAt" = CURRENT_TIMESTAMP
|
|
WHERE
|
|
jr."id" IN (SELECT "id" FROM job_runs_to_delete)
|
|
)
|
|
UPDATE
|
|
"WorkflowRun" wr
|
|
SET
|
|
"deletedAt" = CURRENT_TIMESTAMP
|
|
WHERE
|
|
"id" IN (SELECT "id" FROM expired_with_limit) AND
|
|
wr."tenantId" = $1::uuid
|
|
RETURNING
|
|
(SELECT has_more FROM has_more) as has_more
|
|
`
|
|
|
|
type SoftDeleteExpiredWorkflowRunsWithDependenciesParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Statuses []string `json:"statuses"`
|
|
Createdbefore pgtype.Timestamp `json:"createdbefore"`
|
|
Limit interface{} `json:"limit"`
|
|
}
|
|
|
|
func (q *Queries) SoftDeleteExpiredWorkflowRunsWithDependencies(ctx context.Context, db DBTX, arg SoftDeleteExpiredWorkflowRunsWithDependenciesParams) (bool, error) {
|
|
row := db.QueryRow(ctx, softDeleteExpiredWorkflowRunsWithDependencies,
|
|
arg.Tenantid,
|
|
arg.Statuses,
|
|
arg.Createdbefore,
|
|
arg.Limit,
|
|
)
|
|
var has_more bool
|
|
err := row.Scan(&has_more)
|
|
return has_more, err
|
|
}
|
|
|
|
const updateManyWorkflowRun = `-- name: UpdateManyWorkflowRun :many
|
|
UPDATE
|
|
"WorkflowRun"
|
|
SET
|
|
"status" = COALESCE($1::"WorkflowRunStatus", "status"),
|
|
"error" = COALESCE($2::text, "error"),
|
|
"startedAt" = COALESCE($3::timestamp, "startedAt"),
|
|
"finishedAt" = COALESCE($4::timestamp, "finishedAt"),
|
|
"duration" = COALESCE($4::timestamp, "finishedAt") - COALESCE($3::timestamp, "startedAt")
|
|
WHERE
|
|
"tenantId" = $5::uuid AND
|
|
"id" = ANY($6::uuid[])
|
|
RETURNING "WorkflowRun"."createdAt", "WorkflowRun"."updatedAt", "WorkflowRun"."deletedAt", "WorkflowRun"."tenantId", "WorkflowRun"."workflowVersionId", "WorkflowRun".status, "WorkflowRun".error, "WorkflowRun"."startedAt", "WorkflowRun"."finishedAt", "WorkflowRun"."concurrencyGroupId", "WorkflowRun"."displayName", "WorkflowRun".id, "WorkflowRun"."childIndex", "WorkflowRun"."childKey", "WorkflowRun"."parentId", "WorkflowRun"."parentStepRunId", "WorkflowRun"."additionalMetadata", "WorkflowRun".duration, "WorkflowRun".priority
|
|
`
|
|
|
|
type UpdateManyWorkflowRunParams struct {
|
|
Status NullWorkflowRunStatus `json:"status"`
|
|
Error pgtype.Text `json:"error"`
|
|
StartedAt pgtype.Timestamp `json:"startedAt"`
|
|
FinishedAt pgtype.Timestamp `json:"finishedAt"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Ids []pgtype.UUID `json:"ids"`
|
|
}
|
|
|
|
func (q *Queries) UpdateManyWorkflowRun(ctx context.Context, db DBTX, arg UpdateManyWorkflowRunParams) ([]*WorkflowRun, error) {
|
|
rows, err := db.Query(ctx, updateManyWorkflowRun,
|
|
arg.Status,
|
|
arg.Error,
|
|
arg.StartedAt,
|
|
arg.FinishedAt,
|
|
arg.Tenantid,
|
|
arg.Ids,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []*WorkflowRun
|
|
for rows.Next() {
|
|
var i WorkflowRun
|
|
if err := rows.Scan(
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowVersionId,
|
|
&i.Status,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.ConcurrencyGroupId,
|
|
&i.DisplayName,
|
|
&i.ID,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentId,
|
|
&i.ParentStepRunId,
|
|
&i.AdditionalMetadata,
|
|
&i.Duration,
|
|
&i.Priority,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, &i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const updateWorkflowRun = `-- name: UpdateWorkflowRun :one
|
|
UPDATE
|
|
"WorkflowRun"
|
|
SET
|
|
"status" = CASE
|
|
-- Final states are final, cannot be updated
|
|
WHEN "status" IN ('SUCCEEDED', 'FAILED') THEN "status"
|
|
ELSE COALESCE($1::"WorkflowRunStatus", "status")
|
|
END,
|
|
"error" = COALESCE($2::text, "error"),
|
|
"startedAt" = COALESCE($3::timestamp, "startedAt"),
|
|
"finishedAt" = COALESCE($4::timestamp, "finishedAt"),
|
|
"duration" =
|
|
EXTRACT(EPOCH FROM (COALESCE($4::timestamp, "finishedAt") - COALESCE($3::timestamp, "startedAt")) * 1000)
|
|
|
|
WHERE
|
|
"id" = $5::uuid AND
|
|
"tenantId" = $6::uuid
|
|
RETURNING "WorkflowRun"."createdAt", "WorkflowRun"."updatedAt", "WorkflowRun"."deletedAt", "WorkflowRun"."tenantId", "WorkflowRun"."workflowVersionId", "WorkflowRun".status, "WorkflowRun".error, "WorkflowRun"."startedAt", "WorkflowRun"."finishedAt", "WorkflowRun"."concurrencyGroupId", "WorkflowRun"."displayName", "WorkflowRun".id, "WorkflowRun"."childIndex", "WorkflowRun"."childKey", "WorkflowRun"."parentId", "WorkflowRun"."parentStepRunId", "WorkflowRun"."additionalMetadata", "WorkflowRun".duration, "WorkflowRun".priority
|
|
`
|
|
|
|
type UpdateWorkflowRunParams struct {
|
|
Status NullWorkflowRunStatus `json:"status"`
|
|
Error pgtype.Text `json:"error"`
|
|
StartedAt pgtype.Timestamp `json:"startedAt"`
|
|
FinishedAt pgtype.Timestamp `json:"finishedAt"`
|
|
ID pgtype.UUID `json:"id"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
func (q *Queries) UpdateWorkflowRun(ctx context.Context, db DBTX, arg UpdateWorkflowRunParams) (*WorkflowRun, error) {
|
|
row := db.QueryRow(ctx, updateWorkflowRun,
|
|
arg.Status,
|
|
arg.Error,
|
|
arg.StartedAt,
|
|
arg.FinishedAt,
|
|
arg.ID,
|
|
arg.Tenantid,
|
|
)
|
|
var i WorkflowRun
|
|
err := row.Scan(
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowVersionId,
|
|
&i.Status,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.ConcurrencyGroupId,
|
|
&i.DisplayName,
|
|
&i.ID,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentId,
|
|
&i.ParentStepRunId,
|
|
&i.AdditionalMetadata,
|
|
&i.Duration,
|
|
&i.Priority,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const updateWorkflowRunGroupKeyFromExpr = `-- name: UpdateWorkflowRunGroupKeyFromExpr :one
|
|
UPDATE "WorkflowRun" wr
|
|
SET "error" = CASE
|
|
-- Final states are final, cannot be updated. We also can't move out of a queued state
|
|
WHEN "status" IN ('SUCCEEDED', 'FAILED', 'QUEUED') THEN "error"
|
|
WHEN $1::text IS NOT NULL THEN $1::text
|
|
ELSE "error"
|
|
END,
|
|
"status" = CASE
|
|
-- Final states are final, cannot be updated. We also can't move out of a queued state
|
|
WHEN "status" IN ('SUCCEEDED', 'FAILED', 'QUEUED') THEN "status"
|
|
-- When the concurrency expression errored, then the workflow is failed
|
|
WHEN $1::text IS NOT NULL THEN 'FAILED'
|
|
-- When the expression evaluated successfully, then queue the workflow run
|
|
ELSE 'QUEUED'
|
|
END,
|
|
"concurrencyGroupId" = CASE
|
|
WHEN $2::text IS NOT NULL THEN $2::text
|
|
ELSE "concurrencyGroupId"
|
|
END
|
|
WHERE
|
|
wr."id" = $3::uuid
|
|
RETURNING wr."id"
|
|
`
|
|
|
|
type UpdateWorkflowRunGroupKeyFromExprParams struct {
|
|
Error pgtype.Text `json:"error"`
|
|
ConcurrencyGroupId pgtype.Text `json:"concurrencyGroupId"`
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
}
|
|
|
|
func (q *Queries) UpdateWorkflowRunGroupKeyFromExpr(ctx context.Context, db DBTX, arg UpdateWorkflowRunGroupKeyFromExprParams) (pgtype.UUID, error) {
|
|
row := db.QueryRow(ctx, updateWorkflowRunGroupKeyFromExpr, arg.Error, arg.ConcurrencyGroupId, arg.Workflowrunid)
|
|
var id pgtype.UUID
|
|
err := row.Scan(&id)
|
|
return id, err
|
|
}
|
|
|
|
const updateWorkflowRunGroupKeyFromRun = `-- name: UpdateWorkflowRunGroupKeyFromRun :one
|
|
WITH groupKeyRun AS (
|
|
SELECT "id", "status" as groupKeyRunStatus, "output", "workflowRunId"
|
|
FROM "GetGroupKeyRun" as groupKeyRun
|
|
WHERE
|
|
"id" = $2::uuid AND
|
|
"tenantId" = $1::uuid AND
|
|
"deletedAt" IS NULL
|
|
)
|
|
UPDATE "WorkflowRun" workflowRun
|
|
SET "status" = CASE
|
|
-- Final states are final, cannot be updated. We also can't move out of a queued state
|
|
WHEN "status" IN ('SUCCEEDED', 'FAILED', 'QUEUED') THEN "status"
|
|
-- When the GetGroupKeyRun failed or been cancelled, then the workflow is failed
|
|
WHEN groupKeyRun.groupKeyRunStatus IN ('FAILED', 'CANCELLED') THEN 'FAILED'
|
|
WHEN groupKeyRun.output IS NOT NULL THEN 'QUEUED'
|
|
ELSE "status"
|
|
END,
|
|
"finishedAt" = CASE
|
|
-- Final states are final, cannot be updated
|
|
WHEN "finishedAt" IS NOT NULL THEN "finishedAt"
|
|
-- When one job run has failed or been cancelled, then the workflow is failed
|
|
WHEN groupKeyRun.groupKeyRunStatus IN ('FAILED', 'CANCELLED') THEN NOW()
|
|
ELSE "finishedAt"
|
|
END,
|
|
"duration" = CASE
|
|
-- duration is final, cannot be changed
|
|
WHEN "duration" IS NOT NULL THEN "duration"
|
|
WHEN "startedAt" IS NOT NULL AND groupKeyRun.groupKeyRunStatus IN ('FAILED', 'CANCELLED') THEN
|
|
EXTRACT(EPOCH FROM (NOW() - "startedAt")) * 1000
|
|
ELSE "duration"
|
|
END,
|
|
"concurrencyGroupId" = groupKeyRun."output"
|
|
FROM
|
|
groupKeyRun
|
|
WHERE
|
|
workflowRun."id" = groupKeyRun."workflowRunId" AND
|
|
workflowRun."tenantId" = $1::uuid
|
|
RETURNING workflowrun."createdAt", workflowrun."updatedAt", workflowrun."deletedAt", workflowrun."tenantId", workflowrun."workflowVersionId", workflowrun.status, workflowrun.error, workflowrun."startedAt", workflowrun."finishedAt", workflowrun."concurrencyGroupId", workflowrun."displayName", workflowrun.id, workflowrun."childIndex", workflowrun."childKey", workflowrun."parentId", workflowrun."parentStepRunId", workflowrun."additionalMetadata", workflowrun.duration, workflowrun.priority
|
|
`
|
|
|
|
type UpdateWorkflowRunGroupKeyFromRunParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
Groupkeyrunid pgtype.UUID `json:"groupkeyrunid"`
|
|
}
|
|
|
|
func (q *Queries) UpdateWorkflowRunGroupKeyFromRun(ctx context.Context, db DBTX, arg UpdateWorkflowRunGroupKeyFromRunParams) (*WorkflowRun, error) {
|
|
row := db.QueryRow(ctx, updateWorkflowRunGroupKeyFromRun, arg.Tenantid, arg.Groupkeyrunid)
|
|
var i WorkflowRun
|
|
err := row.Scan(
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.DeletedAt,
|
|
&i.TenantId,
|
|
&i.WorkflowVersionId,
|
|
&i.Status,
|
|
&i.Error,
|
|
&i.StartedAt,
|
|
&i.FinishedAt,
|
|
&i.ConcurrencyGroupId,
|
|
&i.DisplayName,
|
|
&i.ID,
|
|
&i.ChildIndex,
|
|
&i.ChildKey,
|
|
&i.ParentId,
|
|
&i.ParentStepRunId,
|
|
&i.AdditionalMetadata,
|
|
&i.Duration,
|
|
&i.Priority,
|
|
)
|
|
return &i, err
|
|
}
|
|
|
|
const updateWorkflowRunStickyState = `-- name: UpdateWorkflowRunStickyState :exec
|
|
UPDATE "WorkflowRunStickyState"
|
|
SET
|
|
"updatedAt" = CURRENT_TIMESTAMP,
|
|
"desiredWorkerId" = $1::uuid
|
|
WHERE
|
|
"workflowRunId" = $2::uuid AND
|
|
"tenantId" = $3::uuid
|
|
`
|
|
|
|
type UpdateWorkflowRunStickyStateParams struct {
|
|
DesiredWorkerId pgtype.UUID `json:"desiredWorkerId"`
|
|
Workflowrunid pgtype.UUID `json:"workflowrunid"`
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
}
|
|
|
|
func (q *Queries) UpdateWorkflowRunStickyState(ctx context.Context, db DBTX, arg UpdateWorkflowRunStickyStateParams) error {
|
|
_, err := db.Exec(ctx, updateWorkflowRunStickyState, arg.DesiredWorkerId, arg.Workflowrunid, arg.Tenantid)
|
|
return err
|
|
}
|
|
|
|
const workflowRunsMetricsCount = `-- name: WorkflowRunsMetricsCount :one
|
|
SELECT
|
|
COUNT(CASE WHEN runs."status" = 'PENDING' THEN 1 END) AS "PENDING",
|
|
COUNT(CASE WHEN runs."status" = 'RUNNING' THEN 1 END) AS "RUNNING",
|
|
COUNT(CASE WHEN runs."status" = 'SUCCEEDED' THEN 1 END) AS "SUCCEEDED",
|
|
COUNT(CASE WHEN runs."status" = 'FAILED' THEN 1 END) AS "FAILED",
|
|
COUNT(CASE WHEN runs."status" = 'QUEUED' THEN 1 END) AS "QUEUED"
|
|
FROM
|
|
"WorkflowRun" as runs
|
|
LEFT JOIN
|
|
"WorkflowRunTriggeredBy" as runTriggers ON runTriggers."parentId" = runs."id"
|
|
LEFT JOIN
|
|
"Event" as events ON runTriggers."eventId" = events."id"
|
|
LEFT JOIN
|
|
"WorkflowVersion" as workflowVersion ON runs."workflowVersionId" = workflowVersion."id"
|
|
LEFT JOIN
|
|
"Workflow" as workflow ON workflowVersion."workflowId" = workflow."id"
|
|
WHERE
|
|
runs."tenantId" = $1::uuid AND
|
|
runs."createdAt" > NOW() - INTERVAL '1 day' AND
|
|
(
|
|
$2::timestamp IS NULL OR
|
|
runs."createdAt" > $2::timestamp
|
|
) AND
|
|
(
|
|
$3::timestamp IS NULL OR
|
|
runs."createdAt" < $3::timestamp
|
|
) AND
|
|
runs."deletedAt" IS NULL AND
|
|
workflowVersion."deletedAt" IS NULL AND
|
|
workflow."deletedAt" IS NULL AND
|
|
(
|
|
$4::uuid IS NULL OR
|
|
workflow."id" = $4::uuid
|
|
) AND
|
|
(
|
|
$5::uuid IS NULL OR
|
|
runs."parentId" = $5::uuid
|
|
) AND
|
|
(
|
|
$6::uuid IS NULL OR
|
|
runs."parentStepRunId" = $6::uuid
|
|
) AND
|
|
(
|
|
$7::jsonb IS NULL OR
|
|
runs."additionalMetadata" @> $7::jsonb
|
|
) AND
|
|
(
|
|
$8::uuid IS NULL OR
|
|
events."id" = $8::uuid
|
|
)
|
|
`
|
|
|
|
type WorkflowRunsMetricsCountParams struct {
|
|
Tenantid pgtype.UUID `json:"tenantid"`
|
|
CreatedAfter pgtype.Timestamp `json:"createdAfter"`
|
|
CreatedBefore pgtype.Timestamp `json:"createdBefore"`
|
|
WorkflowId pgtype.UUID `json:"workflowId"`
|
|
ParentId pgtype.UUID `json:"parentId"`
|
|
ParentStepRunId pgtype.UUID `json:"parentStepRunId"`
|
|
AdditionalMetadata []byte `json:"additionalMetadata"`
|
|
EventId pgtype.UUID `json:"eventId"`
|
|
}
|
|
|
|
type WorkflowRunsMetricsCountRow struct {
|
|
PENDING int64 `json:"PENDING"`
|
|
RUNNING int64 `json:"RUNNING"`
|
|
SUCCEEDED int64 `json:"SUCCEEDED"`
|
|
FAILED int64 `json:"FAILED"`
|
|
QUEUED int64 `json:"QUEUED"`
|
|
}
|
|
|
|
func (q *Queries) WorkflowRunsMetricsCount(ctx context.Context, db DBTX, arg WorkflowRunsMetricsCountParams) (*WorkflowRunsMetricsCountRow, error) {
|
|
row := db.QueryRow(ctx, workflowRunsMetricsCount,
|
|
arg.Tenantid,
|
|
arg.CreatedAfter,
|
|
arg.CreatedBefore,
|
|
arg.WorkflowId,
|
|
arg.ParentId,
|
|
arg.ParentStepRunId,
|
|
arg.AdditionalMetadata,
|
|
arg.EventId,
|
|
)
|
|
var i WorkflowRunsMetricsCountRow
|
|
err := row.Scan(
|
|
&i.PENDING,
|
|
&i.RUNNING,
|
|
&i.SUCCEEDED,
|
|
&i.FAILED,
|
|
&i.QUEUED,
|
|
)
|
|
return &i, err
|
|
}
|