Skip to content

Commit

Permalink
update logentry level
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <mikhail.scherba@flant.com>
  • Loading branch information
miklezzzz committed Jun 22, 2024
1 parent c227732 commit ee6c68b
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 61 deletions.
34 changes: 25 additions & 9 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,21 +657,26 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str

var handleErr error

op.ConvergeState.PhaseLock.Lock()
defer op.ConvergeState.PhaseLock.Unlock()

if op.ConvergeState.Phase == converge.StandBy {
logEntry.Debugf("ConvergeModules: start")

// Deduplicate tasks: remove ConvergeModules tasks right after the current task.
RemoveAdjacentConvergeModules(op.engine.TaskQueues.GetByName(t.GetQueueName()), t.GetId(), logLabels)
op.ConvergeState.Phase = converge.RunBeforeAll
op.ConvergeState.PhaseLock.Lock()
if op.ConvergeState.Phase == converge.StandBy {
op.ConvergeState.Phase = converge.RunBeforeAll
}
op.ConvergeState.PhaseLock.Unlock()
}

if op.ConvergeState.Phase == converge.RunBeforeAll {
// Put BeforeAll tasks before current task.
tasks := op.CreateBeforeAllTasks(t.GetLogLabels(), hm.EventDescription)
op.ConvergeState.Phase = converge.WaitBeforeAll
op.ConvergeState.PhaseLock.Lock()
if op.ConvergeState.Phase == converge.RunBeforeAll {
op.ConvergeState.Phase = converge.WaitBeforeAll
}
op.ConvergeState.PhaseLock.Unlock()
if len(tasks) > 0 {
res.HeadTasks = tasks
res.Status = queue.Keep
Expand All @@ -697,7 +702,12 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str
state.ModulesToEnable = state.AllEnabledModules
}
tasks := op.CreateConvergeModulesTasks(state, t.GetLogLabels(), string(taskEvent))
op.ConvergeState.Phase = converge.WaitDeleteAndRunModules

op.ConvergeState.PhaseLock.Lock()
if op.ConvergeState.Phase == converge.WaitBeforeAll {
op.ConvergeState.Phase = converge.WaitDeleteAndRunModules
}
op.ConvergeState.PhaseLock.Unlock()
if len(tasks) > 0 {
res.HeadTasks = tasks
res.Status = queue.Keep
Expand All @@ -712,7 +722,11 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str
// Put AfterAll tasks before current task.
tasks, handleErr := op.CreateAfterAllTasks(t.GetLogLabels(), hm.EventDescription)
if handleErr == nil {
op.ConvergeState.Phase = converge.WaitAfterAll
op.ConvergeState.PhaseLock.Lock()
if op.ConvergeState.Phase == converge.WaitDeleteAndRunModules {
op.ConvergeState.Phase = converge.WaitAfterAll
}
op.ConvergeState.PhaseLock.Unlock()
if len(tasks) > 0 {
res.HeadTasks = tasks
res.Status = queue.Keep
Expand All @@ -724,7 +738,9 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str

// It is the last phase of ConvergeModules task, reset operator's Converge phase.
if op.ConvergeState.Phase == converge.WaitAfterAll {
op.ConvergeState.PhaseLock.Lock()
op.ConvergeState.Phase = converge.StandBy
op.ConvergeState.PhaseLock.Unlock()
logEntry.Infof("ConvergeModules task done")
res.Status = queue.Success
return res
Expand Down Expand Up @@ -958,7 +974,6 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {

if graphStateChanged {
// ConvergeModules may be in progress, Reset converge state.
op.ConvergeState.PhaseLock.Lock()
convergeTask := converge.NewConvergeModulesTask(
"ReloadAll-After-GlobalHookDynamicUpdate",
converge.ReloadAllModules,
Expand All @@ -974,6 +989,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
op.engine.TaskQueues.GetMain().AddLast(convergeTask)
}
// ConvergeModules may be in progress Reset converge state.
op.ConvergeState.PhaseLock.Lock()
op.ConvergeState.Phase = converge.StandBy
op.ConvergeState.PhaseLock.Unlock()
}
Expand Down Expand Up @@ -1023,7 +1039,6 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
}

if event.GlobalSectionChanged || graphStateChanged {
op.ConvergeState.PhaseLock.Lock()
convergeTask = converge.NewConvergeModulesTask(
"ReloadAll-After-KubeConfigChange",
converge.ReloadAllModules,
Expand All @@ -1044,6 +1059,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
op.engine.TaskQueues.GetMain().AddLast(convergeTask)
}
// ConvergeModules may be in progress Reset converge state.
op.ConvergeState.PhaseLock.Lock()
op.ConvergeState.Phase = converge.StandBy
op.ConvergeState.PhaseLock.Unlock()
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/addon-operator/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string, logLabels ma
stop = true
}
hm := task.HookMetadataAccessor(t)
logEntry.Infof("Drained converge task of type: %s, module: %s, description: %s", t.GetType(), hm.ModuleName, hm.EventDescription)
logEntry.Debugf("Drained converge task of type: %s, module: %s, description: %s", t.GetType(), hm.ModuleName, hm.EventDescription)
return false
}
return true
Expand Down Expand Up @@ -146,7 +146,7 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels
// Remove ConvergeModules after current.
if t.GetType() == task.ConvergeModules {
hm := task.HookMetadataAccessor(t)
logEntry.Infof("Drained adjacent ConvergeModules task of type: %s, description: %s", t.GetType(), hm.EventDescription)
logEntry.Debugf("Drained adjacent ConvergeModules task of type: %s, description: %s", t.GetType(), hm.EventDescription)
return false
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,23 @@ func (mm *ModuleManager) RefreshEnabledState(logLabels map[string]string) (*Modu
modulesToDisable []string
)

for module, enabled := range modulesDiff {
for module, enabled := range modulesDiff.State {
if enabled {
modulesToEnable = append(modulesToEnable, module)
} else {
modulesToDisable = append(modulesToDisable, module)
}
}

for module, _ := range modulesDiff.UpdatedBy {
if _, found := modulesDiff.State[module]; !found {
mm.SendModuleEvent(events.ModuleEvent{
ModuleName: module,
EventType: events.ModuleStateChanged,
})
}
}

modulesToDisable = utils.SortReverseByReference(modulesToDisable, mm.modules.NamesInOrder())
modulesToEnable = utils.SortByReference(modulesToEnable, mm.modules.NamesInOrder())

Expand Down
71 changes: 46 additions & 25 deletions pkg/module_manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,23 @@ type Scheduler struct {
// cache containing currently enabled vertices
enabledModules *[]string
// storage for current module diff
diff map[string]bool
diff Diff
// keeps all errors happened on last run
errList []string
}

type Diff struct {
State map[string]bool
UpdatedBy map[string]string
}

func NewDiff() Diff {
return Diff{
State: make(map[string]bool, 0),
UpdatedBy: make(map[string]string, 0),
}
}

type vertexState struct {
enabled bool
updatedBy string
Expand All @@ -59,7 +71,7 @@ func NewScheduler(ctx context.Context) *Scheduler {
extenders: make([]extenders.Extender, 0),
extCh: make(chan extenders.ExtenderEvent, 1),
dag: graph.New(nodeHash, graph.Directed(), graph.Acyclic()),
diff: make(map[string]bool, 0),
diff: NewDiff(),
errList: make([]string, 0),
}
}
Expand Down Expand Up @@ -350,7 +362,7 @@ func (s *Scheduler) GetEnabledModuleNames() ([]string, error) {
// * error if any
// if s.enabledModules is nil, we infer that the graph hasn't been calculatet yet and run RecalculateGraph for the first time.
// if s.errList isn't empty, we try to recalculate the graph in case there were some minor errors last time.
func (s *Scheduler) GetGraphState(logLabels map[string]string) ( /*enabled modules*/ []string /*modules diff*/, map[string]bool, error) {
func (s *Scheduler) GetGraphState(logLabels map[string]string) ( /*enabled modules*/ []string /*modules diff*/, Diff, error) {
var recalculateGraph bool
logEntry := log.WithFields(utils.LabelsToLogFields(logLabels))
s.l.Lock()
Expand All @@ -372,7 +384,7 @@ func (s *Scheduler) GetGraphState(logLabels map[string]string) ( /*enabled modul
}

if len(s.errList) > 0 || s.enabledModules == nil {
return nil, nil, fmt.Errorf("couldn't recalculate graph: %s", strings.Join(s.errList, ","))
return nil, Diff{}, fmt.Errorf("couldn't recalculate graph: %s", strings.Join(s.errList, ","))
}

return *s.enabledModules, s.gleanGraphDiff(), nil
Expand All @@ -399,7 +411,7 @@ func (s *Scheduler) Filter(extName extenders.ExtenderName, moduleName string, lo
// determine current states of the modules. Besides, it updates <enabledModules> slice of all currently enabled modules.
// It returns true if the state of the graph has changed or if there were any errors during the run.
func (s *Scheduler) recalculateGraphState(logLabels map[string]string) /* Graph's state has changed */ bool {
diff := make(map[string]bool, 0)
diff := NewDiff()
errList := make([]string, 0)
enabledModules := make([]string, 0)
logEntry := log.WithFields(utils.LabelsToLogFields(logLabels))
Expand Down Expand Up @@ -451,7 +463,11 @@ outerCycle:
}

if vBuf[moduleName].enabled != vertex.GetState() {
diff[vertex.GetName()] = vBuf[moduleName].enabled
diff.State[vertex.GetName()] = vBuf[moduleName].enabled
}

if vBuf[moduleName].updatedBy != vertex.GetUpdatedBy() {
diff.UpdatedBy[vertex.GetName()] = vBuf[moduleName].updatedBy
}

if vBuf[moduleName].enabled {
Expand All @@ -473,6 +489,24 @@ outerCycle:
return true
}

// merge the state diffs
for vertexName, newDiffState := range diff.State {
// if a new diff has an opposite state for the module, the module is deleted from the resulting diff
if currentDiffState, found := s.diff.State[vertexName]; found {
if currentDiffState != newDiffState {
delete(s.diff.State, vertexName)
}
// if current diff doesn't have the module's state - add it to the resulting diff
} else {
s.diff.State[vertexName] = newDiffState
}
}

// merge the updatedBy diffs
for vertexName, newDiffUpdBy := range diff.UpdatedBy {
s.diff.UpdatedBy[vertexName] = newDiffUpdBy
}

// commit changes to the graph
for vertexName, state := range vBuf {
vertex, _, err := s.dag.VertexWithProperties(vertexName)
Expand All @@ -486,31 +520,18 @@ outerCycle:
}

s.enabledModules = &enabledModules
// merge the diff
for module, newState := range diff {
// if a new diff has an opposite state for the module, the module is deleted from the resulting diff
if currentState, found := s.diff[module]; found {
if currentState != newState {
delete(s.diff, module)
}
// if current diff doesn't have the module's state - add it to the resulting diff
} else {
s.diff[module] = newState
}
}

// reset any previous errors
s.errList = make([]string, 0)
logEntry.Infof("Graph was successfully updated, diff: [%v]", s.diff)
logEntry.Debugf("Graph was successfully updated, diff: [%v]", s.diff)

// TODO: provide access to the report via the operator's web server
// s.printGraph()
return len(diff) > 0
return len(diff.State)+len(diff.UpdatedBy) > 0
}

// gleanGraphDiff returns modules diff list
func (s *Scheduler) gleanGraphDiff() map[string]bool {
diff := s.diff
s.diff = make(map[string]bool, 0)
return diff
func (s *Scheduler) gleanGraphDiff() Diff {
curDiff := s.diff
s.diff = NewDiff()
return curDiff
}
Loading

0 comments on commit ee6c68b

Please sign in to comment.