From ee666f173fc939fd241066ee91a5d411f1ae382e Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 11 Aug 2014 17:24:27 -0700 Subject: [PATCH] agent: model dependent tasks using taskChains --- agent/reconcile.go | 118 +++++++++++++++++++++------------------- agent/reconcile_test.go | 86 ++++++++++++++--------------- agent/task.go | 91 ++++++++++++++++++------------- agent/task_test.go | 70 ++++++------------------ 4 files changed, 174 insertions(+), 191 deletions(-) diff --git a/agent/reconcile.go b/agent/reconcile.go index 7ca0ac3f6..ff6f976e9 100644 --- a/agent/reconcile.go +++ b/agent/reconcile.go @@ -86,8 +86,8 @@ func (ar *AgentReconciler) Reconcile(a *Agent) { return } - for t := range ar.calculateTasksForJobs(dAgentState, cAgentState) { - ar.launchTask(t, a) + for tc := range ar.calculateTaskChainsForJobs(dAgentState, cAgentState) { + ar.launchTaskChain(tc, a) } } @@ -107,12 +107,12 @@ func (ar *AgentReconciler) Purge(a *Agent) { for _, cJob := range cAgentState.Jobs { cJob := cJob t := task{ - Type: taskTypeUnloadJob, - Job: cJob, - Reason: taskReasonPurgingAgent, + typ: taskTypeUnloadJob, + reason: taskReasonPurgingAgent, } - ar.launchTask(&t, a) + tc := newTaskChain(cJob, t) + ar.launchTaskChain(tc, a) } time.Sleep(time.Second) @@ -180,11 +180,11 @@ func (ar *AgentReconciler) currentAgentState(a *Agent) (*AgentState, error) { return &as, nil } -// calculateTasksForJobs compares the desired and current state of an Agent. -// The generateed tasks represent what should be done to make the desired +// calculateTaskChainsForJobs compares the desired and current state of an Agent. +// The generated taskChains represent what should be done to make the desired // state match the current state. -func (ar *AgentReconciler) calculateTasksForJobs(dState, cState *AgentState) <-chan *task { - taskchan := make(chan *task) +func (ar *AgentReconciler) calculateTaskChainsForJobs(dState, cState *AgentState) <-chan taskChain { + tcChan := make(chan taskChain) go func() { jobs := pkg.NewUnsafeSet() for cName := range cState.Jobs { @@ -196,16 +196,20 @@ func (ar *AgentReconciler) calculateTasksForJobs(dState, cState *AgentState) <-c } for _, name := range jobs.Values() { - ar.calculateTasksForJob(dState, cState, name, taskchan) + tc := ar.calculateTaskChainForJob(dState, cState, name) + if tc == nil { + continue + } + tcChan <- *tc } - close(taskchan) + close(tcChan) }() - return taskchan + return tcChan } -func (ar *AgentReconciler) calculateTasksForJob(dState, cState *AgentState, jName string, taskchan chan *task) { +func (ar *AgentReconciler) calculateTaskChainForJob(dState, cState *AgentState, jName string) *taskChain { var dJob, cJob *job.Job if dState != nil { dJob = dState.Jobs[jName] @@ -216,84 +220,86 @@ func (ar *AgentReconciler) calculateTasksForJob(dState, cState *AgentState, jNam if dJob == nil && cJob == nil { log.Errorf("Desired state and current state of Job(%s) nil, not sure what to do", jName) - return + return nil } if dJob == nil || dJob.TargetState == job.JobStateInactive { - taskchan <- &task{ - Type: taskTypeUnloadJob, - Job: cJob, - Reason: taskReasonLoadedButNotScheduled, + delete(cState.Jobs, jName) + + t := task{ + typ: taskTypeUnloadJob, + reason: taskReasonLoadedButNotScheduled, } - delete(cState.Jobs, jName) - return + tc := newTaskChain(cJob, t) + return &tc } if cJob == nil { - taskchan <- &task{ - Type: taskTypeLoadJob, - Job: dJob, - Reason: taskReasonScheduledButUnloaded, + t := task{ + typ: taskTypeLoadJob, + reason: taskReasonScheduledButUnloaded, } - return + tc := newTaskChain(dJob, t) + return &tc } if cJob.State == nil { log.Errorf("Current state of Job(%s) unknown, unable to reconcile", jName) - return + return nil } if *cJob.State == dJob.TargetState { log.V(1).Infof("Desired state %q matches current state of Job(%s), nothing to do", *cJob.State, jName) - return + return nil } + tc := newTaskChain(dJob) if *cJob.State == job.JobStateInactive { - taskchan <- &task{ - Type: taskTypeLoadJob, - Job: dJob, - Reason: taskReasonScheduledButUnloaded, - } + tc.Add(task{ + typ: taskTypeLoadJob, + reason: taskReasonScheduledButUnloaded, + }) } if (*cJob.State == job.JobStateInactive || *cJob.State == job.JobStateLoaded) && dJob.TargetState == job.JobStateLaunched { - taskchan <- &task{ - Type: taskTypeStartJob, - Job: cJob, - Reason: taskReasonLoadedDesiredStateLaunched, - } - return + tc.Add(task{ + typ: taskTypeStartJob, + reason: taskReasonLoadedDesiredStateLaunched, + }) } if *cJob.State == job.JobStateLaunched && dJob.TargetState == job.JobStateLoaded { - taskchan <- &task{ - Type: taskTypeStopJob, - Job: cJob, - Reason: taskReasonLaunchedDesiredStateLoaded, - } - return + tc.Add(task{ + typ: taskTypeStopJob, + reason: taskReasonLaunchedDesiredStateLoaded, + }) } - log.Errorf("Unable to determine how to reconcile Job(%s): desiredState=%#v currentState=%#V", jName, dJob, cJob) + if len(tc.tasks) == 0 { + log.Errorf("Unable to determine how to reconcile Job(%s): desiredState=%#v currentState=%#v", jName, dJob, cJob) + return nil + } + + return &tc } -func (ar *AgentReconciler) launchTask(t *task, a *Agent) { - log.V(1).Infof("AgentReconciler attempting task: %s", t) - errchan, err := ar.tManager.Do(t, a) +func (ar *AgentReconciler) launchTaskChain(tc taskChain, a *Agent) { + log.V(1).Infof("AgentReconciler attempting task chain: %s", tc) + reschan, err := ar.tManager.Do(tc, a) if err != nil { - log.Infof("AgentReconciler task failed: task=%s err=%v", t, err) + log.Infof("AgentReconciler task chain failed: chain=%s err=%v", tc, err) return } go func() { - err = <-errchan - - if err == nil { - log.Infof("AgentReconciler completed task: %s", t) - } else { - log.Infof("AgentReconciler task failed: task=%s err=%v", t, err) + for res := range reschan { + if res.err == nil { + log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, tc.job.Name, res.task.reason) + } else { + log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, tc.job.Name, res.task.reason, res.err) + } } }() } diff --git a/agent/reconcile_test.go b/agent/reconcile_test.go index ad70a557a..e72478bba 100644 --- a/agent/reconcile_test.go +++ b/agent/reconcile_test.go @@ -162,7 +162,7 @@ func TestCalculateTasksForJob(t *testing.T) { cState *AgentState jName string - tasks []task + chain *taskChain }{ // nil agent state objects should result in no tasks @@ -170,7 +170,7 @@ func TestCalculateTasksForJob(t *testing.T) { dState: nil, cState: nil, jName: "foo.service", - tasks: []task{}, + chain: nil, }, // nil job should result in no tasks @@ -178,7 +178,7 @@ func TestCalculateTasksForJob(t *testing.T) { dState: NewAgentState(&machine.MachineState{ID: "XXX"}), cState: NewAgentState(&machine.MachineState{ID: "XXX"}), jName: "foo.service", - tasks: []task{}, + chain: nil, }, // no work needs to be done when target state == desired state @@ -196,7 +196,7 @@ func TestCalculateTasksForJob(t *testing.T) { }, }, jName: "foo.service", - tasks: []task{}, + chain: nil, }, // no work needs to be done when target state == desired state @@ -214,7 +214,7 @@ func TestCalculateTasksForJob(t *testing.T) { }, }, jName: "foo.service", - tasks: []task{}, + chain: nil, }, // load jobs that have a loaded desired state @@ -227,11 +227,13 @@ func TestCalculateTasksForJob(t *testing.T) { }, cState: NewAgentState(&machine.MachineState{ID: "XXX"}), jName: "foo.service", - tasks: []task{ - task{ - Type: taskTypeLoadJob, - Job: &job.Job{TargetState: jsLoaded}, - Reason: taskReasonScheduledButUnloaded, + chain: &taskChain{ + job: &job.Job{TargetState: jsLoaded}, + tasks: []task{ + task{ + typ: taskTypeLoadJob, + reason: taskReasonScheduledButUnloaded, + }, }, }, }, @@ -246,11 +248,13 @@ func TestCalculateTasksForJob(t *testing.T) { }, cState: NewAgentState(&machine.MachineState{ID: "XXX"}), jName: "foo.service", - tasks: []task{ - task{ - Type: taskTypeLoadJob, - Job: &job.Job{TargetState: jsLaunched}, - Reason: taskReasonScheduledButUnloaded, + chain: &taskChain{ + job: &job.Job{TargetState: jsLaunched}, + tasks: []task{ + task{ + typ: taskTypeLoadJob, + reason: taskReasonScheduledButUnloaded, + }, }, }, }, @@ -265,11 +269,13 @@ func TestCalculateTasksForJob(t *testing.T) { }, }, jName: "foo.service", - tasks: []task{ - task{ - Type: taskTypeUnloadJob, - Job: &job.Job{State: &jsLoaded}, - Reason: taskReasonLoadedButNotScheduled, + chain: &taskChain{ + job: &job.Job{State: &jsLoaded}, + tasks: []task{ + task{ + typ: taskTypeUnloadJob, + reason: taskReasonLoadedButNotScheduled, + }, }, }, }, @@ -284,11 +290,13 @@ func TestCalculateTasksForJob(t *testing.T) { }, }, jName: "foo.service", - tasks: []task{ - task{ - Type: taskTypeUnloadJob, - Job: &job.Job{State: &jsLaunched}, - Reason: taskReasonLoadedButNotScheduled, + chain: &taskChain{ + job: &job.Job{State: &jsLaunched}, + tasks: []task{ + task{ + typ: taskTypeUnloadJob, + reason: taskReasonLoadedButNotScheduled, + }, }, }, }, @@ -310,11 +318,13 @@ func TestCalculateTasksForJob(t *testing.T) { }, }, jName: "foo.service", - tasks: []task{ - task{ - Type: taskTypeUnloadJob, - Job: &job.Job{State: &jsLoaded}, - Reason: taskReasonLoadedButNotScheduled, + chain: &taskChain{ + job: &job.Job{State: &jsLoaded}, + tasks: []task{ + task{ + typ: taskTypeUnloadJob, + reason: taskReasonLoadedButNotScheduled, + }, }, }, }, @@ -327,19 +337,9 @@ func TestCalculateTasksForJob(t *testing.T) { continue } - taskchan := make(chan *task) - tasks := []task{} - go func() { - ar.calculateTasksForJob(tt.dState, tt.cState, tt.jName, taskchan) - close(taskchan) - }() - - for t := range taskchan { - tasks = append(tasks, *t) - } - - if !reflect.DeepEqual(tt.tasks, tasks) { - t.Errorf("case %d: calculated incorrect list of tasks\nexpected=%v\nreceived=%v\n", i, tt.tasks, tasks) + chain := ar.calculateTaskChainForJob(tt.dState, tt.cState, tt.jName) + if !reflect.DeepEqual(tt.chain, chain) { + t.Errorf("case %d: calculated incorrect task chain\nexpected=%v\nreceived=%v\n", i, tt.chain, chain) } } } diff --git a/agent/task.go b/agent/task.go index 23eb5c043..4d56924f1 100644 --- a/agent/task.go +++ b/agent/task.go @@ -22,18 +22,30 @@ const ( taskReasonPurgingAgent = "purging agent" ) -type task struct { - Type string - Job *job.Job - Reason string +type taskChain struct { + job *job.Job + tasks []task } -func (t *task) String() string { - var jName string - if t.Job != nil { - jName = t.Job.Name +func newTaskChain(j *job.Job, t ...task) taskChain { + return taskChain{ + job: j, + tasks: t, } - return fmt.Sprintf("{Type: %s, Job: %s, Reason: %q}", t.Type, jName, t.Reason) +} + +func (tc *taskChain) Add(t task) { + tc.tasks = append(tc.tasks, t) +} + +type task struct { + typ string + reason string +} + +type taskResult struct { + task task + err error } type taskManager struct { @@ -54,54 +66,57 @@ func newTaskManager() *taskManager { // error channel will be non-nil only if the task could be attempted. The // channel will be closed when the task completes. If the task failed, an // error will be sent to the channel. Do is not threadsafe. -func (tm *taskManager) Do(t *task, a *Agent) (chan error, error) { - if t == nil { - return nil, errors.New("unable to handle nil task") - } - - if t.Job == nil { +func (tm *taskManager) Do(tc taskChain, a *Agent) (chan taskResult, error) { + if tc.job == nil { return nil, errors.New("unable to handle task with nil Job") } - if tm.processing.Contains(t.Job.Name) { + if tm.processing.Contains(tc.job.Name) { return nil, errors.New("task already in flight") } - taskFunc, err := tm.mapper(t, a) - if err != nil { - return nil, err - } - - tm.processing.Add(t.Job.Name) - - errchan := make(chan error) + // Do is not threadsafe due to the race between Contains and Add + tm.processing.Add(tc.job.Name) + reschan := make(chan taskResult, len(tc.tasks)) go func() { - defer tm.processing.Remove(t.Job.Name) - err := taskFunc() - if err != nil { - errchan <- err + defer tm.processing.Remove(tc.job.Name) + for _, t := range tc.tasks { + t := t + res := taskResult{ + task: t, + } + + taskFunc, err := tm.mapper(t, tc.job, a) + if err != nil { + res.err = err + } else { + res.err = taskFunc() + } + + reschan <- res } - close(errchan) + + close(reschan) }() - return errchan, nil + return reschan, nil } -type taskMapperFunc func(t *task, a *Agent) (func() error, error) +type taskMapperFunc func(t task, j *job.Job, a *Agent) (func() error, error) -func mapTaskToFunc(t *task, a *Agent) (fn func() error, err error) { - switch t.Type { +func mapTaskToFunc(t task, j *job.Job, a *Agent) (fn func() error, err error) { + switch t.typ { case taskTypeLoadJob: - fn = func() error { return a.loadJob(t.Job) } + fn = func() error { return a.loadJob(j) } case taskTypeUnloadJob: - fn = func() error { a.unloadJob(t.Job.Name); return nil } + fn = func() error { a.unloadJob(j.Name); return nil } case taskTypeStartJob: - fn = func() error { a.startJob(t.Job.Name); return nil } + fn = func() error { a.startJob(j.Name); return nil } case taskTypeStopJob: - fn = func() error { a.stopJob(t.Job.Name); return nil } + fn = func() error { a.stopJob(j.Name); return nil } default: - err = fmt.Errorf("unrecognized task type %q", t.Type) + err = fmt.Errorf("unrecognized task type %q", t.typ) } return diff --git a/agent/task_test.go b/agent/task_test.go index efc4585d2..bf0d5a3f5 100644 --- a/agent/task_test.go +++ b/agent/task_test.go @@ -10,7 +10,7 @@ import ( func TestTaskManagerTwoInFlight(t *testing.T) { result := make(chan error) - testMapper := func(*task, *Agent) (exec func() error, err error) { + testMapper := func(task, *job.Job, *Agent) (exec func() error, err error) { exec = func() error { return <-result } @@ -22,12 +22,12 @@ func TestTaskManagerTwoInFlight(t *testing.T) { mapper: testMapper, } - errchan1, err := tm.Do(&task{Job: &job.Job{Name: "foo"}}, nil) + errchan1, err := tm.Do(taskChain{job: &job.Job{Name: "foo"}, tasks: []task{task{typ: "test"}}}, nil) if err != nil { t.Fatalf("unable to start task: %v", err) } - errchan2, err := tm.Do(&task{Job: &job.Job{Name: "bar"}}, nil) + errchan2, err := tm.Do(taskChain{job: &job.Job{Name: "bar"}, tasks: []task{task{typ: "test"}}}, nil) if err != nil { t.Fatalf("unable to start task: %v", err) } @@ -39,20 +39,20 @@ func TestTaskManagerTwoInFlight(t *testing.T) { t.Fatalf("expected errchans to be ready to receive within 1s") }() - err = <-errchan1 - if err != nil { - t.Fatalf("received unexpected error from task one: %v", err) + res := <-errchan1 + if res.err != nil { + t.Fatalf("received unexpected error from task one: %v", res.err) } - err = <-errchan2 - if err != nil { - t.Fatalf("received unexpected error from task two: %v", err) + res = <-errchan2 + if res.err != nil { + t.Fatalf("received unexpected error from task two: %v", res.err) } } func TestTaskManagerJobSerialization(t *testing.T) { result := make(chan error) - testMapper := func(*task, *Agent) (exec func() error, err error) { + testMapper := func(task, *job.Job, *Agent) (exec func() error, err error) { exec = func() error { return <-result } @@ -64,13 +64,13 @@ func TestTaskManagerJobSerialization(t *testing.T) { mapper: testMapper, } - errchan, err := tm.Do(&task{Job: &job.Job{Name: "foo"}}, nil) + reschan, err := tm.Do(taskChain{job: &job.Job{Name: "foo"}, tasks: []task{task{typ: "test"}}}, nil) if err != nil { t.Fatalf("unable to start first task: %v", err) } // the first task should block the second, as it is still in progress - _, err = tm.Do(&task{Job: &job.Job{Name: "foo"}}, nil) + _, err = tm.Do(taskChain{job: &job.Job{Name: "foo"}, tasks: []task{task{typ: "test"}}}, nil) if err == nil { t.Fatalf("expected error from attempt to start second task, got nil") } @@ -78,57 +78,19 @@ func TestTaskManagerJobSerialization(t *testing.T) { result <- nil select { - case err := <-errchan: - if err != nil { + case res := <-reschan: + if res.err != nil { t.Errorf("received unexpected error from first task: %v", err) } default: - t.Errorf("expected errchan to be ready to receive") + t.Errorf("expected reschan to be ready to receive") } // since the first task completed, this third task can start - _, err = tm.Do(&task{Job: &job.Job{Name: "foo"}}, nil) + _, err = tm.Do(taskChain{job: &job.Job{Name: "foo"}, tasks: []task{task{typ: "test"}}}, nil) if err != nil { t.Fatalf("unable to start third task: %v", err) } close(result) } - -func TestTaskmanagerTaskValidity(t *testing.T) { - tests := []struct { - task *task - ok bool - }{ - { - task: nil, - ok: false, - }, - - { - task: &task{Job: nil}, - ok: false, - }, - - { - task: &task{Job: &job.Job{Name: "foo"}}, - ok: true, - }, - } - - successfulTaskMapper := func(t *task, a *Agent) (func() error, error) { - return func() error { return nil }, nil - } - - tm := taskManager{ - processing: pkg.NewUnsafeSet(), - mapper: successfulTaskMapper, - } - - for i, tt := range tests { - _, err := tm.Do(tt.task, nil) - if tt.ok != (err == nil) { - t.Errorf("case %d: expected ok=%t, got err=%v", i, tt.ok, err) - } - } -}