Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
agent: compare unit hashes when reconciling
Browse files Browse the repository at this point in the history
This incorporates hashes into the decision when agent reconciler is
calculating what tasks should be performed.
  • Loading branch information
jonboulle committed Oct 21, 2014
1 parent 6ec83b7 commit a978c3b
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 37 deletions.
23 changes: 18 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ func (a *Agent) stopUnit(unitName string) {
a.um.TriggerStop(unitName)
}

type unitStates map[string]job.JobState
type unitState struct {
state job.JobState
hash string
}
type unitStates map[string]unitState

// units returns a map representing the current state of units known by the agent.
func (a *Agent) units() (unitStates, error) {
Expand All @@ -133,25 +137,34 @@ func (a *Agent) units() (unitStates, error) {
loaded.Add(jName)
}

units, err := a.um.Units()
unitFiles, err := a.um.Units()
if err != nil {
return nil, fmt.Errorf("failed fetching loaded units from UnitManager: %v", err)
}

filter := pkg.NewUnsafeSet()
for _, u := range units {
for _, u := range unitFiles {
filter.Add(u)
}

units, err := a.um.GetUnitStates(filter)
if err != nil {
return nil, fmt.Errorf("failed fetching unit states from UnitManager: %v", err)
}

states := make(unitStates)
for _, uName := range units {
for uName, state := range units {
js := job.JobStateInactive
if loaded.Contains(uName) {
js = job.JobStateLoaded
} else if launched.Contains(uName) {
js = job.JobStateLaunched
}
states[uName] = js
us := unitState{
state: js,
hash: state.UnitHash,
}
states[uName] = us
}

return states, nil
Expand Down
12 changes: 9 additions & 3 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func TestAgentLoadUnloadUnit(t *testing.T) {

jsLoaded := job.JobStateLoaded
expectUnits := unitStates{
"foo.service": jsLoaded,
"foo.service": unitState{
state: jsLoaded,
},
}

if !reflect.DeepEqual(expectUnits, units) {
Expand Down Expand Up @@ -123,7 +125,9 @@ func TestAgentLoadStartStopUnit(t *testing.T) {

jsLaunched := job.JobStateLaunched
expectUnits := unitStates{
"foo.service": jsLaunched,
"foo.service": unitState{
state: jsLaunched,
},
}

if !reflect.DeepEqual(expectUnits, units) {
Expand All @@ -139,7 +143,9 @@ func TestAgentLoadStartStopUnit(t *testing.T) {

jsLoaded := job.JobStateLoaded
expectUnits = unitStates{
"foo.service": jsLoaded,
"foo.service": unitState{
state: jsLoaded,
},
}

if !reflect.DeepEqual(expectUnits, units) {
Expand Down
35 changes: 27 additions & 8 deletions agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (ar *AgentReconciler) Reconcile(a *Agent) {
return
}

for tc := range ar.calculateTaskChainsForJobs(dAgentState, cAgentState) {
for tc := range ar.calculateTaskChainsForUnits(dAgentState, cAgentState) {
ar.launchTaskChain(tc, a)
}
}

// Purge attempts to unload all Jobs that have been loaded locally
// Purge attempts to unload all Units that have been loaded locally
func (ar *AgentReconciler) Purge(a *Agent) {
for {
cAgentState, err := a.units()
Expand Down Expand Up @@ -160,10 +160,10 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
return &as, nil
}

// calculateTaskChainsForJobs compares the desired and current state of an Agent.
// calculateTaskChainsForUnits compares the desired and current state of an Agent.
// The generated taskChains represent what should be done to make the desired
// state match the current state.
func (ar *AgentReconciler) calculateTaskChainsForJobs(dState *AgentState, cState unitStates) <-chan taskChain {
func (ar *AgentReconciler) calculateTaskChainsForUnits(dState *AgentState, cState unitStates) <-chan taskChain {
tcChan := make(chan taskChain)
go func() {
jobs := pkg.NewUnsafeSet()
Expand All @@ -176,7 +176,7 @@ func (ar *AgentReconciler) calculateTaskChainsForJobs(dState *AgentState, cState
}

for _, name := range jobs.Values() {
tc := ar.calculateTaskChainForJob(dState, cState, name)
tc := ar.calculateTaskChainForUnit(dState, cState, name)
if tc == nil {
continue
}
Expand All @@ -189,14 +189,20 @@ func (ar *AgentReconciler) calculateTaskChainsForJobs(dState *AgentState, cState
return tcChan
}

func (ar *AgentReconciler) calculateTaskChainForJob(dState *AgentState, cState unitStates, jName string) *taskChain {
func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState unitStates, jName string) *taskChain {
var dJob *job.Unit
var dJHash string
if dState != nil {
dJob = dState.Units[jName]
if dJob != nil {
dJHash = dJob.Unit.Hash().String()
}
}
var cJState *job.JobState
if state, ok := cState[jName]; ok {
cJState = &state
var cJHash string
if us, ok := cState[jName]; ok {
cJState = &us.state
cJHash = us.hash
}
if dJob == nil && cJState == nil {
log.Errorf("Desired state and current state of Job(%s) nil, not sure what to do", jName)
Expand Down Expand Up @@ -241,6 +247,19 @@ func (ar *AgentReconciler) calculateTaskChainForJob(dState *AgentState, cState u
return &tc
}

if cJHash != dJHash {
log.V(1).Infof("Desired hash %q differs to current hash %s of Job(%s) - unloading", dJHash, cJHash, jName)
t := task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButHashDiffers,
}
u := &job.Unit{
Name: jName,
}
tc := newTaskChain(u, t)
return &tc
}

if *cJState == dJob.TargetState {
log.V(1).Infof("Desired state %q matches current state of Job(%s), nothing to do", *cJState, jName)
return nil
Expand Down
Loading

0 comments on commit a978c3b

Please sign in to comment.