feat(ui): runtime settings (#7320)

* feat(ui): add watchdog settings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Do not re-read env

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Some refactor, move other settings to runtime (p2p)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Add API Keys handling

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Allow to disable runtime settings

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Documentation

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Small fixups

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* show MCP toggle in index

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Drop context default

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2025-11-20 22:37:20 +01:00
committed by GitHub
parent 53d51671d7
commit 2dd42292dc
30 changed files with 2245 additions and 438 deletions

View File

@@ -1,6 +1,9 @@
package application
import (
"context"
"sync"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services"
"github.com/mudler/LocalAI/core/templates"
@@ -11,8 +14,14 @@ type Application struct {
backendLoader *config.ModelConfigLoader
modelLoader *model.ModelLoader
applicationConfig *config.ApplicationConfig
startupConfig *config.ApplicationConfig // Stores original config from env vars (before file loading)
templatesEvaluator *templates.Evaluator
galleryService *services.GalleryService
watchdogMutex sync.Mutex
watchdogStop chan bool
p2pMutex sync.Mutex
p2pCtx context.Context
p2pCancel context.CancelFunc
}
func newApplication(appConfig *config.ApplicationConfig) *Application {
@@ -44,6 +53,11 @@ func (a *Application) GalleryService() *services.GalleryService {
return a.galleryService
}
// StartupConfig returns the original startup configuration (from env vars, before file loading)
func (a *Application) StartupConfig() *config.ApplicationConfig {
return a.startupConfig
}
func (a *Application) start() error {
galleryService := services.NewGalleryService(a.ApplicationConfig(), a.ModelLoader())
err := galleryService.Start(a.ApplicationConfig().Context, a.ModelConfigLoader(), a.ApplicationConfig().SystemState)

View File

@@ -1,180 +1,343 @@
package application
import (
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"time"
"dario.cat/mergo"
"github.com/fsnotify/fsnotify"
"github.com/mudler/LocalAI/core/config"
"github.com/rs/zerolog/log"
)
type fileHandler func(fileContent []byte, appConfig *config.ApplicationConfig) error
type configFileHandler struct {
handlers map[string]fileHandler
watcher *fsnotify.Watcher
appConfig *config.ApplicationConfig
}
// TODO: This should be a singleton eventually so other parts of the code can register config file handlers,
// then we can export it to other packages
func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler {
c := configFileHandler{
handlers: make(map[string]fileHandler),
appConfig: appConfig,
}
err := c.Register("api_keys.json", readApiKeysJson(*appConfig), true)
if err != nil {
log.Error().Err(err).Str("file", "api_keys.json").Msg("unable to register config file handler")
}
err = c.Register("external_backends.json", readExternalBackendsJson(*appConfig), true)
if err != nil {
log.Error().Err(err).Str("file", "external_backends.json").Msg("unable to register config file handler")
}
return c
}
func (c *configFileHandler) Register(filename string, handler fileHandler, runNow bool) error {
_, ok := c.handlers[filename]
if ok {
return fmt.Errorf("handler already registered for file %s", filename)
}
c.handlers[filename] = handler
if runNow {
c.callHandler(filename, handler)
}
return nil
}
func (c *configFileHandler) callHandler(filename string, handler fileHandler) {
rootedFilePath := filepath.Join(c.appConfig.DynamicConfigsDir, filepath.Clean(filename))
log.Trace().Str("filename", rootedFilePath).Msg("reading file for dynamic config update")
fileContent, err := os.ReadFile(rootedFilePath)
if err != nil && !os.IsNotExist(err) {
log.Error().Err(err).Str("filename", rootedFilePath).Msg("could not read file")
}
if err = handler(fileContent, c.appConfig); err != nil {
log.Error().Err(err).Msg("WatchConfigDirectory goroutine failed to update options")
}
}
func (c *configFileHandler) Watch() error {
configWatcher, err := fsnotify.NewWatcher()
c.watcher = configWatcher
if err != nil {
return err
}
if c.appConfig.DynamicConfigsDirPollInterval > 0 {
log.Debug().Msg("Poll interval set, falling back to polling for configuration changes")
ticker := time.NewTicker(c.appConfig.DynamicConfigsDirPollInterval)
go func() {
for {
<-ticker.C
for file, handler := range c.handlers {
log.Debug().Str("file", file).Msg("polling config file")
c.callHandler(file, handler)
}
}
}()
}
// Start listening for events.
go func() {
for {
select {
case event, ok := <-c.watcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Write | fsnotify.Create | fsnotify.Remove) {
handler, ok := c.handlers[path.Base(event.Name)]
if !ok {
continue
}
c.callHandler(filepath.Base(event.Name), handler)
}
case err, ok := <-c.watcher.Errors:
log.Error().Err(err).Msg("config watcher error received")
if !ok {
return
}
}
}
}()
// Add a path.
err = c.watcher.Add(c.appConfig.DynamicConfigsDir)
if err != nil {
return fmt.Errorf("unable to create a watcher on the configuration directory: %+v", err)
}
return nil
}
// TODO: When we institute graceful shutdown, this should be called
func (c *configFileHandler) Stop() error {
return c.watcher.Close()
}
func readApiKeysJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing api keys runtime update")
log.Trace().Int("numKeys", len(startupAppConfig.ApiKeys)).Msg("api keys provided at startup")
if len(fileContent) > 0 {
// Parse JSON content from the file
var fileKeys []string
err := json.Unmarshal(fileContent, &fileKeys)
if err != nil {
return err
}
log.Trace().Int("numKeys", len(fileKeys)).Msg("discovered API keys from api keys dynamic config dile")
appConfig.ApiKeys = append(startupAppConfig.ApiKeys, fileKeys...)
} else {
log.Trace().Msg("no API keys discovered from dynamic config file")
appConfig.ApiKeys = startupAppConfig.ApiKeys
}
log.Trace().Int("numKeys", len(appConfig.ApiKeys)).Msg("total api keys after processing")
return nil
}
return handler
}
func readExternalBackendsJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing external_backends.json")
if len(fileContent) > 0 {
// Parse JSON content from the file
var fileBackends map[string]string
err := json.Unmarshal(fileContent, &fileBackends)
if err != nil {
return err
}
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
err = mergo.Merge(&appConfig.ExternalGRPCBackends, &fileBackends)
if err != nil {
return err
}
} else {
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
}
log.Debug().Msg("external backends loaded from external_backends.json")
return nil
}
return handler
}
package application
import (
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"time"
"dario.cat/mergo"
"github.com/fsnotify/fsnotify"
"github.com/mudler/LocalAI/core/config"
"github.com/rs/zerolog/log"
)
type fileHandler func(fileContent []byte, appConfig *config.ApplicationConfig) error
type configFileHandler struct {
handlers map[string]fileHandler
watcher *fsnotify.Watcher
appConfig *config.ApplicationConfig
}
// TODO: This should be a singleton eventually so other parts of the code can register config file handlers,
// then we can export it to other packages
func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler {
c := configFileHandler{
handlers: make(map[string]fileHandler),
appConfig: appConfig,
}
err := c.Register("api_keys.json", readApiKeysJson(*appConfig), true)
if err != nil {
log.Error().Err(err).Str("file", "api_keys.json").Msg("unable to register config file handler")
}
err = c.Register("external_backends.json", readExternalBackendsJson(*appConfig), true)
if err != nil {
log.Error().Err(err).Str("file", "external_backends.json").Msg("unable to register config file handler")
}
err = c.Register("runtime_settings.json", readRuntimeSettingsJson(*appConfig), true)
if err != nil {
log.Error().Err(err).Str("file", "runtime_settings.json").Msg("unable to register config file handler")
}
return c
}
func (c *configFileHandler) Register(filename string, handler fileHandler, runNow bool) error {
_, ok := c.handlers[filename]
if ok {
return fmt.Errorf("handler already registered for file %s", filename)
}
c.handlers[filename] = handler
if runNow {
c.callHandler(filename, handler)
}
return nil
}
func (c *configFileHandler) callHandler(filename string, handler fileHandler) {
rootedFilePath := filepath.Join(c.appConfig.DynamicConfigsDir, filepath.Clean(filename))
log.Trace().Str("filename", rootedFilePath).Msg("reading file for dynamic config update")
fileContent, err := os.ReadFile(rootedFilePath)
if err != nil && !os.IsNotExist(err) {
log.Error().Err(err).Str("filename", rootedFilePath).Msg("could not read file")
}
if err = handler(fileContent, c.appConfig); err != nil {
log.Error().Err(err).Msg("WatchConfigDirectory goroutine failed to update options")
}
}
func (c *configFileHandler) Watch() error {
configWatcher, err := fsnotify.NewWatcher()
c.watcher = configWatcher
if err != nil {
return err
}
if c.appConfig.DynamicConfigsDirPollInterval > 0 {
log.Debug().Msg("Poll interval set, falling back to polling for configuration changes")
ticker := time.NewTicker(c.appConfig.DynamicConfigsDirPollInterval)
go func() {
for {
<-ticker.C
for file, handler := range c.handlers {
log.Debug().Str("file", file).Msg("polling config file")
c.callHandler(file, handler)
}
}
}()
}
// Start listening for events.
go func() {
for {
select {
case event, ok := <-c.watcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Write | fsnotify.Create | fsnotify.Remove) {
handler, ok := c.handlers[path.Base(event.Name)]
if !ok {
continue
}
c.callHandler(filepath.Base(event.Name), handler)
}
case err, ok := <-c.watcher.Errors:
log.Error().Err(err).Msg("config watcher error received")
if !ok {
return
}
}
}
}()
// Add a path.
err = c.watcher.Add(c.appConfig.DynamicConfigsDir)
if err != nil {
return fmt.Errorf("unable to create a watcher on the configuration directory: %+v", err)
}
return nil
}
// TODO: When we institute graceful shutdown, this should be called
func (c *configFileHandler) Stop() error {
return c.watcher.Close()
}
func readApiKeysJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing api keys runtime update")
log.Trace().Int("numKeys", len(startupAppConfig.ApiKeys)).Msg("api keys provided at startup")
if len(fileContent) > 0 {
// Parse JSON content from the file
var fileKeys []string
err := json.Unmarshal(fileContent, &fileKeys)
if err != nil {
return err
}
log.Trace().Int("numKeys", len(fileKeys)).Msg("discovered API keys from api keys dynamic config dile")
appConfig.ApiKeys = append(startupAppConfig.ApiKeys, fileKeys...)
} else {
log.Trace().Msg("no API keys discovered from dynamic config file")
appConfig.ApiKeys = startupAppConfig.ApiKeys
}
log.Trace().Int("numKeys", len(appConfig.ApiKeys)).Msg("total api keys after processing")
return nil
}
return handler
}
func readExternalBackendsJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing external_backends.json")
if len(fileContent) > 0 {
// Parse JSON content from the file
var fileBackends map[string]string
err := json.Unmarshal(fileContent, &fileBackends)
if err != nil {
return err
}
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
err = mergo.Merge(&appConfig.ExternalGRPCBackends, &fileBackends)
if err != nil {
return err
}
} else {
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
}
log.Debug().Msg("external backends loaded from external_backends.json")
return nil
}
return handler
}
type runtimeSettings struct {
WatchdogEnabled *bool `json:"watchdog_enabled,omitempty"`
WatchdogIdleEnabled *bool `json:"watchdog_idle_enabled,omitempty"`
WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"`
WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"`
WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"`
SingleBackend *bool `json:"single_backend,omitempty"`
ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"`
Threads *int `json:"threads,omitempty"`
ContextSize *int `json:"context_size,omitempty"`
F16 *bool `json:"f16,omitempty"`
Debug *bool `json:"debug,omitempty"`
CORS *bool `json:"cors,omitempty"`
CSRF *bool `json:"csrf,omitempty"`
CORSAllowOrigins *string `json:"cors_allow_origins,omitempty"`
P2PToken *string `json:"p2p_token,omitempty"`
P2PNetworkID *string `json:"p2p_network_id,omitempty"`
Federated *bool `json:"federated,omitempty"`
Galleries *[]config.Gallery `json:"galleries,omitempty"`
BackendGalleries *[]config.Gallery `json:"backend_galleries,omitempty"`
AutoloadGalleries *bool `json:"autoload_galleries,omitempty"`
AutoloadBackendGalleries *bool `json:"autoload_backend_galleries,omitempty"`
ApiKeys *[]string `json:"api_keys,omitempty"`
}
func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing runtime_settings.json")
// Determine if settings came from env vars by comparing with startup config
// startupAppConfig contains the original values set from env vars at startup.
// If current values match startup values, they came from env vars (or defaults).
// We apply file settings only if current values match startup values (meaning not from env vars).
envWatchdogIdle := appConfig.WatchDogIdle == startupAppConfig.WatchDogIdle
envWatchdogBusy := appConfig.WatchDogBusy == startupAppConfig.WatchDogBusy
envWatchdogIdleTimeout := appConfig.WatchDogIdleTimeout == startupAppConfig.WatchDogIdleTimeout
envWatchdogBusyTimeout := appConfig.WatchDogBusyTimeout == startupAppConfig.WatchDogBusyTimeout
envSingleBackend := appConfig.SingleBackend == startupAppConfig.SingleBackend
envParallelRequests := appConfig.ParallelBackendRequests == startupAppConfig.ParallelBackendRequests
envThreads := appConfig.Threads == startupAppConfig.Threads
envContextSize := appConfig.ContextSize == startupAppConfig.ContextSize
envF16 := appConfig.F16 == startupAppConfig.F16
envDebug := appConfig.Debug == startupAppConfig.Debug
envCORS := appConfig.CORS == startupAppConfig.CORS
envCSRF := appConfig.CSRF == startupAppConfig.CSRF
envCORSAllowOrigins := appConfig.CORSAllowOrigins == startupAppConfig.CORSAllowOrigins
envP2PToken := appConfig.P2PToken == startupAppConfig.P2PToken
envP2PNetworkID := appConfig.P2PNetworkID == startupAppConfig.P2PNetworkID
envFederated := appConfig.Federated == startupAppConfig.Federated
envAutoloadGalleries := appConfig.AutoloadGalleries == startupAppConfig.AutoloadGalleries
envAutoloadBackendGalleries := appConfig.AutoloadBackendGalleries == startupAppConfig.AutoloadBackendGalleries
if len(fileContent) > 0 {
var settings runtimeSettings
err := json.Unmarshal(fileContent, &settings)
if err != nil {
return err
}
// Apply file settings only if they don't match startup values (i.e., not from env vars)
if settings.WatchdogIdleEnabled != nil && !envWatchdogIdle {
appConfig.WatchDogIdle = *settings.WatchdogIdleEnabled
if appConfig.WatchDogIdle {
appConfig.WatchDog = true
}
}
if settings.WatchdogBusyEnabled != nil && !envWatchdogBusy {
appConfig.WatchDogBusy = *settings.WatchdogBusyEnabled
if appConfig.WatchDogBusy {
appConfig.WatchDog = true
}
}
if settings.WatchdogIdleTimeout != nil && !envWatchdogIdleTimeout {
dur, err := time.ParseDuration(*settings.WatchdogIdleTimeout)
if err == nil {
appConfig.WatchDogIdleTimeout = dur
} else {
log.Warn().Err(err).Str("timeout", *settings.WatchdogIdleTimeout).Msg("invalid watchdog idle timeout in runtime_settings.json")
}
}
if settings.WatchdogBusyTimeout != nil && !envWatchdogBusyTimeout {
dur, err := time.ParseDuration(*settings.WatchdogBusyTimeout)
if err == nil {
appConfig.WatchDogBusyTimeout = dur
} else {
log.Warn().Err(err).Str("timeout", *settings.WatchdogBusyTimeout).Msg("invalid watchdog busy timeout in runtime_settings.json")
}
}
if settings.SingleBackend != nil && !envSingleBackend {
appConfig.SingleBackend = *settings.SingleBackend
}
if settings.ParallelBackendRequests != nil && !envParallelRequests {
appConfig.ParallelBackendRequests = *settings.ParallelBackendRequests
}
if settings.Threads != nil && !envThreads {
appConfig.Threads = *settings.Threads
}
if settings.ContextSize != nil && !envContextSize {
appConfig.ContextSize = *settings.ContextSize
}
if settings.F16 != nil && !envF16 {
appConfig.F16 = *settings.F16
}
if settings.Debug != nil && !envDebug {
appConfig.Debug = *settings.Debug
}
if settings.CORS != nil && !envCORS {
appConfig.CORS = *settings.CORS
}
if settings.CSRF != nil && !envCSRF {
appConfig.CSRF = *settings.CSRF
}
if settings.CORSAllowOrigins != nil && !envCORSAllowOrigins {
appConfig.CORSAllowOrigins = *settings.CORSAllowOrigins
}
if settings.P2PToken != nil && !envP2PToken {
appConfig.P2PToken = *settings.P2PToken
}
if settings.P2PNetworkID != nil && !envP2PNetworkID {
appConfig.P2PNetworkID = *settings.P2PNetworkID
}
if settings.Federated != nil && !envFederated {
appConfig.Federated = *settings.Federated
}
if settings.Galleries != nil {
appConfig.Galleries = *settings.Galleries
}
if settings.BackendGalleries != nil {
appConfig.BackendGalleries = *settings.BackendGalleries
}
if settings.AutoloadGalleries != nil && !envAutoloadGalleries {
appConfig.AutoloadGalleries = *settings.AutoloadGalleries
}
if settings.AutoloadBackendGalleries != nil && !envAutoloadBackendGalleries {
appConfig.AutoloadBackendGalleries = *settings.AutoloadBackendGalleries
}
if settings.ApiKeys != nil {
// API keys from env vars (startup) should be kept, runtime settings keys replace all runtime keys
// If runtime_settings.json specifies ApiKeys (even if empty), it replaces all runtime keys
// Start with env keys, then add runtime_settings.json keys (which may be empty to clear them)
envKeys := startupAppConfig.ApiKeys
runtimeKeys := *settings.ApiKeys
// Replace all runtime keys with what's in runtime_settings.json
appConfig.ApiKeys = append(envKeys, runtimeKeys...)
}
// If watchdog is enabled via file but not via env, ensure WatchDog flag is set
if !envWatchdogIdle && !envWatchdogBusy {
if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled {
appConfig.WatchDog = true
}
}
}
log.Debug().Msg("runtime settings loaded from runtime_settings.json")
return nil
}
return handler
}

