diff --git a/core/application/application.go b/core/application/application.go index d49260eae..c852566d7 100644 --- a/core/application/application.go +++ b/core/application/application.go @@ -2,6 +2,7 @@ package application import ( "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services" "github.com/mudler/LocalAI/core/templates" "github.com/mudler/LocalAI/pkg/model" ) @@ -11,6 +12,7 @@ type Application struct { modelLoader *model.ModelLoader applicationConfig *config.ApplicationConfig templatesEvaluator *templates.Evaluator + galleryService *services.GalleryService } func newApplication(appConfig *config.ApplicationConfig) *Application { @@ -22,7 +24,7 @@ func newApplication(appConfig *config.ApplicationConfig) *Application { } } -func (a *Application) BackendLoader() *config.ModelConfigLoader { +func (a *Application) ModelConfigLoader() *config.ModelConfigLoader { return a.backendLoader } @@ -37,3 +39,19 @@ func (a *Application) ApplicationConfig() *config.ApplicationConfig { func (a *Application) TemplatesEvaluator() *templates.Evaluator { return a.templatesEvaluator } + +func (a *Application) GalleryService() *services.GalleryService { + return a.galleryService +} + +func (a *Application) start() error { + galleryService := services.NewGalleryService(a.ApplicationConfig(), a.ModelLoader()) + err := galleryService.Start(a.ApplicationConfig().Context, a.ModelConfigLoader(), a.ApplicationConfig().SystemState) + if err != nil { + return err + } + + a.galleryService = galleryService + + return nil +} diff --git a/core/application/startup.go b/core/application/startup.go index 8ebd44071..69238d54f 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -68,7 +68,7 @@ func New(opts ...config.AppOption) (*Application, error) { configLoaderOpts := options.ToConfigLoaderOptions() - if err := application.BackendLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil { + if err := application.ModelConfigLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil { log.Error().Err(err).Msg("error loading config files") } @@ -77,12 +77,12 @@ func New(opts ...config.AppOption) (*Application, error) { } if options.ConfigFile != "" { - if err := application.BackendLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil { + if err := application.ModelConfigLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil { log.Error().Err(err).Msg("error loading config file") } } - if err := application.BackendLoader().Preload(options.SystemState.Model.ModelsPath); err != nil { + if err := application.ModelConfigLoader().Preload(options.SystemState.Model.ModelsPath); err != nil { log.Error().Err(err).Msg("error downloading models") } @@ -99,7 +99,7 @@ func New(opts ...config.AppOption) (*Application, error) { } if options.Debug { - for _, v := range application.BackendLoader().GetAllModelsConfigs() { + for _, v := range application.ModelConfigLoader().GetAllModelsConfigs() { log.Debug().Msgf("Model: %s (config: %+v)", v.Name, v) } } @@ -132,7 +132,7 @@ func New(opts ...config.AppOption) (*Application, error) { if options.LoadToMemory != nil && !options.SingleBackend { for _, m := range options.LoadToMemory { - cfg, err := application.BackendLoader().LoadModelConfigFileByNameDefaultOptions(m, options) + cfg, err := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(m, 
options) if err != nil { return nil, err } @@ -152,6 +152,10 @@ func New(opts ...config.AppOption) (*Application, error) { // Watch the configuration directory startWatcher(options) + if err := application.start(); err != nil { + return nil, err + } + log.Info().Msg("core/startup process completed!") return application, nil } diff --git a/core/cli/api/p2p.go b/core/cli/api/p2p.go index a2ecfe3fe..9e94e94d6 100644 --- a/core/cli/api/p2p.go +++ b/core/cli/api/p2p.go @@ -7,13 +7,15 @@ import ( "os" "strings" + "github.com/mudler/LocalAI/core/application" "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/edgevpn/pkg/node" "github.com/rs/zerolog/log" ) -func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool) error { +func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool, app *application.Application) error { var n *node.Node // Here we are avoiding creating multiple nodes: // - if the federated mode is enabled, we create a federated node and expose a service @@ -39,6 +41,11 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa } n = node + + // start node sync in the background + if err := p2p.Sync(ctx, node, app); err != nil { + return err + } } // If the p2p mode is enabled, we start the service discovery @@ -58,7 +65,7 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa // Attach a ServiceDiscoverer to the p2p node log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) { + if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node schema.NodeData) { var tunnelAddresses []string for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) { if v.IsOnline() { diff --git a/core/cli/run.go b/core/cli/run.go index eb94becd7..96bb203a9 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -144,10 +144,6 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { backgroundCtx := context.Background() - if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated); err != nil { - return err - } - idleWatchDog := r.EnableWatchdogIdle busyWatchDog := r.EnableWatchdogBusy @@ -216,5 +212,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { return err } + if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated, app); err != nil { + return err + } + return appHTTP.Listen(r.Address) } diff --git a/core/explorer/discovery.go b/core/explorer/discovery.go index fe6470cb8..454614172 100644 --- a/core/explorer/discovery.go +++ b/core/explorer/discovery.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog/log" "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/edgevpn/pkg/blockchain" ) @@ -177,7 +178,7 @@ func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockch atLeastOneWorker := false DATA: for _, v := range data[d] { - nd := &p2p.NodeData{} + nd := &schema.NodeData{} if err := v.Unmarshal(nd); err != nil { continue DATA } diff --git a/core/http/app.go b/core/http/app.go index c73e94308..09f068834 100644 --- a/core/http/app.go +++ b/core/http/app.go @@ -197,21 +197,15 @@ func API(application *application.Application) (*fiber.App, error) { router.Use(csrf.New()) } - galleryService := 
services.NewGalleryService(application.ApplicationConfig(), application.ModelLoader()) - err = galleryService.Start(application.ApplicationConfig().Context, application.BackendLoader(), application.ApplicationConfig().SystemState) - if err != nil { - return nil, err - } + requestExtractor := middleware.NewRequestExtractor(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()) - requestExtractor := middleware.NewRequestExtractor(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()) - - routes.RegisterElevenLabsRoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()) - routes.RegisterLocalAIRoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig(), galleryService) + routes.RegisterElevenLabsRoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()) + routes.RegisterLocalAIRoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService()) routes.RegisterOpenAIRoutes(router, requestExtractor, application) if !application.ApplicationConfig().DisableWebUI { - routes.RegisterUIRoutes(router, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig(), galleryService) + routes.RegisterUIRoutes(router, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService()) } - routes.RegisterJINARoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()) + routes.RegisterJINARoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()) // Define a custom 404 handler // Note: keep this at the bottom! diff --git a/core/http/elements/p2p.go b/core/http/elements/p2p.go index 6c0a5a577..4f191772f 100644 --- a/core/http/elements/p2p.go +++ b/core/http/elements/p2p.go @@ -7,7 +7,7 @@ import ( "github.com/chasefleming/elem-go" "github.com/chasefleming/elem-go/attrs" "github.com/microcosm-cc/bluemonday" - "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/LocalAI/core/schema" ) func renderElements(n []elem.Node) string { @@ -18,7 +18,7 @@ func renderElements(n []elem.Node) string { return render } -func P2PNodeStats(nodes []p2p.NodeData) string { +func P2PNodeStats(nodes []schema.NodeData) string { online := 0 for _, n := range nodes { if n.IsOnline() { @@ -26,15 +26,17 @@ func P2PNodeStats(nodes []p2p.NodeData) string { } } - class := "text-blue-400" + class := "text-green-400" if online == 0 { class = "text-red-400" + } else if online < len(nodes) { + class = "text-yellow-400" } nodesElements := []elem.Node{ elem.Span( attrs.Props{ - "class": class + " font-bold text-xl", + "class": class + " font-bold text-2xl", }, elem.Text(fmt.Sprintf("%d", online)), ), @@ -49,9 +51,16 @@ func P2PNodeStats(nodes []p2p.NodeData) string { return renderElements(nodesElements) } -func P2PNodeBoxes(nodes []p2p.NodeData) string { - nodesElements := []elem.Node{} +func P2PNodeBoxes(nodes []schema.NodeData) string { + if len(nodes) == 0 { + return `
+		<div class="text-center py-10">
+			<i class="fas fa-server text-gray-500 text-3xl mb-3"></i>
+			<h3 class="text-white font-medium mb-1">No nodes available</h3>
+			<p class="text-gray-400 text-sm">Start some workers to see them here</p>
+		</div>
` + } + render := "" for _, n := range nodes { nodeID := bluemonday.StrictPolicy().Sanitize(n.ID) @@ -59,67 +68,89 @@ func P2PNodeBoxes(nodes []p2p.NodeData) string { statusIconClass := "text-green-400" statusText := "Online" statusTextClass := "text-green-400" + cardHoverClass := "hover:shadow-green-500/20 hover:border-green-400/50" if !n.IsOnline() { statusIconClass = "text-red-400" statusText = "Offline" statusTextClass = "text-red-400" + cardHoverClass = "hover:shadow-red-500/20 hover:border-red-400/50" } - nodesElements = append(nodesElements, + nodeCard := elem.Div( + attrs.Props{ + "class": "bg-gradient-to-br from-gray-800/90 to-gray-900/80 border border-gray-700/50 rounded-xl p-5 shadow-xl transition-all duration-300 " + cardHoverClass + " backdrop-blur-sm", + }, + // Header with node icon and status elem.Div( attrs.Props{ - "class": "bg-gray-800/80 border border-gray-700/50 rounded-xl p-4 shadow-lg transition-all duration-300 hover:shadow-blue-900/20 hover:border-blue-700/50", + "class": "flex items-center justify-between mb-4", }, - // Node ID and status indicator in top row + // Node info elem.Div( attrs.Props{ - "class": "flex items-center justify-between mb-3", + "class": "flex items-center", }, - // Node ID with icon elem.Div( attrs.Props{ - "class": "flex items-center", + "class": "w-10 h-10 bg-blue-500/20 rounded-lg flex items-center justify-center mr-3", }, elem.I( attrs.Props{ - "class": "fas fa-server text-blue-400 mr-2", + "class": "fas fa-server text-blue-400 text-lg", }, ), - elem.Span( + ), + elem.Div( + attrs.Props{}, + elem.H4( attrs.Props{ - "class": "text-white font-medium", + "class": "text-white font-semibold text-sm", + }, + elem.Text("Node"), + ), + elem.P( + attrs.Props{ + "class": "text-gray-400 text-xs font-mono break-all", }, elem.Text(nodeID), ), ), - // Status indicator - elem.Div( - attrs.Props{ - "class": "flex items-center", - }, - elem.I( - attrs.Props{ - "class": "fas fa-circle animate-pulse " + statusIconClass + " mr-1.5", - }, - ), - elem.Span( - attrs.Props{ - "class": statusTextClass, - }, - elem.Text(statusText), - ), - ), ), - // Bottom section with timestamp + // Status badge elem.Div( attrs.Props{ - "class": "text-xs text-gray-400 pt-1 border-t border-gray-700/30", + "class": "flex items-center bg-gray-900/50 rounded-full px-3 py-1.5 border border-gray-700/50", }, - elem.Text("Last updated: "+time.Now().UTC().Format("2006-01-02 15:04:05")), + elem.I( + attrs.Props{ + "class": "fas fa-circle animate-pulse " + statusIconClass + " mr-2 text-xs", + }, + ), + elem.Span( + attrs.Props{ + "class": statusTextClass + " text-xs font-medium", + }, + elem.Text(statusText), + ), ), - )) + ), + // Footer with timestamp + elem.Div( + attrs.Props{ + "class": "text-xs text-gray-500 pt-3 border-t border-gray-700/30 flex items-center", + }, + elem.I( + attrs.Props{ + "class": "fas fa-clock mr-2", + }, + ), + elem.Text("Updated: "+time.Now().UTC().Format("15:04:05")), + ), + ) + + render += nodeCard.Render() } - return renderElements(nodesElements) + return render } diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 8f330e873..2e692b52a 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -239,7 +239,7 @@ func registerRealtime(application *application.Application) func(c *websocket.Co m, cfg, err := newTranscriptionOnlyModel( &pipeline, - application.BackendLoader(), + application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), ) @@ 
-313,7 +313,7 @@ func registerRealtime(application *application.Application) func(c *websocket.Co if err := updateTransSession( session, &sessionUpdate, - application.BackendLoader(), + application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), ); err != nil { @@ -342,7 +342,7 @@ func registerRealtime(application *application.Application) func(c *websocket.Co if err := updateSession( session, &sessionUpdate, - application.BackendLoader(), + application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), ); err != nil { diff --git a/core/http/routes/openai.go b/core/http/routes/openai.go index 8a2789407..e4e31f700 100644 --- a/core/http/routes/openai.go +++ b/core/http/routes/openai.go @@ -26,7 +26,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_CHAT)), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.ChatEndpoint(application.BackendLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), + openai.ChatEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), } app.Post("/v1/chat/completions", chatChain...) app.Post("/chat/completions", chatChain...) @@ -37,7 +37,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildConstantDefaultModelNameMiddleware("gpt-4o"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.EditEndpoint(application.BackendLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), + openai.EditEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), } app.Post("/v1/edits", editChain...) app.Post("/edits", editChain...) @@ -48,7 +48,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildConstantDefaultModelNameMiddleware("gpt-4o"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.CompletionEndpoint(application.BackendLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), + openai.CompletionEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), } app.Post("/v1/completions", completionChain...) app.Post("/completions", completionChain...) @@ -60,7 +60,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildConstantDefaultModelNameMiddleware("gpt-4o"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.EmbeddingsEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()), + openai.EmbeddingsEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()), } app.Post("/v1/embeddings", embeddingChain...) app.Post("/embeddings", embeddingChain...) 
@@ -71,22 +71,22 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_TRANSCRIPT)), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.TranscriptEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()), + openai.TranscriptEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()), ) app.Post("/v1/audio/speech", re.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_TTS)), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.TTSRequest) }), - localai.TTSEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) + localai.TTSEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) // images app.Post("/v1/images/generations", re.BuildConstantDefaultModelNameMiddleware("stablediffusion"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.ImageEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) + openai.ImageEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) // List models - app.Get("/v1/models", openai.ListModelsEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) - app.Get("/models", openai.ListModelsEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) + app.Get("/v1/models", openai.ListModelsEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) + app.Get("/models", openai.ListModelsEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) } diff --git a/core/http/views/p2p.html b/core/http/views/p2p.html index bd6324bf6..87fead0a9 100644 --- a/core/http/views/p2p.html +++ b/core/http/views/p2p.html @@ -15,11 +15,11 @@

- Distributed inference with P2P
+ Distributed AI Computing

- Distribute computation by sharing and loadbalancing instances or sharding model weights
+ Scale your AI workloads across multiple devices with peer-to-peer distribution

-
-

- LocalAI uses P2P technologies to enable distribution of work between peers. It is possible to share an instance with Federation and/or split the weights of a model across peers (only available with llama.cpp models). You can now share computational resources between your devices or your friends! -

+ +
+ +
+
+
+
+ +
+
+

+ + How P2P Distribution Works + +

+

+ LocalAI uses peer-to-peer technology to distribute AI workloads across the machines on your network
+

+
+ + +
+ +
+
+ +
+

Instance Federation

+

+ Share complete LocalAI instances across your network for load balancing and redundancy. Perfect for scaling across multiple devices. +

+
+ + +
+
+ +
+

Model Sharding

+

+ Split large model weights across multiple workers. Currently supported with llama.cpp backends for efficient memory usage. +

+
+ + +
+
+ +
+

Resource Sharing

+

+ Pool computational resources from multiple devices, including your friends' machines, to handle larger workloads collaboratively. +

+
+
+ + +
+
+
+ Faster +
+

Parallel processing

+
+
+
+ Scalable +
+

Add more nodes

+
+
+
+ Resilient +
+

Fault tolerant

+
+
+
+ Efficient +
+

Resource optimization

+
+
+
@@ -64,21 +144,106 @@
{{ else }} - -
-
-
- -

Federated Nodes: - -

+ +
+ +
+
+
+
+ +
+
+

Federation

+

Instance sharing

+
+
+
+
+

nodes

+
-

- You can start LocalAI in federated mode to share your instance, or start the federated server to balance requests between nodes of the federation. -

+
+ + Load balanced instances +
+
-
-
+ +
+
+
+
+ +
+
+

Workers

+

Model sharding

+
+
+
+
+

workers

+
+
+
+ + Distributed computation +
+
+ + +
+
+
+
+ +
+
+

Network

+

Connection token

+
+
+ +
+
+ + Ready to connect +
+
+
+ + +
+
+
+
+
+ +
+
+

Federation Network

+

Instance load balancing and sharing

+
+
+
+
Active Nodes
+
+
+
+ +
+

+ + Start LocalAI in federated mode to share your instance, or launch a federated server to distribute requests intelligently across multiple nodes in your network. +

+
+ + +
+
@@ -168,38 +333,52 @@ docker run -ti --net host -e TOKEN="{{.P2PToken}}" --
- -
-
-
- -

Workers (llama.cpp): - -

+ +
+
+
+
+
+ +
+
+

Worker Network

+

Distributed model computation (llama.cpp)

+
+
+
+
Active Workers
+
+
+
+ +
+

+ + Deploy llama.cpp workers to split model weights across multiple devices. This enables processing larger models by distributing computational load and memory requirements. +

-

- You can start llama.cpp workers to distribute weights between the workers and offload part of the computation. To start a new worker, you can use the CLI or Docker. -

-
-
+ +
+
-
+

- Start a new llama.cpp P2P worker
+ Start a new llama.cpp worker

    @@ -221,7 +400,7 @@ docker run -ti --net host -e TOKEN="{{.P2PToken}}" -- export TOKEN="{{.P2PToken}}"
    local-ai worker p2p-llama-cpp-rpc -

    For all the options available, please refer to the documentation.

    +

    For all the options available, please refer to the documentation.

@@ -256,23 +435,148 @@ docker run -ti --net host -e TOKEN="{{.P2PToken}}" -- .token { word-break: break-all; } - .workers .grid div { - display: flex; - flex-direction: column; - justify-content: space-between; + + /* Enhanced scrollbar styling */ + .scrollbar-thin::-webkit-scrollbar { + width: 6px; } + + .scrollbar-thin::-webkit-scrollbar-track { + background: rgba(31, 41, 55, 0.5); + border-radius: 6px; + } + + .scrollbar-thin::-webkit-scrollbar-thumb { + background: rgba(107, 114, 128, 0.5); + border-radius: 6px; + } + + .scrollbar-thin::-webkit-scrollbar-thumb:hover { + background: rgba(107, 114, 128, 0.8); + } + + /* Animation enhancements */ .fa-circle-nodes { animation: pulseGlow 2s ease-in-out infinite; } - @keyframes pulseGlow { - 0%, 100% { filter: drop-shadow(0 0 2px rgba(96, 165, 250, 0.3)); } - 50% { filter: drop-shadow(0 0 8px rgba(96, 165, 250, 0.7)); } + + .fa-puzzle-piece { + animation: rotateGlow 3s ease-in-out infinite; } + + @keyframes pulseGlow { + 0%, 100% { + filter: drop-shadow(0 0 2px rgba(96, 165, 250, 0.3)); + transform: scale(1); + } + 50% { + filter: drop-shadow(0 0 8px rgba(96, 165, 250, 0.7)); + transform: scale(1.05); + } + } + + @keyframes rotateGlow { + 0%, 100% { + filter: drop-shadow(0 0 2px rgba(147, 51, 234, 0.3)); + transform: rotate(0deg) scale(1); + } + 33% { + filter: drop-shadow(0 0 6px rgba(147, 51, 234, 0.6)); + transform: rotate(10deg) scale(1.05); + } + 66% { + filter: drop-shadow(0 0 4px rgba(147, 51, 234, 0.4)); + transform: rotate(-5deg) scale(1.02); + } + } + + /* Copy button enhancements */ .copy-icon:hover, button:hover .fa-copy { color: #60a5fa; transform: scale(1.1); transition: all 0.2s ease; } + + /* Node card hover effects */ + .workers .grid > div { + transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1); + } + + .workers .grid > div:hover { + transform: translateY(-4px); + box-shadow: 0 20px 40px rgba(0, 0, 0, 0.3); + } + + /* Status indicator animations */ + .animate-pulse { + animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite; + } + + @keyframes pulse { + 0%, 100% { + opacity: 1; + } + 50% { + opacity: .5; + } + } + + /* Enhanced tab styling */ + .tablink { + position: relative; + overflow: hidden; + } + + .tablink::before { + content: ''; + position: absolute; + top: 0; + left: -100%; + width: 100%; + height: 100%; + background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.1), transparent); + transition: left 0.5s ease; + } + + .tablink:hover::before { + left: 100%; + } + + /* Loading spinner for HTMX */ + .htmx-indicator { + display: none; + } + + .htmx-request .htmx-indicator { + display: inline; + } + + /* Card gradient overlays */ + .card-overlay { + background: linear-gradient(135deg, rgba(59, 130, 246, 0.1) 0%, rgba(99, 102, 241, 0.1) 100%); + } + + /* Enhanced button styles */ + button[onclick*="copyClipboard"] { + transition: all 0.2s ease; + backdrop-filter: blur(8px); + } + + button[onclick*="copyClipboard"]:hover { + transform: scale(1.05); + box-shadow: 0 4px 12px rgba(59, 130, 246, 0.3); + } + + /* Code block enhancements */ + code { + position: relative; + transition: all 0.2s ease; + } + + code:hover { + box-shadow: 0 4px 12px rgba(234, 179, 8, 0.2); + border-color: rgba(234, 179, 8, 0.3) !important; + } diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index e382576ba..6f5cfb053 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -7,6 +7,7 @@ import ( "io" "net" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/edgevpn/pkg/node" 
"github.com/rs/zerolog/log" ) @@ -21,7 +22,7 @@ func (f *FederatedServer) Start(ctx context.Context) error { return fmt.Errorf("creating a new node: %w", err) } - if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) { + if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel schema.NodeData) { log.Debug().Msgf("Discovered node: %s", tunnel.ID) }, false); err != nil { return err diff --git a/core/p2p/node.go b/core/p2p/node.go index 6c43dde00..78efb77ca 100644 --- a/core/p2p/node.go +++ b/core/p2p/node.go @@ -1,8 +1,11 @@ package p2p import ( + "slices" + "strings" "sync" - "time" + + "github.com/mudler/LocalAI/core/schema" ) const ( @@ -10,57 +13,48 @@ const ( WorkerID = "worker" ) -type NodeData struct { - Name string - ID string - TunnelAddress string - ServiceID string - LastSeen time.Time -} - -func (d NodeData) IsOnline() bool { - now := time.Now() - // if the node was seen in the last 40 seconds, it's online - return now.Sub(d.LastSeen) < 40*time.Second -} - var mu sync.Mutex -var nodes = map[string]map[string]NodeData{} +var nodes = map[string]map[string]schema.NodeData{} -func GetAvailableNodes(serviceID string) []NodeData { +func GetAvailableNodes(serviceID string) []schema.NodeData { if serviceID == "" { serviceID = defaultServicesID } mu.Lock() defer mu.Unlock() - var availableNodes = []NodeData{} + var availableNodes = []schema.NodeData{} for _, v := range nodes[serviceID] { availableNodes = append(availableNodes, v) } + + slices.SortFunc(availableNodes, func(a, b schema.NodeData) int { + return strings.Compare(a.ID, b.ID) + }) + return availableNodes } -func GetNode(serviceID, nodeID string) (NodeData, bool) { +func GetNode(serviceID, nodeID string) (schema.NodeData, bool) { if serviceID == "" { serviceID = defaultServicesID } mu.Lock() defer mu.Unlock() if _, ok := nodes[serviceID]; !ok { - return NodeData{}, false + return schema.NodeData{}, false } nd, exists := nodes[serviceID][nodeID] return nd, exists } -func AddNode(serviceID string, node NodeData) { +func AddNode(serviceID string, node schema.NodeData) { if serviceID == "" { serviceID = defaultServicesID } mu.Lock() defer mu.Unlock() if nodes[serviceID] == nil { - nodes[serviceID] = map[string]NodeData{} + nodes[serviceID] = map[string]schema.NodeData{} } nodes[serviceID][node.ID] = node } diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index e21c5e2f7..ec550eb1f 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p/core/peer" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/utils" "github.com/mudler/edgevpn/pkg/config" "github.com/mudler/edgevpn/pkg/node" @@ -169,7 +170,7 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services -func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error { +func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node schema.NodeData), allocate bool) error { if servicesID == "" { servicesID = defaultServicesID } @@ -200,8 +201,8 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri return nil } -func discoveryTunnels(ctx 
context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) { - tunnels := make(chan NodeData) +func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan schema.NodeData, error) { + tunnels := make(chan schema.NodeData) ledger, err := n.Ledger() if err != nil { @@ -234,7 +235,7 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin for k, v := range data { // New worker found in the ledger data as k (worker id) - nd := &NodeData{} + nd := &schema.NodeData{} if err := v.Unmarshal(nd); err != nil { zlog.Error().Msg("cannot unmarshal node data") continue @@ -254,14 +255,14 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin } type nodeServiceData struct { - NodeData NodeData + NodeData schema.NodeData CancelFunc context.CancelFunc } var service = map[string]nodeServiceData{} var muservice sync.Mutex -func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) { +func ensureService(ctx context.Context, n *node.Node, nd *schema.NodeData, sserv string, allocate bool) { muservice.Lock() defer muservice.Unlock() nd.ServiceID = sserv @@ -346,7 +347,7 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (* 20*time.Second, func() { updatedMap := map[string]interface{}{} - updatedMap[name] = &NodeData{ + updatedMap[name] = &schema.NodeData{ Name: name, LastSeen: time.Now(), ID: nodeID(name), diff --git a/core/p2p/sync.go b/core/p2p/sync.go new file mode 100644 index 000000000..f9be422a6 --- /dev/null +++ b/core/p2p/sync.go @@ -0,0 +1,102 @@ +package p2p + +import ( + "context" + "slices" + "time" + + "github.com/google/uuid" + "github.com/mudler/LocalAI/core/application" + "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/services" + + "github.com/mudler/edgevpn/pkg/node" + zlog "github.com/rs/zerolog/log" +) + +func syncState(ctx context.Context, n *node.Node, app *application.Application) error { + zlog.Debug().Msg("[p2p-sync] Syncing state") + + whatWeHave := []string{} + for _, model := range app.ModelConfigLoader().GetAllModelsConfigs() { + whatWeHave = append(whatWeHave, model.Name) + } + + ledger, _ := n.Ledger() + currentData := ledger.CurrentData() + zlog.Debug().Msgf("[p2p-sync] Current data: %v", currentData) + data, exists := ledger.GetKey("shared_state", "models") + if !exists { + ledger.AnnounceUpdate(ctx, time.Minute, "shared_state", "models", whatWeHave) + zlog.Debug().Msgf("No models found in the ledger, announced our models: %v", whatWeHave) + } + + models := []string{} + if err := data.Unmarshal(&models); err != nil { + zlog.Warn().Err(err).Msg("error unmarshalling models") + return nil + } + + zlog.Debug().Msgf("[p2p-sync] Models that are present in this instance: %v\nModels that are in the ledger: %v", whatWeHave, models) + + // Sync with our state + whatIsNotThere := []string{} + for _, model := range whatWeHave { + if !slices.Contains(models, model) { + whatIsNotThere = append(whatIsNotThere, model) + } + } + if len(whatIsNotThere) > 0 { + zlog.Debug().Msgf("[p2p-sync] Announcing our models: %v", append(models, whatIsNotThere...)) + ledger.AnnounceUpdate( + ctx, + 1*time.Minute, + "shared_state", + "models", + append(models, whatIsNotThere...), + ) + } + + // Check if we have a model that is not in our state, otherwise install it + for _, model := range models { + if slices.Contains(whatWeHave, model) { + zlog.Debug().Msgf("[p2p-sync] Model %s is 
already present in this instance", model) + continue + } + + // we install model + zlog.Info().Msgf("[p2p-sync] Installing model which is not present in this instance: %s", model) + + uuid, err := uuid.NewUUID() + if err != nil { + zlog.Error().Err(err).Msg("error generating UUID") + continue + } + + app.GalleryService().ModelGalleryChannel <- services.GalleryOp[gallery.GalleryModel]{ + ID: uuid.String(), + GalleryElementName: model, + Galleries: app.ApplicationConfig().Galleries, + BackendGalleries: app.ApplicationConfig().BackendGalleries, + } + } + + return nil +} + +func Sync(ctx context.Context, n *node.Node, app *application.Application) error { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Minute): + if err := syncState(ctx, n, app); err != nil { + zlog.Error().Err(err).Msg("error syncing state") + } + } + + } + }() + return nil +} diff --git a/core/schema/localai.go b/core/schema/localai.go index d093faafe..5949e743d 100644 --- a/core/schema/localai.go +++ b/core/schema/localai.go @@ -1,7 +1,8 @@ package schema import ( - "github.com/mudler/LocalAI/core/p2p" + "time" + gopsutil "github.com/shirou/gopsutil/v3/process" ) @@ -107,9 +108,23 @@ type StoresFindResponse struct { Similarities []float32 `json:"similarities" yaml:"similarities"` } +type NodeData struct { + Name string + ID string + TunnelAddress string + ServiceID string + LastSeen time.Time +} + +func (d NodeData) IsOnline() bool { + now := time.Now() + // if the node was seen in the last 40 seconds, it's online + return now.Sub(d.LastSeen) < 40*time.Second +} + type P2PNodesResponse struct { - Nodes []p2p.NodeData `json:"nodes" yaml:"nodes"` - FederatedNodes []p2p.NodeData `json:"federated_nodes" yaml:"federated_nodes"` + Nodes []NodeData `json:"nodes" yaml:"nodes"` + FederatedNodes []NodeData `json:"federated_nodes" yaml:"federated_nodes"` } type SysInfoModel struct {
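Note on the change above: moving NodeData from core/p2p into core/schema is what lets core/p2p import core/application for the new Sync without an import cycle (core/schema previously imported core/p2p just for P2PNodesResponse). With that in place, the reconciliation inside syncState reduces to two set differences between the local model list and the ledger's "shared_state"/"models" entry. A minimal, self-contained sketch of just that logic — the reconcile helper and the sample model names below are illustrative, not part of the codebase:

package main

import (
	"fmt"
	"slices"
)

// reconcile mirrors the two loops in syncState: it returns the models to
// announce (present locally but missing from the ledger) and the models
// to install (listed in the ledger but missing locally).
func reconcile(local, ledger []string) (toAnnounce, toInstall []string) {
	for _, m := range local {
		if !slices.Contains(ledger, m) {
			toAnnounce = append(toAnnounce, m)
		}
	}
	for _, m := range ledger {
		if !slices.Contains(local, m) {
			toInstall = append(toInstall, m)
		}
	}
	return toAnnounce, toInstall
}

func main() {
	toAnnounce, toInstall := reconcile(
		[]string{"llama-3", "phi-4"},    // hypothetical models on this instance
		[]string{"llama-3", "qwen-2.5"}, // hypothetical models in the ledger
	)
	fmt.Println(toAnnounce) // [phi-4]
	fmt.Println(toInstall)  // [qwen-2.5]
}

Because each instance announces the union of its local models and the ledger's list, the shared list only grows, so every node in the network converges on installing the superset of all announced models.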