Skip to content

Commit

Permalink
Tweak cell sync logic a bit to decrease the number of control structures
Browse files (browse the repository at this point in the history)
  • Loading branch information
mumoshu committed Jan 2, 2022
1 parent 7140044 commit 3eb45cf
Showing 1 changed file with 161 additions and 158 deletions.
319 changes: 161 additions & 158 deletions pkg/cell/cell.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -340,223 +340,226 @@ func Sync(config SyncInput) error {
}
}

if !desiredVerIsBlocked {
canary := cell.Spec.UpdateStrategy.Canary
canarySteps := canary.Steps
if desiredVerIsBlocked {
log.Printf("Version %s is blocked. Please specify another version that is not blocked to start a rollout.", desiredVer)
return nil
}

passedAllCanarySteps = currentCanaryTGsWeight == 100
canary := cell.Spec.UpdateStrategy.Canary
canarySteps := canary.Steps

if len(canarySteps) > 0 && !passedAllCanarySteps {
var analysisRunList rolloutsv1alpha1.AnalysisRunList
passedAllCanarySteps = currentCanaryTGsWeight == 100

if err := runtimeClient.List(ctx, &analysisRunList, &client.ListOptions{
LabelSelector: everythingOwnedByThisCell,
}); err != nil {
return err
}
if len(canarySteps) > 0 && !passedAllCanarySteps {
var analysisRunList rolloutsv1alpha1.AnalysisRunList

var maxSuccessfulAnalysisRunStepIndex int
for _, ar := range analysisRunList.Items {
if ar.Status.Phase.Completed() {
stepIndexStr, ok := ar.Labels[LabelKeyStepIndex]
if !ok {
log.Printf("AnalysisRun %q does not have as step-index label. Perhaps this is not the one managed by okra? Skipping.", ar.Name)
continue
}
stepIndex, err := strconv.Atoi(stepIndexStr)
if err != nil {
return fmt.Errorf("parsing step index %q: %v", stepIndexStr, err)
}
if err := runtimeClient.List(ctx, &analysisRunList, &client.ListOptions{
LabelSelector: everythingOwnedByThisCell,
}); err != nil {
return err
}

if stepIndex > maxSuccessfulAnalysisRunStepIndex {
maxSuccessfulAnalysisRunStepIndex = stepIndex
}
var maxSuccessfulAnalysisRunStepIndex int
for _, ar := range analysisRunList.Items {
if ar.Status.Phase.Completed() {
stepIndexStr, ok := ar.Labels[LabelKeyStepIndex]
if !ok {
log.Printf("AnalysisRun %q does not have as step-index label. Perhaps this is not the one managed by okra? Skipping.", ar.Name)
continue
}
stepIndex, err := strconv.Atoi(stepIndexStr)
if err != nil {
return fmt.Errorf("parsing step index %q: %v", stepIndexStr, err)
}
}

ccr := cellComponentReconciler{
cell: cell,
runtimeClient: runtimeClient,
scheme: scheme,
if stepIndex > maxSuccessfulAnalysisRunStepIndex {
maxSuccessfulAnalysisRunStepIndex = stepIndex
}
}
}

STEPS:
for stepIndex, step := range canarySteps {
stepIndexStr := strconv.Itoa(stepIndex)
ccr := cellComponentReconciler{
cell: cell,
runtimeClient: runtimeClient,
scheme: scheme,
}

if a := canary.Analysis; a != nil {
// A background analysis works very much like
// Argo Rollouts Background Analysis as documented at
// https://argoproj.github.io/argo-rollouts/features/analysis/#background-analysis
// except that okra's works against clusters(backing e.g. AWSTargetGroups) instead of replicasets.
STEPS:
for stepIndex, step := range canarySteps {
stepIndexStr := strconv.Itoa(stepIndex)

start := int32(0)
if a.StartingStep != nil {
start = *a.StartingStep
}
if a := canary.Analysis; a != nil {
// A background analysis works very much like
// Argo Rollouts Background Analysis as documented at
// https://argoproj.github.io/argo-rollouts/features/analysis/#background-analysis
// except that okra's works against clusters(backing e.g. AWSTargetGroups) instead of replicasets.

if int32(stepIndex) >= start {
r, err := ccr.reconcileAnalysisRun(ctx, "bg", &a.RolloutAnalysis)
if err != nil {
return err
} else if r == ComponentFailed {
anyStepFailed = true
break STEPS
}

// We accept both StepInProgress and StepPassed
// as a background analysis makes the cell degraded
// only if it failed.
}
start := int32(0)
if a.StartingStep != nil {
start = *a.StartingStep
}

if step.Analysis != nil {
r, err := ccr.reconcileAnalysisRun(ctx, stepIndexStr, step.Analysis)
if err != nil {
return err
} else if r == ComponentInProgress {
break STEPS
} else if r == ComponentFailed {
anyStepFailed = true
break STEPS
}
} else if step.Experiment != nil {
r, err := ccr.reconcileExperiment(ctx, stepIndexStr, step.Experiment)
if int32(stepIndex) >= start {
r, err := ccr.reconcileAnalysisRun(ctx, "bg", &a.RolloutAnalysis)
if err != nil {
return err
} else if r == ComponentInProgress {
break STEPS
} else if r == ComponentFailed {
anyStepFailed = true
break STEPS
}
} else if step.SetWeight != nil {
desiredStableTGsWeight -= int(*step.SetWeight)

if desiredStableTGsWeight < currentStableTGsWeight {
break STEPS
}
} else if step.Pause != nil {
r, err := ccr.reconcilePause(ctx, stepIndexStr, step.Pause)
if err != nil {
return err
} else if r == ComponentInProgress {
break STEPS
} else if r == ComponentFailed {
anyStepFailed = true
break STEPS
}
} else {
return fmt.Errorf("steps[%d]: only setWeight, analysis, and pause step are supported. got %v", stepIndex, step)
// We accept both StepInProgress and StepPassed
// as a background analysis makes the cell degraded
// only if it failed.
}
}

if stepIndex+1 == len(canarySteps) {
passedAllCanarySteps = true
if step.Analysis != nil {
r, err := ccr.reconcileAnalysisRun(ctx, stepIndexStr, step.Analysis)
if err != nil {
return err
} else if r == ComponentInProgress {
break STEPS
} else if r == ComponentFailed {
anyStepFailed = true
break STEPS
}
} else if step.Experiment != nil {
r, err := ccr.reconcileExperiment(ctx, stepIndexStr, step.Experiment)
if err != nil {
return err
} else if r == ComponentInProgress {
break STEPS
} else if r == ComponentFailed {
anyStepFailed = true
break STEPS
}
} else if step.SetWeight != nil {
desiredStableTGsWeight -= int(*step.SetWeight)

if desiredStableTGsWeight < currentStableTGsWeight {
break STEPS
}
} else if step.Pause != nil {
r, err := ccr.reconcilePause(ctx, stepIndexStr, step.Pause)
if err != nil {
return err
} else if r == ComponentInProgress {
break STEPS
} else if r == ComponentFailed {
anyStepFailed = true
break STEPS
}
} else {
return fmt.Errorf("steps[%d]: only setWeight, analysis, and pause step are supported. got %v", stepIndex, step)
}
}

if passedAllCanarySteps || len(canarySteps) == 0 {
desiredStableTGsWeight = 0
if stepIndex+1 == len(canarySteps) {
passedAllCanarySteps = true
}
}
}

if anyStepFailed {
desiredStableTGsWeight = 100
}
if passedAllCanarySteps || len(canarySteps) == 0 {
desiredStableTGsWeight = 0
}

if desiredStableTGsWeight < 0 {
return fmt.Errorf("stable tgs weight cannot be less than 0: %v", desiredStableTGsWeight)
}
if anyStepFailed {
desiredStableTGsWeight = 100
}

// Do update by step weight
var updatedTGs []okrav1alpha1.ForwardTargetGroup
if desiredStableTGsWeight < 0 {
return fmt.Errorf("stable tgs weight cannot be less than 0: %v", desiredStableTGsWeight)
}

numStableTGs := len(currentStableTGs)
// Do update by step weight
var updatedTGs []okrav1alpha1.ForwardTargetGroup

updatedStableTGs := map[string]okrav1alpha1.ForwardTargetGroup{}
numStableTGs := len(currentStableTGs)

for i, tg := range currentStableTGs {
tg := tg
updatedStableTGs := map[string]okrav1alpha1.ForwardTargetGroup{}

var weight int
for i, tg := range currentStableTGs {
tg := tg

if desiredStableTGsWeight > 0 {
weight = desiredStableTGsWeight / numStableTGs
var weight int

if i == numStableTGs-1 && numStableTGs > 1 {
weight = desiredStableTGsWeight - (weight * (numStableTGs - 1))
}
}
if desiredStableTGsWeight > 0 {
weight = desiredStableTGsWeight / numStableTGs

updatedStableTGs[tg.Name] = okrav1alpha1.ForwardTargetGroup{
Name: tg.Name,
ARN: tg.ARN,
Weight: weight,
if i == numStableTGs-1 && numStableTGs > 1 {
weight = desiredStableTGsWeight - (weight * (numStableTGs - 1))
}
}

for _, tg := range updatedStableTGs {
updatedTGs = append(updatedTGs, tg)
updatedStableTGs[tg.Name] = okrav1alpha1.ForwardTargetGroup{
Name: tg.Name,
ARN: tg.ARN,
Weight: weight,
}
}

desiredCanaryTGsWeight = 100 - desiredStableTGsWeight
for _, tg := range updatedStableTGs {
updatedTGs = append(updatedTGs, tg)
}

updatedCanatyTGs := map[string]okrav1alpha1.ForwardTargetGroup{}
desiredCanaryTGsWeight = 100 - desiredStableTGsWeight

for i, tg := range desiredTGs {
var weight int
updatedCanatyTGs := map[string]okrav1alpha1.ForwardTargetGroup{}

if desiredCanaryTGsWeight > 0 {
weight = desiredCanaryTGsWeight / numLatestTGs
for i, tg := range desiredTGs {
var weight int

if i == numLatestTGs-1 && numLatestTGs > 1 {
weight = desiredCanaryTGsWeight - (weight * (numLatestTGs - 1))
}
}
if desiredCanaryTGsWeight > 0 {
weight = desiredCanaryTGsWeight / numLatestTGs

updatedCanatyTGs[tg.Name] = okrav1alpha1.ForwardTargetGroup{
Name: tg.Name,
ARN: tg.Spec.ARN,
Weight: weight,
if i == numLatestTGs-1 && numLatestTGs > 1 {
weight = desiredCanaryTGsWeight - (weight * (numLatestTGs - 1))
}
}

for _, tg := range updatedCanatyTGs {
updatedTGs = append(updatedTGs, tg)
updatedCanatyTGs[tg.Name] = okrav1alpha1.ForwardTargetGroup{
Name: tg.Name,
ARN: tg.Spec.ARN,
Weight: weight,
}
}

sort.Slice(updatedTGs, func(i, j int) bool {
return updatedTGs[i].Name < updatedTGs[j].Name
})
for _, tg := range updatedCanatyTGs {
updatedTGs = append(updatedTGs, tg)
}

updated := make(map[string]int)
for _, tg := range updatedTGs {
updated[tg.Name] = tg.Weight
}
sort.Slice(updatedTGs, func(i, j int) bool {
return updatedTGs[i].Name < updatedTGs[j].Name
})

albConfig.Spec.Listener.Rule.Forward.TargetGroups = updatedTGs
updated := make(map[string]int)
for _, tg := range updatedTGs {
updated[tg.Name] = tg.Weight
}

currentHash := albConfig.Annotations[LabelKeyTemplateHash]
desiredHash := sync.ComputeHash(albConfig.Spec)
albConfig.Spec.Listener.Rule.Forward.TargetGroups = updatedTGs

if currentHash != desiredHash {
if currentStableTGsWeight != desiredStableTGsWeight {
log.Printf("Changing stable weight(%v): %d -> %d\n", currentStableTGsMaxVer, currentStableTGsWeight, desiredStableTGsWeight)
}
if currentCanaryTGsWeight != desiredCanaryTGsWeight {
log.Printf("Changing canary(%s) weight: %d -> %d\n", desiredVer, currentCanaryTGsWeight, desiredCanaryTGsWeight)
}
currentHash := albConfig.Annotations[LabelKeyTemplateHash]
desiredHash := sync.ComputeHash(albConfig.Spec)

metav1.SetMetaDataAnnotation(&albConfig.ObjectMeta, LabelKeyTemplateHash, desiredHash)
if currentHash != desiredHash {
if currentStableTGsWeight != desiredStableTGsWeight {
log.Printf("Changing stable weight(%v): %d -> %d\n", currentStableTGsMaxVer, currentStableTGsWeight, desiredStableTGsWeight)
}
if currentCanaryTGsWeight != desiredCanaryTGsWeight {
log.Printf("Changing canary(%s) weight: %d -> %d\n", desiredVer, currentCanaryTGsWeight, desiredCanaryTGsWeight)
}

if err := runtimeClient.Update(ctx, &albConfig); err != nil {
return err
}
metav1.SetMetaDataAnnotation(&albConfig.ObjectMeta, LabelKeyTemplateHash, desiredHash)

log.Printf("Updated target groups and weights to: %v\n", updated)
} else {
log.Printf("No change detected on AWSApplicationLoadBalancerConfig and target group weights. Skipped updating.")
if err := runtimeClient.Update(ctx, &albConfig); err != nil {
return err
}

log.Printf("Updated target groups and weights to: %v\n", updated)
} else {
log.Printf("No change detected on AWSApplicationLoadBalancerConfig and target group weights. Skipped updating.")
}

if anyStepFailed {
Expand Down Expand Up @@ -597,7 +600,7 @@ func Sync(config SyncInput) error {

log.Printf("Finishing reconcilation. desiredTargetTGsWeight=%v, passedAllCanarySteps=%v, anyStepFailed=%v, desiredVerIsBlocked=%v", desiredStableTGsWeight, passedAllCanarySteps, anyStepFailed, desiredVerIsBlocked)

if desiredStableTGsWeight == 0 && passedAllCanarySteps || anyStepFailed || desiredVerIsBlocked {
if desiredStableTGsWeight == 0 && passedAllCanarySteps || anyStepFailed {
objects := []runtime.Object{
&rolloutsv1alpha1.AnalysisRun{},
&rolloutsv1alpha1.Experiment{},
Expand Down

0 comments on commit 3eb45cf

Please sign in to comment.