diff --git a/api/allocations.go b/api/allocations.go index 829c1030f97d..00ab6984dde4 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -52,6 +52,7 @@ type Allocation struct { DesiredDescription string ClientStatus string ClientDescription string + TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 } @@ -83,6 +84,7 @@ type AllocationListStub struct { DesiredDescription string ClientStatus string ClientDescription string + TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 } diff --git a/api/tasks.go b/api/tasks.go index 2535d5ec565c..2eb77f9351b1 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -110,3 +110,29 @@ func (t *Task) Constrain(c *Constraint) *Task { t.Constraints = append(t.Constraints, c) return t } + +// TaskState tracks the current state of a task and events that caused state +// transistions. +type TaskState struct { + State string + Events []*TaskEvent +} + +const ( + TaskDriverFailure = "Driver Failure" + TaskStarted = "Started" + TaskTerminated = "Terminated" + TaskKilled = "Killed" +) + +// TaskEvent is an event that effects the state of a task and contains meta-data +// appropriate to the events type. +type TaskEvent struct { + Type string + Time int64 + DriverError string + ExitCode int + Signal int + Message string + KillError string +} diff --git a/client/alloc_runner.go b/client/alloc_runner.go index aa7cf79e5931..d723478dbe80 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -43,10 +43,10 @@ type AllocRunner struct { ctx *driver.ExecContext tasks map[string]*TaskRunner + restored map[string]struct{} RestartPolicy *structs.RestartPolicy taskLock sync.RWMutex - taskStatus map[string]taskStatus taskStatusLock sync.RWMutex updateCh chan *structs.Allocation @@ -68,16 +68,16 @@ type allocRunnerState struct { // NewAllocRunner is used to create a new allocation context func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner { ar := &AllocRunner{ - config: config, - updater: updater, - logger: logger, - alloc: alloc, - dirtyCh: make(chan struct{}, 1), - tasks: make(map[string]*TaskRunner), - taskStatus: make(map[string]taskStatus), - updateCh: make(chan *structs.Allocation, 8), - destroyCh: make(chan struct{}), - waitCh: make(chan struct{}), + config: config, + updater: updater, + logger: logger, + alloc: alloc, + dirtyCh: make(chan struct{}, 1), + tasks: make(map[string]*TaskRunner), + restored: make(map[string]struct{}), + updateCh: make(chan *structs.Allocation, 8), + destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), } return ar } @@ -98,20 +98,22 @@ func (r *AllocRunner) RestoreState() error { // Restore fields r.alloc = snap.Alloc r.RestartPolicy = snap.RestartPolicy - r.taskStatus = snap.TaskStatus r.ctx = snap.Context // Restore the task runners var mErr multierror.Error - for name, status := range r.taskStatus { + for name, state := range r.alloc.TaskStates { + // Mark the task as restored. + r.restored[name] = struct{}{} + task := &structs.Task{Name: name} restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) - tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker) + tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) r.tasks[name] = tr // Skip tasks in terminal states. - if status.Status == structs.AllocClientStatusDead || - status.Status == structs.AllocClientStatusFailed { + if state.State == structs.TaskStateDead { continue } @@ -152,7 +154,6 @@ func (r *AllocRunner) saveAllocRunnerState() error { snap := allocRunnerState{ Alloc: r.alloc, RestartPolicy: r.RestartPolicy, - TaskStatus: r.taskStatus, Context: r.ctx, } return persistState(r.stateFilePath(), &snap) @@ -182,16 +183,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation { return r.alloc } -// setAlloc is used to update the allocation of the runner -// we preserve the existing client status and description -func (r *AllocRunner) setAlloc(alloc *structs.Allocation) { - if r.alloc != nil { - alloc.ClientStatus = r.alloc.ClientStatus - alloc.ClientDescription = r.alloc.ClientDescription - } - r.alloc = alloc -} - // dirtySyncState is used to watch for state being marked dirty to sync func (r *AllocRunner) dirtySyncState() { for { @@ -223,22 +214,26 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { // syncStatus is used to run and sync the status when it changes func (r *AllocRunner) syncStatus() error { - // Scan the task status to termine the status of the alloc + // Scan the task states to determine the status of the alloc var pending, running, dead, failed bool r.taskStatusLock.RLock() - pending = len(r.taskStatus) < len(r.tasks) - for _, status := range r.taskStatus { - switch status.Status { - case structs.AllocClientStatusRunning: + for _, state := range r.alloc.TaskStates { + switch state.State { + case structs.TaskStateRunning: running = true - case structs.AllocClientStatusDead: - dead = true - case structs.AllocClientStatusFailed: - failed = true + case structs.TaskStatePending: + pending = true + case structs.TaskStateDead: + last := len(state.Events) - 1 + if state.Events[last].Type == structs.TaskDriverFailure { + failed = true + } else { + dead = true + } } } - if len(r.taskStatus) > 0 { - taskDesc, _ := json.Marshal(r.taskStatus) + if len(r.alloc.TaskStates) > 0 { + taskDesc, _ := json.Marshal(r.alloc.TaskStates) r.alloc.ClientDescription = string(taskDesc) } r.taskStatusLock.RUnlock() @@ -271,14 +266,8 @@ func (r *AllocRunner) setStatus(status, desc string) { } } -// setTaskStatus is used to set the status of a task -func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { - r.taskStatusLock.Lock() - r.taskStatus[taskName] = taskStatus{ - Status: status, - Description: desc, - } - r.taskStatusLock.Unlock() +// setTaskState is used to set the status of a task +func (r *AllocRunner) setTaskState(taskName string) { select { case r.dirtyCh <- struct{}{}: default: @@ -323,15 +312,15 @@ func (r *AllocRunner) Run() { // Start the task runners r.taskLock.Lock() for _, task := range tg.Tasks { - // Skip tasks that were restored - if _, ok := r.taskStatus[task.Name]; ok { + if _, ok := r.restored[task.Name]; ok { continue } // Merge in the task resources task.Resources = alloc.TaskResources[task.Name] restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) - tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker) + tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) r.tasks[task.Name] = tr go tr.Run() } @@ -344,7 +333,6 @@ OUTER: case update := <-r.updateCh: // Check if we're in a terminal status if update.TerminalStatus() { - r.setAlloc(update) break OUTER } diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 281189fbcfb2..9abe6b8a27af 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -24,20 +24,25 @@ func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error { return m.Err } -func testAllocRunner() (*MockAllocStateUpdater, *AllocRunner) { +func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { logger := testLogger() conf := DefaultConfig() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() + if !restarts { + alloc.Job.Type = structs.JobTypeBatch + *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} + } + ar := NewAllocRunner(logger, conf, upd.Update, alloc) return upd, ar } func TestAllocRunner_SimpleRun(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) go ar.Run() defer ar.Destroy() @@ -54,7 +59,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) { func TestAllocRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -76,17 +81,17 @@ func TestAllocRunner_Destroy(t *testing.T) { last := upd.Allocs[upd.Count-1] return last.ClientStatus == structs.AllocClientStatusDead, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) - if time.Since(start) > time.Second { + if time.Since(start) > 8*time.Second { t.Fatalf("took too long to terminate") } } func TestAllocRunner_Update(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -109,17 +114,17 @@ func TestAllocRunner_Update(t *testing.T) { last := upd.Allocs[upd.Count-1] return last.ClientStatus == structs.AllocClientStatusDead, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) - if time.Since(start) > time.Second { + if time.Since(start) > 8*time.Second { t.Fatalf("took too long to terminate") } } func TestAllocRunner_SaveRestoreState(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -156,7 +161,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { last := upd.Allocs[upd.Count-1] return last.ClientStatus == structs.AllocClientStatusDead, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) if time.Since(start) > 15*time.Second { diff --git a/client/driver/docker.go b/client/driver/docker.go index b89f31eb32da..11bfe75bb5bd 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -9,13 +9,14 @@ import ( "strconv" "strings" - docker "github.com/fsouza/go-dockerclient" - "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" + + docker "github.com/fsouza/go-dockerclient" + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) type DockerDriver struct { @@ -35,7 +36,7 @@ type dockerHandle struct { cleanupImage bool imageID string containerID string - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -414,7 +415,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle imageID: dockerImage.ID, containerID: container.ID, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -474,7 +475,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er imageID: pid.ImageID, containerID: pid.ContainerID, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -493,7 +494,7 @@ func (h *dockerHandle) ID() string { return fmt.Sprintf("DOCKER:%s", string(data)) } -func (h *dockerHandle) WaitCh() chan error { +func (h *dockerHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -565,8 +566,6 @@ func (h *dockerHandle) run() { } close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err) close(h.waitCh) } diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 872c2419b723..da4eba725588 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -11,6 +11,7 @@ import ( docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/environment" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -58,7 +59,7 @@ func TestDockerDriver_Handle(t *testing.T) { imageID: "imageid", containerID: "containerid", doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() @@ -163,9 +164,9 @@ func TestDockerDriver_Start_Wait(t *testing.T) { } select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -210,9 +211,9 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { defer handle.Kill() select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -268,9 +269,9 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) { }() select { - case err := <-handle.WaitCh(): - if err == nil { - t.Fatalf("should err: %v", err) + case res := <-handle.WaitCh(): + if res.Successful() { + t.Fatalf("should err: %v", res) } case <-time.After(10 * time.Second): t.Fatalf("timeout") diff --git a/client/driver/driver.go b/client/driver/driver.go index e2739e2b8a29..259c9df3608b 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -11,6 +11,8 @@ import ( "github.com/hashicorp/nomad/client/driver/environment" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // BuiltinDrivers contains the built in registered drivers @@ -85,7 +87,7 @@ type DriverHandle interface { ID() string // WaitCh is used to return a channel used wait for task completion - WaitCh() chan error + WaitCh() chan *cstructs.WaitResult // Update is used to update the task if possible Update(task *structs.Task) error diff --git a/client/driver/exec.go b/client/driver/exec.go index 4de719c465a4..f246dd48e45c 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -13,6 +13,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // ExecDriver fork/execs tasks using as many of the underlying OS's isolation @@ -25,7 +27,7 @@ type ExecDriver struct { // execHandle is returned from Start/Open as a handle to the PID type execHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -106,7 +108,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -123,7 +125,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -134,7 +136,7 @@ func (h *execHandle) ID() string { return id } -func (h *execHandle) WaitCh() chan error { +func (h *execHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -154,10 +156,8 @@ func (h *execHandle) Kill() error { } func (h *execHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 06c7107456a5..86e4320e6739 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -99,9 +99,9 @@ func TestExecDriver_Start_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(4 * time.Second): t.Fatalf("timeout") @@ -143,9 +143,9 @@ func TestExecDriver_Start_Artifact_basic(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -187,9 +187,9 @@ func TestExecDriver_Start_Artifact_expanded(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -224,9 +224,9 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): t.Fatalf("timeout") @@ -278,8 +278,8 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err == nil { + case res := <-handle.WaitCh(): + if res.Successful() { t.Fatal("should err") } case <-time.After(8 * time.Second): diff --git a/client/driver/executor/exec.go b/client/driver/executor/exec.go index 8cf076bab251..c514890ef311 100644 --- a/client/driver/executor/exec.go +++ b/client/driver/executor/exec.go @@ -27,6 +27,8 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var errNoResources = fmt.Errorf("No resources are associated with this task") @@ -54,7 +56,7 @@ type Executor interface { Open(string) error // Wait waits till the user's command is completed. - Wait() error + Wait() *cstructs.WaitResult // Returns a handle that is executor specific for use in reopening. ID() (string, error) diff --git a/client/driver/executor/exec_basic.go b/client/driver/executor/exec_basic.go index 4b5f6a9f15fc..438ae6b92129 100644 --- a/client/driver/executor/exec_basic.go +++ b/client/driver/executor/exec_basic.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/nomad/client/driver/environment" "github.com/hashicorp/nomad/client/driver/spawn" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // BasicExecutor should work everywhere, and as a result does not include @@ -92,17 +94,8 @@ func (e *BasicExecutor) Open(id string) error { return e.spawn.Valid() } -func (e *BasicExecutor) Wait() error { - code, err := e.spawn.Wait() - if err != nil { - return err - } - - if code != 0 { - return fmt.Errorf("Task exited with code: %d", code) - } - - return nil +func (e *BasicExecutor) Wait() *cstructs.WaitResult { + return e.spawn.Wait() } func (e *BasicExecutor) ID() (string, error) { diff --git a/client/driver/executor/exec_linux.go b/client/driver/executor/exec_linux.go index 0d1d033cad45..14f4f809ce5d 100644 --- a/client/driver/executor/exec_linux.go +++ b/client/driver/executor/exec_linux.go @@ -24,6 +24,8 @@ import ( cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" "github.com/opencontainers/runc/libcontainer/cgroups/systemd" cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var ( @@ -199,15 +201,11 @@ func (e *LinuxExecutor) Start() error { // Wait waits til the user process exits and returns an error on non-zero exit // codes. Wait also cleans up the task directory and created cgroups. -func (e *LinuxExecutor) Wait() error { +func (e *LinuxExecutor) Wait() *cstructs.WaitResult { errs := new(multierror.Error) - code, err := e.spawn.Wait() - if err != nil { - errs = multierror.Append(errs, err) - } - - if code != 0 { - errs = multierror.Append(errs, fmt.Errorf("Task exited with code: %d", code)) + res := e.spawn.Wait() + if res.Err != nil { + errs = multierror.Append(errs, res.Err) } if err := e.destroyCgroup(); err != nil { @@ -218,7 +216,8 @@ func (e *LinuxExecutor) Wait() error { errs = multierror.Append(errs, err) } - return errs.ErrorOrNil() + res.Err = errs.ErrorOrNil() + return res } func (e *LinuxExecutor) Shutdown() error { diff --git a/client/driver/executor/test_harness.go b/client/driver/executor/test_harness.go index 1e37f8eff93b..8ebd5434c9bb 100644 --- a/client/driver/executor/test_harness.go +++ b/client/driver/executor/test_harness.go @@ -127,8 +127,8 @@ func Executor_Start_Wait(t *testing.T, command buildExecCommand) { log.Panicf("Start() failed: %v", err) } - if err := e.Wait(); err != nil { - log.Panicf("Wait() failed: %v", err) + if res := e.Wait(); !res.Successful() { + log.Panicf("Wait() failed: %v", res) } output, err := ioutil.ReadFile(absFilePath) @@ -215,8 +215,8 @@ func Executor_Open(t *testing.T, command buildExecCommand, newExecutor func() Ex log.Panicf("Open(%v) failed: %v", id, err) } - if err := e2.Wait(); err != nil { - log.Panicf("Wait() failed: %v", err) + if res := e2.Wait(); !res.Successful() { + log.Panicf("Wait() failed: %v", res) } output, err := ioutil.ReadFile(absFilePath) diff --git a/client/driver/java.go b/client/driver/java.go index 1aa2c6d3f941..5b4a8b5bd644 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -16,6 +16,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // JavaDriver is a simple driver to execute applications packaged in Jars. @@ -28,7 +30,7 @@ type JavaDriver struct { // javaHandle is returned from Start/Open as a handle to the PID type javaHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -148,7 +150,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, h := &javaHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -166,7 +168,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro h := &javaHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -178,7 +180,7 @@ func (h *javaHandle) ID() string { return id } -func (h *javaHandle) WaitCh() chan error { +func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -198,10 +200,8 @@ func (h *javaHandle) Kill() error { } func (h *javaHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 0c3490d0e15f..b72c5899bddd 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -118,9 +118,9 @@ func TestJavaDriver_Start_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): // expect the timeout b/c it's a long lived process @@ -171,8 +171,8 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err == nil { + case res := <-handle.WaitCh(): + if res.Successful() { t.Fatal("should err") } case <-time.After(8 * time.Second): diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 79193a217ce5..1b9835cdcfca 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -16,6 +16,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var ( @@ -33,7 +35,7 @@ type QemuDriver struct { // qemuHandle is returned from Start/Open as a handle to the PID type qemuHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -193,7 +195,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, h := &qemuHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -211,7 +213,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -222,7 +224,7 @@ func (h *qemuHandle) ID() string { return id } -func (h *qemuHandle) WaitCh() chan error { +func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -244,10 +246,8 @@ func (h *qemuHandle) Kill() error { } func (h *qemuHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 12e99b7f4265..d3b78d04f47f 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -12,6 +12,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) const ( @@ -30,7 +32,7 @@ type RawExecDriver struct { // rawExecHandle is returned from Start/Open as a handle to the PID type rawExecHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -115,7 +117,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -132,7 +134,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -143,7 +145,7 @@ func (h *rawExecHandle) ID() string { return id } -func (h *rawExecHandle) WaitCh() chan error { +func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -163,10 +165,8 @@ func (h *rawExecHandle) Kill() error { } func (h *rawExecHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 053f29337b0f..f91425fc09d6 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -216,9 +216,9 @@ func TestRawExecDriver_Start_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): t.Fatalf("timeout") @@ -252,9 +252,9 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): t.Fatalf("timeout") @@ -305,8 +305,8 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err == nil { + case res := <-handle.WaitCh(): + if res.Successful() { t.Fatal("should err") } case <-time.After(2 * time.Second): diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 3f1912531747..1463f78a4258 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -19,6 +19,8 @@ import ( "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var ( @@ -39,7 +41,7 @@ type rktHandle struct { proc *os.Process image string logger *log.Logger - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -183,7 +185,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e image: img, logger: d.logger, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -209,7 +211,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error image: qpid.Image, logger: d.logger, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -229,7 +231,7 @@ func (h *rktHandle) ID() string { return fmt.Sprintf("Rkt:%s", string(data)) } -func (h *rktHandle) WaitCh() chan error { +func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -253,10 +255,11 @@ func (h *rktHandle) Kill() error { func (h *rktHandle) run() { ps, err := h.proc.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } else if !ps.Success() { - h.waitCh <- fmt.Errorf("task exited with error") + code := 0 + if !ps.Success() { + // TODO: Better exit code parsing. + code = 1 } + h.waitCh <- cstructs.NewWaitResult(code, 0, err) close(h.waitCh) } diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index 53ea2a4277c3..aeed4288bd41 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" + cstructs "github.com/hashicorp/nomad/client/driver/structs" ctestutils "github.com/hashicorp/nomad/client/testutil" ) @@ -35,7 +36,7 @@ func TestRktDriver_Handle(t *testing.T) { proc: &os.Process{Pid: 123}, image: "foo", doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() @@ -143,9 +144,9 @@ func TestRktDriver_Start_Wait(t *testing.T) { } select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -184,9 +185,9 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { } select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -220,9 +221,9 @@ func TestRktDriver_Start_Wait_Logs(t *testing.T) { defer handle.Kill() select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") diff --git a/client/driver/spawn/spawn.go b/client/driver/spawn/spawn.go index 9cf06e990917..32a27e11c570 100644 --- a/client/driver/spawn/spawn.go +++ b/client/driver/spawn/spawn.go @@ -11,6 +11,7 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/command" "github.com/hashicorp/nomad/helper/discover" ) @@ -203,7 +204,7 @@ func (s *Spawner) sendAbortCommand(w io.Writer) error { // Wait returns the exit code of the user process or an error if the wait // failed. -func (s *Spawner) Wait() (int, error) { +func (s *Spawner) Wait() *structs.WaitResult { if os.Getpid() == s.SpawnPpid { return s.waitAsParent() } @@ -212,9 +213,9 @@ func (s *Spawner) Wait() (int, error) { } // waitAsParent waits on the process if the current process was the spawner. -func (s *Spawner) waitAsParent() (int, error) { +func (s *Spawner) waitAsParent() *structs.WaitResult { if s.SpawnPpid != os.Getpid() { - return -1, fmt.Errorf("not the parent. Spawner parent is %v; current pid is %v", s.SpawnPpid, os.Getpid()) + return structs.NewWaitResult(-1, 0, fmt.Errorf("not the parent. Spawner parent is %v; current pid is %v", s.SpawnPpid, os.Getpid())) } // Try to reattach to the spawn. @@ -228,7 +229,7 @@ func (s *Spawner) waitAsParent() (int, error) { } if _, err := s.spawn.Wait(); err != nil { - return -1, err + return structs.NewWaitResult(-1, 0, err) } return s.pollWait() @@ -237,11 +238,11 @@ func (s *Spawner) waitAsParent() (int, error) { // pollWait polls on the spawn daemon to determine when it exits. After it // exits, it reads the state file and returns the exit code and possibly an // error. -func (s *Spawner) pollWait() (int, error) { +func (s *Spawner) pollWait() *structs.WaitResult { // Stat to check if it is there to avoid a race condition. stat, err := os.Stat(s.StateFile) if err != nil { - return -1, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err)) } // If there is data it means that the file has already been written. @@ -261,29 +262,29 @@ func (s *Spawner) pollWait() (int, error) { // readExitCode parses the state file and returns the exit code of the task. It // returns an error if the file can't be read. -func (s *Spawner) readExitCode() (int, error) { +func (s *Spawner) readExitCode() *structs.WaitResult { f, err := os.Open(s.StateFile) defer f.Close() if err != nil { - return -1, fmt.Errorf("Failed to open %v to read exit code: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to open %v to read exit code: %v", s.StateFile, err)) } stat, err := f.Stat() if err != nil { - return -1, fmt.Errorf("Failed to stat file %v: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to stat file %v: %v", s.StateFile, err)) } if stat.Size() == 0 { - return -1, fmt.Errorf("Empty state file: %v", s.StateFile) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Empty state file: %v", s.StateFile)) } var exitStatus command.SpawnExitStatus dec := json.NewDecoder(f) if err := dec.Decode(&exitStatus); err != nil { - return -1, fmt.Errorf("Failed to parse exit status from %v: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to parse exit status from %v: %v", s.StateFile, err)) } - return exitStatus.ExitCode, nil + return structs.NewWaitResult(exitStatus.ExitCode, 0, nil) } // Valid checks that the state of the Spawner is valid and that a subsequent @@ -297,7 +298,7 @@ func (s *Spawner) Valid() error { } // The task isn't alive so check that there is a valid exit code file. - if _, err := s.readExitCode(); err == nil { + if res := s.readExitCode(); res.Err == nil { return nil } diff --git a/client/driver/spawn/spawn_test.go b/client/driver/spawn/spawn_test.go index de5d8e97ce38..eb013db0de2c 100644 --- a/client/driver/spawn/spawn_test.go +++ b/client/driver/spawn/spawn_test.go @@ -68,8 +68,8 @@ func TestSpawn_SetsLogs(t *testing.T) { t.Fatalf("Spawn() failed: %v", err) } - if code, err := spawn.Wait(); code != 0 && err != nil { - t.Fatalf("Wait() returned %v, %v; want 0, nil", code, err) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } stdout2, err := os.Open(stdout.Name()) @@ -129,13 +129,8 @@ func TestSpawn_ParentWaitExited(t *testing.T) { time.Sleep(1 * time.Second) - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -152,13 +147,8 @@ func TestSpawn_ParentWait(t *testing.T) { t.Fatalf("Spawn() failed %v", err) } - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -179,13 +169,8 @@ func TestSpawn_NonParentWaitExited(t *testing.T) { // Force the wait to assume non-parent. spawn.SpawnPpid = 0 - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -213,13 +198,8 @@ func TestSpawn_NonParentWait(t *testing.T) { // Force the wait to assume non-parent. spawn.SpawnPpid = 0 - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -255,8 +235,8 @@ func TestSpawn_DeadSpawnDaemon_Parent(t *testing.T) { t.FailNow() } - if _, err := spawn.Wait(); err == nil { - t.Fatalf("Wait() should have failed: %v", err) + if res := spawn.Wait(); res.Err == nil { + t.Fatalf("Wait() should have failed: %v", res.Err) } } @@ -294,8 +274,8 @@ func TestSpawn_DeadSpawnDaemon_NonParent(t *testing.T) { // Force the wait to assume non-parent. spawn.SpawnPpid = 0 - if _, err := spawn.Wait(); err == nil { - t.Fatalf("Wait() should have failed: %v", err) + if res := spawn.Wait(); res.Err == nil { + t.Fatalf("Wait() should have failed: %v", res.Err) } } @@ -316,8 +296,8 @@ func TestSpawn_Valid_TaskRunning(t *testing.T) { t.Fatalf("Valid() failed: %v", err) } - if _, err := spawn.Wait(); err != nil { - t.Fatalf("Wait() failed %v", err) + if res := spawn.Wait(); res.Err != nil { + t.Fatalf("Wait() failed: %v", res.Err) } } @@ -334,8 +314,8 @@ func TestSpawn_Valid_TaskExit_ExitCode(t *testing.T) { t.Fatalf("Spawn() failed %v", err) } - if _, err := spawn.Wait(); err != nil { - t.Fatalf("Wait() failed %v", err) + if res := spawn.Wait(); res.Err != nil { + t.Fatalf("Wait() failed: %v", res.Err) } if err := spawn.Valid(); err != nil { @@ -355,8 +335,8 @@ func TestSpawn_Valid_TaskExit_NoExitCode(t *testing.T) { t.Fatalf("Spawn() failed %v", err) } - if _, err := spawn.Wait(); err != nil { - t.Fatalf("Wait() failed %v", err) + if res := spawn.Wait(); res.Err != nil { + t.Fatalf("Wait() failed: %v", res.Err) } // Delete the file so that it can't find the exit code. diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go new file mode 100644 index 000000000000..a60daf8df4a0 --- /dev/null +++ b/client/driver/structs/structs.go @@ -0,0 +1,27 @@ +package structs + +import "fmt" + +// WaitResult stores the result of a Wait operation. +type WaitResult struct { + ExitCode int + Signal int + Err error +} + +func NewWaitResult(code, signal int, err error) *WaitResult { + return &WaitResult{ + ExitCode: code, + Signal: signal, + Err: err, + } +} + +func (r *WaitResult) Successful() bool { + return r.ExitCode == 0 && r.Signal == 0 && r.Err == nil +} + +func (r *WaitResult) String() string { + return fmt.Sprintf("Wait returned exit code %v, signal %v, and error %v", + r.ExitCode, r.Signal, r.Err) +} diff --git a/client/restarts.go b/client/restarts.go index 4141405f8d6c..ae940c2f1e10 100644 --- a/client/restarts.go +++ b/client/restarts.go @@ -1,8 +1,9 @@ package client import ( - "github.com/hashicorp/nomad/nomad/structs" "time" + + "github.com/hashicorp/nomad/nomad/structs" ) // The errorCounter keeps track of the number of times a process has exited @@ -30,6 +31,11 @@ func newRestartTracker(jobType string, restartPolicy *structs.RestartPolicy) res } } +// noRestartsTracker returns a RestartTracker that never restarts. +func noRestartsTracker() restartTracker { + return &batchRestartTracker{maxAttempts: 0} +} + type batchRestartTracker struct { maxAttempts int delay time.Duration diff --git a/client/task_runner.go b/client/task_runner.go index c8a5a390aa15..984af27670fa 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -13,6 +13,8 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // TaskRunner is used to wrap a task within an allocation and provide the execution context. @@ -25,6 +27,7 @@ type TaskRunner struct { restartTracker restartTracker task *structs.Task + state *structs.TaskState updateCh chan *structs.Task handle driver.DriverHandle @@ -42,13 +45,14 @@ type taskRunnerState struct { HandleID string } -// TaskStateUpdater is used to update the status of a task -type TaskStateUpdater func(taskName, status, desc string) +// TaskStateUpdater is used to signal that tasks state has changed. +type TaskStateUpdater func(taskName string) // NewTaskRunner is used to create a new task context func NewTaskRunner(logger *log.Logger, config *config.Config, updater TaskStateUpdater, ctx *driver.ExecContext, - allocID string, task *structs.Task, restartTracker restartTracker) *TaskRunner { + allocID string, task *structs.Task, state *structs.TaskState, + restartTracker restartTracker) *TaskRunner { tc := &TaskRunner{ config: config, @@ -58,6 +62,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, ctx: ctx, allocID: allocID, task: task, + state: state, updateCh: make(chan *structs.Task, 8), destroyCh: make(chan struct{}), waitCh: make(chan struct{}), @@ -132,12 +137,35 @@ func (r *TaskRunner) DestroyState() error { return os.RemoveAll(r.stateFilePath()) } -// setStatus is used to update the status of the task runner -func (r *TaskRunner) setStatus(status, desc string) { +func (r *TaskRunner) appendEvent(event *structs.TaskEvent) { + capacity := 10 + if r.state.Events == nil { + r.state.Events = make([]*structs.TaskEvent, 0, capacity) + } + + // If we hit capacity, then shift it. + if len(r.state.Events) == capacity { + old := r.state.Events + r.state.Events = make([]*structs.TaskEvent, 0, capacity) + r.state.Events = append(r.state.Events, old[1:]...) + } + + r.state.Events = append(r.state.Events, event) +} + +// setState is used to update the state of the task runner +func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { + // Update the task. + r.state.State = state + r.appendEvent(event) + + // Persist our state to disk. if err := r.SaveState(); err != nil { r.logger.Printf("[ERR] client: failed to save state of Task Runner: %v", r.task.Name) } - r.updater(r.task.Name, status, desc) + + // Indicate the task has been updated. + r.updater(r.task.Name) } // createDriver makes a driver for the task @@ -157,7 +185,8 @@ func (r *TaskRunner) startTask() error { // Create a driver driver, err := r.createDriver() if err != nil { - r.setStatus(structs.AllocClientStatusFailed, err.Error()) + e := structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err) + r.setState(structs.TaskStateDead, e) return err } @@ -166,94 +195,118 @@ func (r *TaskRunner) startTask() error { if err != nil { r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) - r.setStatus(structs.AllocClientStatusFailed, - fmt.Sprintf("failed to start: %v", err)) + e := structs.NewTaskEvent(structs.TaskDriverFailure). + SetDriverError(fmt.Errorf("failed to start: %v", err)) + r.setState(structs.TaskStateDead, e) return err } r.handle = handle - r.setStatus(structs.AllocClientStatusRunning, "task started") + r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) return nil } // Run is a long running routine used to manage the task func (r *TaskRunner) Run() { - var err error defer close(r.waitCh) r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.allocID) - // Start the task if not yet started - if r.handle == nil { - if err := r.startTask(); err != nil { + r.run() + return +} + +func (r *TaskRunner) run() { + var forceStart bool + for { + // Start the task if not yet started or it is being forced. + if r.handle == nil || forceStart { + forceStart = false + if err := r.startTask(); err != nil { + return + } + } + + // Store the errors that caused use to stop waiting for updates. + var waitRes *cstructs.WaitResult + var destroyErr error + destroyed := false + + OUTER: + // Wait for updates + for { + select { + case waitRes = <-r.handle.WaitCh(): + break OUTER + case update := <-r.updateCh: + // Update + r.task = update + if err := r.handle.Update(update); err != nil { + r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + } + case <-r.destroyCh: + // Send the kill signal, and use the WaitCh to block until complete + if err := r.handle.Kill(); err != nil { + r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + destroyErr = err + } + destroyed = true + } + } + + // If the user destroyed the task, we do not attempt to do any restarts. + if destroyed { + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) return } - } - // Monitoring the Driver - defer r.DestroyState() - err = r.monitorDriver(r.handle.WaitCh(), r.updateCh, r.destroyCh) - for err != nil { - r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", - r.task.Name, r.allocID, err) + // Log whether the task was successful or not. + if !waitRes.Successful() { + r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes) + } else { + r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID) + } + + // Check if we should restart. If not mark task as dead and exit. + waitEvent := r.waitErrorToEvent(waitRes) shouldRestart, when := r.restartTracker.nextRestart() if !shouldRestart { r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID) - r.setStatus(structs.AllocClientStatusDead, fmt.Sprintf("task failed with: %v", err)) + r.setState(structs.TaskStateDead, waitEvent) return } r.logger.Printf("[INFO] client: Restarting Task: %v", r.task.Name) - r.setStatus(structs.AllocClientStatusPending, "Task Restarting") r.logger.Printf("[DEBUG] client: Sleeping for %v before restarting Task %v", when, r.task.Name) + r.setState(structs.TaskStatePending, waitEvent) + + // Sleep but watch for destroy events. select { case <-time.After(when): case <-r.destroyCh: } + + // Destroyed while we were waiting to restart, so abort. r.destroyLock.Lock() - if r.destroy { + destroyed = r.destroy + r.destroyLock.Unlock() + if destroyed { r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name) - break - } - if err = r.startTask(); err != nil { - r.destroyLock.Unlock() - continue + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) + return } - r.destroyLock.Unlock() - err = r.monitorDriver(r.handle.WaitCh(), r.updateCh, r.destroyCh) - } - // Cleanup after ourselves - r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID) - r.setStatus(structs.AllocClientStatusDead, "task completed") + // Set force start because we are restarting the task. + forceStart = true + } + return } -// This functions listens to messages from the driver and blocks until the -// driver exits -func (r *TaskRunner) monitorDriver(waitCh chan error, updateCh chan *structs.Task, destroyCh chan struct{}) error { - var err error -OUTER: - // Wait for updates - for { - select { - case err = <-waitCh: - break OUTER - case update := <-updateCh: - // Update - r.task = update - if err := r.handle.Update(update); err != nil { - r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", - r.task.Name, r.allocID, err) - } - - case <-destroyCh: - // Send the kill signal, and use the WaitCh to block until complete - if err := r.handle.Kill(); err != nil { - r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", - r.task.Name, r.allocID, err) - } - } - } - return err +// Helper function for converting a WaitResult into a TaskTerminated event. +func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent { + return structs.NewTaskEvent(structs.TaskTerminated). + SetExitCode(res.ExitCode). + SetSignal(res.Signal). + SetExitMessage(res.Err) } // Update is used to update the task of the context diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 751d00fa9ec9..c8f8697fa3a9 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -4,7 +4,6 @@ import ( "log" "os" "path/filepath" - "strings" "testing" "time" @@ -21,21 +20,11 @@ func testLogger() *log.Logger { return log.New(os.Stderr, "", log.LstdFlags) } -type MockTaskStateUpdater struct { - Count int - Name []string - Status []string - Description []string -} +type MockTaskStateUpdater struct{} -func (m *MockTaskStateUpdater) Update(name, status, desc string) { - m.Count += 1 - m.Name = append(m.Name, name) - m.Status = append(m.Status, status) - m.Description = append(m.Description, desc) -} +func (m *MockTaskStateUpdater) Update(name string) {} -func testTaskRunner() (*MockTaskStateUpdater, *TaskRunner) { +func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { logger := testLogger() conf := DefaultConfig() conf.StateDir = os.TempDir() @@ -54,13 +43,18 @@ func testTaskRunner() (*MockTaskStateUpdater, *TaskRunner) { ctx := driver.NewExecContext(allocDir, alloc.ID) rp := structs.NewRestartPolicy(structs.JobTypeService) restartTracker := newRestartTracker(structs.JobTypeService, rp) - tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, restartTracker) + if !restarts { + restartTracker = noRestartsTracker() + } + + state := alloc.TaskStates[task.Name] + tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker) return upd, tr } func TestTaskRunner_SimpleRun(t *testing.T) { ctestutil.ExecCompatible(t) - upd, tr := testTaskRunner() + _, tr := testTaskRunner(false) go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() @@ -71,33 +65,26 @@ func TestTaskRunner_SimpleRun(t *testing.T) { t.Fatalf("timeout") } - if upd.Count != 2 { - t.Fatalf("should have 2 updates: %#v", upd) - } - if upd.Name[0] != tr.task.Name { - t.Fatalf("bad: %#v", upd.Name) - } - if upd.Status[0] != structs.AllocClientStatusRunning { - t.Fatalf("bad: %#v", upd.Status) - } - if upd.Description[0] != "task started" { - t.Fatalf("bad: %#v", upd.Description) + if len(tr.state.Events) != 2 { + t.Fatalf("should have 2 updates: %#v", tr.state.Events) } - if upd.Name[1] != tr.task.Name { - t.Fatalf("bad: %#v", upd.Name) + if tr.state.State != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", tr.state.State, structs.TaskStateDead) } - if upd.Status[1] != structs.AllocClientStatusDead { - t.Fatalf("bad: %#v", upd.Status) + + if tr.state.Events[0].Type != structs.TaskStarted { + t.Fatalf("First Event was %v; want %v", tr.state.Events[0].Type, structs.TaskStarted) } - if upd.Description[1] != "task completed" { - t.Fatalf("bad: %#v", upd.Description) + + if tr.state.Events[1].Type != structs.TaskTerminated { + t.Fatalf("First Event was %v; want %v", tr.state.Events[1].Type, structs.TaskTerminated) } } func TestTaskRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) - upd, tr := testTaskRunner() + _, tr := testTaskRunner(true) defer tr.ctx.AllocDir.Destroy() // Change command to ensure we run for a bit @@ -113,27 +100,31 @@ func TestTaskRunner_Destroy(t *testing.T) { select { case <-tr.WaitCh(): - case <-time.After(2 * time.Second): + case <-time.After(8 * time.Second): t.Fatalf("timeout") } - if upd.Count != 2 { - t.Fatalf("should have 2 updates: %#v", upd) + if len(tr.state.Events) != 2 { + t.Fatalf("should have 2 updates: %#v", tr.state.Events) } - if upd.Status[0] != structs.AllocClientStatusRunning { - t.Fatalf("bad: %#v", upd.Status) + + if tr.state.State != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", tr.state.State, structs.TaskStateDead) } - if upd.Status[1] != structs.AllocClientStatusDead { - t.Fatalf("bad: %#v", upd.Status) + + if tr.state.Events[0].Type != structs.TaskStarted { + t.Fatalf("First Event was %v; want %v", tr.state.Events[0].Type, structs.TaskStarted) } - if !strings.Contains(upd.Description[1], "task failed") { - t.Fatalf("bad: %#v", upd.Description) + + if tr.state.Events[1].Type != structs.TaskKilled { + t.Fatalf("First Event was %v; want %v", tr.state.Events[1].Type, structs.TaskKilled) } + } func TestTaskRunner_Update(t *testing.T) { ctestutil.ExecCompatible(t) - _, tr := testTaskRunner() + _, tr := testTaskRunner(false) // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" @@ -158,7 +149,7 @@ func TestTaskRunner_Update(t *testing.T) { func TestTaskRunner_SaveRestoreState(t *testing.T) { ctestutil.ExecCompatible(t) - upd, tr := testTaskRunner() + upd, tr := testTaskRunner(false) // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" @@ -174,7 +165,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { // Create a new task runner tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, - tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.restartTracker) + tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker) if err := tr2.RestoreState(); err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 329ecd87200a..2ef5c834afd9 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1,8 +1,9 @@ package mock import ( - "github.com/hashicorp/nomad/nomad/structs" "time" + + "github.com/hashicorp/nomad/nomad/structs" ) func Node() *structs.Node { @@ -221,6 +222,11 @@ func Alloc() *structs.Allocation { }, }, }, + TaskStates: map[string]*structs.TaskState{ + "web": &structs.TaskState{ + State: structs.TaskStatePending, + }, + }, Job: Job(), DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ce5007ac54cd..cdfdfa53e42a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1053,6 +1053,95 @@ func (t *Task) GoString() string { return fmt.Sprintf("*%#v", *t) } +// Set of possible states for a task. +const ( + TaskStatePending = "pending" // The task is waiting to be run. + TaskStateRunning = "running" // The task is currently running. + TaskStateDead = "dead" // Terminal state of task. +) + +// TaskState tracks the current state of a task and events that caused state +// transistions. +type TaskState struct { + // The current state of the task. + State string + + // Series of task events that transistion the state of the task. + Events []*TaskEvent +} + +const ( + // A Driver failure indicates that the task could not be started due to a + // failure in the driver. + TaskDriverFailure = "Driver Failure" + + // Task Started signals that the task was started and its timestamp can be + // used to determine the running length of the task. + TaskStarted = "Started" + + // Task terminated indicates that the task was started and exited. + TaskTerminated = "Terminated" + + // Task Killed indicates a user has killed the task. + TaskKilled = "Killed" +) + +// TaskEvent is an event that effects the state of a task and contains meta-data +// appropriate to the events type. +type TaskEvent struct { + Type string + Time int64 // Unix Nanosecond timestamp + + // Driver Failure fields. + DriverError string // A driver error occured while starting the task. + + // Task Terminated Fields. + ExitCode int // The exit code of the task. + Signal int // The signal that terminated the task. + Message string // A possible message explaining the termination of the task. + + // Task Killed Fields. + KillError string // Error killing the task. +} + +func NewTaskEvent(event string) *TaskEvent { + return &TaskEvent{ + Type: event, + Time: time.Now().UnixNano(), + } +} + +func (e *TaskEvent) SetDriverError(err error) *TaskEvent { + if err != nil { + e.DriverError = err.Error() + } + return e +} + +func (e *TaskEvent) SetExitCode(c int) *TaskEvent { + e.ExitCode = c + return e +} + +func (e *TaskEvent) SetSignal(s int) *TaskEvent { + e.Signal = s + return e +} + +func (e *TaskEvent) SetExitMessage(err error) *TaskEvent { + if err != nil { + e.Message = err.Error() + } + return e +} + +func (e *TaskEvent) SetKillError(err error) *TaskEvent { + if err != nil { + e.KillError = err.Error() + } + return e +} + // Validate is used to sanity check a task group func (t *Task) Validate() error { var mErr multierror.Error @@ -1171,6 +1260,9 @@ type Allocation struct { // ClientStatusDescription is meant to provide more human useful information ClientDescription string + // TaskStates stores the state of each task, + TaskStates map[string]*TaskState + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -1200,6 +1292,7 @@ func (a *Allocation) Stub() *AllocListStub { DesiredDescription: a.DesiredDescription, ClientStatus: a.ClientStatus, ClientDescription: a.ClientDescription, + TaskStates: a.TaskStates, CreateIndex: a.CreateIndex, ModifyIndex: a.ModifyIndex, } @@ -1217,6 +1310,7 @@ type AllocListStub struct { DesiredDescription string ClientStatus string ClientDescription string + TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 7957f2360ff8..b3b486658837 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -285,11 +285,13 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusPending + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending) s.plan.AppendAlloc(alloc) } else { alloc.DesiredStatus = structs.AllocDesiredStatusFailed alloc.DesiredDescription = "failed to find a node for placement" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead) s.plan.AppendFailed(alloc) failedTG[missing.TaskGroup] = alloc } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index d3f6fb27e608..d448642ff531 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -252,11 +252,13 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusPending + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending) s.plan.AppendAlloc(alloc) } else { alloc.DesiredStatus = structs.AllocDesiredStatusFailed alloc.DesiredDescription = "failed to find a node for placement" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead) s.plan.AppendFailed(alloc) failedTG[missing.TaskGroup] = alloc } diff --git a/scheduler/util.go b/scheduler/util.go index 43b4b0b0cfd4..44bd2ae08512 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -445,3 +445,11 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple { return c } + +func initTaskState(tg *structs.TaskGroup, state string) map[string]*structs.TaskState { + states := make(map[string]*structs.TaskState, len(tg.Tasks)) + for _, task := range tg.Tasks { + states[task.Name] = &structs.TaskState{State: state} + } + return states +} diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 7873c55c5829..275be30ba9a1 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -648,3 +648,26 @@ func TestTaskGroupConstraints(t *testing.T) { } } + +func TestInitTaskState(t *testing.T) { + tg := &structs.TaskGroup{ + Tasks: []*structs.Task{ + &structs.Task{Name: "foo"}, + &structs.Task{Name: "bar"}, + }, + } + expPending := map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + "bar": &structs.TaskState{State: structs.TaskStatePending}, + } + expDead := map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStateDead}, + "bar": &structs.TaskState{State: structs.TaskStateDead}, + } + actPending := initTaskState(tg, structs.TaskStatePending) + actDead := initTaskState(tg, structs.TaskStateDead) + + if !(reflect.DeepEqual(expPending, actPending) && reflect.DeepEqual(expDead, actDead)) { + t.Fatal("Expected and actual not equal") + } +}