mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-30 13:19:44 -06:00
* fix: move rate limited queue items off the main queue * preserve FIFO behavior on queues * fix unit tests, address pr comments * fix: generated * rename table
75 lines
1.3 KiB
Go
75 lines
1.3 KiB
Go
package v1
|
|
|
|
import (
|
|
"slices"
|
|
"sync"
|
|
)
|
|
|
|
type action struct {
|
|
mu sync.RWMutex
|
|
actionId string
|
|
|
|
lastReplenishedSlotCount int
|
|
lastReplenishedWorkerCount int
|
|
|
|
// note that slots can be used across multiple actions, hence the pointer
|
|
slots []*slot
|
|
}
|
|
|
|
func (a *action) activeCount() int {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
|
|
count := 0
|
|
|
|
for _, slot := range a.slots {
|
|
if slot.active() {
|
|
count++
|
|
}
|
|
}
|
|
|
|
return count
|
|
}
|
|
|
|
// orderedLock acquires the locks in a stable order to prevent deadlocks
|
|
func orderedLock(actionsMap map[string]*action) {
|
|
actions := sortActions(actionsMap)
|
|
|
|
for _, action := range actions {
|
|
action.mu.Lock()
|
|
}
|
|
}
|
|
|
|
// orderedUnlock releases the locks in a stable order to prevent deadlocks. it returns
|
|
// a function that should be deferred to unlock the locks.
|
|
func orderedUnlock(actionsMap map[string]*action) func() {
|
|
actions := sortActions(actionsMap)
|
|
|
|
return func() {
|
|
for _, action := range actions {
|
|
action.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func sortActions(actionsMap map[string]*action) []*action {
|
|
actions := make([]*action, 0, len(actionsMap))
|
|
|
|
for _, action := range actionsMap {
|
|
actions = append(actions, action)
|
|
}
|
|
|
|
slices.SortStableFunc(actions, func(i, j *action) int {
|
|
switch {
|
|
case i.actionId < j.actionId:
|
|
return -1
|
|
case i.actionId > j.actionId:
|
|
return 1
|
|
default:
|
|
return 0
|
|
}
|
|
})
|
|
|
|
return actions
|
|
}
|