fix: duplicate assignments in queuer (#993)

* wip: individual mutexes for actions

* tmp: debug panic

* remove debug code

* remove deadlocks package and don't write unassigned events

* fix: race condition in scheduler and add internal retries

* fix: data race
This commit is contained in:
abelanger5
2024-10-25 12:52:43 -04:00
committed by GitHub
parent 7ece86dfff
commit 509542b804
9 changed files with 332 additions and 132 deletions
+43 -4
View File
@@ -328,14 +328,39 @@ func (s *Scheduler) scheduleStepRuns(ctx context.Context, tenantId string, res *
if len(res.Assigned) > 0 {
dispatcherIdToWorkerIdsToStepRuns := make(map[string]map[string][]string)
workerIds := make([]string, 0)
for _, assigned := range res.Assigned {
workerIds = append(workerIds, sqlchelpers.UUIDToStr(assigned.WorkerId))
}
dispatcherIdWorkerIds, err := s.repo.Worker().GetDispatcherIdsForWorkers(ctx, tenantId, workerIds)
if err != nil {
s.internalRetry(ctx, tenantId, res.Assigned...)
return fmt.Errorf("could not list dispatcher ids for workers: %w. attempting internal retry", err)
}
workerIdToDispatcherId := make(map[string]string)
for dispatcherId, workerIds := range dispatcherIdWorkerIds {
for _, workerId := range workerIds {
workerIdToDispatcherId[workerId] = dispatcherId
}
}
for _, bulkAssigned := range res.Assigned {
if bulkAssigned.DispatcherId == nil {
s.l.Error().Msg("could not assign step run to worker: no dispatcher id")
dispatcherId, ok := workerIdToDispatcherId[sqlchelpers.UUIDToStr(bulkAssigned.WorkerId)]
if !ok {
s.l.Error().Msg("could not assign step run to worker: no dispatcher id. attempting internal retry.")
s.internalRetry(ctx, tenantId, bulkAssigned)
continue
}
dispatcherId := sqlchelpers.UUIDToStr(*bulkAssigned.DispatcherId)
if _, ok := dispatcherIdToWorkerIdsToStepRuns[dispatcherId]; !ok {
dispatcherIdToWorkerIdsToStepRuns[dispatcherId] = make(map[string][]string)
}
@@ -382,6 +407,20 @@ func (s *Scheduler) scheduleStepRuns(ctx context.Context, tenantId string, res *
return err
}
func (s *Scheduler) internalRetry(ctx context.Context, tenantId string, assigned ...*v2.AssignedQueueItem) {
for _, a := range assigned {
stepRunId := sqlchelpers.UUIDToStr(a.QueueItem.StepRunId)
_, err := s.repo.StepRun().QueueStepRun(ctx, tenantId, stepRunId, &repository.QueueStepRunOpts{
IsInternalRetry: true,
})
if err != nil {
s.l.Error().Err(err).Msg("could not requeue step run for internal retry")
}
}
}
func getStepRunCancelTask(tenantId, stepRunId, reason string) *msgqueue.Message {
payload, _ := datautils.ToJSONMap(tasktypes.StepRunCancelTaskPayload{
StepRunId: stepRunId,
+32 -23
View File
@@ -855,7 +855,7 @@ FROM (
) AS input
RETURNING *;
-- name: UpdateStepRunsToAssigned :exec
-- name: UpdateStepRunsToAssigned :many
WITH input AS (
SELECT
"id",
@@ -880,7 +880,7 @@ WITH input AS (
JOIN
"StepRun" sr ON sr."id" = input."id"
ORDER BY sr."id"
), assignments AS (
), assigned_step_runs AS (
INSERT INTO "SemaphoreQueueItem" (
"stepRunId",
"workerId",
@@ -892,31 +892,40 @@ WITH input AS (
@tenantId::uuid
FROM
input
-- conflicting step run id should update workerId
ON CONFLICT ("stepRunId") DO UPDATE
ON CONFLICT ("stepRunId") DO NOTHING
-- only return the step run ids that were successfully assigned
RETURNING "stepRunId", "workerId"
), timeout_insert AS (
-- bulk insert into timeout queue items
INSERT INTO
"TimeoutQueueItem" (
"stepRunId",
"retryCount",
"timeoutAt",
"tenantId",
"isQueued"
)
SELECT
sr."id",
sr."retryCount",
sr."timeoutAt",
sr."tenantId",
true
FROM
updated_step_runs sr
JOIN
assigned_step_runs asr ON sr."id" = asr."stepRunId"
ON CONFLICT ("stepRunId", "retryCount") DO UPDATE
SET
"workerId" = EXCLUDED."workerId"
"timeoutAt" = EXCLUDED."timeoutAt"
RETURNING
"stepRunId"
)
-- bulk insert into timeout queue items
INSERT INTO
"TimeoutQueueItem" (
"stepRunId",
"retryCount",
"timeoutAt",
"tenantId",
"isQueued"
)
SELECT
sr."id",
sr."retryCount",
sr."timeoutAt",
sr."tenantId",
true
asr."stepRunId",
asr."workerId"
FROM
updated_step_runs sr
ON CONFLICT ("stepRunId", "retryCount") DO UPDATE
SET
"timeoutAt" = EXCLUDED."timeoutAt";
assigned_step_runs asr;
-- name: GetFinalizedStepRuns :many
SELECT
+55 -26
View File
@@ -2872,7 +2872,7 @@ func (q *Queries) UpdateStepRunUnsetWorkerIdBulk(ctx context.Context, db DBTX, s
return err
}
const updateStepRunsToAssigned = `-- name: UpdateStepRunsToAssigned :exec
const updateStepRunsToAssigned = `-- name: UpdateStepRunsToAssigned :many
WITH input AS (
SELECT
"id",
@@ -2897,7 +2897,7 @@ WITH input AS (
JOIN
"StepRun" sr ON sr."id" = input."id"
ORDER BY sr."id"
), assignments AS (
), assigned_step_runs AS (
INSERT INTO "SemaphoreQueueItem" (
"stepRunId",
"workerId",
@@ -2909,30 +2909,40 @@ WITH input AS (
$4::uuid
FROM
input
-- conflicting step run id should update workerId
ON CONFLICT ("stepRunId") DO UPDATE
ON CONFLICT ("stepRunId") DO NOTHING
-- only return the step run ids that were successfully assigned
RETURNING "stepRunId", "workerId"
), timeout_insert AS (
-- bulk insert into timeout queue items
INSERT INTO
"TimeoutQueueItem" (
"stepRunId",
"retryCount",
"timeoutAt",
"tenantId",
"isQueued"
)
SELECT
sr."id",
sr."retryCount",
sr."timeoutAt",
sr."tenantId",
true
FROM
updated_step_runs sr
JOIN
assigned_step_runs asr ON sr."id" = asr."stepRunId"
ON CONFLICT ("stepRunId", "retryCount") DO UPDATE
SET
"workerId" = EXCLUDED."workerId"
"timeoutAt" = EXCLUDED."timeoutAt"
RETURNING
"stepRunId"
)
INSERT INTO
"TimeoutQueueItem" (
"stepRunId",
"retryCount",
"timeoutAt",
"tenantId",
"isQueued"
)
SELECT
sr."id",
sr."retryCount",
sr."timeoutAt",
sr."tenantId",
true
asr."stepRunId",
asr."workerId"
FROM
updated_step_runs sr
ON CONFLICT ("stepRunId", "retryCount") DO UPDATE
SET
"timeoutAt" = EXCLUDED."timeoutAt"
assigned_step_runs asr
`
type UpdateStepRunsToAssignedParams struct {
@@ -2942,15 +2952,34 @@ type UpdateStepRunsToAssignedParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
}
// bulk insert into timeout queue items
func (q *Queries) UpdateStepRunsToAssigned(ctx context.Context, db DBTX, arg UpdateStepRunsToAssignedParams) error {
_, err := db.Exec(ctx, updateStepRunsToAssigned,
type UpdateStepRunsToAssignedRow struct {
StepRunId pgtype.UUID `json:"stepRunId"`
WorkerId pgtype.UUID `json:"workerId"`
}
func (q *Queries) UpdateStepRunsToAssigned(ctx context.Context, db DBTX, arg UpdateStepRunsToAssignedParams) ([]*UpdateStepRunsToAssignedRow, error) {
rows, err := db.Query(ctx, updateStepRunsToAssigned,
arg.Steprunids,
arg.Stepruntimeouts,
arg.Workerids,
arg.Tenantid,
)
return err
if err != nil {
return nil, err
}
defer rows.Close()
var items []*UpdateStepRunsToAssignedRow
for rows.Next() {
var i UpdateStepRunsToAssignedRow
if err := rows.Scan(&i.StepRunId, &i.WorkerId); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const upsertDesiredWorkerLabel = `-- name: UpsertDesiredWorkerLabel :one
+1 -1
View File
@@ -1304,7 +1304,7 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, qlp *zerolo
numAssigns[sqlchelpers.UUIDToStr(workerId)]++
}
err = s.queries.UpdateStepRunsToAssigned(ctx, tx, dbsqlc.UpdateStepRunsToAssignedParams{
_, err = s.queries.UpdateStepRunsToAssigned(ctx, tx, dbsqlc.UpdateStepRunsToAssignedParams{
Steprunids: plan.StepRunIds,
Workerids: plan.WorkerIds,
Stepruntimeouts: plan.StepRunTimeouts,
+32
View File
@@ -571,3 +571,35 @@ func (r *workerEngineRepository) DeleteOldWorkerEvents(ctx context.Context, tena
return nil
}
func (r *workerEngineRepository) GetDispatcherIdsForWorkers(ctx context.Context, tenantId string, workerIds []string) (map[string][]string, error) {
pgWorkerIds := make([]pgtype.UUID, len(workerIds))
for i, workerId := range workerIds {
pgWorkerIds[i] = sqlchelpers.UUIDFromStr(workerId)
}
rows, err := r.queries.ListDispatcherIdsForWorkers(ctx, r.pool, dbsqlc.ListDispatcherIdsForWorkersParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Workerids: sqlchelpers.UniqueSet(pgWorkerIds),
})
if err != nil {
return nil, fmt.Errorf("could not get dispatcher ids for workers: %w", err)
}
dispatcherIdsToWorkers := make(map[string][]string)
for _, row := range rows {
dispatcherId := sqlchelpers.UUIDToStr(row.DispatcherId)
workerId := sqlchelpers.UUIDToStr(row.WorkerId)
if _, ok := dispatcherIdsToWorkers[dispatcherId]; !ok {
dispatcherIdsToWorkers[dispatcherId] = make([]string, 0)
}
dispatcherIdsToWorkers[dispatcherId] = append(dispatcherIdsToWorkers[dispatcherId], workerId)
}
return dispatcherIdsToWorkers, nil
}
+2
View File
@@ -113,4 +113,6 @@ type WorkerEngineRepository interface {
DeleteOldWorkers(ctx context.Context, tenantId string, lastHeartbeatBefore time.Time) (bool, error)
DeleteOldWorkerEvents(ctx context.Context, tenantId string, lastHeartbeatAfter time.Time) error
GetDispatcherIdsForWorkers(ctx context.Context, tenantId string, workerIds []string) (map[string][]string, error)
}
+53
View File
@@ -1,6 +1,14 @@
package v2
import (
"slices"
"sync"
)
type action struct {
mu sync.RWMutex
actionId string
lastReplenishedSlotCount int
lastReplenishedWorkerCount int
@@ -9,6 +17,9 @@ type action struct {
}
func (a *action) activeCount() int {
a.mu.RLock()
defer a.mu.RUnlock()
count := 0
for _, slot := range a.slots {
@@ -19,3 +30,45 @@ func (a *action) activeCount() int {
return count
}
// orderedLock acquires the locks in a stable order to prevent deadlocks
func orderedLock(actionsMap map[string]*action) {
actions := sortActions(actionsMap)
for _, action := range actions {
action.mu.Lock()
}
}
// orderedUnlock releases the locks in a stable order to prevent deadlocks. it returns
// a function that should be deferred to unlock the locks.
func orderedUnlock(actionsMap map[string]*action) func() {
actions := sortActions(actionsMap)
return func() {
for _, action := range actions {
action.mu.Unlock()
}
}
}
func sortActions(actionsMap map[string]*action) []*action {
actions := make([]*action, 0, len(actionsMap))
for _, action := range actionsMap {
actions = append(actions, action)
}
slices.SortStableFunc(actions, func(i, j *action) int {
switch {
case i.actionId < j.actionId:
return -1
case i.actionId > j.actionId:
return 1
default:
return 0
}
})
return actions
}
+50 -29
View File
@@ -353,7 +353,6 @@ func (d *queuerDbQueries) MarkQueueItemsProcessed(ctx context.Context, r *assign
durPrepare := time.Since(checkpoint)
checkpoint = time.Now()
// d.queries.UpdateStepRunsToAssigned
idsToUnqueue := make([]int64, len(r.assigned))
stepRunIds := make([]pgtype.UUID, len(r.assigned))
workerIds := make([]pgtype.UUID, len(r.assigned))
@@ -385,9 +384,7 @@ func (d *queuerDbQueries) MarkQueueItemsProcessed(ctx context.Context, r *assign
return nil, nil, fmt.Errorf("could not bulk mark step runs as cancelling: %w", err)
}
// TODO: ADD UNIQUE CONSTRAINT TO SEMAPHORES WITH ON CONFLICT DO NOTHING, THEN DON'T
// QUEUE ITEMS THAT ALREADY HAVE SEMAPHORES
err = d.queries.UpdateStepRunsToAssigned(ctx, tx, dbsqlc.UpdateStepRunsToAssignedParams{
updatedStepRuns, err := d.queries.UpdateStepRunsToAssigned(ctx, tx, dbsqlc.UpdateStepRunsToAssignedParams{
Steprunids: stepRunIds,
Workerids: workerIds,
Stepruntimeouts: stepTimeouts,
@@ -409,11 +406,6 @@ func (d *queuerDbQueries) MarkQueueItemsProcessed(ctx context.Context, r *assign
timeAfterBulkQueueItems := time.Since(checkpoint)
dispatcherIdWorkerIds, err := d.queries.ListDispatcherIdsForWorkers(ctx, tx, dbsqlc.ListDispatcherIdsForWorkersParams{
Tenantid: d.tenantId,
Workerids: sqlchelpers.UniqueSet(workerIds),
})
if err := commit(ctx); err != nil {
return nil, nil, err
}
@@ -422,30 +414,34 @@ func (d *queuerDbQueries) MarkQueueItemsProcessed(ctx context.Context, r *assign
// if we committed, we can update the min id
d.updateMinId()
d.bulkStepRunsAssigned(sqlchelpers.UUIDToStr(d.tenantId), time.Now().UTC(), stepRunIds, workerIds)
d.bulkStepRunsUnassigned(sqlchelpers.UUIDToStr(d.tenantId), unassignedStepRunIds)
assignedStepRuns := make([]pgtype.UUID, len(updatedStepRuns))
assignedWorkerIds := make([]pgtype.UUID, len(updatedStepRuns))
for i, row := range updatedStepRuns {
assignedStepRuns[i] = row.StepRunId
assignedWorkerIds[i] = row.WorkerId
}
d.bulkStepRunsAssigned(sqlchelpers.UUIDToStr(d.tenantId), time.Now().UTC(), assignedStepRuns, assignedWorkerIds)
d.bulkStepRunsRateLimited(sqlchelpers.UUIDToStr(d.tenantId), r.rateLimited)
}()
workerIdToDispatcherId := make(map[string]pgtype.UUID, len(dispatcherIdWorkerIds))
stepRunIdToAssignedItem := make(map[string]*AssignedQueueItem, len(updatedStepRuns))
for _, dispatcherIdWorkerId := range dispatcherIdWorkerIds {
workerIdToDispatcherId[sqlchelpers.UUIDToStr(dispatcherIdWorkerId.WorkerId)] = dispatcherIdWorkerId.DispatcherId
for _, assignedItem := range r.assigned {
stepRunIdToAssignedItem[sqlchelpers.UUIDToStr(assignedItem.QueueItem.StepRunId)] = assignedItem
}
succeeded = make([]*AssignedQueueItem, 0, len(r.assigned))
failed = make([]*AssignedQueueItem, 0, len(r.assigned))
for _, assignedItem := range r.assigned {
dispatcherId, ok := workerIdToDispatcherId[sqlchelpers.UUIDToStr(assignedItem.WorkerId)]
for _, row := range updatedStepRuns {
succeeded = append(succeeded, stepRunIdToAssignedItem[sqlchelpers.UUIDToStr(row.StepRunId)])
delete(stepRunIdToAssignedItem, sqlchelpers.UUIDToStr(row.StepRunId))
}
if !ok {
failed = append(failed, assignedItem)
continue
}
assignedItem.DispatcherId = &dispatcherId
succeeded = append(succeeded, assignedItem)
for _, assignedItem := range stepRunIdToAssignedItem {
failed = append(failed, assignedItem)
}
if sinceStart := time.Since(start); sinceStart > 100*time.Millisecond {
@@ -769,6 +765,15 @@ type Queuer struct {
unackedMu rwMutex
unacked map[int64]struct{}
unassigned map[int64]*dbsqlc.QueueItem
unassignedMu mutex
}
type alreadyAssigned struct {
assignedQi *dbsqlc.QueueItem
assignedAtBatch int
assignedAtCount int
}
func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Scheduler, eventBuffer *buffer.BulkEventWriter, resultsCh chan<- *QueueResults) *Queuer {
@@ -794,6 +799,8 @@ func newQueuer(conf *sharedConfig, tenantId pgtype.UUID, queueName string, s *Sc
queueMu: newMu(conf.l),
unackedMu: newRWMu(conf.l),
unacked: make(map[int64]struct{}),
unassigned: make(map[int64]*dbsqlc.QueueItem),
unassignedMu: newMu(conf.l),
}
ctx, cancel := context.WithCancel(context.Background())
@@ -834,7 +841,6 @@ func (q *Queuer) queue() {
func (q *Queuer) loopQueue(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
qis := make([]*dbsqlc.QueueItem, 0)
for {
select {
@@ -854,7 +860,7 @@ func (q *Queuer) loopQueue(ctx context.Context) {
start := time.Now()
checkpoint := start
var err error
qis, err = q.refillQueue(ctx, qis)
qis, err := q.refillQueue(ctx)
if err != nil {
span.End()
@@ -959,15 +965,23 @@ func (q *Queuer) loopQueue(ctx context.Context) {
}
}
func (q *Queuer) refillQueue(ctx context.Context, curr []*dbsqlc.QueueItem) ([]*dbsqlc.QueueItem, error) {
func (q *Queuer) refillQueue(ctx context.Context) ([]*dbsqlc.QueueItem, error) {
q.unackedMu.Lock()
defer q.unackedMu.Unlock()
q.unassignedMu.Lock()
defer q.unassignedMu.Unlock()
curr := make([]*dbsqlc.QueueItem, 0, len(q.unassigned))
for _, qi := range q.unassigned {
curr = append(curr, qi)
}
// determine whether we need to replenish with the following cases:
// - we last replenished more than 1 second ago
// - if we are at less than 50% of the limit, we always attempt to replenish
replenish := false
now := time.Now()
if len(curr) < q.limit {
replenish = true
@@ -978,6 +992,7 @@ func (q *Queuer) refillQueue(ctx context.Context, curr []*dbsqlc.QueueItem) ([]*
}
if replenish {
now := time.Now()
q.lastReplenished = &now
limit := 2 * q.limit
@@ -1016,24 +1031,30 @@ type QueueResults struct {
func (q *Queuer) ack(r *assignResults) {
q.unackedMu.Lock()
defer q.unackedMu.Unlock()
q.unassignedMu.Lock()
defer q.unassignedMu.Unlock()
for _, assignedItem := range r.assigned {
delete(q.unacked, assignedItem.QueueItem.ID)
delete(q.unassigned, assignedItem.QueueItem.ID)
}
for _, unassignedItem := range r.unassigned {
delete(q.unacked, unassignedItem.ID)
q.unassigned[unassignedItem.ID] = unassignedItem
}
for _, schedulingTimedOutItem := range r.schedulingTimedOut {
delete(q.unacked, schedulingTimedOutItem.ID)
delete(q.unassigned, schedulingTimedOutItem.ID)
}
for _, rateLimitedItem := range r.rateLimited {
delete(q.unacked, rateLimitedItem.qi.ID)
q.unassigned[rateLimitedItem.qi.ID] = rateLimitedItem.qi
}
q.unackedMu.Unlock()
}
func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int {
+64 -49
View File
@@ -195,18 +195,26 @@ func (s *Scheduler) replenish(ctx context.Context, mustReplenish bool) error {
// - some slots for an action: replenish if 50% of slots have been used, or have expired
// - more workers available for an action than previously: fully replenish
// - otherwise, do not replenish
actionsToReplenish := make(map[string]bool)
s.actionsMu.RLock()
actionsToReplenish := make(map[string]*action)
s.actionsMu.Lock()
for actionId, workers := range actionsToWorkerIds {
if mustReplenish {
actionsToReplenish[actionId] = true
// if the action is not in the map, it should be replenished
if _, ok := s.actions[actionId]; !ok {
newAction := &action{
actionId: actionId,
}
actionsToReplenish[actionId] = newAction
s.actions[actionId] = newAction
continue
}
// if the action is not in the map, it should be replenished
if _, ok := s.actions[actionId]; !ok {
actionsToReplenish[actionId] = true
if mustReplenish {
actionsToReplenish[actionId] = s.actions[actionId]
continue
}
@@ -228,10 +236,24 @@ func (s *Scheduler) replenish(ctx context.Context, mustReplenish bool) error {
replenish = true
}
actionsToReplenish[actionId] = replenish
if replenish {
actionsToReplenish[actionId] = s.actions[actionId]
}
}
s.actionsMu.RUnlock()
// if there are any workers which have additional actions not in the actionsToReplenish map, we need
// to add them to the actionsToReplenish map
for actionId := range actionsToReplenish {
for _, workerId := range actionsToWorkerIds[actionId] {
for _, actions := range workerIdsToActions[workerId] {
if _, ok := actionsToReplenish[actions]; !ok {
actionsToReplenish[actions] = s.actions[actions]
}
}
}
}
s.actionsMu.Unlock()
s.l.Debug().Msgf("determining which actions to replenish took %s", time.Since(checkpoint))
checkpoint = time.Now()
@@ -239,11 +261,7 @@ func (s *Scheduler) replenish(ctx context.Context, mustReplenish bool) error {
// FUNCTION 2: for each action which should be replenished, load the available slots
uniqueWorkerIds := make(map[string]bool)
for actionId, replenish := range actionsToReplenish {
if !replenish {
continue
}
for actionId := range actionsToReplenish {
workerIds := actionsToWorkerIds[actionId]
for _, workerId := range workerIds {
@@ -257,6 +275,21 @@ func (s *Scheduler) replenish(ctx context.Context, mustReplenish bool) error {
workerUUIDs = append(workerUUIDs, sqlchelpers.UUIDFromStr(workerId))
}
// we get a lock on the actions mutexes here because we want to acquire the locks in the same order
// as the tryAssignBatch function. otherwise, we could deadlock when tryAssignBatch has a lock
// on the actionsMu and tries to acquire the unackedMu lock.
// additionally, we have to acquire a lock this early (before the database read) to prevent slots
// from being assigned while we read slots from the database.
s.actionsMu.Lock()
defer s.actionsMu.Unlock()
orderedLock(actionsToReplenish)
unlock := orderedUnlock(actionsToReplenish)
defer unlock()
s.unackedMu.Lock()
defer s.unackedMu.Unlock()
availableSlots, err := s.repo.ListAvailableSlotsForWorkers(ctx, dbsqlc.ListAvailableSlotsForWorkersParams{
Tenantid: s.tenantId,
Workerids: workerUUIDs,
@@ -271,15 +304,6 @@ func (s *Scheduler) replenish(ctx context.Context, mustReplenish bool) error {
// FUNCTION 3: list unacked slots (so they're not counted towards the worker slot count)
workersToUnackedSlots := make(map[string][]*slot)
// we get a lock on the actionsMu here because we want to acquire the locks in the same order
// as the tryAssignBatch function. otherwise, we could deadlock when tryAssignBatch has a lock
// on the actionsMu and tries to acquire the unackedMu lock.
s.actionsMu.Lock()
defer s.actionsMu.Unlock()
s.unackedMu.Lock()
defer s.unackedMu.Unlock()
for _, unackedSlot := range s.unackedSlots {
s := unackedSlot
workerId := s.getWorkerId()
@@ -330,26 +354,17 @@ func (s *Scheduler) replenish(ctx context.Context, mustReplenish bool) error {
// randomly sort the slots
randSource.Shuffle(len(newSlots), func(i, j int) { newSlots[i], newSlots[j] = newSlots[j], newSlots[i] })
if _, ok := s.actions[actionId]; !ok {
s.actions[actionId] = &action{
slots: newSlots,
lastReplenishedSlotCount: len(newSlots),
lastReplenishedWorkerCount: len(actionsToWorkerIds[actionId]),
}
} else {
// we overwrite the slots for the action
s.actions[actionId].slots = newSlots
s.actions[actionId].lastReplenishedSlotCount = actionsToTotalSlots[actionId]
s.actions[actionId].lastReplenishedWorkerCount = len(actionsToWorkerIds[actionId])
}
// we overwrite the slots for the action. we know that the action is in the map because we checked
// for it in the first pass.
s.actions[actionId].slots = newSlots
s.actions[actionId].lastReplenishedSlotCount = actionsToTotalSlots[actionId]
s.actions[actionId].lastReplenishedWorkerCount = len(actionsToWorkerIds[actionId])
s.l.Debug().Msgf("before cleanup, action %s has %d slots", actionId, len(newSlots))
}
// second pass: clean up expired slots
for i := range s.actions {
storedAction := s.actions[i]
for _, storedAction := range actionsToReplenish {
newSlots := make([]*slot, 0, len(storedAction.slots))
for i := range storedAction.slots {
@@ -362,11 +377,11 @@ func (s *Scheduler) replenish(ctx context.Context, mustReplenish bool) error {
storedAction.slots = newSlots
s.l.Debug().Msgf("after cleanup, action %s has %d slots", i, len(newSlots))
s.l.Debug().Msgf("after cleanup, action %s has %d slots", storedAction.actionId, len(newSlots))
}
// third pass: remove any actions which have no slots
for actionId, storedAction := range s.actions {
for actionId, storedAction := range actionsToReplenish {
if len(storedAction.slots) == 0 {
s.l.Debug().Msgf("removing action %s because it has no slots", actionId)
delete(s.actions, actionId)
@@ -493,7 +508,9 @@ func (s *Scheduler) tryAssignBatch(
// order as the replenish() function, otherwise we may deadlock.
s.actionsMu.RLock()
if _, ok := s.actions[actionId]; !ok {
action, ok := s.actions[actionId]
if !ok || len(action.slots) == 0 {
s.actionsMu.RUnlock()
s.l.Debug().Msgf("no slots for action %s", actionId)
@@ -506,7 +523,12 @@ func (s *Scheduler) tryAssignBatch(
return res, newRingOffset, nil
}
candidateSlots := s.actions[actionId].slots
s.actionsMu.RUnlock()
action.mu.Lock()
defer action.mu.Unlock()
candidateSlots := action.slots
wg := sync.WaitGroup{}
@@ -547,10 +569,6 @@ func (s *Scheduler) tryAssignBatch(
wg.Wait()
// we can only unlock the actions mutex after assigning slots, because we are using the
// underlying pointers to the slots
s.actionsMu.RUnlock()
return res, newRingOffset, nil
}
@@ -628,9 +646,6 @@ type AssignedQueueItem struct {
WorkerId pgtype.UUID
QueueItem *dbsqlc.QueueItem
// DispatcherId only gets set after a successful flush to the database
DispatcherId *pgtype.UUID
}
type assignResults struct {