Skip to content

Commit

Permalink
engine: fix bugs in rescheduling for replaced units
Browse files Browse the repository at this point in the history
While the 'Replaces' option has been supported since
coreos#1572, the engine never actually
unscheduled the units being replaced. This was a bug.

So let's implement GetReplacedUnit() to expose the replaced unit from
AgentState to the engine reconciler. And make the engine reconciler
unschedule the replaced unit, and schedule the current unit.
The engine scheduler's decision structure also gains a helper for the
rescheduling case, which simply schedules the replaced unit to a
free machine.
  • Loading branch information
Dongsu Park committed Nov 3, 2016
1 parent 5ecd518 commit f54249e
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 1 deletion.
8 changes: 8 additions & 0 deletions agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,11 @@ func (as *AgentState) AbleToRun(j *job.Job) (jobAction job.JobAction, errstr str

return job.JobActionSchedule, ""
}

// GetReplacedUnit returns the name of the unit that Job j replaces,
// according to this AgentState. An error is returned when no unit to
// be replaced can be found for j.
func (as *AgentState) GetReplacedUnit(j *job.Job) (string, error) {
	exists, replaced := as.hasReplace(j.Name, j.Replaces())
	if !exists {
		return "", fmt.Errorf("cannot find units to be replaced for Unit(%s)", j.Name)
	}
	return replaced, nil
}
66 changes: 66 additions & 0 deletions engine/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,62 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
return job.JobActionSchedule, ""
}

// handle_reschedule tries to move the unit replaced by Job j off of j's
// target machine and onto another machine, and then schedule j itself on
// the target machine. It reports whether at least one replaced unit was
// rescheduled.
// NOTE(review): idiomatic Go would name this handleReschedule; kept as-is
// to match its call site.
handle_reschedule := func(j *job.Job, reason string) bool {
isRescheduled := false

agents := clust.agents()

// The job's current target machine must still be part of the cluster;
// otherwise there is no AgentState to consult for the replaced unit.
as, ok := agents[j.TargetMachineID]
if !ok {
metrics.ReportEngineReconcileFailure(metrics.MachineAway)
return false
}

for _, cj := range clust.jobs {
// Skip jobs that are not currently scheduled anywhere.
if !cj.Scheduled() {
continue
}
// Only the cluster's entry for this same job is of interest.
if j.Name != cj.Name {
continue
}

// Resolve the name of the unit that j replaces on this agent.
replacedUnit, err := as.GetReplacedUnit(j)
if err != nil {
log.Debugf("No unit to reschedule: %v", err)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

// First unschedule the replaced unit from j's target machine.
if !send(taskTypeUnscheduleUnit, reason, replacedUnit, j.TargetMachineID) {
log.Infof("Job(%s) unschedule send failed", replacedUnit)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

// Ask the scheduler for a machine (other than j's target) to host
// the replaced unit.
// NOTE(review): if this call or the send below fails, the replaced
// unit was already unscheduled above and stays unscheduled until a
// later reconciliation pass — confirm this is intended.
dec, err := r.sched.DecideReschedule(clust, j)
if err != nil {
log.Debugf("Unable to schedule Job(%s): %v", j.Name, err)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

if !send(taskTypeAttemptScheduleUnit, reason, replacedUnit, dec.machineID) {
log.Infof("Job(%s) attemptschedule send failed", replacedUnit)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}
// Record the replaced unit's new placement in the local cluster view.
clust.schedule(replacedUnit, dec.machineID)
log.Debugf("rescheduling unit %s to machine %s", replacedUnit, dec.machineID)

// The current job takes over its original target machine.
clust.schedule(j.Name, j.TargetMachineID)
log.Debugf("scheduling unit %s to machine %s", j.Name, j.TargetMachineID)

isRescheduled = true
}

return isRescheduled
}

go func() {
defer close(taskchan)

Expand All @@ -115,14 +171,24 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
}

act, reason := decide(j)
if act == job.JobActionReschedule && handle_reschedule(j, reason) {
log.Debugf("Job(%s) is rescheduled: %v", j.Name, reason)
continue
}

if act != job.JobActionUnschedule {
log.Debugf("Job(%s) is not to be unscheduled, reason: %v", j.Name, reason)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

if !send(taskTypeUnscheduleUnit, reason, j.Name, j.TargetMachineID) {
log.Infof("Job(%s) send failed.", j.Name)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
return
}

log.Debugf("Job(%s) unscheduling.", j.Name)
clust.unschedule(j.Name)
}

Expand Down
35 changes: 34 additions & 1 deletion engine/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type decision struct {

// Scheduler decides which machine in the cluster a job should be
// scheduled to.
type Scheduler interface {
// Decide returns the placement decision for the given job.
Decide(*clusterState, *job.Job) (*decision, error)
// DecideReschedule returns a placement decision for the rescheduling
// case, choosing a machine other than the job's current target.
DecideReschedule(*clusterState, *job.Job) (*decision, error)
}

type leastLoadedScheduler struct{}
Expand All @@ -41,7 +42,8 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis

var target *agent.AgentState
for _, as := range agents {
if act, _ := as.AbleToRun(j); act != job.JobActionSchedule {
act, _ := as.AbleToRun(j)
if act == job.JobActionUnschedule {
continue
}

Expand All @@ -61,6 +63,37 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis
return &dec, nil
}

// DecideReschedule selects a machine for relocating a unit that is being
// replaced: the least-loaded agent whose machine differs from the job's
// current target machine.
func (lls *leastLoadedScheduler) DecideReschedule(clust *clusterState, j *job.Job) (*decision, error) {
	agents := lls.sortedAgents(clust)
	if len(agents) == 0 {
		return nil, fmt.Errorf("zero agents available")
	}

	// sortedAgents orders agents ascending by number of scheduled units,
	// so the first acceptable candidate is the least-loaded one.
	for _, candidate := range agents {
		if candidate.MState.ID == j.TargetMachineID {
			continue
		}
		return &decision{machineID: candidate.MState.ID}, nil
	}

	return nil, fmt.Errorf("no agents able to run job")
}

// sortedAgents returns a list of AgentState objects sorted ascending
// by the number of scheduled units
func (lls *leastLoadedScheduler) sortedAgents(clust *clusterState) []*agent.AgentState {
Expand Down

0 comments on commit f54249e

Please sign in to comment.