Skip to content

Commit

Permalink
taskrunner: fix tests that waited on WaitCh
Browse files Browse the repository at this point in the history
Now that "dead" tasks may run again, the taskrunner Run() method will
not return when the task finishes running, so tests must wait for the
task state to be "dead" instead of using the WaitCh, since it won't be
closed until the taskrunner is killed.
  • Loading branch information
lgfa29 committed Aug 17, 2022
1 parent 4f6228e commit 9f7234f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 86 deletions.
12 changes: 10 additions & 2 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,11 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
ar.Run()
defer destroy(ar)

require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus)
testutil.WaitForResult(func() (bool, error) {
return ar.AllocState().ClientStatus == structs.AllocClientStatusComplete, nil
}, func(_ error) {
t.Fatalf("expected alloc to be complete")
})

// Step 2. Modify its directory
task := alloc.Job.TaskGroups[0].Tasks[0]
Expand Down Expand Up @@ -1241,7 +1245,11 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
ar2.Run()
defer destroy(ar2)

require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus)
// Wait for the second alloc runner (ar2) to complete; ar was already
// observed as complete earlier in this test, so polling it here would
// succeed without ever checking ar2.
testutil.WaitForResult(func() (bool, error) {
	return ar2.AllocState().ClientStatus == structs.AllocClientStatusComplete, nil
}, func(_ error) {
	t.Fatalf("expected alloc to be complete")
})

// Ensure that data from ar was moved to ar2
dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file")
Expand Down
7 changes: 1 addition & 6 deletions client/allocrunner/taskrunner/sids_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
)
Expand Down Expand Up @@ -297,11 +296,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) {
go tr.Run()

// wait for task runner to finish running
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
r.Fail("timed out waiting for task runner")
}
testWaitForTaskToDie(t, tr)

// assert task exited un-successfully
finalState := tr.TaskState()
Expand Down
136 changes: 58 additions & 78 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for the new task runner's task to die when the process exits
<-newTR.WaitCh()
testWaitForTaskToDie(t, newTR)

// Assert that the process was only started once
started := 0
Expand Down Expand Up @@ -603,11 +603,7 @@ func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
defer cleanup()

// Wait for task to complete
select {
case <-tr.WaitCh():
case <-time.After(3 * time.Second):
require.Fail("timeout waiting for task to exit")
}
testWaitForTaskToDie(t, tr)

