Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update scheduler logic regarding terminator extenders #489

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 8 additions & 24 deletions pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,39 +291,39 @@ func (mm *ModuleManager) Init() error {

gv, err := mm.loadGlobalValues()
if err != nil {
return err
return fmt.Errorf("couldn't load global values: %w", err)
}

staticExtender, err := static_extender.NewExtender(mm.ModulesDir)
if err != nil {
return err
return fmt.Errorf("couldn't create static extender: %w", err)
}
if err := mm.moduleScheduler.AddExtender(staticExtender); err != nil {
return err
return fmt.Errorf("couldn't add static extender: %w", err)
}

err = mm.registerGlobalModule(gv.globalValues, gv.configSchema, gv.valuesSchema)
if err != nil {
return err
return fmt.Errorf("couldn't register global module: %w", err)
}

kubeConfigExtender := kube_config_extender.NewExtender(mm.dependencies.KubeConfigManager)
if err := mm.moduleScheduler.AddExtender(kubeConfigExtender); err != nil {
return err
return fmt.Errorf("couldn't add kube config extender: %w", err)
}

scriptEnabledExtender, err := script_extender.NewExtender(mm.TempDir)
if err != nil {
return err
return fmt.Errorf("couldn't create script_enabled extender: %w", err)
}

if err := mm.moduleScheduler.AddExtender(scriptEnabledExtender); err != nil {
return err
return fmt.Errorf("couldn't add scrpt_enabled extender: %w", err)
}

// by this point we must have all required scheduler extenders attached
if err := mm.moduleScheduler.ApplyExtenders(app.AppliedExtenders); err != nil {
return err
return fmt.Errorf("couldn't apply extenders to the module scheduler: %w", err)
}

return mm.registerModules(scriptEnabledExtender)
Expand Down Expand Up @@ -645,22 +645,6 @@ func (mm *ModuleManager) RunModuleHook(moduleName, hookName string, binding Bind

func (mm *ModuleManager) HandleKubeEvent(kubeEvent KubeEvent, createGlobalTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo), createModuleTaskFn func(*modules.BasicModule, *hooks.ModuleHook, controller.BindingExecutionInfo)) {
mm.LoopByBinding(OnKubernetesEvent, func(gh *hooks.GlobalHook, m *modules.BasicModule, mh *hooks.ModuleHook) {
defer func() {
if err := recover(); err != nil {
logEntry := log.WithField("function", "HandleKubeEvent").WithField("event", "OnKubernetesEvent")

if gh != nil {
logEntry.WithField("GlobalHook name", gh.GetName()).WithField("GlobakHook path", gh.GetPath())
}

if mh != nil {
logEntry.WithField("ModuleHook name", mh.GetName()).WithField("ModuleHook path", mh.GetPath())
}

logEntry.Errorf("panic occurred: %s", err)
}
}()

if gh != nil {
if gh.GetHookController().CanHandleKubeEvent(kubeEvent) {
gh.GetHookController().HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) {
Expand Down
102 changes: 102 additions & 0 deletions pkg/module_manager/scheduler/extenders/mock/extenders_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// a bunch of mocked extenders for tests
package extenders_mock

import "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders"

type FilterOne struct{}

func (f *FilterOne) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterOne")
}

func (f *FilterOne) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterOne) IsTerminator() bool {
return false
}

type FilterTwo struct{}

func (f *FilterTwo) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterTwo")
}

func (f *FilterTwo) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterTwo) IsTerminator() bool {
return false
}

type FilterThree struct{}

func (f *FilterThree) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterThree")
}

func (f *FilterThree) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterThree) IsTerminator() bool {
return false
}

type FilterFour struct{}

func (f *FilterFour) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterOne")
}

func (f *FilterFour) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *FilterFour) IsTerminator() bool {
return false
}

type TerminatorOne struct{}

func (f *TerminatorOne) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorOne")
}

func (f *TerminatorOne) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TerminatorOne) IsTerminator() bool {
return true
}

type TerminatorTwo struct{}

func (f *TerminatorTwo) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorTwo")
}

func (f *TerminatorTwo) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TerminatorTwo) IsTerminator() bool {
return true
}

type TerminatorThree struct{}

func (f *TerminatorThree) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorThree")
}

func (f *TerminatorThree) Filter(_ string, _ map[string]string) (*bool, error) {
return nil, nil
}

func (f *TerminatorThree) IsTerminator() bool {
return true
}
92 changes: 60 additions & 32 deletions pkg/module_manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ var defaultAppliedExtenders = []extenders.ExtenderName{
script_extender.Name,
}

type extenderContainer struct {
ext extenders.Extender
filterAhead bool
}

