Skip to content

Commit

Permalink
make dynamic extender notifier
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <mikhail.scherba@flant.com>
  • Loading branch information
miklezzzz committed Jun 4, 2024
1 parent 1d86271 commit 4fb1971
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 206 deletions.
4 changes: 1 addition & 3 deletions pkg/addon-operator/converge/converge.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ const (
OperatorStartup ConvergeEvent = "OperatorStartup"
// GlobalValuesChanged is a converge initiated by changing values in the global hook.
GlobalValuesChanged ConvergeEvent = "GlobalValuesChanged"
// KubeConfigChanged is a converge started after changing ConfigMap.
KubeConfigChanged ConvergeEvent = "KubeConfigChanged"
// ReloadAllModules is a converge queued to the
// ReloadAllModules is a converge queued to the main queue after the graph's state change
ReloadAllModules ConvergeEvent = "ReloadAllModules"
)

Expand Down
100 changes: 61 additions & 39 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/flant/addon-operator/pkg/module_manager/models/hooks/kind"
"github.com/flant/addon-operator/pkg/module_manager/models/modules"
"github.com/flant/addon-operator/pkg/module_manager/models/modules/events"
kube_config_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/kube_config"
dynamic_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/dynamically_enabled"
"github.com/flant/addon-operator/pkg/task"
"github.com/flant/addon-operator/pkg/utils"
"github.com/flant/kube-client/client"
Expand Down Expand Up @@ -652,20 +652,14 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str
res.Status = queue.Fail
return res
}

hm := task.HookMetadataAccessor(t)

var handleErr error

