Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up leaked deployments on restoration #4329

Merged
merged 1 commit into from
May 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
}
}

// COMPAT Remove in 0.10
// Clean up active deployments that do not have a job
if err := n.failLeakedDeployments(newState); err != nil {
return err
}

// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
n.stateLock.Lock()
Expand All @@ -1276,6 +1282,59 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return nil
}

// failLeakedDeployments is used to fail deployments that do not have a job.
// This state is a broken invariant that should not occur since 0.8.X.
func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}

dindex, err := state.Index("deployment")
if err != nil {
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}

for {
raw := iter.Next()
if raw == nil {
break
}

d := raw.(*structs.Deployment)

// We are only looking for active deployments where the job no longer
// exists
if !d.Active() {
continue
}

// Find the job
job, err := state.JobByID(nil, d.Namespace, d.JobID)
if err != nil {
return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err)
}

// Job exists.
if job != nil {
continue
}

// Update the deployment to be terminal
failed := d.Copy()
failed.Status = structs.DeploymentStatusCancelled
failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob
if err := state.UpsertDeployment(dindex, failed); err != nil {
return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err)
}
}

return nil
}

// reconcileQueuedAllocations re-calculates the queued allocations for every job that we
// created a Job Summary during the snap shot restore
func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
Expand Down
18 changes: 18 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2740,6 +2740,24 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
}

func TestFSM_LeakedDeployments(t *testing.T) {
t.Parallel()
require := require.New(t)

// Add some state
fsm := testFSM(t)
state := fsm.State()
d := mock.Deployment()
require.NoError(state.UpsertDeployment(1000, d))

// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out, _ := state2.DeploymentByID(nil, d.ID)
require.NotNil(out)
require.Equal(structs.DeploymentStatusCancelled, out.Status)
}

func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down