mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-21 00:30:12 -06:00
both strats
This commit is contained in:
@@ -285,18 +285,38 @@ func (c *ConcurrencyRepositoryImpl) runCancelInProgress(
|
|||||||
|
|
||||||
defer rollback()
|
defer rollback()
|
||||||
|
|
||||||
err = c.queries.AdvisoryLock(ctx, tx, strategy.ID)
|
acquired, err := c.queries.TryAdvisoryLock(ctx, tx, strategy.ID)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to acquire advisory lock (strategy ID: %d): %w", strategy.ID, err)
|
return nil, fmt.Errorf("failed to acquire advisory lock (strategy ID: %d): %w", strategy.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !acquired {
|
||||||
|
c.l.Warn().Msgf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID)
|
||||||
|
|
||||||
|
return &RunConcurrencyResult{
|
||||||
|
Queued: []TaskWithQueue{},
|
||||||
|
Cancelled: []TaskWithCancelledReason{},
|
||||||
|
NextConcurrencyStrategies: []int64{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
var queued []TaskWithQueue
|
var queued []TaskWithQueue
|
||||||
var cancelled []TaskWithCancelledReason
|
var cancelled []TaskWithCancelledReason
|
||||||
var nextConcurrencyStrategies []int64
|
var nextConcurrencyStrategies []int64
|
||||||
|
|
||||||
if strategy.ParentStrategyID.Valid {
|
if strategy.ParentStrategyID.Valid {
|
||||||
err := c.queries.AdvisoryLock(ctx, tx, PARENT_STRATEGY_LOCK_OFFSET+strategy.ParentStrategyID.Int64)
|
acquired, err := c.queries.TryAdvisoryLock(ctx, tx, PARENT_STRATEGY_LOCK_OFFSET+strategy.ParentStrategyID.Int64)
|
||||||
|
|
||||||
|
if !acquired {
|
||||||
|
c.l.Warn().Msgf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID)
|
||||||
|
|
||||||
|
return &RunConcurrencyResult{
|
||||||
|
Queued: []TaskWithQueue{},
|
||||||
|
Cancelled: []TaskWithCancelledReason{},
|
||||||
|
NextConcurrencyStrategies: []int64{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to acquire parent advisory lock (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
return nil, fmt.Errorf("failed to acquire parent advisory lock (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err)
|
||||||
@@ -493,7 +513,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest(
|
|||||||
|
|
||||||
if !acquired {
|
if !acquired {
|
||||||
// Log lock contention issue
|
// Log lock contention issue
|
||||||
c.l.Warnf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID)
|
c.l.Warn().Msgf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID)
|
||||||
// Lock not available, return empty result to avoid blocking
|
// Lock not available, return empty result to avoid blocking
|
||||||
return &RunConcurrencyResult{
|
return &RunConcurrencyResult{
|
||||||
Queued: []TaskWithQueue{},
|
Queued: []TaskWithQueue{},
|
||||||
@@ -516,7 +536,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest(
|
|||||||
|
|
||||||
if !parentAcquired {
|
if !parentAcquired {
|
||||||
// Log the event when the parent advisory lock is not acquired
|
// Log the event when the parent advisory lock is not acquired
|
||||||
c.l.Warnf("Parent advisory lock not acquired (strategy ID: %d, parent: %d)", strategy.ID, strategy.ParentStrategyID.Int64)
|
c.l.Warn().Msgf("Parent advisory lock not acquired (strategy ID: %d, parent: %d)", strategy.ID, strategy.ParentStrategyID.Int64)
|
||||||
// Parent lock not available, return empty result
|
// Parent lock not available, return empty result
|
||||||
return &RunConcurrencyResult{
|
return &RunConcurrencyResult{
|
||||||
Queued: []TaskWithQueue{},
|
Queued: []TaskWithQueue{},
|
||||||
|
|||||||
Reference in New Issue
Block a user