mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-22 10:10:07 -05:00
a3275ac101
* fix: enhance SQL query name extraction in otel tracer and fallback * feat: propagate context to logger * feat: correlation ids * feat: add tenant ids * feat: more richness for partitionable things * fix: tests * feat: capture http errors on spans * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
594 lines
14 KiB
Go
594 lines
14 KiB
Go
package v1
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
lru "github.com/hashicorp/golang-lru/v2"
|
|
"github.com/rs/zerolog"
|
|
|
|
v1 "github.com/hatchet-dev/hatchet/pkg/repository"
|
|
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
|
|
)
|
|
|
|
// notifierMsg is a message delivered on a notifierCh carrying a batch of
// resources (e.g. workers, queue names, or concurrency strategies) from the
// lease manager.
type notifierMsg[T any] struct {
	// items is the batch of resources being delivered
	items []T

	// isIncremental refers to whether the resource should be added to the current set, or
	// should replace the current set
	isIncremental bool
}
|
|
|
|
// notifierCh delivers batches of resources from the lease manager to the
// tenant manager's listener goroutines.
type notifierCh[T any] chan notifierMsg[T]
|
|
|
|
// tenantManager manages the scheduler and queuers for a tenant and multiplexes
// messages to the relevant queuer.
type tenantManager struct {
	cf *sharedConfig
	// l is pre-tagged with the tenant id (see newTenantManager)
	l        zerolog.Logger
	tenantId uuid.UUID

	scheduler *Scheduler
	rl        *rateLimiter

	// queuers holds one Queuer per active queue name; guarded by queuersMu
	queuers   []*Queuer
	queuersMu sync.RWMutex

	// concurrencyStrategies holds one manager per active strategy; guarded by concurrencyMu
	concurrencyStrategies []*ConcurrencyManager

	// maintain a mapping of strategy ids to parent ids, because we'd like to signal all
	// child strategy ids when a parent strategy id is updated
	strategyIdsToParentIds *lru.Cache[int64, int64]
	parentIdsToStrategyIds *lru.Cache[int64, []int64]

	concurrencyMu sync.RWMutex

	leaseManager *LeaseManager

	// channels fed by the lease manager with worker, queue and concurrency
	// strategy updates; consumed by the listenFor* goroutines
	workersCh     notifierCh[*v1.ListActiveWorkersResult]
	queuesCh      notifierCh[string]
	concurrencyCh notifierCh[*sqlcv1.V1StepConcurrency]

	concurrencyResultsCh chan *ConcurrencyResults

	resultsCh chan *QueueResults

	// cleanup cancels the context driving the background listener goroutines
	cleanup func()
}
|
|
|
|
func newTenantManager(cf *sharedConfig, tenantId uuid.UUID, resultsCh chan *QueueResults, concurrencyResultsCh chan *ConcurrencyResults, exts *Extensions) *tenantManager {
|
|
tenantIdUUID := tenantId
|
|
|
|
rl := newRateLimiter(cf, tenantIdUUID)
|
|
s := newScheduler(cf, tenantIdUUID, rl, exts)
|
|
leaseManager, workersCh, queuesCh, concurrencyCh := newLeaseManager(cf, tenantIdUUID)
|
|
|
|
strategyIdsToParentIds, _ := lru.New[int64, int64](1000)
|
|
parentIdsToStrategyIds, _ := lru.New[int64, []int64](1000)
|
|
|
|
t := &tenantManager{
|
|
scheduler: s,
|
|
leaseManager: leaseManager,
|
|
cf: cf,
|
|
l: cf.l.With().Str("tenant_id", tenantIdUUID.String()).Logger(),
|
|
tenantId: tenantIdUUID,
|
|
workersCh: workersCh,
|
|
queuesCh: queuesCh,
|
|
concurrencyCh: concurrencyCh,
|
|
resultsCh: resultsCh,
|
|
rl: rl,
|
|
concurrencyResultsCh: concurrencyResultsCh,
|
|
strategyIdsToParentIds: strategyIdsToParentIds,
|
|
parentIdsToStrategyIds: parentIdsToStrategyIds,
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.cleanup = cancel
|
|
|
|
go t.listenForWorkerLeases(ctx)
|
|
go t.listenForQueueLeases(ctx)
|
|
go t.listenForConcurrencyLeases(ctx)
|
|
|
|
leaseManager.start(ctx)
|
|
s.start(ctx)
|
|
|
|
return t
|
|
}
|
|
|
|
func (t *tenantManager) Cleanup() error {
|
|
defer t.cleanup()
|
|
|
|
cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
err := t.leaseManager.cleanup(cleanupCtx)
|
|
|
|
// clean up the other resources even if the lease manager fails to clean up
|
|
t.queuersMu.RLock()
|
|
defer t.queuersMu.RUnlock()
|
|
|
|
for _, q := range t.queuers {
|
|
q.Cleanup()
|
|
}
|
|
|
|
t.rl.cleanup()
|
|
|
|
return err
|
|
}
|
|
|
|
func (t *tenantManager) listenForWorkerLeases(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-t.workersCh:
|
|
if msg.isIncremental {
|
|
for _, worker := range msg.items {
|
|
t.scheduler.addWorker(worker)
|
|
}
|
|
|
|
t.replenish(ctx)
|
|
|
|
// notify all queues to check if the new worker can take any tasks
|
|
t.queuersMu.RLock()
|
|
|
|
for _, q := range t.queuers {
|
|
q.queue(ctx)
|
|
}
|
|
|
|
t.queuersMu.RUnlock()
|
|
} else {
|
|
t.scheduler.setWorkers(msg.items)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *tenantManager) listenForQueueLeases(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-t.queuesCh:
|
|
if msg.isIncremental {
|
|
for _, queueName := range msg.items {
|
|
t.addQueuer(queueName)
|
|
}
|
|
} else {
|
|
t.setQueuers(msg.items)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *tenantManager) listenForConcurrencyLeases(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-t.concurrencyCh:
|
|
if msg.isIncremental {
|
|
for _, strategy := range msg.items {
|
|
t.addConcurrencyStrategy(strategy)
|
|
}
|
|
} else {
|
|
t.setConcurrencyStrategies(msg.items)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *tenantManager) setQueuers(queueNames []string) {
|
|
t.queuersMu.Lock()
|
|
defer t.queuersMu.Unlock()
|
|
|
|
queueNamesSet := make(map[string]struct{}, len(queueNames))
|
|
|
|
for _, queueName := range queueNames {
|
|
queueNamesSet[queueName] = struct{}{}
|
|
}
|
|
|
|
newQueueArr := make([]*Queuer, 0, len(queueNames))
|
|
|
|
for _, q := range t.queuers {
|
|
if _, ok := queueNamesSet[q.queueName]; ok {
|
|
newQueueArr = append(newQueueArr, q)
|
|
|
|
// delete from set
|
|
delete(queueNamesSet, q.queueName)
|
|
} else {
|
|
// if not in new set, cleanup
|
|
t.l.Debug().Msgf("cleaning up queuer for queue %s for tenant %s", q.queueName, t.tenantId)
|
|
|
|
go q.Cleanup()
|
|
}
|
|
}
|
|
|
|
for queueName := range queueNamesSet {
|
|
newQueueArr = append(newQueueArr, newQueuer(t.cf, t.tenantId, queueName, t.scheduler, t.resultsCh))
|
|
}
|
|
|
|
t.queuers = newQueueArr
|
|
}
|
|
|
|
func (t *tenantManager) addQueuer(queueName string) {
|
|
t.queuersMu.Lock()
|
|
|
|
for _, q := range t.queuers {
|
|
if q.queueName == queueName {
|
|
t.queuersMu.Unlock()
|
|
return
|
|
}
|
|
}
|
|
|
|
q := newQueuer(t.cf, t.tenantId, queueName, t.scheduler, t.resultsCh)
|
|
|
|
t.queuers = append(t.queuers, q)
|
|
|
|
t.queuersMu.Unlock()
|
|
|
|
t.queue(context.Background(), []string{queueName})
|
|
}
|
|
|
|
func (t *tenantManager) setConcurrencyStrategies(strategies []*sqlcv1.V1StepConcurrency) {
|
|
t.concurrencyMu.Lock()
|
|
defer t.concurrencyMu.Unlock()
|
|
|
|
strategiesSet := make(map[int64]*sqlcv1.V1StepConcurrency, len(strategies))
|
|
|
|
for _, strat := range strategies {
|
|
strategiesSet[strat.ID] = strat
|
|
}
|
|
|
|
newArr := make([]*ConcurrencyManager, 0, len(strategies))
|
|
|
|
for _, c := range t.concurrencyStrategies {
|
|
if _, ok := strategiesSet[c.strategy.ID]; ok {
|
|
newArr = append(newArr, c)
|
|
|
|
// delete from set
|
|
delete(strategiesSet, c.strategy.ID)
|
|
} else {
|
|
// if not in new set, cleanup
|
|
go c.cleanup()
|
|
}
|
|
}
|
|
|
|
for _, strategy := range strategiesSet {
|
|
newArr = append(newArr, newConcurrencyManager(t.cf, t.tenantId, strategy, t.concurrencyResultsCh))
|
|
}
|
|
|
|
t.concurrencyStrategies = newArr
|
|
}
|
|
|
|
func (t *tenantManager) addConcurrencyStrategy(strategy *sqlcv1.V1StepConcurrency) {
|
|
t.concurrencyMu.Lock()
|
|
defer t.concurrencyMu.Unlock()
|
|
|
|
for _, c := range t.concurrencyStrategies {
|
|
if c.strategy.ID == strategy.ID {
|
|
return
|
|
}
|
|
}
|
|
|
|
t.concurrencyStrategies = append(t.concurrencyStrategies, newConcurrencyManager(t.cf, t.tenantId, strategy, t.concurrencyResultsCh))
|
|
}
|
|
|
|
func (t *tenantManager) replenish(ctx context.Context) {
|
|
err := t.scheduler.replenish(ctx, false)
|
|
|
|
if err != nil {
|
|
t.l.Error().Err(err).Msg("error replenishing scheduler")
|
|
}
|
|
}
|
|
|
|
func (t *tenantManager) notifyConcurrency(ctx context.Context, strategyIds []int64) {
|
|
strategyIdsMap := make(map[int64]struct{}, len(strategyIds))
|
|
|
|
for _, id := range strategyIds {
|
|
strategyIdsMap[id] = struct{}{}
|
|
}
|
|
|
|
t.concurrencyMu.RLock()
|
|
|
|
for _, c := range t.concurrencyStrategies {
|
|
if _, ok := strategyIdsMap[c.strategy.ID]; !ok {
|
|
continue
|
|
}
|
|
|
|
c.notify(ctx)
|
|
|
|
childStrategyIds := make([]int64, 0)
|
|
|
|
// store the parent id for each strategy id
|
|
if c.strategy.ParentStrategyID.Valid {
|
|
parentId := c.strategy.ParentStrategyID.Int64
|
|
|
|
t.strategyIdsToParentIds.Add(c.strategy.ID, parentId)
|
|
|
|
var ok bool
|
|
|
|
childStrategyIds, ok = t.parentIdsToStrategyIds.Get(parentId)
|
|
|
|
// add the strategy id to the parent id
|
|
if ok {
|
|
// merge with existing map
|
|
found := false
|
|
|
|
for _, id := range childStrategyIds {
|
|
if id == c.strategy.ID {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
childStrategyIds = append(childStrategyIds, c.strategy.ID)
|
|
|
|
}
|
|
} else {
|
|
childStrategyIds = []int64{c.strategy.ID}
|
|
}
|
|
|
|
t.parentIdsToStrategyIds.Add(parentId, childStrategyIds)
|
|
}
|
|
|
|
// notify the other child strategies
|
|
for _, childId := range childStrategyIds {
|
|
if childId != c.strategy.ID {
|
|
for _, c := range t.concurrencyStrategies {
|
|
if c.strategy.ID == childId {
|
|
c.notify(ctx)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
t.concurrencyMu.RUnlock()
|
|
}
|
|
|
|
func (t *tenantManager) notifyNewWorker(ctx context.Context, workerId uuid.UUID) {
|
|
err := t.leaseManager.notifyNewWorker(ctx, workerId)
|
|
|
|
if err != nil {
|
|
t.l.Error().Err(err).Msg("error notifying new worker")
|
|
return
|
|
}
|
|
}
|
|
|
|
func (t *tenantManager) notifyNewQueue(ctx context.Context, queueName string) {
|
|
t.l.Debug().Msgf("notifying new queue %s for tenant %s", queueName, t.tenantId)
|
|
|
|
err := t.leaseManager.notifyNewQueue(ctx, queueName)
|
|
|
|
if err != nil {
|
|
t.l.Error().Err(err).Msg("error notifying new queue")
|
|
return
|
|
}
|
|
}
|
|
|
|
func (t *tenantManager) notifyNewConcurrencyStrategy(ctx context.Context, strategyId int64) {
|
|
err := t.leaseManager.notifyNewConcurrencyStrategy(ctx, strategyId)
|
|
|
|
if err != nil {
|
|
t.l.Error().Err(err).Msg("error notifying new concurrency strategy")
|
|
return
|
|
}
|
|
}
|
|
|
|
func (t *tenantManager) queue(ctx context.Context, queueNames []string) {
|
|
queueNamesMap := make(map[string]struct{}, len(queueNames))
|
|
|
|
for _, name := range queueNames {
|
|
queueNamesMap[name] = struct{}{}
|
|
}
|
|
|
|
t.queuersMu.RLock()
|
|
|
|
for _, q := range t.queuers {
|
|
if _, ok := queueNamesMap[q.queueName]; ok {
|
|
q.queue(ctx)
|
|
}
|
|
}
|
|
|
|
t.queuersMu.RUnlock()
|
|
}
|
|
|
|
// AssignedItemWithTask pairs an assignment produced by optimistic scheduling
// with the task it assigns.
type AssignedItemWithTask struct {
	AssignedItem *v1.AssignedItem
	Task         *v1.V1TaskWithPayload
}
|
|
|
|
// runOptimisticScheduling triggers workflows by name and attempts to assign
// the resulting queue items to workers inside a single transaction
// ("optimistic" scheduling). It returns a map of worker id -> assigned items
// (restricted to workers in localWorkerIds), plus the tasks and DAGs created
// by the trigger.
//
// NOTE(review): items assigned to workers outside localWorkerIds are
// presumably delivered via resultsCh — confirm against runOptimisticQueue.
func (t *tenantManager) runOptimisticScheduling(
	ctx context.Context,
	opts []*v1.WorkflowNameTriggerOpts,
	localWorkerIds map[uuid.UUID]struct{},
) (map[uuid.UUID][]*AssignedItemWithTask, []*v1.V1TaskWithPayload, []*v1.DAGWithData, error) {
	// create a transaction
	tx, err := t.cf.repo.Optimistic().StartTx(ctx)

	if err != nil {
		return nil, nil, nil, err
	}

	// best-effort rollback on any early return; assumed to be a no-op after a
	// successful commit — verify against the tx implementation
	defer tx.Rollback()

	// hook into the trigger transaction
	qis, tasks, dags, err := t.cf.repo.Optimistic().TriggerFromNames(ctx, tx, t.tenantId, opts)

	if err != nil {
		return nil, nil, nil, err
	}

	// read the queue items for the tasks we just created
	// rewrite the queuer loop to not be asynchronous in batches, but instead run tryAssign
	// and then immediately flush to the database

	// split qis by their queue name
	qisByQueueName := make(map[string][]*sqlcv1.V1QueueItem, len(qis))

	for _, qi := range qis {
		qisByQueueName[qi.Queue] = append(qisByQueueName[qi.Queue], qi)
	}

	var allLocalAssigned []*v1.AssignedItem
	var allQueueResults []*QueueResults

	// run the matching queuer for each queue's items inside the same tx;
	// the read lock is re-acquired per queue and released on the error path
	for queueName, qis := range qisByQueueName {
		t.queuersMu.RLock()

		for _, q := range t.queuers {
			if q.queueName == queueName {
				localAssigned, queueResults, err := q.runOptimisticQueue(ctx, tx, qis, localWorkerIds)

				if err != nil {
					t.queuersMu.RUnlock()
					return nil, nil, nil, err
				}

				allLocalAssigned = append(allLocalAssigned, localAssigned...)
				allQueueResults = append(allQueueResults, queueResults...)
			}
		}

		t.queuersMu.RUnlock()
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, nil, nil, err
	}

	// publish queue results only after the commit has succeeded
	for _, qr := range allQueueResults {
		t.resultsCh <- qr
	}

	// map the tasks to the assigned items
	taskUUIDToAssigned := make(map[uuid.UUID]*v1.AssignedItem, len(allLocalAssigned))
	taskUUIDToTask := make(map[uuid.UUID]*v1.V1TaskWithPayload, len(qis))

	for _, ai := range allLocalAssigned {
		taskUUIDToAssigned[ai.QueueItem.ExternalID] = ai
	}

	for _, task := range tasks {
		taskUUIDToTask[task.ExternalID] = task
	}

	// return the assigned items with their tasks, grouped by worker id;
	// assigned items without a matching task are skipped
	res := make(map[uuid.UUID][]*AssignedItemWithTask)

	for taskUUID, ai := range taskUUIDToAssigned {
		task, ok := taskUUIDToTask[taskUUID]

		if !ok {
			continue
		}

		workerId := ai.WorkerId

		res[workerId] = append(res[workerId], &AssignedItemWithTask{
			AssignedItem: ai,
			Task:         task,
		})
	}

	return res, tasks, dags, nil
}
|
|
|
|
// runOptimisticSchedulingFromEvents triggers workflows from events and
// attempts to assign the resulting queue items to workers inside a single
// transaction, mirroring runOptimisticScheduling but keyed off event trigger
// options. It returns a map of worker id -> assigned items (restricted to
// workers in localWorkerIds) along with the full event trigger result.
func (t *tenantManager) runOptimisticSchedulingFromEvents(
	ctx context.Context,
	opts []v1.EventTriggerOpts,
	localWorkerIds map[uuid.UUID]struct{},
) (map[uuid.UUID][]*AssignedItemWithTask, *v1.TriggerFromEventsResult, error) {
	// create a transaction
	tx, err := t.cf.repo.Optimistic().StartTx(ctx)

	if err != nil {
		return nil, nil, err
	}

	// best-effort rollback on any early return; assumed to be a no-op after a
	// successful commit — verify against the tx implementation
	defer tx.Rollback()

	// hook into the trigger transaction
	qis, eventsRes, err := t.cf.repo.Optimistic().TriggerFromEvents(ctx, tx, t.tenantId, opts)

	if err != nil {
		return nil, nil, err
	}

	// read the queue items for the tasks we just created
	// rewrite the queuer loop to not be asynchronous in batches, but instead run tryAssign
	// and then immediately flush to the database

	// split qis by their queue name
	qisByQueueName := make(map[string][]*sqlcv1.V1QueueItem, len(qis))

	for _, qi := range qis {
		qisByQueueName[qi.Queue] = append(qisByQueueName[qi.Queue], qi)
	}

	var allLocalAssigned []*v1.AssignedItem
	var allQueueResults []*QueueResults

	// run the matching queuer for each queue's items inside the same tx;
	// the read lock is re-acquired per queue and released on the error path
	for queueName, qis := range qisByQueueName {
		t.queuersMu.RLock()

		for _, q := range t.queuers {
			if q.queueName == queueName {
				localAssigned, queueResults, err := q.runOptimisticQueue(ctx, tx, qis, localWorkerIds)

				if err != nil {
					t.queuersMu.RUnlock()
					return nil, nil, err
				}

				allLocalAssigned = append(allLocalAssigned, localAssigned...)
				allQueueResults = append(allQueueResults, queueResults...)
			}
		}

		t.queuersMu.RUnlock()
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, nil, err
	}

	// publish queue results only after the commit has succeeded
	for _, qr := range allQueueResults {
		t.resultsCh <- qr
	}

	// map the tasks to the assigned items
	taskUUIDToAssigned := make(map[uuid.UUID]*v1.AssignedItem, len(allLocalAssigned))
	taskUUIDToTask := make(map[uuid.UUID]*v1.V1TaskWithPayload, len(qis))

	for _, ai := range allLocalAssigned {
		taskUUIDToAssigned[ai.QueueItem.ExternalID] = ai
	}

	for _, task := range eventsRes.Tasks {
		taskUUIDToTask[task.ExternalID] = task
	}

	// return the assigned items with their tasks, grouped by worker id;
	// assigned items without a matching task are skipped
	res := make(map[uuid.UUID][]*AssignedItemWithTask)

	for taskUUID, ai := range taskUUIDToAssigned {
		task, ok := taskUUIDToTask[taskUUID]

		if !ok {
			continue
		}

		workerId := ai.WorkerId

		res[workerId] = append(res[workerId], &AssignedItemWithTask{
			AssignedItem: ai,
			Task:         task,
		})
	}

	return res, eventsRes, nil
}
|