mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-22 10:10:07 -05:00
fix: match condition writes and retry counts on failure (#1507)
This commit is contained in:
@@ -719,6 +719,36 @@ func getCreateTaskOpts(tasks []*contracts.CreateTaskOpts, kind string) ([]v1.Cre
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if stepCp.Concurrency != nil {
|
||||
for _, concurrency := range stepCp.Concurrency {
|
||||
// Skip nil concurrency
|
||||
if concurrency == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if concurrency.Expression == "" {
|
||||
return nil, status.Error(
|
||||
codes.InvalidArgument,
|
||||
fmt.Sprintf("CEL expression is required for concurrency (step %s)", stepCp.ReadableId),
|
||||
)
|
||||
}
|
||||
|
||||
var limitStrategy *string
|
||||
|
||||
if concurrency.LimitStrategy != nil && concurrency.LimitStrategy.String() != "" {
|
||||
s := concurrency.LimitStrategy.String()
|
||||
limitStrategy = &s
|
||||
}
|
||||
|
||||
steps[j].Concurrency = append(steps[j].Concurrency, v1.CreateConcurrencyOpts{
|
||||
Expression: concurrency.Expression,
|
||||
MaxRuns: concurrency.MaxRuns,
|
||||
LimitStrategy: limitStrategy,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+4
-11
@@ -1,13 +1,6 @@
|
||||
{
|
||||
"restartable": "rs",
|
||||
"verbose": true,
|
||||
"ext": "go,mod,sum",
|
||||
"watch": [
|
||||
"cmd/hatchet-api",
|
||||
"api",
|
||||
"internal",
|
||||
"go.mod",
|
||||
"go.sum"
|
||||
]
|
||||
|
||||
"restartable": "rs",
|
||||
"verbose": true,
|
||||
"ext": "go,mod,sum",
|
||||
"watch": ["cmd/hatchet-api", "api", "internal", "pkg", "go.mod", "go.sum"]
|
||||
}
|
||||
|
||||
+4
-9
@@ -1,11 +1,6 @@
|
||||
{
|
||||
"restartable": "rs",
|
||||
"verbose": true,
|
||||
"ext": "go,mod,sum",
|
||||
"watch": [
|
||||
"cmd/hatchet-engine",
|
||||
"internal",
|
||||
"go.mod",
|
||||
"go.sum"
|
||||
]
|
||||
"restartable": "rs",
|
||||
"verbose": true,
|
||||
"ext": "go,mod,sum",
|
||||
"watch": ["cmd/hatchet-engine", "internal", "pkg", "go.mod", "go.sum"]
|
||||
}
|
||||
|
||||
+197
-87
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
"github.com/google/cel-go/cel"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
|
||||
@@ -706,72 +707,79 @@ func (m *sharedRepository) processCELExpressions(ctx context.Context, events []C
|
||||
}
|
||||
|
||||
func (m *sharedRepository) createEventMatches(ctx context.Context, tx sqlcv1.DBTX, tenantId string, eventMatches []CreateMatchOpts) error {
|
||||
// create the event matches first
|
||||
dagTenantIds := make([]pgtype.UUID, 0, len(eventMatches))
|
||||
dagKinds := make([]string, 0, len(eventMatches))
|
||||
dagExistingDatas := make([][]byte, 0, len(eventMatches))
|
||||
triggerDagIds := make([]int64, 0, len(eventMatches))
|
||||
triggerDagInsertedAts := make([]pgtype.Timestamptz, 0, len(eventMatches))
|
||||
triggerStepIds := make([]pgtype.UUID, 0, len(eventMatches))
|
||||
triggerStepIndices := make([]int64, 0, len(eventMatches))
|
||||
triggerExternalIds := make([]pgtype.UUID, 0, len(eventMatches))
|
||||
triggerWorkflowRunIds := make([]pgtype.UUID, 0, len(eventMatches))
|
||||
triggerExistingTaskIds := make([]pgtype.Int8, 0, len(eventMatches))
|
||||
triggerExistingTaskInsertedAts := make([]pgtype.Timestamptz, 0, len(eventMatches))
|
||||
triggerParentExternalIds := make([]pgtype.UUID, 0, len(eventMatches))
|
||||
triggerParentTaskIds := make([]pgtype.Int8, 0, len(eventMatches))
|
||||
triggerParentTaskInsertedAts := make([]pgtype.Timestamptz, 0, len(eventMatches))
|
||||
triggerChildIndices := make([]pgtype.Int8, 0, len(eventMatches))
|
||||
triggerChildKeys := make([]pgtype.Text, 0, len(eventMatches))
|
||||
// Create maps to store match details by key
|
||||
matchByKey := make(map[string]CreateMatchOpts)
|
||||
|
||||
signalTenantIds := make([]pgtype.UUID, 0, len(eventMatches))
|
||||
signalKinds := make([]string, 0, len(eventMatches))
|
||||
signalTaskIds := make([]int64, 0, len(eventMatches))
|
||||
signalTaskInsertedAts := make([]pgtype.Timestamptz, 0, len(eventMatches))
|
||||
signalKeys := make([]string, 0, len(eventMatches))
|
||||
// Separate DAG and signal matches
|
||||
dagMatches := make([]CreateMatchOpts, 0)
|
||||
signalMatches := make([]CreateMatchOpts, 0)
|
||||
|
||||
// Group matches and create lookup map
|
||||
for _, match := range eventMatches {
|
||||
// at the moment, we skip creating matches for things that don't have all fields set
|
||||
key := getMatchKey(match)
|
||||
matchByKey[key] = match
|
||||
|
||||
if match.TriggerDAGId != nil && match.TriggerDAGInsertedAt.Valid && match.TriggerStepId != nil && match.TriggerExternalId != nil {
|
||||
dagTenantIds = append(dagTenantIds, sqlchelpers.UUIDFromStr(tenantId))
|
||||
dagKinds = append(dagKinds, string(match.Kind))
|
||||
dagExistingDatas = append(dagExistingDatas, match.ExistingMatchData)
|
||||
triggerDagIds = append(triggerDagIds, *match.TriggerDAGId)
|
||||
triggerDagInsertedAts = append(triggerDagInsertedAts, match.TriggerDAGInsertedAt)
|
||||
triggerStepIds = append(triggerStepIds, sqlchelpers.UUIDFromStr(*match.TriggerStepId))
|
||||
triggerStepIndices = append(triggerStepIndices, match.TriggerStepIndex.Int64)
|
||||
triggerExternalIds = append(triggerExternalIds, sqlchelpers.UUIDFromStr(*match.TriggerExternalId))
|
||||
triggerParentExternalIds = append(triggerParentExternalIds, match.TriggerParentTaskExternalId)
|
||||
triggerParentTaskIds = append(triggerParentTaskIds, match.TriggerParentTaskId)
|
||||
triggerParentTaskInsertedAts = append(triggerParentTaskInsertedAts, match.TriggerParentTaskInsertedAt)
|
||||
triggerChildIndices = append(triggerChildIndices, match.TriggerChildIndex)
|
||||
triggerChildKeys = append(triggerChildKeys, match.TriggerChildKey)
|
||||
|
||||
if match.TriggerExistingTaskId != nil {
|
||||
triggerExistingTaskIds = append(triggerExistingTaskIds, pgtype.Int8{Int64: *match.TriggerExistingTaskId, Valid: true})
|
||||
} else {
|
||||
triggerExistingTaskIds = append(triggerExistingTaskIds, pgtype.Int8{})
|
||||
}
|
||||
|
||||
if match.TriggerWorkflowRunId != nil {
|
||||
triggerWorkflowRunIds = append(triggerWorkflowRunIds, sqlchelpers.UUIDFromStr(*match.TriggerWorkflowRunId))
|
||||
} else {
|
||||
triggerWorkflowRunIds = append(triggerWorkflowRunIds, pgtype.UUID{})
|
||||
}
|
||||
|
||||
triggerExistingTaskInsertedAts = append(triggerExistingTaskInsertedAts, match.TriggerExistingTaskInsertedAt)
|
||||
dagMatches = append(dagMatches, match)
|
||||
} else if match.SignalTaskId != nil && match.SignalKey != nil && match.SignalTaskInsertedAt.Valid {
|
||||
signalTenantIds = append(signalTenantIds, sqlchelpers.UUIDFromStr(tenantId))
|
||||
signalKinds = append(signalKinds, string(match.Kind))
|
||||
signalTaskIds = append(signalTaskIds, *match.SignalTaskId)
|
||||
signalTaskInsertedAts = append(signalTaskInsertedAts, match.SignalTaskInsertedAt)
|
||||
signalKeys = append(signalKeys, *match.SignalKey)
|
||||
signalMatches = append(signalMatches, match)
|
||||
}
|
||||
}
|
||||
|
||||
var createdMatches []*sqlcv1.V1Match
|
||||
// Create match conditions
|
||||
var matchConditionParams []sqlcv1.CreateMatchConditionsParams
|
||||
|
||||
if len(dagTenantIds) > 0 {
|
||||
// Create DAG trigger matches
|
||||
if len(dagMatches) > 0 {
|
||||
// Prepare data for DAG trigger matches
|
||||
dagTenantIds := make([]pgtype.UUID, len(dagMatches))
|
||||
dagKinds := make([]string, len(dagMatches))
|
||||
dagExistingDatas := make([][]byte, len(dagMatches))
|
||||
triggerDagIds := make([]int64, len(dagMatches))
|
||||
triggerDagInsertedAts := make([]pgtype.Timestamptz, len(dagMatches))
|
||||
triggerStepIds := make([]pgtype.UUID, len(dagMatches))
|
||||
triggerStepIndices := make([]int64, len(dagMatches))
|
||||
triggerExternalIds := make([]pgtype.UUID, len(dagMatches))
|
||||
triggerWorkflowRunIds := make([]pgtype.UUID, len(dagMatches))
|
||||
triggerExistingTaskIds := make([]pgtype.Int8, len(dagMatches))
|
||||
triggerExistingTaskInsertedAts := make([]pgtype.Timestamptz, len(dagMatches))
|
||||
triggerParentExternalIds := make([]pgtype.UUID, len(dagMatches))
|
||||
triggerParentTaskIds := make([]pgtype.Int8, len(dagMatches))
|
||||
triggerParentTaskInsertedAts := make([]pgtype.Timestamptz, len(dagMatches))
|
||||
triggerChildIndices := make([]pgtype.Int8, len(dagMatches))
|
||||
triggerChildKeys := make([]pgtype.Text, len(dagMatches))
|
||||
|
||||
for i, match := range dagMatches {
|
||||
dagTenantIds[i] = sqlchelpers.UUIDFromStr(tenantId)
|
||||
dagKinds[i] = string(match.Kind)
|
||||
dagExistingDatas[i] = match.ExistingMatchData
|
||||
triggerDagIds[i] = *match.TriggerDAGId
|
||||
triggerDagInsertedAts[i] = match.TriggerDAGInsertedAt
|
||||
triggerStepIds[i] = sqlchelpers.UUIDFromStr(*match.TriggerStepId)
|
||||
triggerStepIndices[i] = match.TriggerStepIndex.Int64
|
||||
triggerExternalIds[i] = sqlchelpers.UUIDFromStr(*match.TriggerExternalId)
|
||||
triggerParentExternalIds[i] = match.TriggerParentTaskExternalId
|
||||
triggerParentTaskIds[i] = match.TriggerParentTaskId
|
||||
triggerParentTaskInsertedAts[i] = match.TriggerParentTaskInsertedAt
|
||||
triggerChildIndices[i] = match.TriggerChildIndex
|
||||
triggerChildKeys[i] = match.TriggerChildKey
|
||||
|
||||
if match.TriggerExistingTaskId != nil {
|
||||
triggerExistingTaskIds[i] = pgtype.Int8{Int64: *match.TriggerExistingTaskId, Valid: true}
|
||||
} else {
|
||||
triggerExistingTaskIds[i] = pgtype.Int8{}
|
||||
}
|
||||
|
||||
if match.TriggerWorkflowRunId != nil {
|
||||
triggerWorkflowRunIds[i] = sqlchelpers.UUIDFromStr(*match.TriggerWorkflowRunId)
|
||||
} else {
|
||||
triggerWorkflowRunIds[i] = pgtype.UUID{}
|
||||
}
|
||||
|
||||
triggerExistingTaskInsertedAts[i] = match.TriggerExistingTaskInsertedAt
|
||||
}
|
||||
|
||||
// Create matches in the database
|
||||
dagCreatedMatches, err := m.queries.CreateMatchesForDAGTriggers(
|
||||
ctx,
|
||||
tx,
|
||||
@@ -799,10 +807,56 @@ func (m *sharedRepository) createEventMatches(ctx context.Context, tx sqlcv1.DBT
|
||||
return err
|
||||
}
|
||||
|
||||
createdMatches = append(createdMatches, dagCreatedMatches...)
|
||||
// For each created match, generate a key from its properties and map it to its ID
|
||||
for _, createdMatch := range dagCreatedMatches {
|
||||
// Get existingTaskId pointer if valid
|
||||
var existingTaskId *int64
|
||||
if createdMatch.TriggerExistingTaskID.Valid {
|
||||
taskId := createdMatch.TriggerExistingTaskID.Int64
|
||||
existingTaskId = &taskId
|
||||
}
|
||||
|
||||
// Generate key using the specific function for DAG matches
|
||||
key := getDagMatchKey(
|
||||
string(createdMatch.Kind),
|
||||
createdMatch.TriggerDagID.Int64,
|
||||
createdMatch.TriggerExternalID.String(),
|
||||
createdMatch.TriggerStepID.String(),
|
||||
existingTaskId,
|
||||
createdMatch.TriggerParentTaskID,
|
||||
)
|
||||
|
||||
// Get the original match from the map
|
||||
match, exists := matchByKey[key]
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("match not found for key %s", key)
|
||||
}
|
||||
|
||||
for _, condition := range match.Conditions {
|
||||
matchConditionParams = append(matchConditionParams, getConditionParam(tenantId, createdMatch.ID, condition))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(signalTenantIds) > 0 {
|
||||
// Create signal trigger matches
|
||||
if len(signalMatches) > 0 {
|
||||
// Prepare data for signal trigger matches
|
||||
signalTenantIds := make([]pgtype.UUID, len(signalMatches))
|
||||
signalKinds := make([]string, len(signalMatches))
|
||||
signalTaskIds := make([]int64, len(signalMatches))
|
||||
signalTaskInsertedAts := make([]pgtype.Timestamptz, len(signalMatches))
|
||||
signalKeys := make([]string, len(signalMatches))
|
||||
|
||||
for i, match := range signalMatches {
|
||||
signalTenantIds[i] = sqlchelpers.UUIDFromStr(tenantId)
|
||||
signalKinds[i] = string(match.Kind)
|
||||
signalTaskIds[i] = *match.SignalTaskId
|
||||
signalTaskInsertedAts[i] = match.SignalTaskInsertedAt
|
||||
signalKeys[i] = *match.SignalKey
|
||||
}
|
||||
|
||||
// Create matches in the database
|
||||
signalCreatedMatches, err := m.queries.CreateMatchesForSignalTriggers(
|
||||
ctx,
|
||||
tx,
|
||||
@@ -819,42 +873,29 @@ func (m *sharedRepository) createEventMatches(ctx context.Context, tx sqlcv1.DBT
|
||||
return err
|
||||
}
|
||||
|
||||
createdMatches = append(createdMatches, signalCreatedMatches...)
|
||||
}
|
||||
// For each created match, generate a key from its properties and map it to its ID
|
||||
for _, createdMatch := range signalCreatedMatches {
|
||||
// Generate key using the specific function for signal matches
|
||||
key := getSignalMatchKey(
|
||||
string(createdMatch.Kind),
|
||||
createdMatch.SignalTaskID.Int64,
|
||||
createdMatch.SignalKey.String,
|
||||
)
|
||||
|
||||
if len(createdMatches) != len(eventMatches) {
|
||||
return fmt.Errorf("expected %d matches to be created, but only %d were created", len(eventMatches), len(createdMatches))
|
||||
}
|
||||
// Get the original match from the map
|
||||
match, exists := matchByKey[key]
|
||||
|
||||
// next, create the match conditions
|
||||
params := make([]sqlcv1.CreateMatchConditionsParams, 0, len(eventMatches))
|
||||
|
||||
for i, match := range eventMatches {
|
||||
createdMatch := createdMatches[i]
|
||||
|
||||
for _, condition := range match.Conditions {
|
||||
param := sqlcv1.CreateMatchConditionsParams{
|
||||
V1MatchID: createdMatch.ID,
|
||||
TenantID: sqlchelpers.UUIDFromStr(tenantId),
|
||||
EventType: condition.EventType,
|
||||
EventKey: condition.EventKey,
|
||||
ReadableDataKey: condition.ReadableDataKey,
|
||||
OrGroupID: sqlchelpers.UUIDFromStr(condition.GroupId),
|
||||
Expression: sqlchelpers.TextFromStr(condition.Expression),
|
||||
Action: condition.Action,
|
||||
IsSatisfied: false,
|
||||
Data: condition.Data,
|
||||
if !exists {
|
||||
return fmt.Errorf("match not found for key %s", key)
|
||||
}
|
||||
|
||||
if condition.EventResourceHint != nil {
|
||||
param.EventResourceHint = sqlchelpers.TextFromStr(*condition.EventResourceHint)
|
||||
for _, condition := range match.Conditions {
|
||||
matchConditionParams = append(matchConditionParams, getConditionParam(tenantId, createdMatch.ID, condition))
|
||||
}
|
||||
|
||||
params = append(params, param)
|
||||
}
|
||||
}
|
||||
|
||||
_, err := m.queries.CreateMatchConditions(ctx, tx, params)
|
||||
_, err := m.queries.CreateMatchConditions(ctx, tx, matchConditionParams)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -863,6 +904,75 @@ func (m *sharedRepository) createEventMatches(ctx context.Context, tx sqlcv1.DBT
|
||||
return nil
|
||||
}
|
||||
|
||||
func getConditionParam(tenantId string, createdMatchId int64, condition GroupMatchCondition) sqlcv1.CreateMatchConditionsParams {
|
||||
param := sqlcv1.CreateMatchConditionsParams{
|
||||
V1MatchID: createdMatchId,
|
||||
TenantID: sqlchelpers.UUIDFromStr(tenantId),
|
||||
EventType: condition.EventType,
|
||||
EventKey: condition.EventKey,
|
||||
ReadableDataKey: condition.ReadableDataKey,
|
||||
OrGroupID: sqlchelpers.UUIDFromStr(condition.GroupId),
|
||||
Expression: sqlchelpers.TextFromStr(condition.Expression),
|
||||
Action: condition.Action,
|
||||
IsSatisfied: false,
|
||||
Data: condition.Data,
|
||||
}
|
||||
|
||||
if condition.EventResourceHint != nil {
|
||||
param.EventResourceHint = sqlchelpers.TextFromStr(*condition.EventResourceHint)
|
||||
}
|
||||
|
||||
return param
|
||||
}
|
||||
|
||||
func getDagMatchKey(kind string, dagId int64, externalId string, stepId string, existingTaskId *int64, parentTaskId pgtype.Int8) string {
|
||||
existingTaskIdStr := ""
|
||||
if existingTaskId != nil {
|
||||
existingTaskIdStr = fmt.Sprintf("%d", *existingTaskId)
|
||||
}
|
||||
|
||||
parentTaskIdStr := ""
|
||||
if parentTaskId.Valid {
|
||||
parentTaskIdStr = fmt.Sprintf("%d", parentTaskId.Int64)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("dag:%s:%d:%s:%s:%s:%s",
|
||||
kind,
|
||||
dagId,
|
||||
externalId,
|
||||
stepId,
|
||||
existingTaskIdStr,
|
||||
parentTaskIdStr)
|
||||
}
|
||||
|
||||
func getSignalMatchKey(kind string, signalTaskId int64, signalKey string) string {
|
||||
return fmt.Sprintf("signal:%s:%d:%s", kind, signalTaskId, signalKey)
|
||||
}
|
||||
|
||||
func getMatchKey(match CreateMatchOpts) string {
|
||||
// For DAG triggers
|
||||
if match.TriggerDAGId != nil && match.TriggerStepId != nil && match.TriggerExternalId != nil {
|
||||
return getDagMatchKey(
|
||||
string(sqlcv1.V1MatchKindTRIGGER),
|
||||
*match.TriggerDAGId,
|
||||
*match.TriggerExternalId,
|
||||
*match.TriggerStepId,
|
||||
match.TriggerExistingTaskId,
|
||||
match.TriggerParentTaskId)
|
||||
}
|
||||
|
||||
// For signal triggers
|
||||
if match.SignalTaskId != nil && match.SignalKey != nil {
|
||||
return getSignalMatchKey(
|
||||
string(sqlcv1.V1MatchKindSIGNAL),
|
||||
*match.SignalTaskId,
|
||||
*match.SignalKey)
|
||||
}
|
||||
|
||||
// Fallback for incomplete match data
|
||||
return uuid.New().String()
|
||||
}
|
||||
|
||||
func (m *sharedRepository) createAdditionalMatches(ctx context.Context, tx sqlcv1.DBTX, tenantId string, satisfiedMatches []*sqlcv1.SaveSatisfiedMatchConditionsRow) error { // nolint: unused
|
||||
additionalMatchStepIds := make([]pgtype.UUID, 0, len(satisfiedMatches))
|
||||
|
||||
|
||||
@@ -241,6 +241,7 @@ WITH input AS (
|
||||
SELECT
|
||||
unnest(@taskIds::bigint[]) AS task_id,
|
||||
unnest(@taskInsertedAts::timestamptz[]) AS task_inserted_at,
|
||||
unnest(@taskRetryCounts::integer[]) AS task_retry_count,
|
||||
unnest(@isNonRetryables::boolean[]) AS is_non_retryable
|
||||
) AS subquery
|
||||
), locked_tasks AS (
|
||||
@@ -250,14 +251,10 @@ WITH input AS (
|
||||
t.step_id
|
||||
FROM
|
||||
v1_task t
|
||||
-- only fail tasks which have a v1_task_runtime equivalent to the current retry count. otherwise,
|
||||
-- a cancellation which deletes the v1_task_runtime might lead to a future failure event, which triggers
|
||||
-- a retry.
|
||||
JOIN
|
||||
v1_task_runtime rt ON rt.task_id = t.id AND rt.task_inserted_at = t.inserted_at AND rt.retry_count = t.retry_count
|
||||
input i ON i.task_id = t.id AND i.task_inserted_at = t.inserted_at AND i.task_retry_count = t.retry_count
|
||||
WHERE
|
||||
(t.id, t.inserted_at) IN (SELECT task_id, task_inserted_at FROM input)
|
||||
AND t.tenant_id = @tenantId::uuid
|
||||
t.tenant_id = @tenantId::uuid
|
||||
-- order by the task id to get a stable lock order
|
||||
ORDER BY
|
||||
id
|
||||
@@ -280,8 +277,8 @@ SET
|
||||
FROM
|
||||
tasks_to_steps
|
||||
WHERE
|
||||
(v1_task.id, v1_task.inserted_at) IN (
|
||||
SELECT task_id, task_inserted_at
|
||||
(v1_task.id, v1_task.inserted_at, v1_task.retry_count) IN (
|
||||
SELECT task_id, task_inserted_at, task_retry_count
|
||||
FROM input
|
||||
WHERE is_non_retryable = FALSE
|
||||
)
|
||||
@@ -303,7 +300,8 @@ WITH input AS (
|
||||
(
|
||||
SELECT
|
||||
unnest(@taskIds::bigint[]) AS task_id,
|
||||
unnest(@taskInsertedAts::timestamptz[]) AS task_inserted_at
|
||||
unnest(@taskInsertedAts::timestamptz[]) AS task_inserted_at,
|
||||
unnest(@taskRetryCounts::integer[]) AS task_retry_count
|
||||
) AS subquery
|
||||
), locked_tasks AS (
|
||||
SELECT
|
||||
@@ -314,10 +312,9 @@ WITH input AS (
|
||||
-- a cancellation which deletes the v1_task_runtime might lead to a future failure event, which triggers
|
||||
-- a retry.
|
||||
JOIN
|
||||
v1_task_runtime rt ON rt.task_id = t.id AND rt.task_inserted_at = t.inserted_at AND rt.retry_count = t.retry_count
|
||||
input i ON i.task_id = t.id AND i.task_inserted_at = t.inserted_at AND i.task_retry_count = t.retry_count
|
||||
WHERE
|
||||
(t.id, t.inserted_at) IN (SELECT task_id, task_inserted_at FROM input)
|
||||
AND t.tenant_id = @tenantId::uuid
|
||||
t.tenant_id = @tenantId::uuid
|
||||
-- order by the task id to get a stable lock order
|
||||
ORDER BY
|
||||
id
|
||||
@@ -331,7 +328,10 @@ SET
|
||||
FROM
|
||||
locked_tasks
|
||||
WHERE
|
||||
(v1_task.id, v1_task.inserted_at) IN (SELECT task_id, task_inserted_at FROM input)
|
||||
(v1_task.id, v1_task.inserted_at, v1_task.retry_count) IN (
|
||||
SELECT task_id, task_inserted_at, task_retry_count
|
||||
FROM input
|
||||
)
|
||||
AND @maxInternalRetries::int > v1_task.internal_retry_count
|
||||
RETURNING
|
||||
v1_task.id,
|
||||
@@ -371,8 +371,7 @@ SELECT
|
||||
FROM
|
||||
v1_task
|
||||
JOIN
|
||||
-- NOTE: we only join when retry count matches
|
||||
expired_runtimes ON expired_runtimes.task_id = v1_task.id AND expired_runtimes.task_inserted_at = v1_task.inserted_at AND expired_runtimes.retry_count = v1_task.retry_count;
|
||||
expired_runtimes ON expired_runtimes.task_id = v1_task.id AND expired_runtimes.task_inserted_at = v1_task.inserted_at;
|
||||
|
||||
-- name: ListTasksToReassign :many
|
||||
WITH tasks_on_inactive_workers AS (
|
||||
@@ -397,8 +396,7 @@ SELECT
|
||||
FROM
|
||||
v1_task
|
||||
JOIN
|
||||
-- NOTE: we only join when retry count matches
|
||||
tasks_on_inactive_workers lrs ON lrs.task_id = v1_task.id AND lrs.task_inserted_at = v1_task.inserted_at AND lrs.retry_count = v1_task.retry_count;
|
||||
tasks_on_inactive_workers lrs ON lrs.task_id = v1_task.id AND lrs.task_inserted_at = v1_task.inserted_at;
|
||||
|
||||
-- name: ProcessRetryQueueItems :many
|
||||
WITH rqis_to_delete AS (
|
||||
|
||||
@@ -77,13 +77,14 @@ func (q *Queries) DeleteMatchingSignalEvents(ctx context.Context, db DBTX, arg D
|
||||
const failTaskAppFailure = `-- name: FailTaskAppFailure :many
|
||||
WITH input AS (
|
||||
SELECT
|
||||
task_id, task_inserted_at, is_non_retryable
|
||||
task_id, task_inserted_at, task_retry_count, is_non_retryable
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
unnest($1::bigint[]) AS task_id,
|
||||
unnest($2::timestamptz[]) AS task_inserted_at,
|
||||
unnest($3::boolean[]) AS is_non_retryable
|
||||
unnest($3::integer[]) AS task_retry_count,
|
||||
unnest($4::boolean[]) AS is_non_retryable
|
||||
) AS subquery
|
||||
), locked_tasks AS (
|
||||
SELECT
|
||||
@@ -92,14 +93,10 @@ WITH input AS (
|
||||
t.step_id
|
||||
FROM
|
||||
v1_task t
|
||||
-- only fail tasks which have a v1_task_runtime equivalent to the current retry count. otherwise,
|
||||
-- a cancellation which deletes the v1_task_runtime might lead to a future failure event, which triggers
|
||||
-- a retry.
|
||||
JOIN
|
||||
v1_task_runtime rt ON rt.task_id = t.id AND rt.task_inserted_at = t.inserted_at AND rt.retry_count = t.retry_count
|
||||
input i ON i.task_id = t.id AND i.task_inserted_at = t.inserted_at AND i.task_retry_count = t.retry_count
|
||||
WHERE
|
||||
(t.id, t.inserted_at) IN (SELECT task_id, task_inserted_at FROM input)
|
||||
AND t.tenant_id = $4::uuid
|
||||
t.tenant_id = $5::uuid
|
||||
-- order by the task id to get a stable lock order
|
||||
ORDER BY
|
||||
id
|
||||
@@ -122,8 +119,8 @@ SET
|
||||
FROM
|
||||
tasks_to_steps
|
||||
WHERE
|
||||
(v1_task.id, v1_task.inserted_at) IN (
|
||||
SELECT task_id, task_inserted_at
|
||||
(v1_task.id, v1_task.inserted_at, v1_task.retry_count) IN (
|
||||
SELECT task_id, task_inserted_at, task_retry_count
|
||||
FROM input
|
||||
WHERE is_non_retryable = FALSE
|
||||
)
|
||||
@@ -140,6 +137,7 @@ RETURNING
|
||||
type FailTaskAppFailureParams struct {
|
||||
Taskids []int64 `json:"taskids"`
|
||||
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
|
||||
Taskretrycounts []int32 `json:"taskretrycounts"`
|
||||
Isnonretryables []bool `json:"isnonretryables"`
|
||||
Tenantid pgtype.UUID `json:"tenantid"`
|
||||
}
|
||||
@@ -158,6 +156,7 @@ func (q *Queries) FailTaskAppFailure(ctx context.Context, db DBTX, arg FailTaskA
|
||||
rows, err := db.Query(ctx, failTaskAppFailure,
|
||||
arg.Taskids,
|
||||
arg.Taskinsertedats,
|
||||
arg.Taskretrycounts,
|
||||
arg.Isnonretryables,
|
||||
arg.Tenantid,
|
||||
)
|
||||
@@ -189,12 +188,13 @@ func (q *Queries) FailTaskAppFailure(ctx context.Context, db DBTX, arg FailTaskA
|
||||
const failTaskInternalFailure = `-- name: FailTaskInternalFailure :many
|
||||
WITH input AS (
|
||||
SELECT
|
||||
task_id, task_inserted_at
|
||||
task_id, task_inserted_at, task_retry_count
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
unnest($2::bigint[]) AS task_id,
|
||||
unnest($3::timestamptz[]) AS task_inserted_at
|
||||
unnest($3::timestamptz[]) AS task_inserted_at,
|
||||
unnest($4::integer[]) AS task_retry_count
|
||||
) AS subquery
|
||||
), locked_tasks AS (
|
||||
SELECT
|
||||
@@ -205,10 +205,9 @@ WITH input AS (
|
||||
-- a cancellation which deletes the v1_task_runtime might lead to a future failure event, which triggers
|
||||
-- a retry.
|
||||
JOIN
|
||||
v1_task_runtime rt ON rt.task_id = t.id AND rt.task_inserted_at = t.inserted_at AND rt.retry_count = t.retry_count
|
||||
input i ON i.task_id = t.id AND i.task_inserted_at = t.inserted_at AND i.task_retry_count = t.retry_count
|
||||
WHERE
|
||||
(t.id, t.inserted_at) IN (SELECT task_id, task_inserted_at FROM input)
|
||||
AND t.tenant_id = $4::uuid
|
||||
t.tenant_id = $5::uuid
|
||||
-- order by the task id to get a stable lock order
|
||||
ORDER BY
|
||||
id
|
||||
@@ -222,7 +221,10 @@ SET
|
||||
FROM
|
||||
locked_tasks
|
||||
WHERE
|
||||
(v1_task.id, v1_task.inserted_at) IN (SELECT task_id, task_inserted_at FROM input)
|
||||
(v1_task.id, v1_task.inserted_at, v1_task.retry_count) IN (
|
||||
SELECT task_id, task_inserted_at, task_retry_count
|
||||
FROM input
|
||||
)
|
||||
AND $1::int > v1_task.internal_retry_count
|
||||
RETURNING
|
||||
v1_task.id,
|
||||
@@ -234,6 +236,7 @@ type FailTaskInternalFailureParams struct {
|
||||
Maxinternalretries int32 `json:"maxinternalretries"`
|
||||
Taskids []int64 `json:"taskids"`
|
||||
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
|
||||
Taskretrycounts []int32 `json:"taskretrycounts"`
|
||||
Tenantid pgtype.UUID `json:"tenantid"`
|
||||
}
|
||||
|
||||
@@ -249,6 +252,7 @@ func (q *Queries) FailTaskInternalFailure(ctx context.Context, db DBTX, arg Fail
|
||||
arg.Maxinternalretries,
|
||||
arg.Taskids,
|
||||
arg.Taskinsertedats,
|
||||
arg.Taskretrycounts,
|
||||
arg.Tenantid,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -1154,8 +1158,7 @@ SELECT
|
||||
FROM
|
||||
v1_task
|
||||
JOIN
|
||||
-- NOTE: we only join when retry count matches
|
||||
tasks_on_inactive_workers lrs ON lrs.task_id = v1_task.id AND lrs.task_inserted_at = v1_task.inserted_at AND lrs.retry_count = v1_task.retry_count
|
||||
tasks_on_inactive_workers lrs ON lrs.task_id = v1_task.id AND lrs.task_inserted_at = v1_task.inserted_at
|
||||
`
|
||||
|
||||
type ListTasksToReassignParams struct {
|
||||
@@ -1222,8 +1225,7 @@ SELECT
|
||||
FROM
|
||||
v1_task
|
||||
JOIN
|
||||
-- NOTE: we only join when retry count matches
|
||||
expired_runtimes ON expired_runtimes.task_id = v1_task.id AND expired_runtimes.task_inserted_at = v1_task.inserted_at AND expired_runtimes.retry_count = v1_task.retry_count
|
||||
expired_runtimes ON expired_runtimes.task_id = v1_task.id AND expired_runtimes.task_inserted_at = v1_task.inserted_at
|
||||
`
|
||||
|
||||
type ListTasksToTimeoutParams struct {
|
||||
|
||||
@@ -602,10 +602,12 @@ func (r *TaskRepositoryImpl) failTasksTx(ctx context.Context, tx sqlcv1.DBTX, te
|
||||
tasks := make([]TaskIdInsertedAtRetryCount, len(failureOpts))
|
||||
appFailureTaskIds := make([]int64, 0)
|
||||
appFailureTaskInsertedAts := make([]pgtype.Timestamptz, 0)
|
||||
appFailureTaskRetryCounts := make([]int32, 0)
|
||||
appFailureIsNonRetryableStatuses := make([]bool, 0)
|
||||
|
||||
internalFailureTaskIds := make([]int64, 0)
|
||||
internalFailureInsertedAts := make([]pgtype.Timestamptz, 0)
|
||||
internalFailureTaskRetryCounts := make([]int32, 0)
|
||||
|
||||
for i, failureOpt := range failureOpts {
|
||||
tasks[i] = *failureOpt.TaskIdInsertedAtRetryCount
|
||||
@@ -613,10 +615,12 @@ func (r *TaskRepositoryImpl) failTasksTx(ctx context.Context, tx sqlcv1.DBTX, te
|
||||
if failureOpt.IsAppError {
|
||||
appFailureTaskIds = append(appFailureTaskIds, failureOpt.Id)
|
||||
appFailureTaskInsertedAts = append(appFailureTaskInsertedAts, failureOpt.InsertedAt)
|
||||
appFailureTaskRetryCounts = append(appFailureTaskRetryCounts, failureOpt.RetryCount)
|
||||
appFailureIsNonRetryableStatuses = append(appFailureIsNonRetryableStatuses, failureOpt.IsNonRetryable)
|
||||
} else {
|
||||
internalFailureTaskIds = append(internalFailureTaskIds, failureOpt.Id)
|
||||
internalFailureInsertedAts = append(internalFailureInsertedAts, failureOpt.InsertedAt)
|
||||
internalFailureTaskRetryCounts = append(internalFailureTaskRetryCounts, failureOpt.RetryCount)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -630,6 +634,7 @@ func (r *TaskRepositoryImpl) failTasksTx(ctx context.Context, tx sqlcv1.DBTX, te
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
Taskids: appFailureTaskIds,
|
||||
Taskinsertedats: appFailureTaskInsertedAts,
|
||||
Taskretrycounts: appFailureTaskRetryCounts,
|
||||
Isnonretryables: appFailureIsNonRetryableStatuses,
|
||||
})
|
||||
|
||||
@@ -659,6 +664,7 @@ func (r *TaskRepositoryImpl) failTasksTx(ctx context.Context, tx sqlcv1.DBTX, te
|
||||
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
|
||||
Taskids: internalFailureTaskIds,
|
||||
Taskinsertedats: internalFailureInsertedAts,
|
||||
Taskretrycounts: internalFailureTaskRetryCounts,
|
||||
Maxinternalretries: r.maxInternalRetryCount,
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user