240
core/application/p2p.go Normal file
View File

@@ -0,0 +1,240 @@
package application
import (
"context"
"fmt"
"net"
"slices"
"time"
"github.com/google/uuid"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services"
"github.com/mudler/edgevpn/pkg/node"
"github.com/rs/zerolog/log"
zlog "github.com/rs/zerolog/log"
)
func (a *Application) StopP2P() error {
if a.p2pCancel != nil {
a.p2pCancel()
a.p2pCancel = nil
a.p2pCtx = nil
// Wait a bit for shutdown to complete
time.Sleep(200 * time.Millisecond)
}
return nil
}
func (a *Application) StartP2P() error {
// we need a p2p token
if a.applicationConfig.P2PToken == "" {
return fmt.Errorf("P2P token is not set")
}
networkID := a.applicationConfig.P2PNetworkID
ctx, cancel := context.WithCancel(a.ApplicationConfig().Context)
a.p2pCtx = ctx
a.p2pCancel = cancel
var n *node.Node
// Here we are avoiding creating multiple nodes:
// - if the federated mode is enabled, we create a federated node and expose a service
// - exposing a service creates a node with specific options, and we don't want to create another node
// If the federated mode is enabled, we expose a service to the local instance running
// at r.Address
if a.applicationConfig.Federated {
_, port, err := net.SplitHostPort(a.applicationConfig.APIAddress)
if err != nil {
return err
}
// Here a new node is created and started
// and a service is exposed by the node
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID))
if err != nil {
return err
}
if err := p2p.ServiceDiscoverer(ctx, node, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil {
return err
}
n = node
// start node sync in the background
if err := a.p2pSync(ctx, node); err != nil {
return err
}
}
// If a node wasn't created previously, create it
if n == nil {
node, err := p2p.NewNode(a.applicationConfig.P2PToken)
if err != nil {
return err
}
err = node.Start(ctx)
if err != nil {
return fmt.Errorf("starting new node: %w", err)
}
n = node
}
// Attach a ServiceDiscoverer to the p2p node
log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(ctx, n, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node schema.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}
if a.applicationConfig.TunnelCallback != nil {
a.applicationConfig.TunnelCallback(tunnelAddresses)
}
}, true); err != nil {
return err
}
return nil
}
// RestartP2P restarts the P2P stack with current ApplicationConfig settings
// Note: This method signals that P2P should be restarted, but the actual restart
// is handled by the caller to avoid import cycles
func (a *Application) RestartP2P() error {
a.p2pMutex.Lock()
defer a.p2pMutex.Unlock()
// Stop existing P2P if running
if a.p2pCancel != nil {
a.p2pCancel()
a.p2pCancel = nil
a.p2pCtx = nil
// Wait a bit for shutdown to complete
time.Sleep(200 * time.Millisecond)
}
appConfig := a.ApplicationConfig()
// Start P2P if token is set
if appConfig.P2PToken == "" {
return fmt.Errorf("P2P token is not set")
}
// Create new context for P2P
ctx, cancel := context.WithCancel(appConfig.Context)
a.p2pCtx = ctx
a.p2pCancel = cancel
// Get API address from config
address := appConfig.APIAddress
if address == "" {
address = "127.0.0.1:8080" // default
}
// Start P2P stack in a goroutine
go func() {
if err := a.StartP2P(); err != nil {
log.Error().Err(err).Msg("Failed to start P2P stack")
cancel() // Cancel context on error
}
}()
log.Info().Msg("P2P stack restarted with new settings")
return nil
}
func syncState(ctx context.Context, n *node.Node, app *Application) error {
zlog.Debug().Msg("[p2p-sync] Syncing state")
whatWeHave := []string{}
for _, model := range app.ModelConfigLoader().GetAllModelsConfigs() {
whatWeHave = append(whatWeHave, model.Name)
}
ledger, _ := n.Ledger()
currentData := ledger.CurrentData()
zlog.Debug().Msgf("[p2p-sync] Current data: %v", currentData)
data, exists := ledger.GetKey("shared_state", "models")
if !exists {
ledger.AnnounceUpdate(ctx, time.Minute, "shared_state", "models", whatWeHave)
zlog.Debug().Msgf("No models found in the ledger, announced our models: %v", whatWeHave)
}
models := []string{}
if err := data.Unmarshal(&models); err != nil {
zlog.Warn().Err(err).Msg("error unmarshalling models")
return nil
}
zlog.Debug().Msgf("[p2p-sync] Models that are present in this instance: %v\nModels that are in the ledger: %v", whatWeHave, models)
// Sync with our state
whatIsNotThere := []string{}
for _, model := range whatWeHave {
if !slices.Contains(models, model) {
whatIsNotThere = append(whatIsNotThere, model)
}
}
if len(whatIsNotThere) > 0 {
zlog.Debug().Msgf("[p2p-sync] Announcing our models: %v", append(models, whatIsNotThere...))
ledger.AnnounceUpdate(
ctx,
1*time.Minute,
"shared_state",
"models",
append(models, whatIsNotThere...),
)
}
// Check if we have a model that is not in our state, otherwise install it
for _, model := range models {
if slices.Contains(whatWeHave, model) {
zlog.Debug().Msgf("[p2p-sync] Model %s is already present in this instance", model)
continue
}
// we install model
zlog.Info().Msgf("[p2p-sync] Installing model which is not present in this instance: %s", model)
uuid, err := uuid.NewUUID()
if err != nil {
zlog.Error().Err(err).Msg("error generating UUID")
continue
}
app.GalleryService().ModelGalleryChannel <- services.GalleryOp[gallery.GalleryModel, gallery.ModelConfig]{
ID: uuid.String(),
GalleryElementName: model,
Galleries: app.ApplicationConfig().Galleries,
BackendGalleries: app.ApplicationConfig().BackendGalleries,
}
}
return nil
}
func (a *Application) p2pSync(ctx context.Context, n *node.Node) error {
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Minute):
if err := syncState(ctx, n, a); err != nil {
zlog.Error().Err(err).Msg("error syncing state")
}
}
}
}()
return nil
}

