Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: DesiredCanaries can be set on every pass safely #8456

Merged
merged 2 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func (w *deploymentWatcher) SetAllocHealth(
}

// Check if the group has autorevert set
group, ok := w.getDeployment().TaskGroups[alloc.TaskGroup]
if !ok || !group.AutoRevert {
dstate, ok := w.getDeployment().TaskGroups[alloc.TaskGroup]
if !ok || !dstate.AutoRevert {
continue
}

Expand Down Expand Up @@ -285,13 +285,13 @@ func (w *deploymentWatcher) autoPromoteDeployment(allocs []*structs.AllocListStu

// AutoPromote iff every task group is marked auto_promote and is healthy. The whole
// job version has been incremented, so we promote together. See also AutoRevert
for _, tv := range d.TaskGroups {
if !tv.AutoPromote || tv.DesiredCanaries != len(tv.PlacedCanaries) {
for _, dstate := range d.TaskGroups {
if !dstate.AutoPromote || dstate.DesiredCanaries != len(dstate.PlacedCanaries) {
return nil
}

// Find the health status of each canary
for _, c := range tv.PlacedCanaries {
for _, c := range dstate.PlacedCanaries {
for _, a := range allocs {
if c == a.ID && !a.DeploymentStatus.IsHealthy() {
return nil
Expand Down Expand Up @@ -347,8 +347,8 @@ func (w *deploymentWatcher) FailDeployment(

// Determine if we should rollback
rollback := false
for _, state := range w.getDeployment().TaskGroups {
if state.AutoRevert {
for _, dstate := range w.getDeployment().TaskGroups {
if dstate.AutoRevert {
rollback = true
break
}
Expand Down Expand Up @@ -653,14 +653,14 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) {
}

fail = false
for tg, state := range d.TaskGroups {
for tg, dstate := range d.TaskGroups {
// If we are in a canary state we fail if there aren't enough healthy
// allocs to satisfy DesiredCanaries
if state.DesiredCanaries > 0 && !state.Promoted {
if state.HealthyAllocs >= state.DesiredCanaries {
if dstate.DesiredCanaries > 0 && !dstate.Promoted {
if dstate.HealthyAllocs >= dstate.DesiredCanaries {
continue
}
} else if state.HealthyAllocs >= state.DesiredTotal {
} else if dstate.HealthyAllocs >= dstate.DesiredTotal {
continue
}

Expand All @@ -685,19 +685,19 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) {
func (w *deploymentWatcher) getDeploymentProgressCutoff(d *structs.Deployment) time.Time {
var next time.Time
doneTGs := w.doneGroups(d)
for name, state := range d.TaskGroups {
for name, dstate := range d.TaskGroups {
// This task group is done so we don't have to concern ourselves with
// its progress deadline.
if done, ok := doneTGs[name]; ok && done {
continue
}

if state.RequireProgressBy.IsZero() {
if dstate.RequireProgressBy.IsZero() {
continue
}

if next.IsZero() || state.RequireProgressBy.Before(next) {
next = state.RequireProgressBy
if next.IsZero() || dstate.RequireProgressBy.Before(next) {
next = dstate.RequireProgressBy
}
}
return next
Expand Down Expand Up @@ -734,15 +734,15 @@ func (w *deploymentWatcher) doneGroups(d *structs.Deployment) map[string]bool {

// Go through each group and check if it is done
groups := make(map[string]bool, len(d.TaskGroups))
for name, state := range d.TaskGroups {
for name, dstate := range d.TaskGroups {
// Requires promotion
if state.DesiredCanaries != 0 && !state.Promoted {
if dstate.DesiredCanaries != 0 && !dstate.Promoted {
groups[name] = false
continue
}

// Check we have enough healthy currently running allocations
groups[name] = healthy[name] >= state.DesiredTotal
groups[name] = healthy[name] >= dstate.DesiredTotal
}

return groups
Expand Down
30 changes: 15 additions & 15 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3768,8 +3768,8 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD

// canaryIndex is the set of placed canaries in the deployment
canaryIndex := make(map[string]struct{}, len(deployment.TaskGroups))
for _, state := range deployment.TaskGroups {
for _, c := range state.PlacedCanaries {
for _, dstate := range deployment.TaskGroups {
for _, c := range dstate.PlacedCanaries {
canaryIndex[c] = struct{}{}
}
}
Expand Down Expand Up @@ -3810,12 +3810,12 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD

// Determine if we have enough healthy allocations
var unhealthyErr multierror.Error
for tg, state := range deployment.TaskGroups {
for tg, dstate := range deployment.TaskGroups {
if _, ok := groupIndex[tg]; !req.All && !ok {
continue
}

need := state.DesiredCanaries
need := dstate.DesiredCanaries
if need == 0 {
continue
}
Expand Down Expand Up @@ -4552,35 +4552,35 @@ func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *st
deploymentCopy := deployment.Copy()
deploymentCopy.ModifyIndex = index

state := deploymentCopy.TaskGroups[alloc.TaskGroup]
state.PlacedAllocs += placed
state.HealthyAllocs += healthy
state.UnhealthyAllocs += unhealthy
dstate := deploymentCopy.TaskGroups[alloc.TaskGroup]
dstate.PlacedAllocs += placed
dstate.HealthyAllocs += healthy
dstate.UnhealthyAllocs += unhealthy

// Ensure PlacedCanaries accurately reflects the alloc canary status
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary {
found := false
for _, canary := range state.PlacedCanaries {
for _, canary := range dstate.PlacedCanaries {
if alloc.ID == canary {
found = true
break
}
}
if !found {
state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID)
dstate.PlacedCanaries = append(dstate.PlacedCanaries, alloc.ID)
}
}

// Update the progress deadline
if pd := state.ProgressDeadline; pd != 0 {
if pd := dstate.ProgressDeadline; pd != 0 {
// If we are the first placed allocation for the deployment start the progress deadline.
if placed != 0 && state.RequireProgressBy.IsZero() {
if placed != 0 && dstate.RequireProgressBy.IsZero() {
// Use modify time instead of create time because we may in-place
// update the allocation to be part of a new deployment.
state.RequireProgressBy = time.Unix(0, alloc.ModifyTime).Add(pd)
dstate.RequireProgressBy = time.Unix(0, alloc.ModifyTime).Add(pd)
} else if healthy != 0 {
if d := alloc.DeploymentStatus.Timestamp.Add(pd); d.After(state.RequireProgressBy) {
state.RequireProgressBy = d
if d := alloc.DeploymentStatus.Timestamp.Add(pd); d.After(dstate.RequireProgressBy) {
dstate.RequireProgressBy = d
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1798,12 +1798,12 @@ func TestServiceSched_JobModify_Rolling(t *testing.T) {
if plan.Deployment == nil {
t.Fatalf("bad: %#v", plan)
}
state, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
dstate, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
if !ok {
t.Fatalf("bad: %#v", plan)
}
if state.DesiredTotal != 10 && state.DesiredCanaries != 0 {
t.Fatalf("bad: %#v", state)
if dstate.DesiredTotal != 10 && dstate.DesiredCanaries != 0 {
t.Fatalf("bad: %#v", dstate)
}
}

Expand Down Expand Up @@ -1922,12 +1922,12 @@ func TestServiceSched_JobModify_Rolling_FullNode(t *testing.T) {
if plan.Deployment == nil {
t.Fatalf("bad: %#v", plan)
}
state, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
dstate, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
if !ok {
t.Fatalf("bad: %#v", plan)
}
if state.DesiredTotal != 5 || state.DesiredCanaries != 0 {
t.Fatalf("bad: %#v", state)
if dstate.DesiredTotal != 5 || dstate.DesiredCanaries != 0 {
t.Fatalf("bad: %#v", dstate)
}
}

Expand Down Expand Up @@ -2032,10 +2032,10 @@ func TestServiceSched_JobModify_Canaries(t *testing.T) {
}

// Ensure local state was not altered in scheduler
staleState, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
staleDState, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
require.True(t, ok)

require.Equal(t, 0, len(staleState.PlacedCanaries))
require.Equal(t, 0, len(staleDState.PlacedCanaries))

ws := memdb.NewWatchSet()

Expand Down
22 changes: 11 additions & 11 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
strategy := tg.Update
canariesPromoted := dstate != nil && dstate.Promoted
requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
if requireCanary {
dstate.DesiredCanaries = strategy.Canary
}
if requireCanary && !a.deploymentPaused && !a.deploymentFailed {
number := strategy.Canary - len(canaries)
desiredChanges.Canary += uint64(number)
if !existingDeployment {
dstate.DesiredCanaries = strategy.Canary
}

for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
a.result.place = append(a.result.place, allocPlaceResult{
Expand Down Expand Up @@ -617,18 +617,18 @@ func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *stru

// Cancel any non-promoted canaries from the older deployment
if a.oldDeployment != nil {
for _, s := range a.oldDeployment.TaskGroups {
if !s.Promoted {
stop = append(stop, s.PlacedCanaries...)
for _, dstate := range a.oldDeployment.TaskGroups {
if !dstate.Promoted {
stop = append(stop, dstate.PlacedCanaries...)
}
}
}

// Cancel any non-promoted canaries from a failed deployment
if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusFailed {
for _, s := range a.deployment.TaskGroups {
if !s.Promoted {
stop = append(stop, s.PlacedCanaries...)
for _, dstate := range a.deployment.TaskGroups {
if !dstate.Promoted {
stop = append(stop, dstate.PlacedCanaries...)
}
}
}
Expand All @@ -644,8 +644,8 @@ func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *stru
// needed by just stopping them.
if a.deployment != nil {
var canaryIDs []string
for _, s := range a.deployment.TaskGroups {
canaryIDs = append(canaryIDs, s.PlacedCanaries...)
for _, dstate := range a.deployment.TaskGroups {
canaryIDs = append(canaryIDs, dstate.PlacedCanaries...)
}

canaries = all.fromKeys(canaryIDs)
Expand Down