Skip to content

Commit

Permalink
update scheduler logic regarding terminator extenders
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Scherba <mikhail.scherba@flant.com>
  • Loading branch information
miklezzzz committed Jul 11, 2024
1 parent 746f6f2 commit 53da74a
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 35 deletions.
109 changes: 109 additions & 0 deletions pkg/module_manager/scheduler/extenders/mock/extenders_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// a bunch of mocked extenders for the test purpose
package extenders_mock

import "github.com/flant/addon-operator/pkg/module_manager/scheduler/extenders"

type FilterOne struct {
}

func (f *FilterOne) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterOne")
}

func (f *FilterOne) Filter(moduleName string, logLabels map[string]string) (*bool, error) {

Check failure on line 13 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unused-parameter: parameter 'moduleName' seems to be unused, consider removing or renaming it as _ (revive)
return nil, nil
}

func (f *FilterOne) IsTerminator() bool {
return false
}

type FilterTwo struct {
}

func (f *FilterTwo) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterTwo")
}

func (f *FilterTwo) Filter(moduleName string, logLabels map[string]string) (*bool, error) {

Check failure on line 28 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unused-parameter: parameter 'moduleName' seems to be unused, consider removing or renaming it as _ (revive)
return nil, nil
}

func (f *FilterTwo) IsTerminator() bool {
return false
}

type FilterThree struct {
}

func (f *FilterThree) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterThree")
}

func (f *FilterThree) Filter(moduleName string, logLabels map[string]string) (*bool, error) {

Check failure on line 43 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unused-parameter: parameter 'moduleName' seems to be unused, consider removing or renaming it as _ (revive)
return nil, nil
}

func (f *FilterThree) IsTerminator() bool {
return false
}

type FilterFour struct {
}

func (f *FilterFour) Name() extenders.ExtenderName {
return extenders.ExtenderName("FilterOne")
}

func (f *FilterFour) Filter(moduleName string, logLabels map[string]string) (*bool, error) {

Check failure on line 58 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unused-parameter: parameter 'moduleName' seems to be unused, consider removing or renaming it as _ (revive)
return nil, nil
}

func (f *FilterFour) IsTerminator() bool {
return false
}

type TerminatorOne struct {
}

func (f *TerminatorOne) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorOne")
}

func (f *TerminatorOne) Filter(moduleName string, logLabels map[string]string) (*bool, error) {

Check failure on line 73 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unused-parameter: parameter 'moduleName' seems to be unused, consider removing or renaming it as _ (revive)
return nil, nil
}

func (f *TerminatorOne) IsTerminator() bool {
return true
}

type TerminatorTwo struct {
}

func (f *TerminatorTwo) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorTwo")
}

func (f *TerminatorTwo) Filter(moduleName string, logLabels map[string]string) (*bool, error) {

Check failure on line 88 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unused-parameter: parameter 'moduleName' seems to be unused, consider removing or renaming it as _ (revive)
return nil, nil
}

func (f *TerminatorTwo) IsTerminator() bool {
return true
}

type TerminatorThree struct {
}

func (f *TerminatorThree) Name() extenders.ExtenderName {
return extenders.ExtenderName("TerminatorThree")
}

func (f *TerminatorThree) Filter(moduleName string, logLabels map[string]string) (*bool, error) {

Check failure on line 103 in pkg/module_manager/scheduler/extenders/mock/extenders_mock.go

View workflow job for this annotation

GitHub Actions / Run Go linters

unused-parameter: parameter 'moduleName' seems to be unused, consider removing or renaming it as _ (revive)
return nil, nil
}

func (f *TerminatorThree) IsTerminator() bool {
return true
}
95 changes: 62 additions & 33 deletions pkg/module_manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ var defaultAppliedExtenders = []extenders.ExtenderName{
script_extender.Name,
}

type extenderContainer struct {
ext extenders.Extender
filterAhead bool
}

