feat: allow to run parallel requests #1290

Merged 3 commits on Nov 16, 2023
.env (4 additions, 1 deletion)
@@ -69,4 +69,7 @@ MODELS_PATH=/models
# PYTHON_GRPC_MAX_WORKERS=1

### Define the number of parallel LLAMA.cpp workers (Defaults to 1)
# LLAMACPP_PARALLEL=1

### Enable to run parallel requests
# PARALLEL_REQUESTS=true
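As a side note, a boolean switch like PARALLEL_REQUESTS can be read straight from the environment. The sketch below is illustrative only: LocalAI actually binds the variable to the CLI flag added in main.go further down, and envBool is a hypothetical helper, not part of the codebase.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envBool reads an environment variable and interprets the values
// accepted by strconv.ParseBool ("1", "true", "TRUE", ...); it falls
// back to def when the variable is unset or unparsable.
func envBool(key string, def bool) bool {
	v, ok := os.LookupEnv(key)
	if !ok {
		return def
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return def
	}
	return b
}

func main() {
	// PARALLEL_REQUESTS=true would flip this to true.
	fmt.Println("parallel requests enabled:", envBool("PARALLEL_REQUESTS", false))
}
```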
api/backend/options.go (4 additions)
@@ -16,6 +16,10 @@ func modelOpts(c config.Config, o *options.Option, opts []model.Option) []model.
opts = append(opts, model.WithSingleActiveBackend())
}

if o.ParallelBackendRequests {
opts = append(opts, model.EnableParallelRequests)
}

if c.GRPC.Attempts != 0 {
opts = append(opts, model.WithGRPCAttempts(c.GRPC.Attempts))
}
api/localai/backend_monitor.go (2 additions, 2 deletions)
@@ -125,11 +125,11 @@ func BackendMonitorEndpoint(bm BackendMonitor) func(c *fiber.Ctx) error {

client := bm.options.Loader.CheckIsLoaded(backendId)

if client == nil {
if client == "" {
return fmt.Errorf("backend %s is not currently loaded", backendId)
}

status, rpcErr := client.Status(context.TODO())
status, rpcErr := client.GRPC().Status(context.TODO())
if rpcErr != nil {
log.Warn().Msgf("backend %s experienced an error retrieving status info: %s", backendId, rpcErr.Error())
val, slbErr := bm.SampleLocalBackendProcess(backendId)
api/options/options.go (7 additions, 2 deletions)
@@ -5,9 +5,9 @@ import (
"embed"
"encoding/json"

"github.com/go-skynet/LocalAI/metrics"
"github.com/go-skynet/LocalAI/pkg/gallery"
model "github.com/go-skynet/LocalAI/pkg/model"
"github.com/go-skynet/LocalAI/metrics"
"github.com/rs/zerolog/log"
)

@@ -36,7 +36,8 @@ type Option struct {

AutoloadGalleries bool

SingleBackend bool
SingleBackend bool
ParallelBackendRequests bool
}

type AppOption func(*Option)
@@ -66,6 +67,10 @@ var EnableSingleBackend = func(o *Option) {
o.SingleBackend = true
}

var EnableParallelBackendRequests = func(o *Option) {
o.ParallelBackendRequests = true
}

var EnableGalleriesAutoload = func(o *Option) {
o.AutoloadGalleries = true
}
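EnableParallelBackendRequests follows the functional-options style this file already uses: an AppOption is simply a func(*Option) that flips a field. Below is a minimal, self-contained sketch of the pattern, where the trimmed-down Option struct and the newOption constructor are assumptions for illustration, not the real LocalAI types.

```go
package main

import "fmt"

// Option keeps only the two fields touched in this hunk; the real
// struct in api/options carries many more settings.
type Option struct {
	SingleBackend           bool
	ParallelBackendRequests bool
}

// AppOption mutates an Option in place, like the variables declared
// in the diff above.
type AppOption func(*Option)

var EnableSingleBackend = func(o *Option) { o.SingleBackend = true }

var EnableParallelBackendRequests = func(o *Option) { o.ParallelBackendRequests = true }

// newOption applies each AppOption in order. This constructor is an
// assumption made for the sketch, not code taken from the PR.
func newOption(opts ...AppOption) *Option {
	o := &Option{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := newOption(EnableParallelBackendRequests)
	fmt.Printf("%+v\n", *o) // {SingleBackend:false ParallelBackendRequests:true}
}
```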
main.go (9 additions, 2 deletions)
@@ -16,9 +16,9 @@ import (
config "github.com/go-skynet/LocalAI/api/config"
"github.com/go-skynet/LocalAI/api/options"
"github.com/go-skynet/LocalAI/internal"
"github.com/go-skynet/LocalAI/metrics"
"github.com/go-skynet/LocalAI/pkg/gallery"
model "github.com/go-skynet/LocalAI/pkg/model"
"github.com/go-skynet/LocalAI/metrics"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
progressbar "github.com/schollz/progressbar/v3"
@@ -63,6 +63,11 @@ func main() {
EnvVars: []string{"SINGLE_ACTIVE_BACKEND"},
Usage: "Allow only one backend to be running.",
},
&cli.BoolFlag{
Name: "parallel-requests",
EnvVars: []string{"PARALLEL_REQUESTS"},
Usage: "Enable backends to handle multiple requests in parallel. This is for backends that supports multiple requests in parallel, like llama.cpp or vllm",
},
&cli.BoolFlag{
Name: "cors",
EnvVars: []string{"CORS"},
@@ -193,7 +198,9 @@ For a list of compatible model, check out: https://localai.io/model-compatibilit
options.WithUploadLimitMB(ctx.Int("upload-limit")),
options.WithApiKeys(ctx.StringSlice("api-keys")),
}

if ctx.Bool("parallel-requests") {
opts = append(opts, options.EnableParallelBackendRequests)
}
if ctx.Bool("single-active-backend") {
opts = append(opts, options.EnableSingleBackend)
}
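The flag itself is wired through urfave/cli v2: a BoolFlag bound to the PARALLEL_REQUESTS environment variable, read back with ctx.Bool and translated into an application option. Here is a stripped-down sketch of that wiring, using only the public urfave/cli v2 API; the demo app name and its print action are not LocalAI code.

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "parallel-flag-demo",
		Flags: []cli.Flag{
			&cli.BoolFlag{
				Name:    "parallel-requests",
				EnvVars: []string{"PARALLEL_REQUESTS"},
				Usage:   "Enable backends to handle multiple requests in parallel.",
			},
		},
		Action: func(ctx *cli.Context) error {
			// Either --parallel-requests or PARALLEL_REQUESTS=true sets
			// this boolean; the real server appends
			// options.EnableParallelBackendRequests when it is true.
			fmt.Println("parallel requests:", ctx.Bool("parallel-requests"))
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
```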
pkg/model/initializers.go (42 additions, 27 deletions)
@@ -61,11 +61,11 @@ var AutoLoadBackends []string = []string{

// starts the grpcModelProcess for the backend, and returns a grpc client
// It also loads the model
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*grpc.Client, error) {
return func(modelName, modelFile string) (*grpc.Client, error) {
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (ModelAddress, error) {
return func(modelName, modelFile string) (ModelAddress, error) {
log.Debug().Msgf("Loading Model %s with gRPC (file: %s) (backend: %s): %+v", modelName, modelFile, backend, *o)

var client *grpc.Client
var client ModelAddress

getFreeAddress := func() (string, error) {
port, err := freeport.GetFreePort()
@@ -82,46 +82,46 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string
if _, err := os.Stat(uri); err == nil {
serverAddress, err := getFreeAddress()
if err != nil {
return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
}
// Make sure the process is executable
if err := ml.startProcess(uri, o.model, serverAddress); err != nil {
return nil, err
return "", err
}

log.Debug().Msgf("GRPC Service Started")

client = grpc.NewClient(serverAddress)
client = ModelAddress(serverAddress)
} else {
// address
client = grpc.NewClient(uri)
client = ModelAddress(uri)
}
} else {
grpcProcess := filepath.Join(o.assetDir, "backend-assets", "grpc", backend)
// Check if the file exists
if _, err := os.Stat(grpcProcess); os.IsNotExist(err) {
return nil, fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
return "", fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
}

serverAddress, err := getFreeAddress()
if err != nil {
return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
}

// Make sure the process is executable
if err := ml.startProcess(grpcProcess, o.model, serverAddress); err != nil {
return nil, err
return "", err
}

log.Debug().Msgf("GRPC Service Started")

client = grpc.NewClient(serverAddress)
client = ModelAddress(serverAddress)
}

// Wait for the service to start up
ready := false
for i := 0; i < o.grpcAttempts; i++ {
if client.HealthCheck(context.Background()) {
if client.GRPC().HealthCheck(context.Background()) {
log.Debug().Msgf("GRPC Service Ready")
ready = true
break
@@ -131,7 +131,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string

if !ready {
log.Debug().Msgf("GRPC Service NOT ready")
return nil, fmt.Errorf("grpc service not ready")
return "", fmt.Errorf("grpc service not ready")
}

options := *o.gRPCOptions
@@ -140,19 +140,30 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string

log.Debug().Msgf("GRPC: Loading model with options: %+v", options)

res, err := client.LoadModel(o.context, &options)
res, err := client.GRPC().LoadModel(o.context, &options)
if err != nil {
return nil, fmt.Errorf("could not load model: %w", err)
return "", fmt.Errorf("could not load model: %w", err)
}
if !res.Success {
return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
return "", fmt.Errorf("could not load model (no success): %s", res.Message)
}

return client, nil
}
}

func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err error) {
func (ml *ModelLoader) resolveAddress(addr ModelAddress, parallel bool) (*grpc.Client, error) {
if parallel {
return addr.GRPC(), nil
}

if _, ok := ml.grpcClients[string(addr)]; !ok {
ml.grpcClients[string(addr)] = addr.GRPC()
}
return ml.grpcClients[string(addr)], nil
}

func (ml *ModelLoader) BackendLoader(opts ...Option) (client *grpc.Client, err error) {
o := NewOptions(opts...)

log.Debug().Msgf("Loading model %s from %s", o.backendString, o.model)
@@ -166,22 +177,25 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err er
ml.mu.Unlock()
}

// if an external backend is provided, use it
_, externalBackendExists := o.externalBackends[backend]
if externalBackendExists {
return ml.LoadModel(o.model, ml.grpcModel(backend, o))
}
var backendToConsume string

switch backend {
case Gpt4AllLlamaBackend, Gpt4AllMptBackend, Gpt4AllJBackend, Gpt4All:
o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "gpt4all")
return ml.LoadModel(o.model, ml.grpcModel(Gpt4All, o))
backendToConsume = Gpt4All
case PiperBackend:
o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "espeak-ng-data")
return ml.LoadModel(o.model, ml.grpcModel(PiperBackend, o))
backendToConsume = PiperBackend
default:
return ml.LoadModel(o.model, ml.grpcModel(backend, o))
backendToConsume = backend
}

addr, err := ml.LoadModel(o.model, ml.grpcModel(backendToConsume, o))
if err != nil {
return nil, err
}

return ml.resolveAddress(addr, o.parallelRequests)
}

func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
@@ -190,10 +204,11 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
ml.mu.Lock()
// Return earlier if we have a model already loaded
// (avoid looping through all the backends)
if m := ml.CheckIsLoaded(o.model); m != nil {
if m := ml.CheckIsLoaded(o.model); m != "" {
log.Debug().Msgf("Model '%s' already loaded", o.model)
ml.mu.Unlock()
return m, nil

return ml.resolveAddress(m, o.parallelRequests)
}
// If we can have only one backend active, kill all the others (except external backends)
if o.singleActiveBackend {
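The behavioural core of the change sits in resolveAddress: once a loaded backend is identified by its address string, the loader either hands every caller a freshly constructed gRPC client (parallel mode) or keeps reusing one cached client per address (the previous, serialized behaviour). Below is a self-contained sketch of that decision, with a stub client type standing in for the real grpc.Client.

```go
package main

import "fmt"

// ModelAddress is the address (host:port or URI) a backend listens on.
type ModelAddress string

// client stands in for the real grpc.Client.
type client struct{ addr string }

// newClient stands in for grpc.NewClient.
func newClient(addr string) *client { return &client{addr: addr} }

type loader struct {
	// one cached client per address, used only in non-parallel mode
	grpcClients map[string]*client
}

// resolveAddress mirrors the logic added in this hunk: in parallel
// mode a fresh client is constructed on every call, otherwise the
// per-address client is created once and reused.
func (l *loader) resolveAddress(addr ModelAddress, parallel bool) *client {
	if parallel {
		return newClient(string(addr))
	}
	if _, ok := l.grpcClients[string(addr)]; !ok {
		l.grpcClients[string(addr)] = newClient(string(addr))
	}
	return l.grpcClients[string(addr)]
}

func main() {
	l := &loader{grpcClients: map[string]*client{}}
	a := l.resolveAddress("127.0.0.1:50051", false)
	b := l.resolveAddress("127.0.0.1:50051", false)
	c := l.resolveAddress("127.0.0.1:50051", true)
	fmt.Println("cached client reused:", a == b)   // true
	fmt.Println("parallel client is new:", a != c) // true
}
```

In the default mode this reproduces the old one-client-per-model behaviour; with parallel requests enabled, nothing is cached and each caller gets its own client for the same backend address.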
pkg/model/loader.go (17 additions, 9 deletions)
@@ -59,15 +59,23 @@ type ModelLoader struct {
ModelPath string
mu sync.Mutex
// TODO: this needs generics
models map[string]*grpc.Client
grpcClients map[string]*grpc.Client
models map[string]ModelAddress
grpcProcesses map[string]*process.Process
templates map[TemplateType]map[string]*template.Template
}

type ModelAddress string

func (m ModelAddress) GRPC() *grpc.Client {
return grpc.NewClient(string(m))
}

func NewModelLoader(modelPath string) *ModelLoader {
nml := &ModelLoader{
ModelPath: modelPath,
models: make(map[string]*grpc.Client),
grpcClients: make(map[string]*grpc.Client),
models: make(map[string]ModelAddress),
templates: make(map[TemplateType]map[string]*template.Template),
grpcProcesses: make(map[string]*process.Process),
}
@@ -98,12 +106,12 @@ func (ml *ModelLoader) ListModels() ([]string, error) {
return models, nil
}

func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (*grpc.Client, error)) (*grpc.Client, error) {
func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (ModelAddress, error)) (ModelAddress, error) {
ml.mu.Lock()
defer ml.mu.Unlock()

// Check if we already have a loaded model
if model := ml.CheckIsLoaded(modelName); model != nil {
if model := ml.CheckIsLoaded(modelName); model != "" {
return model, nil
}

@@ -113,7 +121,7 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (

model, err := loader(modelName, modelFile)
if err != nil {
return nil, err
return "", err
}

// TODO: Add a helper method to iterate all prompt templates associated with a config if and only if it's YAML?
Expand All @@ -138,24 +146,24 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
return ml.deleteProcess(modelName)
}

func (ml *ModelLoader) CheckIsLoaded(s string) *grpc.Client {
func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress {
if m, ok := ml.models[s]; ok {
log.Debug().Msgf("Model already loaded in memory: %s", s)

if !m.HealthCheck(context.Background()) {
if !m.GRPC().HealthCheck(context.Background()) {
log.Debug().Msgf("GRPC Model not responding: %s", s)
if !ml.grpcProcesses[s].IsAlive() {
log.Debug().Msgf("GRPC Process is not responding: %s", s)
// stop and delete the process, this forces to re-load the model and re-create again the service
ml.deleteProcess(s)
return nil
return ""
}
}

return m
}

return nil
return ""
}

func (ml *ModelLoader) EvaluateTemplateForPrompt(templateType TemplateType, templateName string, in PromptTemplateData) (string, error) {
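Because ModelAddress is a named string type, its zero value "" doubles as the "not loaded" sentinel that CheckIsLoaded now returns in place of a nil *grpc.Client, while GRPC() turns an address back into a client on demand. A small sketch of that lookup idea follows; the stub map, model name, and port are made up for the example.

```go
package main

import "fmt"

// ModelAddress mirrors the named string type introduced in loader.go.
type ModelAddress string

// checkIsLoaded returns the address registered for a model, or the
// empty string when nothing is loaded under that name, the same ""
// sentinel CheckIsLoaded now uses instead of a nil client pointer.
func checkIsLoaded(models map[string]ModelAddress, name string) ModelAddress {
	if addr, ok := models[name]; ok {
		return addr
	}
	return ""
}

func main() {
	// Both the model name and the port are invented for the example.
	models := map[string]ModelAddress{
		"my-model": "127.0.0.1:43219",
	}
	if addr := checkIsLoaded(models, "my-model"); addr != "" {
		fmt.Println("already loaded at", addr)
	}
	if addr := checkIsLoaded(models, "other-model"); addr == "" {
		fmt.Println("not loaded, a backend process would be started")
	}
}
```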
pkg/model/options.go (5 additions)
@@ -20,10 +20,15 @@ type Options struct {
grpcAttempts int
grpcAttemptsDelay int
singleActiveBackend bool
parallelRequests bool
}

type Option func(*Options)

var EnableParallelRequests = func(o *Options) {
o.parallelRequests = true
}

func WithExternalBackend(name string, uri string) Option {
return func(o *Options) {
if o.externalBackends == nil {
pkg/model/process.go (1 addition, 1 deletion)
@@ -17,7 +17,7 @@ import (
func (ml *ModelLoader) StopAllExcept(s string) {
ml.StopGRPC(func(id string, p *process.Process) bool {
if id != s {
for ml.models[id].IsBusy() {
for ml.models[id].GRPC().IsBusy() {
log.Debug().Msgf("%s busy. Waiting.", id)
time.Sleep(2 * time.Second)
}