Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
agent: model dependent tasks using taskChains
Browse files Browse the repository at this point in the history
  • Loading branch information
bcwaldon committed Aug 20, 2014
1 parent 2915ab2 commit ee666f1
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 191 deletions.
118 changes: 62 additions & 56 deletions agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func (ar *AgentReconciler) Reconcile(a *Agent) {
return
}

for t := range ar.calculateTasksForJobs(dAgentState, cAgentState) {
ar.launchTask(t, a)
for tc := range ar.calculateTaskChainsForJobs(dAgentState, cAgentState) {
ar.launchTaskChain(tc, a)
}
}

Expand All @@ -107,12 +107,12 @@ func (ar *AgentReconciler) Purge(a *Agent) {
for _, cJob := range cAgentState.Jobs {
cJob := cJob
t := task{
Type: taskTypeUnloadJob,
Job: cJob,
Reason: taskReasonPurgingAgent,
typ: taskTypeUnloadJob,
reason: taskReasonPurgingAgent,
}

ar.launchTask(&t, a)
tc := newTaskChain(cJob, t)
ar.launchTaskChain(tc, a)
}

time.Sleep(time.Second)
Expand Down Expand Up @@ -180,11 +180,11 @@ func (ar *AgentReconciler) currentAgentState(a *Agent) (*AgentState, error) {
return &as, nil
}

// calculateTasksForJobs compares the desired and current state of an Agent.
// The generateed tasks represent what should be done to make the desired
// calculateTaskChainsForJobs compares the desired and current state of an Agent.
// The generated taskChains represent what should be done to make the desired
// state match the current state.
func (ar *AgentReconciler) calculateTasksForJobs(dState, cState *AgentState) <-chan *task {
taskchan := make(chan *task)
func (ar *AgentReconciler) calculateTaskChainsForJobs(dState, cState *AgentState) <-chan taskChain {
tcChan := make(chan taskChain)
go func() {
jobs := pkg.NewUnsafeSet()
for cName := range cState.Jobs {
Expand All @@ -196,16 +196,20 @@ func (ar *AgentReconciler) calculateTasksForJobs(dState, cState *AgentState) <-c
}

for _, name := range jobs.Values() {
ar.calculateTasksForJob(dState, cState, name, taskchan)
tc := ar.calculateTaskChainForJob(dState, cState, name)
if tc == nil {
continue
}
tcChan <- *tc
}

close(taskchan)
close(tcChan)
}()

return taskchan
return tcChan
}

func (ar *AgentReconciler) calculateTasksForJob(dState, cState *AgentState, jName string, taskchan chan *task) {
func (ar *AgentReconciler) calculateTaskChainForJob(dState, cState *AgentState, jName string) *taskChain {
var dJob, cJob *job.Job
if dState != nil {
dJob = dState.Jobs[jName]
Expand All @@ -216,84 +220,86 @@ func (ar *AgentReconciler) calculateTasksForJob(dState, cState *AgentState, jNam

if dJob == nil && cJob == nil {
log.Errorf("Desired state and current state of Job(%s) nil, not sure what to do", jName)
return
return nil
}

if dJob == nil || dJob.TargetState == job.JobStateInactive {
taskchan <- &task{
Type: taskTypeUnloadJob,
Job: cJob,
Reason: taskReasonLoadedButNotScheduled,
delete(cState.Jobs, jName)

t := task{
typ: taskTypeUnloadJob,
reason: taskReasonLoadedButNotScheduled,
}

delete(cState.Jobs, jName)
return
tc := newTaskChain(cJob, t)
return &tc
}

if cJob == nil {
taskchan <- &task{
Type: taskTypeLoadJob,
Job: dJob,
Reason: taskReasonScheduledButUnloaded,
t := task{
typ: taskTypeLoadJob,
reason: taskReasonScheduledButUnloaded,
}

return
tc := newTaskChain(dJob, t)
return &tc
}

if cJob.State == nil {
log.Errorf("Current state of Job(%s) unknown, unable to reconcile", jName)
return
return nil
}

if *cJob.State == dJob.TargetState {
log.V(1).Infof("Desired state %q matches current state of Job(%s), nothing to do", *cJob.State, jName)
return
return nil
}

tc := newTaskChain(dJob)
if *cJob.State == job.JobStateInactive {
taskchan <- &task{
Type: taskTypeLoadJob,
Job: dJob,
Reason: taskReasonScheduledButUnloaded,
}
tc.Add(task{
typ: taskTypeLoadJob,
reason: taskReasonScheduledButUnloaded,
})
}

if (*cJob.State == job.JobStateInactive || *cJob.State == job.JobStateLoaded) && dJob.TargetState == job.JobStateLaunched {
taskchan <- &task{
Type: taskTypeStartJob,
Job: cJob,
Reason: taskReasonLoadedDesiredStateLaunched,
}
return
tc.Add(task{
typ: taskTypeStartJob,
reason: taskReasonLoadedDesiredStateLaunched,
})
}

if *cJob.State == job.JobStateLaunched && dJob.TargetState == job.JobStateLoaded {
taskchan <- &task{
Type: taskTypeStopJob,
Job: cJob,
Reason: taskReasonLaunchedDesiredStateLoaded,
}
return
tc.Add(task{
typ: taskTypeStopJob,
reason: taskReasonLaunchedDesiredStateLoaded,
})
}

log.Errorf("Unable to determine how to reconcile Job(%s): desiredState=%#v currentState=%#V", jName, dJob, cJob)
if len(tc.tasks) == 0 {
log.Errorf("Unable to determine how to reconcile Job(%s): desiredState=%#v currentState=%#v", jName, dJob, cJob)
return nil
}

return &tc
}

func (ar *AgentReconciler) launchTask(t *task, a *Agent) {
log.V(1).Infof("AgentReconciler attempting task: %s", t)
errchan, err := ar.tManager.Do(t, a)
func (ar *AgentReconciler) launchTaskChain(tc taskChain, a *Agent) {
log.V(1).Infof("AgentReconciler attempting task chain: %s", tc)
reschan, err := ar.tManager.Do(tc, a)
if err != nil {
log.Infof("AgentReconciler task failed: task=%s err=%v", t, err)
log.Infof("AgentReconciler task chain failed: chain=%s err=%v", tc, err)
return
}

go func() {
err = <-errchan

if err == nil {
log.Infof("AgentReconciler completed task: %s", t)
} else {
log.Infof("AgentReconciler task failed: task=%s err=%v", t, err)
for res := range reschan {
if res.err == nil {
log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, tc.job.Name, res.task.reason)
} else {
log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, tc.job.Name, res.task.reason, res.err)
}
}
}()
}
86 changes: 43 additions & 43 deletions agent/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,23 @@ func TestCalculateTasksForJob(t *testing.T) {
cState *AgentState
jName string

tasks []task
chain *taskChain
}{

// nil agent state objects should result in no tasks
{
dState: nil,
cState: nil,
jName: "foo.service",
tasks: []task{},
chain: nil,
},

// nil job should result in no tasks
{
dState: NewAgentState(&machine.MachineState{ID: "XXX"}),
cState: NewAgentState(&machine.MachineState{ID: "XXX"}),
jName: "foo.service",
tasks: []task{},
chain: nil,
},

// no work needs to be done when target state == desired state
Expand All @@ -196,7 +196,7 @@ func TestCalculateTasksForJob(t *testing.T) {
},
},
jName: "foo.service",
tasks: []task{},
chain: nil,
},

// no work needs to be done when target state == desired state
Expand All @@ -214,7 +214,7 @@ func TestCalculateTasksForJob(t *testing.T) {
},
},
jName: "foo.service",
tasks: []task{},
chain: nil,
},

// load jobs that have a loaded desired state
Expand All @@ -227,11 +227,13 @@ func TestCalculateTasksForJob(t *testing.T) {
},
cState: NewAgentState(&machine.MachineState{ID: "XXX"}),
jName: "foo.service",
tasks: []task{
task{
Type: taskTypeLoadJob,
Job: &job.Job{TargetState: jsLoaded},
Reason: taskReasonScheduledButUnloaded,
chain: &taskChain{
job: &job.Job{TargetState: jsLoaded},
tasks: []task{
task{
typ: taskTypeLoadJob,
reason: taskReasonScheduledButUnloaded,
},
},
},
},
Expand All @@ -246,11 +248,13 @@ func TestCalculateTasksForJob(t *testing.T) {
},
cState: NewAgentState(&machine.MachineState{ID: "XXX"}),
jName: "foo.service",
tasks: []task{
task{
Type: taskTypeLoadJob,
Job: &job.Job{TargetState: jsLaunched},
Reason: taskReasonScheduledButUnloaded,
chain: &taskChain{
job: &job.Job{TargetState: jsLaunched},
tasks: []task{
task{
typ: taskTypeLoadJob,
reason: taskReasonScheduledButUnloaded,
},
},
},
},
Expand All @@ -265,11 +269,13 @@ func TestCalculateTasksForJob(t *testing.T) {
},
},
jName: "foo.service",
tasks: []task{
task{
Type: taskTypeUnloadJob,
Job: &job.Job{State: &jsLoaded},
Reason: taskReasonLoadedButNotScheduled,
chain: &taskChain{
job: &job.Job{State: &jsLoaded},
tasks: []task{
task{
typ: taskTypeUnloadJob,
reason: taskReasonLoadedButNotScheduled,
},
},
},
},
Expand All @@ -284,11 +290,13 @@ func TestCalculateTasksForJob(t *testing.T) {
},
},
jName: "foo.service",
tasks: []task{
task{
Type: taskTypeUnloadJob,
Job: &job.Job{State: &jsLaunched},
Reason: taskReasonLoadedButNotScheduled,
chain: &taskChain{
job: &job.Job{State: &jsLaunched},
tasks: []task{
task{
typ: taskTypeUnloadJob,
reason: taskReasonLoadedButNotScheduled,
},
},
},
},
Expand All @@ -310,11 +318,13 @@ func TestCalculateTasksForJob(t *testing.T) {
},
},
jName: "foo.service",
tasks: []task{
task{
Type: taskTypeUnloadJob,
Job: &job.Job{State: &jsLoaded},
Reason: taskReasonLoadedButNotScheduled,
chain: &taskChain{
job: &job.Job{State: &jsLoaded},
tasks: []task{
task{
typ: taskTypeUnloadJob,
reason: taskReasonLoadedButNotScheduled,
},
},
},
},
Expand All @@ -327,19 +337,9 @@ func TestCalculateTasksForJob(t *testing.T) {
continue
}

taskchan := make(chan *task)
tasks := []task{}
go func() {
ar.calculateTasksForJob(tt.dState, tt.cState, tt.jName, taskchan)
close(taskchan)
}()

for t := range taskchan {
tasks = append(tasks, *t)
}

if !reflect.DeepEqual(tt.tasks, tasks) {
t.Errorf("case %d: calculated incorrect list of tasks\nexpected=%v\nreceived=%v\n", i, tt.tasks, tasks)
chain := ar.calculateTaskChainForJob(tt.dState, tt.cState, tt.jName)
if !reflect.DeepEqual(tt.chain, chain) {
t.Errorf("case %d: calculated incorrect task chain\nexpected=%v\nreceived=%v\n", i, tt.chain, chain)
}
}
}
Loading

0 comments on commit ee666f1

Please sign in to comment.