Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1134 from bcwaldon/catasktrophe
Browse files Browse the repository at this point in the history
Ordered task execution
  • Loading branch information
bcwaldon committed Feb 27, 2015
2 parents 29651bd + 7b44072 commit a35ee29
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 344 deletions.
159 changes: 74 additions & 85 deletions agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package agent

import (
"fmt"
"sort"
"time"

"github.com/coreos/fleet/job"
Expand Down Expand Up @@ -79,9 +80,8 @@ func (ar *AgentReconciler) Reconcile(a *Agent) {
return
}

for tc := range ar.calculateTaskChainsForUnits(dAgentState, cAgentState) {
ar.launchTaskChain(tc, a)
}
tasks := ar.calculateTasksForUnits(dAgentState, cAgentState)
ar.launchTasks(tasks, a)
}

// Purge attempts to unload all Units that have been loaded locally
Expand All @@ -96,18 +96,18 @@ func (ar *AgentReconciler) Purge(a *Agent) {
return
}

var tasks []task
for name, _ := range cAgentState {
t := task{
tasks = append(tasks, task{
typ: taskTypeUnloadUnit,
reason: taskReasonPurgingAgent,
}
u := &job.Unit{
Name: name,
}
tc := newTaskChain(u, t)
ar.launchTaskChain(tc, a)
unit: &job.Unit{
Name: name,
},
})
}

ar.launchTasks(tasks, a)
time.Sleep(time.Second)
}
}
Expand Down Expand Up @@ -158,36 +158,29 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
return &as, nil
}

// calculateTaskChainsForUnits compares the desired and current state of an Agent.
// The generated taskChains represent what should be done to make the desired
// state match the current state.
func (ar *AgentReconciler) calculateTaskChainsForUnits(dState *AgentState, cState unitStates) <-chan taskChain {
tcChan := make(chan taskChain)
go func() {
jobs := pkg.NewUnsafeSet()
for cName := range cState {
jobs.Add(cName)
}

for dName := range dState.Units {
jobs.Add(dName)
}
// calculateTasksForUnits compares the desired and current state of an Agent.
// The generated tasks represent what, in order, should be done to make the
// desired state match the current state.
func (ar *AgentReconciler) calculateTasksForUnits(dState *AgentState, cState unitStates) []task {
jobs := pkg.NewUnsafeSet()
for cName := range cState {
jobs.Add(cName)
}

for _, name := range jobs.Values() {
tc := ar.calculateTaskChainForUnit(dState, cState, name)
if tc == nil {
continue
}
tcChan <- *tc
}
for dName := range dState.Units {
jobs.Add(dName)
}

close(tcChan)
}()
var tasks []task
for _, name := range jobs.Values() {
tasks = append(tasks, ar.calculateTasksForUnit(dState, cState, name)...)
}

return tcChan
sort.Sort(sortableTasks(tasks))
return tasks
}

func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState unitStates, jName string) *taskChain {
func (ar *AgentReconciler) calculateTasksForUnit(dState *AgentState, cState unitStates, jName string) (tasks []task) {
var dJob *job.Unit
var dJHash string
if dState != nil {
Expand All @@ -207,119 +200,115 @@ func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState
return nil
}

u := &job.Unit{
Name: jName,
}

if dJob == nil || dJob.TargetState == job.JobStateInactive {
if cJState == nil {
return nil
}
t := task{
tasks = append(tasks, task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButNotScheduled,
}
u := &job.Unit{
Name: jName,
}
tc := newTaskChain(u, t)
return &tc
unit: u,
})
return
}

u := &job.Unit{
Name: jName,
Unit: dJob.Unit,
}
u.Unit = dJob.Unit

if cJState == nil {
tc := newTaskChain(u)
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
unit: u,
})

// as an optimization, queue the unit for launching immediately after loading
if dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStartUnit,
reason: taskReasonLoadedDesiredStateLaunched,
unit: u,
})
}

return &tc
return
}

if cJHash != dJHash {
log.Debugf("Desired hash %q differs to current hash %s of Job(%s) - unloading", dJHash, cJHash, jName)
tc := newTaskChain(u)
tc.Add(task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButHashDiffers,
})

// queue the correct unit for loading immediately after unloading the old one
tc.Add(task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
})
tasks = append(tasks,
task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButHashDiffers,
unit: u,
},
task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
unit: u,
},
)

// as an optimization, queue the unit for launching immediately after loading
if dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStartUnit,
reason: taskReasonLoadedDesiredStateLaunched,
unit: u,
})
}

return &tc
return
}

if *cJState == dJob.TargetState {
log.Debugf("Desired state %q matches current state of Job(%s), nothing to do", *cJState, jName)
return nil
}

tc := newTaskChain(u)
if *cJState == job.JobStateInactive {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
unit: u,
})
}

if (*cJState == job.JobStateInactive || *cJState == job.JobStateLoaded) && dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStartUnit,
reason: taskReasonLoadedDesiredStateLaunched,
unit: u,
})
}

if *cJState == job.JobStateLaunched && dJob.TargetState == job.JobStateLoaded {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStopUnit,
reason: taskReasonLaunchedDesiredStateLoaded,
unit: u,
})
}

if len(tc.tasks) == 0 {
if len(tasks) == 0 {
log.Errorf("Unable to determine how to reconcile Job(%s): desiredState=%#v currentState=%#v", jName, dJob, cJState)
return nil
}

return &tc
return
}

func (ar *AgentReconciler) launchTaskChain(tc taskChain, a *Agent) {
log.Debugf("AgentReconciler attempting task chain %s", tc)
reschan, err := ar.tManager.Do(tc, a)
if err != nil {
log.Infof("AgentReconciler task chain failed: chain=%s err=%v", tc, err)
return
}

go func() {
for res := range reschan {
if res.err == nil {
log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, tc.unit.Name, res.task.reason)
} else {
log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, tc.unit.Name, res.task.reason, res.err)
}
func (ar *AgentReconciler) launchTasks(tasks []task, a *Agent) {
log.Debugf("AgentReconciler attempting tasks %s", tasks)
results := ar.tManager.Do(tasks, a)
for _, res := range results {
if res.err == nil {
log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, res.task.unit.Name, res.task.reason)
} else {
log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, res.task.unit.Name, res.task.reason, res.err)
}
}()
}
}
Loading

0 comments on commit a35ee29

Please sign in to comment.