From c3cc413e7fc3b06b310779dfa3cb4863ea9f3ed2 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Mon, 24 Apr 2023 16:04:37 +0200 Subject: [PATCH] feat: async calls (#311) * feat: async calls Signed-off-by: Alex Jones * feat: added concurrency settings Signed-off-by: Alex Jones * feat: added in ability to set max concurrency Signed-off-by: Alex Jones --------- Signed-off-by: Alex Jones Co-authored-by: Matthis <99146727+matthisholleville@users.noreply.github.com> --- cmd/analyze/analyze.go | 22 ++++---- pkg/analysis/analysis.go | 112 +++++++++++++++++++++++++++------------ pkg/server/server.go | 19 ++++--- 3 files changed, 104 insertions(+), 49 deletions(-) diff --git a/cmd/analyze/analyze.go b/cmd/analyze/analyze.go index a880928eb1..9bde5c84b8 100644 --- a/cmd/analyze/analyze.go +++ b/cmd/analyze/analyze.go @@ -23,14 +23,15 @@ import ( ) var ( - explain bool - backend string - output string - filters []string - language string - nocache bool - namespace string - anonymize bool + explain bool + backend string + output string + filters []string + language string + nocache bool + namespace string + anonymize bool + maxConcurrency int ) // AnalyzeCmd represents the problems command @@ -43,7 +44,8 @@ var AnalyzeCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { // AnalysisResult configuration - config, err := analysis.NewAnalysis(backend, language, filters, namespace, nocache, explain) + config, err := analysis.NewAnalysis(backend, + language, filters, namespace, nocache, explain, maxConcurrency) if err != nil { color.Red("Error: %v", err) os.Exit(1) @@ -92,4 +94,6 @@ func init() { AnalyzeCmd.Flags().StringVarP(&output, "output", "o", "text", "Output format (text, json)") // add language options for output AnalyzeCmd.Flags().StringVarP(&language, "language", "l", "english", "Languages to use for AI (e.g. 'English', 'Spanish', 'French', 'German', 'Italian', 'Portuguese', 'Dutch', 'Russian', 'Chinese', 'Japanese', 'Korean')") + // add max concurrency + AnalyzeCmd.Flags().IntVarP(&maxConcurrency, "max-concurrency", "m", 10, "Maximum number of concurrent requests to the Kubernetes API server") } diff --git a/pkg/analysis/analysis.go b/pkg/analysis/analysis.go index d95dd23b4e..257c09706f 100644 --- a/pkg/analysis/analysis.go +++ b/pkg/analysis/analysis.go @@ -20,6 +20,7 @@ import ( "os" "reflect" "strings" + "sync" "github.com/fatih/color" "github.com/k8sgpt-ai/k8sgpt/pkg/ai" @@ -32,14 +33,15 @@ import ( ) type Analysis struct { - Context context.Context - Filters []string - Client *kubernetes.Client - AIClient ai.IAI - Results []common.Result - Namespace string - NoCache bool - Explain bool + Context context.Context + Filters []string + Client *kubernetes.Client + AIClient ai.IAI + Results []common.Result + Namespace string + NoCache bool + Explain bool + MaxConcurrency int } type AnalysisStatus string @@ -55,7 +57,7 @@ type JsonOutput struct { Results []common.Result `json:"results"` } -func NewAnalysis(backend string, language string, filters []string, namespace string, noCache bool, explain bool) (*Analysis, error) { +func NewAnalysis(backend string, language string, filters []string, namespace string, noCache bool, explain bool, maxConcurrency int) (*Analysis, error) { var configAI ai.AIConfiguration err := viper.UnmarshalKey("ai", &configAI) if err != nil { @@ -99,13 +101,14 @@ func NewAnalysis(backend string, language string, filters []string, namespace st } return &Analysis{ - Context: ctx, - Filters: filters, - Client: client, - AIClient: aiClient, - Namespace: namespace, - NoCache: noCache, - Explain: explain, + Context: ctx, + Filters: filters, + Client: client, + AIClient: aiClient, + Namespace: namespace, + NoCache: noCache, + Explain: explain, + MaxConcurrency: maxConcurrency, }, nil } @@ -122,45 +125,86 @@ func (a *Analysis) RunAnalysis() []error { } var errorList []error - + semaphore := make(chan struct{}, a.MaxConcurrency) // if there are no filters selected and no active_filters then run all of them if len(a.Filters) == 0 && len(activeFilters) == 0 { + var wg sync.WaitGroup + var mutex sync.Mutex for _, analyzer := range analyzerMap { - results, err := analyzer.Analyze(analyzerConfig) - if err != nil { - errorList = append(errorList, errors.New(fmt.Sprintf("[%s] %s", reflect.TypeOf(analyzer).Name(), err))) - } - a.Results = append(a.Results, results...) + wg.Add(1) + semaphore <- struct{}{} + go func(analyzer common.IAnalyzer, wg *sync.WaitGroup, semaphore chan struct{}) { + defer wg.Done() + results, err := analyzer.Analyze(analyzerConfig) + if err != nil { + mutex.Lock() + errorList = append(errorList, fmt.Errorf(fmt.Sprintf("[%s] %s", reflect.TypeOf(analyzer).Name(), err))) + mutex.Unlock() + } + mutex.Lock() + a.Results = append(a.Results, results...) + mutex.Unlock() + <-semaphore + }(analyzer, &wg, semaphore) + } + wg.Wait() return errorList } - + semaphore = make(chan struct{}, a.MaxConcurrency) // if the filters flag is specified if len(a.Filters) != 0 { + var wg sync.WaitGroup + var mutex sync.Mutex for _, filter := range a.Filters { if analyzer, ok := analyzerMap[filter]; ok { - results, err := analyzer.Analyze(analyzerConfig) - if err != nil { - errorList = append(errorList, errors.New(fmt.Sprintf("[%s] %s", filter, err))) - } - a.Results = append(a.Results, results...) + semaphore <- struct{}{} + wg.Add(1) + go func(analyzer common.IAnalyzer) { + defer wg.Done() + results, err := analyzer.Analyze(analyzerConfig) + if err != nil { + mutex.Lock() + errorList = append(errorList, fmt.Errorf(fmt.Sprintf("[%s] %s", filter, err))) + mutex.Unlock() + } + mutex.Lock() + a.Results = append(a.Results, results...) + mutex.Unlock() + <-semaphore + }(analyzer) } else { - errorList = append(errorList, errors.New(fmt.Sprintf("\"%s\" filter does not exist. Please run k8sgpt filters list.", filter))) + errorList = append(errorList, fmt.Errorf(fmt.Sprintf("\"%s\" filter does not exist. Please run k8sgpt filters list.", filter))) } } + wg.Wait() return errorList } + var wg sync.WaitGroup + var mutex sync.Mutex + semaphore = make(chan struct{}, a.MaxConcurrency) // use active_filters for _, filter := range activeFilters { if analyzer, ok := analyzerMap[filter]; ok { - results, err := analyzer.Analyze(analyzerConfig) - if err != nil { - errorList = append(errorList, errors.New(fmt.Sprintf("[%s] %s", filter, err))) - } - a.Results = append(a.Results, results...) + semaphore <- struct{}{} + wg.Add(1) + go func(analyzer common.IAnalyzer) { + defer wg.Done() + results, err := analyzer.Analyze(analyzerConfig) + if err != nil { + mutex.Lock() + errorList = append(errorList, fmt.Errorf("[%s] %s", filter, err)) + mutex.Unlock() + } + mutex.Lock() + a.Results = append(a.Results, results...) + mutex.Unlock() + <-semaphore + }(analyzer) } } + wg.Wait() return errorList } diff --git a/pkg/server/server.go b/pkg/server/server.go index 6ee11b0eba..7796f30aa5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -26,11 +26,12 @@ import ( ) type Config struct { - Port string - Backend string - Key string - Token string - Output string + Port string + Backend string + Key string + Token string + Output string + maxConcurrency int } type Health struct { @@ -55,13 +56,19 @@ func (s *Config) analyzeHandler(w http.ResponseWriter, r *http.Request) { anonymize := getBoolParam(r.URL.Query().Get("anonymize")) nocache := getBoolParam(r.URL.Query().Get("nocache")) language := r.URL.Query().Get("language") + + var err error + s.maxConcurrency, err = strconv.Atoi(r.URL.Query().Get("maxConcurrency")) + if err != nil { + s.maxConcurrency = 10 + } s.Output = r.URL.Query().Get("output") if s.Output == "" { s.Output = "json" } - config, err := analysis.NewAnalysis(s.Backend, language, []string{}, namespace, nocache, explain) + config, err := analysis.NewAnalysis(s.Backend, language, []string{}, namespace, nocache, explain, s.maxConcurrency) if err != nil { health.Failure++ http.Error(w, err.Error(), http.StatusInternalServerError)