Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leader task #2308

Merged
merged 6 commits into from
Feb 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ type Task struct {
Vault *Vault
Templates []*Template
DispatchPayload *DispatchPayloadConfig
Leader *bool
}

// TaskArtifact is used to download artifacts before running a task.
Expand Down Expand Up @@ -256,10 +257,10 @@ const (
TaskNotRestarting = "Not Restarting"
TaskDownloadingArtifacts = "Downloading Artifacts"
TaskArtifactDownloadFailed = "Failed Artifact Download"
TaskVaultRenewalFailed = "Vault token renewal failed"
TaskSiblingFailed = "Sibling task failed"
TaskSiblingFailed = "Sibling Task Failed"
TaskSignaling = "Signaling"
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
Expand Down
36 changes: 28 additions & 8 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,39 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv

taskState.State = state
if state == structs.TaskStateDead {
// Find all tasks that are not the one that is dead and check if the one
// that is dead is a leader
var otherTaskRunners []*TaskRunner
var otherTaskNames []string
leader := false
for task, tr := range r.tasks {
if task != taskName {
otherTaskRunners = append(otherTaskRunners, tr)
otherTaskNames = append(otherTaskNames, task)
} else if tr.task.Leader {
leader = true
}
}

// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed {
var destroyingTasks []string
for task, tr := range r.tasks {
if task != taskName {
destroyingTasks = append(destroyingTasks, task)
tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
}
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
}
if len(destroyingTasks) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, otherTaskNames)
}
} else if leader {
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskLeaderDead))
}
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: leader task %q is dead, destroying other tasks in task group: %v", taskName, otherTaskNames)
}
}

// If the task was a leader task we should kill all the other
// tasks.
}

select {
Expand Down
64 changes: 64 additions & 0 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,70 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
})
}

func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
upd, ar := testAllocRunner(false)

// Create two tasks in the task group
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.KillTimeout = 10 * time.Millisecond
task.Config = map[string]interface{}{
"run_for": "10s",
}

task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "task 2"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": "1s",
}
ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
ar.alloc.TaskResources[task2.Name] = task2.Resources
go ar.Run()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}

// Task One should be killed
state1 := last.TaskStates[task.Name]
if state1.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
}
if len(state1.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}

found := false
for _, e := range state1.Events {
if e.Type != structs.TaskLeaderDead {
found = true
}
}

if !found {
return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead)
}

// Task Two should be dead
state2 := last.TaskStates[task2.Name]
if state2.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestAllocRunner_MoveAllocDir(t *testing.T) {
// Create an alloc runner
alloc := mock.Alloc()
Expand Down
8 changes: 2 additions & 6 deletions command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,6 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
} else {
desc = "Task exceeded restart policy"
}
case api.TaskVaultRenewalFailed:
if event.VaultError != "" {
desc = event.VaultError
} else {
desc = "Task's Vault token failed to be renewed"
}
case api.TaskSiblingFailed:
if event.FailedSibling != "" {
desc = fmt.Sprintf("Task's sibling %q failed", event.FailedSibling)
Expand Down Expand Up @@ -382,6 +376,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
}
case api.TaskDriverMessage:
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
}

