Skip to content

Commit

Permalink
remove retro status (#480)
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <mikhail.scherba@flant.com>
Co-authored-by: Yuriy Losev <yuriy.losev@flant.com>
  • Loading branch information
miklezzzz and yalosev authored Jun 28, 2024
1 parent 1d86271 commit fc642ff
Show file tree
Hide file tree
Showing 36 changed files with 979 additions and 530 deletions.
9 changes: 5 additions & 4 deletions pkg/addon-operator/converge/converge.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package converge

import (
"sync"
"time"

"github.com/flant/addon-operator/pkg/hook/types"
Expand All @@ -9,7 +10,9 @@ import (
)

type ConvergeState struct {
Phase ConvergePhase
PhaseLock sync.RWMutex
Phase ConvergePhase

FirstRunPhase firstConvergePhase
FirstRunDoneC chan struct{}
StartedAt int64
Expand Down Expand Up @@ -58,9 +61,7 @@ const (
OperatorStartup ConvergeEvent = "OperatorStartup"
// GlobalValuesChanged is a converge initiated by changing values in the global hook.
GlobalValuesChanged ConvergeEvent = "GlobalValuesChanged"
// KubeConfigChanged is a converge started after changing ConfigMap.
KubeConfigChanged ConvergeEvent = "KubeConfigChanged"
// ReloadAllModules is a converge queued to the
// ReloadAllModules is a converge queued to the main queue after the graph's state change
ReloadAllModules ConvergeEvent = "ReloadAllModules"
)

Expand Down
12 changes: 2 additions & 10 deletions pkg/addon-operator/debug_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ func (op *AddonOperator) RegisterDebugGlobalRoutes(dbgSrv *debug.Server) {

func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {
dbgSrv.RegisterHandler(http.MethodGet, "/module/list.{format:(json|yaml|text)}", func(_ *http.Request) (interface{}, error) {
mods, err := op.ModuleManager.GetEnabledModuleNames()
if err != nil {
return map[string][]string{}, err
}
mods := op.ModuleManager.GetEnabledModuleNames()
sort.Strings(mods)
return map[string][]string{"enabledModules": mods}, nil
})
Expand Down Expand Up @@ -139,12 +136,7 @@ func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {
dbgSrv.RegisterHandler(http.MethodGet, "/module/resource-monitor.{format:(json|yaml)}", func(_ *http.Request) (interface{}, error) {
dump := map[string]interface{}{}

mods, err := op.ModuleManager.GetEnabledModuleNames()
if err != nil {
return dump, err
}

for _, moduleName := range mods {
for _, moduleName := range op.ModuleManager.GetEnabledModuleNames() {
if !op.HelmResourcesManager.HasMonitor(moduleName) {
dump[moduleName] = "No monitor"
continue
Expand Down
159 changes: 72 additions & 87 deletions pkg/addon-operator/operator.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion pkg/addon-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func assembleTestAddonOperator(t *testing.T, configPath string) (*AddonOperator,

err = op.InitModuleManager()
g.Expect(err).ShouldNot(HaveOccurred(), "Should init ModuleManager")
_ = op.ModuleManager.RecalculateGraph(map[string]string{})

return op, result
}
Expand Down Expand Up @@ -360,6 +361,7 @@ func Test_HandleConvergeModules_global_changed_during_converge(t *testing.T) {

// Define task handler to gather task execution history.
type taskInfo struct {
id string
taskType sh_task.TaskType
bindingType BindingType
moduleName string
Expand Down Expand Up @@ -393,6 +395,7 @@ func Test_HandleConvergeModules_global_changed_during_converge(t *testing.T) {
}
historyMu.Lock()
taskHandleHistory = append(taskHandleHistory, taskInfo{
id: tsk.GetId(),
taskType: tsk.GetType(),
bindingType: hm.BindingType,
moduleName: hm.ModuleName,
Expand Down Expand Up @@ -449,7 +452,7 @@ func Test_HandleConvergeModules_global_changed_during_converge(t *testing.T) {
break
}

g.Expect(hasReloadAllInStandby).To(BeTrue(), "Should have ReloadAllModules right after KubeConfigChanged")
g.Expect(hasReloadAllInStandby).To(BeTrue(), "Should have ReloadAllModules right after ApplyKubeConfigValues")
}

// This test case checks tasks sequence in the 'main' queue after changing
Expand Down
18 changes: 13 additions & 5 deletions pkg/addon-operator/queue.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package addon_operator

import (
log "github.com/sirupsen/logrus"

"github.com/flant/addon-operator/pkg/addon-operator/converge"
"github.com/flant/addon-operator/pkg/task"
"github.com/flant/addon-operator/pkg/utils"
sh_task "github.com/flant/shell-operator/pkg/task"
"github.com/flant/shell-operator/pkg/task/queue"
)
Expand Down Expand Up @@ -79,11 +82,12 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
// RemoveCurrentConvergeTasks detects if converge tasks present in the main
// queue after task which ID equals to 'afterID'. These tasks are drained
// and the method returns true.
func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string) bool {
func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string, logLabels map[string]string) bool {
if q == nil {
return false
}

logEntry := log.WithFields(utils.LabelsToLogFields(logLabels))
IDFound := false
convergeDrained := false
stop := false
Expand All @@ -96,8 +100,9 @@ func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string) bool {
// Also keep specified task.
if t.GetId() == afterId {
IDFound = true
} else {
return true
}
return true
}

// Return false to remove converge task right after the specified task.
Expand All @@ -107,10 +112,10 @@ func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string) bool {
if t.GetType() == task.ConvergeModules {
stop = true
}
hm := task.HookMetadataAccessor(t)
logEntry.Debugf("Drained converge task of type: %s, module: %s, description: %s", t.GetType(), hm.ModuleName, hm.EventDescription)
return false
}
// Stop filtering when there is non-converge task after specified task.
stop = true
return true
})

Expand All @@ -119,11 +124,12 @@ func RemoveCurrentConvergeTasks(q *queue.TaskQueue, afterId string) bool {

// RemoveAdjacentConvergeModules removes ConvergeModules tasks right
// after the task with the specified ID.
func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string) {
func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels map[string]string) {
if q == nil {
return
}

logEntry := log.WithFields(utils.LabelsToLogFields(logLabels))
IDFound := false
stop := false
q.Filter(func(t sh_task.Task) bool {
Expand All @@ -139,6 +145,8 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string) {

// Remove ConvergeModules after current.
if t.GetType() == task.ConvergeModules {
hm := task.HookMetadataAccessor(t)
logEntry.Debugf("Drained adjacent ConvergeModules task of type: %s, description: %s", t.GetType(), hm.EventDescription)
return false
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/addon-operator/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func Test_RemoveAdjacentConvergeModules(t *testing.T) {
}
require.Equal(t, len(tt.in), q.Length(), "Should add all tasks to the queue.")

RemoveAdjacentConvergeModules(q, tt.afterID)
RemoveAdjacentConvergeModules(q, tt.afterID, map[string]string{})

// Check tasks after remove.
require.Equal(t, len(tt.expect), q.Length(), "queue length should match length of expected tasks")
Expand Down Expand Up @@ -289,10 +289,10 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
{Type: task.GlobalHookRun, Id: "3"},
},
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: "1"},
{Type: task.ModuleHookRun, Id: "2"},
{Type: task.GlobalHookRun, Id: "3"},
},
expectRemoved: true,
},
{
name: "No Converge in progress, preceding tasks present",
Expand All @@ -306,10 +306,10 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: "-1"},
{Type: task.ConvergeModules, Id: "0"},
{Type: task.ConvergeModules, Id: currentTaskID},
{Type: task.ModuleHookRun, Id: "2"},
{Type: task.GlobalHookRun, Id: "3"},
},
expectRemoved: true,
},
{
name: "Single adjacent ConvergeModules task with more Converge tasks",
Expand All @@ -320,7 +320,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
{Type: task.ModuleDelete, Id: "4"},
},
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: currentTaskID},
{Type: task.ConvergeModules, Id: "2"},
{Type: task.ModuleRun, Id: "3"},
{Type: task.ModuleDelete, Id: "4"},
},
Expand All @@ -342,7 +342,12 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
{Type: task.GlobalHookRun, Id: "10"},
},
expectTasks: []sh_task.BaseTask{
{Type: task.ConvergeModules, Id: currentTaskID},
{Type: task.ModuleDelete, Id: "2"},
{Type: task.ModuleDelete, Id: "3"},
{Type: task.ModuleRun, Id: "4", Metadata: task.HookMetadata{IsReloadAll: true}},
{Type: task.ModuleRun, Id: "5", Metadata: task.HookMetadata{IsReloadAll: true}},
{Type: task.ModuleRun, Id: "6", Metadata: task.HookMetadata{IsReloadAll: true}},
{Type: task.ConvergeModules, Id: "7"},
{Type: task.ConvergeModules, Id: "8"},
{Type: task.ConvergeModules, Id: "9"},
{Type: task.ModuleRun, Id: "11"},
Expand All @@ -368,7 +373,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
require.Equal(t, len(tt.initialTasks), q.Length(), "Should add all tasks to the queue.")

// Try to clean the queue.
removed := RemoveCurrentConvergeTasks(q, currentTaskID)
removed := RemoveCurrentConvergeTasks(q, currentTaskID, map[string]string{})

// Check result.
if tt.expectRemoved {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kube_config_manager/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ConfigHandler interface {
// LoadConfig loads initial modules config before starting the informer
LoadConfig(ctx context.Context, modulesNames ...string) (*config.KubeConfig, error)

// DeprecatedSaveConfigValues saves patches for modules in backend (if supported), overriding the configuration
// SaveConfigValues saves patches for modules in backend (if supported), overriding the configuration
// Deprecated: saving values in the values source is not recommended and shouldn't be used anymore
DeprecatedSaveConfigValues(ctx context.Context, key string, values utils.Values) ( /*checksum*/ string, error)
SaveConfigValues(ctx context.Context, key string, values utils.Values) ( /*checksum*/ string, error)
}
4 changes: 2 additions & 2 deletions pkg/kube_config_manager/backend/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func (b Backend) LoadConfig(ctx context.Context, _ ...string) (*config.KubeConfi
return parseConfigMapData(obj.Data)
}

// DeprecatedSaveConfigValues saves patches in the ConfigMap
func (b Backend) DeprecatedSaveConfigValues(ctx context.Context, key string, values utils.Values) ( /*checksum*/ string, error) {
// SaveConfigValues saves patches in the ConfigMap
func (b Backend) SaveConfigValues(ctx context.Context, key string, values utils.Values) ( /*checksum*/ string, error) {
if key == utils.GlobalValuesKey {
return b.saveGlobalConfigValues(ctx, values)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kube_config_manager/kube_config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ func (kcm *KubeConfigManager) Init() error {
return nil
}

// DeprecatedSaveConfigValues updates `global` or `module` section in ConfigMap.
// SaveConfigValues updates `global` or `module` section in ConfigMap.
// It uses knownChecksums to prevent KubeConfigChanged event on self-update.
func (kcm *KubeConfigManager) DeprecatedSaveConfigValues(key string, values utils.Values) error {
checksum, err := kcm.backend.DeprecatedSaveConfigValues(kcm.ctx, key, values)
func (kcm *KubeConfigManager) SaveConfigValues(key string, values utils.Values) error {
checksum, err := kcm.backend.SaveConfigValues(kcm.ctx, key, values)
if err != nil {
kcm.withLock(func() {
kcm.knownChecksums.Remove(key, checksum)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kube_config_manager/kube_config_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ func Test_KubeConfigManager_SaveValuesToConfigMap(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.globalValues != nil {
err = kcm.DeprecatedSaveConfigValues(utils.GlobalValuesKey, *test.globalValues)
err = kcm.SaveConfigValues(utils.GlobalValuesKey, *test.globalValues)
if !assert.NoError(t, err, "Global Values should be saved") {
t.FailNow()
}
} else if test.moduleValues != nil {
err = kcm.DeprecatedSaveConfigValues(test.moduleName, *test.moduleValues)
err = kcm.SaveConfigValues(test.moduleName, *test.moduleValues)
if !assert.NoError(t, err, "Module Values should be saved") {
t.FailNow()
}
Expand Down Expand Up @@ -367,7 +367,7 @@ moduleLongName:
g.Expect(err).ShouldNot(HaveOccurred(), "values should load from bytes")
g.Expect(modVals).To(HaveKey("moduleLongName"))

err = kcm.DeprecatedSaveConfigValues("module-long-name", modVals)
err = kcm.SaveConfigValues("module-long-name", modVals)
g.Expect(err).ShouldNot(HaveOccurred())

// Check that values are updated in ConfigMap
Expand Down
2 changes: 1 addition & 1 deletion pkg/module_manager/models/hooks/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type hooksMetricsStorage interface {
}

type kubeConfigManager interface {
DeprecatedSaveConfigValues(moduleName string, configValuesPatch utils.Values) error
SaveConfigValues(moduleName string, configValuesPatch utils.Values) error
}

type metricStorage interface {
Expand Down
17 changes: 1 addition & 16 deletions pkg/module_manager/models/modules/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,6 @@ func (bm *BasicModule) RunEnabledScript(tmpDir string, precedingEnabledModules [

logEntry := log.WithFields(utils.LabelsToLogFields(logLabels))
enabledScriptPath := filepath.Join(bm.Path, "enabled")

f, err := os.Stat(enabledScriptPath)
if os.IsNotExist(err) {
logEntry.Debugf("MODULE '%s' is ENABLED. Enabled script doesn't exist!", bm.Name)
return true, nil
} else if err != nil {
logEntry.Errorf("Cannot stat enabled script '%s': %s", enabledScriptPath, err)
return false, err
}

if !utils_file.IsFileExecutable(f) {
logEntry.Errorf("Found non-executable enabled script '%s'", enabledScriptPath)
return false, fmt.Errorf("non-executable enable script")
}

configValuesPath, err := bm.prepareConfigValuesJsonFile(tmpDir)
if err != nil {
logEntry.Errorf("Prepare CONFIG_VALUES_PATH file for '%s': %s", enabledScriptPath, err)
Expand Down Expand Up @@ -681,7 +666,7 @@ func (bm *BasicModule) executeHook(h *hooks.ModuleHook, bindingType sh_op_types.
)
}

err := bm.dc.KubeConfigManager.DeprecatedSaveConfigValues(bm.Name, configValuesPatchResult.Values)
err := bm.dc.KubeConfigManager.SaveConfigValues(bm.Name, configValuesPatchResult.Values)
if err != nil {
logEntry.Debugf("Module hook '%s' kube module config values stay unchanged:\n%s", h.GetName(), bm.valuesStorage.GetConfigValues(false).DebugString())
return fmt.Errorf("module hook '%s': set kube module config failed: %s", h.GetName(), err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/module_manager/models/modules/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (gm *GlobalModule) executeHook(h *hooks.GlobalHook, bindingType sh_op_types
return fmt.Errorf("cannot apply config values patch for global values: %w", validationErr)
}

err := gm.dc.KubeConfigManager.DeprecatedSaveConfigValues(utils.GlobalValuesKey, configValuesPatchResult.Values)
err := gm.dc.KubeConfigManager.SaveConfigValues(utils.GlobalValuesKey, configValuesPatchResult.Values)
if err != nil {
logEntry.Debugf("Global hook '%s' kube config global values stay unchanged:\n%s", h.GetName(), gm.valuesStorage.GetConfigValues(false).DebugString())
return fmt.Errorf("global hook '%s': set kube config failed: %s", h.GetName(), err)
Expand Down
Loading

0 comments on commit fc642ff

Please sign in to comment.