From e4640209cac46ab70c7cf14bb718ba5c4bc9b695 Mon Sep 17 00:00:00 2001 From: Mikhail Scherba <41360396+miklezzzz@users.noreply.github.com> Date: Fri, 12 Jan 2024 15:19:07 +0400 Subject: [PATCH] [addon-operator] add module reregister (#432) Signed-off-by: Mikhail Scherba --- pkg/addon-operator/bootstrap.go | 1 + pkg/kube_config_manager/backend/backend.go | 2 +- .../backend/configmap/configmap.go | 4 +- .../kube_config_manager.go | 27 ++ pkg/module_manager/loader/fs/fs.go | 127 ++++--- pkg/module_manager/loader/loader.go | 1 + .../models/modules/events/events.go | 3 + .../models/moduleset/moduleset.go | 19 +- .../models/moduleset/moduleset_test.go | 2 + pkg/module_manager/module_manager.go | 315 +++++++++++++++++- 10 files changed, 442 insertions(+), 59 deletions(-) diff --git a/pkg/addon-operator/bootstrap.go b/pkg/addon-operator/bootstrap.go index 69563301..04003b1e 100644 --- a/pkg/addon-operator/bootstrap.go +++ b/pkg/addon-operator/bootstrap.go @@ -87,6 +87,7 @@ func (op *AddonOperator) SetupModuleManager(modulesDir string, globalHooksDir st HelmResourcesManager: op.HelmResourcesManager, MetricStorage: op.engine.MetricStorage, HookMetricStorage: op.engine.HookMetricStorage, + TaskQueues: op.engine.TaskQueues, } cfg := module_manager.ModuleManagerConfig{ diff --git a/pkg/kube_config_manager/backend/backend.go b/pkg/kube_config_manager/backend/backend.go index 93206dcc..c9382d16 100644 --- a/pkg/kube_config_manager/backend/backend.go +++ b/pkg/kube_config_manager/backend/backend.go @@ -13,7 +13,7 @@ type ConfigHandler interface { StartInformer(ctx context.Context, eventC chan config.Event) // LoadConfig loads initial modules config before starting the informer - LoadConfig(ctx context.Context) (*config.KubeConfig, error) + LoadConfig(ctx context.Context, modulesNames ...string) (*config.KubeConfig, error) // SaveConfigValues saves patches for modules in backend (if supported), overriding the configuration // Deprecated: saving values in the values source is not recommended and shouldn't be used anymore diff --git a/pkg/kube_config_manager/backend/configmap/configmap.go b/pkg/kube_config_manager/backend/configmap/configmap.go index c434b796..e97b7561 100644 --- a/pkg/kube_config_manager/backend/configmap/configmap.go +++ b/pkg/kube_config_manager/backend/configmap/configmap.go @@ -46,9 +46,9 @@ func New(logger dlogger.Logger, kubeClient *client.Client, namespace, name strin return backend } -// LoadConfig gets config from ConfigMap before starting informer. +// LoadConfig gets config from ConfigMap before starting informer (selective loading configs for a list of modules isn't implemented). // Set checksums for global section and modules. -func (b Backend) LoadConfig(ctx context.Context) (*config.KubeConfig, error) { +func (b Backend) LoadConfig(ctx context.Context, _ ...string) (*config.KubeConfig, error) { obj, err := b.getConfigMap(ctx) if err != nil { return nil, err diff --git a/pkg/kube_config_manager/kube_config_manager.go b/pkg/kube_config_manager/kube_config_manager.go index 769cf70e..d8debfac 100644 --- a/pkg/kube_config_manager/kube_config_manager.go +++ b/pkg/kube_config_manager/kube_config_manager.go @@ -72,6 +72,15 @@ func NewKubeConfigManager(ctx context.Context, bk backend.ConfigHandler, runtime } } +func (kcm *KubeConfigManager) IsModuleEnabled(moduleName string) bool { + moduleConfig, found := kcm.currentConfig.Modules[moduleName] + if !found { + return false + } + + return moduleConfig.IsEnabled != nil && *moduleConfig.IsEnabled +} + func (kcm *KubeConfigManager) Init() error { kcm.logEntry.Debug("Init: KubeConfigManager") @@ -108,6 +117,24 @@ func (kcm *KubeConfigManager) KubeConfigEventCh() chan config.KubeConfigEvent { return kcm.configEventCh } +// UpdateModuleConfig updates a single module config +func (kcm *KubeConfigManager) UpdateModuleConfig(moduleName string) error { + newModuleConfig, err := kcm.backend.LoadConfig(kcm.ctx, moduleName) + if err != nil { + return err + } + + if moduleConfig, found := newModuleConfig.Modules[moduleName]; found { + if kcm.knownChecksums != nil { + kcm.knownChecksums.Set(moduleName, moduleConfig.Checksum) + } + + kcm.currentConfig.Modules[moduleName] = moduleConfig + } + + return nil +} + // loadConfig gets config from ConfigMap before starting informer. // Set checksums for global section and modules. func (kcm *KubeConfigManager) loadConfig() error { diff --git a/pkg/module_manager/loader/fs/fs.go b/pkg/module_manager/loader/fs/fs.go index 15ef074d..b961b6ba 100644 --- a/pkg/module_manager/loader/fs/fs.go +++ b/pkg/module_manager/loader/fs/fs.go @@ -29,6 +29,78 @@ func NewFileSystemLoader(moduleDirs string, vv *validation.ValuesValidator) *Fil } } +func (fl *FileSystemLoader) getBasicModule(definition moduleDefinition, commonStaticValues utils.Values) (*modules.BasicModule, error) { + err := validateModuleName(definition.Name) + if err != nil { + return nil, err + } + + valuesModuleName := utils.ModuleNameToValuesKey(definition.Name) + initialValues := utils.Values{valuesModuleName: map[string]interface{}{}} + // build initial values + // 1. from common static values + if commonStaticValues.HasKey(valuesModuleName) { + initialValues = utils.MergeValues(initialValues, commonStaticValues) + } + + // 2. from module static values + moduleStaticValues, err := utils.LoadValuesFileFromDir(definition.Path) + if err != nil { + return nil, err + } + + if moduleStaticValues != nil { + initialValues = utils.MergeValues(initialValues, moduleStaticValues) + } + + // 3. from openapi defaults + + cb, vb, err := fl.readOpenAPIFiles(filepath.Join(definition.Path, "openapi")) + if err != nil { + return nil, err + } + + if cb != nil && vb != nil { + err = fl.valuesValidator.SchemaStorage.AddModuleValuesSchemas(valuesModuleName, cb, vb) + if err != nil { + return nil, err + } + } + + // + moduleValues, ok := initialValues[valuesModuleName].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("expect map[string]interface{} in module values") + } + + return modules.NewBasicModule(definition.Name, definition.Path, definition.Order, moduleValues, fl.valuesValidator), nil +} + +// read single directory and return BasicModule for loading +func (fl *FileSystemLoader) ReloadModule(_, modulePath string) (*modules.BasicModule, error) { + _, err := readDir(modulePath) + if err != nil { + return nil, err + } + + commonStaticValues, err := utils.LoadValuesFileFromDir(modulePath) + if err != nil { + return nil, err + } + + modDef, err := moduleFromDirName(filepath.Base(modulePath), modulePath) + if err != nil { + return nil, err + } + + bm, err := fl.getBasicModule(modDef, commonStaticValues) + if err != nil { + return nil, err + } + + return bm, nil +} + func (fl *FileSystemLoader) LoadModules() ([]*modules.BasicModule, error) { result := make([]*modules.BasicModule, 0) @@ -44,51 +116,10 @@ func (fl *FileSystemLoader) LoadModules() ([]*modules.BasicModule, error) { } for _, module := range modDefs { - err = validateModuleName(module.Name) - if err != nil { - return nil, err - } - - valuesModuleName := utils.ModuleNameToValuesKey(module.Name) - initialValues := utils.Values{valuesModuleName: map[string]interface{}{}} - // build initial values - // 1. from common static values - if commonStaticValues.HasKey(valuesModuleName) { - initialValues = utils.MergeValues(initialValues, commonStaticValues) - } - - // 2. from module static values - moduleStaticValues, err := utils.LoadValuesFileFromDir(module.Path) + bm, err := fl.getBasicModule(module, commonStaticValues) if err != nil { return nil, err } - - if moduleStaticValues != nil { - initialValues = utils.MergeValues(initialValues, moduleStaticValues) - } - - // 3. from openapi defaults - - cb, vb, err := fl.readOpenAPIFiles(filepath.Join(module.Path, "openapi")) - if err != nil { - return nil, err - } - - if cb != nil && vb != nil { - err = fl.valuesValidator.SchemaStorage.AddModuleValuesSchemas(valuesModuleName, cb, vb) - if err != nil { - return nil, err - } - } - - // - moduleValues, ok := initialValues[valuesModuleName].(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("expect map[string]interface{} in module values") - } - - bm := modules.NewBasicModule(module.Name, module.Path, module.Order, moduleValues, fl.valuesValidator) - result = append(result, bm) } } @@ -102,7 +133,8 @@ type moduleDefinition struct { Order uint32 } -func (fl *FileSystemLoader) findModulesInDir(modulesDir string) ([]moduleDefinition, error) { +// checks if dir exists and returns entries +func readDir(modulesDir string) ([]os.DirEntry, error) { dirEntries, err := os.ReadDir(modulesDir) if err != nil && os.IsNotExist(err) { return nil, fmt.Errorf("path '%s' does not exist", modulesDir) @@ -111,6 +143,15 @@ func (fl *FileSystemLoader) findModulesInDir(modulesDir string) ([]moduleDefinit return nil, fmt.Errorf("listing modules directory '%s': %s", modulesDir, err) } + return dirEntries, nil +} + +func (fl *FileSystemLoader) findModulesInDir(modulesDir string) ([]moduleDefinition, error) { + dirEntries, err := readDir(modulesDir) + if err != nil { + return nil, err + } + mods := make([]moduleDefinition, 0) for _, dirEntry := range dirEntries { name, absPath, err := resolveDirEntry(modulesDir, dirEntry) diff --git a/pkg/module_manager/loader/loader.go b/pkg/module_manager/loader/loader.go index c209670d..9fb51769 100644 --- a/pkg/module_manager/loader/loader.go +++ b/pkg/module_manager/loader/loader.go @@ -6,4 +6,5 @@ import ( type ModuleLoader interface { LoadModules() ([]*modules.BasicModule, error) + ReloadModule(moduleName string, modulePath string) (*modules.BasicModule, error) } diff --git a/pkg/module_manager/models/modules/events/events.go b/pkg/module_manager/models/modules/events/events.go index 170c6262..5f2f3cd0 100644 --- a/pkg/module_manager/models/modules/events/events.go +++ b/pkg/module_manager/models/modules/events/events.go @@ -15,4 +15,7 @@ const ( type ModuleEvent struct { ModuleName string EventType ModuleEventType + + // an option for registering a module without reload + Reregister bool } diff --git a/pkg/module_manager/models/moduleset/moduleset.go b/pkg/module_manager/models/moduleset/moduleset.go index 5a297029..a8bba5d1 100644 --- a/pkg/module_manager/models/moduleset/moduleset.go +++ b/pkg/module_manager/models/moduleset/moduleset.go @@ -1,18 +1,35 @@ package moduleset import ( + "errors" "sort" "sync" "github.com/flant/addon-operator/pkg/module_manager/models/modules" ) +var ErrNotInited = errors.New("modules haven't been initialized yet") + type ModulesSet struct { lck sync.RWMutex modules map[string]*modules.BasicModule orderedNames []string + inited bool +} + +func (s *ModulesSet) SetInited() { + s.lck.Lock() + defer s.lck.Unlock() + s.inited = true +} + +func (s *ModulesSet) IsInited() bool { + s.lck.RLock() + defer s.lck.RUnlock() + return s.inited } +// adds a new module or overwrite an existing func (s *ModulesSet) Add(mods ...*modules.BasicModule) { if len(mods) == 0 { return @@ -27,7 +44,7 @@ func (s *ModulesSet) Add(mods ...*modules.BasicModule) { for _, module := range mods { // Invalidate ordered names cache. - if _, ok := s.modules[module.GetName()]; ok { + if _, ok := s.modules[module.GetName()]; !ok { s.orderedNames = nil } s.modules[module.GetName()] = module diff --git a/pkg/module_manager/models/moduleset/moduleset_test.go b/pkg/module_manager/models/moduleset/moduleset_test.go index 74c59f1a..ff1ef96a 100644 --- a/pkg/module_manager/models/moduleset/moduleset_test.go +++ b/pkg/module_manager/models/moduleset/moduleset_test.go @@ -37,6 +37,7 @@ func TestBasicModuleSet(t *testing.T) { Name: "BasicModule-four", Order: 20, }) + ms.SetInited() expectNames := []string{ "BasicModule-one", @@ -49,4 +50,5 @@ func TestBasicModuleSet(t *testing.T) { g.Expect(ms.NamesInOrder()).Should(Equal(expectNames)) g.Expect(ms.Has("BasicModule-four")).Should(BeTrue(), "should have BasicModule-four") g.Expect(ms.Get("BasicModule-four").Order).Should(Equal(uint32(20)), "should have BasicModule-four with order:20") + g.Expect(ms.IsInited()).Should(BeTrue(), "should be inited") } diff --git a/pkg/module_manager/module_manager.go b/pkg/module_manager/module_manager.go index c0df01ae..7a00513b 100644 --- a/pkg/module_manager/module_manager.go +++ b/pkg/module_manager/module_manager.go @@ -26,6 +26,7 @@ import ( "github.com/flant/addon-operator/pkg/module_manager/models/modules" "github.com/flant/addon-operator/pkg/module_manager/models/modules/events" "github.com/flant/addon-operator/pkg/module_manager/models/moduleset" + "github.com/flant/addon-operator/pkg/task" "github.com/flant/addon-operator/pkg/utils" "github.com/flant/addon-operator/pkg/values/validation" . "github.com/flant/shell-operator/pkg/hook/binding_context" @@ -36,6 +37,8 @@ import ( . "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metric_storage" "github.com/flant/shell-operator/pkg/schedule_manager" + sh_task "github.com/flant/shell-operator/pkg/task" + "github.com/flant/shell-operator/pkg/task/queue" utils_checksum "github.com/flant/shell-operator/pkg/utils/checksum" ) @@ -62,6 +65,9 @@ type DirectoryConfig struct { type KubeConfigManager interface { SaveConfigValues(key string, values utils.Values) error + IsModuleEnabled(moduleName string) bool + UpdateModuleConfig(moduleName string) error + SafeReadConfig(handler func(config *config.KubeConfig)) } // ModuleManagerDependencies pass dependencies for ModuleManager @@ -74,6 +80,7 @@ type ModuleManagerDependencies struct { HelmResourcesManager helm_resources_manager.HelmResourcesManager MetricStorage *metric_storage.MetricStorage HookMetricStorage *metric_storage.MetricStorage + TaskQueues *queue.TaskQueueSet } type ModuleManagerConfig struct { @@ -111,7 +118,7 @@ type ModuleManager struct { enabledModulesByConfig map[string]struct{} // List of effectively enabled modules after running enabled scripts. - enabledModules []string + enabledModules *eModules globalSynchronizationState *modules.SynchronizationState @@ -124,6 +131,47 @@ type ModuleManager struct { moduleEventC chan events.ModuleEvent } +type eModules struct { + lock sync.RWMutex + modules []string +} + +func (m *eModules) Add(name string) { + m.lock.Lock() + defer m.lock.Unlock() + m.modules = append(m.modules, name) +} + +func (m *eModules) Delete(name string) { + m.lock.Lock() + defer m.lock.Unlock() + for i, n := range m.modules { + if n == name { + m.modules[i] = m.modules[len(m.modules)-1] + m.modules = m.modules[:len(m.modules)-1] + break + } + } +} + +func (m *eModules) Replace(modules []string) { + m.lock.Lock() + defer m.lock.Unlock() + m.modules = modules +} + +func (m *eModules) GetAll() []string { + m.lock.RLock() + defer m.lock.RUnlock() + return m.modules +} + +func (m *eModules) String() string { + m.lock.RLock() + defer m.lock.RUnlock() + return fmt.Sprintf("%v", m.modules) +} + // NewModuleManager returns new MainModuleManager func NewModuleManager(ctx context.Context, cfg *ModuleManagerConfig) *ModuleManager { cctx, cancel := context.WithCancel(ctx) @@ -149,7 +197,7 @@ func NewModuleManager(ctx context.Context, cfg *ModuleManagerConfig) *ModuleMana modules: new(moduleset.ModulesSet), enabledModulesByConfig: make(map[string]struct{}), - enabledModules: make([]string, 0), + enabledModules: &eModules{modules: make([]string, 0)}, dynamicEnabled: make(map[string]*bool), globalSynchronizationState: modules.NewSynchronizationState(), @@ -281,7 +329,7 @@ func (mm *ModuleManager) HandleNewKubeConfig(kubeConfig *config.KubeConfig) (*Mo // Return list of changed modules when only values are changed. if len(modulesChanged) > 0 { return &ModulesState{ - AllEnabledModules: mm.enabledModules, + AllEnabledModules: mm.enabledModules.GetAll(), ModulesToReload: modulesChanged, }, nil } @@ -492,7 +540,7 @@ func (mm *ModuleManager) RefreshStateFromHelmReleases(logLabels map[string]strin state := mm.stateFromHelmReleases(releasedModules) // Initiate enabled modules list. - mm.enabledModules = state.AllEnabledModules + mm.enabledModules.Replace(state.AllEnabledModules) return state, nil } @@ -564,11 +612,11 @@ func (mm *ModuleManager) RefreshEnabledState(logLabels map[string]string) (*Modu // of enabled modules after running enabled scripts. // Newly enabled modules are that present in the list after running enabled scripts // but not present in the list of currently enabled modules. - newlyEnabledModules := utils.ListSubtract(enabledModules, mm.enabledModules) + newlyEnabledModules := utils.ListSubtract(enabledModules, mm.enabledModules.GetAll()) // Disabled modules are that present in the list of currently enabled modules // but not present in the list after running enabled scripts - disabledModules := utils.ListSubtract(mm.enabledModules, enabledModules) + disabledModules := utils.ListSubtract(mm.enabledModules.GetAll(), enabledModules) disabledModules = utils.SortReverseByReference(disabledModules, mm.modules.NamesInOrder()) logEntry.Debugf("Refresh state results:\n"+ @@ -582,13 +630,13 @@ func (mm *ModuleManager) RefreshEnabledState(logLabels map[string]string) (*Modu newlyEnabledModules) // Update state - mm.enabledModules = enabledModules + mm.enabledModules.Replace(enabledModules) - mm.global.SetEnabledModules(mm.enabledModules) + mm.global.SetEnabledModules(mm.enabledModules.GetAll()) // Return lists for ConvergeModules task. return &ModulesState{ - AllEnabledModules: mm.enabledModules, + AllEnabledModules: mm.enabledModules.GetAll(), ModulesToDisable: disabledModules, ModulesToEnable: newlyEnabledModules, }, nil @@ -603,12 +651,28 @@ func (mm *ModuleManager) GetModuleNames() []string { } func (mm *ModuleManager) GetEnabledModuleNames() []string { - return mm.enabledModules + return mm.enabledModules.GetAll() +} + +func (mm *ModuleManager) AddEnabledModuleName(name string) { + mm.enabledModules.Add(name) +} + +func (mm *ModuleManager) DeleteEnabledModuleName(name string) { + mm.enabledModules.Delete(name) +} + +func (mm *ModuleManager) AddEnabledModuleByConfigName(name string) { + mm.enabledModulesByConfig[name] = struct{}{} +} + +func (mm *ModuleManager) DeleteEnabledModuleByConfigName(name string) { + delete(mm.enabledModulesByConfig, name) } // IsModuleEnabled ... func (mm *ModuleManager) IsModuleEnabled(moduleName string) bool { - for _, modName := range mm.enabledModules { + for _, modName := range mm.enabledModules.GetAll() { if modName == moduleName { return true } @@ -891,7 +955,7 @@ func (mm *ModuleManager) LoopByBinding(binding BindingType, fn func(gh *hooks.Gl fn(gh, nil, nil) } - for _, moduleName := range mm.enabledModules { + for _, moduleName := range mm.enabledModules.GetAll() { m := mm.GetModule(moduleName) moduleHooks := m.GetHooks(binding) for _, mh := range moduleHooks { @@ -1042,6 +1106,185 @@ func mergeEnabled(enabledFlags ...*bool) bool { return result } +// PushDeleteModule push moduleDelete task for a module into the main queue +func (mm *ModuleManager) PushDeleteModule(moduleName string) { + // check if there is already moduleDelete task in the main queue for the module + if queueHasPendingModuleDeleteTask(mm.dependencies.TaskQueues.GetMain(), moduleName) { + return + } + + newTask := sh_task.NewTask(task.ModuleDelete). + WithQueueName("main"). + WithMetadata(task.HookMetadata{ + EventDescription: "ModuleManager-Delete-Module", + ModuleName: moduleName, + }) + newTask.SetProp("triggered-by", "ModuleManager") + + mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now())) +} + +// PushRerunModule push moduleRun task for a module into the main queue +func (mm *ModuleManager) PushRerunModule(moduleName string) error { + err := mm.dependencies.KubeConfigManager.UpdateModuleConfig(moduleName) + if err != nil { + return fmt.Errorf("couldn't update module %s kube config: %w", moduleName, err) + } + + mm.dependencies.KubeConfigManager.SafeReadConfig(func(config *config.KubeConfig) { + _, err = mm.HandleNewKubeConfig(config) + }) + if err != nil { + return fmt.Errorf("couldn't reload kube config: %s", err) + } + + // check if there is already moduleRun task in the main queue for the module + if queueHasPendingModuleRunTaskWithStartup(mm.dependencies.TaskQueues.GetMain(), moduleName) { + return nil + } + + newTask := sh_task.NewTask(task.ModuleRun). + WithQueueName("main"). + WithMetadata(task.HookMetadata{ + EventDescription: "ModuleManager-Update-Module", + ModuleName: moduleName, + DoModuleStartup: true, + }) + newTask.SetProp("triggered-by", "ModuleManager") + + mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now())) + + return nil +} + +// AreModulesInited returns true if moduleset has already been initialized +func (mm *ModuleManager) AreModulesInited() bool { + return mm.modules.IsInited() +} + +// ReregisterModule disable and deregister modules' hooks, and reload module config +func (mm *ModuleManager) ReregisterModule(moduleName, modulePath string) error { + if !mm.modules.IsInited() { + return moduleset.ErrNotInited + } + + if mm.ModulesDir == "" { + log.Warnf("Empty modules directory is passed! No modules to load.") + return nil + } + + if mm.moduleLoader == nil { + log.Errorf("no module loader set") + return fmt.Errorf("no module loader set") + } + + mod, err := mm.moduleLoader.ReloadModule(moduleName, modulePath) + if err != nil { + return fmt.Errorf("failed to get module's definition: %w", err) + } + + // load and registry global hooks + dep := &hooks.HookExecutionDependencyContainer{ + HookMetricsStorage: mm.dependencies.HookMetricStorage, + KubeConfigManager: mm.dependencies.KubeConfigManager, + KubeObjectPatcher: mm.dependencies.KubeObjectPatcher, + MetricStorage: mm.dependencies.MetricStorage, + GlobalValuesGetter: mm.global, + } + + mod.WithDependencies(dep) + + // check if module exists + if mm.modules.Has(mod.GetName()) { + // if module is disabled in module manager + if !mm.IsModuleEnabled(mod.GetName()) { + // and disabled in the module kube config - update and exit + if !mm.dependencies.KubeConfigManager.IsModuleEnabled(mod.GetName()) { + mm.modules.Add(mod) + return nil + } + // if the module kube config has enabled true, check enable scripts + mm.modules.Add(mod) + module := mm.GetModule(mod.GetName()) + isEnabled, err := module.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{}) + if err != nil { + return err + } + + if isEnabled { + mm.AddEnabledModuleName(mod.GetName()) + mm.AddEnabledModuleByConfigName(mod.GetName()) + // enqueue module startup sequence if it is enabled + err := mm.PushRerunModule(mod.GetName()) + if err != nil { + return err + } + mm.SendModuleEvent(events.ModuleEvent{ + ModuleName: mod.GetName(), + EventType: events.ModuleEnabled, + }) + } + + return nil + } + + // module is enabled, disable its hooks + mm.DisableModuleHooks(mod.GetName()) + + module := mm.GetModule(mod.GetName()) + // check for nil to prevent operator from panicking + if module == nil { + return fmt.Errorf("couldn't get %s module configuration", mod.GetName()) + } + + // deregister modules' hooks + module.DeregisterHooks() + + // upsert a new module in the moduleset + mm.modules.Add(mod) + + // get new module version + module = mm.GetModule(mod.GetName()) + + // check if module is enabled via enabled scripts + isEnabled, err := module.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{}) + if err != nil { + return err + } + + if isEnabled { + // enqueue module startup sequence if it is enabled + mm.PushRerunModule(mod.GetName()) + if err != nil { + return err + } + } else { + mm.DeleteEnabledModuleName(mod.GetName()) + // enqueue module delete sequence if it is disabled + mm.PushDeleteModule(mod.GetName()) + // throw disable module event + mm.SendModuleEvent(events.ModuleEvent{ + ModuleName: mod.GetName(), + EventType: events.ModuleDisabled, + }) + } + + return nil + } + + // module doesn't exist + mm.modules.Add(mod) + + // a new module requires to be registered, possibly fast + mm.SendModuleEvent(events.ModuleEvent{ + ModuleName: mod.GetName(), + EventType: events.ModuleRegistered, + Reregister: true, + }) + + return nil +} + // registerModules load all available modules from modules directory. func (mm *ModuleManager) registerModules() error { if mm.ModulesDir == "" { @@ -1091,6 +1334,7 @@ func (mm *ModuleManager) registerModules() error { log.Debugf("Found modules: %v", set.NamesInOrder()) mm.modules = set + mm.modules.SetInited() return nil } @@ -1161,3 +1405,50 @@ func loadStaticValues(moduleName, modulePath string) (utils.Values, error) { return utils.LoadValuesFileFromDir(modulePath) } + +// queueHasPendingModuleRunTaskWithStartup returns true if queue has pending tasks +// with the type "ModuleRun" related to the module "moduleName" and DoModuleStartup is set to true. +func queueHasPendingModuleRunTaskWithStartup(q *queue.TaskQueue, moduleName string) bool { + if q == nil { + return false + } + modules := modulesWithPendingTasks(q, task.ModuleRun) + meta, has := modules[moduleName] + return has && meta.doStartup +} + +// queueHasPendingModuleDeleteTask returns true if queue has pending tasks +// with the type "ModuleDelete" related to the module "moduleName" +func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool { + if q == nil { + return false + } + modules := modulesWithPendingTasks(q, task.ModuleDelete) + meta, has := modules[moduleName] + return has && meta.doStartup +} + +func modulesWithPendingTasks(q *queue.TaskQueue, taskType sh_task.TaskType) map[string]struct{ doStartup bool } { + if q == nil { + return nil + } + + modules := make(map[string]struct{ doStartup bool }) + + skipFirstTask := true + + q.Iterate(func(t sh_task.Task) { + // Skip the first task in the queue as it can be executed already, i.e. "not pending". + if skipFirstTask { + skipFirstTask = false + return + } + + if t.GetType() == taskType { + hm := task.HookMetadataAccessor(t) + modules[hm.ModuleName] = struct{ doStartup bool }{doStartup: hm.DoModuleStartup} + } + }) + + return modules +}