mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-02 15:39:53 -05:00
fix: tenant race conditions, cleanup logic, old workers getting assigned (#1050)
This commit is contained in:
@@ -553,14 +553,7 @@ func (s *DispatcherImpl) Heartbeat(ctx context.Context, req *contracts.Heartbeat
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if we haven't seen the dispatcher for 6 seconds (one interval plus latency), reject the heartbeat as the client
|
||||
// should reconnect
|
||||
if worker.DispatcherLastHeartbeatAt.Time.Before(time.Now().Add(-6 * time.Second)) {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(telemetry_codes.Error, "dispatcher latency")
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "Heartbeat rejected: dispatcher latency: %s, %s", req.WorkerId, sqlchelpers.UUIDToStr(worker.DispatcherId))
|
||||
}
|
||||
|
||||
// if the worker is not active, the listener should reconnect
|
||||
if worker.LastListenerEstablished.Valid && !worker.IsActive {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(telemetry_codes.Error, "worker stream is not active")
|
||||
|
||||
@@ -517,7 +517,11 @@ LEFT JOIN
|
||||
"Action" a ON atw."A" = a."id"
|
||||
WHERE
|
||||
w."tenantId" = @tenantId::uuid
|
||||
AND w."id" = ANY(@workerIds::uuid[]);
|
||||
AND w."id" = ANY(@workerIds::uuid[])
|
||||
AND w."dispatcherId" IS NOT NULL
|
||||
AND w."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
|
||||
AND w."isActive" = true
|
||||
AND w."isPaused" = false;
|
||||
|
||||
-- name: ListActionsForAvailableWorkers :many
|
||||
SELECT
|
||||
|
||||
@@ -516,6 +516,10 @@ LEFT JOIN
|
||||
WHERE
|
||||
w."tenantId" = $1::uuid
|
||||
AND w."id" = ANY($2::uuid[])
|
||||
AND w."dispatcherId" IS NOT NULL
|
||||
AND w."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
|
||||
AND w."isActive" = true
|
||||
AND w."isPaused" = false
|
||||
`
|
||||
|
||||
type ListActionsForWorkersParams struct {
|
||||
|
||||
+17
-13
@@ -91,7 +91,7 @@ func (p *SchedulingPool) SetTenants(tenants []*dbsqlc.Tenant) {
|
||||
for _, t := range tenants {
|
||||
tenantId := sqlchelpers.UUIDToStr(t.ID)
|
||||
tenantMap[tenantId] = true
|
||||
p.getTenantManager(tenantId) // nolint: ineffassign
|
||||
p.getTenantManager(tenantId, true) // nolint: ineffassign
|
||||
}
|
||||
|
||||
toCleanup := make([]*tenantManager, 0)
|
||||
@@ -141,29 +141,33 @@ func (p *SchedulingPool) cleanupTenants(toCleanup []*tenantManager) {
|
||||
}
|
||||
|
||||
func (p *SchedulingPool) RefreshAll(ctx context.Context, tenantId string) {
|
||||
tm := p.getTenantManager(tenantId)
|
||||
|
||||
tm.refreshAll(ctx)
|
||||
if tm := p.getTenantManager(tenantId, false); tm != nil {
|
||||
tm.refreshAll(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *SchedulingPool) Replenish(ctx context.Context, tenantId string) {
|
||||
tm := p.getTenantManager(tenantId)
|
||||
|
||||
tm.replenish(ctx)
|
||||
if tm := p.getTenantManager(tenantId, false); tm != nil {
|
||||
tm.replenish(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *SchedulingPool) Queue(ctx context.Context, tenantId string, queueName string) {
|
||||
tm := p.getTenantManager(tenantId)
|
||||
|
||||
tm.queue(queueName)
|
||||
if tm := p.getTenantManager(tenantId, false); tm != nil {
|
||||
tm.queue(queueName)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *SchedulingPool) getTenantManager(tenantId string) *tenantManager {
|
||||
func (p *SchedulingPool) getTenantManager(tenantId string, storeIfNotFound bool) *tenantManager {
|
||||
tm, ok := p.tenants.Load(tenantId)
|
||||
|
||||
if !ok {
|
||||
tm = newTenantManager(p.cf, tenantId, p.eventBuffer, p.resultsCh)
|
||||
p.tenants.Store(tenantId, tm)
|
||||
if storeIfNotFound {
|
||||
tm = newTenantManager(p.cf, tenantId, p.eventBuffer, p.resultsCh)
|
||||
p.tenants.Store(tenantId, tm)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return tm.(*tenantManager)
|
||||
|
||||
@@ -75,10 +75,7 @@ func (t *tenantManager) Cleanup() error {
|
||||
|
||||
err := t.leaseManager.cleanup(cleanupCtx)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// clean up the other resources even if the lease manager fails to clean up
|
||||
t.queuersMu.RLock()
|
||||
defer t.queuersMu.RUnlock()
|
||||
|
||||
@@ -88,7 +85,7 @@ func (t *tenantManager) Cleanup() error {
|
||||
|
||||
t.rl.cleanup()
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *tenantManager) listenForWorkerLeases(ctx context.Context) {
|
||||
|
||||
Reference in New Issue
Block a user