Skip to content

Commit

Permalink
make dynamic extender notifier
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <mikhail.scherba@flant.com>
  • Loading branch information
miklezzzz committed Jun 4, 2024
1 parent 1d86271 commit 6a05b95
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 199 deletions.
4 changes: 1 addition & 3 deletions pkg/addon-operator/converge/converge.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ const (
OperatorStartup ConvergeEvent = "OperatorStartup"
// GlobalValuesChanged is a converge initiated by changing values in the global hook.
GlobalValuesChanged ConvergeEvent = "GlobalValuesChanged"
// KubeConfigChanged is a converge started after changing ConfigMap.
KubeConfigChanged ConvergeEvent = "KubeConfigChanged"
// ReloadAllModules is a converge queued to the
// ReloadAllModules is a converge queued to the main queue after the graph's state change
ReloadAllModules ConvergeEvent = "ReloadAllModules"
)

Expand Down
57 changes: 23 additions & 34 deletions pkg/addon-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/flant/addon-operator/pkg/module_manager/models/hooks/kind"
"github.com/flant/addon-operator/pkg/module_manager/models/modules"
"github.com/flant/addon-operator/pkg/module_manager/models/modules/events"
kube_config_extender "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders/kube_config"
"github.com/flant/addon-operator/pkg/task"
"github.com/flant/addon-operator/pkg/utils"
"github.com/flant/kube-client/client"
Expand Down Expand Up @@ -652,20 +651,14 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str
res.Status = queue.Fail
return res
}

hm := task.HookMetadataAccessor(t)

var handleErr error

