feat: show concurrency queue counts in the UI (#1495)

* feat: show concurrency queue counts in the UI

* fix: parent concurrency queues
This commit is contained in:
abelanger5
2025-04-04 12:22:14 -04:00
committed by GitHub
parent 5c985c3f49
commit b6d077f96d
11 changed files with 651 additions and 184 deletions

View File

@@ -572,12 +572,18 @@ BEGIN
FROM
parent_to_child_strategy_ids pcs
ON CONFLICT (strategy_id, workflow_version_id, workflow_run_id) DO UPDATE
-- If there's a conflict, and we're inserting a new concurrency_slot, we'd like to remove the strategy_id
-- from the completed child strategy ids.
-- When concurrency slots are INSERTED:
-- We need to REMOVE their strategy_ids from the parent's completed_child_strategy_ids
-- because these child slots are now active again, not completed
-- This correctly handles bulk inserts by removing ALL strategy_ids in the current batch
SET completed_child_strategy_ids = ARRAY(
SELECT DISTINCT UNNEST(ARRAY_REMOVE(v1_workflow_concurrency_slot.completed_child_strategy_ids, cs.strategy_id))
FROM new_table cs
WHERE EXCLUDED.strategy_id = cs.parent_strategy_id
SELECT DISTINCT e
FROM UNNEST(v1_workflow_concurrency_slot.completed_child_strategy_ids) AS e
WHERE e NOT IN (
SELECT strategy_id
FROM new_table
WHERE parent_strategy_id = EXCLUDED.strategy_id
)
);
-- If the v1_step_concurrency strategy is not active, we set it to active.
@@ -635,16 +641,29 @@ BEGIN
WHERE
cs.parent_strategy_id IS NOT NULL
AND rqi.task_id IS NULL
), parent_slots_grouped AS (
SELECT
cs.parent_strategy_id,
cs.workflow_version_id,
cs.workflow_run_id,
ARRAY_AGG(cs.strategy_id) AS child_strategy_ids
FROM
parent_slot cs
GROUP BY
cs.parent_strategy_id,
cs.workflow_version_id,
cs.workflow_run_id
), locked_parent_slots AS (
SELECT
wcs.strategy_id,
wcs.workflow_version_id,
wcs.workflow_run_id,
cs.strategy_id AS child_strategy_id
psg.child_strategy_ids
FROM
v1_workflow_concurrency_slot wcs
JOIN
parent_slot cs ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) = (cs.parent_strategy_id, cs.workflow_version_id, cs.workflow_run_id)
parent_slots_grouped psg ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) =
(psg.parent_strategy_id, psg.workflow_version_id, psg.workflow_run_id)
ORDER BY
wcs.strategy_id,
wcs.workflow_version_id,
@@ -653,7 +672,19 @@ BEGIN
)
UPDATE v1_workflow_concurrency_slot wcs
SET completed_child_strategy_ids = ARRAY(
SELECT DISTINCT UNNEST(ARRAY_APPEND(wcs.completed_child_strategy_ids, cs.child_strategy_id))
-- When concurrency slots are DELETED:
-- We need to ADD their strategy_ids to the parent's completed_child_strategy_ids
-- This correctly handles bulk deletes by adding ALL strategy_ids in the current batch
SELECT DISTINCT e
FROM (
SELECT UNNEST(wcs.completed_child_strategy_ids) AS e
UNION
SELECT UNNEST(lps.child_strategy_ids)
FROM locked_parent_slots lps
WHERE lps.strategy_id = wcs.strategy_id
AND lps.workflow_version_id = wcs.workflow_version_id
AND lps.workflow_run_id = wcs.workflow_run_id
) AS subquery
)
FROM locked_parent_slots cs
WHERE