fix: dag statuses should wait for all tasks to be created (#1428)

Authored by abelanger5 on 2025-03-27 15:22:01 -07:00, committed by GitHub
parent 113fa98b55
commit 1bad2de35f
8 changed files with 36 additions and 8 deletions

View File

@@ -0,0 +1,9 @@
+ -- +goose Up
+ -- +goose StatementBegin
+ ALTER TABLE v1_dags_olap ADD COLUMN total_tasks INT NOT NULL DEFAULT 1;
+ -- +goose StatementEnd
+ -- +goose Down
+ -- +goose StatementBegin
+ ALTER TABLE v1_dags_olap DROP COLUMN total_tasks;
+ -- +goose StatementEnd
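
Note: this is a standard goose migration. A minimal sketch of applying it through the goose Go API follows; the Postgres DSN and the migrations directory are placeholders, and the project may invoke goose differently.

package main

import (
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // registers the "pgx" database/sql driver
	"github.com/pressly/goose/v3"
)

func main() {
	// Placeholder DSN; substitute the real connection string.
	db, err := sql.Open("pgx", "postgres://localhost:5432/hatchet?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := goose.SetDialect("postgres"); err != nil {
		log.Fatal(err)
	}

	// "migrations" is an illustrative directory name. Running Up applies the
	// ALTER TABLE above, so pre-existing DAG rows pick up total_tasks = 1 via
	// the column DEFAULT.
	if err := goose.Up(db, "migrations"); err != nil {
		log.Fatal(err)
	}
}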

View File

@@ -1169,6 +1169,7 @@ func (r *OLAPRepositoryImpl) writeDAGBatch(ctx context.Context, tenantId string,
Input: dag.Input,
AdditionalMetadata: dag.AdditionalMetadata,
ParentTaskExternalID: parentTaskExternalID,
+ TotalTasks: int32(dag.TotalTasks), // nolint: gosec
})
}
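
Note: the // nolint: gosec directive above suppresses gosec's integer-overflow-on-conversion warning for the int → int32 cast. As a hedged illustration only (the helper below is invented for this note, not repository code), an explicit bounds check is the usual alternative to silencing the linter:

package main

import (
	"fmt"
	"math"
)

// toInt32 is a hypothetical helper: it narrows an int to int32 and reports
// overflow instead of truncating silently.
func toInt32(v int) (int32, error) {
	if v > math.MaxInt32 || v < math.MinInt32 {
		return 0, fmt.Errorf("value %d overflows int32", v)
	}
	return int32(v), nil
}

func main() {
	totalTasks, err := toInt32(3)
	if err != nil {
		panic(err)
	}
	fmt.Println(totalTasks) // 3
}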

View File

@@ -74,6 +74,7 @@ func (r iteratorForCreateDAGsOLAP) Values() ([]interface{}, error) {
r.rows[0].Input,
r.rows[0].AdditionalMetadata,
r.rows[0].ParentTaskExternalID,
+ r.rows[0].TotalTasks,
}, nil
}
@@ -82,7 +83,7 @@ func (r iteratorForCreateDAGsOLAP) Err() error {
}
func (q *Queries) CreateDAGsOLAP(ctx context.Context, db DBTX, arg []CreateDAGsOLAPParams) (int64, error) {
- return db.CopyFrom(ctx, []string{"v1_dags_olap"}, []string{"tenant_id", "id", "inserted_at", "external_id", "display_name", "workflow_id", "workflow_version_id", "input", "additional_metadata", "parent_task_external_id"}, &iteratorForCreateDAGsOLAP{rows: arg})
+ return db.CopyFrom(ctx, []string{"v1_dags_olap"}, []string{"tenant_id", "id", "inserted_at", "external_id", "display_name", "workflow_id", "workflow_version_id", "input", "additional_metadata", "parent_task_external_id", "total_tasks"}, &iteratorForCreateDAGsOLAP{rows: arg})
}
// iteratorForCreateMatchConditions implements pgx.CopyFromSource.
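
Note: pgx's CopyFrom matches columns to values purely by position, which is why "total_tasks" is appended to the column list and TotalTasks is appended to the iterator's Values() in the same slot. A minimal sketch of that contract using pgx's built-in CopyFromRows helper (row data is illustrative, and only three of the eleven columns are shown):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	// pgx.CopyFromRows wraps a [][]any as a pgx.CopyFromSource. Each inner
	// slice is consumed positionally against the column list handed to
	// CopyFrom, which is the same contract the generated iterator satisfies
	// by appending TotalTasks last. Values here are placeholders for three of
	// the v1_dags_olap columns.
	src := pgx.CopyFromRows([][]any{
		{"tenant-1", int64(42), int32(3)}, // tenant_id, id, total_tasks
	})

	for src.Next() {
		vals, err := src.Values()
		if err != nil {
			panic(err)
		}
		fmt.Println(vals) // [tenant-1 42 3]
	}
}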

View File

@@ -2469,6 +2469,7 @@ type V1DagsOlap struct {
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
+ TotalTasks int32 `json:"total_tasks"`
}
type V1DurableSleep struct {

View File

@@ -91,7 +91,8 @@ INSERT INTO v1_dags_olap (
workflow_version_id,
input,
additional_metadata,
- parent_task_external_id
+ parent_task_external_id,
+ total_tasks
) VALUES (
$1,
$2,
@@ -102,7 +103,8 @@ INSERT INTO v1_dags_olap (
$7,
$8,
$9,
- $10
+ $10,
+ $11
);
-- name: CreateTaskEventsOLAPTmp :copyfrom
@@ -749,7 +751,8 @@ WITH locked_events AS (
d.id,
d.inserted_at,
d.readable_status,
- d.tenant_id
+ d.tenant_id,
+ d.total_tasks
FROM
v1_dags_olap d
JOIN
@@ -762,6 +765,7 @@ WITH locked_events AS (
SELECT
d.id,
d.inserted_at,
+ d.total_tasks,
COUNT(t.id) AS task_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'COMPLETED') AS completed_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'FAILED') AS failed_count,
@@ -777,7 +781,7 @@ WITH locked_events AS (
v1_tasks_olap t ON
(dt.task_id, dt.task_inserted_at) = (t.id, t.inserted_at)
GROUP BY
- d.id, d.inserted_at
+ d.id, d.inserted_at, d.total_tasks
), updated_dags AS (
UPDATE
v1_dags_olap d
@@ -785,6 +789,8 @@ WITH locked_events AS (
readable_status = CASE
-- If we only have queued events, we should keep the status as is
WHEN dtc.queued_count = dtc.task_count THEN d.readable_status
+ -- If the task count is not equal to the total tasks, we should set the status to running
+ WHEN dtc.task_count != dtc.total_tasks THEN 'RUNNING'
-- If we have any running or queued tasks, we should set the status to running
WHEN dtc.running_count > 0 OR dtc.queued_count > 0 THEN 'RUNNING'
WHEN dtc.failed_count > 0 THEN 'FAILED'
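
Note: the second WHEN branch is the core of the fix: until the number of task rows written for the DAG equals total_tasks, the aggregated status cannot advance past RUNNING, even if every task created so far has finished. Below is a rough pure-Go restatement of the branches visible in this hunk (function and parameter names are illustrative, and the later branches of the CASE expression fall outside the hunk, so they are elided):

package main

import "fmt"

// dagStatus is an illustrative restatement of the CASE expression above, not
// code from the repository. Only the branches visible in this hunk appear.
func dagStatus(prev string, totalTasks, taskCount, queuedCount, runningCount, failedCount int) string {
	switch {
	case queuedCount == taskCount:
		// Only queued events so far: keep the status as is.
		return prev
	case taskCount != totalTasks:
		// Not every task row has been created yet: stay RUNNING.
		return "RUNNING"
	case runningCount > 0 || queuedCount > 0:
		return "RUNNING"
	case failedCount > 0:
		return "FAILED"
	default:
		// Remaining branches (completed/cancelled) are outside this hunk.
		return prev
	}
}

func main() {
	// A DAG declared with 3 tasks where only 2 task rows exist and neither is
	// queued or running: before this change the partial set could finalize
	// the DAG; with the total_tasks check it stays RUNNING.
	fmt.Println(dagStatus("RUNNING", 3, 2, 0, 0, 0)) // RUNNING
}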

View File

@@ -22,6 +22,7 @@ type CreateDAGsOLAPParams struct {
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
+ TotalTasks int32 `json:"total_tasks"`
}
const createOLAPPartitions = `-- name: CreateOLAPPartitions :exec
@@ -1285,7 +1286,7 @@ WITH lookup_task AS (
external_id = $1::uuid
)
SELECT
- d.id, d.inserted_at, d.tenant_id, d.external_id, d.display_name, d.workflow_id, d.workflow_version_id, d.readable_status, d.input, d.additional_metadata, d.parent_task_external_id
+ d.id, d.inserted_at, d.tenant_id, d.external_id, d.display_name, d.workflow_id, d.workflow_version_id, d.readable_status, d.input, d.additional_metadata, d.parent_task_external_id, d.total_tasks
FROM
v1_dags_olap d
JOIN
@@ -1307,6 +1308,7 @@ func (q *Queries) ReadDAGByExternalID(ctx context.Context, db DBTX, externalid p
&i.Input,
&i.AdditionalMetadata,
&i.ParentTaskExternalID,
+ &i.TotalTasks,
)
return &i, err
}
@@ -1570,7 +1572,8 @@ WITH locked_events AS (
d.id,
d.inserted_at,
d.readable_status,
- d.tenant_id
+ d.tenant_id,
+ d.total_tasks
FROM
v1_dags_olap d
JOIN
@@ -1583,6 +1586,7 @@ WITH locked_events AS (
SELECT
d.id,
d.inserted_at,
+ d.total_tasks,
COUNT(t.id) AS task_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'COMPLETED') AS completed_count,
COUNT(t.id) FILTER (WHERE t.readable_status = 'FAILED') AS failed_count,
@@ -1598,7 +1602,7 @@ WITH locked_events AS (
v1_tasks_olap t ON
(dt.task_id, dt.task_inserted_at) = (t.id, t.inserted_at)
GROUP BY
- d.id, d.inserted_at
+ d.id, d.inserted_at, d.total_tasks
), updated_dags AS (
UPDATE
v1_dags_olap d
@@ -1606,6 +1610,8 @@ WITH locked_events AS (
readable_status = CASE
-- If we only have queued events, we should keep the status as is
WHEN dtc.queued_count = dtc.task_count THEN d.readable_status
+ -- If the task count is not equal to the total tasks, we should set the status to running
+ WHEN dtc.task_count != dtc.total_tasks THEN 'RUNNING'
-- If we have any running or queued tasks, we should set the status to running
WHEN dtc.running_count > 0 OR dtc.queued_count > 0 THEN 'RUNNING'
WHEN dtc.failed_count > 0 THEN 'FAILED'

View File

@@ -870,6 +870,8 @@ type DAGWithData struct {
AdditionalMetadata []byte
ParentTaskExternalID *pgtype.UUID
+ TotalTasks int
}
func (r *TriggerRepositoryImpl) createDAGs(ctx context.Context, tx sqlcv1.DBTX, tenantId string, opts []createDAGOpts) ([]*DAGWithData, error) {
@@ -958,6 +960,7 @@ func (r *TriggerRepositoryImpl) createDAGs(ctx context.Context, tx sqlcv1.DBTX,
Input: input,
AdditionalMetadata: additionalMeta,
ParentTaskExternalID: &parentTaskExternalID,
+ TotalTasks: len(opt.TaskIds),
})
}
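
Note: on the write side, total_tasks is simply the number of task IDs declared for the DAG, so the OLAP row records up front how many task rows to expect. A trivial illustration, with a pared-down stand-in for the repository's option struct:

package main

import "fmt"

// createDAGOpts is a pared-down stand-in for the repository's option struct;
// only the field used in this hunk is reproduced.
type createDAGOpts struct {
	TaskIds []string
}

func main() {
	opt := createDAGOpts{TaskIds: []string{"step-a", "step-b", "step-c"}}

	// Mirrors TotalTasks: len(opt.TaskIds) above: a DAG declared with three
	// tasks is written with total_tasks = 3, so its status aggregation waits
	// for three task rows.
	fmt.Println("total_tasks =", len(opt.TaskIds)) // total_tasks = 3
}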

View File

@@ -183,6 +183,7 @@ CREATE TABLE v1_dags_olap (
input JSONB NOT NULL,
additional_metadata JSONB,
parent_task_external_id UUID,
+ total_tasks INT NOT NULL DEFAULT 1,
PRIMARY KEY (inserted_at, id, readable_status)
) PARTITION BY RANGE(inserted_at);