Mirror of https://github.com/mudler/LocalAI.git
fix: guard from potential deadlock with requests in flight (#6484)
* fix(watchdog): guard from potential deadlock with requests in flight

  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Improve locking when loading models

  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
commit 10a66938f9
parent f0245fa36c
committed by GitHub
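The change has two parts. In the model loader, LoadModel now takes ml.mu once at the top of the function and uses an unexported checkIsLoaded helper, so the cache check and the actual load happen under a single critical section. In the watchdog, checkIdle and checkBusy no longer call ShutdownModel while holding the watchdog lock: they collect the expired models (and clean up the bookkeeping maps) under the lock, release it, and only then shut the models down, which is what prevents the deadlock with requests in flight named in the title.

Below is a minimal, self-contained sketch of that second pattern. It is illustrative only and uses invented types (watchdog, reap, a shutdown callback), not LocalAI's actual WatchDog or ProcessManager:

// deadlock_sketch.go - illustrative only; names are invented, not LocalAI's types.
package main

import (
    "fmt"
    "sync"
    "time"
)

type watchdog struct {
    sync.Mutex
    idleSince map[string]time.Time
    timeout   time.Duration
}

// reap mirrors the pattern of the patched checkIdle/checkBusy:
// bookkeeping under the lock, blocking shutdown calls after releasing it.
func (wd *watchdog) reap(shutdown func(model string) error) {
    wd.Lock()
    var expired []string
    for model, t := range wd.idleSince {
        if time.Since(t) > wd.timeout {
            expired = append(expired, model)
            delete(wd.idleSince, model) // clean up the map while we hold the lock
        }
    }
    wd.Unlock() // release before calling anything that may block or take other locks

    for _, model := range expired {
        if err := shutdown(model); err != nil {
            fmt.Println("error shutting down", model, ":", err)
        }
    }
}

func main() {
    wd := &watchdog{
        idleSince: map[string]time.Time{"some-model": time.Now().Add(-2 * time.Minute)},
        timeout:   time.Minute,
    }
    wd.reap(func(model string) error {
        fmt.Println("shutting down", model)
        return nil
    })
}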
@@ -149,8 +149,11 @@ func (ml *ModelLoader) ListLoadedModels() []*Model {
 }
 
 func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string, string, string) (*Model, error)) (*Model, error) {
+    ml.mu.Lock()
+    defer ml.mu.Unlock()
+
     // Check if we already have a loaded model
-    if model := ml.CheckIsLoaded(modelID); model != nil {
+    if model := ml.checkIsLoaded(modelID); model != nil {
         return model, nil
     }
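With this hunk, LoadModel holds ml.mu for the whole check-then-load sequence, so two concurrent calls for the same modelID can no longer both miss the cache and load the model twice. The switch from CheckIsLoaded to the unexported checkIsLoaded matters because the exported variant (see the hunk at -184 below) takes ml.mu itself, and Go's sync.Mutex is not reentrant: calling it here, with the lock already held, would deadlock the loader.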
@@ -158,8 +161,6 @@ func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string,
     modelFile := filepath.Join(ml.ModelPath, modelName)
     log.Debug().Msgf("Loading model in memory from file: %s", modelFile)
 
-    ml.mu.Lock()
-    defer ml.mu.Unlock()
     model, err := loader(modelID, modelName, modelFile)
     if err != nil {
         return nil, fmt.Errorf("failed to load model with internal loader: %s", err)
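This Lock/Unlock pair has to go because the mutex is now acquired at the top of LoadModel (previous hunk); a second ml.mu.Lock() here would self-deadlock on the non-reentrant mutex. Note that the loader callback still runs with ml.mu held, as it did before; only the acquisition point moved.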
@@ -184,6 +185,10 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
 func (ml *ModelLoader) CheckIsLoaded(s string) *Model {
     ml.mu.Lock()
     defer ml.mu.Unlock()
+    return ml.checkIsLoaded(s)
+}
+
+func (ml *ModelLoader) checkIsLoaded(s string) *Model {
     m, ok := ml.models[s]
     if !ok {
         return nil
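The loader now follows the common Go split between an exported, locking wrapper and an unexported worker that assumes the caller already holds the lock: external callers keep using CheckIsLoaded, while LoadModel, which already owns ml.mu, calls checkIsLoaded directly. A generic sketch of the idiom, with invented names rather than LocalAI's ModelLoader:

package main

import (
    "fmt"
    "sync"
)

type registry struct {
    mu    sync.Mutex
    items map[string]string
}

// Get is the exported entry point: it owns the locking.
func (r *registry) Get(key string) (string, bool) {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.get(key)
}

// get assumes r.mu is already held by the caller.
func (r *registry) get(key string) (string, bool) {
    v, ok := r.items[key]
    return v, ok
}

// GetOrPut needs the lock across both the lookup and the insert,
// so it calls the unexported variant instead of re-locking via Get.
func (r *registry) GetOrPut(key, value string) string {
    r.mu.Lock()
    defer r.mu.Unlock()
    if v, ok := r.get(key); ok {
        return v
    }
    r.items[key] = value
    return value
}

func main() {
    r := &registry{items: map[string]string{}}
    fmt.Println(r.GetOrPut("llama", "loaded")) // loaded
    v, ok := r.Get("llama")
    fmt.Println(v, ok) // loaded true
}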
@@ -44,6 +44,7 @@ func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy
         busyCheck: busy,
         idleCheck: idle,
         addressModelMap: make(map[string]string),
+        stop: make(chan bool, 1),
     }
 }
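From the context shown, the one line this hunk adds to the constructor is the buffered stop channel. The corresponding Run/Stop changes are not part of this excerpt, so the following is only an assumed, typical use of such a channel in a ticker loop (the one-slot buffer lets a single stop signal be sent without blocking), not necessarily what LocalAI does:

// Assumed, generic use of a buffered stop channel; the Run/Stop bodies are not in this diff.
package main

import (
    "fmt"
    "time"
)

type dog struct {
    stop chan bool
}

func (d *dog) run() {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-d.stop:
            fmt.Println("watchdog stopped")
            return
        case <-ticker.C:
            fmt.Println("tick: run idle/busy checks")
        }
    }
}

// halt signals the loop; with a one-slot buffer a single call
// does not block even if run() is between select iterations.
func (d *dog) halt() {
    d.stop <- true
}

func main() {
    d := &dog{stop: make(chan bool, 1)}
    go d.run()
    time.Sleep(250 * time.Millisecond)
    d.halt()
    time.Sleep(50 * time.Millisecond)
}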
@@ -104,18 +105,18 @@ func (wd *WatchDog) Run() {
 func (wd *WatchDog) checkIdle() {
     wd.Lock()
-    defer wd.Unlock()
     log.Debug().Msg("[WatchDog] Watchdog checks for idle connections")
+
+    // Collect models to shutdown while holding the lock
+    var modelsToShutdown []string
     for address, t := range wd.idleTime {
         log.Debug().Msgf("[WatchDog] %s: idle connection", address)
         if time.Since(t) > wd.idletimeout {
             log.Warn().Msgf("[WatchDog] Address %s is idle for too long, killing it", address)
             model, ok := wd.addressModelMap[address]
             if ok {
-                if err := wd.pm.ShutdownModel(model); err != nil {
-                    log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
-                }
-                log.Debug().Msgf("[WatchDog] model shut down: %s", address)
+                modelsToShutdown = append(modelsToShutdown, model)
+                // Clean up the maps while we have the lock
                 delete(wd.idleTime, address)
                 delete(wd.addressModelMap, address)
                 delete(wd.addressMap, address)
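In checkIdle, the critical section now only scans idleTime, records expired models in modelsToShutdown, and deletes the per-address map entries. The defer wd.Unlock() is dropped so the lock can be released explicitly before the shutdown calls, which move out of the locked region entirely (next hunk).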
@@ -125,25 +126,32 @@ func (wd *WatchDog) checkIdle() {
             }
         }
     }
+    wd.Unlock()
+
+    // Now shutdown models without holding the watchdog lock to prevent deadlock
+    for _, model := range modelsToShutdown {
+        if err := wd.pm.ShutdownModel(model); err != nil {
+            log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
+        }
+        log.Debug().Msgf("[WatchDog] model shut down: %s", model)
+    }
 }
 
 func (wd *WatchDog) checkBusy() {
     wd.Lock()
-    defer wd.Unlock()
     log.Debug().Msg("[WatchDog] Watchdog checks for busy connections")
+
+    // Collect models to shutdown while holding the lock
+    var modelsToShutdown []string
     for address, t := range wd.timetable {
         log.Debug().Msgf("[WatchDog] %s: active connection", address)
 
         if time.Since(t) > wd.timeout {
 
             model, ok := wd.addressModelMap[address]
             if ok {
                 log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model)
-                if err := wd.pm.ShutdownModel(model); err != nil {
-                    log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
-                }
-                log.Debug().Msgf("[WatchDog] model shut down: %s", address)
+                modelsToShutdown = append(modelsToShutdown, model)
+                // Clean up the maps while we have the lock
                 delete(wd.timetable, address)
                 delete(wd.addressModelMap, address)
                 delete(wd.addressMap, address)
@@ -153,4 +161,13 @@ func (wd *WatchDog) checkBusy() {
             }
         }
     }
+    wd.Unlock()
+
+    // Now shutdown models without holding the watchdog lock to prevent deadlock
+    for _, model := range modelsToShutdown {
+        if err := wd.pm.ShutdownModel(model); err != nil {
+            log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
+        }
+        log.Debug().Msgf("[WatchDog] model shut down: %s", model)
+    }
 }
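checkBusy gets the identical treatment: collect the busy-for-too-long models and clean the timetable/address maps under the lock, then wd.Unlock(), then loop over modelsToShutdown calling wd.pm.ShutdownModel. The deadlock this guards against, per the commit title, is presumably ShutdownModel needing a lock that an in-flight request already holds while that request in turn waits on the watchdog lock; with the new ordering the watchdog never holds its own lock across the shutdown call, so that cycle cannot form.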