View File

@@ -1,8 +1,11 @@
package application
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/mudler/LocalAI/core/backend"
"github.com/mudler/LocalAI/core/config"
@@ -18,7 +21,12 @@ import (
func New(opts ...config.AppOption) (*Application, error) {
options := config.NewApplicationConfig(opts...)
// Store a copy of the startup config (from env vars, before file loading)
// This is used to determine if settings came from env vars vs file
startupConfigCopy := *options
application := newApplication(options)
application.startupConfig = &startupConfigCopy
log.Info().Msgf("Starting LocalAI using %d threads, with models path: %s", options.Threads, options.SystemState.Model.ModelsPath)
log.Info().Msgf("LocalAI version: %s", internal.PrintableVersion())
@@ -110,6 +118,13 @@ func New(opts ...config.AppOption) (*Application, error) {
}
}
// Load runtime settings from file if DynamicConfigsDir is set
// This applies file settings with env var precedence (env vars take priority)
// Note: startupConfigCopy was already created above, so it has the original env var values
if options.DynamicConfigsDir != "" {
loadRuntimeSettingsFromFile(options)
}
// turn off any process that was started by GRPC if the context is canceled
go func() {
<-options.Context.Done()
@@ -120,21 +135,8 @@ func New(opts ...config.AppOption) (*Application, error) {
}
}()
if options.WatchDog {
wd := model.NewWatchDog(
application.ModelLoader(),
options.WatchDogBusyTimeout,
options.WatchDogIdleTimeout,
options.WatchDogBusy,
options.WatchDogIdle)
application.ModelLoader().SetWatchDog(wd)
go wd.Run()
go func() {
<-options.Context.Done()
log.Debug().Msgf("Context canceled, shutting down")
wd.Shutdown()
}()
}
// Initialize watchdog with current settings (after loading from file)
initializeWatchdog(application, options)
if options.LoadToMemory != nil && !options.SingleBackend {
for _, m := range options.LoadToMemory {
@@ -186,3 +188,131 @@ func startWatcher(options *config.ApplicationConfig) {
log.Error().Err(err).Msg("failed creating watcher")
}
}
// loadRuntimeSettingsFromFile loads settings from runtime_settings.json with env var precedence
// This function is called at startup, before env vars are applied via AppOptions.
// Since env vars are applied via AppOptions in run.go, we need to check if they're set.
// We do this by checking if the current options values differ from defaults, which would
// indicate they were set from env vars. However, a simpler approach is to just apply
// file settings here, and let the AppOptions (which are applied after this) override them.
// But actually, this is called AFTER AppOptions are applied in New(), so we need to check env vars.
// The cleanest solution: Store original values before applying file, or check if values match
// what would be set from env vars. For now, we'll apply file settings and they'll be
// overridden by AppOptions if env vars were set (but AppOptions are already applied).
// Actually, this function is called in New() before AppOptions are fully processed for watchdog.
// Let's check the call order: New() -> loadRuntimeSettingsFromFile() -> initializeWatchdog()
// But AppOptions are applied in NewApplicationConfig() which is called first.
// So at this point, options already has values from env vars. We should compare against
// defaults to see if env vars were set. But we don't have defaults stored.
// Simplest: Just apply file settings. If env vars were set, they're already in options.
// The file watcher handler will handle runtime changes properly by comparing with startupAppConfig.
func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
settingsFile := filepath.Join(options.DynamicConfigsDir, "runtime_settings.json")
fileContent, err := os.ReadFile(settingsFile)
if err != nil {
if os.IsNotExist(err) {
log.Debug().Msg("runtime_settings.json not found, using defaults")
return
}
log.Warn().Err(err).Msg("failed to read runtime_settings.json")
return
}
var settings struct {
WatchdogEnabled *bool `json:"watchdog_enabled,omitempty"`
WatchdogIdleEnabled *bool `json:"watchdog_idle_enabled,omitempty"`
WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"`
WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"`
WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"`
SingleBackend *bool `json:"single_backend,omitempty"`
ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"`
}
if err := json.Unmarshal(fileContent, &settings); err != nil {
log.Warn().Err(err).Msg("failed to parse runtime_settings.json")
return
}
// At this point, options already has values from env vars (via AppOptions in run.go).
// To avoid env var duplication, we determine if env vars were set by checking if
// current values differ from defaults. Defaults are: false for bools, 0 for durations.
// If current value is at default, it likely wasn't set from env var, so we can apply file.
// If current value is non-default, it was likely set from env var, so we preserve it.
// Note: This means env vars explicitly setting to false/0 won't be distinguishable from defaults,
// but that's an acceptable limitation to avoid env var duplication.
if settings.WatchdogIdleEnabled != nil {
// Only apply if current value is default (false), suggesting it wasn't set from env var
if !options.WatchDogIdle {
options.WatchDogIdle = *settings.WatchdogIdleEnabled
if options.WatchDogIdle {
options.WatchDog = true
}
}
}
if settings.WatchdogBusyEnabled != nil {
if !options.WatchDogBusy {
options.WatchDogBusy = *settings.WatchdogBusyEnabled
if options.WatchDogBusy {
options.WatchDog = true
}
}
}
if settings.WatchdogIdleTimeout != nil {
// Only apply if current value is default (0), suggesting it wasn't set from env var
if options.WatchDogIdleTimeout == 0 {
dur, err := time.ParseDuration(*settings.WatchdogIdleTimeout)
if err == nil {
options.WatchDogIdleTimeout = dur
} else {
log.Warn().Err(err).Str("timeout", *settings.WatchdogIdleTimeout).Msg("invalid watchdog idle timeout in runtime_settings.json")
}
}
}
if settings.WatchdogBusyTimeout != nil {
if options.WatchDogBusyTimeout == 0 {
dur, err := time.ParseDuration(*settings.WatchdogBusyTimeout)
if err == nil {
options.WatchDogBusyTimeout = dur
} else {
log.Warn().Err(err).Str("timeout", *settings.WatchdogBusyTimeout).Msg("invalid watchdog busy timeout in runtime_settings.json")
}
}
}
if settings.SingleBackend != nil {
if !options.SingleBackend {
options.SingleBackend = *settings.SingleBackend
}
}
if settings.ParallelBackendRequests != nil {
if !options.ParallelBackendRequests {
options.ParallelBackendRequests = *settings.ParallelBackendRequests
}
}
if !options.WatchDogIdle && !options.WatchDogBusy {
if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled {
options.WatchDog = true
}
}
log.Debug().Msg("Runtime settings loaded from runtime_settings.json")
}
// initializeWatchdog initializes the watchdog with current ApplicationConfig settings
func initializeWatchdog(application *Application, options *config.ApplicationConfig) {
if options.WatchDog {
wd := model.NewWatchDog(
application.ModelLoader(),
options.WatchDogBusyTimeout,
options.WatchDogIdleTimeout,
options.WatchDogBusy,
options.WatchDogIdle)
application.ModelLoader().SetWatchDog(wd)
go wd.Run()
go func() {
<-options.Context.Done()
log.Debug().Msgf("Context canceled, shutting down")
wd.Shutdown()
}()
}
}

View File

@@ -0,0 +1,88 @@
package application
import (
"time"
"github.com/mudler/LocalAI/pkg/model"
"github.com/rs/zerolog/log"
)
func (a *Application) StopWatchdog() error {
if a.watchdogStop != nil {
close(a.watchdogStop)
a.watchdogStop = nil
}
return nil
}
// startWatchdog starts the watchdog with current ApplicationConfig settings
// This is an internal method that assumes the caller holds the watchdogMutex
func (a *Application) startWatchdog() error {
appConfig := a.ApplicationConfig()
// Create new watchdog if enabled
if appConfig.WatchDog {
wd := model.NewWatchDog(
a.modelLoader,
appConfig.WatchDogBusyTimeout,
appConfig.WatchDogIdleTimeout,
appConfig.WatchDogBusy,
appConfig.WatchDogIdle)
a.modelLoader.SetWatchDog(wd)
// Create new stop channel
a.watchdogStop = make(chan bool, 1)
// Start watchdog goroutine
go wd.Run()
// Setup shutdown handler
go func() {
select {
case <-a.watchdogStop:
log.Debug().Msg("Watchdog stop signal received")
wd.Shutdown()
case <-appConfig.Context.Done():
log.Debug().Msg("Context canceled, shutting down watchdog")
wd.Shutdown()
}
}()
log.Info().Msg("Watchdog started with new settings")
} else {
log.Info().Msg("Watchdog disabled")
}
return nil
}
// StartWatchdog starts the watchdog with current ApplicationConfig settings
func (a *Application) StartWatchdog() error {
a.watchdogMutex.Lock()
defer a.watchdogMutex.Unlock()
return a.startWatchdog()
}
// RestartWatchdog restarts the watchdog with current ApplicationConfig settings
func (a *Application) RestartWatchdog() error {
a.watchdogMutex.Lock()
defer a.watchdogMutex.Unlock()
// Shutdown existing watchdog if running
if a.watchdogStop != nil {
close(a.watchdogStop)
a.watchdogStop = nil
}
// Shutdown existing watchdog if running
currentWD := a.modelLoader.GetWatchDog()
if currentWD != nil {
currentWD.Shutdown()
// Wait a bit for shutdown to complete
time.Sleep(100 * time.Millisecond)
}
// Start watchdog with new settings
return a.startWatchdog()
}