Files
LocalAI/core/p2p/sync.go
Ettore Di Giacinto 9c7f92c81f feat(p2p): automatically sync installed models between instances (#6108)
* feat(p2p): sync models between federated nodes

This change makes sure that between federated nodes all the models are
synced with each other.

Note: this works exclusively with models belonging to a gallery. It does
not sync files between the nodes, but rather it syncs the node setup.
E.g. all the nodes need to have the same galleries configured and
install models without any local editing.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Make nodes stable

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Fixups on syncing

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* ui: improve p2p view

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2025-08-19 19:37:46 +02:00

103 lines
2.7 KiB
Go

package p2p
import (
	"context"
	"fmt"
	"slices"
	"time"

	"github.com/google/uuid"
	"github.com/mudler/LocalAI/core/application"
	"github.com/mudler/LocalAI/core/gallery"
	"github.com/mudler/LocalAI/core/services"
	"github.com/mudler/edgevpn/pkg/node"
	zlog "github.com/rs/zerolog/log"
)
// syncState reconciles the models configured on this instance with the model
// list shared through the p2p ledger under the "shared_state"/"models" key.
//
// Local model names missing from the shared list are announced to the ledger,
// and a gallery installation is queued for every shared model name that is
// not configured locally. When the key is not in the ledger yet, the local
// model list is announced and reconciliation is left to a later run.
//
// It returns an error when the node's ledger cannot be obtained, or when ctx
// is cancelled while an installation is being queued. A shared list that
// cannot be decoded is only logged and nil is returned.
func syncState(ctx context.Context, n *node.Node, app *application.Application) error {
	zlog.Debug().Msg("[p2p-sync] Syncing state")

	configs := app.ModelConfigLoader().GetAllModelsConfigs()
	whatWeHave := make([]string, 0, len(configs))
	for _, model := range configs {
		whatWeHave = append(whatWeHave, model.Name)
	}

	// Without a ledger there is nothing to sync against; surface the failure
	// instead of dereferencing a possibly nil ledger below.
	ledger, err := n.Ledger()
	if err != nil {
		return fmt.Errorf("getting p2p ledger: %w", err)
	}

	currentData := ledger.CurrentData()
	zlog.Debug().Msgf("[p2p-sync] Current data: %v", currentData)

	// NOTE(review): AnnounceUpdate is handed ctx and an interval, so it
	// presumably keeps announcing in the background — confirm that calling it
	// on every sync run does not accumulate announcers.
	data, exists := ledger.GetKey("shared_state", "models")
	if !exists {
		ledger.AnnounceUpdate(ctx, time.Minute, "shared_state", "models", whatWeHave)
		zlog.Debug().Msgf("No models found in the ledger, announced our models: %v", whatWeHave)
		// There is no shared list to decode yet; trying to unmarshal the
		// missing value would only produce a spurious warning.
		return nil
	}

	models := []string{}
	if err := data.Unmarshal(&models); err != nil {
		zlog.Warn().Err(err).Msg("error unmarshalling models")
		return nil
	}

	zlog.Debug().Msgf("[p2p-sync] Models that are present in this instance: %v\nModels that are in the ledger: %v", whatWeHave, models)

	// Sync with our state: collect what we have that the ledger does not.
	whatIsNotThere := []string{}
	for _, model := range whatWeHave {
		if !slices.Contains(models, model) {
			whatIsNotThere = append(whatIsNotThere, model)
		}
	}

	if len(whatIsNotThere) > 0 {
		// Build the merged list once, on a fresh backing array, so the value
		// handed to the ledger never aliases the slice iterated below.
		merged := append(slices.Clone(models), whatIsNotThere...)
		zlog.Debug().Msgf("[p2p-sync] Announcing our models: %v", merged)
		ledger.AnnounceUpdate(
			ctx,
			1*time.Minute,
			"shared_state",
			"models",
			merged,
		)
	}

	// Install every model listed in the ledger that is not present locally.
	for _, model := range models {
		if slices.Contains(whatWeHave, model) {
			zlog.Debug().Msgf("[p2p-sync] Model %s is already present in this instance", model)
			continue
		}

		zlog.Info().Msgf("[p2p-sync] Installing model which is not present in this instance: %s", model)
		id, err := uuid.NewUUID()
		if err != nil {
			zlog.Error().Err(err).Msg("error generating UUID")
			continue
		}

		op := services.GalleryOp[gallery.GalleryModel]{
			ID:                 id.String(),
			GalleryElementName: model,
			Galleries:          app.ApplicationConfig().Galleries,
			BackendGalleries:   app.ApplicationConfig().BackendGalleries,
		}

		// The send may block until the receiver is ready; stop waiting when
		// ctx is cancelled so this goroutine can terminate.
		select {
		case app.GalleryService().ModelGalleryChannel <- op:
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	return nil
}
// Sync launches a background goroutine that calls syncState for n and app
// after each one-minute wait, until ctx is done. Errors returned by syncState
// are logged rather than propagated; Sync itself returns immediately and
// always returns nil.
func Sync(ctx context.Context, n *node.Node, app *application.Application) error {
	// waitForNextRun blocks for one minute and reports true, or reports false
	// as soon as ctx is done.
	waitForNextRun := func() bool {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(1 * time.Minute):
			return true
		}
	}

	go func() {
		for waitForNextRun() {
			syncErr := syncState(ctx, n, app)
			if syncErr == nil {
				continue
			}
			zlog.Error().Err(syncErr).Msg("error syncing state")
		}
	}()

	return nil
}