Files
hatchet/pkg/scheduling/v1/tenant_manager.go
Gabe Ruttner a3275ac101 fix: enhance SQL query name extraction in otel tracer and fallback (#3277)
* fix: enhance SQL query name extraction in otel tracer and fallback

* feat: propagate context to logger

* feat: correlation ids

* feat: add tenant ids

* feat: more richness for partitionable things

* fix: tests

* feat: capture http errors on spans

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-03-17 07:31:21 -07:00

594 lines
14 KiB
Go

package v1
import (
"context"
"sync"
"time"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/rs/zerolog"
v1 "github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
)
// notifierMsg is the payload delivered on a notifierCh: a batch of items of
// type T plus a flag describing how the receiver should merge the batch into
// its current set.
type notifierMsg[T any] struct {
	items []T

	// isIncremental refers to whether the resource should be added to the current set, or
	// should replace the current set
	isIncremental bool
}

// notifierCh carries batched resource updates of type T from the lease manager
// to the tenant manager's listener goroutines.
type notifierCh[T any] chan notifierMsg[T]
// tenantManager manages the scheduler and queuers for a tenant and multiplexes
// messages to the relevant queuer.
type tenantManager struct {
	cf *sharedConfig

	// l is pre-tagged with this tenant's id
	l zerolog.Logger

	tenantId uuid.UUID

	scheduler *Scheduler

	rl *rateLimiter

	// queuers holds one Queuer per leased queue; guarded by queuersMu
	queuers   []*Queuer
	queuersMu sync.RWMutex

	// concurrencyStrategies holds one manager per leased concurrency strategy;
	// guarded by concurrencyMu
	concurrencyStrategies []*ConcurrencyManager

	// maintain a mapping of strategy ids to parent ids, because we'd like to signal all
	// child strategy ids when a parent strategy id is updated
	strategyIdsToParentIds *lru.Cache[int64, int64]
	parentIdsToStrategyIds *lru.Cache[int64, []int64]

	concurrencyMu sync.RWMutex

	leaseManager *LeaseManager

	// channels fed by the lease manager with worker / queue / strategy updates
	workersCh     notifierCh[*v1.ListActiveWorkersResult]
	queuesCh      notifierCh[string]
	concurrencyCh notifierCh[*sqlcv1.V1StepConcurrency]

	// result channels shared with the creator of this manager
	concurrencyResultsCh chan *ConcurrencyResults
	resultsCh            chan *QueueResults

	// cleanup cancels the context driving the background goroutines started in
	// newTenantManager
	cleanup func()
}
// newTenantManager creates a tenant manager and starts its background work:
// one listener goroutine per lease channel (workers, queues, concurrency
// strategies), plus the lease manager and scheduler loops. All of them run
// until Cleanup is called, which cancels the shared context.
func newTenantManager(cf *sharedConfig, tenantId uuid.UUID, resultsCh chan *QueueResults, concurrencyResultsCh chan *ConcurrencyResults, exts *Extensions) *tenantManager {
	rl := newRateLimiter(cf, tenantId)
	s := newScheduler(cf, tenantId, rl, exts)
	leaseManager, workersCh, queuesCh, concurrencyCh := newLeaseManager(cf, tenantId)

	// lru.New only returns an error for a non-positive size, so it is safe to
	// ignore with the fixed size of 1000.
	strategyIdsToParentIds, _ := lru.New[int64, int64](1000)
	parentIdsToStrategyIds, _ := lru.New[int64, []int64](1000)

	t := &tenantManager{
		scheduler:              s,
		leaseManager:           leaseManager,
		cf:                     cf,
		l:                      cf.l.With().Str("tenant_id", tenantId.String()).Logger(),
		tenantId:               tenantId,
		workersCh:              workersCh,
		queuesCh:               queuesCh,
		concurrencyCh:          concurrencyCh,
		resultsCh:              resultsCh,
		rl:                     rl,
		concurrencyResultsCh:   concurrencyResultsCh,
		strategyIdsToParentIds: strategyIdsToParentIds,
		parentIdsToStrategyIds: parentIdsToStrategyIds,
	}

	ctx, cancel := context.WithCancel(context.Background())
	t.cleanup = cancel

	go t.listenForWorkerLeases(ctx)
	go t.listenForQueueLeases(ctx)
	go t.listenForConcurrencyLeases(ctx)

	leaseManager.start(ctx)
	s.start(ctx)

	return t
}
// Cleanup stops the tenant manager's background goroutines and releases its
// resources. The lease manager gets a 5-second window to release leases; the
// queuers and rate limiter are cleaned up even if that fails, and any
// lease-manager error is returned to the caller.
func (t *tenantManager) Cleanup() error {
	defer t.cleanup()

	cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// clean up the other resources even if the lease manager fails to clean up
	leaseErr := t.leaseManager.cleanup(cleanupCtx)

	t.queuersMu.RLock()
	defer t.queuersMu.RUnlock()

	for _, queuer := range t.queuers {
		queuer.Cleanup()
	}

	t.rl.cleanup()

	return leaseErr
}
// listenForWorkerLeases consumes worker updates from the lease manager until
// ctx is cancelled. A replacement message resets the scheduler's worker set;
// an incremental message adds the workers, replenishes the scheduler, and
// kicks every queuer so the new capacity is used immediately.
func (t *tenantManager) listenForWorkerLeases(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-t.workersCh:
			if !msg.isIncremental {
				t.scheduler.setWorkers(msg.items)
				continue
			}

			for _, worker := range msg.items {
				t.scheduler.addWorker(worker)
			}

			t.replenish(ctx)

			// notify all queues to check if the new worker can take any tasks
			t.queuersMu.RLock()
			for _, queuer := range t.queuers {
				queuer.queue(ctx)
			}
			t.queuersMu.RUnlock()
		}
	}
}
// listenForQueueLeases consumes queue-name updates from the lease manager
// until ctx is cancelled. A replacement message swaps the full queuer set;
// an incremental message adds queuers one by one.
func (t *tenantManager) listenForQueueLeases(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-t.queuesCh:
			if !msg.isIncremental {
				t.setQueuers(msg.items)
				continue
			}

			for _, name := range msg.items {
				t.addQueuer(name)
			}
		}
	}
}
// listenForConcurrencyLeases consumes concurrency-strategy updates from the
// lease manager until ctx is cancelled. A replacement message swaps the full
// strategy set; an incremental message adds strategies one by one.
func (t *tenantManager) listenForConcurrencyLeases(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-t.concurrencyCh:
			if !msg.isIncremental {
				t.setConcurrencyStrategies(msg.items)
				continue
			}

			for _, strat := range msg.items {
				t.addConcurrencyStrategy(strat)
			}
		}
	}
}
// setQueuers reconciles the queuer list against the given set of queue names:
// queuers whose name is still present are kept, queuers that dropped out are
// cleaned up in the background, and a fresh queuer is created for every name
// that has no queuer yet.
func (t *tenantManager) setQueuers(queueNames []string) {
	t.queuersMu.Lock()
	defer t.queuersMu.Unlock()

	desired := make(map[string]struct{}, len(queueNames))
	for _, name := range queueNames {
		desired[name] = struct{}{}
	}

	kept := make([]*Queuer, 0, len(queueNames))

	for _, existing := range t.queuers {
		if _, wanted := desired[existing.queueName]; !wanted {
			// no longer leased: tear it down off the hot path
			t.l.Debug().Msgf("cleaning up queuer for queue %s for tenant %s", existing.queueName, t.tenantId)
			go existing.Cleanup()
			continue
		}

		kept = append(kept, existing)
		// remove so only brand-new names remain in the set afterwards
		delete(desired, existing.queueName)
	}

	// whatever survived the deletes above is new and needs a queuer
	for name := range desired {
		kept = append(kept, newQueuer(t.cf, t.tenantId, name, t.scheduler, t.resultsCh))
	}

	t.queuers = kept
}
// addQueuer registers a queuer for queueName if one does not already exist,
// then immediately runs a queue pass for it.
func (t *tenantManager) addQueuer(queueName string) {
	t.queuersMu.Lock()

	for _, existing := range t.queuers {
		if existing.queueName == queueName {
			// already tracked; nothing to do
			t.queuersMu.Unlock()
			return
		}
	}

	t.queuers = append(t.queuers, newQueuer(t.cf, t.tenantId, queueName, t.scheduler, t.resultsCh))

	// release the write lock before queueing: queue() re-acquires the mutex
	// for reading, so holding it here would deadlock
	t.queuersMu.Unlock()

	t.queue(context.Background(), []string{queueName})
}
// setConcurrencyStrategies reconciles the concurrency managers against the
// given strategies: managers whose strategy id is still present are kept,
// dropped ones are cleaned up in the background, and a new manager is created
// for every strategy that has none yet.
func (t *tenantManager) setConcurrencyStrategies(strategies []*sqlcv1.V1StepConcurrency) {
	t.concurrencyMu.Lock()
	defer t.concurrencyMu.Unlock()

	desired := make(map[int64]*sqlcv1.V1StepConcurrency, len(strategies))
	for _, strat := range strategies {
		desired[strat.ID] = strat
	}

	kept := make([]*ConcurrencyManager, 0, len(strategies))

	for _, mgr := range t.concurrencyStrategies {
		if _, wanted := desired[mgr.strategy.ID]; !wanted {
			// no longer leased: tear it down off the hot path
			go mgr.cleanup()
			continue
		}

		kept = append(kept, mgr)
		// remove so only brand-new strategies remain in the map afterwards
		delete(desired, mgr.strategy.ID)
	}

	// whatever survived the deletes above needs a new manager
	for _, strat := range desired {
		kept = append(kept, newConcurrencyManager(t.cf, t.tenantId, strat, t.concurrencyResultsCh))
	}

	t.concurrencyStrategies = kept
}
// addConcurrencyStrategy registers a concurrency manager for the given
// strategy if one is not already tracked; duplicates are ignored.
func (t *tenantManager) addConcurrencyStrategy(strategy *sqlcv1.V1StepConcurrency) {
	t.concurrencyMu.Lock()
	defer t.concurrencyMu.Unlock()

	for _, mgr := range t.concurrencyStrategies {
		if mgr.strategy.ID == strategy.ID {
			// already tracked; nothing to do
			return
		}
	}

	mgr := newConcurrencyManager(t.cf, t.tenantId, strategy, t.concurrencyResultsCh)
	t.concurrencyStrategies = append(t.concurrencyStrategies, mgr)
}
// replenish refreshes the scheduler's view of available slots, logging (but
// not propagating) any error.
func (t *tenantManager) replenish(ctx context.Context) {
	if err := t.scheduler.replenish(ctx, false); err != nil {
		t.l.Error().Err(err).Msg("error replenishing scheduler")
	}
}
// notifyConcurrency wakes the concurrency managers for the given strategy ids.
// When a notified strategy has a parent, its known sibling strategies (other
// children of the same parent, remembered in the LRU caches) are notified as
// well.
func (t *tenantManager) notifyConcurrency(ctx context.Context, strategyIds []int64) {
	strategyIdsMap := make(map[int64]struct{}, len(strategyIds))

	for _, id := range strategyIds {
		strategyIdsMap[id] = struct{}{}
	}

	t.concurrencyMu.RLock()

	for _, c := range t.concurrencyStrategies {
		// skip strategies that were not requested
		if _, ok := strategyIdsMap[c.strategy.ID]; !ok {
			continue
		}

		c.notify(ctx)

		// childStrategyIds stays empty unless this strategy has a parent
		childStrategyIds := make([]int64, 0)

		// store the parent id for each strategy id
		if c.strategy.ParentStrategyID.Valid {
			parentId := c.strategy.ParentStrategyID.Int64

			t.strategyIdsToParentIds.Add(c.strategy.ID, parentId)

			var ok bool
			childStrategyIds, ok = t.parentIdsToStrategyIds.Get(parentId)

			// add the strategy id to the parent id
			if ok {
				// merge with existing map
				found := false

				for _, id := range childStrategyIds {
					if id == c.strategy.ID {
						found = true
						break
					}
				}

				if !found {
					childStrategyIds = append(childStrategyIds, c.strategy.ID)
				}
			} else {
				childStrategyIds = []int64{c.strategy.ID}
			}

			t.parentIdsToStrategyIds.Add(parentId, childStrategyIds)
		}

		// notify the other child strategies
		// NOTE(review): this inner scan is O(children * strategies) per
		// notified strategy — acceptable while strategy counts stay small.
		for _, childId := range childStrategyIds {
			if childId != c.strategy.ID {
				// NOTE: this loop variable shadows the outer c; here it refers
				// to the sibling strategy being looked up
				for _, c := range t.concurrencyStrategies {
					if c.strategy.ID == childId {
						c.notify(ctx)
					}
				}
			}
		}
	}

	t.concurrencyMu.RUnlock()
}
// notifyNewWorker forwards a new-worker notification to the lease manager,
// logging (but not propagating) any error.
func (t *tenantManager) notifyNewWorker(ctx context.Context, workerId uuid.UUID) {
	if err := t.leaseManager.notifyNewWorker(ctx, workerId); err != nil {
		t.l.Error().Err(err).Msg("error notifying new worker")
	}
}
// notifyNewQueue forwards a new-queue notification to the lease manager,
// logging (but not propagating) any error.
func (t *tenantManager) notifyNewQueue(ctx context.Context, queueName string) {
	t.l.Debug().Msgf("notifying new queue %s for tenant %s", queueName, t.tenantId)

	if err := t.leaseManager.notifyNewQueue(ctx, queueName); err != nil {
		t.l.Error().Err(err).Msg("error notifying new queue")
	}
}
// notifyNewConcurrencyStrategy forwards a new-strategy notification to the
// lease manager, logging (but not propagating) any error.
func (t *tenantManager) notifyNewConcurrencyStrategy(ctx context.Context, strategyId int64) {
	if err := t.leaseManager.notifyNewConcurrencyStrategy(ctx, strategyId); err != nil {
		t.l.Error().Err(err).Msg("error notifying new concurrency strategy")
	}
}
// queue runs a queue pass on every queuer whose name appears in queueNames;
// names with no matching queuer are ignored.
func (t *tenantManager) queue(ctx context.Context, queueNames []string) {
	targets := make(map[string]struct{}, len(queueNames))
	for _, name := range queueNames {
		targets[name] = struct{}{}
	}

	t.queuersMu.RLock()
	defer t.queuersMu.RUnlock()

	for _, queuer := range t.queuers {
		if _, ok := targets[queuer.queueName]; ok {
			queuer.queue(ctx)
		}
	}
}
// AssignedItemWithTask pairs a queue-item assignment with the task it belongs
// to, so callers receive both the worker assignment and the task payload.
type AssignedItemWithTask struct {
	AssignedItem *v1.AssignedItem
	Task         *v1.V1TaskWithPayload
}
// runOptimisticScheduling triggers workflows by name inside a single
// transaction and immediately tries to assign the resulting queue items,
// preferring workers in localWorkerIds.
//
// It returns a map of worker id -> locally assigned items (paired with their
// tasks), the created tasks, the created DAGs, and any error. Queue results
// produced during assignment are published to resultsCh only after the
// transaction commits.
func (t *tenantManager) runOptimisticScheduling(
	ctx context.Context,
	opts []*v1.WorkflowNameTriggerOpts,
	localWorkerIds map[uuid.UUID]struct{},
) (map[uuid.UUID][]*AssignedItemWithTask, []*v1.V1TaskWithPayload, []*v1.DAGWithData, error) {
	// create a transaction
	tx, err := t.cf.repo.Optimistic().StartTx(ctx)

	if err != nil {
		return nil, nil, nil, err
	}

	// NOTE(review): assumes Rollback after a successful Commit is a safe no-op
	// — confirm against the repo's transaction implementation
	defer tx.Rollback()

	// hook into the trigger transaction
	qis, tasks, dags, err := t.cf.repo.Optimistic().TriggerFromNames(ctx, tx, t.tenantId, opts)

	if err != nil {
		return nil, nil, nil, err
	}

	// read the queue items for the tasks we just created
	// rewrite the queuer loop to not be asynchronous in batches, but instead run tryAssign
	// and then immediately flush to the database

	// split qis by their queue name
	qisByQueueName := make(map[string][]*sqlcv1.V1QueueItem, len(qis))

	for _, qi := range qis {
		qisByQueueName[qi.Queue] = append(qisByQueueName[qi.Queue], qi)
	}

	var allLocalAssigned []*v1.AssignedItem
	var allQueueResults []*QueueResults

	// run the matching queuer for each queue's items, still inside the tx;
	// the read lock is scoped per queue name so it is never held across queues
	for queueName, qis := range qisByQueueName {
		t.queuersMu.RLock()

		for _, q := range t.queuers {
			if q.queueName == queueName {
				localAssigned, queueResults, err := q.runOptimisticQueue(ctx, tx, qis, localWorkerIds)

				if err != nil {
					// release the lock before returning on error
					t.queuersMu.RUnlock()
					return nil, nil, nil, err
				}

				allLocalAssigned = append(allLocalAssigned, localAssigned...)
				allQueueResults = append(allQueueResults, queueResults...)
			}
		}

		t.queuersMu.RUnlock()
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, nil, nil, err
	}

	// publish results only after the commit succeeded
	for _, qr := range allQueueResults {
		t.resultsCh <- qr
	}

	// map the tasks to the assigned items
	taskUUIDToAssigned := make(map[uuid.UUID]*v1.AssignedItem, len(allLocalAssigned))
	taskUUIDToTask := make(map[uuid.UUID]*v1.V1TaskWithPayload, len(qis))

	for _, ai := range allLocalAssigned {
		taskUUIDToAssigned[ai.QueueItem.ExternalID] = ai
	}

	for _, task := range tasks {
		taskUUIDToTask[task.ExternalID] = task
	}

	// return the assigned items with their tasks
	res := make(map[uuid.UUID][]*AssignedItemWithTask)

	for taskUUID, ai := range taskUUIDToAssigned {
		task, ok := taskUUIDToTask[taskUUID]

		// skip assignments whose task is not in the set we just created
		if !ok {
			continue
		}

		workerId := ai.WorkerId

		res[workerId] = append(res[workerId], &AssignedItemWithTask{
			AssignedItem: ai,
			Task:         task,
		})
	}

	return res, tasks, dags, nil
}
// runOptimisticSchedulingFromEvents triggers workflows from events inside a
// single transaction and immediately tries to assign the resulting queue
// items, preferring workers in localWorkerIds. It mirrors
// runOptimisticScheduling but takes event trigger options and returns the
// event trigger result.
//
// It returns a map of worker id -> locally assigned items (paired with their
// tasks), the trigger result, and any error. Queue results produced during
// assignment are published to resultsCh only after the transaction commits.
func (t *tenantManager) runOptimisticSchedulingFromEvents(
	ctx context.Context,
	opts []v1.EventTriggerOpts,
	localWorkerIds map[uuid.UUID]struct{},
) (map[uuid.UUID][]*AssignedItemWithTask, *v1.TriggerFromEventsResult, error) {
	// create a transaction
	tx, err := t.cf.repo.Optimistic().StartTx(ctx)

	if err != nil {
		return nil, nil, err
	}

	// NOTE(review): assumes Rollback after a successful Commit is a safe no-op
	// — confirm against the repo's transaction implementation
	defer tx.Rollback()

	// hook into the trigger transaction
	qis, eventsRes, err := t.cf.repo.Optimistic().TriggerFromEvents(ctx, tx, t.tenantId, opts)

	if err != nil {
		return nil, nil, err
	}

	// read the queue items for the tasks we just created
	// rewrite the queuer loop to not be asynchronous in batches, but instead run tryAssign
	// and then immediately flush to the database

	// split qis by their queue name
	qisByQueueName := make(map[string][]*sqlcv1.V1QueueItem, len(qis))

	for _, qi := range qis {
		qisByQueueName[qi.Queue] = append(qisByQueueName[qi.Queue], qi)
	}

	var allLocalAssigned []*v1.AssignedItem
	var allQueueResults []*QueueResults

	// run the matching queuer for each queue's items, still inside the tx;
	// the read lock is scoped per queue name so it is never held across queues
	for queueName, qis := range qisByQueueName {
		t.queuersMu.RLock()

		for _, q := range t.queuers {
			if q.queueName == queueName {
				localAssigned, queueResults, err := q.runOptimisticQueue(ctx, tx, qis, localWorkerIds)

				if err != nil {
					// release the lock before returning on error
					t.queuersMu.RUnlock()
					return nil, nil, err
				}

				allLocalAssigned = append(allLocalAssigned, localAssigned...)
				allQueueResults = append(allQueueResults, queueResults...)
			}
		}

		t.queuersMu.RUnlock()
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, nil, err
	}

	// publish results only after the commit succeeded
	for _, qr := range allQueueResults {
		t.resultsCh <- qr
	}

	// map the tasks to the assigned items
	taskUUIDToAssigned := make(map[uuid.UUID]*v1.AssignedItem, len(allLocalAssigned))
	taskUUIDToTask := make(map[uuid.UUID]*v1.V1TaskWithPayload, len(qis))

	for _, ai := range allLocalAssigned {
		taskUUIDToAssigned[ai.QueueItem.ExternalID] = ai
	}

	for _, task := range eventsRes.Tasks {
		taskUUIDToTask[task.ExternalID] = task
	}

	// return the assigned items with their tasks
	res := make(map[uuid.UUID][]*AssignedItemWithTask)

	for taskUUID, ai := range taskUUIDToAssigned {
		task, ok := taskUUIDToTask[taskUUID]

		// skip assignments whose task is not in the set we just created
		if !ok {
			continue
		}

		workerId := ai.WorkerId

		res[workerId] = append(res[workerId], &AssignedItemWithTask{
			AssignedItem: ai,
			Task:         task,
		})
	}

	return res, eventsRes, nil
}