type Scheduler struct {
ctx context.Context

// list of extenders to cycle over on a run
extenders []extenders.Extender
extenders []extenderContainer
extCh chan extenders.ExtenderEvent
// graph visualization
graphImage image.Image
Expand Down Expand Up @@ -66,7 +71,7 @@ func NewScheduler(ctx context.Context) *Scheduler {
}
return &Scheduler{
ctx: ctx,
extenders: make([]extenders.Extender, 0),
extenders: make([]extenderContainer, 0),
extCh: make(chan extenders.ExtenderEvent, 1),
dag: graph.New(nodeHash, graph.Directed(), graph.Acyclic()),
diff: make(map[string]bool),
Expand Down Expand Up @@ -222,8 +227,8 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {
appliedExtenders = defaultAppliedExtenders
} else {
availableExtenders := make(map[extenders.ExtenderName]bool, len(s.extenders))
for _, ext := range s.extenders {
availableExtenders[ext.Name()] = true
for _, e := range s.extenders {
availableExtenders[e.ext.Name()] = true
}

extendersFromEnv := strings.Split(extendersEnv, ",")
Expand All @@ -246,12 +251,12 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {
}
}

newExtenders := []extenders.Extender{}
newExtenders := []extenderContainer{}
for _, appliedExt := range appliedExtenders {
for _, ext := range s.extenders {
if ext.Name() == appliedExt {
newExtenders = append(newExtenders, ext)
if ne, ok := ext.(extenders.NotificationExtender); ok {
for _, e := range s.extenders {
if e.ext.Name() == appliedExt {
newExtenders = append(newExtenders, e)
if ne, ok := e.ext.(extenders.NotificationExtender); ok {
ne.SetNotifyChannel(s.ctx, s.extCh)
}
break
Expand All @@ -261,24 +266,38 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {

s.extenders = newExtenders

// set some extenders' meta
s.setExtendersMeta()

finalList := []extenders.ExtenderName{}
for _, ext := range s.extenders {
finalList = append(finalList, ext.Name())
for _, e := range s.extenders {
finalList = append(finalList, e.ext.Name())
}

log.Infof("The list of applied module extenders: [%s]", finalList)
log.Infof("The list of applied module extenders: %s", finalList)
return nil
}

// setExtendersMeta and some extra meta to the extenders that lets terminators know if there are any filtering extenders left in the list
func (s *Scheduler) setExtendersMeta() {
var filterAhead bool
for i := len(s.extenders) - 1; i >= 0; i-- {
s.extenders[i].filterAhead = filterAhead
if !filterAhead && !s.extenders[i].ext.IsTerminator() {
filterAhead = true
}
}
}

// AddExtender adds a new extender to the slice of the extenders that are used to determine modules' states
func (s *Scheduler) AddExtender(ext extenders.Extender) error {
for _, ex := range s.extenders {
if ex.Name() == ext.Name() {
for _, e := range s.extenders {
if e.ext.Name() == ext.Name() {
return fmt.Errorf("extender %s already added", ext.Name())
}
}

s.extenders = append(s.extenders, ext)
s.extenders = append(s.extenders, extenderContainer{ext: ext})
return nil
}

Expand Down Expand Up @@ -403,9 +422,9 @@ func (s *Scheduler) RecalculateGraph(logLabels map[string]string) (bool, []strin

// Filter returns filtering result for the specified extender and module
func (s *Scheduler) Filter(extName extenders.ExtenderName, moduleName string, logLabels map[string]string) (*bool, error) {
for _, ex := range s.extenders {
if ex.Name() == extName {
return ex.Filter(moduleName, logLabels)
for _, e := range s.extenders {
if e.ext.Name() == extName {
return e.ext.Filter(moduleName, logLabels)
}
}
return nil, fmt.Errorf("extender %s not found", extName)
Expand All @@ -424,7 +443,7 @@ func (s *Scheduler) recalculateGraphState(logLabels map[string]string) ( /* Grap

names, err := graph.StableTopologicalSort(s.dag, moduleSortFunc)
if err != nil {
errList = append(errList, err.Error())
errList = append(errList, fmt.Sprintf("couldn't perform stable topological sort: %s", err.Error()))
s.errList = errList
return true, updByDiff
}
Expand All @@ -444,31 +463,40 @@ outerCycle:
moduleName := vertex.GetName()
vBuf[moduleName] = &vertexState{}

for _, ex := range s.extenders {
// if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against a terminator
if ok := ex.IsTerminator(); ok && !vBuf[moduleName].enabled {
continue
for _, e := range s.extenders {
// if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against all other terminators
if e.ext.IsTerminator() && !vBuf[moduleName].enabled && !e.filterAhead {
break
}

moduleStatus, err := ex.Filter(moduleName, logLabels)
moduleStatus, err := e.ext.Filter(moduleName, logLabels)
if err != nil {
if permanent, ok := err.(*exerror.PermanentError); ok {
errList = append(errList, permanent.Error())
errList = append(errList, fmt.Sprintf("%s extender failed to filter %s module: %s", e.ext.Name(), moduleName, permanent.Error()))
break outerCycle
}
}

if moduleStatus != nil {
// if current extender is a terminating one and it says to disable - stop cycling over remaining extenders and disable the module
if ok := ex.IsTerminator(); ok {
if !*moduleStatus && vBuf[moduleName].enabled {
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(ex.Name())
if e.ext.IsTerminator() {
// if disabled - terminate filtering
if !*moduleStatus {
if vBuf[moduleName].enabled || e.filterAhead {
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(e.ext.Name())
}
break
}

// if enabled and there are some other filtering extenders ahead - continue filtering
if e.filterAhead {
continue
}
break
}
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(ex.Name())
vBuf[moduleName].updatedBy = string(e.ext.Name())
}
}

Expand All @@ -487,8 +515,8 @@ outerCycle:
}

// reset extenders' states if needed (mostly for enabled_script extender)
for _, ex := range s.extenders {
if re, ok := ex.(extenders.ResettableExtender); ok {
for _, e := range s.extenders {
if re, ok := e.ext.(extenders.ResettableExtender); ok {
re.Reset()
}
}
Expand Down
Loading
Loading