From 10a66938f93ff78f55001324a2db5b4e0c17d928 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Thu, 16 Oct 2025 21:28:19 +0200
Subject: [PATCH] fix: guard from potential deadlock with requests in flight
 (#6484)

* fix(watchdog): guard from potential deadlock with requests in flight

Signed-off-by: Ettore Di Giacinto

* Improve locking when loading models

Signed-off-by: Ettore Di Giacinto

---------

Signed-off-by: Ettore Di Giacinto
---
 pkg/model/loader.go   | 11 ++++++++---
 pkg/model/watchdog.go | 39 ++++++++++++++++++++++++++++-----------
 2 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/pkg/model/loader.go b/pkg/model/loader.go
index b52c91fcc..0851d3cac 100644
--- a/pkg/model/loader.go
+++ b/pkg/model/loader.go
@@ -149,8 +149,11 @@ func (ml *ModelLoader) ListLoadedModels() []*Model {
 }
 
 func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string, string, string) (*Model, error)) (*Model, error) {
+	ml.mu.Lock()
+	defer ml.mu.Unlock()
+
 	// Check if we already have a loaded model
-	if model := ml.CheckIsLoaded(modelID); model != nil {
+	if model := ml.checkIsLoaded(modelID); model != nil {
 		return model, nil
 	}
 
@@ -158,8 +161,6 @@ func (ml *ModelLoader) LoadModel(modelID, modelName string,
 	modelFile := filepath.Join(ml.ModelPath, modelName)
 	log.Debug().Msgf("Loading model in memory from file: %s", modelFile)
-	ml.mu.Lock()
-	defer ml.mu.Unlock()
 	model, err := loader(modelID, modelName, modelFile)
 	if err != nil {
 		return nil, fmt.Errorf("failed to load model with internal loader: %s", err)
 	}
@@ -184,6 +185,10 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
 func (ml *ModelLoader) CheckIsLoaded(s string) *Model {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
+	return ml.checkIsLoaded(s)
+}
+
+func (ml *ModelLoader) checkIsLoaded(s string) *Model {
 	m, ok := ml.models[s]
 	if !ok {
 		return nil
diff --git a/pkg/model/watchdog.go b/pkg/model/watchdog.go
index 5702dda54..483fc7304 100644
--- a/pkg/model/watchdog.go
+++ b/pkg/model/watchdog.go
@@ -44,6 +44,7 @@ func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy
 		busyCheck:       busy,
 		idleCheck:       idle,
 		addressModelMap: make(map[string]string),
+		stop:            make(chan bool, 1),
 	}
 }
 
@@ -104,18 +105,18 @@ func (wd *WatchDog) Run() {
 
 func (wd *WatchDog) checkIdle() {
 	wd.Lock()
-	defer wd.Unlock()
 	log.Debug().Msg("[WatchDog] Watchdog checks for idle connections")
+
+	// Collect models to shutdown while holding the lock
+	var modelsToShutdown []string
 	for address, t := range wd.idleTime {
 		log.Debug().Msgf("[WatchDog] %s: idle connection", address)
 		if time.Since(t) > wd.idletimeout {
 			log.Warn().Msgf("[WatchDog] Address %s is idle for too long, killing it", address)
 			model, ok := wd.addressModelMap[address]
 			if ok {
-				if err := wd.pm.ShutdownModel(model); err != nil {
-					log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
-				}
-				log.Debug().Msgf("[WatchDog] model shut down: %s", address)
+				modelsToShutdown = append(modelsToShutdown, model)
+				// Clean up the maps while we have the lock
 				delete(wd.idleTime, address)
 				delete(wd.addressModelMap, address)
 				delete(wd.addressMap, address)
@@ -125,25 +126,32 @@
 			}
 		}
 	}
+	wd.Unlock()
+
+	// Now shutdown models without holding the watchdog lock to prevent deadlock
+	for _, model := range modelsToShutdown {
+		if err := wd.pm.ShutdownModel(model); err != nil {
+			log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
+		}
+		log.Debug().Msgf("[WatchDog] model shut down: %s", model)
+	}
 }
 
 func (wd *WatchDog) checkBusy() {
 	wd.Lock()
-	defer wd.Unlock()
 	log.Debug().Msg("[WatchDog] Watchdog checks for busy connections")
+	// Collect models to shutdown while holding the lock
+	var modelsToShutdown []string
 
 	for address, t := range wd.timetable {
 		log.Debug().Msgf("[WatchDog] %s: active connection", address)
 
 		if time.Since(t) > wd.timeout {
-
 			model, ok := wd.addressModelMap[address]
 			if ok {
 				log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model)
-				if err := wd.pm.ShutdownModel(model); err != nil {
-					log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
-				}
-				log.Debug().Msgf("[WatchDog] model shut down: %s", address)
+				modelsToShutdown = append(modelsToShutdown, model)
+				// Clean up the maps while we have the lock
 				delete(wd.timetable, address)
 				delete(wd.addressModelMap, address)
 				delete(wd.addressMap, address)
@@ -153,4 +161,13 @@
 			}
 		}
 	}
+	wd.Unlock()
+
+	// Now shutdown models without holding the watchdog lock to prevent deadlock
+	for _, model := range modelsToShutdown {
+		if err := wd.pm.ShutdownModel(model); err != nil {
+			log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
+		}
+		log.Debug().Msgf("[WatchDog] model shut down: %s", model)
+	}
 }