if op.ConvergeState.Phase == converge.StandBy {
logEntry.Debugf("ConvergeModules: start")

convergeDrained := RemoveCurrentConvergeTasks(op.engine.TaskQueues.GetMain(), t.GetId())
if convergeDrained {
logEntry.Infof("ConvergeModules: kube config modification detected, restart current converge process (%s)", op.ConvergeState.Phase)
} else {
logEntry.Infof("ConvergeModules: kube config modification detected, reload all modules required")
}

// Deduplicate tasks: remove ConvergeModules tasks right after the current task.
RemoveAdjacentConvergeModules(op.engine.TaskQueues.GetByName(t.GetQueueName()), t.GetId())

Expand All @@ -686,7 +679,7 @@ func (op *AddonOperator) HandleConvergeModules(t sh_task.Task, logLabels map[str

if op.ConvergeState.Phase == converge.WaitBeforeAll {
logEntry.Infof("ConvergeModules: beforeAll hooks done, run modules")
var state *module_manager.ModulesState

state, handleErr := op.ModuleManager.RefreshEnabledState(t.GetLogLabels())
if handleErr == nil {
// TODO disable hooks before was done in DiscoverModulesStateRefresh. Should we stick to this solution or disable events later during the handling each ModuleDelete task?
Expand Down Expand Up @@ -962,18 +955,16 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {

case config.KubeConfigChanged:
eventLogEntry.Debugf("ModuleManagerEventHandler-KubeConfigChanged: GlobalSectionChanged %v, ModuleValuesChanged %s, ModuleEnabledStateChanged %s", event.GlobalSectionChanged, event.ModuleValuesChanged, event.ModuleEnabledStateChanged)
newModulesStateChanged := []string{}
for _, module := range event.ModuleEnabledStateChanged {
stateChanged, err := op.ModuleManager.StateChangedByExtender(kube_config_extender.Name, module)
if err != nil {
eventLogEntry.Errorf("Couldn't determine the %s module's new state via KubeConfig extender: %s", module, err)
continue
}
if !op.ModuleManager.GetKubeConfigValid() {
eventLogEntry.Infof("KubeConfig become valid")
}
// Config is valid now, add task to update ModuleManager state.
op.ModuleManager.SetKubeConfigValid(true)

if stateChanged {
eventLogEntry.Infof("KubeConfig is changed, module %s state changed", module)
newModulesStateChanged = append(newModulesStateChanged, module)
}
graphStateChanged, err := op.ModuleManager.UpdateGraphState()
if err != nil {
eventLogEntry.Errorf("Couldn't update the graph's state: %v", err)
continue
}

var (
Expand All @@ -998,38 +989,38 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
return
}

if event.GlobalSectionChanged || len(newModulesStateChanged) > 0 {
if event.GlobalSectionChanged || graphStateChanged {
// ConvergeModules may be in progress Reset converge state.
op.ConvergeState.Phase = converge.StandBy
convergeTask = converge.NewConvergeModulesTask(
"ReloadAll-After-KubeConfigChange",
converge.ReloadAllModules,
logLabels,
)
if kubeConfigTask != nil {

if kubeConfigTask != nil && RemoveCurrentConvergeTasks(op.engine.TaskQueues.GetMain(), kubeConfigTask.GetId()) {
logEntry.Infof("ConvergeModules: kube config modification detected, restart current converge process (%s)", op.ConvergeState.Phase)
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), convergeTask)
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put first", convergeTask)
} else {
op.engine.TaskQueues.GetMain().AddFirst(convergeTask)
logEntry.Infof("ConvergeModules: kube config modification detected, rerun all modules required")
op.engine.TaskQueues.GetMain().AddLast(convergeTask)
}
// Cancel delay in case the head task is stuck in the error loop.
op.engine.TaskQueues.GetMain().CancelTaskDelay()
op.logTaskAdd(eventLogEntry, "KubeConfig is changed, put first", convergeTask)
} else {
enabledModules := []string{}
modulesToRerun := []string{}
for _, moduleName := range event.ModuleValuesChanged {
if op.ModuleManager.IsModuleEnabled(moduleName) {
enabledModules = append(enabledModules, moduleName)
modulesToRerun = append(modulesToRerun, moduleName)
}
}
// Append ModuleRun tasks if ModuleRun is not queued already.
if kubeConfigTask != nil && convergeTask == nil {
reloadTasks := op.CreateReloadModulesTasks(enabledModules, kubeConfigTask.GetLogLabels(), "KubeConfig-Changed-Modules")
reloadTasks := op.CreateReloadModulesTasks(modulesToRerun, kubeConfigTask.GetLogLabels(), "KubeConfig-Changed-Modules")
if len(reloadTasks) > 0 {
for i := len(reloadTasks) - 1; i >= 0; i-- {
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), reloadTasks[i])
}
logEntry.Infof("ConvergeModules: kube config modification detected, append %d tasks to reload modules %+v", len(reloadTasks), enabledModules)
// Reset delay if error-loop.
logEntry.Infof("ConvergeModules: kube config modification detected, append %d tasks to rerun modules %+v", len(reloadTasks), modulesToRerun)
op.logTaskAdd(logEntry, "tail", reloadTasks...)
}
}
Expand Down Expand Up @@ -2250,9 +2241,7 @@ func taskDescriptionForTaskFlowLog(tsk sh_task.Task, action string, phase string
// ConvergeModules task done, result is 'Keep' for converge phase 'WaitBeforeAll', trigger Operator-Startup
if taskEvent, ok := tsk.GetProp(converge.ConvergeEventProp).(converge.ConvergeEvent); ok {
parts = append(parts, string(taskEvent))
if taskEvent != converge.KubeConfigChanged {
parts = append(parts, fmt.Sprintf("in phase '%s'", phase))
}
parts = append(parts, fmt.Sprintf("in phase '%s'", phase))
}

case task.ModuleRun:
Expand Down
2 changes: 1 addition & 1 deletion pkg/addon-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func Test_HandleConvergeModules_global_changed_during_converge(t *testing.T) {
break
}

g.Expect(hasReloadAllInStandby).To(BeTrue(), "Should have ReloadAllModules right after KubeConfigChanged")
g.Expect(hasReloadAllInStandby).To(BeTrue(), "Should have ReloadAllModules right after ApplyKubeConfigValues")
}

// This test case checks tasks sequence in the 'main' queue after changing
Expand Down
134 changes: 65 additions & 69 deletions pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/hashicorp/go-multierror"
log "github.com/sirupsen/logrus"

"github.com/flant/addon-operator/pkg/addon-operator/converge"
"github.com/flant/addon-operator/pkg/app"
"github.com/flant/addon-operator/pkg/helm"
"github.com/flant/addon-operator/pkg/helm_resources_manager"
Expand Down Expand Up @@ -394,21 +393,9 @@ func (mm *ModuleManager) stateFromHelmReleases(releases []string) (*ModulesState
}, nil
}

// RefreshEnabledState runs enabled hooks for all 'enabled by config' modules and
// calculates new arrays of enabled modules. It returns ModulesState with
// lists of modules to disable and enable.
//
// This method is called after beforeAll hooks to take into account
// possible changes to 'dynamic enabled'.
//
// This method updates caches:
// RefreshEnabledState gets current diff of the graph and forms ModuleState
// - mm.enabledModules
func (mm *ModuleManager) RefreshEnabledState(logLabels map[string]string) (*ModulesState, error) {
moduleDiff, err := mm.moduleScheduler.UpdateAndApplyNewState()
if err != nil {
return nil, err
}

refreshLogLabels := utils.MergeLabels(logLabels, map[string]string{
"operator.component": "ModuleManager.RefreshEnabledState",
})
Expand All @@ -421,12 +408,14 @@ func (mm *ModuleManager) RefreshEnabledState(logLabels map[string]string) (*Modu

logEntry.Infof("Enabled modules: %+v", enabledModules)

modulesDiff := mm.moduleScheduler.GleanGraphDiff()

var (
modulesToEnable []string
modulesToDisable []string
)

for module, enabled := range moduleDiff {
for module, enabled := range modulesDiff {
if enabled {
modulesToEnable = append(modulesToEnable, module)
} else {
Expand Down Expand Up @@ -828,6 +817,11 @@ func (mm *ModuleManager) applyEnabledPatch(enabledPatch utils.ValuesPatch, exten
return nil
}

// UpdateGraphState runs corresponding scheduler method that returns true if the graph's state has changed
func (mm *ModuleManager) UpdateGraphState() (bool, error) {
return mm.moduleScheduler.UpdateGraphState()
}

// DynamicEnabledChecksum returns checksum for dynamicEnabled map
func (mm *ModuleManager) DynamicEnabledChecksum() string {
jsonBytes, _ := json.Marshal(mm.moduleScheduler.DumpExtender(dynamic_extender.Name))
Expand Down Expand Up @@ -926,42 +920,6 @@ func (mm *ModuleManager) UpdateModuleLastErrorAndNotify(module *modules.BasicMod
})
}

// PushDeleteModule pushes moduleDelete task for a module into the main queue
func (mm *ModuleManager) PushDeleteModuleTask(moduleName string) {
// check if there is already moduleDelete task in the main queue for the module
if queueHasPendingModuleDeleteTask(mm.dependencies.TaskQueues.GetMain(), moduleName) {
return
}

newTask := sh_task.NewTask(task.ModuleDelete).
WithQueueName("main").
WithMetadata(task.HookMetadata{
EventDescription: "ModuleManager-Delete-Module",
ModuleName: moduleName,
})
newTask.SetProp("triggered-by", "ModuleManager")

mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))

log.Infof("Push ConvergeModules task because %q Module was disabled", moduleName)
mm.PushConvergeModulesTask(moduleName, "disabled")
}

// PushConvergeModulesTask pushes ConvergeModulesTask into the main queue to update all modules on a module enable/disable event
func (mm *ModuleManager) PushConvergeModulesTask(moduleName, moduleState string) {
newConvergeTask := sh_task.NewTask(task.ConvergeModules).
WithQueueName("main").
WithMetadata(task.HookMetadata{
EventDescription: fmt.Sprintf("ModuleManager-%s-Module", moduleState),
ModuleName: moduleName,
}).
WithQueuedAt(time.Now())
newConvergeTask.SetProp("triggered-by", "ModuleManager")
newConvergeTask.SetProp(converge.ConvergeEventProp, converge.ReloadAllModules)

mm.dependencies.TaskQueues.GetMain().AddLast(newConvergeTask.WithQueuedAt(time.Now()))
}

// PushRunModuleTask pushes moduleRun task for a module into the main queue if there is no such a task for the module
func (mm *ModuleManager) PushRunModuleTask(moduleName string, doModuleStartup bool) error {
// update module's kube config
Expand Down Expand Up @@ -1039,7 +997,9 @@ func (mm *ModuleManager) RunModuleWithNewStaticValues(moduleName, moduleSource,
// If it's a new module - converges all modules - EXPERIMENTAL
func (mm *ModuleManager) RegisterModule(_, _ string) error {
return fmt.Errorf("Not implemented yet")
/*
}

/*
if !mm.modules.IsInited() {
return moduleset.ErrNotInited
}
Expand Down Expand Up @@ -1193,9 +1153,58 @@ func (mm *ModuleManager) RegisterModule(_, _ string) error {
}
return nil
*/
}
// PushDeleteModule pushes moduleDelete task for a module into the main queue
// TODO: EXPERIMENTAL
/*func (mm *ModuleManager) PushDeleteModuleTask(moduleName string) {
// check if there is already moduleDelete task in the main queue for the module
if queueHasPendingModuleDeleteTask(mm.dependencies.TaskQueues.GetMain(), moduleName) {
return
}
newTask := sh_task.NewTask(task.ModuleDelete).
WithQueueName("main").
WithMetadata(task.HookMetadata{
EventDescription: "ModuleManager-Delete-Module",
ModuleName: moduleName,
})
newTask.SetProp("triggered-by", "ModuleManager")
mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))
log.Infof("Push ConvergeModules task because %q Module was disabled", moduleName)
mm.PushConvergeModulesTask(moduleName, "disabled")
}
// PushConvergeModulesTask pushes ConvergeModulesTask into the main queue to update all modules on a module enable/disable event
// TODO: EXPERIMENTAL
func (mm *ModuleManager) PushConvergeModulesTask(moduleName, moduleState string) {
newConvergeTask := sh_task.NewTask(task.ConvergeModules).
WithQueueName("main").
WithMetadata(task.HookMetadata{
EventDescription: fmt.Sprintf("ModuleManager-%s-Module", moduleState),
ModuleName: moduleName,
}).
WithQueuedAt(time.Now())
newConvergeTask.SetProp("triggered-by", "ModuleManager")
newConvergeTask.SetProp(converge.ConvergeEventProp, converge.ReloadAllModules)
mm.dependencies.TaskQueues.GetMain().AddLast(newConvergeTask.WithQueuedAt(time.Now()))
}
// queueHasPendingModuleDeleteTask returns true if queue has pending tasks
// with the type "ModuleDelete" related to the module "moduleName"
// TODO: EXPERIMENTAL
func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool {
if q == nil {
return false
}
modules := modulesWithPendingTasks(q, task.ModuleDelete)
meta, has := modules[moduleName]
return has && meta.doStartup
} */

// registerModules load all available modules from modules directory.
func (mm *ModuleManager) registerModules() error {
if mm.ModulesDir == "" {
Expand Down Expand Up @@ -1251,7 +1260,9 @@ func (mm *ModuleManager) registerModules() error {
mm.modules = set
mm.modules.SetInited()

return nil
_, err = mm.moduleScheduler.UpdateGraphState()

return err
}

// SetModuleEventsChannel sets an event channel for Module Manager
Expand All @@ -1275,10 +1286,6 @@ func (mm *ModuleManager) SchedulerEventCh() chan extenders.ExtenderEvent {
return mm.moduleScheduler.EventCh()
}

func (mm *ModuleManager) StateChangedByExtender(extName extenders.ExtenderName, moduleName string) (bool, error) {
return mm.moduleScheduler.StateChanged(extName, moduleName)
}

// queueHasPendingModuleRunTaskWithStartup returns true if queue has pending tasks
// with the type "ModuleRun" related to the module "moduleName" and DoModuleStartup is set to true.
func queueHasPendingModuleRunTaskWithStartup(q *queue.TaskQueue, moduleName string) bool {
Expand All @@ -1290,17 +1297,6 @@ func queueHasPendingModuleRunTaskWithStartup(q *queue.TaskQueue, moduleName stri
return has && meta.doStartup
}

// queueHasPendingModuleDeleteTask returns true if queue has pending tasks
// with the type "ModuleDelete" related to the module "moduleName"
func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool {
if q == nil {
return false
}
modules := modulesWithPendingTasks(q, task.ModuleDelete)
meta, has := modules[moduleName]
return has && meta.doStartup
}

func modulesWithPendingTasks(q *queue.TaskQueue, taskType sh_task.TaskType) map[string]struct{ doStartup bool } {
if q == nil {
return nil
Expand Down
Loading

0 comments on commit 6a05b95

Please sign in to comment.