Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Eikykun committed Aug 15, 2023
1 parent 8674768 commit 7bf7e79
Showing 1 changed file with 38 additions and 39 deletions.
77 changes: 38 additions & 39 deletions pkg/controllers/ruleset/ruleset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,45 +182,8 @@ func (r *RuleSetReconciler) Reconcile(ctx context.Context, request reconcile.Req
}
}

stages := r.GetStages()
wg := sync.WaitGroup{}
wg.Add(len(stages))
mu := sync.RWMutex{}
var ruleStates []*appsv1alpha1.RuleState
var interval *time.Duration
var shouldRetry bool
details := map[string]*appsv1alpha1.Detail{}
oldDetails := map[string]*appsv1alpha1.Detail{}

for i, de := range ruleSet.Status.Details {
oldDetails[de.Name] = ruleSet.Status.Details[i]
}

for _, stage := range stages {
currentStage := stage
go func() {
defer wg.Done()
res := processor.NewRuleProcessor(r.Client, currentStage, ruleSet, r.Logger).Process(targetPods)
mu.Lock()
defer mu.Unlock()
if res.Interval != nil {
if interval == nil || *interval > *res.Interval {
interval = res.Interval
}
}
if res.RuleStates != nil {
ruleStates = append(ruleStates, res.RuleStates...)
}
if res.Retry {
shouldRetry = true
}
updateDetail(details, res, currentStage)
}()
}

// TODO: webhook processor

wg.Wait()
// process rules
shouldRetry, interval, details, ruleStates := r.process(ruleSet, targetPods)

res := reconcile.Result{
Requeue: shouldRetry,
Expand Down Expand Up @@ -261,10 +224,46 @@ func (r *RuleSetReconciler) Reconcile(ctx context.Context, request reconcile.Req
}
}

oldDetails := map[string]*appsv1alpha1.Detail{}

for i, de := range ruleSet.Status.Details {
oldDetails[de.Name] = ruleSet.Status.Details[i]
}
addQueues(ruleSet, details, oldDetails)
return res, nil
}

func (r *RuleSetReconciler) process(rs *appsv1alpha1.RuleSet, pods map[string]*corev1.Pod) (shouldRetry bool, interval *time.Duration, details map[string]*appsv1alpha1.Detail, ruleStates []*appsv1alpha1.RuleState) {
stages := r.GetStages()
wg := sync.WaitGroup{}
wg.Add(len(stages))
mu := sync.RWMutex{}
details = map[string]*appsv1alpha1.Detail{}
for _, stage := range stages {
currentStage := stage
go func() {
defer wg.Done()
res := processor.NewRuleProcessor(r.Client, currentStage, rs, r.Logger).Process(pods)
mu.Lock()
defer mu.Unlock()
if res.Interval != nil {
if interval == nil || *interval > *res.Interval {
interval = res.Interval
}
}
if res.RuleStates != nil {
ruleStates = append(ruleStates, res.RuleStates...)
}
if res.Retry {
shouldRetry = true
}
updateDetail(details, res, currentStage)
}()
}
wg.Wait()
return shouldRetry, interval, details, ruleStates
}

func addQueues(rs *appsv1alpha1.RuleSet, details map[string]*appsv1alpha1.Detail, oldDetails map[string]*appsv1alpha1.Detail) {
if len(podEventQueues) == 0 {
return
Expand Down

0 comments on commit 7bf7e79

Please sign in to comment.