Skip to content

Commit

Permalink
rework queue
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <mikhail.scherba@flant.com>
  • Loading branch information
miklezzzz committed Jun 5, 2024
1 parent 293d327 commit 764b915
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 13 deletions.
16 changes: 8 additions & 8 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,11 +955,6 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
eventLogEntry.Errorf("Couldn't update the graph's state: %v", err)
}

if op.ConvergeState.FirstRunPhase == converge.FirstNotStarted {
eventLogEntry.Infof("global hook dynamic modification detected, ignore until starting first converge")
return
}

if graphStateChanged {
// ConvergeModules may be in progress, Reset converge state.
op.ConvergeState.Phase = converge.StandBy
Expand Down Expand Up @@ -1030,10 +1025,15 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
logLabels,
)

if kubeConfigTask != nil && RemoveCurrentConvergeTasks(op.engine.TaskQueues.GetMain(), kubeConfigTask.GetId()) {
if RemoveCurrentConvergeTasks(op.engine.TaskQueues.GetMain(), op.engine.TaskQueues.GetMain().GetFirst().GetId()) {
logEntry.Infof("ConvergeModules: kube config modification detected, restart current converge process (%s)", op.ConvergeState.Phase)
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), convergeTask)
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put first", convergeTask)
if kubeConfigTask != nil {
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), convergeTask)
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put after AppplyNewKubeConfig", convergeTask)
} else {
op.engine.TaskQueues.GetMain().AddFirst(convergeTask)
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put first", convergeTask)
}
} else {
logEntry.Infof("ConvergeModules: kube config modification detected, rerun all modules required")
op.engine.TaskQueues.GetMain().AddLast(convergeTask)
Expand Down
2 changes: 2 additions & 0 deletions pkg/addon-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func Test_HandleConvergeModules_global_changed_during_converge(t *testing.T) {

// Define task handler to gather task execution history.
type taskInfo struct {
id string
taskType sh_task.TaskType
bindingType BindingType
moduleName string
Expand Down Expand Up @@ -393,6 +394,7 @@ func Test_HandleConvergeModules_global_changed_during_converge(t *testing.T) {
}
historyMu.Lock()
taskHandleHistory = append(taskHandleHistory, taskInfo{
id: tsk.GetId(),
taskType: tsk.GetType(),
bindingType: hm.BindingType,
moduleName: hm.ModuleName,
Expand Down
5 changes: 4 additions & 1 deletion pkg/addon-operator/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string) bool {
// Also keep specified task.
if t.GetId() == afterId {
IDFound = true
} else {
return true
}
return true
}

// Return false to remove converge task right after the specified task.
Expand All @@ -108,6 +109,8 @@ func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string) bool {
stop = true
}
return false
} else {

Check failure on line 112 in pkg/addon-operator/queue.go

View workflow job for this annotation

GitHub Actions / Run Go linters

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
return true
}
// Stop filtering when there is non-converge task after specified task.
stop = true

Check failure on line 116 in pkg/addon-operator/queue.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unreachable: unreachable code (govet)
Expand Down
13 changes: 9 additions & 4 deletions pkg/addon-operator/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
{Type: task.GlobalHookRun, Id: "3"},
},
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: "1"},
{Type: task.ModuleHookRun, Id: "2"},
{Type: task.GlobalHookRun, Id: "3"},
},
expectRemoved: true,
},
{
name: "No Converge in progress, preceding tasks present",
Expand All @@ -306,10 +306,10 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: "-1"},
{Type: task.ConvergeModules, Id: "0"},
{Type: task.ConvergeModules, Id: currentTaskID},
{Type: task.ModuleHookRun, Id: "2"},
{Type: task.GlobalHookRun, Id: "3"},
},
expectRemoved: true,
},
{
name: "Single adjacent ConvergeModules task with more Converge tasks",
Expand All @@ -320,7 +320,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
{Type: task.ModuleDelete, Id: "4"},
},
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: currentTaskID},
{Type: task.ConvergeModules, Id: "2"},
{Type: task.ModuleRun, Id: "3"},
{Type: task.ModuleDelete, Id: "4"},
},
Expand All @@ -342,7 +342,12 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
{Type: task.GlobalHookRun, Id: "10"},
},
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: currentTaskID},
{Type: task.ModuleDelete, Id: "2"},
{Type: task.ModuleDelete, Id: "3"},
{Type: task.ModuleRun, Id: "4", Metadata: task.HookMetadata{IsReloadAll: true}},
{Type: task.ModuleRun, Id: "5", Metadata: task.HookMetadata{IsReloadAll: true}},
{Type: task.ModuleRun, Id: "6", Metadata: task.HookMetadata{IsReloadAll: true}},
{Type: task.ConvergeModules, Id: "7"},
{Type: task.ConvergeModules, Id: "8"},
{Type: task.ConvergeModules, Id: "9"},
{Type: task.ModuleRun, Id: "11"},
Expand Down

0 comments on commit 764b915

Please sign in to comment.