// Get the mock driver plugin
driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name)
Expand Down Expand Up @@ -654,7 +650,9 @@ func TestTaskRunner_TaskEnv_Chroot(t *testing.T) {
go tr.Run()
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for task to exit
// Wait for task to exit and kill the task runner to run the stop hooks.
testWaitForTaskToDie(t, tr)
tr.Kill(context.Background(), structs.NewTaskEvent("kill"))
timeout := 15 * time.Second
if testutil.IsCI() {
timeout = 120 * time.Second
Expand Down Expand Up @@ -703,7 +701,9 @@ func TestTaskRunner_TaskEnv_Image(t *testing.T) {
tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

// Wait for task to exit
// Wait for task to exit and kill task runner to run the stop hooks.
testWaitForTaskToDie(t, tr)
tr.Kill(context.Background(), structs.NewTaskEvent("kill"))
select {
case <-tr.WaitCh():
case <-time.After(15 * time.Second):
Expand Down Expand Up @@ -750,7 +750,9 @@ func TestTaskRunner_TaskEnv_None(t *testing.T) {
%s
`, root, taskDir, taskDir, os.Getenv("PATH"))

// Wait for task to exit
// Wait for task to exit and kill the task runner to run the stop hooks.
testWaitForTaskToDie(t, tr)
tr.Kill(context.Background(), structs.NewTaskEvent("kill"))
select {
case <-tr.WaitCh():
case <-time.After(15 * time.Second):
Expand Down Expand Up @@ -818,10 +820,7 @@ func TestTaskRunner_DevicePropogation(t *testing.T) {
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))

// Wait for task to complete
select {
case <-tr.WaitCh():
case <-time.After(3 * time.Second):
}
testWaitForTaskToDie(t, tr)

// Get the mock driver plugin
driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name)
Expand Down Expand Up @@ -1306,15 +1305,15 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
"Received",
"Task Setup",
"Started",
"Restart Signaled",
"Restart Running Signaled",
"Terminated",
"Restarting",
"Started",
"Restart Signaled",
"Restart Running Signaled",
"Terminated",
"Restarting",
"Started",
"Restart Signaled",
"Restart Running Signaled",
"Terminated",
"Not Restarting",
}
Expand All @@ -1328,11 +1327,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
// Wait until the task exits. Don't simply wait for it to run as it may
// get restarted and terminated before the test is able to observe it
// running.
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timeout")
}
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
actualEvents := make([]string, len(state.Events))
Expand Down Expand Up @@ -1421,11 +1416,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) {

// task runner should exit now that it has been unblocked and it is a batch
// job with a zero sleep time
select {
case <-tr.WaitCh():
case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())):
r.Fail("timed out waiting for batch task to exist")
}
testWaitForTaskToDie(t, tr)

// assert task exited successfully
finalState := tr.TaskState()
Expand Down Expand Up @@ -1478,11 +1469,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) {
go tr.Run()

// wait for the task to reach the dead state
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
r.Fail("timed out waiting for task runner")
}
testWaitForTaskToDie(t, tr)

// assert task exited successfully
finalState := tr.TaskState()
Expand Down Expand Up @@ -1598,11 +1585,7 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) {

// TR should exit now that it's unblocked by vault as its a batch job
// with 0 sleeping.
select {
case <-tr.WaitCh():
case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())):
require.Fail(t, "timed out waiting for batch task to exit")
}
testWaitForTaskToDie(t, tr)

// Assert task exited successfully
finalState := tr.TaskState()
Expand All @@ -1615,6 +1598,14 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) {
require.NoError(t, err)
require.Equal(t, token, string(data))

// Kill task runner to trigger stop hooks
tr.Kill(context.Background(), structs.NewTaskEvent("kill"))
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timed out waiting for task runner to exit")
}

// Check the token was revoked
testutil.WaitForResult(func() (bool, error) {
if len(vaultClient.StoppedTokens()) != 1 {
Expand Down Expand Up @@ -1661,17 +1652,21 @@ func TestTaskRunner_DeriveToken_Retry(t *testing.T) {
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
go tr.Run()

// Wait for TR to exit and check its state
// Wait for TR to die and check its state
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
require.False(t, state.Failed)

// Kill task runner to trigger stop hooks
tr.Kill(context.Background(), structs.NewTaskEvent("kill"))
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timed out waiting for task runner to exit")
}

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
require.False(t, state.Failed)

require.Equal(t, 1, count)

// Check that the token is on disk
Expand Down Expand Up @@ -1771,11 +1766,7 @@ func TestTaskRunner_Download_ChrootExec(t *testing.T) {
defer cleanup()

// Wait for task to run and exit
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timed out waiting for task runner to exit")
}
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
Expand Down Expand Up @@ -1816,11 +1807,7 @@ func TestTaskRunner_Download_RawExec(t *testing.T) {
defer cleanup()

// Wait for task to run and exit
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timed out waiting for task runner to exit")
}
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
Expand Down Expand Up @@ -1851,11 +1838,7 @@ func TestTaskRunner_Download_List(t *testing.T) {
defer cleanup()

// Wait for task to run and exit
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timed out waiting for task runner to exit")
}
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
Expand Down Expand Up @@ -1902,11 +1885,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timed out waiting for task to exit")
}
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
Expand Down Expand Up @@ -2100,6 +2079,8 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) {
case <-time.After(1 * time.Second):
}

require.Equal(t, structs.TaskStatePending, tr.TaskState().State)

// Send a signal and restart
err = tr.Signal(structs.NewTaskEvent("don't panic"), "QUIT")
require.EqualError(t, err, ErrTaskNotRunning.Error())
Expand All @@ -2110,12 +2091,7 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) {

// Unblock and let it finish
waitCh <- struct{}{}

select {
case <-tr.WaitCh():
case <-time.After(10 * time.Second):
require.Fail(t, "timed out waiting for task to complete")
}
testWaitForTaskToDie(t, tr)

// Assert the task ran and never restarted
state := tr.TaskState()
Expand Down Expand Up @@ -2153,11 +2129,7 @@ func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name)
defer cleanup()

select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
require.Fail(t, "timed out waiting for task to exit")
}
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
Expand Down Expand Up @@ -2202,11 +2174,7 @@ func TestTaskRunner_Template_Artifact(t *testing.T) {
go tr.Run()

// Wait for task to run and exit
select {
case <-tr.WaitCh():
case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())):
require.Fail(t, "timed out waiting for task runner to exit")
}
testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
Expand Down Expand Up @@ -2536,7 +2504,9 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
tr, err := NewTaskRunner(conf)
require.NoError(t, err)
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
tr.Run()
go tr.Run()

testWaitForTaskToDie(t, tr)

state := tr.TaskState()
require.Equal(t, structs.TaskStateDead, state.State)
Expand Down Expand Up @@ -2568,6 +2538,16 @@ func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
})
}

// testWaitForTaskToDie polls the task runner until its task state is
// structs.TaskStateDead, and fails the test if testutil.WaitForResult gives
// up before that happens.
//
// Tests should use this instead of blocking on tr.WaitCh(): a dead task may
// run again, so Run() does not return (and WaitCh is not closed) until the
// task runner itself is killed.
func testWaitForTaskToDie(t *testing.T, tr *TaskRunner) {
	t.Helper()

	testutil.WaitForResult(func() (bool, error) {
		ts := tr.TaskState()
		if ts.State == structs.TaskStateDead {
			return true, nil
		}
		// Carry the last observed state so a timeout explains what the
		// task was doing instead of reporting a bare state name.
		return false, fmt.Errorf("expected task state %q, got %q", structs.TaskStateDead, ts.State)
	}, func(err error) {
		t.Helper()
		require.NoError(t, err)
	})
}

// TestTaskRunner_BaseLabels tests that the base labels for the task metrics
// are set appropriately.
func TestTaskRunner_BaseLabels(t *testing.T) {
Expand Down

0 comments on commit 9f7234f

Please sign in to comment.