Skip to content

Commit

Permalink
Merge pull request #4329 from hashicorp/b-leaked-deployments
Browse files Browse the repository at this point in the history
Clean up leaked deployments on restoration
  • Loading branch information
dadgar committed May 30, 2018
2 parents aaf1e19 + 689d396 commit 84383bc
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
59 changes: 59 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
}
}

// COMPAT Remove in 0.10
// Clean up active deployments that do not have a job
if err := n.failLeakedDeployments(newState); err != nil {
return err
}

// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
n.stateLock.Lock()
Expand All @@ -1276,6 +1282,59 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return nil
}

// failLeakedDeployments is used to fail deployments that do not have a job.
// This state is a broken invariant that should not occur since 0.8.X.
func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}

dindex, err := state.Index("deployment")
if err != nil {
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}

for {
raw := iter.Next()
if raw == nil {
break
}

d := raw.(*structs.Deployment)

// We are only looking for active deployments where the job no longer
// exists
if !d.Active() {
continue
}

// Find the job
job, err := state.JobByID(nil, d.Namespace, d.JobID)
if err != nil {
return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err)
}

// Job exists.
if job != nil {
continue
}

// Update the deployment to be terminal
failed := d.Copy()
failed.Status = structs.DeploymentStatusCancelled
failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob
if err := state.UpsertDeployment(dindex, failed); err != nil {
return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err)
}
}

return nil
}

// reconcileQueuedAllocations re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
Expand Down
18 changes: 18 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2740,6 +2740,24 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
}

func TestFSM_LeakedDeployments(t *testing.T) {
t.Parallel()
require := require.New(t)

// Add some state
fsm := testFSM(t)
state := fsm.State()
d := mock.Deployment()
require.NoError(state.UpsertDeployment(1000, d))

// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out, _ := state2.DeploymentByID(nil, d.ID)
require.NotNil(out)
require.Equal(structs.DeploymentStatusCancelled, out.Status)
}

func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down

0 comments on commit 84383bc

Please sign in to comment.