type Scheduler struct {
ctx context.Context

// list of extenders to cycle over on a run
extenders []extenders.Extender
extenders []extenderContainer
extCh chan extenders.ExtenderEvent
// graph visualization
graphImage image.Image
Expand Down Expand Up @@ -66,7 +71,7 @@ func NewScheduler(ctx context.Context) *Scheduler {
}
return &Scheduler{
ctx: ctx,
extenders: make([]extenders.Extender, 0),
extenders: make([]extenderContainer, 0),
extCh: make(chan extenders.ExtenderEvent, 1),
dag: graph.New(nodeHash, graph.Directed(), graph.Acyclic()),
diff: make(map[string]bool),
Expand Down Expand Up @@ -222,8 +227,8 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {
appliedExtenders = defaultAppliedExtenders
} else {
availableExtenders := make(map[extenders.ExtenderName]bool, len(s.extenders))
for _, ext := range s.extenders {
availableExtenders[ext.Name()] = true
for _, e := range s.extenders {
availableExtenders[e.ext.Name()] = true
}

extendersFromEnv := strings.Split(extendersEnv, ",")
Expand All @@ -246,12 +251,12 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {
}
}

newExtenders := []extenders.Extender{}
newExtenders := []extenderContainer{}
for _, appliedExt := range appliedExtenders {
for _, ext := range s.extenders {
if ext.Name() == appliedExt {
newExtenders = append(newExtenders, ext)
if ne, ok := ext.(extenders.NotificationExtender); ok {
for _, e := range s.extenders {
if e.ext.Name() == appliedExt {
newExtenders = append(newExtenders, e)
if ne, ok := e.ext.(extenders.NotificationExtender); ok {
ne.SetNotifyChannel(s.ctx, s.extCh)
}
break
Expand All @@ -261,24 +266,38 @@ func (s *Scheduler) ApplyExtenders(extendersEnv string) error {

s.extenders = newExtenders

// set some extenders' meta
s.setExtendersMeta()

finalList := []extenders.ExtenderName{}
for _, ext := range s.extenders {
finalList = append(finalList, ext.Name())
for _, e := range s.extenders {
finalList = append(finalList, e.ext.Name())
}

log.Infof("The list of applied module extenders: [%s]", finalList)
log.Infof("The list of applied module extenders: %s", finalList)
return nil
}

// setExtendersMeta andd some extra meta to the extenders that lets terminators know if there are any filtering extenders left in the list

Check failure on line 281 in pkg/module_manager/scheduler/scheduler.go

View workflow job for this annotation

GitHub Actions / Run Go linters

`andd` is a misspelling of `and` (misspell)
func (s *Scheduler) setExtendersMeta() {
var filterAhead bool
for i := len(s.extenders) - 1; i >= 0; i-- {
s.extenders[i].filterAhead = filterAhead
if !filterAhead && !s.extenders[i].ext.IsTerminator() {
filterAhead = true
}
}
}

// AddExtender adds a new extender to the slice of the extenders that are used to determine modules' states
func (s *Scheduler) AddExtender(ext extenders.Extender) error {
for _, ex := range s.extenders {
if ex.Name() == ext.Name() {
for _, e := range s.extenders {
if e.ext.Name() == ext.Name() {
return fmt.Errorf("extender %s already added", ext.Name())
}
}

s.extenders = append(s.extenders, ext)
s.extenders = append(s.extenders, extenderContainer{ext: ext})
return nil
}

Expand Down Expand Up @@ -403,9 +422,9 @@ func (s *Scheduler) RecalculateGraph(logLabels map[string]string) (bool, []strin

// Filter returns filtering result for the specified extender and module
func (s *Scheduler) Filter(extName extenders.ExtenderName, moduleName string, logLabels map[string]string) (*bool, error) {
for _, ex := range s.extenders {
if ex.Name() == extName {
return ex.Filter(moduleName, logLabels)
for _, e := range s.extenders {
if e.ext.Name() == extName {
return e.ext.Filter(moduleName, logLabels)
}
}
return nil, fmt.Errorf("extender %s not found", extName)
Expand All @@ -424,7 +443,7 @@ func (s *Scheduler) recalculateGraphState(logLabels map[string]string) ( /* Grap

names, err := graph.StableTopologicalSort(s.dag, moduleSortFunc)
if err != nil {
errList = append(errList, err.Error())
errList = append(errList, fmt.Sprintf("couldn't perform stable topological sort: %s", err.Error()))
s.errList = errList
return true, updByDiff
}
Expand All @@ -444,31 +463,41 @@ outerCycle:
moduleName := vertex.GetName()
vBuf[moduleName] = &vertexState{}

for _, ex := range s.extenders {
// if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against a terminator
if ok := ex.IsTerminator(); ok && !vBuf[moduleName].enabled {
continue
for _, e := range s.extenders {
// if current extender is a terminating one and by this point the module is already disabled - there's little sense in checking against all other terminators
if e.ext.IsTerminator() && !vBuf[moduleName].enabled && !e.filterAhead {
break
}

moduleStatus, err := ex.Filter(moduleName, logLabels)
moduleStatus, err := e.ext.Filter(moduleName, logLabels)
if err != nil {
if permanent, ok := err.(*exerror.PermanentError); ok {
errList = append(errList, permanent.Error())
errList = append(errList, fmt.Sprintf("%s extender failed to filter %s module: %s", e.ext.Name(), moduleName, permanent.Error()))
break outerCycle
}
}

if moduleStatus != nil {
// if current extender is a terminating one and it says to disable - stop cycling over remaining extenders and disable the module
if ok := ex.IsTerminator(); ok {
if !*moduleStatus && vBuf[moduleName].enabled {
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(ex.Name())
if e.ext.IsTerminator() {
// if disabled - terminate filtering
if !*moduleStatus {
if vBuf[moduleName].enabled || e.filterAhead {
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(e.ext.Name())
}
break
}

// if enabled and there are some other filtering extenders ahead - continue filtering
if e.filterAhead {
continue
} else {

Check failure on line 495 in pkg/module_manager/scheduler/scheduler.go

View workflow job for this annotation

GitHub Actions / Run Go linters

superfluous-else: if block ends with a continue statement, so drop this else and outdent its block (revive)
break
}
break
}
vBuf[moduleName].enabled = *moduleStatus
vBuf[moduleName].updatedBy = string(ex.Name())
vBuf[moduleName].updatedBy = string(e.ext.Name())
}
}

Expand All @@ -487,8 +516,8 @@ outerCycle:
}

// reset extenders' states if needed (mostly for enabled_script extender)
for _, ex := range s.extenders {
if re, ok := ex.(extenders.ResettableExtender); ok {
for _, e := range s.extenders {
if re, ok := e.ext.(extenders.ResettableExtender); ok {
re.Reset()
}
}
Expand Down
Loading

0 comments on commit 53da74a

Please sign in to comment.