// Reverse order so we are sorted by time
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
"driver",
"env",
"kill_timeout",
"leader",
"logs",
"meta",
"resources",
Expand Down
1 change: 1 addition & 0 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func TestParse(t *testing.T) {
Perms: "777",
},
},
Leader: true,
},
&structs.Task{
Name: "storagelocker",
Expand Down
1 change: 1 addition & 0 deletions jobspec/test-fixtures/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ job "binstore-storagelocker" {
task "binstore" {
driver = "docker"
user = "bob"
leader = true

config {
image = "hashicorp/binstore"
Expand Down
2 changes: 1 addition & 1 deletion nomad/structs/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) {
diff.Objects = append(diff.Objects, vDiff)
}

// Artifacts diff
// Template diff
tmplDiffs := primitiveObjectSetDiff(
interfaceSlice(t.Templates),
interfaceSlice(other.Templates),
Expand Down
22 changes: 22 additions & 0 deletions nomad/structs/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "0",
},
{
Type: DiffTypeAdded,
Name: "Leader",
Old: "",
New: "false",
},
},
},
{
Expand Down Expand Up @@ -1811,6 +1817,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "0",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Leader",
Old: "false",
New: "",
},
},
},
},
Expand Down Expand Up @@ -1880,6 +1892,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
New: &Task{
Name: "foo",
Expand All @@ -1892,6 +1905,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
Expected: &TaskDiff{
Type: DiffTypeNone,
Expand All @@ -1911,6 +1925,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
New: &Task{
Name: "foo",
Expand All @@ -1923,6 +1938,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "baz",
},
KillTimeout: 2 * time.Second,
Leader: false,
},
Expected: &TaskDiff{
Type: DiffTypeEdited,
Expand All @@ -1946,6 +1962,12 @@ func TestTaskDiff(t *testing.T) {
Old: "1000000000",
New: "2000000000",
},
{
Type: DiffTypeEdited,
Name: "Leader",
Old: "true",
New: "false",
},
{
Type: DiffTypeEdited,
Name: "Meta[foo]",
Expand Down
20 changes: 18 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1931,8 +1931,9 @@ func (tg *TaskGroup) Validate() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have an ephemeral disk object", tg.Name))
}

// Check for duplicate tasks
// Check for duplicate tasks and that there is only leader task if any
tasks := make(map[string]int)
leaderTasks := 0
for idx, task := range tg.Tasks {
if task.Name == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %d missing name", idx+1))
Expand All @@ -1941,6 +1942,14 @@ func (tg *TaskGroup) Validate() error {
} else {
tasks[task.Name] = idx
}

if task.Leader {
leaderTasks++
}
}

if leaderTasks > 1 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader"))
}

// Validate the tasks
Expand Down Expand Up @@ -2289,6 +2298,10 @@ type Task struct {
// Artifacts is a list of artifacts to download and extract before running
// the task.
Artifacts []*TaskArtifact

// Leader marks the task as the leader within the group. When the leader
// task exits, other tasks will be gracefully terminated.
Leader bool
}

func (t *Task) Copy() *Task {
Expand Down Expand Up @@ -2775,12 +2788,15 @@ const (

// TaskSiblingFailed indicates that a sibling task in the task group has
// failed.
TaskSiblingFailed = "Sibling task failed"
TaskSiblingFailed = "Sibling Task Failed"

// TaskDriverMessage is an informational event message emitted by
// drivers such as when they're performing a long running action like
// downloading an image.
TaskDriverMessage = "Driver"

// TaskLeaderDead indicates that the leader task within the has finished.
TaskLeaderDead = "Leader Task Dead"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
Expand Down
9 changes: 6 additions & 3 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,8 @@ func TestTaskGroup_Validate(t *testing.T) {
Name: "web",
Count: 1,
Tasks: []*Task{
&Task{Name: "web"},
&Task{Name: "web"},
&Task{Name: "web", Leader: true},
&Task{Name: "web", Leader: true},
&Task{},
},
RestartPolicy: &RestartPolicy{
Expand All @@ -442,7 +442,10 @@ func TestTaskGroup_Validate(t *testing.T) {
if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[3].Error(), "Task web validation failed") {
if !strings.Contains(mErr.Errors[3].Error(), "Only one task may be marked as leader") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[4].Error(), "Task web validation failed") {
t.Fatalf("err: %s", err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion website/source/docs/http/alloc.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ be specified using the `?region=` query parameter.
* `Failed Artifact Download` - Artifact(s) specified in the task failed to download.
* `Restart Signaled` - The task was signalled to be restarted.
* `Signaling` - The task was is being sent a signal.
* `Sibling task failed` - A task in the same task group failed.
* `Sibling Task Failed` - A task in the same task group failed.
* `Leader Task Dead` - The group's leader task is dead.
* `Driver` - A message from the driver.

Depending on the type the event will have applicable annotations.
7 changes: 7 additions & 0 deletions website/source/docs/http/json-jobs.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ The `Task` object supports the following keys:
sends `SIGTERM` if the task doesn't die after the `KillTimeout` duration has
elapsed. The default `KillTimeout` is 5 seconds.

* `leader` - Specifies whether the task is the leader task of the task group. If
Copy link
Contributor

@jippi jippi Feb 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so useful!

set to true, when the leader task completes, all other tasks within the task
group will be gracefully shutdown.

* `LogConfig` - This allows configuring log rotation for the `stdout` and `stderr`
buffers of a Task. See the log rotation reference below for more details.

Expand Down Expand Up @@ -681,6 +685,9 @@ README][ct].
"SIGUSR1" or "SIGINT". This option is required if the `ChangeMode` is
`signal`.

* `perms` - Specifies the rendered template's permissions. File permissions are
given as octal of the unix file permissions rwxrwxrwx.

* `Splay` - Specifies a random amount of time to wait between 0ms and the given
splay value before invoking the change mode. Should be specified in
nanoseconds.
Expand Down
4 changes: 4 additions & 0 deletions website/source/docs/job-specification/task.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ job "docs" {
If the task does not exit before the configured timeout, `SIGKILL` is sent to
the task.

- `leader` `(bool: false)` - Specifies whether the task is the leader task of
the task group. If set to true, when the leader task completes, all other
tasks within the task group will be gracefully shutdown.

- `logs` <code>([Logs][]: nil)</code> - Specifies logging configuration for the
`stdout` and `stderr` of the task.

Expand Down
Loading