Skip to content

Commit

Permalink
task runner to avoid running task if terminal
Browse files Browse the repository at this point in the history
This change fixes a bug where nomad would avoid running alloc tasks if
the alloc is client terminal but the server copy on the client isn't
marked as running.

Here, we fix the case by having task runner uses the
allocRunner.shouldRun() instead of only checking the server updated
alloc.

Here, we preserve much of the invariants such that `tr.Run()` is always
run, and don't change the overall alloc runner and task runner
lifecycles.

Fixes #5883
  • Loading branch information
Mahmood Ali committed Jun 27, 2019
1 parent 72b9b87 commit f3c944a
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 1 deletion.
84 changes: 84 additions & 0 deletions client/allocrunner/alloc_runner_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,87 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
require.Equal(t, events[2].Type, structs.TaskStarted)
require.Equal(t, events[3].Type, structs.TaskTerminated)
}

// TestAllocRunner_Restore_Completed asserts that restoring a completed
// batch alloc doesn't run it again
func TestAllocRunner_Restore_CompletedBatch(t *testing.T) {
t.Parallel()

// 1. Run task and wait for it to complete
// 2. Start new alloc runner
// 3. Assert task didn't run again

alloc := mock.Alloc()
alloc.Job.Type = structs.JobTypeBatch
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "2ms",
}

conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
defer cleanup()

// Maintain state for subsequent run
conf.StateDB = state.NewMemDB(conf.Logger)

// Start and wait for task to be running
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
go ar.Run()
defer destroy(ar)

testutil.WaitForResult(func() (bool, error) {
s := ar.AllocState()
if s.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("expected complete, got %s", s.ClientStatus)
}
return true, nil
}, func(err error) {
require.NoError(t, err)
})

// once job finishes, it shouldn't run again
require.False(t, ar.shouldRun())
initialRunEvents := ar.AllocState().TaskStates[task.Name].Events
require.Len(t, initialRunEvents, 4)

ls, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
require.NoError(t, err)
require.NotNil(t, ls)
require.Equal(t, structs.TaskStateDead, ts.State)

// Start a new alloc runner and assert it gets stopped
conf2, cleanup2 := testAllocRunnerConfig(t, alloc)
defer cleanup2()

// Use original statedb to maintain hook state
conf2.StateDB = conf.StateDB

// Restore, start, and wait for task to be killed
ar2, err := NewAllocRunner(conf2)
require.NoError(t, err)

require.NoError(t, ar2.Restore())

go ar2.Run()
defer destroy(ar2)

// AR waitCh must be closed even when task doesn't run again
select {
case <-ar2.WaitCh():
case <-time.After(10 * time.Second):
require.Fail(t, "alloc.waitCh wasn't closed")
}

// TR waitCh must be closed too!
select {
case <-ar2.tasks[task.Name].WaitCh():
case <-time.After(10 * time.Second):
require.Fail(t, "tr.waitCh wasn't closed")
}

// Assert that events are unmodified, which they would if task re-run
events := ar2.AllocState().TaskStates[task.Name].Events
require.Equal(t, initialRunEvents, events)
}
18 changes: 17 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,22 @@ func (tr *TaskRunner) Run() {
defer close(tr.waitCh)
var result *drivers.ExitResult

tr.stateLock.RLock()
dead := tr.state.State == structs.TaskStateDead
tr.stateLock.RUnlock()

// if restoring a dead task, ensure that task is cleared and all post hooks
// are called without additional state updates
if dead {
// clear driver handle if it was successfully restored on
// already dead task
tr.clearDriverHandle()
if err := tr.stop(); err != nil {
tr.logger.Error("stop failed on terminal task", "error", err)
}
return
}

// Updates are handled asynchronously with the other hooks but each
// triggered update - whether due to alloc updates or a new vault token
// - should be handled serially.
Expand Down Expand Up @@ -899,7 +915,7 @@ func (tr *TaskRunner) Restore() error {
}

alloc := tr.Alloc()
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
if tr.state.State == structs.TaskStateDead || alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
return nil
}

Expand Down

0 comments on commit f3c944a

Please sign in to comment.