From 5dc7da442b628bef265f78aebe1aa421a0d60891 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 26 Nov 2023 15:38:43 +0100 Subject: [PATCH] fix lock handling --- pkg/model/initializers.go | 6 ++++- pkg/model/loader.go | 53 +++++++++++++++++++++++---------------- pkg/model/process.go | 10 +++++++- 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/pkg/model/initializers.go b/pkg/model/initializers.go index e6b5934c5c19..3729d96c47dc 100644 --- a/pkg/model/initializers.go +++ b/pkg/model/initializers.go @@ -208,7 +208,11 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) { log.Debug().Msgf("Model '%s' already loaded", o.model) ml.mu.Unlock() - return ml.resolveAddress(m, o.parallelRequests) + if m != "" && ml.healthCheck(o.model, m) { + return ml.resolveAddress(m, o.parallelRequests) + } else { + return ml.GreedyLoader(opts...) + } } // If we can have only one backend active, kill all the others (except external backends) if o.singleActiveBackend { diff --git a/pkg/model/loader.go b/pkg/model/loader.go index f8fd2b995aef..ccef131e313c 100644 --- a/pkg/model/loader.go +++ b/pkg/model/loader.go @@ -118,10 +118,11 @@ func (ml *ModelLoader) ListModels() ([]string, error) { func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (ModelAddress, error)) (ModelAddress, error) { ml.mu.Lock() - defer ml.mu.Unlock() - // Check if we already have a loaded model - if model := ml.CheckIsLoaded(modelName); model != "" { + model := ml.CheckIsLoaded(modelName) + ml.mu.Unlock() + + if model != "" && ml.healthCheck(modelName, model) { return model, nil } @@ -142,14 +143,18 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) ( // return nil, err // } + ml.mu.Lock() ml.models[modelName] = model + ml.mu.Unlock() return model, nil } func (ml *ModelLoader) ShutdownModel(modelName string) error { ml.mu.Lock() - defer ml.mu.Unlock() - if _, ok := ml.models[modelName]; !ok { + _, ok := ml.models[modelName] + ml.mu.Unlock() + + if !ok { return fmt.Errorf("model %s not found", modelName) } @@ -157,24 +162,8 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error { } func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress { - var client *grpc.Client if m, ok := ml.models[s]; ok { log.Debug().Msgf("Model already loaded in memory: %s", s) - if c, ok := ml.grpcClients[s]; ok { - client = c - } else { - client = m.GRPC(false, ml.wd) - } - - if !client.HealthCheck(context.Background()) { - log.Debug().Msgf("GRPC Model not responding: %s", s) - if !ml.grpcProcesses[s].IsAlive() { - log.Debug().Msgf("GRPC Process is not responding: %s", s) - // stop and delete the process, this forces to re-load the model and re-create again the service - ml.deleteProcess(s) - return "" - } - } return m } @@ -182,6 +171,28 @@ func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress { return "" } +func (ml *ModelLoader) healthCheck(s string, m ModelAddress) bool { + var client *grpc.Client + + if c, ok := ml.grpcClients[s]; ok { + client = c + } else { + client = m.GRPC(false, ml.wd) + } + + if !client.HealthCheck(context.Background()) { + log.Debug().Msgf("GRPC Model not responding: %s", s) + if !ml.grpcProcesses[s].IsAlive() { + log.Debug().Msgf("GRPC Process is not responding: %s", s) + // stop and delete the process, this forces to re-load the model and re-create again the service + ml.deleteProcess(s) + } + return false + } + + return true +} + func (ml *ModelLoader) EvaluateTemplateForPrompt(templateType TemplateType, templateName string, in PromptTemplateData) (string, error) { // TODO: should this check be improved? if templateType == ChatMessageTemplate { diff --git a/pkg/model/process.go b/pkg/model/process.go index 5f63ee7fa8e9..9c280c64db73 100644 --- a/pkg/model/process.go +++ b/pkg/model/process.go @@ -29,9 +29,17 @@ func (ml *ModelLoader) StopAllExcept(s string) { } func (ml *ModelLoader) deleteProcess(s string) error { - if err := ml.grpcProcesses[s].Stop(); err != nil { + ml.mu.Lock() + p := ml.grpcProcesses[s] + ml.mu.Unlock() + + // this can take time and we don't want to block the mutex + if err := p.Stop(); err != nil { return err } + + ml.mu.Lock() + defer ml.mu.Unlock() delete(ml.grpcProcesses, s) delete(ml.models, s) return nil