diff --git a/core/application/application.go b/core/application/application.go index 3e241c698..38a9d2cf9 100644 --- a/core/application/application.go +++ b/core/application/application.go @@ -29,7 +29,7 @@ type Application struct { func newApplication(appConfig *config.ApplicationConfig) *Application { return &Application{ backendLoader: config.NewModelConfigLoader(appConfig.SystemState.Model.ModelsPath), - modelLoader: model.NewModelLoader(appConfig.SystemState, appConfig.SingleBackend), + modelLoader: model.NewModelLoader(appConfig.SystemState), applicationConfig: appConfig, templatesEvaluator: templates.NewEvaluator(appConfig.SystemState.Model.ModelsPath), } diff --git a/core/application/config_file_watcher.go b/core/application/config_file_watcher.go index 999d29aec..30b3e5ad6 100644 --- a/core/application/config_file_watcher.go +++ b/core/application/config_file_watcher.go @@ -191,7 +191,8 @@ type runtimeSettings struct { WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"` WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"` WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"` - SingleBackend *bool `json:"single_backend,omitempty"` + SingleBackend *bool `json:"single_backend,omitempty"` // Deprecated: use MaxActiveBackends = 1 instead + MaxActiveBackends *int `json:"max_active_backends,omitempty"` // Maximum number of active backends (0 = unlimited, 1 = single backend mode) ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"` Threads *int `json:"threads,omitempty"` ContextSize *int `json:"context_size,omitempty"` @@ -224,6 +225,7 @@ func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHand envWatchdogIdleTimeout := appConfig.WatchDogIdleTimeout == startupAppConfig.WatchDogIdleTimeout envWatchdogBusyTimeout := appConfig.WatchDogBusyTimeout == startupAppConfig.WatchDogBusyTimeout envSingleBackend := appConfig.SingleBackend == startupAppConfig.SingleBackend + envMaxActiveBackends := appConfig.MaxActiveBackends == startupAppConfig.MaxActiveBackends envParallelRequests := appConfig.ParallelBackendRequests == startupAppConfig.ParallelBackendRequests envThreads := appConfig.Threads == startupAppConfig.Threads envContextSize := appConfig.ContextSize == startupAppConfig.ContextSize @@ -275,8 +277,19 @@ func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHand log.Warn().Err(err).Str("timeout", *settings.WatchdogBusyTimeout).Msg("invalid watchdog busy timeout in runtime_settings.json") } } - if settings.SingleBackend != nil && !envSingleBackend { + // Handle MaxActiveBackends (new) and SingleBackend (deprecated) + if settings.MaxActiveBackends != nil && !envMaxActiveBackends { + appConfig.MaxActiveBackends = *settings.MaxActiveBackends + // For backward compatibility, also set SingleBackend if MaxActiveBackends == 1 + appConfig.SingleBackend = (*settings.MaxActiveBackends == 1) + } else if settings.SingleBackend != nil && !envSingleBackend { + // Legacy: SingleBackend maps to MaxActiveBackends = 1 appConfig.SingleBackend = *settings.SingleBackend + if *settings.SingleBackend { + appConfig.MaxActiveBackends = 1 + } else { + appConfig.MaxActiveBackends = 0 + } } if settings.ParallelBackendRequests != nil && !envParallelRequests { appConfig.ParallelBackendRequests = *settings.ParallelBackendRequests diff --git a/core/application/startup.go b/core/application/startup.go index 2bbbdfac7..490fee24e 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -224,7 +224,8 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) { WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"` WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"` WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"` - SingleBackend *bool `json:"single_backend,omitempty"` + SingleBackend *bool `json:"single_backend,omitempty"` // Deprecated: use MaxActiveBackends = 1 instead + MaxActiveBackends *int `json:"max_active_backends,omitempty"` // Maximum number of active backends (0 = unlimited) ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"` AgentJobRetentionDays *int `json:"agent_job_retention_days,omitempty"` } @@ -280,9 +281,21 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) { } } } - if settings.SingleBackend != nil { + // Handle MaxActiveBackends (new) and SingleBackend (deprecated) + if settings.MaxActiveBackends != nil { + // Only apply if current value is default (0), suggesting it wasn't set from env var + if options.MaxActiveBackends == 0 { + options.MaxActiveBackends = *settings.MaxActiveBackends + // For backward compatibility, also set SingleBackend if MaxActiveBackends == 1 + options.SingleBackend = (*settings.MaxActiveBackends == 1) + } + } else if settings.SingleBackend != nil { + // Legacy: SingleBackend maps to MaxActiveBackends = 1 if !options.SingleBackend { options.SingleBackend = *settings.SingleBackend + if *settings.SingleBackend { + options.MaxActiveBackends = 1 + } } } if settings.ParallelBackendRequests != nil { @@ -307,15 +320,25 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) { // initializeWatchdog initializes the watchdog with current ApplicationConfig settings func initializeWatchdog(application *Application, options *config.ApplicationConfig) { - if options.WatchDog { + // Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend) + lruLimit := options.GetEffectiveMaxActiveBackends() + + // Create watchdog if enabled OR if LRU limit is set + if options.WatchDog || lruLimit > 0 { wd := model.NewWatchDog( application.ModelLoader(), options.WatchDogBusyTimeout, options.WatchDogIdleTimeout, options.WatchDogBusy, - options.WatchDogIdle) + options.WatchDogIdle, + lruLimit) application.ModelLoader().SetWatchDog(wd) - go wd.Run() + + // Start watchdog goroutine only if busy/idle checks are enabled + if options.WatchDogBusy || options.WatchDogIdle { + go wd.Run() + } + go func() { <-options.Context.Done() log.Debug().Msgf("Context canceled, shutting down") diff --git a/core/application/watchdog.go b/core/application/watchdog.go index 20acf0b7a..e82ac28dc 100644 --- a/core/application/watchdog.go +++ b/core/application/watchdog.go @@ -20,21 +20,29 @@ func (a *Application) StopWatchdog() error { func (a *Application) startWatchdog() error { appConfig := a.ApplicationConfig() - // Create new watchdog if enabled - if appConfig.WatchDog { + // Get effective max active backends (considers both MaxActiveBackends and deprecated SingleBackend) + lruLimit := appConfig.GetEffectiveMaxActiveBackends() + + // Create watchdog if enabled OR if LRU limit is set + // LRU eviction requires watchdog infrastructure even without busy/idle checks + if appConfig.WatchDog || lruLimit > 0 { wd := model.NewWatchDog( a.modelLoader, appConfig.WatchDogBusyTimeout, appConfig.WatchDogIdleTimeout, appConfig.WatchDogBusy, - appConfig.WatchDogIdle) + appConfig.WatchDogIdle, + lruLimit) a.modelLoader.SetWatchDog(wd) // Create new stop channel a.watchdogStop = make(chan bool, 1) - // Start watchdog goroutine - go wd.Run() + // Start watchdog goroutine only if busy/idle checks are enabled + // LRU eviction doesn't need the Run() loop - it's triggered on model load + if appConfig.WatchDogBusy || appConfig.WatchDogIdle { + go wd.Run() + } // Setup shutdown handler go func() { @@ -48,7 +56,7 @@ func (a *Application) startWatchdog() error { } }() - log.Info().Msg("Watchdog started with new settings") + log.Info().Int("lruLimit", lruLimit).Bool("busyCheck", appConfig.WatchDogBusy).Bool("idleCheck", appConfig.WatchDogIdle).Msg("Watchdog started with new settings") } else { log.Info().Msg("Watchdog disabled") } diff --git a/core/backend/detection.go b/core/backend/detection.go index a3a443952..1b1991824 100644 --- a/core/backend/detection.go +++ b/core/backend/detection.go @@ -20,7 +20,6 @@ func Detection( if err != nil { return nil, err } - defer loader.Close() if detectionModel == nil { return nil, fmt.Errorf("could not load detection model") diff --git a/core/backend/embeddings.go b/core/backend/embeddings.go index c809992a4..2383023c0 100644 --- a/core/backend/embeddings.go +++ b/core/backend/embeddings.go @@ -17,7 +17,6 @@ func ModelEmbedding(s string, tokens []int, loader *model.ModelLoader, modelConf if err != nil { return nil, err } - defer loader.Close() var fn func() ([]float32, error) switch model := inferenceModel.(type) { diff --git a/core/backend/image.go b/core/backend/image.go index 796c71979..b6bb4f8a7 100644 --- a/core/backend/image.go +++ b/core/backend/image.go @@ -16,7 +16,6 @@ func ImageGeneration(height, width, mode, step, seed int, positive_prompt, negat if err != nil { return nil, err } - defer loader.Close() fn := func() error { _, err := inferenceModel.GenerateImage( diff --git a/core/backend/llm.go b/core/backend/llm.go index c00f5876d..92ba91839 100644 --- a/core/backend/llm.go +++ b/core/backend/llm.go @@ -60,7 +60,6 @@ func ModelInference(ctx context.Context, s string, messages schema.Messages, ima if err != nil { return nil, err } - defer loader.Close() var protoMessages []*proto.Message // if we are using the tokenizer template, we need to convert the messages to proto messages diff --git a/core/backend/rerank.go b/core/backend/rerank.go index 068d05e68..bcfad7382 100644 --- a/core/backend/rerank.go +++ b/core/backend/rerank.go @@ -15,7 +15,6 @@ func Rerank(request *proto.RerankRequest, loader *model.ModelLoader, appConfig * if err != nil { return nil, err } - defer loader.Close() if rerankModel == nil { return nil, fmt.Errorf("could not load rerank model") diff --git a/core/backend/soundgeneration.go b/core/backend/soundgeneration.go index 2c91958cf..ca78b2db9 100644 --- a/core/backend/soundgeneration.go +++ b/core/backend/soundgeneration.go @@ -29,7 +29,6 @@ func SoundGeneration( if err != nil { return "", nil, err } - defer loader.Close() if soundGenModel == nil { return "", nil, fmt.Errorf("could not load sound generation model") diff --git a/core/backend/token_metrics.go b/core/backend/token_metrics.go index c3e15d773..c81f57cab 100644 --- a/core/backend/token_metrics.go +++ b/core/backend/token_metrics.go @@ -20,7 +20,6 @@ func TokenMetrics( if err != nil { return nil, err } - defer loader.Close() if model == nil { return nil, fmt.Errorf("could not loadmodel model") diff --git a/core/backend/tokenize.go b/core/backend/tokenize.go index e85958b27..5803e44be 100644 --- a/core/backend/tokenize.go +++ b/core/backend/tokenize.go @@ -17,7 +17,6 @@ func ModelTokenize(s string, loader *model.ModelLoader, modelConfig config.Model if err != nil { return schema.TokenizeResponse{}, err } - defer loader.Close() predictOptions := gRPCPredictOpts(modelConfig, loader.ModelPath) predictOptions.Prompt = s diff --git a/core/backend/transcript.go b/core/backend/transcript.go index 576458250..9781e26fd 100644 --- a/core/backend/transcript.go +++ b/core/backend/transcript.go @@ -24,7 +24,6 @@ func ModelTranscription(audio, language string, translate bool, diarize bool, ml if err != nil { return nil, err } - defer ml.Close() if transcriptionModel == nil { return nil, fmt.Errorf("could not load transcription model") diff --git a/core/backend/tts.go b/core/backend/tts.go index 7b478a5fc..9c75cb37a 100644 --- a/core/backend/tts.go +++ b/core/backend/tts.go @@ -26,7 +26,6 @@ func ModelTTS( if err != nil { return "", nil, err } - defer loader.Close() if ttsModel == nil { return "", nil, fmt.Errorf("could not load tts model %q", modelConfig.Model) diff --git a/core/backend/vad.go b/core/backend/vad.go index 91f70bbc3..37859931d 100644 --- a/core/backend/vad.go +++ b/core/backend/vad.go @@ -19,7 +19,6 @@ func VAD(request *schema.VADRequest, if err != nil { return nil, err } - defer ml.Close() req := proto.VADRequest{ Audio: request.Audio, diff --git a/core/backend/video.go b/core/backend/video.go index a7a39bf24..666a76252 100644 --- a/core/backend/video.go +++ b/core/backend/video.go @@ -16,7 +16,6 @@ func VideoGeneration(height, width int32, prompt, negativePrompt, startImage, en if err != nil { return nil, err } - defer loader.Close() fn := func() error { _, err := inferenceModel.GenerateVideo( diff --git a/core/cli/backends.go b/core/cli/backends.go index 6ccc6496a..aa32a40e4 100644 --- a/core/cli/backends.go +++ b/core/cli/backends.go @@ -102,7 +102,7 @@ func (bi *BackendsInstall) Run(ctx *cliContext.Context) error { } } - modelLoader := model.NewModelLoader(systemState, true) + modelLoader := model.NewModelLoader(systemState) err = startup.InstallExternalBackends(context.Background(), galleries, systemState, modelLoader, progressCallback, bi.BackendArgs, bi.Name, bi.Alias) if err != nil { return err diff --git a/core/cli/models.go b/core/cli/models.go index bcbb60d48..ba76e8527 100644 --- a/core/cli/models.go +++ b/core/cli/models.go @@ -80,7 +80,7 @@ func (mi *ModelsInstall) Run(ctx *cliContext.Context) error { return err } - galleryService := services.NewGalleryService(&config.ApplicationConfig{}, model.NewModelLoader(systemState, true)) + galleryService := services.NewGalleryService(&config.ApplicationConfig{}, model.NewModelLoader(systemState)) err = galleryService.Start(context.Background(), config.NewModelConfigLoader(mi.ModelsPath), systemState) if err != nil { return err @@ -134,7 +134,7 @@ func (mi *ModelsInstall) Run(ctx *cliContext.Context) error { log.Info().Str("model", modelName).Str("license", model.License).Msg("installing model") } - modelLoader := model.NewModelLoader(systemState, true) + modelLoader := model.NewModelLoader(systemState) err = startup.InstallModels(context.Background(), galleryService, galleries, backendGalleries, systemState, modelLoader, !mi.DisablePredownloadScan, mi.AutoloadBackendGalleries, progressCallback, modelName) if err != nil { return err diff --git a/core/cli/run.go b/core/cli/run.go index 3cc77baf1..4df4fbdf3 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -64,7 +64,8 @@ type RunCMD struct { Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"` ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"` - SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"` + SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time (deprecated: use --max-active-backends=1 instead)" group:"backends"` + MaxActiveBackends int `env:"LOCALAI_MAX_ACTIVE_BACKENDS,MAX_ACTIVE_BACKENDS" default:"0" help:"Maximum number of backends to keep loaded at once (0 = unlimited, 1 = single backend mode). Least recently used backends are evicted when limit is reached" group:"backends"` PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"` ExternalGRPCBackends []string `env:"LOCALAI_EXTERNAL_GRPC_BACKENDS,EXTERNAL_GRPC_BACKENDS" help:"A list of external grpc backends" group:"backends"` EnableWatchdogIdle bool `env:"LOCALAI_WATCHDOG_IDLE,WATCHDOG_IDLE" default:"false" help:"Enable watchdog for stopping backends that are idle longer than the watchdog-idle-timeout" group:"backends"` @@ -202,7 +203,13 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { if r.ParallelRequests { opts = append(opts, config.EnableParallelBackendRequests) } - if r.SingleActiveBackend { + + // Handle max active backends (LRU eviction) + // MaxActiveBackends takes precedence over SingleActiveBackend + if r.MaxActiveBackends > 0 { + opts = append(opts, config.SetMaxActiveBackends(r.MaxActiveBackends)) + } else if r.SingleActiveBackend { + // Backward compatibility: --single-active-backend is equivalent to --max-active-backends=1 opts = append(opts, config.EnableSingleBackend) } diff --git a/core/cli/soundgeneration.go b/core/cli/soundgeneration.go index a0f96b4fb..c94bb294d 100644 --- a/core/cli/soundgeneration.go +++ b/core/cli/soundgeneration.go @@ -79,7 +79,7 @@ func (t *SoundGenerationCMD) Run(ctx *cliContext.Context) error { GeneratedContentDir: outputDir, ExternalGRPCBackends: externalBackends, } - ml := model.NewModelLoader(systemState, opts.SingleBackend) + ml := model.NewModelLoader(systemState) defer func() { err := ml.StopAllGRPC() diff --git a/core/cli/transcript.go b/core/cli/transcript.go index 30a686aab..2beb00944 100644 --- a/core/cli/transcript.go +++ b/core/cli/transcript.go @@ -38,7 +38,7 @@ func (t *TranscriptCMD) Run(ctx *cliContext.Context) error { } cl := config.NewModelConfigLoader(t.ModelsPath) - ml := model.NewModelLoader(systemState, opts.SingleBackend) + ml := model.NewModelLoader(systemState) if err := cl.LoadModelConfigsFromPath(t.ModelsPath); err != nil { return err } diff --git a/core/cli/tts.go b/core/cli/tts.go index ed0266714..43c1749e8 100644 --- a/core/cli/tts.go +++ b/core/cli/tts.go @@ -48,7 +48,7 @@ func (t *TTSCMD) Run(ctx *cliContext.Context) error { GeneratedContentDir: outputDir, } - ml := model.NewModelLoader(systemState, opts.SingleBackend) + ml := model.NewModelLoader(systemState) defer func() { err := ml.StopAllGRPC() diff --git a/core/cli/worker/worker_llamacpp.go b/core/cli/worker/worker_llamacpp.go index 1b4be6736..a7ecec406 100644 --- a/core/cli/worker/worker_llamacpp.go +++ b/core/cli/worker/worker_llamacpp.go @@ -37,7 +37,7 @@ func findLLamaCPPBackend(galleries string, systemState *system.SystemState) (str backend, ok := backends.Get(llamaCPPGalleryName) if !ok { - ml := model.NewModelLoader(systemState, true) + ml := model.NewModelLoader(systemState) var gals []config.Gallery if err := json.Unmarshal([]byte(galleries), &gals); err != nil { log.Error().Err(err).Msg("failed loading galleries") diff --git a/core/config/application_config.go b/core/config/application_config.go index 4d770179b..c67e24f5c 100644 --- a/core/config/application_config.go +++ b/core/config/application_config.go @@ -52,7 +52,8 @@ type ApplicationConfig struct { AutoloadGalleries, AutoloadBackendGalleries bool - SingleBackend bool + SingleBackend bool // Deprecated: use MaxActiveBackends = 1 instead + MaxActiveBackends int // Maximum number of active backends (0 = unlimited, 1 = single backend mode) ParallelBackendRequests bool WatchDogIdle bool @@ -186,8 +187,38 @@ func SetWatchDogIdleTimeout(t time.Duration) AppOption { } } +// EnableSingleBackend is deprecated: use SetMaxActiveBackends(1) instead. +// This is kept for backward compatibility. var EnableSingleBackend = func(o *ApplicationConfig) { o.SingleBackend = true + o.MaxActiveBackends = 1 +} + +// SetMaxActiveBackends sets the maximum number of active backends. +// 0 = unlimited, 1 = single backend mode (replaces EnableSingleBackend) +func SetMaxActiveBackends(n int) AppOption { + return func(o *ApplicationConfig) { + o.MaxActiveBackends = n + // For backward compatibility, also set SingleBackend if n == 1 + if n == 1 { + o.SingleBackend = true + } + } +} + +// GetEffectiveMaxActiveBackends returns the effective max active backends limit. +// It considers both MaxActiveBackends and the deprecated SingleBackend setting. +// If MaxActiveBackends is set (> 0), it takes precedence. +// If SingleBackend is true and MaxActiveBackends is 0, returns 1. +// Otherwise returns 0 (unlimited). +func (o *ApplicationConfig) GetEffectiveMaxActiveBackends() int { + if o.MaxActiveBackends > 0 { + return o.MaxActiveBackends + } + if o.SingleBackend { + return 1 + } + return 0 } var EnableParallelBackendRequests = func(o *ApplicationConfig) { diff --git a/core/gallery/backends_test.go b/core/gallery/backends_test.go index 756d2e7a2..3799dc682 100644 --- a/core/gallery/backends_test.go +++ b/core/gallery/backends_test.go @@ -108,7 +108,7 @@ var _ = Describe("Gallery Backends", func() { } systemState, err = system.GetSystemState(system.WithBackendPath(tempDir)) Expect(err).NotTo(HaveOccurred()) - ml = model.NewModelLoader(systemState, true) + ml = model.NewModelLoader(systemState) }) AfterEach(func() { diff --git a/core/http/endpoints/localai/settings.go b/core/http/endpoints/localai/settings.go index d5c5cd7db..dee77646e 100644 --- a/core/http/endpoints/localai/settings.go +++ b/core/http/endpoints/localai/settings.go @@ -27,7 +27,8 @@ type RuntimeSettings struct { WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"` WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"` WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"` - SingleBackend *bool `json:"single_backend,omitempty"` + SingleBackend *bool `json:"single_backend,omitempty"` // Deprecated: use MaxActiveBackends = 1 instead + MaxActiveBackends *int `json:"max_active_backends,omitempty"` // Maximum number of active backends (0 = unlimited, 1 = single backend mode) ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"` Threads *int `json:"threads,omitempty"` ContextSize *int `json:"context_size,omitempty"` @@ -65,6 +66,7 @@ func GetSettingsEndpoint(app *application.Application) echo.HandlerFunc { watchdogBusy := appConfig.WatchDogBusy watchdogEnabled := appConfig.WatchDog singleBackend := appConfig.SingleBackend + maxActiveBackends := appConfig.MaxActiveBackends parallelBackendRequests := appConfig.ParallelBackendRequests threads := appConfig.Threads contextSize := appConfig.ContextSize @@ -87,6 +89,7 @@ func GetSettingsEndpoint(app *application.Application) echo.HandlerFunc { settings.WatchdogBusyEnabled = &watchdogBusy settings.WatchdogEnabled = &watchdogEnabled settings.SingleBackend = &singleBackend + settings.MaxActiveBackends = &maxActiveBackends settings.ParallelBackendRequests = ¶llelBackendRequests settings.Threads = &threads settings.ContextSize = &contextSize @@ -223,8 +226,20 @@ func UpdateSettingsEndpoint(app *application.Application) echo.HandlerFunc { appConfig.WatchDogBusyTimeout = dur watchdogChanged = true } - if settings.SingleBackend != nil { + if settings.MaxActiveBackends != nil { + appConfig.MaxActiveBackends = *settings.MaxActiveBackends + // For backward compatibility, update SingleBackend too + appConfig.SingleBackend = (*settings.MaxActiveBackends == 1) + watchdogChanged = true // LRU limit is managed by watchdog + } else if settings.SingleBackend != nil { + // Legacy support: SingleBackend maps to MaxActiveBackends = 1 appConfig.SingleBackend = *settings.SingleBackend + if *settings.SingleBackend { + appConfig.MaxActiveBackends = 1 + } else { + appConfig.MaxActiveBackends = 0 + } + watchdogChanged = true // LRU limit is managed by watchdog } if settings.ParallelBackendRequests != nil { appConfig.ParallelBackendRequests = *settings.ParallelBackendRequests diff --git a/core/http/endpoints/localai/stores.go b/core/http/endpoints/localai/stores.go index 033334375..8074da9e0 100644 --- a/core/http/endpoints/localai/stores.go +++ b/core/http/endpoints/localai/stores.go @@ -21,7 +21,6 @@ func StoresSetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi if err != nil { return err } - defer sl.Close() vals := make([][]byte, len(input.Values)) for i, v := range input.Values { @@ -49,7 +48,6 @@ func StoresDeleteEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationCo if err != nil { return err } - defer sl.Close() if err := store.DeleteCols(c.Request().Context(), sb, input.Keys); err != nil { return err @@ -71,7 +69,6 @@ func StoresGetEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConfi if err != nil { return err } - defer sl.Close() keys, vals, err := store.GetCols(c.Request().Context(), sb, input.Keys) if err != nil { @@ -103,7 +100,6 @@ func StoresFindEndpoint(sl *model.ModelLoader, appConfig *config.ApplicationConf if err != nil { return err } - defer sl.Close() keys, vals, similarities, err := store.Find(c.Request().Context(), sb, input.Key, input.Topk) if err != nil { diff --git a/core/http/views/settings.html b/core/http/views/settings.html index 95bec85bb..37292007e 100644 --- a/core/http/views/settings.html +++ b/core/http/views/settings.html @@ -138,17 +138,15 @@

- -
-
- -

Allow only one backend to be active at a time

-
- + +
+ +

Maximum number of models to keep loaded at once (0 = unlimited, 1 = single backend mode). Least recently used models are evicted when limit is reached.

+
@@ -462,7 +460,7 @@ function settingsDashboard() { watchdog_busy_enabled: false, watchdog_idle_timeout: '15m', watchdog_busy_timeout: '5m', - single_backend: false, + max_active_backends: 0, parallel_backend_requests: false, threads: 0, context_size: 0, @@ -500,7 +498,7 @@ function settingsDashboard() { watchdog_busy_enabled: data.watchdog_busy_enabled, watchdog_idle_timeout: data.watchdog_idle_timeout || '15m', watchdog_busy_timeout: data.watchdog_busy_timeout || '5m', - single_backend: data.single_backend, + max_active_backends: data.max_active_backends || 0, parallel_backend_requests: data.parallel_backend_requests, threads: data.threads || 0, context_size: data.context_size || 0, @@ -536,6 +534,12 @@ function settingsDashboard() { } }, + updateMaxActiveBackends() { + // Ensure max_active_backends is a non-negative integer + const value = parseInt(this.settings.max_active_backends) || 0; + this.settings.max_active_backends = Math.max(0, value); + }, + async saveSettings() { if (this.saving) return; @@ -560,8 +564,8 @@ function settingsDashboard() { if (this.settings.watchdog_busy_timeout) { payload.watchdog_busy_timeout = this.settings.watchdog_busy_timeout; } - if (this.settings.single_backend !== undefined) { - payload.single_backend = this.settings.single_backend; + if (this.settings.max_active_backends !== undefined) { + payload.max_active_backends = parseInt(this.settings.max_active_backends) || 0; } if (this.settings.parallel_backend_requests !== undefined) { payload.parallel_backend_requests = this.settings.parallel_backend_requests; diff --git a/core/services/agent_jobs_test.go b/core/services/agent_jobs_test.go index 447f3e6b4..ff38aaec0 100644 --- a/core/services/agent_jobs_test.go +++ b/core/services/agent_jobs_test.go @@ -42,7 +42,7 @@ var _ = Describe("AgentJobService", func() { appConfig.APIAddress = "127.0.0.1:8080" appConfig.AgentJobRetentionDays = 30 - modelLoader = model.NewModelLoader(systemState, false) + modelLoader = model.NewModelLoader(systemState) configLoader = config.NewModelConfigLoader(tempDir) evaluator = templates.NewEvaluator(tempDir) diff --git a/core/startup/model_preload_test.go b/core/startup/model_preload_test.go index 3bf6d2687..d324f44a7 100644 --- a/core/startup/model_preload_test.go +++ b/core/startup/model_preload_test.go @@ -30,7 +30,7 @@ var _ = Describe("Preload test", func() { Expect(err).ToNot(HaveOccurred()) systemState, err = system.GetSystemState(system.WithModelPath(tmpdir)) Expect(err).ToNot(HaveOccurred()) - ml = model.NewModelLoader(systemState, true) + ml = model.NewModelLoader(systemState) }) AfterEach(func() { diff --git a/docs/content/advanced/vram-management.md b/docs/content/advanced/vram-management.md index 557e51bec..2487de051 100644 --- a/docs/content/advanced/vram-management.md +++ b/docs/content/advanced/vram-management.md @@ -5,7 +5,10 @@ weight = 22 url = '/advanced/vram-management' +++ -When running multiple models in LocalAI, especially on systems with limited GPU memory (VRAM), you may encounter situations where loading a new model fails because there isn't enough available VRAM. LocalAI provides two mechanisms to automatically manage model memory allocation and prevent VRAM exhaustion. +When running multiple models in LocalAI, especially on systems with limited GPU memory (VRAM), you may encounter situations where loading a new model fails because there isn't enough available VRAM. LocalAI provides several mechanisms to automatically manage model memory allocation and prevent VRAM exhaustion: + +1. **Max Active Backends (LRU Eviction)**: Limit the number of loaded models, evicting the least recently used when the limit is reached +2. **Watchdog Mechanisms**: Automatically unload idle or stuck models based on configurable timeouts ## The Problem @@ -16,34 +19,80 @@ By default, LocalAI keeps models loaded in memory once they're first used. This This is a common issue when working with GPU-accelerated models, as VRAM is typically more limited than system RAM. For more context, see issues [#6068](https://github.com/mudler/LocalAI/issues/6068), [#7269](https://github.com/mudler/LocalAI/issues/7269), and [#5352](https://github.com/mudler/LocalAI/issues/5352). -## Solution 1: Single Active Backend +## Solution 1: Max Active Backends (LRU Eviction) -The simplest approach is to ensure only one model is loaded at a time. When a new model is requested, LocalAI will automatically unload the currently active model before loading the new one. +LocalAI supports limiting the maximum number of active backends (loaded models) using LRU (Least Recently Used) eviction. When the limit is reached and a new model needs to be loaded, the least recently used model is automatically unloaded to make room. ### Configuration -```bash -./local-ai --single-active-backend +Set the maximum number of active backends using CLI flags or environment variables: -LOCALAI_SINGLE_ACTIVE_BACKEND=true ./local-ai +```bash +# Allow up to 3 models loaded simultaneously +./local-ai --max-active-backends=3 + +# Using environment variables +LOCALAI_MAX_ACTIVE_BACKENDS=3 ./local-ai +MAX_ACTIVE_BACKENDS=3 ./local-ai ``` +Setting the limit to `1` is equivalent to single active backend mode (see below). Setting to `0` disables the limit (unlimited backends). + ### Use cases -- Single GPU systems with limited VRAM -- When you only need one model active at a time -- Simple deployments where model switching is acceptable +- Systems with limited VRAM that can handle a few models simultaneously +- Multi-model deployments where you want to keep frequently-used models loaded +- Balancing between memory usage and model reload times +- Production environments requiring predictable memory consumption + +### How it works + +1. When a model is requested, its "last used" timestamp is updated +2. When a new model needs to be loaded and the limit is reached, LocalAI identifies the least recently used model(s) +3. The LRU model(s) are automatically unloaded to make room for the new model +4. Concurrent requests for loading different models are handled safely - the system accounts for models currently being loaded when calculating evictions ### Example ```bash -LOCALAI_SINGLE_ACTIVE_BACKEND=true ./local-ai +# Allow 2 active backends +LOCALAI_MAX_ACTIVE_BACKENDS=2 ./local-ai +# First request - model-a is loaded (1 active) curl http://localhost:8080/v1/chat/completions -d '{"model": "model-a", ...}' +# Second request - model-b is loaded (2 active, at limit) +curl http://localhost:8080/v1/chat/completions -d '{"model": "model-b", ...}' + +# Third request - model-a is evicted (LRU), model-c is loaded +curl http://localhost:8080/v1/chat/completions -d '{"model": "model-c", ...}' + +# Request for model-b updates its "last used" time curl http://localhost:8080/v1/chat/completions -d '{"model": "model-b", ...}' ``` +### Single Active Backend Mode + +The simplest approach is to ensure only one model is loaded at a time. This is now implemented as `--max-active-backends=1`. When a new model is requested, LocalAI will automatically unload the currently active model before loading the new one. + +```bash +# These are equivalent: +./local-ai --max-active-backends=1 +./local-ai --single-active-backend + +# Using environment variables +LOCALAI_MAX_ACTIVE_BACKENDS=1 ./local-ai +LOCALAI_SINGLE_ACTIVE_BACKEND=true ./local-ai +``` + +> **Note:** The `--single-active-backend` flag is deprecated but still supported for backward compatibility. It is recommended to use `--max-active-backends=1` instead. + +#### Single backend use cases + +- Single GPU systems with very limited VRAM +- When you only need one model active at a time +- Simple deployments where model switching is acceptable + ## Solution 2: Watchdog Mechanisms For more flexible memory management, LocalAI provides watchdog mechanisms that automatically unload models based on their activity state. This allows multiple models to be loaded simultaneously, but automatically frees memory when models become inactive or stuck. @@ -133,6 +182,31 @@ Timeouts can be specified using Go's duration format: - `30s` - 30 seconds - `2h30m` - 2 hours and 30 minutes +## Combining LRU and Watchdog + +You can combine Max Active Backends (LRU eviction) with the watchdog mechanisms for comprehensive memory management: + +```bash +# Allow up to 3 active backends with idle watchdog +LOCALAI_MAX_ACTIVE_BACKENDS=3 \ +LOCALAI_WATCHDOG_IDLE=true \ +LOCALAI_WATCHDOG_IDLE_TIMEOUT=15m \ +./local-ai +``` + +Or using command line flags: + +```bash +./local-ai \ + --max-active-backends=3 \ + --enable-watchdog-idle --watchdog-idle-timeout=15m +``` + +This configuration: +- Ensures no more than 3 models are loaded at once (LRU eviction kicks in when exceeded) +- Automatically unloads any model that hasn't been used for 15 minutes +- Provides both hard limits and time-based cleanup + ## Limitations and Considerations ### VRAM Usage Estimation @@ -157,10 +231,11 @@ To stop all models, you'll need to call the endpoint for each loaded model indiv ### Best Practices 1. **Monitor VRAM usage**: Use `nvidia-smi` (for NVIDIA GPUs) or similar tools to monitor actual VRAM usage -2. **Start with single active backend**: For single-GPU systems, `--single-active-backend` is often the simplest solution -3. **Tune watchdog timeouts**: Adjust timeouts based on your usage patterns - shorter timeouts free memory faster but may cause more frequent reloads -4. **Consider model size**: Ensure your VRAM can accommodate at least one of your largest models -5. **Use quantization**: Smaller quantized models use less VRAM and allow more flexibility +2. **Set an appropriate backend limit**: For single-GPU systems, `--max-active-backends=1` is often the simplest solution. For systems with more VRAM, you can increase the limit to keep more models loaded +3. **Combine LRU with watchdog**: Use `--max-active-backends` to limit the number of loaded models, and enable idle watchdog to unload models that haven't been used recently +4. **Tune watchdog timeouts**: Adjust timeouts based on your usage patterns - shorter timeouts free memory faster but may cause more frequent reloads +5. **Consider model size**: Ensure your VRAM can accommodate at least one of your largest models +6. **Use quantization**: Smaller quantized models use less VRAM and allow more flexibility ## Related Documentation diff --git a/docs/content/features/runtime-settings.md b/docs/content/features/runtime-settings.md index 5e548c5c0..dcdc8143d 100644 --- a/docs/content/features/runtime-settings.md +++ b/docs/content/features/runtime-settings.md @@ -27,9 +27,11 @@ Changes to watchdog settings are applied immediately by restarting the watchdog ### Backend Configuration -- **Single Backend**: Allow only one backend to run at a time +- **Max Active Backends**: Maximum number of active backends (loaded models). When exceeded, the least recently used model is automatically evicted. Set to `0` for unlimited, `1` for single-backend mode - **Parallel Backend Requests**: Enable backends to handle multiple requests in parallel if supported +> **Note:** The "Single Backend" setting is deprecated. Use "Max Active Backends" set to `1` for single-backend behavior. + ### Performance Settings - **Threads**: Number of threads used for parallel computation (recommended: number of physical cores) @@ -90,7 +92,7 @@ The `runtime_settings.json` file follows this structure: "watchdog_busy_enabled": false, "watchdog_idle_timeout": "15m", "watchdog_busy_timeout": "5m", - "single_backend": false, + "max_active_backends": 0, "parallel_backend_requests": true, "threads": 8, "context_size": 2048, diff --git a/docs/content/reference/cli-reference.md b/docs/content/reference/cli-reference.md index 9a9965045..de7917ea4 100644 --- a/docs/content/reference/cli-reference.md +++ b/docs/content/reference/cli-reference.md @@ -39,7 +39,8 @@ Complete reference for all LocalAI command-line interface (CLI) parameters and e | `--backend-galleries` | | JSON list of backend galleries | `$LOCALAI_BACKEND_GALLERIES`, `$BACKEND_GALLERIES` | | `--autoload-backend-galleries` | `true` | Automatically load backend galleries on startup | `$LOCALAI_AUTOLOAD_BACKEND_GALLERIES`, `$AUTOLOAD_BACKEND_GALLERIES` | | `--parallel-requests` | `false` | Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm) | `$LOCALAI_PARALLEL_REQUESTS`, `$PARALLEL_REQUESTS` | -| `--single-active-backend` | `false` | Allow only one backend to be run at a time | `$LOCALAI_SINGLE_ACTIVE_BACKEND`, `$SINGLE_ACTIVE_BACKEND` | +| `--max-active-backends` | `0` | Maximum number of active backends (loaded models). When exceeded, the least recently used model is evicted. Set to `0` for unlimited, `1` for single-backend mode | `$LOCALAI_MAX_ACTIVE_BACKENDS`, `$MAX_ACTIVE_BACKENDS` | +| `--single-active-backend` | `false` | **DEPRECATED** - Use `--max-active-backends=1` instead. Allow only one backend to be run at a time | `$LOCALAI_SINGLE_ACTIVE_BACKEND`, `$SINGLE_ACTIVE_BACKEND` | | `--preload-backend-only` | `false` | Do not launch the API services, only the preloaded models/backends are started (useful for multi-node setups) | `$LOCALAI_PRELOAD_BACKEND_ONLY`, `$PRELOAD_BACKEND_ONLY` | | `--enable-watchdog-idle` | `false` | Enable watchdog for stopping backends that are idle longer than the watchdog-idle-timeout | `$LOCALAI_WATCHDOG_IDLE`, `$WATCHDOG_IDLE` | | `--watchdog-idle-timeout` | `15m` | Threshold beyond which an idle backend should be stopped | `$LOCALAI_WATCHDOG_IDLE_TIMEOUT`, `$WATCHDOG_IDLE_TIMEOUT` | diff --git a/pkg/model/initializers.go b/pkg/model/initializers.go index 9178a5265..f1989e82d 100644 --- a/pkg/model/initializers.go +++ b/pkg/model/initializers.go @@ -184,54 +184,45 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e return model.GRPC(o.parallelRequests, ml.wd), nil } -func (ml *ModelLoader) stopActiveBackends(modelID string) { - if !ml.singletonMode { +// enforceLRULimit enforces the LRU limit before loading a new model. +// This is called before loading a model to ensure we don't exceed the limit. +// It accounts for models that are currently being loaded by other goroutines. +func (ml *ModelLoader) enforceLRULimit() { + if ml.wd == nil { return } - - // If we can have only one backend active, kill all the others (except external backends) - - // Stop all backends except the one we are going to load - log.Debug().Msgf("Stopping all backends except '%s'", modelID) - err := ml.StopGRPC(allExcept(modelID)) - if err != nil { - log.Error().Err(err).Str("keptModel", modelID).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing") - } + // Get the count of models currently being loaded to account for concurrent requests + pendingLoads := ml.GetLoadingCount() + ml.wd.EnforceLRULimit(pendingLoads) } -func (ml *ModelLoader) Close() { - if !ml.singletonMode { +// updateModelLastUsed updates the last used time for a model (for LRU tracking) +func (ml *ModelLoader) updateModelLastUsed(m *Model) { + if ml.wd == nil || m == nil { return } - ml.singletonLock.Unlock() -} - -func (ml *ModelLoader) lockBackend() { - if !ml.singletonMode { - return - } - ml.singletonLock.Lock() + ml.wd.UpdateLastUsed(m.address) } func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) { - ml.lockBackend() // grab the singleton lock if needed - o := NewOptions(opts...) // Return earlier if we have a model already loaded // (avoid looping through all the backends) if m := ml.CheckIsLoaded(o.modelID); m != nil { log.Debug().Msgf("Model '%s' already loaded", o.modelID) + // Update last used time for LRU tracking + ml.updateModelLastUsed(m) return m.GRPC(o.parallelRequests, ml.wd), nil } - ml.stopActiveBackends(o.modelID) + // Enforce LRU limit before loading a new model + ml.enforceLRULimit() // if a backend is defined, return the loader directly if o.backendString != "" { client, err := ml.backendLoader(opts...) if err != nil { - ml.Close() return nil, err } return client, nil @@ -250,7 +241,6 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) { if len(autoLoadBackends) == 0 { log.Error().Msg("No backends found") - ml.Close() return nil, fmt.Errorf("no backends found") } @@ -277,7 +267,5 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) { } } - ml.Close() // make sure to release the lock in case of failure - return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error()) } diff --git a/pkg/model/loader.go b/pkg/model/loader.go index 2ef6ccddc..307b600d3 100644 --- a/pkg/model/loader.go +++ b/pkg/model/loader.go @@ -22,24 +22,32 @@ import ( type ModelLoader struct { ModelPath string mu sync.Mutex - singletonLock sync.Mutex - singletonMode bool models map[string]*Model + loading map[string]chan struct{} // tracks models currently being loaded wd *WatchDog externalBackends map[string]string } -func NewModelLoader(system *system.SystemState, singleActiveBackend bool) *ModelLoader { +// NewModelLoader creates a new ModelLoader instance. +// LRU eviction is now managed through the WatchDog component. +func NewModelLoader(system *system.SystemState) *ModelLoader { nml := &ModelLoader{ ModelPath: system.Model.ModelsPath, models: make(map[string]*Model), - singletonMode: singleActiveBackend, + loading: make(map[string]chan struct{}), externalBackends: make(map[string]string), } return nml } +// GetLoadingCount returns the number of models currently being loaded +func (ml *ModelLoader) GetLoadingCount() int { + ml.mu.Lock() + defer ml.mu.Unlock() + return len(ml.loading) +} + func (ml *ModelLoader) SetWatchDog(wd *WatchDog) { ml.wd = wd } @@ -154,14 +162,44 @@ func (ml *ModelLoader) ListLoadedModels() []*Model { func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string, string, string) (*Model, error)) (*Model, error) { ml.mu.Lock() - defer ml.mu.Unlock() // Check if we already have a loaded model if model := ml.checkIsLoaded(modelID); model != nil { + ml.mu.Unlock() return model, nil } - // Load the model and keep it in memory for later use + // Check if another goroutine is already loading this model + if loadingChan, isLoading := ml.loading[modelID]; isLoading { + ml.mu.Unlock() + // Wait for the other goroutine to finish loading + log.Debug().Str("modelID", modelID).Msg("Waiting for model to be loaded by another request") + <-loadingChan + // Now check if the model is loaded + ml.mu.Lock() + model := ml.checkIsLoaded(modelID) + ml.mu.Unlock() + if model != nil { + return model, nil + } + // If still not loaded, the other goroutine failed - we'll try again + return ml.LoadModel(modelID, modelName, loader) + } + + // Mark this model as loading (create a channel that will be closed when done) + loadingChan := make(chan struct{}) + ml.loading[modelID] = loadingChan + ml.mu.Unlock() + + // Ensure we clean up the loading state when done + defer func() { + ml.mu.Lock() + delete(ml.loading, modelID) + close(loadingChan) + ml.mu.Unlock() + }() + + // Load the model (this can take a long time, no lock held) modelFile := filepath.Join(ml.ModelPath, modelName) log.Debug().Msgf("Loading model in memory from file: %s", modelFile) @@ -174,7 +212,10 @@ func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string, return nil, fmt.Errorf("loader didn't return a model") } + // Add to models map + ml.mu.Lock() ml.models[modelID] = model + ml.mu.Unlock() return model, nil } diff --git a/pkg/model/loader_test.go b/pkg/model/loader_test.go index a0199a4ca..25b379732 100644 --- a/pkg/model/loader_test.go +++ b/pkg/model/loader_test.go @@ -4,6 +4,9 @@ import ( "errors" "os" "path/filepath" + "sync" + "sync/atomic" + "time" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" @@ -27,7 +30,7 @@ var _ = Describe("ModelLoader", func() { system.WithModelPath(modelPath), ) Expect(err).ToNot(HaveOccurred()) - modelLoader = model.NewModelLoader(systemState, false) + modelLoader = model.NewModelLoader(systemState) }) AfterEach(func() { @@ -106,4 +109,157 @@ var _ = Describe("ModelLoader", func() { Expect(modelLoader.CheckIsLoaded("foo")).To(BeNil()) }) }) + + Context("Concurrent Loading", func() { + It("should handle concurrent requests for the same model", func() { + var loadCount int32 + mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) { + atomic.AddInt32(&loadCount, 1) + time.Sleep(100 * time.Millisecond) // Simulate loading time + return model.NewModel(modelID, modelName, nil), nil + } + + var wg sync.WaitGroup + results := make([]*model.Model, 5) + errs := make([]error, 5) + + // Start 5 concurrent requests for the same model + for i := 0; i < 5; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + results[idx], errs[idx] = modelLoader.LoadModel("concurrent-model", "test.model", mockLoader) + }(i) + } + + wg.Wait() + + // All requests should succeed + for i := 0; i < 5; i++ { + Expect(errs[i]).To(BeNil()) + Expect(results[i]).ToNot(BeNil()) + } + + // The loader should only have been called once + Expect(atomic.LoadInt32(&loadCount)).To(Equal(int32(1))) + + // All results should be the same model instance + for i := 1; i < 5; i++ { + Expect(results[i]).To(Equal(results[0])) + } + }) + + It("should handle concurrent requests for different models", func() { + var loadCount int32 + mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) { + atomic.AddInt32(&loadCount, 1) + time.Sleep(50 * time.Millisecond) // Simulate loading time + return model.NewModel(modelID, modelName, nil), nil + } + + var wg sync.WaitGroup + modelCount := 3 + + // Start concurrent requests for different models + for i := 0; i < modelCount; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + modelID := "model-" + string(rune('A'+idx)) + _, err := modelLoader.LoadModel(modelID, "test.model", mockLoader) + Expect(err).To(BeNil()) + }(i) + } + + wg.Wait() + + // Each model should be loaded exactly once + Expect(atomic.LoadInt32(&loadCount)).To(Equal(int32(modelCount))) + + // All models should be loaded + Expect(modelLoader.CheckIsLoaded("model-A")).ToNot(BeNil()) + Expect(modelLoader.CheckIsLoaded("model-B")).ToNot(BeNil()) + Expect(modelLoader.CheckIsLoaded("model-C")).ToNot(BeNil()) + }) + + It("should track loading count correctly", func() { + loadStarted := make(chan struct{}) + loadComplete := make(chan struct{}) + + mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) { + close(loadStarted) + <-loadComplete // Wait until we're told to complete + return model.NewModel(modelID, modelName, nil), nil + } + + // Start loading in background + go func() { + modelLoader.LoadModel("slow-model", "test.model", mockLoader) + }() + + // Wait for loading to start + <-loadStarted + + // Loading count should be 1 + Expect(modelLoader.GetLoadingCount()).To(Equal(1)) + + // Complete the loading + close(loadComplete) + + // Wait a bit for cleanup + time.Sleep(50 * time.Millisecond) + + // Loading count should be back to 0 + Expect(modelLoader.GetLoadingCount()).To(Equal(0)) + }) + + It("should retry loading if first attempt fails", func() { + var attemptCount int32 + mockLoader := func(modelID, modelName, modelFile string) (*model.Model, error) { + count := atomic.AddInt32(&attemptCount, 1) + if count == 1 { + return nil, errors.New("first attempt fails") + } + return model.NewModel(modelID, modelName, nil), nil + } + + // First goroutine will fail + var wg sync.WaitGroup + wg.Add(2) + + var err1, err2 error + var m1, m2 *model.Model + + go func() { + defer wg.Done() + m1, err1 = modelLoader.LoadModel("retry-model", "test.model", mockLoader) + }() + + // Give first goroutine a head start + time.Sleep(10 * time.Millisecond) + + go func() { + defer wg.Done() + m2, err2 = modelLoader.LoadModel("retry-model", "test.model", mockLoader) + }() + + wg.Wait() + + // At least one should succeed (the second attempt after retry) + successCount := 0 + if err1 == nil && m1 != nil { + successCount++ + } + if err2 == nil && m2 != nil { + successCount++ + } + Expect(successCount).To(BeNumerically(">=", 1)) + }) + }) + + Context("GetLoadingCount", func() { + It("should return 0 when nothing is loading", func() { + Expect(modelLoader.GetLoadingCount()).To(Equal(0)) + }) + }) }) diff --git a/pkg/model/watchdog.go b/pkg/model/watchdog.go index e279a9edc..4feb49c35 100644 --- a/pkg/model/watchdog.go +++ b/pkg/model/watchdog.go @@ -1,6 +1,7 @@ package model import ( + "sort" "sync" "time" @@ -13,13 +14,16 @@ import ( // watchdog that will keep track of the state of each backend (busy or not) // and for how much time it has been busy. // If a backend is busy for too long, the watchdog will kill the process and -// force a reload of the model +// force a reload of the model. +// The watchdog also supports LRU (Least Recently Used) eviction when a maximum +// number of active backends is configured. // The watchdog runs as a separate go routine, // and the GRPC client talks to it via a channel to send status updates type WatchDog struct { sync.Mutex - timetable map[string]time.Time + busyTime map[string]time.Time idleTime map[string]time.Time + lastUsed map[string]time.Time // LRU tracking: when each model was last used timeout, idletimeout time.Duration addressMap map[string]*process.Process addressModelMap map[string]string @@ -27,27 +31,44 @@ type WatchDog struct { stop chan bool busyCheck, idleCheck bool + lruLimit int // Maximum number of active backends (0 = unlimited) } type ProcessManager interface { ShutdownModel(modelName string) error } -func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool) *WatchDog { +func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool, lruLimit int) *WatchDog { return &WatchDog{ timeout: timeoutBusy, idletimeout: timeoutIdle, pm: pm, - timetable: make(map[string]time.Time), + busyTime: make(map[string]time.Time), idleTime: make(map[string]time.Time), + lastUsed: make(map[string]time.Time), addressMap: make(map[string]*process.Process), busyCheck: busy, idleCheck: idle, + lruLimit: lruLimit, addressModelMap: make(map[string]string), stop: make(chan bool, 1), } } +// SetLRULimit updates the LRU limit dynamically +func (wd *WatchDog) SetLRULimit(limit int) { + wd.Lock() + defer wd.Unlock() + wd.lruLimit = limit +} + +// GetLRULimit returns the current LRU limit +func (wd *WatchDog) GetLRULimit() int { + wd.Lock() + defer wd.Unlock() + return wd.lruLimit +} + func (wd *WatchDog) Shutdown() { wd.Lock() defer wd.Unlock() @@ -70,15 +91,107 @@ func (wd *WatchDog) Add(address string, p *process.Process) { func (wd *WatchDog) Mark(address string) { wd.Lock() defer wd.Unlock() - wd.timetable[address] = time.Now() + now := time.Now() + wd.busyTime[address] = now + wd.lastUsed[address] = now // Update LRU tracking delete(wd.idleTime, address) } func (wd *WatchDog) UnMark(ModelAddress string) { wd.Lock() defer wd.Unlock() - delete(wd.timetable, ModelAddress) - wd.idleTime[ModelAddress] = time.Now() + now := time.Now() + delete(wd.busyTime, ModelAddress) + wd.idleTime[ModelAddress] = now + wd.lastUsed[ModelAddress] = now // Update LRU tracking +} + +// UpdateLastUsed updates the last used time for a model address (for LRU tracking) +// This should be called when a model is accessed (e.g., when checking if loaded) +func (wd *WatchDog) UpdateLastUsed(address string) { + wd.Lock() + defer wd.Unlock() + wd.lastUsed[address] = time.Now() +} + +// GetLoadedModelCount returns the number of currently loaded models tracked by the watchdog +func (wd *WatchDog) GetLoadedModelCount() int { + wd.Lock() + defer wd.Unlock() + return len(wd.addressModelMap) +} + +// modelUsageInfo holds information about a model's usage for LRU sorting +type modelUsageInfo struct { + address string + model string + lastUsed time.Time +} + +// EnforceLRULimit ensures we're under the LRU limit by evicting least recently used models. +// This should be called before loading a new model. +// pendingLoads is the number of models currently being loaded (to account for concurrent loads). +// Returns the number of models evicted. +func (wd *WatchDog) EnforceLRULimit(pendingLoads int) int { + if wd.lruLimit <= 0 { + return 0 // LRU disabled + } + + wd.Lock() + + currentCount := len(wd.addressModelMap) + // We need to evict enough to make room for the new model AND any pending loads + // Total after loading = currentCount + pendingLoads + 1 (the new one we're about to load) + // We need: currentCount + pendingLoads + 1 <= lruLimit + // So evict: currentCount + pendingLoads + 1 - lruLimit = currentCount - lruLimit + pendingLoads + 1 + modelsToEvict := currentCount - wd.lruLimit + pendingLoads + 1 + if modelsToEvict <= 0 { + wd.Unlock() + return 0 + } + + log.Debug().Int("current", currentCount).Int("pendingLoads", pendingLoads).Int("limit", wd.lruLimit).Int("toEvict", modelsToEvict).Msg("[WatchDog] LRU enforcement triggered") + + // Build a list of models sorted by last used time (oldest first) + var models []modelUsageInfo + for address, model := range wd.addressModelMap { + lastUsed := wd.lastUsed[address] + if lastUsed.IsZero() { + // If no lastUsed recorded, use a very old time + lastUsed = time.Time{} + } + models = append(models, modelUsageInfo{ + address: address, + model: model, + lastUsed: lastUsed, + }) + } + + // Sort by lastUsed time (oldest first) + sort.Slice(models, func(i, j int) bool { + return models[i].lastUsed.Before(models[j].lastUsed) + }) + + // Collect models to evict (the oldest ones) + var modelsToShutdown []string + for i := 0; i < modelsToEvict && i < len(models); i++ { + m := models[i] + log.Info().Str("model", m.model).Time("lastUsed", m.lastUsed).Msg("[WatchDog] LRU evicting model") + modelsToShutdown = append(modelsToShutdown, m.model) + // Clean up the maps while we have the lock + wd.untrack(m.address) + } + wd.Unlock() + + // Now shutdown models without holding the watchdog lock to prevent deadlock + for _, model := range modelsToShutdown { + if err := wd.pm.ShutdownModel(model); err != nil { + log.Error().Err(err).Str("model", model).Msg("[WatchDog] error shutting down model during LRU eviction") + } + log.Debug().Str("model", model).Msg("[WatchDog] LRU eviction complete") + } + + return len(modelsToShutdown) } func (wd *WatchDog) Run() { @@ -117,14 +230,10 @@ func (wd *WatchDog) checkIdle() { model, ok := wd.addressModelMap[address] if ok { modelsToShutdown = append(modelsToShutdown, model) - // Clean up the maps while we have the lock - delete(wd.idleTime, address) - delete(wd.addressModelMap, address) - delete(wd.addressMap, address) } else { log.Warn().Msgf("[WatchDog] Address %s unresolvable", address) - delete(wd.idleTime, address) } + wd.untrack(address) } } wd.Unlock() @@ -144,7 +253,7 @@ func (wd *WatchDog) checkBusy() { // Collect models to shutdown while holding the lock var modelsToShutdown []string - for address, t := range wd.timetable { + for address, t := range wd.busyTime { log.Debug().Msgf("[WatchDog] %s: active connection", address) if time.Since(t) > wd.timeout { @@ -152,14 +261,10 @@ func (wd *WatchDog) checkBusy() { if ok { log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model) modelsToShutdown = append(modelsToShutdown, model) - // Clean up the maps while we have the lock - delete(wd.timetable, address) - delete(wd.addressModelMap, address) - delete(wd.addressMap, address) } else { log.Warn().Msgf("[WatchDog] Address %s unresolvable", address) - delete(wd.timetable, address) } + wd.untrack(address) } } wd.Unlock() @@ -172,3 +277,11 @@ func (wd *WatchDog) checkBusy() { log.Debug().Msgf("[WatchDog] model shut down: %s", model) } } + +func (wd *WatchDog) untrack(address string) { + delete(wd.busyTime, address) + delete(wd.idleTime, address) + delete(wd.lastUsed, address) + delete(wd.addressModelMap, address) + delete(wd.addressMap, address) +} diff --git a/pkg/model/watchdog_test.go b/pkg/model/watchdog_test.go new file mode 100644 index 000000000..30d7ffc66 --- /dev/null +++ b/pkg/model/watchdog_test.go @@ -0,0 +1,244 @@ +package model_test + +import ( + "sync" + "time" + + "github.com/mudler/LocalAI/pkg/model" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// mockProcessManager implements ProcessManager for testing +type mockProcessManager struct { + mu sync.Mutex + shutdownCalls []string + shutdownErrors map[string]error +} + +func newMockProcessManager() *mockProcessManager { + return &mockProcessManager{ + shutdownCalls: []string{}, + shutdownErrors: make(map[string]error), + } +} + +func (m *mockProcessManager) ShutdownModel(modelName string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.shutdownCalls = append(m.shutdownCalls, modelName) + if err, ok := m.shutdownErrors[modelName]; ok { + return err + } + return nil +} + +func (m *mockProcessManager) getShutdownCalls() []string { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]string, len(m.shutdownCalls)) + copy(result, m.shutdownCalls) + return result +} + +var _ = Describe("WatchDog", func() { + var ( + wd *model.WatchDog + pm *mockProcessManager + ) + + BeforeEach(func() { + pm = newMockProcessManager() + }) + + Context("LRU Limit", func() { + It("should create watchdog with LRU limit", func() { + wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 2) + Expect(wd.GetLRULimit()).To(Equal(2)) + }) + + It("should allow updating LRU limit dynamically", func() { + wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 2) + wd.SetLRULimit(5) + Expect(wd.GetLRULimit()).To(Equal(5)) + }) + + It("should return 0 for disabled LRU", func() { + wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 0) + Expect(wd.GetLRULimit()).To(Equal(0)) + }) + }) + + Context("Model Tracking", func() { + BeforeEach(func() { + wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 3) + }) + + It("should track loaded models count", func() { + Expect(wd.GetLoadedModelCount()).To(Equal(0)) + + wd.AddAddressModelMap("addr1", "model1") + Expect(wd.GetLoadedModelCount()).To(Equal(1)) + + wd.AddAddressModelMap("addr2", "model2") + Expect(wd.GetLoadedModelCount()).To(Equal(2)) + }) + + It("should update lastUsed time on Mark", func() { + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + // The model should now have a lastUsed time set + // We can verify this indirectly through LRU eviction behavior + }) + + It("should update lastUsed time on UnMark", func() { + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + time.Sleep(10 * time.Millisecond) + wd.UnMark("addr1") + // The model should now have an updated lastUsed time + }) + + It("should update lastUsed time via UpdateLastUsed", func() { + wd.AddAddressModelMap("addr1", "model1") + wd.UpdateLastUsed("addr1") + // Verify the time was updated + }) + }) + + Context("EnforceLRULimit", func() { + BeforeEach(func() { + wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 2) + }) + + It("should not evict when under limit", func() { + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + + evicted := wd.EnforceLRULimit(0) + Expect(evicted).To(Equal(0)) + Expect(pm.getShutdownCalls()).To(BeEmpty()) + }) + + It("should evict oldest model when at limit", func() { + // Add two models + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + time.Sleep(10 * time.Millisecond) + + wd.AddAddressModelMap("addr2", "model2") + wd.Mark("addr2") + + // Enforce LRU with limit of 2 (need to make room for 1 new model) + evicted := wd.EnforceLRULimit(0) + Expect(evicted).To(Equal(1)) + Expect(pm.getShutdownCalls()).To(ContainElement("model1")) // oldest should be evicted + }) + + It("should evict multiple models when needed", func() { + // Add three models + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + time.Sleep(10 * time.Millisecond) + + wd.AddAddressModelMap("addr2", "model2") + wd.Mark("addr2") + time.Sleep(10 * time.Millisecond) + + wd.AddAddressModelMap("addr3", "model3") + wd.Mark("addr3") + + // Set limit to 1, should evict 2 oldest + 1 for new = 3 evictions + wd.SetLRULimit(1) + evicted := wd.EnforceLRULimit(0) + Expect(evicted).To(Equal(3)) + shutdowns := pm.getShutdownCalls() + Expect(shutdowns).To(ContainElement("model1")) + Expect(shutdowns).To(ContainElement("model2")) + Expect(shutdowns).To(ContainElement("model3")) + }) + + It("should account for pending loads", func() { + // Add two models (at limit) + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + time.Sleep(10 * time.Millisecond) + + wd.AddAddressModelMap("addr2", "model2") + wd.Mark("addr2") + + // With 1 pending load, we need to evict 2 (current=2, pending=1, new=1, limit=2) + // total after = 2 + 1 + 1 = 4, need to evict 4 - 2 = 2 + evicted := wd.EnforceLRULimit(1) + Expect(evicted).To(Equal(2)) + }) + + It("should not evict when LRU is disabled", func() { + wd.SetLRULimit(0) + + wd.AddAddressModelMap("addr1", "model1") + wd.AddAddressModelMap("addr2", "model2") + wd.AddAddressModelMap("addr3", "model3") + + evicted := wd.EnforceLRULimit(0) + Expect(evicted).To(Equal(0)) + Expect(pm.getShutdownCalls()).To(BeEmpty()) + }) + + It("should evict least recently used first", func() { + wd.SetLRULimit(2) + + // Add models with different lastUsed times + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + time.Sleep(20 * time.Millisecond) + + wd.AddAddressModelMap("addr2", "model2") + wd.Mark("addr2") + time.Sleep(20 * time.Millisecond) + + // Touch model1 again to make it more recent + wd.UpdateLastUsed("addr1") + time.Sleep(20 * time.Millisecond) + + wd.AddAddressModelMap("addr3", "model3") + wd.Mark("addr3") + + // Now model2 is the oldest, should be evicted first + evicted := wd.EnforceLRULimit(0) + Expect(evicted).To(BeNumerically(">=", 1)) + + shutdowns := pm.getShutdownCalls() + // model2 should be evicted first (it's the oldest) + if len(shutdowns) >= 1 { + Expect(shutdowns[0]).To(Equal("model2")) + } + }) + }) + + Context("Single Backend Mode (LRU=1)", func() { + BeforeEach(func() { + wd = model.NewWatchDog(pm, 5*time.Minute, 15*time.Minute, false, false, 1) + }) + + It("should evict existing model when loading new one", func() { + wd.AddAddressModelMap("addr1", "model1") + wd.Mark("addr1") + + // With limit=1, loading a new model should evict the existing one + evicted := wd.EnforceLRULimit(0) + Expect(evicted).To(Equal(1)) + Expect(pm.getShutdownCalls()).To(ContainElement("model1")) + }) + + It("should handle rapid model switches", func() { + for i := 0; i < 5; i++ { + wd.AddAddressModelMap("addr", "model") + wd.Mark("addr") + wd.EnforceLRULimit(0) + } + // All previous models should have been evicted + Expect(len(pm.getShutdownCalls())).To(Equal(5)) + }) + }) +}) diff --git a/tests/integration/stores_test.go b/tests/integration/stores_test.go index 2cb3afd8a..e423e957e 100644 --- a/tests/integration/stores_test.go +++ b/tests/integration/stores_test.go @@ -63,7 +63,7 @@ var _ = Describe("Integration tests for the stores backend(s) and internal APIs" ) Expect(err).ToNot(HaveOccurred()) - sl = model.NewModelLoader(systemState, false) + sl = model.NewModelLoader(systemState) sc, err = sl.Load(storeOpts...) Expect(err).ToNot(HaveOccurred()) Expect(sc).ToNot(BeNil())