mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-24 12:28:35 -05:00
@@ -108,14 +108,26 @@ func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(
|
||||
) (res *RunConcurrencyResult, err error) {
|
||||
switch strategy.Strategy {
|
||||
case sqlcv1.V1ConcurrencyStrategyGROUPROUNDROBIN:
|
||||
return c.runGroupRoundRobin(ctx, tenantId, strategy)
|
||||
res, err = c.runGroupRoundRobin(ctx, tenantId, strategy)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("group round robin (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
case sqlcv1.V1ConcurrencyStrategyCANCELINPROGRESS:
|
||||
return c.runCancelInProgress(ctx, tenantId, strategy)
|
||||
res, err = c.runCancelInProgress(ctx, tenantId, strategy)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cancel in progress (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
case sqlcv1.V1ConcurrencyStrategyCANCELNEWEST:
|
||||
return c.runCancelNewest(ctx, tenantId, strategy)
|
||||
res, err = c.runCancelNewest(ctx, tenantId, strategy)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cancel newest (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
@@ -123,10 +135,11 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
tenantId pgtype.UUID,
|
||||
strategy *sqlcv1.V1StepConcurrency,
|
||||
) (res *RunConcurrencyResult, err error) {
|
||||
|
||||
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 5000)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to prepare transaction (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
defer rollback()
|
||||
@@ -134,7 +147,7 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
err = c.queries.AdvisoryLock(ctx, tx, strategy.ID)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to acquire advisory lock (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
var queued []TaskWithQueue
|
||||
@@ -145,7 +158,7 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
acquired, err := c.queries.TryAdvisoryLock(ctx, tx, PARENT_STRATEGY_LOCK_OFFSET+strategy.ParentStrategyID.Int64)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to acquire parent advisory lock (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
if acquired {
|
||||
@@ -156,7 +169,7 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to run parent group round robin (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,7 +180,7 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to run child group round robin (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
queued = make([]TaskWithQueue, 0, len(poppedResults))
|
||||
@@ -210,7 +223,7 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to run group round robin: %w", err)
|
||||
}
|
||||
|
||||
queued = make([]TaskWithQueue, 0, len(poppedResults))
|
||||
@@ -248,7 +261,7 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin(
|
||||
}
|
||||
|
||||
if err = commit(ctx); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to commit transaction (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
return &RunConcurrencyResult{
|
||||
@@ -263,10 +276,11 @@ func (c *ConcurrencyRepositoryImpl) runCancelInProgress(
|
||||
tenantId pgtype.UUID,
|
||||
strategy *sqlcv1.V1StepConcurrency,
|
||||
) (res *RunConcurrencyResult, err error) {
|
||||
|
||||
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 5000)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to prepare transaction (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
defer rollback()
|
||||
@@ -274,7 +288,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelInProgress(
|
||||
err = c.queries.AdvisoryLock(ctx, tx, strategy.ID)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to acquire advisory lock (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
var queued []TaskWithQueue
|
||||
@@ -285,7 +299,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelInProgress(
|
||||
err := c.queries.AdvisoryLock(ctx, tx, PARENT_STRATEGY_LOCK_OFFSET+strategy.ParentStrategyID.Int64)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to acquire parent advisory lock (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
_, err = tx.Exec(
|
||||
@@ -301,7 +315,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating parent temp table: %w", err)
|
||||
return nil, fmt.Errorf("error creating parent temp table (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
err = c.queries.RunParentCancelInProgress(ctx, tx, sqlcv1.RunParentCancelInProgressParams{
|
||||
@@ -311,7 +325,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error running parent cancel in progress: %w", err)
|
||||
return nil, fmt.Errorf("error running parent cancel in progress (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
poppedResults, err := c.queries.RunChildCancelInProgress(ctx, tx, sqlcv1.RunChildCancelInProgressParams{
|
||||
@@ -321,7 +335,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error running child cancel in progress: %w", err)
|
||||
return nil, fmt.Errorf("error running child cancel in progress (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
queued = make([]TaskWithQueue, 0, len(poppedResults))
|
||||
@@ -371,7 +385,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error running cancel in progress (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
// for any cancelled tasks, call cancelTasks
|
||||
@@ -402,7 +416,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error deleting tasks from queue (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
queued = make([]TaskWithQueue, 0, len(poppedResults))
|
||||
@@ -447,7 +461,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
}
|
||||
|
||||
if err = commit(ctx); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to commit transaction (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
return &RunConcurrencyResult{
|
||||
@@ -465,7 +479,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest(
|
||||
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 5000)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to prepare transaction (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
defer rollback()
|
||||
@@ -473,7 +487,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest(
|
||||
err = c.queries.AdvisoryLock(ctx, tx, strategy.ID)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to acquire advisory lock (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
var queued []TaskWithQueue
|
||||
@@ -484,7 +498,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest(
|
||||
err := c.queries.AdvisoryLock(ctx, tx, PARENT_STRATEGY_LOCK_OFFSET+strategy.ParentStrategyID.Int64)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to acquire parent advisory lock (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
_, err = tx.Exec(
|
||||
@@ -500,7 +514,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating parent temp table: %w", err)
|
||||
return nil, fmt.Errorf("error creating parent temp table (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
err = c.queries.RunParentCancelNewest(ctx, tx, sqlcv1.RunParentCancelNewestParams{
|
||||
@@ -510,7 +524,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error running parent cancel newest: %w", err)
|
||||
return nil, fmt.Errorf("error running parent cancel newest (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||
}
|
||||
|
||||
poppedResults, err := c.queries.RunChildCancelNewest(ctx, tx, sqlcv1.RunChildCancelNewestParams{
|
||||
@@ -520,7 +534,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error running child cancel newest (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
// for any cancelled tasks, call cancelTasks
|
||||
@@ -551,7 +565,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error deleting tasks from queue (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
queued = make([]TaskWithQueue, 0, len(poppedResults))
|
||||
@@ -601,7 +615,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error running cancel newest (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
// for any cancelled tasks, call cancelTasks
|
||||
@@ -632,7 +646,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error deleting tasks from queue (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
queued = make([]TaskWithQueue, 0, len(poppedResults))
|
||||
@@ -677,7 +691,7 @@ WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint;`,
|
||||
}
|
||||
|
||||
if err = commit(ctx); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to commit transaction (strategy ID: %d): %w", strategy.ID, err)
|
||||
}
|
||||
|
||||
return &RunConcurrencyResult{
|
||||
|
||||
Reference in New Issue
Block a user