mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-22 18:19:17 -05:00
fix: delete missing workers (#3273)
This commit is contained in:
@@ -10,13 +10,15 @@ import (
|
||||
)
|
||||
|
||||
type PrometheusExtension struct {
|
||||
mu sync.RWMutex
|
||||
tenants map[string]*sqlcv1.Tenant
|
||||
mu sync.RWMutex
|
||||
tenants map[uuid.UUID]*sqlcv1.Tenant
|
||||
tenantIdToWorkerLabels map[uuid.UUID]map[WorkerPromLabels]struct{}
|
||||
}
|
||||
|
||||
func NewPrometheusExtension() *PrometheusExtension {
|
||||
return &PrometheusExtension{
|
||||
tenants: make(map[string]*sqlcv1.Tenant),
|
||||
tenants: make(map[uuid.UUID]*sqlcv1.Tenant),
|
||||
tenantIdToWorkerLabels: make(map[uuid.UUID]map[WorkerPromLabels]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +27,7 @@ func (p *PrometheusExtension) SetTenants(tenants []*sqlcv1.Tenant) {
|
||||
defer p.mu.Unlock()
|
||||
|
||||
for _, tenant := range tenants {
|
||||
p.tenants[tenant.ID.String()] = tenant
|
||||
p.tenants[tenant.ID] = tenant
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,10 +37,12 @@ type WorkerPromLabels struct {
|
||||
}
|
||||
|
||||
func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *SnapshotInput) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
workerPromLabelsToSlotData := make(map[*WorkerPromLabels]*SlotUtilization)
|
||||
tenantIdStr := tenantId.String()
|
||||
|
||||
workerPromLabelsToSlotData := make(map[WorkerPromLabels]*SlotUtilization)
|
||||
|
||||
for workerId, utilization := range input.WorkerSlotUtilization {
|
||||
worker, ok := input.Workers[workerId]
|
||||
@@ -46,7 +50,7 @@ func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *Snapshot
|
||||
continue
|
||||
}
|
||||
|
||||
promLabels := &WorkerPromLabels{
|
||||
promLabels := WorkerPromLabels{
|
||||
ID: worker.WorkerId,
|
||||
Name: worker.Name,
|
||||
}
|
||||
@@ -55,20 +59,37 @@ func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *Snapshot
|
||||
if ok {
|
||||
data.UtilizedSlots += utilization.UtilizedSlots
|
||||
data.NonUtilizedSlots += utilization.NonUtilizedSlots
|
||||
workerPromLabelsToSlotData[promLabels] = data
|
||||
} else {
|
||||
workerPromLabelsToSlotData[promLabels] = utilization
|
||||
workerPromLabelsToSlotData[promLabels] = &SlotUtilization{
|
||||
UtilizedSlots: utilization.UtilizedSlots,
|
||||
NonUtilizedSlots: utilization.NonUtilizedSlots,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if known, ok := p.tenantIdToWorkerLabels[tenantId]; ok {
|
||||
for labels := range known {
|
||||
if _, stillActive := workerPromLabelsToSlotData[labels]; !stillActive {
|
||||
prometheus.TenantWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
|
||||
prometheus.TenantUsedWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
|
||||
prometheus.TenantAvailableWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
currentWorkers := make(map[WorkerPromLabels]struct{}, len(workerPromLabelsToSlotData))
|
||||
for promLabels, utilization := range workerPromLabelsToSlotData {
|
||||
usedSlots := float64(utilization.UtilizedSlots)
|
||||
availableSlots := float64(utilization.NonUtilizedSlots)
|
||||
|
||||
prometheus.TenantWorkerSlots.WithLabelValues(tenantId.String(), promLabels.ID.String(), promLabels.Name).Set(usedSlots + availableSlots)
|
||||
prometheus.TenantUsedWorkerSlots.WithLabelValues(tenantId.String(), promLabels.ID.String(), promLabels.Name).Set(usedSlots)
|
||||
prometheus.TenantAvailableWorkerSlots.WithLabelValues(tenantId.String(), promLabels.ID.String(), promLabels.Name).Set(availableSlots)
|
||||
prometheus.TenantWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(usedSlots + availableSlots)
|
||||
prometheus.TenantUsedWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(usedSlots)
|
||||
prometheus.TenantAvailableWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(availableSlots)
|
||||
|
||||
currentWorkers[promLabels] = struct{}{}
|
||||
}
|
||||
|
||||
p.tenantIdToWorkerLabels[tenantId] = currentWorkers
|
||||
}
|
||||
|
||||
func (p *PrometheusExtension) PostAssign(tenantId uuid.UUID, input *PostAssignInput) {}
|
||||
@@ -77,6 +98,7 @@ func (p *PrometheusExtension) Cleanup() error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
p.tenants = make(map[string]*sqlcv1.Tenant)
|
||||
p.tenants = make(map[uuid.UUID]*sqlcv1.Tenant)
|
||||
p.tenantIdToWorkerLabels = make(map[uuid.UUID]map[WorkerPromLabels]struct{})
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user