LocalAI/core/p2p/p2p.go
Ettore Di Giacinto 9c7f92c81f feat(p2p): automatically sync installed models between instances (#6108)
* feat(p2p): sync models between federated nodes

This change ensures that all models are kept in sync between federated
nodes.

Note: this works exclusively with models belonging to a gallery. It does
not sync files between the nodes; rather, it syncs the node setup: all
nodes need to have the same galleries configured and must install models
without any local editing.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Make nodes stable

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Fixups on syncing

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* ui: improve p2p view

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2025-08-19 19:37:46 +02:00


package p2p

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/mudler/LocalAI/core/schema"
	"github.com/mudler/LocalAI/pkg/utils"
	"github.com/mudler/edgevpn/pkg/config"
	"github.com/mudler/edgevpn/pkg/logger"
	"github.com/mudler/edgevpn/pkg/node"
	"github.com/mudler/edgevpn/pkg/protocol"
	"github.com/mudler/edgevpn/pkg/services"
	"github.com/mudler/edgevpn/pkg/types"
	eutils "github.com/mudler/edgevpn/pkg/utils"
	"github.com/multiformats/go-multiaddr"
	"github.com/phayes/freeport"
	zlog "github.com/rs/zerolog/log"
)

func generateNewConnectionData(DHTInterval, OTPInterval int) *node.YAMLConnectionConfig {
	maxMessSize := 20 << 20 // 20MB
	keyLength := 43

	if DHTInterval == 0 {
		DHTInterval = 30
	}
	if OTPInterval == 0 {
		OTPInterval = 9000
	}

	return &node.YAMLConnectionConfig{
		MaxMessageSize: maxMessSize,
		RoomName:       eutils.RandStringRunes(keyLength),
		Rendezvous:     eutils.RandStringRunes(keyLength),
		MDNS:           eutils.RandStringRunes(keyLength),
		OTP: node.OTP{
			DHT: node.OTPConfig{
				Key:      eutils.RandStringRunes(keyLength),
				Interval: DHTInterval,
				Length:   keyLength,
			},
			Crypto: node.OTPConfig{
				Key:      eutils.RandStringRunes(keyLength),
				Interval: OTPInterval,
				Length:   keyLength,
			},
		},
	}
}

// GenerateToken generates a fresh connection configuration and returns it
// encoded as a base64 token that other nodes can use to join the network.
func GenerateToken(DHTInterval, OTPInterval int) string {
	return generateNewConnectionData(DHTInterval, OTPInterval).Base64()
}
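
// Illustrative usage sketch (not part of the original file): every node that
// should join the same private network must be created with the same token.
//
//	token := GenerateToken(0, 0) // 0,0 selects the defaults (DHT interval 30, crypto OTP interval 9000)
//	n, err := NewNode(token)     // see NewNode below
//	if err != nil {
//		// handle error
//	}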

func nodeID(s string) string {
	hostname, _ := os.Hostname()
	return fmt.Sprintf("%s-%s", hostname, s)
}

func nodeAnnounce(ctx context.Context, node *node.Node) {
	ledger, _ := node.Ledger()

	// Announce ourselves so that other nodes accept our connection
	ledger.Announce(
		ctx,
		10*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[node.Host().ID().String()] = &types.User{
				PeerID:    node.Host().ID().String(),
				Timestamp: time.Now().String(),
			}
			ledger.Add(protocol.UsersLedgerKey, updatedMap)
		},
	)
}

func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
	ledger, _ := node.Ledger()

	// Retrieve the entry for this service from the ledger (blockchain)
	existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
	service := &types.Service{}
	existingValue.Unmarshal(service)
	// If the service is not published on the ledger, there is nothing to proxy to
	if !found {
		zlog.Error().Msg("Service not found on blockchain")
		conn.Close()
		// ll.Debugf("service '%s' not found on blockchain", serviceID)
		return
	}

	// Decode the peer that published the service
	d, err := peer.Decode(service.PeerID)
	if err != nil {
		zlog.Error().Msg("cannot decode peer")
		conn.Close()
		// ll.Debugf("could not decode peer '%s'", service.PeerID)
		return
	}

	// Open a stream to the peer
	stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
	if err != nil {
		zlog.Error().Err(err).Msg("cannot open stream to peer")
		conn.Close()
		// ll.Debugf("could not open stream '%s'", err.Error())
		return
	}
	// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
	zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())

	// Pipe the local connection and the p2p stream into each other and wait
	// until either side closes.
	closer := make(chan struct{}, 2)
	go copyStream(closer, stream, conn)
	go copyStream(closer, conn, stream)
	<-closer

	stream.Close()
	conn.Close()
}

func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
	zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)

	// Open a local port for listening
	l, err := net.Listen("tcp", listenAddr)
	if err != nil {
		zlog.Error().Err(err).Msg("Error listening")
		return err
	}
	go func() {
		<-ctx.Done()
		l.Close()
	}()

	nodeAnnounce(ctx, node)

	defer l.Close()
	for {
		select {
		case <-ctx.Done():
			return errors.New("context canceled")
		default:
			zlog.Debug().Msg("Waiting for a new connection")
			// Listen for an incoming connection.
			conn, err := l.Accept()
			if err != nil {
				zlog.Error().Err(err).Msg("Error accepting connection")
				continue
			}
			// Handle connections in a new goroutine, forwarding to the p2p service
			go func() {
				proxyP2PConnection(ctx, node, service, conn)
			}()
		}
	}
}

// ServiceDiscoverer is the server-side entry point: it starts a goroutine that
// watches the ledger and keeps the list of discovered services up to date
// (e.g. the LLAMACPP_GRPC_SERVERS environment variable used by llama.cpp workers).
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node schema.NodeData), allocate bool) error {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate)
	if err != nil {
		return err
	}
	// TODO: discoveryTunnels should return all the nodes that are available,
	// so that we could update availableNodes here instead of appending.
	// E.g. we could have a LastSeen field in NodeData that is updated in
	// discoveryTunnels each time the node is seen; in that case the function
	// below should be idempotent and just keep track of the nodes.
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Error().Msg("Discoverer stopped")
				return
			case tunnel := <-tunnels:
				AddNode(servicesID, tunnel)
				if discoveryFunc != nil {
					discoveryFunc(servicesID, tunnel)
				}
			}
		}
	}()

	return nil
}
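
// Illustrative usage sketch (not part of the original file). It assumes the
// caller has already created and started a node with the shared network token:
//
//	n, err := NewNode(token)
//	if err != nil {
//		return err
//	}
//	if err := n.Start(ctx); err != nil {
//		return err
//	}
//	// Track federated workers; with allocate=true a local tunnel is opened
//	// for each discovered service and exposed via NodeData.TunnelAddress.
//	err = ServiceDiscoverer(ctx, n, token, "", func(serviceID string, nd schema.NodeData) {
//		zlog.Info().Msgf("discovered %s at %s", nd.ID, nd.TunnelAddress)
//	}, true)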

func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan schema.NodeData, error) {
	tunnels := make(chan schema.NodeData)

	ledger, err := n.Ledger()
	if err != nil {
		return nil, fmt.Errorf("getting the ledger: %w", err)
	}
	// Get new services from the ledger, allocate them and publish them to the channel.
	// TODO: a function ensureServices that:
	// - starts a service if not started, if the worker is Online
	// - checks that workers are Online, and if not cancels the context of allocateLocalService
	// - discoveryTunnels should return all the nodes and the addresses associated with them
	// - the caller should then take care of the fact that we are always returning fresh information
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Error().Msg("Discoverer stopped")
				return
			default:
				time.Sleep(5 * time.Second)

				data := ledger.LastBlock().Storage[servicesID]
				if logLevel == logLevelDebug {
					// We want to surface this debugging data only if p2p logging is set to debug
					// (and not for the whole application, as it can be really noisy)
					zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")
				}

				for k, v := range data {
					// New worker found in the ledger data as k (worker id)
					nd := &schema.NodeData{}
					if err := v.Unmarshal(nd); err != nil {
						zlog.Error().Msg("cannot unmarshal node data")
						continue
					}
					ensureService(ctx, n, nd, k, allocate)
					muservice.Lock()
					if _, ok := service[nd.Name]; ok {
						tunnels <- service[nd.Name].NodeData
					}
					muservice.Unlock()
				}
			}
		}
	}()

	return tunnels, err
}

type nodeServiceData struct {
	NodeData   schema.NodeData
	CancelFunc context.CancelFunc
}

var service = map[string]nodeServiceData{}
var muservice sync.Mutex

func ensureService(ctx context.Context, n *node.Node, nd *schema.NodeData, sserv string, allocate bool) {
	muservice.Lock()
	defer muservice.Unlock()
	nd.ServiceID = sserv
	if ndService, found := service[nd.Name]; !found {
		if !nd.IsOnline() {
			// If the node is offline and not yet tracked, there is nothing to do
			return
		}
		newCtxm, cancel := context.WithCancel(ctx)
		if allocate {
			// Start the service tunnel on a free local port
			port, err := freeport.GetFreePort()
			if err != nil {
				zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
				cancel()
				return
			}
			tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
			nd.TunnelAddress = tunnelAddress
			go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
			zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
		}
		service[nd.Name] = nodeServiceData{
			NodeData:   *nd,
			CancelFunc: cancel,
		}
	} else {
		// Check if the service is still alive; if not, cancel its context
		if !nd.IsOnline() && !ndService.NodeData.IsOnline() {
			ndService.CancelFunc()
			delete(service, nd.Name)
			zlog.Info().Msgf("Node %s is offline, deleting", nd.ID)
		} else if nd.IsOnline() {
			// Update the tracked entry with the fresh data, keeping the existing tunnel address
			nd.TunnelAddress = ndService.NodeData.TunnelAddress
			service[nd.Name] = nodeServiceData{
				NodeData:   *nd,
				CancelFunc: ndService.CancelFunc,
			}
		}
	}
}

// ExposeService is the P2P worker entry point: it registers a local host:port
// as a service on the network and keeps announcing it on the ledger.
func ExposeService(ctx context.Context, host, port, token, servicesID string) (*node.Node, error) {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	llger := logger.New(log.LevelFatal)

	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return nil, err
	}

	// generate a random string for the name
	name := utils.RandString(10)

	// Register the service
	nodeOpts = append(nodeOpts,
		services.RegisterService(llger, time.Duration(60)*time.Second, name, fmt.Sprintf("%s:%s", host, port))...)
	n, err := node.New(nodeOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating a new node: %w", err)
	}

	err = n.Start(ctx)
	if err != nil {
		return n, fmt.Errorf("starting the node: %w", err)
	}

	ledger, err := n.Ledger()
	if err != nil {
		return n, fmt.Errorf("getting the ledger: %w", err)
	}

	ledger.Announce(
		ctx,
		20*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[name] = &schema.NodeData{
				Name:     name,
				LastSeen: time.Now(),
				ID:       nodeID(name),
			}
			ledger.Add(servicesID, updatedMap)
		},
	)

	return n, err
}
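
// Illustrative usage sketch (not part of the original file): a worker that
// exposes a local backend listening on 127.0.0.1:9090 to the rest of the
// federation (the address and token values here are placeholders).
//
//	n, err := ExposeService(ctx, "127.0.0.1", "9090", token, "")
//	if err != nil {
//		return err
//	}
//	// The returned node keeps re-announcing the service every 20 seconds
//	// until ctx is canceled.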

func NewNode(token string) (*node.Node, error) {
	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return nil, err
	}

	n, err := node.New(nodeOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating a new node: %w", err)
	}

	return n, nil
}

func newNodeOpts(token string) ([]node.Option, error) {
	llger := logger.New(log.LevelFatal)
	defaultInterval := 10 * time.Second

	// TODO: move this up, expose more config options when creating a node
	noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true"
	enableLimits := os.Getenv("LOCALAI_P2P_ENABLE_LIMITS") == "true"

	var listenMaddrs []string
	var bootstrapPeers []string

	laddrs := os.Getenv("LOCALAI_P2P_LISTEN_MADDRS")
	if laddrs != "" {
		listenMaddrs = strings.Split(laddrs, ",")
	}

	bootmaddr := os.Getenv("LOCALAI_P2P_BOOTSTRAP_PEERS_MADDRS")
	if bootmaddr != "" {
		bootstrapPeers = strings.Split(bootmaddr, ",")
	}

	dhtAnnounceMaddrs := stringsToMultiAddr(strings.Split(os.Getenv("LOCALAI_P2P_DHT_ANNOUNCE_MADDRS"), ","))

	libp2ploglevel := os.Getenv("LOCALAI_P2P_LIB_LOGLEVEL")
	if libp2ploglevel == "" {
		libp2ploglevel = "fatal"
	}

	c := config.Config{
		ListenMaddrs:      listenMaddrs,
		DHTAnnounceMaddrs: dhtAnnounceMaddrs,
		Limit: config.ResourceLimit{
			Enable:   enableLimits,
			MaxConns: 100,
		},
		NetworkToken:   token,
		LowProfile:     false,
		LogLevel:       logLevel,
		Libp2pLogLevel: libp2ploglevel,
		Ledger: config.Ledger{
			SyncInterval:     defaultInterval,
			AnnounceInterval: defaultInterval,
		},
		NAT: config.NAT{
			Service:           true,
			Map:               true,
			RateLimit:         true,
			RateLimitGlobal:   100,
			RateLimitPeer:     100,
			RateLimitInterval: defaultInterval,
		},
		Discovery: config.Discovery{
			DHT:            !noDHT,
			MDNS:           true,
			Interval:       10 * time.Second,
			BootstrapPeers: bootstrapPeers,
		},
		Connection: config.Connection{
			HolePunch:      true,
			AutoRelay:      true,
			MaxConnections: 1000,
		},
	}

	nodeOpts, _, err := c.ToOpts(llger)
	if err != nil {
		return nil, fmt.Errorf("parsing options: %w", err)
	}

	nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...)

	return nodeOpts, nil
}
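
// Illustrative configuration sketch (not part of the original file): the
// environment variables read above can be exported before starting the
// process, for example (the addresses and peer IDs below are placeholders):
//
//	LOCALAI_P2P_DISABLE_DHT=true
//	LOCALAI_P2P_LISTEN_MADDRS=/ip4/0.0.0.0/tcp/40123
//	LOCALAI_P2P_BOOTSTRAP_PEERS_MADDRS=/ip4/192.0.2.10/tcp/40123/p2p/<peer-id>
//	LOCALAI_P2P_ENABLE_LIMITS=true
//	LOCALAI_P2P_LIB_LOGLEVEL=debug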

func stringsToMultiAddr(peers []string) []multiaddr.Multiaddr {
	res := []multiaddr.Multiaddr{}
	for _, p := range peers {
		addr, err := multiaddr.NewMultiaddr(p)
		if err != nil {
			continue
		}
		res = append(res, addr)
	}
	return res
}

func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
	defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
	io.Copy(dst, src)
}