fix: concurrency issues and a few small improvements (#1324)

abelanger5
2025-03-12 16:30:34 -04:00
committed by GitHub
parent 1950a0796d
commit ac968e94b8
13 changed files with 149 additions and 162 deletions

View File

@@ -73,5 +73,10 @@ RUN apk update && apk add --no-cache gcc musl-dev openssl bash ca-certificates
COPY --from=build-go /hatchet/bin/hatchet-${SERVER_TARGET} /hatchet/
# NOTE: this is just here for backwards compatibility with old migrate images which require the atlas-apply.sh script.
# This script is just a wrapper for `/hatchet/hatchet-migrate`.
COPY /hack/db/atlas-apply.sh ./atlas-apply.sh
RUN chmod +x ./atlas-apply.sh
EXPOSE 8080
CMD /hatchet/hatchet-${SERVER_TARGET}

View File

@@ -0,0 +1,7 @@
-- +goose Up
-- +goose StatementBegin
DROP TRIGGER IF EXISTS after_v1_task_runtime_delete ON v1_task_runtime;
DROP FUNCTION IF EXISTS after_v1_task_runtime_delete_function();
-- +goose StatementEnd
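This migration drops the statement-level trigger that cascaded `v1_task_runtime` deletes into `v1_concurrency_slot` deletes; the trigger definition itself is removed from the schema at the bottom of this commit, and the queue queries further down take over the slot deletion inline. A minimal sketch of applying a goose migration like this one, assuming the `pressly/goose/v3` library; the DSN and migrations directory are placeholder values:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // registers the "pgx" database/sql driver
	"github.com/pressly/goose/v3"
)

func main() {
	// Placeholder DSN for the sketch.
	db, err := sql.Open("pgx", "postgres://localhost:5432/hatchet")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := goose.SetDialect("postgres"); err != nil {
		log.Fatal(err)
	}
	// goose reads the -- +goose Up / StatementBegin annotations in the
	// migration above and applies anything not yet recorded. The directory
	// is a placeholder.
	if err := goose.Up(db, "./sql/migrations"); err != nil {
		log.Fatal(err)
	}
}
```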

View File

@@ -1,68 +1,3 @@
#!/bin/bash
# Check whether DATABASE_URL is set
if [ -z "$DATABASE_URL" ]; then
echo "DATABASE_URL is not set"
exit 1
fi
# Wait up to 30 seconds for the database to be ready
echo "Waiting for database to be ready..."
timeout 30s bash -c '
until psql "$DATABASE_URL" -c "\q" 2>/dev/null; do
sleep 1
done
'
if [ $? -eq 124 ]; then
echo "Timed out waiting for the database to be ready"
exit 1
fi
# Function to attempt a psql connection with the given DATABASE_URL
attempt_psql_connection() {
local db_url=$1
psql "$db_url" -t -c "SELECT 1;" 2>/dev/null
return $?
}
# Check if sslmode is set in the DATABASE_URL
if [[ ! "$DATABASE_URL" =~ sslmode ]]; then
# Attempt a secure psql connection first if sslmode is not set
SECURE_DB_URL="${DATABASE_URL}?sslmode=require"
attempt_psql_connection "$SECURE_DB_URL"
if [ $? -ne 0 ]; then
# If secure connection fails, use sslmode=disable
echo "Secure connection failed. Using sslmode=disable"
DATABASE_URL="${DATABASE_URL}?sslmode=disable"
else
DATABASE_URL="$SECURE_DB_URL"
fi
fi
# Check for prisma migrations
MIGRATION_NAME=$(psql "$DATABASE_URL" -t -c "SELECT migration_name FROM _prisma_migrations ORDER BY started_at DESC LIMIT 1;" 2>/dev/null | xargs)
MIGRATION_NAME=$(echo $MIGRATION_NAME | cut -d'_' -f1)
echo "Migration name: $MIGRATION_NAME"
if [ $? -eq 0 ] && [ -n "$MIGRATION_NAME" ]; then
echo "Using existing prisma migration: $MIGRATION_NAME"
atlas migrate apply \
--url "$DATABASE_URL" \
--baseline "$MIGRATION_NAME" \
--dir "file://sql/atlas"
else
echo "No prisma migration found. Applying migrations via atlas..."
atlas migrate apply \
--url "$DATABASE_URL" \
--dir "file://sql/atlas"
fi
# if either of the above commands failed, exit with an error
if [ $? -ne 0 ]; then
echo "Migration failed. Exiting..."
exit 1
fi
/hatchet/hatchet-migrate

View File

@@ -608,7 +608,7 @@ func (t *MessageQueueImpl) subscribe(
t.l.Debug().Msgf("(session: %d) got msg", sessionCount)
if err := preAck(msg); err != nil {
t.l.Error().Msgf("error in pre-ack: %v", err)
t.l.Error().Msgf("error in pre-ack on msg %s: %v", msg.ID, err)
// nack the message
if err := rabbitMsg.Reject(false); err != nil {
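The only change in this hunk is the log line: it now carries the message ID, so a failing pre-ack hook can be traced to a specific delivery before it is rejected. A minimal sketch of the surrounding pattern, assuming the `rabbitmq/amqp091-go` client; `preAck` here is a hypothetical stand-in for the hook in the diff:

```go
package consumer

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// handleDelivery mirrors the pre-ack flow above: run the hook, and on
// failure log with the message ID, then reject without requeue.
func handleDelivery(msg *amqp.Delivery, preAck func(*amqp.Delivery) error) {
	if err := preAck(msg); err != nil {
		// Including the ID ties the log line to one concrete delivery.
		log.Printf("error in pre-ack on msg %s: %v", msg.MessageId, err)
		// Reject(false) nacks without requeueing, so a poison message is
		// dropped (or dead-lettered) rather than redelivered in a tight loop.
		if err := msg.Reject(false); err != nil {
			log.Printf("error rejecting msg %s: %v", msg.MessageId, err)
		}
		return
	}
	if err := msg.Ack(false); err != nil {
		log.Printf("error acking msg %s: %v", msg.MessageId, err)
	}
}
```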

View File

@@ -118,7 +118,11 @@ func baseFromReleaseTasksRow(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent {
}
func (e *TaskOutputEvent) Bytes() []byte {
resBytes, _ := json.Marshal(e) // nolint: errcheck
resBytes, err := json.Marshal(e)
if err != nil {
return []byte("{}")
}
return resBytes
}
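`Bytes()` used to discard the marshal error, handing callers a nil slice on failure; it now falls back to a literal empty JSON object. The distinction matters to anything that parses the payload, as this self-contained demonstration shows:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var v map[string]any
	// The old failure mode handed callers a nil slice, which is not valid JSON:
	fmt.Println(json.Unmarshal(nil, &v)) // unexpected end of JSON input
	// The new fallback is a well-formed empty object:
	fmt.Println(json.Unmarshal([]byte("{}"), &v)) // <nil>
}
```

`json.Unmarshal` rejects nil input with `unexpected end of JSON input`, while `{}` decodes cleanly into an empty map, so downstream consumers degrade gracefully when an event fails to serialize.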

View File

@@ -201,8 +201,15 @@ func (d *queueRepository) MarkQueueItemsProcessed(ctx context.Context, r *Assign
taskIdToAssignedItem[assignedItem.QueueItem.TaskID] = assignedItem
}
tasksToRelease := make([]TaskIdInsertedAtRetryCount, 0, len(r.SchedulingTimedOut))
for _, id := range r.SchedulingTimedOut {
idsToUnqueue = append(idsToUnqueue, id.ID)
tasksToRelease = append(tasksToRelease, TaskIdInsertedAtRetryCount{
Id: id.TaskID,
InsertedAt: id.TaskInsertedAt,
RetryCount: id.RetryCount,
})
}
queuedItemIds, err := d.queries.BulkQueueItems(ctx, tx, idsToUnqueue)
@@ -211,6 +218,12 @@ func (d *queueRepository) MarkQueueItemsProcessed(ctx context.Context, r *Assign
return nil, nil, err
}
_, err = d.releaseTasks(ctx, tx, sqlchelpers.UUIDToStr(d.tenantId), tasksToRelease)
if err != nil {
return nil, nil, err
}
queuedItemsMap := make(map[int64]struct{}, len(queuedItemIds))
for _, id := range queuedItemIds {
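The fix in this hunk: queue items whose scheduling timed out were previously only unqueued, which leaked the concurrency slots their tasks still held; they are now also collected into `tasksToRelease` and handed to `releaseTasks` in the same transaction. A simplified sketch of that bookkeeping, with illustrative field types in place of the repository's real ones:

```go
package queuerepo

import "time"

// TaskIdInsertedAtRetryCount mirrors the release key used in the diff;
// the simplified field types here are illustrative.
type TaskIdInsertedAtRetryCount struct {
	Id         int64
	InsertedAt time.Time
	RetryCount int32
}

// timedOutItem stands in for the rows in r.SchedulingTimedOut.
type timedOutItem struct {
	ID             int64 // queue item id, used for unqueueing
	TaskID         int64
	TaskInsertedAt time.Time
	RetryCount     int32
}

// collectTimedOut gathers both halves of the cleanup: the queue item ids to
// unqueue and the task keys whose concurrency slots must be released. Before
// this fix only the first list was built, so slots held by tasks that timed
// out while waiting to be scheduled were never freed.
func collectTimedOut(timedOut []timedOutItem) (unqueue []int64, release []TaskIdInsertedAtRetryCount) {
	for _, item := range timedOut {
		unqueue = append(unqueue, item.ID)
		release = append(release, TaskIdInsertedAtRetryCount{
			Id:         item.TaskID,
			InsertedAt: item.TaskInsertedAt,
			RetryCount: item.RetryCount,
		})
	}
	return unqueue, release
}
```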

View File

@@ -480,13 +480,15 @@ WITH slots AS (
rn <= @maxRuns::int
), slots_to_cancel AS (
SELECT
*
cs.*
FROM
v1_concurrency_slot
v1_concurrency_slot cs
JOIN
tmp_workflow_concurrency_slot wcs ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) = (cs.parent_strategy_id, cs.workflow_version_id, cs.workflow_run_id)
WHERE
tenant_id = @tenantId::uuid AND
strategy_id = @strategyId::bigint AND
(task_inserted_at, task_id, task_retry_count) NOT IN (
cs.tenant_id = @tenantId::uuid AND
cs.strategy_id = @strategyId::bigint AND
(cs.task_inserted_at, cs.task_id, cs.task_retry_count) NOT IN (
SELECT
ers.task_inserted_at,
ers.task_id,
@@ -495,7 +497,7 @@ WITH slots AS (
eligible_running_slots ers
)
ORDER BY
task_id, task_inserted_at
cs.task_id, cs.task_inserted_at
FOR UPDATE
), slots_to_run AS (
SELECT
@@ -832,13 +834,15 @@ WITH slots AS (
rn <= @maxRuns::int
), slots_to_cancel AS (
SELECT
*
cs.*
FROM
v1_concurrency_slot
v1_concurrency_slot cs
JOIN
tmp_workflow_concurrency_slot wcs ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) = (cs.parent_strategy_id, cs.workflow_version_id, cs.workflow_run_id)
WHERE
tenant_id = @tenantId::uuid AND
strategy_id = @strategyId::bigint AND
(task_inserted_at, task_id, task_retry_count) NOT IN (
cs.tenant_id = @tenantId::uuid AND
cs.strategy_id = @strategyId::bigint AND
(cs.task_inserted_at, cs.task_id, cs.task_retry_count) NOT IN (
SELECT
ers.task_inserted_at,
ers.task_id,
@@ -847,7 +851,7 @@ WITH slots AS (
eligible_running_slots ers
)
ORDER BY
task_id ASC, task_inserted_at ASC
cs.task_id ASC, cs.task_inserted_at ASC
FOR UPDATE
), slots_to_run AS (
SELECT
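Both `slots_to_cancel` CTEs in this file change the same way: the new JOIN against `tmp_workflow_concurrency_slot` restricts cancellation to slots whose parent workflow-run slot is actually in play, and every column reference is qualified with the `cs` alias, which the JOIN would otherwise make ambiguous. The `ORDER BY ... FOR UPDATE` tail is the deadlock guard: rows are always locked in `(task_id, task_inserted_at)` order, so two transactions over overlapping slot sets acquire locks in the same sequence. A minimal sketch of that locking discipline with `pgx/v5`, using a placeholder DSN and tenant ID:

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	// Placeholder DSN for the sketch.
	conn, err := pgx.Connect(ctx, "postgres://localhost:5432/hatchet")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	tx, err := conn.Begin(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback(ctx) //nolint:errcheck // no-op after a successful commit

	// Locking candidate rows under a fixed ORDER BY means any two
	// transactions touching overlapping rows take locks in the same
	// sequence, so neither can wait on a lock the other already holds.
	rows, err := tx.Query(ctx, `
		SELECT cs.task_id
		FROM v1_concurrency_slot cs
		WHERE cs.tenant_id = $1::uuid
		ORDER BY cs.task_id, cs.task_inserted_at
		FOR UPDATE`,
		"00000000-0000-0000-0000-000000000000") // placeholder tenant id
	if err != nil {
		log.Fatal(err)
	}
	rows.Close()

	if err := tx.Commit(ctx); err != nil {
		log.Fatal(err)
	}
}
```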

View File

@@ -701,13 +701,15 @@ WITH slots AS (
rn <= $3::int
), slots_to_cancel AS (
SELECT
sort_id, task_id, task_inserted_at, task_retry_count, external_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, parent_strategy_id, priority, key, is_filled, next_parent_strategy_ids, next_strategy_ids, next_keys, queue_to_notify, schedule_timeout_at
cs.sort_id, cs.task_id, cs.task_inserted_at, cs.task_retry_count, cs.external_id, cs.tenant_id, cs.workflow_id, cs.workflow_version_id, cs.workflow_run_id, cs.strategy_id, cs.parent_strategy_id, cs.priority, cs.key, cs.is_filled, cs.next_parent_strategy_ids, cs.next_strategy_ids, cs.next_keys, cs.queue_to_notify, cs.schedule_timeout_at
FROM
v1_concurrency_slot
v1_concurrency_slot cs
JOIN
tmp_workflow_concurrency_slot wcs ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) = (cs.parent_strategy_id, cs.workflow_version_id, cs.workflow_run_id)
WHERE
tenant_id = $1::uuid AND
strategy_id = $2::bigint AND
(task_inserted_at, task_id, task_retry_count) NOT IN (
cs.tenant_id = $1::uuid AND
cs.strategy_id = $2::bigint AND
(cs.task_inserted_at, cs.task_id, cs.task_retry_count) NOT IN (
SELECT
ers.task_inserted_at,
ers.task_id,
@@ -716,7 +718,7 @@ WITH slots AS (
eligible_running_slots ers
)
ORDER BY
task_id, task_inserted_at
cs.task_id, cs.task_inserted_at
FOR UPDATE
), slots_to_run AS (
SELECT
@@ -921,13 +923,15 @@ WITH slots AS (
rn <= $3::int
), slots_to_cancel AS (
SELECT
sort_id, task_id, task_inserted_at, task_retry_count, external_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, parent_strategy_id, priority, key, is_filled, next_parent_strategy_ids, next_strategy_ids, next_keys, queue_to_notify, schedule_timeout_at
cs.sort_id, cs.task_id, cs.task_inserted_at, cs.task_retry_count, cs.external_id, cs.tenant_id, cs.workflow_id, cs.workflow_version_id, cs.workflow_run_id, cs.strategy_id, cs.parent_strategy_id, cs.priority, cs.key, cs.is_filled, cs.next_parent_strategy_ids, cs.next_strategy_ids, cs.next_keys, cs.queue_to_notify, cs.schedule_timeout_at
FROM
v1_concurrency_slot
v1_concurrency_slot cs
JOIN
tmp_workflow_concurrency_slot wcs ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) = (cs.parent_strategy_id, cs.workflow_version_id, cs.workflow_run_id)
WHERE
tenant_id = $1::uuid AND
strategy_id = $2::bigint AND
(task_inserted_at, task_id, task_retry_count) NOT IN (
cs.tenant_id = $1::uuid AND
cs.strategy_id = $2::bigint AND
(cs.task_inserted_at, cs.task_id, cs.task_retry_count) NOT IN (
SELECT
ers.task_inserted_at,
ers.task_id,
@@ -936,7 +940,7 @@ WITH slots AS (
eligible_running_slots ers
)
ORDER BY
task_id ASC, task_inserted_at ASC
cs.task_id ASC, cs.task_inserted_at ASC
FOR UPDATE
), slots_to_run AS (
SELECT

View File

@@ -134,7 +134,13 @@ WITH match_counts AS (
GROUP BY v1_match_id
), result_matches AS (
SELECT
m.*
m.*,
CASE WHEN
(mc.total_create_groups > 0 AND mc.total_create_groups = mc.satisfied_create_groups) THEN 'CREATE'
WHEN (mc.total_queue_groups > 0 AND mc.total_queue_groups = mc.satisfied_queue_groups) THEN 'QUEUE'
WHEN (mc.total_cancel_groups > 0 AND mc.total_cancel_groups = mc.satisfied_cancel_groups) THEN 'CANCEL'
WHEN (mc.total_skip_groups > 0 AND mc.total_skip_groups = mc.satisfied_skip_groups) THEN 'SKIP'
END::v1_match_condition_action AS action
FROM
v1_match m
JOIN
@@ -167,15 +173,16 @@ WITH match_counts AS (
), matches_with_data AS (
SELECT
m.id,
m.action,
(
SELECT jsonb_object_agg(action, aggregated_1)
FROM (
SELECT action, jsonb_object_agg(readable_data_key, data_array) AS aggregated_1
FROM (
SELECT action, readable_data_key, jsonb_agg(data) AS data_array
FROM v1_match_condition
WHERE v1_match_id = m.id AND is_satisfied
GROUP BY action, readable_data_key
SELECT mc.action, readable_data_key, jsonb_agg(data) AS data_array
FROM v1_match_condition mc
WHERE mc.v1_match_id = m.id AND mc.is_satisfied AND mc.action = m.action
GROUP BY mc.action, readable_data_key
) t
GROUP BY action
) s
@@ -183,7 +190,7 @@ WITH match_counts AS (
FROM
result_matches m
GROUP BY
m.id
m.id, m.action
), deleted_matches AS (
DELETE FROM
v1_match
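The substantive change in this query: each match now gets a single winning `action`, chosen with CREATE over QUEUE over CANCEL over SKIP precedence from the per-action group counts, and `matches_with_data` aggregates condition data only where `mc.action = m.action`, instead of merging data from every satisfied condition regardless of action (with `m.action` added to the GROUP BY to match). A Go mirror of the precedence rule, with illustrative names:

```go
package matches

// matchCounts mirrors the per-action aggregates from the match_counts CTE;
// the field names are illustrative.
type matchCounts struct {
	totalCreate, satisfiedCreate int
	totalQueue, satisfiedQueue   int
	totalCancel, satisfiedCancel int
	totalSkip, satisfiedSkip     int
}

// action reproduces the CASE expression above: the first action whose
// condition groups exist and are all satisfied wins, in
// CREATE > QUEUE > CANCEL > SKIP order.
func (c matchCounts) action() string {
	switch {
	case c.totalCreate > 0 && c.totalCreate == c.satisfiedCreate:
		return "CREATE"
	case c.totalQueue > 0 && c.totalQueue == c.satisfiedQueue:
		return "QUEUE"
	case c.totalCancel > 0 && c.totalCancel == c.satisfiedCancel:
		return "CANCEL"
	case c.totalSkip > 0 && c.totalSkip == c.satisfiedSkip:
		return "SKIP"
	}
	return "" // the SQL CASE yields NULL here
}
```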

View File

@@ -298,7 +298,13 @@ WITH match_counts AS (
GROUP BY v1_match_id
), result_matches AS (
SELECT
m.id, m.tenant_id, m.kind, m.is_satisfied, m.signal_task_id, m.signal_task_inserted_at, m.signal_external_id, m.signal_key, m.trigger_dag_id, m.trigger_dag_inserted_at, m.trigger_step_id, m.trigger_step_index, m.trigger_external_id, m.trigger_workflow_run_id, m.trigger_parent_task_external_id, m.trigger_parent_task_id, m.trigger_parent_task_inserted_at, m.trigger_child_index, m.trigger_child_key, m.trigger_existing_task_id, m.trigger_existing_task_inserted_at
m.id, m.tenant_id, m.kind, m.is_satisfied, m.signal_task_id, m.signal_task_inserted_at, m.signal_external_id, m.signal_key, m.trigger_dag_id, m.trigger_dag_inserted_at, m.trigger_step_id, m.trigger_step_index, m.trigger_external_id, m.trigger_workflow_run_id, m.trigger_parent_task_external_id, m.trigger_parent_task_id, m.trigger_parent_task_inserted_at, m.trigger_child_index, m.trigger_child_key, m.trigger_existing_task_id, m.trigger_existing_task_inserted_at,
CASE WHEN
(mc.total_create_groups > 0 AND mc.total_create_groups = mc.satisfied_create_groups) THEN 'CREATE'
WHEN (mc.total_queue_groups > 0 AND mc.total_queue_groups = mc.satisfied_queue_groups) THEN 'QUEUE'
WHEN (mc.total_cancel_groups > 0 AND mc.total_cancel_groups = mc.satisfied_cancel_groups) THEN 'CANCEL'
WHEN (mc.total_skip_groups > 0 AND mc.total_skip_groups = mc.satisfied_skip_groups) THEN 'SKIP'
END::v1_match_condition_action AS action
FROM
v1_match m
JOIN
@@ -331,15 +337,16 @@ WITH match_counts AS (
), matches_with_data AS (
SELECT
m.id,
m.action,
(
SELECT jsonb_object_agg(action, aggregated_1)
FROM (
SELECT action, jsonb_object_agg(readable_data_key, data_array) AS aggregated_1
FROM (
SELECT action, readable_data_key, jsonb_agg(data) AS data_array
FROM v1_match_condition
WHERE v1_match_id = m.id AND is_satisfied
GROUP BY action, readable_data_key
SELECT mc.action, readable_data_key, jsonb_agg(data) AS data_array
FROM v1_match_condition mc
WHERE mc.v1_match_id = m.id AND mc.is_satisfied AND mc.action = m.action
GROUP BY mc.action, readable_data_key
) t
GROUP BY action
) s
@@ -347,7 +354,7 @@ WITH match_counts AS (
FROM
result_matches m
GROUP BY
m.id
m.id, m.action
), deleted_matches AS (
DELETE FROM
v1_match
@@ -355,7 +362,7 @@ WITH match_counts AS (
id IN (SELECT id FROM deleted_conditions)
)
SELECT
result_matches.id, tenant_id, kind, is_satisfied, signal_task_id, signal_task_inserted_at, signal_external_id, signal_key, trigger_dag_id, trigger_dag_inserted_at, trigger_step_id, trigger_step_index, trigger_external_id, trigger_workflow_run_id, trigger_parent_task_external_id, trigger_parent_task_id, trigger_parent_task_inserted_at, trigger_child_index, trigger_child_key, trigger_existing_task_id, trigger_existing_task_inserted_at, d.id, mc_aggregated_data,
result_matches.id, tenant_id, kind, is_satisfied, signal_task_id, signal_task_inserted_at, signal_external_id, signal_key, trigger_dag_id, trigger_dag_inserted_at, trigger_step_id, trigger_step_index, trigger_external_id, trigger_workflow_run_id, trigger_parent_task_external_id, trigger_parent_task_id, trigger_parent_task_inserted_at, trigger_child_index, trigger_child_key, trigger_existing_task_id, trigger_existing_task_inserted_at, result_matches.action, d.id, d.action, mc_aggregated_data,
d.mc_aggregated_data
FROM
result_matches
@@ -364,30 +371,32 @@ LEFT JOIN
`
type SaveSatisfiedMatchConditionsRow struct {
ID int64 `json:"id"`
TenantID pgtype.UUID `json:"tenant_id"`
Kind V1MatchKind `json:"kind"`
IsSatisfied bool `json:"is_satisfied"`
SignalTaskID pgtype.Int8 `json:"signal_task_id"`
SignalTaskInsertedAt pgtype.Timestamptz `json:"signal_task_inserted_at"`
SignalExternalID pgtype.UUID `json:"signal_external_id"`
SignalKey pgtype.Text `json:"signal_key"`
TriggerDagID pgtype.Int8 `json:"trigger_dag_id"`
TriggerDagInsertedAt pgtype.Timestamptz `json:"trigger_dag_inserted_at"`
TriggerStepID pgtype.UUID `json:"trigger_step_id"`
TriggerStepIndex pgtype.Int8 `json:"trigger_step_index"`
TriggerExternalID pgtype.UUID `json:"trigger_external_id"`
TriggerWorkflowRunID pgtype.UUID `json:"trigger_workflow_run_id"`
TriggerParentTaskExternalID pgtype.UUID `json:"trigger_parent_task_external_id"`
TriggerParentTaskID pgtype.Int8 `json:"trigger_parent_task_id"`
TriggerParentTaskInsertedAt pgtype.Timestamptz `json:"trigger_parent_task_inserted_at"`
TriggerChildIndex pgtype.Int8 `json:"trigger_child_index"`
TriggerChildKey pgtype.Text `json:"trigger_child_key"`
TriggerExistingTaskID pgtype.Int8 `json:"trigger_existing_task_id"`
TriggerExistingTaskInsertedAt pgtype.Timestamptz `json:"trigger_existing_task_inserted_at"`
ID_2 pgtype.Int8 `json:"id_2"`
McAggregatedData []byte `json:"mc_aggregated_data"`
McAggregatedData_2 []byte `json:"mc_aggregated_data_2"`
ID int64 `json:"id"`
TenantID pgtype.UUID `json:"tenant_id"`
Kind V1MatchKind `json:"kind"`
IsSatisfied bool `json:"is_satisfied"`
SignalTaskID pgtype.Int8 `json:"signal_task_id"`
SignalTaskInsertedAt pgtype.Timestamptz `json:"signal_task_inserted_at"`
SignalExternalID pgtype.UUID `json:"signal_external_id"`
SignalKey pgtype.Text `json:"signal_key"`
TriggerDagID pgtype.Int8 `json:"trigger_dag_id"`
TriggerDagInsertedAt pgtype.Timestamptz `json:"trigger_dag_inserted_at"`
TriggerStepID pgtype.UUID `json:"trigger_step_id"`
TriggerStepIndex pgtype.Int8 `json:"trigger_step_index"`
TriggerExternalID pgtype.UUID `json:"trigger_external_id"`
TriggerWorkflowRunID pgtype.UUID `json:"trigger_workflow_run_id"`
TriggerParentTaskExternalID pgtype.UUID `json:"trigger_parent_task_external_id"`
TriggerParentTaskID pgtype.Int8 `json:"trigger_parent_task_id"`
TriggerParentTaskInsertedAt pgtype.Timestamptz `json:"trigger_parent_task_inserted_at"`
TriggerChildIndex pgtype.Int8 `json:"trigger_child_index"`
TriggerChildKey pgtype.Text `json:"trigger_child_key"`
TriggerExistingTaskID pgtype.Int8 `json:"trigger_existing_task_id"`
TriggerExistingTaskInsertedAt pgtype.Timestamptz `json:"trigger_existing_task_inserted_at"`
Action V1MatchConditionAction `json:"action"`
ID_2 pgtype.Int8 `json:"id_2"`
Action_2 NullV1MatchConditionAction `json:"action_2"`
McAggregatedData []byte `json:"mc_aggregated_data"`
McAggregatedData_2 []byte `json:"mc_aggregated_data_2"`
}
// NOTE: we have to break this into a separate query because CTEs can't see modified rows
@@ -425,7 +434,9 @@ func (q *Queries) SaveSatisfiedMatchConditions(ctx context.Context, db DBTX, mat
&i.TriggerChildKey,
&i.TriggerExistingTaskID,
&i.TriggerExistingTaskInsertedAt,
&i.Action,
&i.ID_2,
&i.Action_2,
&i.McAggregatedData,
&i.McAggregatedData_2,
); err != nil {

View File

@@ -198,6 +198,21 @@ WITH input AS (
v1_queue_item
WHERE
(task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM queue_items_to_delete)
), concurrency_slots_to_delete AS (
SELECT
task_id, task_inserted_at, task_retry_count
FROM
v1_concurrency_slot
WHERE
(task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input)
ORDER BY
task_id, task_inserted_at, task_retry_count
FOR UPDATE
), deleted_slots AS (
DELETE FROM
v1_concurrency_slot
WHERE
(task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, task_retry_count FROM concurrency_slots_to_delete)
)
SELECT
t.queue,
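This is the counterpart to the trigger drop at the top of the commit: rather than a statement-level trigger firing after `v1_task_runtime` rows are deleted, the query locks the matching concurrency slots in a fixed order and deletes them in a data-modifying CTE, keeping lock acquisition inside the transaction that owns the delete. A self-contained analogue, assuming `pgx/v5`; the `task_id = ANY(...)` filter stands in for the real query's `input` CTE:

```go
package slots

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// A simplified analogue of the concurrency_slots_to_delete / deleted_slots
// pair above: lock the target rows in a fixed order in one CTE, then delete
// by key in the next, all within a single statement.
const deleteSlotsSQL = `
WITH to_delete AS (
    SELECT task_id, task_inserted_at, task_retry_count
    FROM v1_concurrency_slot
    WHERE task_id = ANY($1::bigint[])
    ORDER BY task_id, task_inserted_at, task_retry_count
    FOR UPDATE
)
DELETE FROM v1_concurrency_slot
WHERE (task_id, task_inserted_at, task_retry_count) IN (
    SELECT task_id, task_inserted_at, task_retry_count FROM to_delete
)`

// deleteSlots does what the dropped trigger used to do, but inside the
// caller's transaction, so the lock order is pinned before any row is touched.
func deleteSlots(ctx context.Context, tx pgx.Tx, taskIDs []int64) error {
	_, err := tx.Exec(ctx, deleteSlotsSQL, taskIDs)
	return err
}
```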

View File

@@ -1787,6 +1787,21 @@ WITH input AS (
v1_queue_item
WHERE
(task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM queue_items_to_delete)
), concurrency_slots_to_delete AS (
SELECT
task_id, task_inserted_at, task_retry_count
FROM
v1_concurrency_slot
WHERE
(task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input)
ORDER BY
task_id, task_inserted_at, task_retry_count
FOR UPDATE
), deleted_slots AS (
DELETE FROM
v1_concurrency_slot
WHERE
(task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, task_retry_count FROM concurrency_slots_to_delete)
)
SELECT
t.queue,

View File

@@ -681,39 +681,6 @@ REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT
EXECUTE FUNCTION after_v1_workflow_concurrency_slot_update_function();
CREATE OR REPLACE FUNCTION after_v1_task_runtime_delete_function()
RETURNS trigger AS $$
BEGIN
WITH slots_to_delete AS (
SELECT
cs.task_inserted_at, cs.task_id, cs.task_retry_count, cs.key
FROM
deleted_rows d
JOIN v1_concurrency_slot cs ON cs.task_id = d.task_id AND cs.task_inserted_at = d.task_inserted_at AND cs.task_retry_count = d.retry_count
ORDER BY
cs.task_id, cs.task_inserted_at, cs.task_retry_count, cs.key
FOR UPDATE
)
DELETE FROM
v1_concurrency_slot cs
WHERE
(task_inserted_at, task_id, task_retry_count, key) IN (
SELECT
task_inserted_at, task_id, task_retry_count, key
FROM
slots_to_delete
);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER after_v1_task_runtime_delete
AFTER DELETE ON v1_task_runtime
REFERENCING OLD TABLE AS deleted_rows
FOR EACH STATEMENT
EXECUTE FUNCTION after_v1_task_runtime_delete_function();
CREATE TABLE v1_retry_queue_item (
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,