if op.ConvergeState.Phase == converge.StandBy {
logEntry.Debugf("ConvergeModules: start")

convergeDrained := RemoveCurrentConvergeTasks(op.engine.TaskQueues.GetMain(), t.GetId())
if convergeDrained {
logEntry.Infof("ConvergeModules: kube config modification detected, restart current converge process (%s)", op.ConvergeState.Phase)
} else {
logEntry.Infof("ConvergeModules: kube config modification detected, reload all modules required")
}

// Deduplicate tasks: remove ConvergeModules tasks right after the current task.
RemoveAdjacentConvergeModules(op.engine.TaskQueues.GetByName(t.GetQueueName()), t.GetId())

Expand All @@ -686,7 +680,7 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str

if op.ConvergeState.Phase == converge.WaitBeforeAll {
logEntry.Infof("ConvergeModules: beforeAll hooks done, run modules")
var state *module_manager.ModulesState

state, handleErr := op.ModuleManager.RefreshEnabledState(t.GetLogLabels())
if handleErr == nil {
// TODO disable hooks before was done in DiscoverModulesStateRefresh. Should we stick to this solution or disable events later during the handling each ModuleDelete task?
Expand Down Expand Up @@ -948,32 +942,62 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
for {
select {
case schedulerEvent := <-op.ModuleManager.SchedulerEventCh():
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
}
eventLogEntry := logEntry.WithFields(utils.LabelsToLogFields(logLabels))

switch event := schedulerEvent.EncapsulatedEvent.(type) {
case config.KubeConfigEvent:
logLabels := map[string]string{
"event.id": uuid.Must(uuid.NewV4()).String(),
// dynamically_enabled_extender
case dynamic_extender.DynamicExtenderEvent:
// we don't need to schedule any extra tasks here: the DynamicExtender event is handled inherently by the graph state update below
graphStateChanged, err := op.ModuleManager.UpdateGraphState()
if err != nil {
eventLogEntry.Errorf("Couldn't update the graph's state: %v", err)
}

if op.ConvergeState.FirstRunPhase == converge.FirstNotStarted {
eventLogEntry.Infof("kube config modification detected, ignore until starting first converge")
return
}
eventLogEntry := logEntry.WithFields(utils.LabelsToLogFields(logLabels))

if graphStateChanged {
// ConvergeModules may be in progress; reset the converge state.
op.ConvergeState.Phase = converge.StandBy
convergeTask := converge.NewConvergeModulesTask(
"ReloadAll-After-GlobalHookDynamicUpdate",
converge.ReloadAllModules,
logLabels,
)
if RemoveCurrentConvergeTasks(op.engine.TaskQueues.GetMain(), op.engine.TaskQueues.GetMain().GetFirst().GetId()) {
logEntry.Infof("ConvergeModules: global hook dynamic modification detected, restart current converge process (%s)", op.ConvergeState.Phase)
op.engine.TaskQueues.GetMain().AddFirst(convergeTask)
op.logTaskAdd(eventLogEntry, "DynamicExtender is updated, put first", convergeTask)
} else {
logEntry.Infof("ConvergeModules: global hook dynamic modification detected, rerun all modules required")
op.engine.TaskQueues.GetMain().AddLast(convergeTask)
}
}

// kube_config_extender
case config.KubeConfigEvent:
switch event.Type {
case config.KubeConfigInvalid:
op.ModuleManager.SetKubeConfigValid(false)
eventLogEntry.Infof("KubeConfig become invalid")

case config.KubeConfigChanged:
eventLogEntry.Debugf("ModuleManagerEventHandler-KubeConfigChanged: GlobalSectionChanged %v, ModuleValuesChanged %s, ModuleEnabledStateChanged %s", event.GlobalSectionChanged, event.ModuleValuesChanged, event.ModuleEnabledStateChanged)
newModulesStateChanged := []string{}
for _, module := range event.ModuleEnabledStateChanged {
stateChanged, err := op.ModuleManager.StateChangedByExtender(kube_config_extender.Name, module)
if err != nil {
eventLogEntry.Errorf("Couldn't determine the %s module's new state via KubeConfig extender: %s", module, err)
continue
}
if !op.ModuleManager.GetKubeConfigValid() {
eventLogEntry.Infof("KubeConfig become valid")
}
// Config is valid now, add task to update ModuleManager state.
op.ModuleManager.SetKubeConfigValid(true)

if stateChanged {
eventLogEntry.Infof("KubeConfig is changed, module %s state changed", module)
newModulesStateChanged = append(newModulesStateChanged, module)
}
graphStateChanged, err := op.ModuleManager.UpdateGraphState()
if err != nil {
eventLogEntry.Errorf("Couldn't update the graph's state: %v", err)
continue
}

var (
Expand All @@ -990,46 +1014,46 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
op.engine.TaskQueues.GetMain().AddFirst(kubeConfigTask)
// Cancel delay in case the head task is stuck in the error loop.
op.engine.TaskQueues.GetMain().CancelTaskDelay()
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put first", kubeConfigTask)
op.logTaskAdd(eventLogEntry, "KubeConfigExtender is updated, put first", kubeConfigTask)
}

if op.ConvergeState.FirstRunPhase == converge.FirstNotStarted {
eventLogEntry.Infof("kube config modification detected, ignore until starting first converge")
return
}

if event.GlobalSectionChanged || len(newModulesStateChanged) > 0 {
if event.GlobalSectionChanged || graphStateChanged {
// ConvergeModules may be in progress; reset the converge state.
op.ConvergeState.Phase = converge.StandBy
convergeTask = converge.NewConvergeModulesTask(
"ReloadAll-After-KubeConfigChange",
converge.ReloadAllModules,
logLabels,
)
if kubeConfigTask != nil {

if kubeConfigTask != nil && RemoveCurrentConvergeTasks(op.engine.TaskQueues.GetMain(), kubeConfigTask.GetId()) {
logEntry.Infof("ConvergeModules: kube config modification detected, restart current converge process (%s)", op.ConvergeState.Phase)
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), convergeTask)
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put first", convergeTask)
} else {
op.engine.TaskQueues.GetMain().AddFirst(convergeTask)
logEntry.Infof("ConvergeModules: kube config modification detected, rerun all modules required")
op.engine.TaskQueues.GetMain().AddLast(convergeTask)
}
// Cancel delay in case the head task is stuck in the error loop.
op.engine.TaskQueues.GetMain().CancelTaskDelay()
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put first", convergeTask)
} else {
enabledModules := []string{}
modulesToRerun := []string{}
for _, moduleName := range event.ModuleValuesChanged {
if op.ModuleManager.IsModuleEnabled(moduleName) {
enabledModules = append(enabledModules, moduleName)
modulesToRerun = append(modulesToRerun, moduleName)
}
}
// Append ModuleRun tasks if ModuleRun is not queued already.
if kubeConfigTask != nil && convergeTask == nil {
reloadTasks := op.CreateReloadModulesTasks(enabledModules, kubeConfigTask.GetLogLabels(), "KubeConfig-Changed-Modules")
reloadTasks := op.CreateReloadModulesTasks(modulesToRerun, kubeConfigTask.GetLogLabels(), "KubeConfig-Changed-Modules")
if len(reloadTasks) > 0 {
for i := len(reloadTasks) - 1; i >= 0; i-- {
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), reloadTasks[i])
}
logEntry.Infof("ConvergeModules: kube config modification detected, append %d tasks to reload modules %+v", len(reloadTasks), enabledModules)
// Reset delay if error-loop.
logEntry.Infof("ConvergeModules: kube config modification detected, append %d tasks to rerun modules %+v", len(reloadTasks), modulesToRerun)
op.logTaskAdd(logEntry, "tail", reloadTasks...)
}
}
Expand Down Expand Up @@ -2250,9 +2274,7 @@ func taskDescriptionForTaskFlowLog(tsk sh_task.Task, action string, phase string
// ConvergeModules task done, result is 'Keep' for converge phase 'WaitBeforeAll', trigger Operator-Startup
if taskEvent, ok := tsk.GetProp(converge.ConvergeEventProp).(converge.ConvergeEvent); ok {
parts = append(parts, string(taskEvent))
if taskEvent != converge.KubeConfigChanged {
parts = append(parts, fmt.Sprintf("in phase '%s'", phase))
}
parts = append(parts, fmt.Sprintf("in phase '%s'", phase))
}

case task.ModuleRun:
Expand Down
2 changes: 1 addition & 1 deletion pkg/addon-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func Test_HandleConvergeModules_global_changed_during_converge(t *testing.T) {
break
}

g.Expect(hasReloadAllInStandby).To(BeTrue(), "Should have ReloadAllModules right after KubeConfigChanged")
g.Expect(hasReloadAllInStandby).To(BeTrue(), "Should have ReloadAllModules right after ApplyKubeConfigValues")
}

// This test case checks tasks sequence in the 'main' queue after changing
Expand Down
Loading

0 comments on commit 4fb1971

Please sign in to comment.