From ee6c68b214d1f88871fd792f4e5dfe091e3de14d Mon Sep 17 00:00:00 2001 From: Mikhail Scherba Date: Fri, 21 Jun 2024 18:04:07 +0300 Subject: [PATCH] update logentry level Signed-off-by: Mikhail Scherba --- pkg/addon-operator/operator.go | 34 +++++-- pkg/addon-operator/queue.go | 4 +- pkg/module_manager/module_manager.go | 12 ++- pkg/module_manager/scheduler/scheduler.go | 71 +++++++++------ .../scheduler/scheduler_test.go | 89 ++++++++++++++----- 5 files changed, 149 insertions(+), 61 deletions(-) diff --git a/pkg/addon-operator/operator.go b/pkg/addon-operator/operator.go index abe87e944..f30e24231 100644 --- a/pkg/addon-operator/operator.go +++ b/pkg/addon-operator/operator.go @@ -657,21 +657,26 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str var handleErr error - op.ConvergeState.PhaseLock.Lock() - defer op.ConvergeState.PhaseLock.Unlock() - if op.ConvergeState.Phase == converge.StandBy { logEntry.Debugf("ConvergeModules: start") // Deduplicate tasks: remove ConvergeModules tasks right after the current task. RemoveAdjacentConvergeModules(op.engine.TaskQueues.GetByName(t.GetQueueName()), t.GetId(), logLabels) - op.ConvergeState.Phase = converge.RunBeforeAll + op.ConvergeState.PhaseLock.Lock() + if op.ConvergeState.Phase == converge.StandBy { + op.ConvergeState.Phase = converge.RunBeforeAll + } + op.ConvergeState.PhaseLock.Unlock() } if op.ConvergeState.Phase == converge.RunBeforeAll { // Put BeforeAll tasks before current task. tasks := op.CreateBeforeAllTasks(t.GetLogLabels(), hm.EventDescription) - op.ConvergeState.Phase = converge.WaitBeforeAll + op.ConvergeState.PhaseLock.Lock() + if op.ConvergeState.Phase == converge.RunBeforeAll { + op.ConvergeState.Phase = converge.WaitBeforeAll + } + op.ConvergeState.PhaseLock.Unlock() if len(tasks) > 0 { res.HeadTasks = tasks res.Status = queue.Keep @@ -697,7 +702,12 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str state.ModulesToEnable = state.AllEnabledModules } tasks := op.CreateConvergeModulesTasks(state, t.GetLogLabels(), string(taskEvent)) - op.ConvergeState.Phase = converge.WaitDeleteAndRunModules + + op.ConvergeState.PhaseLock.Lock() + if op.ConvergeState.Phase == converge.WaitBeforeAll { + op.ConvergeState.Phase = converge.WaitDeleteAndRunModules + } + op.ConvergeState.PhaseLock.Unlock() if len(tasks) > 0 { res.HeadTasks = tasks res.Status = queue.Keep @@ -712,7 +722,11 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str // Put AfterAll tasks before current task. tasks, handleErr := op.CreateAfterAllTasks(t.GetLogLabels(), hm.EventDescription) if handleErr == nil { - op.ConvergeState.Phase = converge.WaitAfterAll + op.ConvergeState.PhaseLock.Lock() + if op.ConvergeState.Phase == converge.WaitDeleteAndRunModules { + op.ConvergeState.Phase = converge.WaitAfterAll + } + op.ConvergeState.PhaseLock.Unlock() if len(tasks) > 0 { res.HeadTasks = tasks res.Status = queue.Keep @@ -724,7 +738,9 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str // It is the last phase of ConvergeModules task, reset operator's Converge phase. if op.ConvergeState.Phase == converge.WaitAfterAll { + op.ConvergeState.PhaseLock.Lock() op.ConvergeState.Phase = converge.StandBy + op.ConvergeState.PhaseLock.Unlock() logEntry.Infof("ConvergeModules task done") res.Status = queue.Success return res @@ -958,7 +974,6 @@ func (op *AddonOperator) StartModuleManagerEventHandler() { if graphStateChanged { // ConvergeModules may be in progress, Reset converge state. - op.ConvergeState.PhaseLock.Lock() convergeTask := converge.NewConvergeModulesTask( "ReloadAll-After-GlobalHookDynamicUpdate", converge.ReloadAllModules, @@ -974,6 +989,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() { op.engine.TaskQueues.GetMain().AddLast(convergeTask) } // ConvergeModules may be in progress Reset converge state. + op.ConvergeState.PhaseLock.Lock() op.ConvergeState.Phase = converge.StandBy op.ConvergeState.PhaseLock.Unlock() } @@ -1023,7 +1039,6 @@ func (op *AddonOperator) StartModuleManagerEventHandler() { } if event.GlobalSectionChanged || graphStateChanged { - op.ConvergeState.PhaseLock.Lock() convergeTask = converge.NewConvergeModulesTask( "ReloadAll-After-KubeConfigChange", converge.ReloadAllModules, @@ -1044,6 +1059,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() { op.engine.TaskQueues.GetMain().AddLast(convergeTask) } // ConvergeModules may be in progress Reset converge state. + op.ConvergeState.PhaseLock.Lock() op.ConvergeState.Phase = converge.StandBy op.ConvergeState.PhaseLock.Unlock() } else { diff --git a/pkg/addon-operator/queue.go b/pkg/addon-operator/queue.go index 909c09651..099367de1 100644 --- a/pkg/addon-operator/queue.go +++ b/pkg/addon-operator/queue.go @@ -113,7 +113,7 @@ func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string, logLabels ma stop = true } hm := task.HookMetadataAccessor(t) - logEntry.Infof("Drained converge task of type: %s, module: %s, description: %s", t.GetType(), hm.ModuleName, hm.EventDescription) + logEntry.Debugf("Drained converge task of type: %s, module: %s, description: %s", t.GetType(), hm.ModuleName, hm.EventDescription) return false } return true @@ -146,7 +146,7 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels // Remove ConvergeModules after current. if t.GetType() == task.ConvergeModules { hm := task.HookMetadataAccessor(t) - logEntry.Infof("Drained adjacent ConvergeModules task of type: %s, description: %s", t.GetType(), hm.EventDescription) + logEntry.Debugf("Drained adjacent ConvergeModules task of type: %s, description: %s", t.GetType(), hm.EventDescription) return false } diff --git a/pkg/module_manager/module_manager.go b/pkg/module_manager/module_manager.go index 58a25df77..14b5fc0eb 100644 --- a/pkg/module_manager/module_manager.go +++ b/pkg/module_manager/module_manager.go @@ -416,13 +416,23 @@ func (mm *ModuleManager) RefreshEnabledState(logLabels map[string]string) (*Modu modulesToDisable []string ) - for module, enabled := range modulesDiff { + for module, enabled := range modulesDiff.State { if enabled { modulesToEnable = append(modulesToEnable, module) } else { modulesToDisable = append(modulesToDisable, module) } } + + for module, _ := range modulesDiff.UpdatedBy { + if _, found := modulesDiff.State[module]; !found { + mm.SendModuleEvent(events.ModuleEvent{ + ModuleName: module, + EventType: events.ModuleStateChanged, + }) + } + } + modulesToDisable = utils.SortReverseByReference(modulesToDisable, mm.modules.NamesInOrder()) modulesToEnable = utils.SortByReference(modulesToEnable, mm.modules.NamesInOrder()) diff --git a/pkg/module_manager/scheduler/scheduler.go b/pkg/module_manager/scheduler/scheduler.go index bf7c195bc..e54141303 100644 --- a/pkg/module_manager/scheduler/scheduler.go +++ b/pkg/module_manager/scheduler/scheduler.go @@ -39,11 +39,23 @@ type Scheduler struct { // cache containing currently enabled vertices enabledModules *[]string // storage for current module diff - diff map[string]bool + diff Diff // keeps all errors happened on last run errList []string } +type Diff struct { + State map[string]bool + UpdatedBy map[string]string +} + +func NewDiff() Diff { + return Diff{ + State: make(map[string]bool, 0), + UpdatedBy: make(map[string]string, 0), + } +} + type vertexState struct { enabled bool updatedBy string @@ -59,7 +71,7 @@ func NewScheduler(ctx context.Context) *Scheduler { extenders: make([]extenders.Extender, 0), extCh: make(chan extenders.ExtenderEvent, 1), dag: graph.New(nodeHash, graph.Directed(), graph.Acyclic()), - diff: make(map[string]bool, 0), + diff: NewDiff(), errList: make([]string, 0), } } @@ -350,7 +362,7 @@ func (s *Scheduler) GetEnabledModuleNames() ([]string, error) { // * error if any // if s.enabledModules is nil, we infer that the graph hasn't been calculatet yet and run RecalculateGraph for the first time. // if s.errList isn't empty, we try to recalculate the graph in case there were some minor errors last time. -func (s *Scheduler) GetGraphState(logLabels map[string]string) ( /*enabled modules*/ []string /*modules diff*/, map[string]bool, error) { +func (s *Scheduler) GetGraphState(logLabels map[string]string) ( /*enabled modules*/ []string /*modules diff*/, Diff, error) { var recalculateGraph bool logEntry := log.WithFields(utils.LabelsToLogFields(logLabels)) s.l.Lock() @@ -372,7 +384,7 @@ func (s *Scheduler) GetGraphState(logLabels map[string]string) ( /*enabled modul } if len(s.errList) > 0 || s.enabledModules == nil { - return nil, nil, fmt.Errorf("couldn't recalculate graph: %s", strings.Join(s.errList, ",")) + return nil, Diff{}, fmt.Errorf("couldn't recalculate graph: %s", strings.Join(s.errList, ",")) } return *s.enabledModules, s.gleanGraphDiff(), nil @@ -399,7 +411,7 @@ func (s *Scheduler) Filter(extName extenders.ExtenderName, moduleName string, lo // determine current states of the modules. Besides, it updates slice of all currently enabled modules. // It returns true if the state of the graph has changed or if there were any errors during the run. func (s *Scheduler) recalculateGraphState(logLabels map[string]string) /* Graph's state has changed */ bool { - diff := make(map[string]bool, 0) + diff := NewDiff() errList := make([]string, 0) enabledModules := make([]string, 0) logEntry := log.WithFields(utils.LabelsToLogFields(logLabels)) @@ -451,7 +463,11 @@ outerCycle: } if vBuf[moduleName].enabled != vertex.GetState() { - diff[vertex.GetName()] = vBuf[moduleName].enabled + diff.State[vertex.GetName()] = vBuf[moduleName].enabled + } + + if vBuf[moduleName].updatedBy != vertex.GetUpdatedBy() { + diff.UpdatedBy[vertex.GetName()] = vBuf[moduleName].updatedBy } if vBuf[moduleName].enabled { @@ -473,6 +489,24 @@ outerCycle: return true } + // merge the state diffs + for vertexName, newDiffState := range diff.State { + // if a new diff has an opposite state for the module, the module is deleted from the resulting diff + if currentDiffState, found := s.diff.State[vertexName]; found { + if currentDiffState != newDiffState { + delete(s.diff.State, vertexName) + } + // if current diff doesn't have the module's state - add it to the resulting diff + } else { + s.diff.State[vertexName] = newDiffState + } + } + + // merge the updatedBy diffs + for vertexName, newDiffUpdBy := range diff.UpdatedBy { + s.diff.UpdatedBy[vertexName] = newDiffUpdBy + } + // commit changes to the graph for vertexName, state := range vBuf { vertex, _, err := s.dag.VertexWithProperties(vertexName) @@ -486,31 +520,18 @@ outerCycle: } s.enabledModules = &enabledModules - // merge the diff - for module, newState := range diff { - // if a new diff has an opposite state for the module, the module is deleted from the resulting diff - if currentState, found := s.diff[module]; found { - if currentState != newState { - delete(s.diff, module) - } - // if current diff doesn't have the module's state - add it to the resulting diff - } else { - s.diff[module] = newState - } - } - // reset any previous errors s.errList = make([]string, 0) - logEntry.Infof("Graph was successfully updated, diff: [%v]", s.diff) + logEntry.Debugf("Graph was successfully updated, diff: [%v]", s.diff) // TODO: provide access to the report via the operator's web server // s.printGraph() - return len(diff) > 0 + return len(diff.State)+len(diff.UpdatedBy) > 0 } // gleanGraphDiff returns modules diff list -func (s *Scheduler) gleanGraphDiff() map[string]bool { - diff := s.diff - s.diff = make(map[string]bool, 0) - return diff +func (s *Scheduler) gleanGraphDiff() Diff { + curDiff := s.diff + s.diff = NewDiff() + return curDiff } diff --git a/pkg/module_manager/scheduler/scheduler_test.go b/pkg/module_manager/scheduler/scheduler_test.go index 31980dbf7..87be4d62d 100644 --- a/pkg/module_manager/scheduler/scheduler_test.go +++ b/pkg/module_manager/scheduler/scheduler_test.go @@ -343,13 +343,25 @@ l2LoadBalancerEnabled: false "echo/": false, } - expectedDiff := map[string]bool{ - "admission-policy-engine": true, - "cert-manager": true, - "chrony": true, - "node-local-dns": true, - "flant-integration": true, - "monitoring-applications": true, + expectedDiff := Diff{ + State: map[string]bool{ + "admission-policy-engine": true, + "cert-manager": true, + "chrony": true, + "node-local-dns": true, + "flant-integration": true, + "monitoring-applications": true, + }, + UpdatedBy: map[string]string{ + "admission-policy-engine": "Static", + "cert-manager": "Static", + "chrony": "Static", + "node-local-dns": "Static", + "foo-bar": "Static", + "flant-integration": "Static", + "monitoring-applications": "Static", + "l2-load-balancer": "Static", + }, } summary, err := s.PrintSummary() @@ -395,9 +407,15 @@ l2LoadBalancerEnabled: false "echo/": false, } - expectedDiff = map[string]bool{ - "l2-load-balancer": true, - "openstack-cloud-provider": true, + expectedDiff = Diff{ + State: map[string]bool{ + "l2-load-balancer": true, + "openstack-cloud-provider": true, + }, + UpdatedBy: map[string]string{ + "l2-load-balancer": "DynamicallyEnabled", + "openstack-cloud-provider": "DynamicallyEnabled", + }, } summary, err = s.PrintSummary() @@ -445,12 +463,23 @@ l2LoadBalancerEnabled: false "prometheus-crd/KubeConfig": true, } - expectedDiff = map[string]bool{ - "echo": true, - "prometheus": true, - "prometheus-crd": true, - "foo-bar": true, - "chrony": false, + expectedDiff = Diff{ + State: map[string]bool{ + "echo": true, + "prometheus": true, + "prometheus-crd": true, + "foo-bar": true, + "chrony": false, + }, + UpdatedBy: map[string]string{ + "cert-manager": "KubeConfig", + "chrony": "KubeConfig", + "echo": "KubeConfig", + "prometheus": "KubeConfig", + "prometheus-crd": "KubeConfig", + "foo-bar": "KubeConfig", + "node-local-dns": "DynamicallyEnabled", + }, } summary, err = s.PrintSummary() @@ -491,10 +520,17 @@ l2LoadBalancerEnabled: false "prometheus-crd/KubeConfig": true, } - expectedDiff = map[string]bool{ - "monitoring-applications": false, - "foo-bar": false, - "flant-integration": false, + expectedDiff = Diff{ + State: map[string]bool{ + "monitoring-applications": false, + "foo-bar": false, + "flant-integration": false, + }, + UpdatedBy: map[string]string{ + "flant-integration": "ScriptEnabled", + "monitoring-applications": "ScriptEnabled", + "foo-bar": "ScriptEnabled", + }, } summary, err = s.PrintSummary() @@ -531,9 +567,14 @@ l2LoadBalancerEnabled: false "prometheus-crd/KubeConfig": true, } - expectedDiff = map[string]bool{ - "openstack-cloud-provider": false, - "ingress-nginx": true, + expectedDiff = Diff{ + State: map[string]bool{ + "openstack-cloud-provider": false, + "ingress-nginx": true, + }, + UpdatedBy: map[string]string{ + "ingress-nginx": "DynamicallyEnabled", + }, } summary, err = s.PrintSummary() @@ -567,7 +608,7 @@ l2LoadBalancerEnabled: false "prometheus-crd/KubeConfig": true, } - expectedDiff = nil + expectedDiff = Diff{} assert.Equal(t, expected, summary) assert.Equal(t, expectedDiff, diff)