// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.29.0
// source: olap.sql

package sqlcv1

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)
type BulkCreateEventTriggersParams struct {
RunID int64 `json:"run_id"`
RunInsertedAt pgtype.Timestamptz `json:"run_inserted_at"`
EventID int64 `json:"event_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
FilterID pgtype.UUID `json:"filter_id"`
}
const countEvents = `-- name: CountEvents :one
WITH included_events AS (
SELECT DISTINCT e.tenant_id, e.id, e.external_id, e.seen_at, e.key, e.payload, e.additional_metadata, e.scope
FROM v1_event_lookup_table_olap elt
JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
LEFT JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
LEFT JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
WHERE
e.tenant_id = $1
AND (
$2::TEXT[] IS NULL OR
"key" = ANY($2::TEXT[])
)
AND e.seen_at >= $3::TIMESTAMPTZ
AND (
$4::TIMESTAMPTZ IS NULL OR
e.seen_at <= $4::TIMESTAMPTZ
)
AND (
$5::UUID[] IS NULL OR
r.workflow_id = ANY($5::UUID[])
)
AND (
$6::UUID[] IS NULL OR
elt.external_id = ANY($6::UUID[])
)
AND (
$7::JSONB IS NULL OR
e.additional_metadata @> $7::JSONB
)
AND (
CAST($8::text[] AS v1_readable_status_olap[]) IS NULL OR
r.readable_status = ANY(CAST($8::text[] AS v1_readable_status_olap[]))
)
AND (
$9::TEXT[] IS NULL OR
e.scope = ANY($9::TEXT[])
)
ORDER BY e.seen_at DESC, e.id
LIMIT 20000
)
SELECT COUNT(*)
FROM included_events e
`
type CountEventsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Keys []string `json:"keys"`
Since pgtype.Timestamptz `json:"since"`
Until pgtype.Timestamptz `json:"until"`
WorkflowIds []pgtype.UUID `json:"workflowIds"`
EventIds []pgtype.UUID `json:"eventIds"`
AdditionalMetadata []byte `json:"additionalMetadata"`
Statuses []string `json:"statuses"`
Scopes []string `json:"scopes"`
}
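// Example (illustrative sketch, not generated by sqlc): count the last day's
// events for a tenant. Assumes ctx, a *Queries value q, a pgxpool.Pool pool
// (which satisfies DBTX), and a populated tenantID pgtype.UUID.
//
//	count, err := q.CountEvents(ctx, pool, CountEventsParams{
//		Tenantid: tenantID,
//		Since:    pgtype.Timestamptz{Time: time.Now().Add(-24 * time.Hour), Valid: true},
//	})
//	if err != nil {
//		return err
//	}
//	fmt.Printf("events in the last day: %d\n", count)
//
// Nil slice filters (Keys, WorkflowIds, etc.) arrive as NULL arrays, so the
// corresponding predicates in the SQL above are skipped.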
func (q *Queries) CountEvents(ctx context.Context, db DBTX, arg CountEventsParams) (int64, error) {
row := db.QueryRow(ctx, countEvents,
arg.Tenantid,
arg.Keys,
arg.Since,
arg.Until,
arg.WorkflowIds,
arg.EventIds,
arg.AdditionalMetadata,
arg.Statuses,
arg.Scopes,
)
var count int64
err := row.Scan(&count)
return count, err
}
type CreateDAGsOLAPParams struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
DisplayName string `json:"display_name"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
TotalTasks int32 `json:"total_tasks"`
}
const createOLAPEventPartitions = `-- name: CreateOLAPEventPartitions :exec
SELECT
create_v1_range_partition('v1_events_olap'::text, $1::date),
create_v1_range_partition('v1_event_to_run_olap'::text, $1::date)
`
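// Example (illustrative sketch, not generated by sqlc): create tomorrow's
// event partitions ahead of time. Assumes ctx, q, and pool as above.
//
//	tomorrow := pgtype.Date{Time: time.Now().UTC().AddDate(0, 0, 1), Valid: true}
//	if err := q.CreateOLAPEventPartitions(ctx, pool, tomorrow); err != nil {
//		return err
//	}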
func (q *Queries) CreateOLAPEventPartitions(ctx context.Context, db DBTX, date pgtype.Date) error {
_, err := db.Exec(ctx, createOLAPEventPartitions, date)
return err
}
const createOLAPPartitions = `-- name: CreateOLAPPartitions :exec
SELECT
create_v1_hash_partitions('v1_task_events_olap_tmp'::text, $1::int),
create_v1_hash_partitions('v1_task_status_updates_tmp'::text, $1::int),
create_v1_olap_partition_with_date_and_status('v1_tasks_olap'::text, $2::date),
create_v1_olap_partition_with_date_and_status('v1_runs_olap'::text, $2::date),
create_v1_olap_partition_with_date_and_status('v1_dags_olap'::text, $2::date),
create_v1_weekly_range_partition('v1_event_lookup_table_olap'::text, $2::date)
`
type CreateOLAPPartitionsParams struct {
Partitions int32 `json:"partitions"`
Date pgtype.Date `json:"date"`
}
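// Example (illustrative sketch, not generated by sqlc): provision the hash
// partitions and today's range partitions. The partition count of 4 is an
// arbitrary illustration, not a recommended value.
//
//	err := q.CreateOLAPPartitions(ctx, pool, CreateOLAPPartitionsParams{
//		Partitions: 4,
//		Date:       pgtype.Date{Time: time.Now().UTC(), Valid: true},
//	})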
func (q *Queries) CreateOLAPPartitions(ctx context.Context, db DBTX, arg CreateOLAPPartitionsParams) error {
_, err := db.Exec(ctx, createOLAPPartitions, arg.Partitions, arg.Date)
return err
}
type CreateTaskEventsOLAPParams struct {
TenantID pgtype.UUID `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskInsertedAt pgtype.Timestamptz `json:"task_inserted_at"`
EventType V1EventTypeOlap `json:"event_type"`
WorkflowID pgtype.UUID `json:"workflow_id"`
EventTimestamp pgtype.Timestamptz `json:"event_timestamp"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
RetryCount int32 `json:"retry_count"`
ErrorMessage pgtype.Text `json:"error_message"`
Output []byte `json:"output"`
WorkerID pgtype.UUID `json:"worker_id"`
AdditionalEventData pgtype.Text `json:"additional__event_data"`
AdditionalEventMessage pgtype.Text `json:"additional__event_message"`
}
type CreateTaskEventsOLAPTmpParams struct {
TenantID pgtype.UUID `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskInsertedAt pgtype.Timestamptz `json:"task_inserted_at"`
EventType V1EventTypeOlap `json:"event_type"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
RetryCount int32 `json:"retry_count"`
WorkerID pgtype.UUID `json:"worker_id"`
}
type CreateTasksOLAPParams struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
Queue string `json:"queue"`
ActionID string `json:"action_id"`
StepID pgtype.UUID `json:"step_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
WorkflowRunID pgtype.UUID `json:"workflow_run_id"`
ScheduleTimeout string `json:"schedule_timeout"`
StepTimeout pgtype.Text `json:"step_timeout"`
Priority pgtype.Int4 `json:"priority"`
Sticky V1StickyStrategyOlap `json:"sticky"`
DesiredWorkerID pgtype.UUID `json:"desired_worker_id"`
ExternalID pgtype.UUID `json:"external_id"`
DisplayName string `json:"display_name"`
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
DagID pgtype.Int8 `json:"dag_id"`
DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
}
const flattenTasksByExternalIds = `-- name: FlattenTasksByExternalIds :many
WITH lookups AS (
SELECT
tenant_id, external_id, task_id, dag_id, inserted_at
FROM
v1_lookup_table_olap
WHERE
external_id = ANY($1::uuid[])
AND tenant_id = $2::uuid
), tasks_from_dags AS (
SELECT
l.tenant_id,
dt.task_id,
dt.task_inserted_at
FROM
lookups l
JOIN
v1_dag_to_task_olap dt ON l.dag_id = dt.dag_id AND l.inserted_at = dt.dag_inserted_at
WHERE
l.dag_id IS NOT NULL
), unioned_tasks AS (
SELECT
l.tenant_id AS tenant_id,
l.task_id AS task_id,
l.inserted_at AS task_inserted_at
FROM
lookups l
UNION ALL
SELECT
t.tenant_id AS tenant_id,
t.task_id AS task_id,
t.task_inserted_at AS task_inserted_at
FROM
tasks_from_dags t
)
SELECT
t.tenant_id,
t.id,
t.inserted_at,
t.external_id,
t.latest_retry_count AS retry_count
FROM
v1_tasks_olap t
JOIN
unioned_tasks ut ON (t.inserted_at, t.id) = (ut.task_inserted_at, ut.task_id)
`
type FlattenTasksByExternalIdsParams struct {
Externalids []pgtype.UUID `json:"externalids"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type FlattenTasksByExternalIdsRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
RetryCount int32 `json:"retry_count"`
}
// Get retry counts for each task
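// A minimal call sketch (illustrative, not generated by sqlc; runIDs is an
// assumed []pgtype.UUID of run external IDs):
//
//	tasks, err := q.FlattenTasksByExternalIds(ctx, pool, FlattenTasksByExternalIdsParams{
//		Externalids: runIDs,
//		Tenantid:    tenantID,
//	})
//
// DAG external IDs are flattened into their member tasks, so one input ID can
// yield several rows.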
func (q *Queries) FlattenTasksByExternalIds(ctx context.Context, db DBTX, arg FlattenTasksByExternalIdsParams) ([]*FlattenTasksByExternalIdsRow, error) {
rows, err := db.Query(ctx, flattenTasksByExternalIds, arg.Externalids, arg.Tenantid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*FlattenTasksByExternalIdsRow
for rows.Next() {
var i FlattenTasksByExternalIdsRow
if err := rows.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.RetryCount,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getRunsListRecursive = `-- name: GetRunsListRecursive :many
WITH RECURSIVE all_runs AS (
-- seed term
SELECT
t.id,
t.inserted_at,
t.tenant_id,
t.external_id,
t.parent_task_external_id,
0 AS depth
FROM
v1_lookup_table_olap lt
JOIN v1_tasks_olap t
ON t.inserted_at = lt.inserted_at
AND t.id = lt.task_id
WHERE
lt.external_id = ANY($2::uuid[])
UNION ALL
-- single recursive term for both DAG- and TASK-driven children
SELECT
t.id,
t.inserted_at,
t.tenant_id,
t.external_id,
t.parent_task_external_id,
ar.depth + 1 AS depth
FROM
v1_runs_olap r
JOIN all_runs ar ON ar.external_id = r.parent_task_external_id
-- only present when r.kind = 'DAG'
LEFT JOIN v1_dag_to_task_olap dt ON r.kind = 'DAG' AND r.id = dt.dag_id AND r.inserted_at = dt.dag_inserted_at
-- pick the correct task row for either branch
JOIN v1_tasks_olap t
ON (
r.kind = 'DAG'
AND t.id = dt.task_id
AND t.inserted_at = dt.task_inserted_at
)
OR (
r.kind = 'TASK'
AND t.id = r.id
AND t.inserted_at = r.inserted_at
)
WHERE
r.tenant_id = $1::uuid
AND ar.depth < $3::int
AND r.inserted_at >= $4::timestamptz
AND t.inserted_at >= $4::timestamptz
)
SELECT
tenant_id,
id,
inserted_at,
external_id,
parent_task_external_id,
depth
FROM
all_runs
WHERE
tenant_id = $1::uuid
`
type GetRunsListRecursiveParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Taskexternalids []pgtype.UUID `json:"taskexternalids"`
Depth int32 `json:"depth"`
Createdafter pgtype.Timestamptz `json:"createdafter"`
}
type GetRunsListRecursiveRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
Depth int32 `json:"depth"`
}
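// Example (illustrative sketch, not generated by sqlc): walk up to three
// levels of child runs spawned by a set of seed runs within the last day.
//
//	children, err := q.GetRunsListRecursive(ctx, pool, GetRunsListRecursiveParams{
//		Tenantid:        tenantID,
//		Taskexternalids: rootIDs, // assumed []pgtype.UUID of seed run external IDs
//		Depth:           3,
//		Createdafter:    pgtype.Timestamptz{Time: time.Now().Add(-24 * time.Hour), Valid: true},
//	})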
func (q *Queries) GetRunsListRecursive(ctx context.Context, db DBTX, arg GetRunsListRecursiveParams) ([]*GetRunsListRecursiveRow, error) {
rows, err := db.Query(ctx, getRunsListRecursive,
arg.Tenantid,
arg.Taskexternalids,
arg.Depth,
arg.Createdafter,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*GetRunsListRecursiveRow
for rows.Next() {
var i GetRunsListRecursiveRow
if err := rows.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.ParentTaskExternalID,
&i.Depth,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTaskPointMetrics = `-- name: GetTaskPointMetrics :many
SELECT
DATE_BIN(
COALESCE($1::INTERVAL, '1 minute'),
task_inserted_at,
TIMESTAMPTZ '1970-01-01 00:00:00+00'
) :: TIMESTAMPTZ AS bucket_2,
COUNT(*) FILTER (WHERE readable_status = 'COMPLETED') AS completed_count,
COUNT(*) FILTER (WHERE readable_status = 'FAILED') AS failed_count
FROM
v1_task_events_olap
WHERE
tenant_id = $2::UUID
AND task_inserted_at BETWEEN $3::TIMESTAMPTZ AND $4::TIMESTAMPTZ
GROUP BY bucket_2
ORDER BY bucket_2
`
type GetTaskPointMetricsParams struct {
Interval pgtype.Interval `json:"interval"`
Tenantid pgtype.UUID `json:"tenantid"`
Createdafter pgtype.Timestamptz `json:"createdafter"`
Createdbefore pgtype.Timestamptz `json:"createdbefore"`
}
type GetTaskPointMetricsRow struct {
Bucket2 pgtype.Timestamptz `json:"bucket_2"`
CompletedCount int64 `json:"completed_count"`
FailedCount int64 `json:"failed_count"`
}
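// Example (illustrative sketch, not generated by sqlc): bucket completed and
// failed task counts into 5-minute bins over the last 6 hours.
//
//	points, err := q.GetTaskPointMetrics(ctx, pool, GetTaskPointMetricsParams{
//		Interval:      pgtype.Interval{Microseconds: (5 * time.Minute).Microseconds(), Valid: true},
//		Tenantid:      tenantID,
//		Createdafter:  pgtype.Timestamptz{Time: time.Now().Add(-6 * time.Hour), Valid: true},
//		Createdbefore: pgtype.Timestamptz{Time: time.Now(), Valid: true},
//	})
//
// Leaving Interval invalid (NULL) falls back to the COALESCE'd 1-minute bin.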
func (q *Queries) GetTaskPointMetrics(ctx context.Context, db DBTX, arg GetTaskPointMetricsParams) ([]*GetTaskPointMetricsRow, error) {
rows, err := db.Query(ctx, getTaskPointMetrics,
arg.Interval,
arg.Tenantid,
arg.Createdafter,
arg.Createdbefore,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*GetTaskPointMetricsRow
for rows.Next() {
var i GetTaskPointMetricsRow
if err := rows.Scan(&i.Bucket2, &i.CompletedCount, &i.FailedCount); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTenantStatusMetrics = `-- name: GetTenantStatusMetrics :one
WITH task_external_ids AS (
SELECT external_id
FROM v1_runs_olap
WHERE (
$5::UUID IS NULL OR parent_task_external_id = $5::UUID
) AND (
$6::UUID IS NULL
OR (id, inserted_at) IN (
SELECT etr.run_id, etr.run_inserted_at
FROM v1_event_lookup_table_olap lt
JOIN v1_events_olap e ON (lt.tenant_id, lt.event_id, lt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
WHERE
lt.tenant_id = $1::uuid
AND lt.external_id = $6::UUID
)
)
)
SELECT
tenant_id,
COUNT(*) FILTER (WHERE readable_status = 'QUEUED') AS total_queued,
COUNT(*) FILTER (WHERE readable_status = 'RUNNING') AS total_running,
COUNT(*) FILTER (WHERE readable_status = 'COMPLETED') AS total_completed,
COUNT(*) FILTER (WHERE readable_status = 'CANCELLED') AS total_cancelled,
COUNT(*) FILTER (WHERE readable_status = 'FAILED') AS total_failed
FROM v1_statuses_olap
WHERE
tenant_id = $1::UUID
AND inserted_at >= $2::TIMESTAMPTZ
AND (
$3::TIMESTAMPTZ IS NULL OR inserted_at <= $3::TIMESTAMPTZ
)
AND (
$4::UUID[] IS NULL OR workflow_id = ANY($4::UUID[])
)
AND external_id IN (
SELECT external_id
FROM task_external_ids
)
GROUP BY tenant_id
`
type GetTenantStatusMetricsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Createdafter pgtype.Timestamptz `json:"createdafter"`
CreatedBefore pgtype.Timestamptz `json:"createdBefore"`
WorkflowIds []pgtype.UUID `json:"workflowIds"`
ParentTaskExternalId pgtype.UUID `json:"parentTaskExternalId"`
TriggeringEventExternalId pgtype.UUID `json:"triggeringEventExternalId"`
}
type GetTenantStatusMetricsRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
TotalQueued int64 `json:"total_queued"`
TotalRunning int64 `json:"total_running"`
TotalCompleted int64 `json:"total_completed"`
TotalCancelled int64 `json:"total_cancelled"`
TotalFailed int64 `json:"total_failed"`
}
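// Example (illustrative sketch, not generated by sqlc). Because the query
// groups by tenant_id, Scan returns pgx.ErrNoRows when no runs match, which
// callers likely need to treat as an all-zero result.
//
//	metrics, err := q.GetTenantStatusMetrics(ctx, pool, GetTenantStatusMetricsParams{
//		Tenantid:     tenantID,
//		Createdafter: pgtype.Timestamptz{Time: time.Now().Add(-time.Hour), Valid: true},
//	})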
func (q *Queries) GetTenantStatusMetrics(ctx context.Context, db DBTX, arg GetTenantStatusMetricsParams) (*GetTenantStatusMetricsRow, error) {
row := db.QueryRow(ctx, getTenantStatusMetrics,
arg.Tenantid,
arg.Createdafter,
arg.CreatedBefore,
arg.WorkflowIds,
arg.ParentTaskExternalId,
arg.TriggeringEventExternalId,
)
var i GetTenantStatusMetricsRow
err := row.Scan(
&i.TenantID,
&i.TotalQueued,
&i.TotalRunning,
&i.TotalCompleted,
&i.TotalCancelled,
&i.TotalFailed,
)
return &i, err
}
const getWorkflowRunIdFromDagIdInsertedAt = `-- name: GetWorkflowRunIdFromDagIdInsertedAt :one
SELECT external_id
FROM v1_dags_olap
WHERE
id = $1::bigint
AND inserted_at = $2::timestamptz
`
type GetWorkflowRunIdFromDagIdInsertedAtParams struct {
Dagid int64 `json:"dagid"`
Daginsertedat pgtype.Timestamptz `json:"daginsertedat"`
}
func (q *Queries) GetWorkflowRunIdFromDagIdInsertedAt(ctx context.Context, db DBTX, arg GetWorkflowRunIdFromDagIdInsertedAtParams) (pgtype.UUID, error) {
row := db.QueryRow(ctx, getWorkflowRunIdFromDagIdInsertedAt, arg.Dagid, arg.Daginsertedat)
var external_id pgtype.UUID
err := row.Scan(&external_id)
return external_id, err
}
const listEventKeys = `-- name: ListEventKeys :many
SELECT DISTINCT key
FROM
v1_events_olap
WHERE
tenant_id = $1::uuid
AND seen_at > NOW() - INTERVAL '1 day'
`
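// Example (illustrative sketch, not generated by sqlc): list the distinct
// event keys a tenant has seen in the last day.
//
//	keys, err := q.ListEventKeys(ctx, pool, tenantID)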
func (q *Queries) ListEventKeys(ctx context.Context, db DBTX, tenantid pgtype.UUID) ([]string, error) {
rows, err := db.Query(ctx, listEventKeys, tenantid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
return nil, err
}
items = append(items, key)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listEvents = `-- name: ListEvents :many
WITH included_events AS (
SELECT
e.tenant_id, e.id, e.external_id, e.seen_at, e.key, e.payload, e.additional_metadata, e.scope,
JSON_AGG(JSON_BUILD_OBJECT('run_external_id', r.external_id, 'filter_id', etr.filter_id)) FILTER (WHERE r.external_id IS NOT NULL)::JSONB AS triggered_runs
FROM v1_event_lookup_table_olap elt
JOIN v1_events_olap e ON (elt.tenant_id, elt.event_id, elt.event_seen_at) = (e.tenant_id, e.id, e.seen_at)
LEFT JOIN v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
LEFT JOIN v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
WHERE
e.tenant_id = $1
AND (
$2::TEXT[] IS NULL OR
"key" = ANY($2::TEXT[])
)
AND e.seen_at >= $3::TIMESTAMPTZ
AND (
$4::TIMESTAMPTZ IS NULL OR
e.seen_at <= $4::TIMESTAMPTZ
)
AND (
$5::UUID[] IS NULL OR
r.workflow_id = ANY($5::UUID[])
)
AND (
$6::UUID[] IS NULL OR
elt.external_id = ANY($6::UUID[])
)
AND (
$7::JSONB IS NULL OR
e.additional_metadata @> $7::JSONB
)
AND (
CAST($8::text[] AS v1_readable_status_olap[]) IS NULL OR
r.readable_status = ANY(CAST($8::text[] AS v1_readable_status_olap[]))
)
AND (
$9::TEXT[] IS NULL OR
e.scope = ANY($9::TEXT[])
)
GROUP BY
e.tenant_id,
e.id,
e.external_id,
e.seen_at,
e.key,
e.payload,
e.additional_metadata,
e.scope
ORDER BY e.seen_at DESC, e.id
OFFSET
COALESCE($10::BIGINT, 0)
LIMIT
COALESCE($11::BIGINT, 50)
), status_counts AS (
SELECT
e.tenant_id,
e.id,
e.seen_at,
COUNT(*) FILTER (WHERE r.readable_status = 'QUEUED') AS queued_count,
COUNT(*) FILTER (WHERE r.readable_status = 'RUNNING') AS running_count,
COUNT(*) FILTER (WHERE r.readable_status = 'COMPLETED') AS completed_count,
COUNT(*) FILTER (WHERE r.readable_status = 'CANCELLED') AS cancelled_count,
COUNT(*) FILTER (WHERE r.readable_status = 'FAILED') AS failed_count
FROM
included_events e
LEFT JOIN
v1_event_to_run_olap etr ON (e.id, e.seen_at) = (etr.event_id, etr.event_seen_at)
LEFT JOIN
v1_runs_olap r ON (etr.run_id, etr.run_inserted_at) = (r.id, r.inserted_at)
GROUP BY
e.tenant_id, e.id, e.seen_at
)
SELECT
e.tenant_id,
e.id AS event_id,
e.external_id AS event_external_id,
e.seen_at AS event_seen_at,
e.key AS event_key,
e.payload AS event_payload,
e.additional_metadata AS event_additional_metadata,
e.scope AS event_scope,
sc.queued_count,
sc.running_count,
sc.completed_count,
sc.cancelled_count,
sc.failed_count,
e.triggered_runs
FROM
included_events e
LEFT JOIN
status_counts sc ON (e.tenant_id, e.id, e.seen_at) = (sc.tenant_id, sc.id, sc.seen_at)
ORDER BY e.seen_at DESC
`
type ListEventsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Keys []string `json:"keys"`
Since pgtype.Timestamptz `json:"since"`
Until pgtype.Timestamptz `json:"until"`
WorkflowIds []pgtype.UUID `json:"workflowIds"`
EventIds []pgtype.UUID `json:"eventIds"`
AdditionalMetadata []byte `json:"additionalMetadata"`
Statuses []string `json:"statuses"`
Scopes []string `json:"scopes"`
Offset pgtype.Int8 `json:"offset"`
Limit pgtype.Int8 `json:"limit"`
}
type ListEventsRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
EventID int64 `json:"event_id"`
EventExternalID pgtype.UUID `json:"event_external_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
EventKey string `json:"event_key"`
EventPayload []byte `json:"event_payload"`
EventAdditionalMetadata []byte `json:"event_additional_metadata"`
EventScope pgtype.Text `json:"event_scope"`
QueuedCount pgtype.Int8 `json:"queued_count"`
RunningCount pgtype.Int8 `json:"running_count"`
CompletedCount pgtype.Int8 `json:"completed_count"`
CancelledCount pgtype.Int8 `json:"cancelled_count"`
FailedCount pgtype.Int8 `json:"failed_count"`
TriggeredRuns []byte `json:"triggered_runs"`
}
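// Example (illustrative sketch, not generated by sqlc): page through a
// tenant's recent events, 50 at a time. Offset and Limit fall back to 0 and
// 50 via COALESCE when left invalid (NULL).
//
//	page, err := q.ListEvents(ctx, pool, ListEventsParams{
//		Tenantid: tenantID,
//		Since:    pgtype.Timestamptz{Time: time.Now().Add(-24 * time.Hour), Valid: true},
//		Offset:   pgtype.Int8{Int64: 0, Valid: true},
//		Limit:    pgtype.Int8{Int64: 50, Valid: true},
//	})
//
// Each row carries per-status run counts plus a triggered_runs JSONB array of
// {run_external_id, filter_id} objects for the runs the event triggered.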
func (q *Queries) ListEvents(ctx context.Context, db DBTX, arg ListEventsParams) ([]*ListEventsRow, error) {
rows, err := db.Query(ctx, listEvents,
arg.Tenantid,
arg.Keys,
arg.Since,
arg.Until,
arg.WorkflowIds,
arg.EventIds,
arg.AdditionalMetadata,
arg.Statuses,
arg.Scopes,
arg.Offset,
arg.Limit,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListEventsRow
for rows.Next() {
var i ListEventsRow
if err := rows.Scan(
&i.TenantID,
&i.EventID,
&i.EventExternalID,
&i.EventSeenAt,
&i.EventKey,
&i.EventPayload,
&i.EventAdditionalMetadata,
&i.EventScope,
&i.QueuedCount,
&i.RunningCount,
&i.CompletedCount,
&i.CancelledCount,
&i.FailedCount,
&i.TriggeredRuns,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listOLAPPartitionsBeforeDate = `-- name: ListOLAPPartitionsBeforeDate :many
WITH task_partitions AS (
SELECT 'v1_tasks_olap' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_tasks_olap'::text, $2::date) AS p
), dag_partitions AS (
SELECT 'v1_dags_olap' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dags_olap', $2::date) AS p
), runs_partitions AS (
SELECT 'v1_runs_olap' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_runs_olap', $2::date) AS p
), events_partitions AS (
SELECT 'v1_events_olap' AS parent_table, p::TEXT AS partition_name FROM get_v1_partitions_before_date('v1_events_olap', $2::date) AS p
), event_trigger_partitions AS (
SELECT 'v1_event_to_run_olap' AS parent_table, p::TEXT AS partition_name FROM get_v1_partitions_before_date('v1_event_to_run_olap', $2::date) AS p
), events_lookup_table_partitions AS (
SELECT 'v1_event_lookup_table_olap' AS parent_table, p::TEXT AS partition_name FROM get_v1_partitions_before_date('v1_event_lookup_table_olap', $2::date) AS p
), candidates AS (
SELECT
parent_table, partition_name
FROM
task_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
dag_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
runs_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
events_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
event_trigger_partitions
UNION ALL
SELECT
parent_table, partition_name
FROM
events_lookup_table_partitions
)
SELECT parent_table, partition_name
FROM candidates
WHERE
CASE
WHEN $1::BOOLEAN THEN TRUE
ELSE parent_table NOT IN ('v1_events_olap', 'v1_event_to_run_olap')
END
`
type ListOLAPPartitionsBeforeDateParams struct {
Shouldpartitioneventstables bool `json:"shouldpartitioneventstables"`
Date pgtype.Date `json:"date"`
}
type ListOLAPPartitionsBeforeDateRow struct {
ParentTable string `json:"parent_table"`
PartitionName string `json:"partition_name"`
}
func (q *Queries) ListOLAPPartitionsBeforeDate(ctx context.Context, db DBTX, arg ListOLAPPartitionsBeforeDateParams) ([]*ListOLAPPartitionsBeforeDateRow, error) {
rows, err := db.Query(ctx, listOLAPPartitionsBeforeDate, arg.Shouldpartitioneventstables, arg.Date)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListOLAPPartitionsBeforeDateRow
for rows.Next() {
var i ListOLAPPartitionsBeforeDateRow
if err := rows.Scan(&i.ParentTable, &i.PartitionName); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTaskEvents = `-- name: ListTaskEvents :many
WITH aggregated_events AS (
SELECT
tenant_id,
task_id,
task_inserted_at,
retry_count,
event_type,
MIN(event_timestamp) AS time_first_seen,
MAX(event_timestamp) AS time_last_seen,
COUNT(*) AS count,
MIN(id) AS first_id
FROM v1_task_events_olap
WHERE
tenant_id = $1::uuid
AND task_id = $2::bigint
AND task_inserted_at = $3::timestamptz
GROUP BY tenant_id, task_id, task_inserted_at, retry_count, event_type
)
SELECT
a.tenant_id,
a.task_id,
a.task_inserted_at,
a.retry_count,
a.event_type,
a.time_first_seen,
a.time_last_seen,
a.count,
t.id,
t.event_timestamp,
t.readable_status,
t.error_message,
t.output,
t.worker_id,
t.additional__event_data,
t.additional__event_message
FROM aggregated_events a
JOIN v1_task_events_olap t
ON t.tenant_id = a.tenant_id
AND t.task_id = a.task_id
AND t.task_inserted_at = a.task_inserted_at
AND t.id = a.first_id
ORDER BY a.time_first_seen DESC, t.event_timestamp DESC
`
type ListTaskEventsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Taskid int64 `json:"taskid"`
Taskinsertedat pgtype.Timestamptz `json:"taskinsertedat"`
}
type ListTaskEventsRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskInsertedAt pgtype.Timestamptz `json:"task_inserted_at"`
RetryCount int32 `json:"retry_count"`
EventType V1EventTypeOlap `json:"event_type"`
TimeFirstSeen interface{} `json:"time_first_seen"`
TimeLastSeen interface{} `json:"time_last_seen"`
Count int64 `json:"count"`
ID int64 `json:"id"`
EventTimestamp pgtype.Timestamptz `json:"event_timestamp"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
ErrorMessage pgtype.Text `json:"error_message"`
Output []byte `json:"output"`
WorkerID pgtype.UUID `json:"worker_id"`
AdditionalEventData pgtype.Text `json:"additional__event_data"`
AdditionalEventMessage pgtype.Text `json:"additional__event_message"`
}
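// Example (illustrative sketch, not generated by sqlc): fetch the aggregated
// event timeline for one task, identified by its (id, inserted_at) pair.
//
//	events, err := q.ListTaskEvents(ctx, pool, ListTaskEventsParams{
//		Tenantid:       tenantID,
//		Taskid:         taskID,         // assumed int64 internal task id
//		Taskinsertedat: taskInsertedAt, // assumed pgtype.Timestamptz
//	})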
func (q *Queries) ListTaskEvents(ctx context.Context, db DBTX, arg ListTaskEventsParams) ([]*ListTaskEventsRow, error) {
rows, err := db.Query(ctx, listTaskEvents, arg.Tenantid, arg.Taskid, arg.Taskinsertedat)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListTaskEventsRow
for rows.Next() {
var i ListTaskEventsRow
if err := rows.Scan(
&i.TenantID,
&i.TaskID,
&i.TaskInsertedAt,
&i.RetryCount,
&i.EventType,
&i.TimeFirstSeen,
&i.TimeLastSeen,
&i.Count,
&i.ID,
&i.EventTimestamp,
&i.ReadableStatus,
&i.ErrorMessage,
&i.Output,
&i.WorkerID,
&i.AdditionalEventData,
&i.AdditionalEventMessage,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTaskEventsForWorkflowRun = `-- name: ListTaskEventsForWorkflowRun :many
WITH tasks AS (
SELECT dt.task_id, dt.task_inserted_at
FROM v1_lookup_table_olap lt
JOIN v1_dag_to_task_olap dt ON lt.dag_id = dt.dag_id AND lt.inserted_at = dt.dag_inserted_at
WHERE
lt.external_id = $1::uuid
AND lt.tenant_id = $2::uuid
), aggregated_events AS (
SELECT
tenant_id,
task_id,
task_inserted_at,
retry_count,
event_type,
MIN(event_timestamp)::timestamptz AS time_first_seen,
MAX(event_timestamp)::timestamptz AS time_last_seen,
COUNT(*) AS count,
MIN(id) AS first_id
FROM v1_task_events_olap
WHERE
tenant_id = $2::uuid
AND (task_id, task_inserted_at) IN (SELECT task_id, task_inserted_at FROM tasks)
GROUP BY tenant_id, task_id, task_inserted_at, retry_count, event_type
)
SELECT
a.tenant_id,
a.task_id,
a.task_inserted_at,
a.retry_count,
a.event_type,
a.time_first_seen,
a.time_last_seen,
a.count,
t.id,
t.event_timestamp,
t.readable_status,
t.error_message,
t.output,
t.worker_id,
t.additional__event_data,
t.additional__event_message,
tsk.display_name,
tsk.external_id AS task_external_id
FROM aggregated_events a
JOIN v1_task_events_olap t
ON t.tenant_id = a.tenant_id
AND t.task_id = a.task_id
AND t.task_inserted_at = a.task_inserted_at
AND t.id = a.first_id
JOIN v1_tasks_olap tsk
ON (tsk.tenant_id, tsk.id, tsk.inserted_at) = (t.tenant_id, t.task_id, t.task_inserted_at)
ORDER BY a.time_first_seen DESC, t.event_timestamp DESC
`
type ListTaskEventsForWorkflowRunParams struct {
Workflowrunid pgtype.UUID `json:"workflowrunid"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type ListTaskEventsForWorkflowRunRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskInsertedAt pgtype.Timestamptz `json:"task_inserted_at"`
RetryCount int32 `json:"retry_count"`
EventType V1EventTypeOlap `json:"event_type"`
TimeFirstSeen pgtype.Timestamptz `json:"time_first_seen"`
TimeLastSeen pgtype.Timestamptz `json:"time_last_seen"`
Count int64 `json:"count"`
ID int64 `json:"id"`
EventTimestamp pgtype.Timestamptz `json:"event_timestamp"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
ErrorMessage pgtype.Text `json:"error_message"`
Output []byte `json:"output"`
WorkerID pgtype.UUID `json:"worker_id"`
AdditionalEventData pgtype.Text `json:"additional__event_data"`
AdditionalEventMessage pgtype.Text `json:"additional__event_message"`
DisplayName string `json:"display_name"`
TaskExternalID pgtype.UUID `json:"task_external_id"`
}
func (q *Queries) ListTaskEventsForWorkflowRun(ctx context.Context, db DBTX, arg ListTaskEventsForWorkflowRunParams) ([]*ListTaskEventsForWorkflowRunRow, error) {
rows, err := db.Query(ctx, listTaskEventsForWorkflowRun, arg.Workflowrunid, arg.Tenantid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListTaskEventsForWorkflowRunRow
for rows.Next() {
var i ListTaskEventsForWorkflowRunRow
if err := rows.Scan(
&i.TenantID,
&i.TaskID,
&i.TaskInsertedAt,
&i.RetryCount,
&i.EventType,
&i.TimeFirstSeen,
&i.TimeLastSeen,
&i.Count,
&i.ID,
&i.EventTimestamp,
&i.ReadableStatus,
&i.ErrorMessage,
&i.Output,
&i.WorkerID,
&i.AdditionalEventData,
&i.AdditionalEventMessage,
&i.DisplayName,
&i.TaskExternalID,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTasksByDAGIds = `-- name: ListTasksByDAGIds :many
SELECT
DISTINCT ON (t.external_id)
dt.dag_id, dt.dag_inserted_at, dt.task_id, dt.task_inserted_at,
lt.external_id AS dag_external_id
FROM
v1_lookup_table_olap lt
JOIN
v1_dag_to_task_olap dt ON (lt.dag_id, lt.inserted_at) = (dt.dag_id, dt.dag_inserted_at)
JOIN
v1_tasks_olap t ON (t.id, t.inserted_at) = (dt.task_id, dt.task_inserted_at)
WHERE
lt.external_id = ANY($1::uuid[])
AND lt.tenant_id = $2::uuid
ORDER BY
t.external_id, t.inserted_at DESC
`
type ListTasksByDAGIdsParams struct {
Dagids []pgtype.UUID `json:"dagids"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type ListTasksByDAGIdsRow struct {
DagID int64 `json:"dag_id"`
DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"`
TaskID int64 `json:"task_id"`
TaskInsertedAt pgtype.Timestamptz `json:"task_inserted_at"`
DagExternalID pgtype.UUID `json:"dag_external_id"`
}
func (q *Queries) ListTasksByDAGIds(ctx context.Context, db DBTX, arg ListTasksByDAGIdsParams) ([]*ListTasksByDAGIdsRow, error) {
rows, err := db.Query(ctx, listTasksByDAGIds, arg.Dagids, arg.Tenantid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListTasksByDAGIdsRow
for rows.Next() {
var i ListTasksByDAGIdsRow
if err := rows.Scan(
&i.DagID,
&i.DagInsertedAt,
&i.TaskID,
&i.TaskInsertedAt,
&i.DagExternalID,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTasksByExternalIds = `-- name: ListTasksByExternalIds :many
SELECT
tenant_id,
task_id,
inserted_at
FROM
v1_lookup_table_olap
WHERE
external_id = ANY($1::uuid[])
AND tenant_id = $2::uuid
`
type ListTasksByExternalIdsParams struct {
Externalids []pgtype.UUID `json:"externalids"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type ListTasksByExternalIdsRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
TaskID pgtype.Int8 `json:"task_id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
}
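// Example (illustrative sketch, not generated by sqlc): resolve external run
// IDs to internal (task_id, inserted_at) pairs via the lookup table.
//
//	refs, err := q.ListTasksByExternalIds(ctx, pool, ListTasksByExternalIdsParams{
//		Externalids: externalIDs, // assumed []pgtype.UUID
//		Tenantid:    tenantID,
//	})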
func (q *Queries) ListTasksByExternalIds(ctx context.Context, db DBTX, arg ListTasksByExternalIdsParams) ([]*ListTasksByExternalIdsRow, error) {
rows, err := db.Query(ctx, listTasksByExternalIds, arg.Externalids, arg.Tenantid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListTasksByExternalIdsRow
for rows.Next() {
var i ListTasksByExternalIdsRow
if err := rows.Scan(&i.TenantID, &i.TaskID, &i.InsertedAt); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listWorkflowRunDisplayNames = `-- name: ListWorkflowRunDisplayNames :many
SELECT
lt.external_id,
COALESCE(t.display_name, d.display_name) AS display_name,
COALESCE(t.inserted_at, d.inserted_at) AS inserted_at
FROM v1_lookup_table_olap lt
LEFT JOIN v1_dags_olap d ON (lt.dag_id, lt.inserted_at) = (d.id, d.inserted_at)
LEFT JOIN v1_tasks_olap t ON (lt.task_id, lt.inserted_at) = (t.id, t.inserted_at)
WHERE
lt.external_id = ANY($1::uuid[])
AND lt.tenant_id = $2::uuid
LIMIT 10000
`
type ListWorkflowRunDisplayNamesParams struct {
Externalids []pgtype.UUID `json:"externalids"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type ListWorkflowRunDisplayNamesRow struct {
ExternalID pgtype.UUID `json:"external_id"`
DisplayName string `json:"display_name"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
}
func (q *Queries) ListWorkflowRunDisplayNames(ctx context.Context, db DBTX, arg ListWorkflowRunDisplayNamesParams) ([]*ListWorkflowRunDisplayNamesRow, error) {
rows, err := db.Query(ctx, listWorkflowRunDisplayNames, arg.Externalids, arg.Tenantid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListWorkflowRunDisplayNamesRow
for rows.Next() {
var i ListWorkflowRunDisplayNamesRow
if err := rows.Scan(&i.ExternalID, &i.DisplayName, &i.InsertedAt); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const populateDAGMetadata = `-- name: PopulateDAGMetadata :many
WITH input AS (
SELECT
UNNEST($2::bigint[]) AS id,
UNNEST($3::timestamptz[]) AS inserted_at
), runs AS (
SELECT
d.id AS dag_id,
r.id AS run_id,
r.tenant_id,
r.inserted_at,
r.external_id,
r.readable_status,
r.kind,
r.workflow_id,
d.display_name,
CASE
WHEN $1::BOOLEAN THEN d.input
ELSE '{}'::JSONB
END::JSONB AS input,
d.additional_metadata,
d.workflow_version_id,
d.parent_task_external_id
FROM v1_runs_olap r
JOIN v1_dags_olap d ON (r.id, r.inserted_at) = (d.id, d.inserted_at)
JOIN input i ON (i.id, i.inserted_at) = (r.id, r.inserted_at)
WHERE r.tenant_id = $4::uuid AND r.kind = 'DAG'
), relevant_events AS (
SELECT
r.run_id,
e.tenant_id, e.id, e.inserted_at, e.task_id, e.task_inserted_at, e.event_type, e.workflow_id, e.event_timestamp, e.readable_status, e.retry_count, e.error_message, e.output, e.worker_id, e.additional__event_data, e.additional__event_message
FROM runs r
JOIN v1_dag_to_task_olap dt ON (r.dag_id, r.inserted_at) = (dt.dag_id, dt.dag_inserted_at)
JOIN v1_task_events_olap e ON (e.task_id, e.task_inserted_at) = (dt.task_id, dt.task_inserted_at)
WHERE e.tenant_id = $4::uuid
), max_retry_count AS (
SELECT run_id, MAX(retry_count) AS max_retry_count
FROM relevant_events
GROUP BY run_id
), metadata AS (
SELECT
e.run_id,
MIN(e.inserted_at)::timestamptz AS created_at,
MIN(e.inserted_at) FILTER (WHERE e.readable_status = 'RUNNING')::timestamptz AS started_at,
MAX(e.inserted_at) FILTER (WHERE e.readable_status IN ('COMPLETED', 'CANCELLED', 'FAILED'))::timestamptz AS finished_at
FROM
relevant_events e
JOIN max_retry_count mrc ON (e.run_id, e.retry_count) = (mrc.run_id, mrc.max_retry_count)
GROUP BY e.run_id
), error_message AS (
SELECT
DISTINCT ON (e.run_id) e.run_id::bigint,
e.error_message
FROM
relevant_events e
WHERE
e.readable_status = 'FAILED'
ORDER BY
e.run_id, e.retry_count DESC
), task_output AS (
SELECT
run_id,
output
FROM
relevant_events
WHERE
event_type = 'FINISHED'
)
SELECT
r.dag_id, r.run_id, r.tenant_id, r.inserted_at, r.external_id, r.readable_status, r.kind, r.workflow_id, r.display_name, r.input, r.additional_metadata, r.workflow_version_id, r.parent_task_external_id,
m.created_at,
m.started_at,
m.finished_at,
e.error_message,
CASE
WHEN $1::BOOLEAN THEN o.output::JSONB
ELSE '{}'::JSONB
END::JSONB AS output,
COALESCE(mrc.max_retry_count, 0)::int as retry_count
FROM runs r
LEFT JOIN metadata m ON r.run_id = m.run_id
LEFT JOIN error_message e ON r.run_id = e.run_id
LEFT JOIN task_output o ON r.run_id = o.run_id
LEFT JOIN max_retry_count mrc ON r.run_id = mrc.run_id
ORDER BY r.inserted_at DESC, r.run_id DESC
`
type PopulateDAGMetadataParams struct {
Includepayloads bool `json:"includepayloads"`
Ids []int64 `json:"ids"`
Insertedats []pgtype.Timestamptz `json:"insertedats"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type PopulateDAGMetadataRow struct {
DagID int64 `json:"dag_id"`
RunID int64 `json:"run_id"`
TenantID pgtype.UUID `json:"tenant_id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
Kind V1RunKind `json:"kind"`
WorkflowID pgtype.UUID `json:"workflow_id"`
DisplayName string `json:"display_name"`
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
StartedAt pgtype.Timestamptz `json:"started_at"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
ErrorMessage pgtype.Text `json:"error_message"`
Output []byte `json:"output"`
RetryCount int32 `json:"retry_count"`
}
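// Example (illustrative sketch, not generated by sqlc). Ids and Insertedats
// are parallel arrays that are UNNESTed together, so they must be the same
// length and aligned index-by-index.
//
//	rows, err := q.PopulateDAGMetadata(ctx, pool, PopulateDAGMetadataParams{
//		Includepayloads: false, // input/output come back as '{}' when false
//		Ids:             dagIDs,
//		Insertedats:     dagInsertedAts,
//		Tenantid:        tenantID,
//	})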
func (q *Queries) PopulateDAGMetadata(ctx context.Context, db DBTX, arg PopulateDAGMetadataParams) ([]*PopulateDAGMetadataRow, error) {
rows, err := db.Query(ctx, populateDAGMetadata,
arg.Includepayloads,
arg.Ids,
arg.Insertedats,
arg.Tenantid,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*PopulateDAGMetadataRow
for rows.Next() {
var i PopulateDAGMetadataRow
if err := rows.Scan(
&i.DagID,
&i.RunID,
&i.TenantID,
&i.InsertedAt,
&i.ExternalID,
&i.ReadableStatus,
&i.Kind,
&i.WorkflowID,
&i.DisplayName,
&i.Input,
&i.AdditionalMetadata,
&i.WorkflowVersionID,
&i.ParentTaskExternalID,
&i.CreatedAt,
&i.StartedAt,
&i.FinishedAt,
&i.ErrorMessage,
&i.Output,
&i.RetryCount,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const populateSingleTaskRunData = `-- name: PopulateSingleTaskRunData :one
WITH selected_retry_count AS (
SELECT
CASE
WHEN $4::int IS NOT NULL THEN $4::int
ELSE MAX(retry_count)::int
END AS retry_count
FROM
v1_task_events_olap
WHERE
tenant_id = $1::uuid
AND task_id = $2::bigint
AND task_inserted_at = $3::timestamptz
LIMIT 1
), relevant_events AS (
SELECT
tenant_id, id, inserted_at, task_id, task_inserted_at, event_type, workflow_id, event_timestamp, readable_status, retry_count, error_message, output, worker_id, additional__event_data, additional__event_message
FROM
v1_task_events_olap
WHERE
tenant_id = $1::uuid
AND task_id = $2::bigint
AND task_inserted_at = $3::timestamptz
AND retry_count = (SELECT retry_count FROM selected_retry_count)
), finished_at AS (
SELECT
MAX(event_timestamp) AS finished_at
FROM
relevant_events
WHERE
readable_status = ANY(ARRAY['COMPLETED', 'FAILED', 'CANCELLED']::v1_readable_status_olap[])
), started_at AS (
SELECT
MAX(event_timestamp) AS started_at
FROM
relevant_events
WHERE
event_type = 'STARTED'
), task_output AS (
SELECT
output
FROM
relevant_events
WHERE
event_type = 'FINISHED'
LIMIT 1
), status AS (
SELECT
readable_status
FROM
relevant_events
ORDER BY
readable_status DESC
LIMIT 1
), error_message AS (
SELECT
error_message
FROM
relevant_events
WHERE
readable_status = 'FAILED'
ORDER BY
event_timestamp DESC
LIMIT 1
), spawned_children AS (
SELECT COUNT(*) AS spawned_children
FROM v1_runs_olap
WHERE parent_task_external_id = (
SELECT external_id
FROM v1_tasks_olap
WHERE
tenant_id = $1::uuid
AND id = $2::bigint
AND inserted_at = $3::timestamptz
LIMIT 1
)
)
SELECT
t.tenant_id, t.id, t.inserted_at, t.external_id, t.queue, t.action_id, t.step_id, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.display_name, t.input, t.additional_metadata, t.readable_status, t.latest_retry_count, t.latest_worker_id, t.dag_id, t.dag_inserted_at, t.parent_task_external_id,
st.readable_status::v1_readable_status_olap as status,
f.finished_at::timestamptz as finished_at,
s.started_at::timestamptz as started_at,
o.output::jsonb as output,
e.error_message as error_message,
sc.spawned_children,
(SELECT retry_count FROM selected_retry_count) as retry_count
FROM
v1_tasks_olap t
LEFT JOIN
finished_at f ON true
LEFT JOIN
started_at s ON true
LEFT JOIN
task_output o ON true
LEFT JOIN
status st ON true
LEFT JOIN
error_message e ON true
LEFT JOIN
spawned_children sc ON true
WHERE
(t.tenant_id, t.id, t.inserted_at) = ($1::uuid, $2::bigint, $3::timestamptz)
`
type PopulateSingleTaskRunDataParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Taskid int64 `json:"taskid"`
Taskinsertedat pgtype.Timestamptz `json:"taskinsertedat"`
RetryCount pgtype.Int4 `json:"retry_count"`
}
type PopulateSingleTaskRunDataRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
Queue string `json:"queue"`
ActionID string `json:"action_id"`
StepID pgtype.UUID `json:"step_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
WorkflowRunID pgtype.UUID `json:"workflow_run_id"`
ScheduleTimeout string `json:"schedule_timeout"`
StepTimeout pgtype.Text `json:"step_timeout"`
Priority pgtype.Int4 `json:"priority"`
Sticky V1StickyStrategyOlap `json:"sticky"`
DesiredWorkerID pgtype.UUID `json:"desired_worker_id"`
DisplayName string `json:"display_name"`
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
LatestRetryCount int32 `json:"latest_retry_count"`
LatestWorkerID pgtype.UUID `json:"latest_worker_id"`
DagID pgtype.Int8 `json:"dag_id"`
DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
Status V1ReadableStatusOlap `json:"status"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
StartedAt pgtype.Timestamptz `json:"started_at"`
Output []byte `json:"output"`
ErrorMessage pgtype.Text `json:"error_message"`
SpawnedChildren pgtype.Int8 `json:"spawned_children"`
RetryCount int32 `json:"retry_count"`
}
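// Example (illustrative sketch, not generated by sqlc). Leaving RetryCount
// invalid (NULL) selects the task's highest retry count, per the
// selected_retry_count CTE above.
//
//	run, err := q.PopulateSingleTaskRunData(ctx, pool, PopulateSingleTaskRunDataParams{
//		Tenantid:       tenantID,
//		Taskid:         taskID,
//		Taskinsertedat: taskInsertedAt,
//	})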
func (q *Queries) PopulateSingleTaskRunData(ctx context.Context, db DBTX, arg PopulateSingleTaskRunDataParams) (*PopulateSingleTaskRunDataRow, error) {
row := db.QueryRow(ctx, populateSingleTaskRunData,
arg.Tenantid,
arg.Taskid,
arg.Taskinsertedat,
arg.RetryCount,
)
var i PopulateSingleTaskRunDataRow
err := row.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.Queue,
&i.ActionID,
&i.StepID,
&i.WorkflowID,
&i.WorkflowVersionID,
&i.WorkflowRunID,
&i.ScheduleTimeout,
&i.StepTimeout,
&i.Priority,
&i.Sticky,
&i.DesiredWorkerID,
&i.DisplayName,
&i.Input,
&i.AdditionalMetadata,
&i.ReadableStatus,
&i.LatestRetryCount,
&i.LatestWorkerID,
&i.DagID,
&i.DagInsertedAt,
&i.ParentTaskExternalID,
&i.Status,
&i.FinishedAt,
&i.StartedAt,
&i.Output,
&i.ErrorMessage,
&i.SpawnedChildren,
&i.RetryCount,
)
return &i, err
}
const populateTaskRunData = `-- name: PopulateTaskRunData :many
WITH input AS (
SELECT
UNNEST($2::bigint[]) AS id,
UNNEST($3::timestamptz[]) AS inserted_at
), tasks AS (
SELECT
DISTINCT ON(t.tenant_id, t.id, t.inserted_at)
t.tenant_id,
t.id,
t.inserted_at,
t.queue,
t.action_id,
t.step_id,
t.workflow_id,
t.workflow_version_id,
t.schedule_timeout,
t.step_timeout,
t.priority,
t.sticky,
t.desired_worker_id,
t.external_id,
t.display_name,
t.input,
t.additional_metadata,
t.readable_status,
t.parent_task_external_id,
t.workflow_run_id,
t.latest_retry_count
FROM
v1_tasks_olap t
JOIN
input i ON i.id = t.id AND i.inserted_at = t.inserted_at
WHERE
t.tenant_id = $4::uuid
), relevant_events AS (
SELECT
e.tenant_id, e.id, e.inserted_at, e.task_id, e.task_inserted_at, e.event_type, e.workflow_id, e.event_timestamp, e.readable_status, e.retry_count, e.error_message, e.output, e.worker_id, e.additional__event_data, e.additional__event_message
FROM
v1_task_events_olap e
JOIN
tasks t ON t.id = e.task_id AND t.tenant_id = e.tenant_id AND t.inserted_at = e.task_inserted_at
), max_retry_counts AS (
SELECT
e.tenant_id,
e.task_id,
e.task_inserted_at,
MAX(e.retry_count) AS max_retry_count
FROM
relevant_events e
GROUP BY
e.tenant_id, e.task_id, e.task_inserted_at
), queued_ats AS (
SELECT
e.task_id::bigint,
MAX(e.event_timestamp) AS queued_at
FROM
relevant_events e
JOIN
max_retry_counts mrc ON
e.tenant_id = mrc.tenant_id
AND e.task_id = mrc.task_id
AND e.task_inserted_at = mrc.task_inserted_at
AND e.retry_count = mrc.max_retry_count
WHERE
e.event_type = 'QUEUED'
GROUP BY e.task_id
), started_ats AS (
SELECT
e.task_id::bigint,
MAX(e.event_timestamp) AS started_at
FROM
relevant_events e
JOIN
max_retry_counts mrc ON
e.tenant_id = mrc.tenant_id
AND e.task_id = mrc.task_id
AND e.task_inserted_at = mrc.task_inserted_at
AND e.retry_count = mrc.max_retry_count
WHERE
e.event_type = 'STARTED'
GROUP BY e.task_id
), finished_ats AS (
SELECT
e.task_id::bigint,
MAX(e.event_timestamp) AS finished_at
FROM
relevant_events e
JOIN
max_retry_counts mrc ON
e.tenant_id = mrc.tenant_id
AND e.task_id = mrc.task_id
AND e.task_inserted_at = mrc.task_inserted_at
AND e.retry_count = mrc.max_retry_count
WHERE
e.readable_status = ANY(ARRAY['COMPLETED', 'FAILED', 'CANCELLED']::v1_readable_status_olap[])
GROUP BY e.task_id
), error_message AS (
SELECT
DISTINCT ON (e.task_id) e.task_id::bigint,
e.error_message
FROM
relevant_events e
JOIN
max_retry_counts mrc ON
e.tenant_id = mrc.tenant_id
AND e.task_id = mrc.task_id
AND e.task_inserted_at = mrc.task_inserted_at
AND e.retry_count = mrc.max_retry_count
WHERE
e.readable_status = 'FAILED'
ORDER BY
e.task_id, e.retry_count DESC
), task_output AS (
SELECT
task_id,
MAX(output::TEXT)::JSONB AS output
FROM
relevant_events
WHERE
readable_status = 'COMPLETED'
GROUP BY
task_id
)
SELECT
t.tenant_id,
t.id,
t.inserted_at,
t.external_id,
t.queue,
t.action_id,
t.step_id,
t.workflow_id,
t.workflow_version_id,
t.schedule_timeout,
t.step_timeout,
t.priority,
t.sticky,
t.display_name,
t.additional_metadata,
t.parent_task_external_id,
CASE
WHEN $1::BOOLEAN THEN t.input
ELSE '{}'::JSONB
END::JSONB AS input,
t.readable_status::v1_readable_status_olap as status,
t.workflow_run_id,
f.finished_at::timestamptz as finished_at,
s.started_at::timestamptz as started_at,
q.queued_at::timestamptz as queued_at,
e.error_message as error_message,
COALESCE(t.latest_retry_count, 0)::int as retry_count,
CASE
WHEN $1::BOOLEAN THEN o.output::JSONB
ELSE '{}'::JSONB
END::JSONB as output
FROM
tasks t
LEFT JOIN
finished_ats f ON f.task_id = t.id
LEFT JOIN
started_ats s ON s.task_id = t.id
LEFT JOIN
queued_ats q ON q.task_id = t.id
LEFT JOIN
error_message e ON e.task_id = t.id
LEFT JOIN
task_output o ON o.task_id = t.id
ORDER BY t.inserted_at DESC, t.id DESC
`
type PopulateTaskRunDataParams struct {
Includepayloads bool `json:"includepayloads"`
Taskids []int64 `json:"taskids"`
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type PopulateTaskRunDataRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
Queue string `json:"queue"`
ActionID string `json:"action_id"`
StepID pgtype.UUID `json:"step_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
ScheduleTimeout string `json:"schedule_timeout"`
StepTimeout pgtype.Text `json:"step_timeout"`
Priority pgtype.Int4 `json:"priority"`
Sticky V1StickyStrategyOlap `json:"sticky"`
DisplayName string `json:"display_name"`
AdditionalMetadata []byte `json:"additional_metadata"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
Input []byte `json:"input"`
Status V1ReadableStatusOlap `json:"status"`
WorkflowRunID pgtype.UUID `json:"workflow_run_id"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
StartedAt pgtype.Timestamptz `json:"started_at"`
QueuedAt pgtype.Timestamptz `json:"queued_at"`
ErrorMessage pgtype.Text `json:"error_message"`
RetryCount int32 `json:"retry_count"`
Output []byte `json:"output"`
}
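// Example (illustrative sketch, not generated by sqlc). Taskids and
// Taskinsertedats are parallel arrays, and Includepayloads gates whether the
// input/output JSONB payloads are returned or replaced with '{}'.
//
//	rows, err := q.PopulateTaskRunData(ctx, pool, PopulateTaskRunDataParams{
//		Includepayloads: true,
//		Taskids:         taskIDs,
//		Taskinsertedats: taskInsertedAts,
//		Tenantid:        tenantID,
//	})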
func (q *Queries) PopulateTaskRunData(ctx context.Context, db DBTX, arg PopulateTaskRunDataParams) ([]*PopulateTaskRunDataRow, error) {
rows, err := db.Query(ctx, populateTaskRunData,
arg.Includepayloads,
arg.Taskids,
arg.Taskinsertedats,
arg.Tenantid,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*PopulateTaskRunDataRow
for rows.Next() {
var i PopulateTaskRunDataRow
if err := rows.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.Queue,
&i.ActionID,
&i.StepID,
&i.WorkflowID,
&i.WorkflowVersionID,
&i.ScheduleTimeout,
&i.StepTimeout,
&i.Priority,
&i.Sticky,
&i.DisplayName,
&i.AdditionalMetadata,
&i.ParentTaskExternalID,
&i.Input,
&i.Status,
&i.WorkflowRunID,
&i.FinishedAt,
&i.StartedAt,
&i.QueuedAt,
&i.ErrorMessage,
&i.RetryCount,
&i.Output,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const readDAGByExternalID = `-- name: ReadDAGByExternalID :one
WITH lookup_task AS (
SELECT
tenant_id,
dag_id,
inserted_at
FROM
v1_lookup_table_olap
WHERE
external_id = $1::uuid
)
SELECT
d.id, d.inserted_at, d.tenant_id, d.external_id, d.display_name, d.workflow_id, d.workflow_version_id, d.readable_status, d.input, d.additional_metadata, d.parent_task_external_id, d.total_tasks
FROM
v1_dags_olap d
JOIN
lookup_task lt ON lt.tenant_id = d.tenant_id AND lt.dag_id = d.id AND lt.inserted_at = d.inserted_at
`
func (q *Queries) ReadDAGByExternalID(ctx context.Context, db DBTX, externalid pgtype.UUID) (*V1DagsOlap, error) {
row := db.QueryRow(ctx, readDAGByExternalID, externalid)
var i V1DagsOlap
err := row.Scan(
&i.ID,
&i.InsertedAt,
&i.TenantID,
&i.ExternalID,
&i.DisplayName,
&i.WorkflowID,
&i.WorkflowVersionID,
&i.ReadableStatus,
&i.Input,
&i.AdditionalMetadata,
&i.ParentTaskExternalID,
&i.TotalTasks,
)
return &i, err
}
const readTaskByExternalID = `-- name: ReadTaskByExternalID :one
WITH lookup_task AS (
SELECT
tenant_id,
task_id,
inserted_at
FROM
v1_lookup_table_olap
WHERE
external_id = $1::uuid
)
SELECT
t.tenant_id, t.id, t.inserted_at, t.external_id, t.queue, t.action_id, t.step_id, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.display_name, t.input, t.additional_metadata, t.readable_status, t.latest_retry_count, t.latest_worker_id, t.dag_id, t.dag_inserted_at, t.parent_task_external_id,
e.output,
e.error_message
FROM
v1_tasks_olap t
JOIN
lookup_task lt ON lt.tenant_id = t.tenant_id AND lt.task_id = t.id AND lt.inserted_at = t.inserted_at
JOIN
v1_task_events_olap e ON (e.tenant_id, e.task_id, e.readable_status, e.retry_count) = (t.tenant_id, t.id, t.readable_status, t.latest_retry_count)
`
type ReadTaskByExternalIDRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
Queue string `json:"queue"`
ActionID string `json:"action_id"`
StepID pgtype.UUID `json:"step_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
WorkflowRunID pgtype.UUID `json:"workflow_run_id"`
ScheduleTimeout string `json:"schedule_timeout"`
StepTimeout pgtype.Text `json:"step_timeout"`
Priority pgtype.Int4 `json:"priority"`
Sticky V1StickyStrategyOlap `json:"sticky"`
DesiredWorkerID pgtype.UUID `json:"desired_worker_id"`
DisplayName string `json:"display_name"`
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
LatestRetryCount int32 `json:"latest_retry_count"`
LatestWorkerID pgtype.UUID `json:"latest_worker_id"`
DagID pgtype.Int8 `json:"dag_id"`
DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
Output []byte `json:"output"`
ErrorMessage pgtype.Text `json:"error_message"`
}
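// Example (illustrative sketch, not generated by sqlc): load a task plus its
// latest-retry output and error message by external ID.
//
//	task, err := q.ReadTaskByExternalID(ctx, pool, externalID)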
func (q *Queries) ReadTaskByExternalID(ctx context.Context, db DBTX, externalid pgtype.UUID) (*ReadTaskByExternalIDRow, error) {
row := db.QueryRow(ctx, readTaskByExternalID, externalid)
var i ReadTaskByExternalIDRow
err := row.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.Queue,
&i.ActionID,
&i.StepID,
&i.WorkflowID,
&i.WorkflowVersionID,
&i.WorkflowRunID,
&i.ScheduleTimeout,
&i.StepTimeout,
&i.Priority,
&i.Sticky,
&i.DesiredWorkerID,
&i.DisplayName,
&i.Input,
&i.AdditionalMetadata,
&i.ReadableStatus,
&i.LatestRetryCount,
&i.LatestWorkerID,
&i.DagID,
&i.DagInsertedAt,
&i.ParentTaskExternalID,
&i.Output,
&i.ErrorMessage,
)
return &i, err
}
const readWorkflowRunByExternalId = `-- name: ReadWorkflowRunByExternalId :one
WITH runs AS (
SELECT
lt.dag_id AS dag_id,
lt.task_id AS task_id,
r.id AS id,
r.tenant_id,
r.inserted_at,
r.external_id,
r.readable_status,
r.kind,
r.workflow_id,
d.display_name AS display_name,
d.input AS input,
d.additional_metadata AS additional_metadata,
d.workflow_version_id AS workflow_version_id,
d.parent_task_external_id AS parent_task_external_id
FROM
v1_lookup_table_olap lt
JOIN
v1_runs_olap r ON r.inserted_at = lt.inserted_at AND r.id = lt.dag_id
JOIN
v1_dags_olap d ON (lt.tenant_id, lt.dag_id, lt.inserted_at) = (d.tenant_id, d.id, d.inserted_at)
WHERE
lt.external_id = $1::uuid
AND lt.dag_id IS NOT NULL
UNION ALL
SELECT
lt.dag_id AS dag_id,
lt.task_id AS task_id,
r.id AS id,
r.tenant_id,
r.inserted_at,
r.external_id,
r.readable_status,
r.kind,
r.workflow_id,
t.display_name AS display_name,
t.input AS input,
t.additional_metadata AS additional_metadata,
t.workflow_version_id AS workflow_version_id,
NULL :: UUID AS parent_task_external_id
FROM
v1_lookup_table_olap lt
JOIN
v1_runs_olap r ON r.inserted_at = lt.inserted_at AND r.id = lt.task_id
JOIN
v1_tasks_olap t ON (lt.tenant_id, lt.task_id, lt.inserted_at) = (t.tenant_id, t.id, t.inserted_at)
WHERE
lt.external_id = $1::uuid
AND lt.task_id IS NOT NULL
), relevant_events AS (
SELECT
e.tenant_id, e.id, e.inserted_at, e.task_id, e.task_inserted_at, e.event_type, e.workflow_id, e.event_timestamp, e.readable_status, e.retry_count, e.error_message, e.output, e.worker_id, e.additional__event_data, e.additional__event_message
FROM runs r
JOIN v1_dag_to_task_olap dt ON r.dag_id = dt.dag_id AND r.inserted_at = dt.dag_inserted_at
JOIN v1_task_events_olap e ON e.task_id = dt.task_id AND e.task_inserted_at = dt.task_inserted_at
WHERE r.dag_id IS NOT NULL
UNION ALL
SELECT
e.tenant_id, e.id, e.inserted_at, e.task_id, e.task_inserted_at, e.event_type, e.workflow_id, e.event_timestamp, e.readable_status, e.retry_count, e.error_message, e.output, e.worker_id, e.additional__event_data, e.additional__event_message
FROM runs r
JOIN v1_task_events_olap e ON e.task_id = r.task_id AND e.task_inserted_at = r.inserted_at
WHERE r.task_id IS NOT NULL
), max_retry_counts AS (
SELECT task_id, MAX(retry_count) AS max_retry_count
FROM relevant_events
GROUP BY task_id
), metadata AS (
SELECT
MIN(e.inserted_at)::timestamptz AS created_at,
MIN(e.inserted_at) FILTER (WHERE e.readable_status = 'RUNNING')::timestamptz AS started_at,
MAX(e.inserted_at) FILTER (WHERE e.readable_status IN ('COMPLETED', 'CANCELLED', 'FAILED'))::timestamptz AS finished_at,
JSON_AGG(JSON_BUILD_OBJECT('task_id', e.task_id,'task_inserted_at', e.task_inserted_at)) AS task_metadata
FROM
relevant_events e
JOIN max_retry_counts mrc ON (e.task_id, e.retry_count) = (mrc.task_id, mrc.max_retry_count)
), error_message AS (
SELECT
e.error_message
FROM
relevant_events e
WHERE
e.readable_status = 'FAILED'
ORDER BY
e.retry_count DESC
LIMIT 1
)
SELECT
r.dag_id, r.task_id, r.id, r.tenant_id, r.inserted_at, r.external_id, r.readable_status, r.kind, r.workflow_id, r.display_name, r.input, r.additional_metadata, r.workflow_version_id, r.parent_task_external_id,
m.created_at,
m.started_at,
m.finished_at,
e.error_message,
m.task_metadata
FROM runs r
LEFT JOIN metadata m ON true
LEFT JOIN error_message e ON true
ORDER BY r.inserted_at DESC
`
type ReadWorkflowRunByExternalIdRow struct {
DagID pgtype.Int8 `json:"dag_id"`
TaskID pgtype.Int8 `json:"task_id"`
ID int64 `json:"id"`
TenantID pgtype.UUID `json:"tenant_id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
Kind V1RunKind `json:"kind"`
WorkflowID pgtype.UUID `json:"workflow_id"`
DisplayName string `json:"display_name"`
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
WorkflowVersionID pgtype.UUID `json:"workflow_version_id"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
StartedAt pgtype.Timestamptz `json:"started_at"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
ErrorMessage pgtype.Text `json:"error_message"`
TaskMetadata []byte `json:"task_metadata"`
}
func (q *Queries) ReadWorkflowRunByExternalId(ctx context.Context, db DBTX, workflowrunexternalid pgtype.UUID) (*ReadWorkflowRunByExternalIdRow, error) {
row := db.QueryRow(ctx, readWorkflowRunByExternalId, workflowrunexternalid)
var i ReadWorkflowRunByExternalIdRow
err := row.Scan(
&i.DagID,
&i.TaskID,
&i.ID,
&i.TenantID,
&i.InsertedAt,
&i.ExternalID,
&i.ReadableStatus,
&i.Kind,
&i.WorkflowID,
&i.DisplayName,
&i.Input,
&i.AdditionalMetadata,
&i.WorkflowVersionID,
&i.ParentTaskExternalID,
&i.CreatedAt,
&i.StartedAt,
&i.FinishedAt,
&i.ErrorMessage,
&i.TaskMetadata,
)
return &i, err
}
const updateDAGStatuses = `-- name: UpdateDAGStatuses :one
WITH locked_events AS (
SELECT
tenant_id, requeue_after, requeue_retries, id, dag_id, dag_inserted_at
FROM
list_task_status_updates_tmp(
$1::int,
$2::uuid,
$3::int
)
), distinct_dags AS (
SELECT
DISTINCT ON (e.tenant_id, e.dag_id, e.dag_inserted_at)
e.tenant_id,
e.dag_id,
e.dag_inserted_at
FROM
locked_events e
), locked_dags AS (
SELECT
d.id,
d.inserted_at,
d.readable_status,
d.tenant_id,
d.total_tasks
FROM
v1_dags_olap d
JOIN
distinct_dags dd ON
(d.tenant_id, d.id, d.inserted_at) = (dd.tenant_id, dd.dag_id, dd.dag_inserted_at)
ORDER BY
d.id, d.inserted_at
FOR UPDATE
), dag_task_counts AS (
SELECT
d.id,
d.inserted_at,
d.total_tasks,
COUNT(t.id) AS task_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'COMPLETED') AS completed_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'FAILED') AS failed_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'CANCELLED') AS cancelled_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'QUEUED') AS queued_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'RUNNING') AS running_count
FROM
locked_dags d
LEFT JOIN
v1_dag_to_task_olap dt ON
(d.id, d.inserted_at) = (dt.dag_id, dt.dag_inserted_at)
LEFT JOIN
v1_tasks_olap t ON
(dt.task_id, dt.task_inserted_at) = (t.id, t.inserted_at)
GROUP BY
d.id, d.inserted_at, d.total_tasks
), updated_dags AS (
UPDATE
v1_dags_olap d
SET
readable_status = CASE
            -- If every task seen so far is still queued, keep the current status
WHEN dtc.queued_count = dtc.task_count THEN d.readable_status
            -- If the number of task rows differs from the expected total, the DAG is still running
WHEN dtc.task_count != dtc.total_tasks THEN 'RUNNING'
-- If we have any running or queued tasks, we should set the status to running
WHEN dtc.running_count > 0 OR dtc.queued_count > 0 THEN 'RUNNING'
WHEN dtc.failed_count > 0 THEN 'FAILED'
WHEN dtc.cancelled_count > 0 THEN 'CANCELLED'
WHEN dtc.completed_count = dtc.task_count THEN 'COMPLETED'
ELSE 'RUNNING'
END
FROM
dag_task_counts dtc
WHERE
(d.id, d.inserted_at) = (dtc.id, dtc.inserted_at)
RETURNING
d.id, d.inserted_at, d.readable_status, d.external_id
), events_to_requeue AS (
        -- Get events which don't have a corresponding locked DAG
SELECT
e.tenant_id,
e.requeue_retries,
e.dag_id,
e.dag_inserted_at
FROM
locked_events e
LEFT JOIN
locked_dags d ON (e.tenant_id, e.dag_id, e.dag_inserted_at) = (d.tenant_id, d.id, d.inserted_at)
WHERE
d.id IS NULL
), deleted_events AS (
DELETE FROM
v1_task_status_updates_tmp
WHERE
(tenant_id, requeue_after, dag_id, id) IN (SELECT tenant_id, requeue_after, dag_id, id FROM locked_events)
), requeued_events AS (
INSERT INTO
v1_task_status_updates_tmp (
tenant_id,
requeue_after,
requeue_retries,
dag_id,
dag_inserted_at
)
SELECT
tenant_id,
        -- Exponential backoff: each attempt waits (2 ^ requeue_retries) * 2 seconds; we limit to 10 retries, a cumulative maximum of ~2046 seconds (about 34 minutes)
CURRENT_TIMESTAMP + (2 ^ requeue_retries) * INTERVAL '2 seconds',
requeue_retries + 1,
dag_id,
dag_inserted_at
FROM
events_to_requeue
WHERE
requeue_retries < 10
RETURNING
tenant_id, requeue_after, requeue_retries, id, dag_id, dag_inserted_at
), event_count AS (
SELECT
COUNT(*) as count
FROM
locked_events
), rows_to_return AS (
SELECT
ARRAY_REMOVE(ARRAY_AGG(d.id), NULL)::bigint[] AS dag_ids,
ARRAY_REMOVE(ARRAY_AGG(d.inserted_at), NULL)::timestamptz[] AS dag_inserted_ats,
ARRAY_REMOVE(ARRAY_AGG(d.readable_status), NULL)::text[] AS readable_statuses,
ARRAY_REMOVE(ARRAY_AGG(d.external_id), NULL)::uuid[] AS external_ids
FROM
updated_dags d
)
SELECT
(SELECT count FROM event_count) AS count,
dag_ids,
dag_inserted_ats,
readable_statuses,
external_ids
FROM
rows_to_return
`
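
// Illustrative sketch, not generated by sqlc: the DAG status resolution that
// the CASE expression in updateDAGStatuses implements, restated in Go for
// readability. The struct and function names are hypothetical; the field
// names mirror the dag_task_counts CTE above.
type exampleDAGTaskCounts struct {
	TotalTasks     int64
	TaskCount      int64
	CompletedCount int64
	FailedCount    int64
	CancelledCount int64
	QueuedCount    int64
	RunningCount   int64
}

func exampleResolveDAGStatus(current string, c exampleDAGTaskCounts) string {
	switch {
	case c.QueuedCount == c.TaskCount:
		// Only queued tasks seen so far: keep the existing status.
		return current
	case c.TaskCount != c.TotalTasks:
		// The number of task rows differs from the expected total.
		return "RUNNING"
	case c.RunningCount > 0 || c.QueuedCount > 0:
		return "RUNNING"
	case c.FailedCount > 0:
		return "FAILED"
	case c.CancelledCount > 0:
		return "CANCELLED"
	case c.CompletedCount == c.TaskCount:
		return "COMPLETED"
	default:
		return "RUNNING"
	}
}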

type UpdateDAGStatusesParams struct {
Partitionnumber int32 `json:"partitionnumber"`
Tenantid pgtype.UUID `json:"tenantid"`
Eventlimit int32 `json:"eventlimit"`
}

type UpdateDAGStatusesRow struct {
Count int64 `json:"count"`
DagIds []int64 `json:"dag_ids"`
DagInsertedAts []pgtype.Timestamptz `json:"dag_inserted_ats"`
ReadableStatuses []string `json:"readable_statuses"`
ExternalIds []pgtype.UUID `json:"external_ids"`
}

func (q *Queries) UpdateDAGStatuses(ctx context.Context, db DBTX, arg UpdateDAGStatusesParams) (*UpdateDAGStatusesRow, error) {
row := db.QueryRow(ctx, updateDAGStatuses, arg.Partitionnumber, arg.Tenantid, arg.Eventlimit)
var i UpdateDAGStatusesRow
err := row.Scan(
&i.Count,
&i.DagIds,
&i.DagInsertedAts,
&i.ReadableStatuses,
&i.ExternalIds,
)
return &i, err
}
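
// Illustrative consumer sketch, not generated by sqlc: drain the buffered DAG
// status updates for one tenant partition until a batch comes back smaller
// than the limit. The function name and batch size are hypothetical;
// UpdateDAGStatuses and its params/row types are the real generated API above.
func exampleDrainDAGStatusUpdates(ctx context.Context, db DBTX, q *Queries, partition int32, tenantID pgtype.UUID) ([]pgtype.UUID, error) {
	const batchSize = 1000
	var externalIDs []pgtype.UUID
	for {
		row, err := q.UpdateDAGStatuses(ctx, db, UpdateDAGStatusesParams{
			Partitionnumber: partition,
			Tenantid:        tenantID,
			Eventlimit:      batchSize,
		})
		if err != nil {
			return nil, err
		}
		externalIDs = append(externalIDs, row.ExternalIds...)
		// Count is the number of buffered status-update events consumed this
		// round, not the number of DAGs updated; a short batch means the
		// buffer for this partition is (momentarily) empty.
		if row.Count < batchSize {
			return externalIDs, nil
		}
	}
}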

const updateTaskStatuses = `-- name: UpdateTaskStatuses :one
WITH locked_events AS (
SELECT
tenant_id, requeue_after, requeue_retries, id, task_id, task_inserted_at, event_type, readable_status, retry_count, worker_id
FROM
list_task_events_tmp(
$1::int,
$2::uuid,
$3::int
)
), max_retry_counts AS (
SELECT
tenant_id,
task_id,
task_inserted_at,
MAX(retry_count) AS max_retry_count
FROM
locked_events
GROUP BY
tenant_id, task_id, task_inserted_at
), updatable_events AS (
SELECT
e.tenant_id,
e.task_id,
e.task_inserted_at,
e.retry_count,
MAX(e.readable_status) AS max_readable_status
FROM
locked_events e
JOIN
max_retry_counts mrc ON
e.tenant_id = mrc.tenant_id
AND e.task_id = mrc.task_id
AND e.task_inserted_at = mrc.task_inserted_at
AND e.retry_count = mrc.max_retry_count
GROUP BY
e.tenant_id, e.task_id, e.task_inserted_at, e.retry_count
), latest_worker_id AS (
SELECT
tenant_id,
task_id,
task_inserted_at,
retry_count,
MAX(worker_id::text) AS worker_id
FROM
locked_events
WHERE
worker_id IS NOT NULL
GROUP BY
tenant_id, task_id, task_inserted_at, retry_count
), locked_tasks AS (
SELECT
t.tenant_id,
t.id,
t.inserted_at,
e.retry_count,
e.max_readable_status
FROM
v1_tasks_olap t
JOIN
updatable_events e ON
(t.tenant_id, t.id, t.inserted_at) = (e.tenant_id, e.task_id, e.task_inserted_at)
ORDER BY
t.id
FOR UPDATE
), updated_tasks AS (
UPDATE
v1_tasks_olap t
SET
readable_status = e.max_readable_status,
latest_retry_count = e.retry_count,
latest_worker_id = CASE WHEN lw.worker_id::uuid IS NOT NULL THEN lw.worker_id::uuid ELSE t.latest_worker_id END
FROM
updatable_events e
LEFT JOIN
latest_worker_id lw ON
(e.tenant_id, e.task_id, e.task_inserted_at, e.retry_count) = (lw.tenant_id, lw.task_id, lw.task_inserted_at, lw.retry_count)
WHERE
(t.tenant_id, t.id, t.inserted_at) = (e.tenant_id, e.task_id, e.task_inserted_at)
AND
(
-- if the retry count is greater than the latest retry count, update the status
(
e.retry_count > t.latest_retry_count
AND e.max_readable_status != t.readable_status
) OR
-- if the retry count is equal to the latest retry count, update the status if the status is greater
(
e.retry_count = t.latest_retry_count
AND e.max_readable_status > t.readable_status
)
)
RETURNING
t.tenant_id, t.id, t.inserted_at, t.readable_status, t.external_id
), events_to_requeue AS (
        -- Get events which don't have a corresponding locked task
SELECT
e.tenant_id,
e.requeue_retries,
e.task_id,
e.task_inserted_at,
e.event_type,
e.readable_status,
e.retry_count
FROM
locked_events e
LEFT JOIN
locked_tasks t ON (e.tenant_id, e.task_id, e.task_inserted_at) = (t.tenant_id, t.id, t.inserted_at)
WHERE
t.id IS NULL
), deleted_events AS (
DELETE FROM
v1_task_events_olap_tmp
WHERE
(tenant_id, requeue_after, task_id, id) IN (SELECT tenant_id, requeue_after, task_id, id FROM locked_events)
), requeued_events AS (
INSERT INTO
v1_task_events_olap_tmp (
tenant_id,
requeue_after,
requeue_retries,
task_id,
task_inserted_at,
event_type,
readable_status,
retry_count
)
SELECT
tenant_id,
        -- Exponential backoff: each attempt waits (2 ^ requeue_retries) * 2 seconds; we limit to 10 retries, a cumulative maximum of ~2046 seconds (about 34 minutes)
CURRENT_TIMESTAMP + (2 ^ requeue_retries) * INTERVAL '2 seconds',
requeue_retries + 1,
task_id,
task_inserted_at,
event_type,
readable_status,
retry_count
FROM
events_to_requeue
WHERE
requeue_retries < 10
RETURNING
tenant_id, requeue_after, requeue_retries, id, task_id, task_inserted_at, event_type, readable_status, retry_count, worker_id
), event_count AS (
SELECT
COUNT(*) as count
FROM
locked_events
), rows_to_return AS (
SELECT
ARRAY_REMOVE(ARRAY_AGG(t.id), NULL)::bigint[] AS task_ids,
ARRAY_REMOVE(ARRAY_AGG(t.inserted_at), NULL)::timestamptz[] AS task_inserted_ats,
ARRAY_REMOVE(ARRAY_AGG(t.readable_status), NULL)::text[] AS readable_statuses,
ARRAY_REMOVE(ARRAY_AGG(t.external_id), NULL)::uuid[] AS external_ids
FROM
updated_tasks t
)
SELECT
(SELECT count FROM event_count) AS count,
task_ids,
task_inserted_ats,
readable_statuses,
external_ids
FROM
rows_to_return
`
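
// Illustrative sketch, not generated by sqlc: the monotonic update guard in
// updateTaskStatuses, restated in Go. A task row is only rewritten when the
// incoming event belongs to a newer retry with a different status, or to the
// same retry with a status that sorts strictly higher. The integer status
// encoding here is a stand-in for the ordering Postgres applies to the
// v1_readable_status_olap enum; the function name is hypothetical.
func exampleShouldUpdateTask(eventRetry, taskRetry int32, eventStatus, taskStatus int) bool {
	newerRetry := eventRetry > taskRetry && eventStatus != taskStatus
	sameRetryHigherStatus := eventRetry == taskRetry && eventStatus > taskStatus
	return newerRetry || sameRetryHigherStatus
}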

type UpdateTaskStatusesParams struct {
Partitionnumber int32 `json:"partitionnumber"`
Tenantid pgtype.UUID `json:"tenantid"`
Eventlimit int32 `json:"eventlimit"`
}

type UpdateTaskStatusesRow struct {
Count int64 `json:"count"`
TaskIds []int64 `json:"task_ids"`
TaskInsertedAts []pgtype.Timestamptz `json:"task_inserted_ats"`
ReadableStatuses []string `json:"readable_statuses"`
ExternalIds []pgtype.UUID `json:"external_ids"`
}

func (q *Queries) UpdateTaskStatuses(ctx context.Context, db DBTX, arg UpdateTaskStatusesParams) (*UpdateTaskStatusesRow, error) {
row := db.QueryRow(ctx, updateTaskStatuses, arg.Partitionnumber, arg.Tenantid, arg.Eventlimit)
var i UpdateTaskStatusesRow
err := row.Scan(
&i.Count,
&i.TaskIds,
&i.TaskInsertedAts,
&i.ReadableStatuses,
&i.ExternalIds,
)
return &i, err
}
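
// Illustrative sketch, not generated by sqlc: the requeue schedule that both
// UpdateDAGStatuses and UpdateTaskStatuses implement in SQL for events whose
// DAG or task row has not landed yet. Each attempt waits
// (2 ^ requeue_retries) * 2 seconds and the event is dropped after 10
// attempts, giving delays of 2s, 4s, ..., 1024s and a cumulative maximum of
// 2046s (about 34 minutes). The function name is hypothetical.
func exampleRequeueDelaySeconds(requeueRetries int) (delaySeconds int, requeue bool) {
	if requeueRetries >= 10 {
		// Past the retry budget the event is deleted without being reinserted.
		return 0, false
	}
	return (1 << requeueRetries) * 2, true
}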