Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allocrunner: terminate sidecars in the end #8311

Merged
merged 1 commit into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ func (ar *allocRunner) TaskStateUpdated() {
func (ar *allocRunner) handleTaskStateUpdates() {
defer close(ar.taskStateUpdateHandlerCh)

hasSidecars := hasSidecarTasks(ar.tasks)

for done := false; !done; {
select {
case <-ar.taskStateUpdatedCh:
Expand All @@ -462,10 +464,6 @@ func (ar *allocRunner) handleTaskStateUpdates() {
// name whose fault it is.
killTask := ""

// True if task runners should be killed because a leader
// failed (informational).
leaderFailed := false

// Task state has been updated; gather the state of the other tasks
trNum := len(ar.tasks)
liveRunners := make([]*taskrunner.TaskRunner, 0, trNum)
Expand All @@ -492,18 +490,24 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
} else if tr.IsLeader() {
killEvent = structs.NewTaskEvent(structs.TaskLeaderDead)
leaderFailed = true
killTask = name
}
}

// if all live runners are sidecars - kill alloc
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}

// If there's a kill event set and live runners, kill them
if killEvent != nil && len(liveRunners) > 0 {

// Log kill reason
if leaderFailed {
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
} else {
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}

Expand Down
123 changes: 122 additions & 1 deletion client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
found := false
killingMsg := ""
for _, e := range state1.Events {
if e.Type != structs.TaskLeaderDead {
if e.Type == structs.TaskLeaderDead {
found = true
}
if e.Type == structs.TaskKilling {
Expand Down Expand Up @@ -142,6 +142,127 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
// entire task group is killed.
func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
t.Parallel()

alloc := mock.BatchAlloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

// Create three tasks in the task group
sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy()
sidecar.Name = "sidecar"
sidecar.Driver = "mock_driver"
sidecar.KillTimeout = 10 * time.Millisecond
sidecar.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
}

sidecar.Config = map[string]interface{}{
"run_for": "100s",
}

main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main1.Name = "task2"
main1.Driver = "mock_driver"
main1.Config = map[string]interface{}{
"run_for": "1s",
}

main2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main2.Name = "task2"
main2.Driver = "mock_driver"
main2.Config = map[string]interface{}{
"run_for": "2s",
}

alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
sidecar.Name: tr,
main1.Name: tr,
main2.Name: tr,
}

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()

hasTaskMainEvent := func(state *structs.TaskState) bool {
for _, e := range state.Events {
if e.Type == structs.TaskMainDead {
return true
}
}

return false
}

// Wait for all tasks to be killed
upd := conf.StateUpdater.(*MockStateUpdater)
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}

var state *structs.TaskState

// Task1 should be killed because Task2 exited
state = last.TaskStates[sidecar.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if len(state.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}

if !hasTaskMainEvent(state) {
return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
}

// main tasks should die naturely
state = last.TaskStates[main1.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if hasTaskMainEvent(state) {
return false, fmt.Errorf("unexpected event %#+v in %v", structs.TaskMainDead, state.Events)
}

state = last.TaskStates[main2.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if hasTaskMainEvent(state) {
return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel()

Expand Down
25 changes: 25 additions & 0 deletions client/allocrunner/task_hook_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -108,3 +109,27 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
c.mainTaskCtxCancel()
}
}

// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc == nil || !lc.Sidecar {
return true
}
}

return false
}

// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc != nil && lc.Sidecar {
return true
}
}

return false
}
89 changes: 89 additions & 0 deletions client/allocrunner/task_hook_coordinator_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package allocrunner

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs"

"github.com/hashicorp/nomad/helper/testlog"
Expand Down Expand Up @@ -230,3 +232,90 @@ func isChannelClosed(ch <-chan struct{}) bool {
return false
}
}

func TestHasSidecarTasks(t *testing.T) {

falseV, trueV := false, true

cases := []struct {
name string
// nil if main task, false if non-sidecar hook, true if sidecar hook
indicators []*bool

hasSidecars bool
hasNonsidecars bool
}{
{
name: "all sidecar - one",
indicators: []*bool{&trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "all sidecar - multiple",
indicators: []*bool{&trueV, &trueV, &trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "some sidecars, some others",
indicators: []*bool{nil, &falseV, &trueV},
hasSidecars: true,
hasNonsidecars: true,
},
{
name: "no sidecars",
indicators: []*bool{nil, &falseV, nil},
hasSidecars: false,
hasNonsidecars: true,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
alloc := allocWithSidecarIndicators(c.indicators)
arConf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

ar, err := NewAllocRunner(arConf)
require.NoError(t, err)

require.Equal(t, c.hasSidecars, hasSidecarTasks(ar.tasks), "sidecars")

runners := []*taskrunner.TaskRunner{}
for _, r := range ar.tasks {
runners = append(runners, r)
}
require.Equal(t, c.hasNonsidecars, hasNonSidecarTasks(runners), "non-sidecars")

})
}
}

func allocWithSidecarIndicators(indicators []*bool) *structs.Allocation {
alloc := mock.BatchAlloc()

tasks := []*structs.Task{}
resources := map[string]*structs.AllocatedTaskResources{}

tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]

for i, indicator := range indicators {
task := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task.Name = fmt.Sprintf("task%d", i)
if indicator != nil {
task.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: *indicator,
}
}
tasks = append(tasks, task)
resources[task.Name] = tr
}

alloc.Job.TaskGroups[0].Tasks = tasks

alloc.AllocatedResources.Tasks = resources
return alloc

}
5 changes: 5 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6996,6 +6996,9 @@ const (
// TaskLeaderDead indicates that the leader task within the has finished.
TaskLeaderDead = "Leader Task Dead"

// TaskMainDead indicates that the main tasks have dead
TaskMainDead = "Main Tasks Dead"

// TaskHookFailed indicates that one of the hooks for a task failed.
TaskHookFailed = "Task hook failed"

Expand Down Expand Up @@ -7217,6 +7220,8 @@ func (event *TaskEvent) PopulateEventDisplayMessage() {
desc = event.DriverMessage
case TaskLeaderDead:
desc = "Leader Task in Group dead"
case TaskMainDead:
desc = "Main tasks in the group died"
default:
desc = event.Message
}
Expand Down