Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #866 from jonboulle/866_agent_reconciler
Browse files Browse the repository at this point in the history
fleet agent does not compare contents of units in reconciler
  • Loading branch information
jonboulle committed Oct 24, 2014
2 parents 2692b31 + bfd31a2 commit bf2966e
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 36 deletions.
19 changes: 16 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ func (a *Agent) stopUnit(unitName string) {
a.um.TriggerStop(unitName)
}

type unitStates map[string]job.JobState
type unitState struct {
state job.JobState
hash string
}
type unitStates map[string]unitState

// units returns a map representing the current state of units known by the agent.
func (a *Agent) units() (unitStates, error) {
Expand All @@ -143,15 +147,24 @@ func (a *Agent) units() (unitStates, error) {
filter.Add(u)
}

uStates, err := a.um.GetUnitStates(filter)
if err != nil {
return nil, fmt.Errorf("failed fetching unit states from UnitManager: %v", err)
}

states := make(unitStates)
for _, uName := range units {
for uName, uState := range uStates {
js := job.JobStateInactive
if loaded.Contains(uName) {
js = job.JobStateLoaded
} else if launched.Contains(uName) {
js = job.JobStateLaunched
}
states[uName] = js
us := unitState{
state: js,
hash: uState.UnitHash,
}
states[uName] = us
}

return states, nil
Expand Down
12 changes: 9 additions & 3 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func TestAgentLoadUnloadUnit(t *testing.T) {

jsLoaded := job.JobStateLoaded
expectUnits := unitStates{
"foo.service": jsLoaded,
"foo.service": unitState{
state: jsLoaded,
},
}

if !reflect.DeepEqual(expectUnits, units) {
Expand Down Expand Up @@ -123,7 +125,9 @@ func TestAgentLoadStartStopUnit(t *testing.T) {

jsLaunched := job.JobStateLaunched
expectUnits := unitStates{
"foo.service": jsLaunched,
"foo.service": unitState{
state: jsLaunched,
},
}

if !reflect.DeepEqual(expectUnits, units) {
Expand All @@ -139,7 +143,9 @@ func TestAgentLoadStartStopUnit(t *testing.T) {

jsLoaded := job.JobStateLoaded
expectUnits = unitStates{
"foo.service": jsLoaded,
"foo.service": unitState{
state: jsLoaded,
},
}

if !reflect.DeepEqual(expectUnits, units) {
Expand Down
49 changes: 40 additions & 9 deletions agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (ar *AgentReconciler) Reconcile(a *Agent) {
return
}

for tc := range ar.calculateTaskChainsForJobs(dAgentState, cAgentState) {
for tc := range ar.calculateTaskChainsForUnits(dAgentState, cAgentState) {
ar.launchTaskChain(tc, a)
}
}

// Purge attempts to unload all Jobs that have been loaded locally
// Purge attempts to unload all Units that have been loaded locally
func (ar *AgentReconciler) Purge(a *Agent) {
for {
cAgentState, err := a.units()
Expand Down Expand Up @@ -160,10 +160,10 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
return &as, nil
}

// calculateTaskChainsForJobs compares the desired and current state of an Agent.
// calculateTaskChainsForUnits compares the desired and current state of an Agent.
// The generated taskChains represent what should be done to make the desired
// state match the current state.
func (ar *AgentReconciler) calculateTaskChainsForJobs(dState *AgentState, cState unitStates) <-chan taskChain {
func (ar *AgentReconciler) calculateTaskChainsForUnits(dState *AgentState, cState unitStates) <-chan taskChain {
tcChan := make(chan taskChain)
go func() {
jobs := pkg.NewUnsafeSet()
Expand All @@ -176,7 +176,7 @@ func (ar *AgentReconciler) calculateTaskChainsForJobs(dState *AgentState, cState
}

for _, name := range jobs.Values() {
tc := ar.calculateTaskChainForJob(dState, cState, name)
tc := ar.calculateTaskChainForUnit(dState, cState, name)
if tc == nil {
continue
}
Expand All @@ -189,14 +189,20 @@ func (ar *AgentReconciler) calculateTaskChainsForJobs(dState *AgentState, cState
return tcChan
}

func (ar *AgentReconciler) calculateTaskChainForJob(dState *AgentState, cState unitStates, jName string) *taskChain {
func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState unitStates, jName string) *taskChain {
var dJob *job.Unit
var dJHash string
if dState != nil {
dJob = dState.Units[jName]
if dJob != nil {
dJHash = dJob.Unit.Hash().String()
}
}
var cJState *job.JobState
if state, ok := cState[jName]; ok {
cJState = &state
var cJHash string
if us, ok := cState[jName]; ok {
cJState = &us.state
cJHash = us.hash
}
if dJob == nil && cJState == nil {
log.Errorf("Desired state and current state of Job(%s) nil, not sure what to do", jName)
Expand Down Expand Up @@ -230,7 +236,32 @@ func (ar *AgentReconciler) calculateTaskChainForJob(dState *AgentState, cState u
reason: taskReasonScheduledButUnloaded,
})

// as an optimization, queue the job for launching immediately after loading
// as an optimization, queue the unit for launching immediately after loading
if dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
typ: taskTypeStartUnit,
reason: taskReasonLoadedDesiredStateLaunched,
})
}

return &tc
}

if cJHash != dJHash {
log.V(1).Infof("Desired hash %q differs to current hash %s of Job(%s) - unloading", dJHash, cJHash, jName)
tc := newTaskChain(u)
tc.Add(task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButHashDiffers,
})

// queue the correct unit for loading immediately after unloading the old one
tc.Add(task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
})

// as an optimization, queue the unit for launching immediately after loading
if dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
typ: taskTypeStartUnit,
Expand Down
Loading

0 comments on commit bf2966e

Please sign in to comment.