
Commit

fix lock handling
mudler committed Nov 26, 2023
1 parent 1d5b496 commit 5dc7da4
Showing 3 changed files with 46 additions and 23 deletions.
6 changes: 5 additions & 1 deletion pkg/model/initializers.go
@@ -208,7 +208,11 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
log.Debug().Msgf("Model '%s' already loaded", o.model)
ml.mu.Unlock()

return ml.resolveAddress(m, o.parallelRequests)
if m != "" && ml.healthCheck(o.model, m) {
return ml.resolveAddress(m, o.parallelRequests)
} else {
return ml.GreedyLoader(opts...)
}
}
// If we can have only one backend active, kill all the others (except external backends)
if o.singleActiveBackend {
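The hunk above changes GreedyLoader's early-return path: a backend address found in the cache is reused only if it still answers a health check; otherwise the loader falls through to the recursive ml.GreedyLoader(opts...) call and loads the model again. A rough sketch of that shape, using hypothetical names rather than the project's API:

```go
// resolveOrReload is a hypothetical helper, for illustration only: reuse a
// cached address only when it is non-empty and still healthy, otherwise run
// the load path again.
func resolveOrReload(addr string, healthy func(string) bool, reload func() (string, error)) (string, error) {
	if addr != "" && healthy(addr) {
		return addr, nil
	}
	// Stale or empty entry: in the commit this is the recursive
	// ml.GreedyLoader(opts...) call.
	return reload()
}
```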
53 changes: 32 additions & 21 deletions pkg/model/loader.go
@@ -118,10 +118,11 @@ func (ml *ModelLoader) ListModels() ([]string, error) {

func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (ModelAddress, error)) (ModelAddress, error) {
ml.mu.Lock()
defer ml.mu.Unlock()

// Check if we already have a loaded model
if model := ml.CheckIsLoaded(modelName); model != "" {
model := ml.CheckIsLoaded(modelName)
ml.mu.Unlock()

if model != "" && ml.healthCheck(modelName, model) {
return model, nil
}

@@ -142,46 +143,56 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (
// return nil, err
// }

ml.mu.Lock()
ml.models[modelName] = model
ml.mu.Unlock()
return model, nil
}

func (ml *ModelLoader) ShutdownModel(modelName string) error {
ml.mu.Lock()
defer ml.mu.Unlock()
if _, ok := ml.models[modelName]; !ok {
_, ok := ml.models[modelName]
ml.mu.Unlock()

if !ok {
return fmt.Errorf("model %s not found", modelName)
}

return ml.deleteProcess(modelName)
}

func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress {
var client *grpc.Client
if m, ok := ml.models[s]; ok {
log.Debug().Msgf("Model already loaded in memory: %s", s)
if c, ok := ml.grpcClients[s]; ok {
client = c
} else {
client = m.GRPC(false, ml.wd)
}

if !client.HealthCheck(context.Background()) {
log.Debug().Msgf("GRPC Model not responding: %s", s)
if !ml.grpcProcesses[s].IsAlive() {
log.Debug().Msgf("GRPC Process is not responding: %s", s)
// stop and delete the process, this forces to re-load the model and re-create again the service
ml.deleteProcess(s)
return ""
}
}

return m
}

return ""
}

func (ml *ModelLoader) healthCheck(s string, m ModelAddress) bool {
var client *grpc.Client

if c, ok := ml.grpcClients[s]; ok {
client = c
} else {
client = m.GRPC(false, ml.wd)
}

if !client.HealthCheck(context.Background()) {
log.Debug().Msgf("GRPC Model not responding: %s", s)
if !ml.grpcProcesses[s].IsAlive() {
log.Debug().Msgf("GRPC Process is not responding: %s", s)
// stop and delete the process, this forces to re-load the model and re-create again the service
ml.deleteProcess(s)
}
return false
}

return true
}

func (ml *ModelLoader) EvaluateTemplateForPrompt(templateType TemplateType, templateName string, in PromptTemplateData) (string, error) {
// TODO: should this check be improved?
if templateType == ChatMessageTemplate {
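The loader.go changes all follow the same rule: hold the mutex only for map reads and writes, and run the slow parts, the gRPC probe in the new healthCheck helper and the model load itself, outside the critical section. A minimal, standalone sketch of that two-phase pattern, standard library only and with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu      sync.Mutex
	entries map[string]string // model name -> backend address
}

// load takes the lock just long enough to read the cache, probes the cached
// address unlocked, and locks again only to store a freshly loaded entry.
func (c *cache) load(name string, healthy func(addr string) bool, loadFn func(name string) (string, error)) (string, error) {
	c.mu.Lock()
	addr := c.entries[name]
	c.mu.Unlock()

	// Probe outside the lock: a hung backend must not stall every other caller.
	if addr != "" && healthy(addr) {
		return addr, nil
	}

	newAddr, err := loadFn(name)
	if err != nil {
		return "", err
	}

	c.mu.Lock()
	c.entries[name] = newAddr
	c.mu.Unlock()
	return newAddr, nil
}

func main() {
	c := &cache{entries: map[string]string{}}
	addr, err := c.load("llama",
		func(string) bool { return true },
		func(string) (string, error) { return "127.0.0.1:50051", nil })
	fmt.Println(addr, err)
}
```

As in the commit, this trades strict serialization for responsiveness: two callers can race past the unlocked probe and load the same model twice, a window the change accepts in exchange for never blocking other requests on a slow or dead backend.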
10 changes: 9 additions & 1 deletion pkg/model/process.go
@@ -29,9 +29,17 @@ func (ml *ModelLoader) StopAllExcept(s string) {
}

func (ml *ModelLoader) deleteProcess(s string) error {
if err := ml.grpcProcesses[s].Stop(); err != nil {
ml.mu.Lock()
p := ml.grpcProcesses[s]
ml.mu.Unlock()

// this can take time and we don't want to block the mutex
if err := p.Stop(); err != nil {
return err
}

ml.mu.Lock()
defer ml.mu.Unlock()
delete(ml.grpcProcesses, s)
delete(ml.models, s)
return nil
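The process.go change applies the same idea to shutdown: Stop() can take a while, so the process handle is read under the lock, stopped with the lock released (as the new comment says, "this can take time and we don't want to block the mutex"), and the lock is re-taken only to delete the bookkeeping entries. A self-contained sketch of the pattern with illustrative types; note it adds an ok-check, whereas the committed code indexes ml.grpcProcesses[s] directly:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type process struct{ name string }

// Stop simulates a slow shutdown; holding a mutex across it would block
// every other loader operation for the whole duration.
func (p *process) Stop() error {
	time.Sleep(100 * time.Millisecond)
	return nil
}

type loader struct {
	mu        sync.Mutex
	processes map[string]*process
}

func (l *loader) deleteProcess(name string) error {
	l.mu.Lock()
	p, ok := l.processes[name]
	l.mu.Unlock()
	if !ok {
		return fmt.Errorf("process %s not found", name)
	}

	// The slow call happens outside the critical section.
	if err := p.Stop(); err != nil {
		return err
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.processes, name)
	return nil
}

func main() {
	l := &loader{processes: map[string]*process{"llama": {name: "llama"}}}
	fmt.Println(l.deleteProcess("llama"))
}
```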
