diff --git a/pkg/repository/v1/scheduler_concurrency.go b/pkg/repository/v1/scheduler_concurrency.go index eb93614ad..2f56df69f 100644 --- a/pkg/repository/v1/scheduler_concurrency.go +++ b/pkg/repository/v1/scheduler_concurrency.go @@ -285,18 +285,38 @@ func (c *ConcurrencyRepositoryImpl) runCancelInProgress( defer rollback() - err = c.queries.AdvisoryLock(ctx, tx, strategy.ID) + acquired, err := c.queries.TryAdvisoryLock(ctx, tx, strategy.ID) if err != nil { return nil, fmt.Errorf("failed to acquire advisory lock (strategy ID: %d): %w", strategy.ID, err) } + if !acquired { + c.l.Warn().Msgf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID) + + return &RunConcurrencyResult{ + Queued: []TaskWithQueue{}, + Cancelled: []TaskWithCancelledReason{}, + NextConcurrencyStrategies: []int64{}, + }, nil + } + var queued []TaskWithQueue var cancelled []TaskWithCancelledReason var nextConcurrencyStrategies []int64 if strategy.ParentStrategyID.Valid { - err := c.queries.AdvisoryLock(ctx, tx, PARENT_STRATEGY_LOCK_OFFSET+strategy.ParentStrategyID.Int64) + acquired, err := c.queries.TryAdvisoryLock(ctx, tx, PARENT_STRATEGY_LOCK_OFFSET+strategy.ParentStrategyID.Int64) + + if !acquired { + c.l.Warn().Msgf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID) + + return &RunConcurrencyResult{ + Queued: []TaskWithQueue{}, + Cancelled: []TaskWithCancelledReason{}, + NextConcurrencyStrategies: []int64{}, + }, nil + } if err != nil { return nil, fmt.Errorf("failed to acquire parent advisory lock (strategy ID: %d, parent: %d): %w", strategy.ID, strategy.ParentStrategyID.Int64, err) @@ -493,7 +513,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest( if !acquired { // Log lock contention issue - c.l.Warnf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID) + c.l.Warn().Msgf("Advisory lock not acquired (strategy ID: %d). Possible lock contention.", strategy.ID) // Lock not available, return empty result to avoid blocking return &RunConcurrencyResult{ Queued: []TaskWithQueue{}, @@ -516,7 +536,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest( if !parentAcquired { // Log the event when the parent advisory lock is not acquired - c.l.Warnf("Parent advisory lock not acquired (strategy ID: %d, parent: %d)", strategy.ID, strategy.ParentStrategyID.Int64) + c.l.Warn().Msgf("Parent advisory lock not acquired (strategy ID: %d, parent: %d)", strategy.ID, strategy.ParentStrategyID.Int64) // Parent lock not available, return empty result return &RunConcurrencyResult{ Queued: []TaskWithQueue{},