mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-20 16:52:08 -05:00
fix: cleanup orphaned metrics (#3300)
This commit is contained in:
@@ -40,6 +40,7 @@ type SchedulerExtension interface {
|
||||
SetTenants(tenants []*sqlcv1.Tenant)
|
||||
ReportSnapshot(tenantId uuid.UUID, input *SnapshotInput)
|
||||
PostAssign(tenantId uuid.UUID, input *PostAssignInput)
|
||||
CleanupTenant(tenantId uuid.UUID) error
|
||||
Cleanup() error
|
||||
}
|
||||
|
||||
@@ -79,6 +80,20 @@ func (e *Extensions) PostAssign(tenantId uuid.UUID, input *PostAssignInput) {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Extensions) CleanupTenant(tenantId uuid.UUID) error {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
|
||||
eg := errgroup.Group{}
|
||||
|
||||
for _, ext := range e.exts {
|
||||
f := ext.CleanupTenant
|
||||
eg.Go(func() error { return f(tenantId) })
|
||||
}
|
||||
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (e *Extensions) Cleanup() error {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
|
||||
@@ -157,11 +157,13 @@ func (p *SchedulingPool) cleanupTenants(toCleanup []*tenantManager) {
|
||||
go func(tm *tenantManager) {
|
||||
defer wg.Done()
|
||||
|
||||
err := tm.Cleanup()
|
||||
|
||||
if err != nil {
|
||||
if err := tm.Cleanup(); err != nil {
|
||||
tm.l.Error().Err(err).Msg("failed to cleanup tenant manager")
|
||||
}
|
||||
|
||||
if err := p.Extensions.CleanupTenant(tm.tenantId); err != nil {
|
||||
tm.l.Error().Err(err).Msg("failed to cleanup extension metrics for tenant")
|
||||
}
|
||||
}(tm)
|
||||
}
|
||||
|
||||
|
||||
@@ -79,14 +79,21 @@ func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *Snapshot
|
||||
|
||||
currentWorkers := make(map[WorkerPromLabels]struct{}, len(workerPromLabelsToSlotData))
|
||||
for promLabels, utilization := range workerPromLabelsToSlotData {
|
||||
currentWorkers[promLabels] = struct{}{}
|
||||
|
||||
usedSlots := float64(utilization.UtilizedSlots)
|
||||
availableSlots := float64(utilization.NonUtilizedSlots)
|
||||
|
||||
// Skip setting gauge values for workers with no slots yet to avoid reporting
|
||||
// transient 0-slot state between worker registration and slot replenishment.
|
||||
// Previous non-zero values are preserved until the worker is replenished or removed.
|
||||
if usedSlots+availableSlots == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
prometheus.TenantWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(usedSlots + availableSlots)
|
||||
prometheus.TenantUsedWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(usedSlots)
|
||||
prometheus.TenantAvailableWorkerSlots.WithLabelValues(tenantIdStr, promLabels.ID.String(), promLabels.Name).Set(availableSlots)
|
||||
|
||||
currentWorkers[promLabels] = struct{}{}
|
||||
}
|
||||
|
||||
p.tenantIdToWorkerLabels[tenantId] = currentWorkers
|
||||
@@ -94,10 +101,38 @@ func (p *PrometheusExtension) ReportSnapshot(tenantId uuid.UUID, input *Snapshot
|
||||
|
||||
// PostAssign is a no-op for the Prometheus extension.
func (p *PrometheusExtension) PostAssign(tenantId uuid.UUID, input *PostAssignInput) {
	// Intentionally empty: nothing is recorded after assignment.
}
|
||||
|
||||
func (p *PrometheusExtension) deleteGaugesForTenant(tenantId uuid.UUID) {
|
||||
tenantIdStr := tenantId.String()
|
||||
|
||||
if known, ok := p.tenantIdToWorkerLabels[tenantId]; ok {
|
||||
for labels := range known {
|
||||
prometheus.TenantWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
|
||||
prometheus.TenantUsedWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
|
||||
prometheus.TenantAvailableWorkerSlots.DeleteLabelValues(tenantIdStr, labels.ID.String(), labels.Name)
|
||||
}
|
||||
|
||||
delete(p.tenantIdToWorkerLabels, tenantId)
|
||||
}
|
||||
|
||||
delete(p.tenants, tenantId)
|
||||
}
|
||||
|
||||
func (p *PrometheusExtension) CleanupTenant(tenantId uuid.UUID) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
p.deleteGaugesForTenant(tenantId)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PrometheusExtension) Cleanup() error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
for tenantId := range p.tenantIdToWorkerLabels {
|
||||
p.deleteGaugesForTenant(tenantId)
|
||||
}
|
||||
|
||||
p.tenants = make(map[uuid.UUID]*sqlcv1.Tenant)
|
||||
p.tenantIdToWorkerLabels = make(map[uuid.UUID]map[WorkerPromLabels]struct{})
|
||||
return nil
|
||||
|
||||
@@ -41,6 +41,8 @@ func (c *captureSnapshotsExt) ReportSnapshot(tenantId uuid.UUID, input *schedv1.
|
||||
|
||||
// PostAssign is a no-op; both arguments are ignored.
func (c *captureSnapshotsExt) PostAssign(_ uuid.UUID, _ *schedv1.PostAssignInput) {
	// Intentionally empty.
}
|
||||
|
||||
// CleanupTenant is a no-op that ignores the tenant ID and always returns nil.
func (c *captureSnapshotsExt) CleanupTenant(_ uuid.UUID) error {
	return nil
}
|
||||
|
||||
// Cleanup is a no-op that always returns nil.
func (c *captureSnapshotsExt) Cleanup() error {
	return nil
}
|
||||
|
||||
func runWithDatabase(t *testing.T, test func(conf *database.Layer) error) {
|
||||
|
||||
Reference in New Issue
Block a user