Feat: Partition pruning for ListTaskParentOutputs, lookup index for v1_payload_wal (#2294)

* fix: small query rework, partition pruning for `ListTaskParentOutputs`

* feat: migration for adding index

* fix: copilot comments
This commit is contained in:
matt
2025-09-12 13:18:08 -04:00
committed by GitHub
parent b8ad5ee94c
commit c759da79aa
5 changed files with 129 additions and 81 deletions

View File

@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
CREATE INDEX v1_payload_wal_payload_lookup_idx ON v1_payload_wal (payload_id, payload_inserted_at, payload_type, tenant_id);
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
DROP INDEX v1_payload_wal_payload_lookup_idx;
-- +goose StatementEnd

View File

@@ -612,67 +612,67 @@ LEFT JOIN
-- of the tasks as well.
WITH input AS (
SELECT
*
FROM
(
SELECT
unnest(@taskIds::bigint[]) AS task_id,
unnest(@taskInsertedAts::timestamptz[]) AS task_inserted_at
) AS subquery
), task_outputs AS (
UNNEST(@taskIds::BIGINT[]) AS task_id,
UNNEST(@taskInsertedAts::TIMESTAMPTZ[]) AS task_inserted_at
), parent_tasks AS (
SELECT
t.id,
t.inserted_at,
e.retry_count,
t.tenant_id,
t.dag_id,
t.dag_inserted_at,
t.step_readable_id,
t.workflow_run_id,
t.step_id,
t.workflow_id,
e.data AS output
parent.id,
parent.inserted_at,
parent.retry_count,
parent.tenant_id,
parent.dag_id,
parent.dag_inserted_at,
parent.step_readable_id,
parent.workflow_run_id,
parent.step_id,
parent.workflow_id
FROM
v1_task t1
v1_task child
JOIN
v1_dag_to_task dt ON dt.dag_id = t1.dag_id AND dt.dag_inserted_at = t1.dag_inserted_at
v1_dag_to_task dt ON dt.dag_id = child.dag_id AND dt.dag_inserted_at = child.dag_inserted_at
JOIN
v1_task t ON t.id = dt.task_id AND t.inserted_at = dt.task_inserted_at
JOIN
v1_task_event e ON e.task_id = t.id AND e.task_inserted_at = t.inserted_at AND e.event_type = 'COMPLETED'
v1_task parent ON parent.id = dt.task_id AND parent.inserted_at = dt.task_inserted_at
WHERE
(t1.id, t1.inserted_at) IN (
child.inserted_at >= @minTaskInsertedAt::TIMESTAMPTZ
AND dt.dag_inserted_at >= @minDagInsertedAt::TIMESTAMPTZ
AND (child.id, child.inserted_at) IN (
SELECT
task_id,
task_inserted_at
FROM
input
)
AND t1.tenant_id = @tenantId::uuid
AND t1.dag_id IS NOT NULL
AND child.tenant_id = @tenantId::uuid
AND child.dag_id IS NOT NULL
), max_retry_counts AS (
SELECT
id,
inserted_at,
MAX(retry_count) AS max_retry_count
FROM
task_outputs
parent_tasks
GROUP BY
id, inserted_at
)
SELECT
DISTINCT ON (task_outputs.id, task_outputs.inserted_at, task_outputs.retry_count)
task_outputs.*
DISTINCT ON (p.id, p.inserted_at, p.retry_count)
p.*,
e.data AS output
FROM
task_outputs
parent_tasks p
JOIN
max_retry_counts mrc ON task_outputs.id = mrc.id
AND task_outputs.inserted_at = mrc.inserted_at
AND task_outputs.retry_count = mrc.max_retry_count
max_retry_counts mrc ON p.id = mrc.id
AND p.inserted_at = mrc.inserted_at
AND p.retry_count = mrc.max_retry_count
JOIN
v1_task_event e ON (e.task_id, e.task_inserted_at, e.retry_count) = (p.id, p.inserted_at, p.retry_count)
WHERE
e.event_type = 'COMPLETED'
ORDER BY
task_outputs.id,
task_outputs.inserted_at,
task_outputs.retry_count DESC;
p.id,
p.inserted_at,
p.retry_count DESC;
-- name: LockDAGsForReplay :many
-- Locks a list of DAGs for replay. Returns successfully locked DAGs which can be replayed.

View File

@@ -876,73 +876,75 @@ func (q *Queries) ListTaskMetas(ctx context.Context, db DBTX, arg ListTaskMetasP
const listTaskParentOutputs = `-- name: ListTaskParentOutputs :many
WITH input AS (
SELECT
task_id, task_inserted_at
FROM
(
SELECT
unnest($1::bigint[]) AS task_id,
unnest($2::timestamptz[]) AS task_inserted_at
) AS subquery
), task_outputs AS (
UNNEST($1::BIGINT[]) AS task_id,
UNNEST($2::TIMESTAMPTZ[]) AS task_inserted_at
), parent_tasks AS (
SELECT
t.id,
t.inserted_at,
e.retry_count,
t.tenant_id,
t.dag_id,
t.dag_inserted_at,
t.step_readable_id,
t.workflow_run_id,
t.step_id,
t.workflow_id,
e.data AS output
parent.id,
parent.inserted_at,
parent.retry_count,
parent.tenant_id,
parent.dag_id,
parent.dag_inserted_at,
parent.step_readable_id,
parent.workflow_run_id,
parent.step_id,
parent.workflow_id
FROM
v1_task t1
v1_task child
JOIN
v1_dag_to_task dt ON dt.dag_id = t1.dag_id AND dt.dag_inserted_at = t1.dag_inserted_at
v1_dag_to_task dt ON dt.dag_id = child.dag_id AND dt.dag_inserted_at = child.dag_inserted_at
JOIN
v1_task t ON t.id = dt.task_id AND t.inserted_at = dt.task_inserted_at
JOIN
v1_task_event e ON e.task_id = t.id AND e.task_inserted_at = t.inserted_at AND e.event_type = 'COMPLETED'
v1_task parent ON parent.id = dt.task_id AND parent.inserted_at = dt.task_inserted_at
WHERE
(t1.id, t1.inserted_at) IN (
child.inserted_at >= $3::TIMESTAMPTZ
AND dt.dag_inserted_at >= $4::TIMESTAMPTZ
AND (child.id, child.inserted_at) IN (
SELECT
task_id,
task_inserted_at
FROM
input
)
AND t1.tenant_id = $3::uuid
AND t1.dag_id IS NOT NULL
AND child.tenant_id = $5::uuid
AND child.dag_id IS NOT NULL
), max_retry_counts AS (
SELECT
id,
inserted_at,
MAX(retry_count) AS max_retry_count
FROM
task_outputs
parent_tasks
GROUP BY
id, inserted_at
)
SELECT
DISTINCT ON (task_outputs.id, task_outputs.inserted_at, task_outputs.retry_count)
task_outputs.id, task_outputs.inserted_at, task_outputs.retry_count, task_outputs.tenant_id, task_outputs.dag_id, task_outputs.dag_inserted_at, task_outputs.step_readable_id, task_outputs.workflow_run_id, task_outputs.step_id, task_outputs.workflow_id, task_outputs.output
DISTINCT ON (p.id, p.inserted_at, p.retry_count)
p.id, p.inserted_at, p.retry_count, p.tenant_id, p.dag_id, p.dag_inserted_at, p.step_readable_id, p.workflow_run_id, p.step_id, p.workflow_id,
e.data AS output
FROM
task_outputs
parent_tasks p
JOIN
max_retry_counts mrc ON task_outputs.id = mrc.id
AND task_outputs.inserted_at = mrc.inserted_at
AND task_outputs.retry_count = mrc.max_retry_count
max_retry_counts mrc ON p.id = mrc.id
AND p.inserted_at = mrc.inserted_at
AND p.retry_count = mrc.max_retry_count
JOIN
v1_task_event e ON (e.task_id, e.task_inserted_at, e.retry_count) = (p.id, p.inserted_at, p.retry_count)
WHERE
e.event_type = 'COMPLETED'
ORDER BY
task_outputs.id,
task_outputs.inserted_at,
task_outputs.retry_count DESC
p.id,
p.inserted_at,
p.retry_count DESC
`
type ListTaskParentOutputsParams struct {
Taskids []int64 `json:"taskids"`
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
Tenantid pgtype.UUID `json:"tenantid"`
Taskids []int64 `json:"taskids"`
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
Mintaskinsertedat pgtype.Timestamptz `json:"mintaskinsertedat"`
Mindaginsertedat pgtype.Timestamptz `json:"mindaginsertedat"`
Tenantid pgtype.UUID `json:"tenantid"`
}
type ListTaskParentOutputsRow struct {
@@ -962,7 +964,13 @@ type ListTaskParentOutputsRow struct {
// Lists the outputs of parent steps for a list of tasks. This is recursive because it looks at all grandparents
// of the tasks as well.
func (q *Queries) ListTaskParentOutputs(ctx context.Context, db DBTX, arg ListTaskParentOutputsParams) ([]*ListTaskParentOutputsRow, error) {
rows, err := db.Query(ctx, listTaskParentOutputs, arg.Taskids, arg.Taskinsertedats, arg.Tenantid)
rows, err := db.Query(ctx, listTaskParentOutputs,
arg.Taskids,
arg.Taskinsertedats,
arg.Mintaskinsertedat,
arg.Mindaginsertedat,
arg.Tenantid,
)
if err != nil {
return nil, err
}

View File

@@ -3288,10 +3288,25 @@ func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId
taskIds := make([]int64, 0)
taskInsertedAts := make([]pgtype.Timestamptz, 0)
minTaskInsertedAt := pgtype.Timestamptz{
Valid: false,
}
minDagInsertedAt := pgtype.Timestamptz{
Valid: false,
}
for _, task := range tasks {
if task.DagID.Valid {
taskIds = append(taskIds, task.ID)
taskInsertedAts = append(taskInsertedAts, task.InsertedAt)
if task.DagInsertedAt.Valid && (!minDagInsertedAt.Valid || task.DagInsertedAt.Time.Before(minDagInsertedAt.Time)) {
minDagInsertedAt = task.DagInsertedAt
}
if task.InsertedAt.Valid && (!minTaskInsertedAt.Valid || task.InsertedAt.Time.Before(minTaskInsertedAt.Time)) {
minTaskInsertedAt = task.InsertedAt
}
}
}
@@ -3301,10 +3316,24 @@ func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId
return resMap, nil
}
// if the inserted at values are still not valid, set them to a year ago as a placeholder
// this is the equivalent of no partition pruning
longTimeAgo := sqlchelpers.TimestamptzFromTime(time.Now().Add(-24 * 365 * time.Hour)) // 1 year ago
if !minTaskInsertedAt.Valid {
minTaskInsertedAt = longTimeAgo
}
if !minDagInsertedAt.Valid {
minDagInsertedAt = longTimeAgo
}
res, err := r.queries.ListTaskParentOutputs(ctx, r.pool, sqlcv1.ListTaskParentOutputsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Taskids: taskIds,
Taskinsertedats: taskInsertedAts,
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Taskids: taskIds,
Taskinsertedats: taskInsertedAts,
Mintaskinsertedat: minTaskInsertedAt,
Mindaginsertedat: minDagInsertedAt,
})
if err != nil {

View File

@@ -1664,6 +1664,8 @@ CREATE TABLE v1_payload_wal (
CONSTRAINT "v1_payload_wal_payload" FOREIGN KEY (payload_id, payload_inserted_at, payload_type, tenant_id) REFERENCES v1_payload (id, inserted_at, type, tenant_id) ON DELETE CASCADE
) PARTITION BY HASH (tenant_id);
CREATE INDEX v1_payload_wal_payload_lookup_idx ON v1_payload_wal (payload_id, payload_inserted_at, payload_type, tenant_id);
SELECT create_v1_hash_partitions('v1_payload_wal'::TEXT, 4);