From 006444d5df5a62880a703b2158f590d06e93ee8d Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 5 Aug 2024 11:51:26 +0200 Subject: [PATCH] feat(p2p): allow to run multiple clusters in the same network Allow to specify a network ID via CLI which allows to run multiple clusters, logically separated within the same network (by using the same shared token). Note: This segregation is not "secure" by any means, anyone having the network token can see the services available in all the network, however, this provides a way to separate the inference endpoints. This allows for instance to have a node which is both federated and having attached a set of llama.cpp workers. Signed-off-by: Ettore Di Giacinto --- core/cli/federated.go | 9 +++++---- core/cli/run.go | 11 +++++++---- core/cli/worker/worker_p2p.go | 17 +++++++++-------- core/config/application_config.go | 7 +++++++ core/http/endpoints/localai/p2p.go | 12 +++++++----- core/http/routes/localai.go | 2 +- core/http/routes/ui.go | 9 +++++---- core/p2p/federated.go | 9 +++++++++ 8 files changed, 50 insertions(+), 26 deletions(-) diff --git a/core/cli/federated.go b/core/cli/federated.go index 32f0fa879555..271babcae3ca 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -8,14 +8,15 @@ import ( ) type FederatedCLI struct { - Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` - Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` - LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"` + Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` + Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` + LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"` + Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"` } func (f *FederatedCLI) Run(ctx *cliContext.Context) error { - fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken, f.LoadBalanced) + fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced) return fs.Start(context.Background()) } diff --git a/core/cli/run.go b/core/cli/run.go index b3d9163223a2..9d58f6d9cc71 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -54,6 +54,7 @@ type RunCMD struct { OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"hardening"` Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"` Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"` + Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"` ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"` SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"` PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"` @@ -94,6 +95,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { config.WithModelsURL(append(r.Models, r.ModelArgs...)...), config.WithOpaqueErrors(r.OpaqueErrors), config.WithEnforcedPredownloadScans(!r.DisablePredownloadScan), + config.WithP2PNetworkID(r.Peer2PeerNetworkID), } token := "" @@ -119,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { } log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func(serviceID string, node p2p.NodeData) { + if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) { var tunnelAddresses []string - for _, v := range p2p.GetAvailableNodes("") { + for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) { if v.IsOnline() { tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) } else { @@ -142,14 +144,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { if err != nil { return err } - if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil { + if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID)); err != nil { return err } node, err := p2p.NewNode(token) if err != nil { return err } - if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil { + + if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil); err != nil { return err } } diff --git a/core/cli/worker/worker_p2p.go b/core/cli/worker/worker_p2p.go index 2eb5cb94bb7c..ddb3518ce9a0 100644 --- a/core/cli/worker/worker_p2p.go +++ b/core/cli/worker/worker_p2p.go @@ -19,12 +19,13 @@ import ( ) type P2P struct { - WorkerFlags `embed:""` - Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"` - NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"` - RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"` - RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"` - ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"` + WorkerFlags `embed:""` + Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"` + NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"` + RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"` + RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"` + ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"` + Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"` } func (r *P2P) Run(ctx *cliContext.Context) error { @@ -59,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { p = r.RunnerPort } - err = p2p.ExposeService(context.Background(), address, p, r.Token, "") + err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, "")) if err != nil { return err } @@ -99,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { } }() - err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "") + err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, "")) if err != nil { return err } diff --git a/core/config/application_config.go b/core/config/application_config.go index 7233d1ac0916..6e8c46e1ba31 100644 --- a/core/config/application_config.go +++ b/core/config/application_config.go @@ -34,6 +34,7 @@ type ApplicationConfig struct { EnforcePredownloadScans bool OpaqueErrors bool P2PToken string + P2PNetworkID string ModelLibraryURL string @@ -91,6 +92,12 @@ func WithCors(b bool) AppOption { } } +func WithP2PNetworkID(s string) AppOption { + return func(o *ApplicationConfig) { + o.P2PNetworkID = s + } +} + func WithCsrf(b bool) AppOption { return func(o *ApplicationConfig) { o.CSRF = b diff --git a/core/http/endpoints/localai/p2p.go b/core/http/endpoints/localai/p2p.go index cab0bb5daf59..93e9b5d5b468 100644 --- a/core/http/endpoints/localai/p2p.go +++ b/core/http/endpoints/localai/p2p.go @@ -11,12 +11,14 @@ import ( // @Summary Returns available P2P nodes // @Success 200 {object} []schema.P2PNodesResponse "Response" // @Router /api/p2p [get] -func ShowP2PNodes(c *fiber.Ctx) error { +func ShowP2PNodes(appConfig *config.ApplicationConfig) func(*fiber.Ctx) error { // Render index - return c.JSON(schema.P2PNodesResponse{ - Nodes: p2p.GetAvailableNodes(""), - FederatedNodes: p2p.GetAvailableNodes(p2p.FederatedID), - }) + return func(c *fiber.Ctx) error { + return c.JSON(schema.P2PNodesResponse{ + Nodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")), + FederatedNodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)), + }) + } } // ShowP2PToken returns the P2P token diff --git a/core/http/routes/localai.go b/core/http/routes/localai.go index b8a811b5faf0..9c4200101583 100644 --- a/core/http/routes/localai.go +++ b/core/http/routes/localai.go @@ -59,7 +59,7 @@ func RegisterLocalAIRoutes(app *fiber.App, // p2p if p2p.IsP2PEnabled() { - app.Get("/api/p2p", auth, localai.ShowP2PNodes) + app.Get("/api/p2p", auth, localai.ShowP2PNodes(appConfig)) app.Get("/api/p2p/token", auth, localai.ShowP2PToken(appConfig)) } diff --git a/core/http/routes/ui.go b/core/http/routes/ui.go index 929174635def..4f8afd3ccda2 100644 --- a/core/http/routes/ui.go +++ b/core/http/routes/ui.go @@ -96,6 +96,7 @@ func RegisterUIRoutes(app *fiber.App, //"FederatedNodes": p2p.GetAvailableNodes(p2p.FederatedID), "IsP2PEnabled": p2p.IsP2PEnabled(), "P2PToken": appConfig.P2PToken, + "NetworkID": appConfig.P2PNetworkID, } // Render index @@ -104,17 +105,17 @@ func RegisterUIRoutes(app *fiber.App, /* show nodes live! */ app.Get("/p2p/ui/workers", auth, func(c *fiber.Ctx) error { - return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(""))) + return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")))) }) app.Get("/p2p/ui/workers-federation", auth, func(c *fiber.Ctx) error { - return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.FederatedID))) + return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)))) }) app.Get("/p2p/ui/workers-stats", auth, func(c *fiber.Ctx) error { - return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(""))) + return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")))) }) app.Get("/p2p/ui/workers-federation-stats", auth, func(c *fiber.Ctx) error { - return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.FederatedID))) + return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)))) }) } diff --git a/core/p2p/federated.go b/core/p2p/federated.go index b56c9e0ca00f..3ac3ff91aca2 100644 --- a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -1,7 +1,16 @@ package p2p +import "fmt" + const FederatedID = "federated" +func NetworkID(networkID, serviceID string) string { + if networkID != "" { + return fmt.Sprintf("%s_%s", networkID, serviceID) + } + return serviceID +} + type FederatedServer struct { listenAddr, service, p2ptoken string requestTable map[string]int