Skip to content

Commit

Permalink
Merge pull request #416 from hashicorp/f-task-state
Browse files Browse the repository at this point in the history
Track Tasks State
  • Loading branch information
dadgar committed Nov 16, 2015
2 parents c4a2a9c + 95222f6 commit c5315d1
Show file tree
Hide file tree
Showing 32 changed files with 559 additions and 345 deletions.
2 changes: 2 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Allocation struct {
DesiredDescription string
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
CreateIndex uint64
ModifyIndex uint64
}
Expand Down Expand Up @@ -83,6 +84,7 @@ type AllocationListStub struct {
DesiredDescription string
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
CreateIndex uint64
ModifyIndex uint64
}
Expand Down
26 changes: 26 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,29 @@ func (t *Task) Constrain(c *Constraint) *Task {
t.Constraints = append(t.Constraints, c)
return t
}

// TaskState tracks the current state of a task and events that caused state
// transistions.
type TaskState struct {
State string
Events []*TaskEvent
}

const (
TaskDriverFailure = "Driver Failure"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
// appropriate to the events type.
type TaskEvent struct {
Type string
Time int64
DriverError string
ExitCode int
Signal int
Message string
KillError string
}
88 changes: 38 additions & 50 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ type AllocRunner struct {

ctx *driver.ExecContext
tasks map[string]*TaskRunner
restored map[string]struct{}
RestartPolicy *structs.RestartPolicy
taskLock sync.RWMutex

taskStatus map[string]taskStatus
taskStatusLock sync.RWMutex

updateCh chan *structs.Allocation
Expand All @@ -68,16 +68,16 @@ type allocRunnerState struct {
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner {
ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStatus: make(map[string]taskStatus),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
return ar
}
Expand All @@ -98,20 +98,22 @@ func (r *AllocRunner) RestoreState() error {
// Restore fields
r.alloc = snap.Alloc
r.RestartPolicy = snap.RestartPolicy
r.taskStatus = snap.TaskStatus
r.ctx = snap.Context

// Restore the task runners
var mErr multierror.Error
for name, status := range r.taskStatus {
for name, state := range r.alloc.TaskStates {
// Mark the task as restored.
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
r.tasks[name] = tr

// Skip tasks in terminal states.
if status.Status == structs.AllocClientStatusDead ||
status.Status == structs.AllocClientStatusFailed {
if state.State == structs.TaskStateDead {
continue
}

Expand Down Expand Up @@ -152,7 +154,6 @@ func (r *AllocRunner) saveAllocRunnerState() error {
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
TaskStatus: r.taskStatus,
Context: r.ctx,
}
return persistState(r.stateFilePath(), &snap)
Expand Down Expand Up @@ -182,16 +183,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
}

// setAlloc is used to update the allocation of the runner
// we preserve the existing client status and description
func (r *AllocRunner) setAlloc(alloc *structs.Allocation) {
if r.alloc != nil {
alloc.ClientStatus = r.alloc.ClientStatus
alloc.ClientDescription = r.alloc.ClientDescription
}
r.alloc = alloc
}

// dirtySyncState is used to watch for state being marked dirty to sync
func (r *AllocRunner) dirtySyncState() {
for {
Expand Down Expand Up @@ -223,22 +214,26 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {

// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Scan the task status to termine the status of the alloc
// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
pending = len(r.taskStatus) < len(r.tasks)
for _, status := range r.taskStatus {
switch status.Status {
case structs.AllocClientStatusRunning:
for _, state := range r.alloc.TaskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
case structs.AllocClientStatusDead:
dead = true
case structs.AllocClientStatusFailed:
failed = true
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
failed = true
} else {
dead = true
}
}
}
if len(r.taskStatus) > 0 {
taskDesc, _ := json.Marshal(r.taskStatus)
if len(r.alloc.TaskStates) > 0 {
taskDesc, _ := json.Marshal(r.alloc.TaskStates)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()
Expand Down Expand Up @@ -271,14 +266,8 @@ func (r *AllocRunner) setStatus(status, desc string) {
}
}

// setTaskStatus is used to set the status of a task
func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
r.taskStatusLock.Lock()
r.taskStatus[taskName] = taskStatus{
Status: status,
Description: desc,
}
r.taskStatusLock.Unlock()
// setTaskState is used to set the status of a task
func (r *AllocRunner) setTaskState(taskName string) {
select {
case r.dirtyCh <- struct{}{}:
default:
Expand Down Expand Up @@ -323,15 +312,15 @@ func (r *AllocRunner) Run() {
// Start the task runners
r.taskLock.Lock()
for _, task := range tg.Tasks {
// Skip tasks that were restored
if _, ok := r.taskStatus[task.Name]; ok {
if _, ok := r.restored[task.Name]; ok {
continue
}

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand All @@ -344,7 +333,6 @@ OUTER:
case update := <-r.updateCh:
// Check if we're in a terminal status
if update.TerminalStatus() {
r.setAlloc(update)
break OUTER
}

Expand Down
25 changes: 15 additions & 10 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,25 @@ func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error {
return m.Err
}

func testAllocRunner() (*MockAllocStateUpdater, *AllocRunner) {
func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
logger := testLogger()
conf := DefaultConfig()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
if !restarts {
alloc.Job.Type = structs.JobTypeBatch
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
}

ar := NewAllocRunner(logger, conf, upd.Update, alloc)
return upd, ar
}

func TestAllocRunner_SimpleRun(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)
go ar.Run()
defer ar.Destroy()

Expand All @@ -54,7 +59,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) {

func TestAllocRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
Expand All @@ -76,17 +81,17 @@ func TestAllocRunner_Destroy(t *testing.T) {
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > time.Second {
if time.Since(start) > 8*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
Expand All @@ -109,17 +114,17 @@ func TestAllocRunner_Update(t *testing.T) {
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > time.Second {
if time.Since(start) > 8*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
Expand Down Expand Up @@ -156,7 +161,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > 15*time.Second {
Expand Down
17 changes: 8 additions & 9 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"strconv"
"strings"

docker "github.com/fsouza/go-dockerclient"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/args"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"

docker "github.com/fsouza/go-dockerclient"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)

type DockerDriver struct {
Expand All @@ -35,7 +36,7 @@ type dockerHandle struct {
cleanupImage bool
imageID string
containerID string
waitCh chan error
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

Expand Down Expand Up @@ -414,7 +415,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
imageID: dockerImage.ID,
containerID: container.ID,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
Expand Down Expand Up @@ -474,7 +475,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
imageID: pid.ImageID,
containerID: pid.ContainerID,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
Expand All @@ -493,7 +494,7 @@ func (h *dockerHandle) ID() string {
return fmt.Sprintf("DOCKER:%s", string(data))
}

func (h *dockerHandle) WaitCh() chan error {
func (h *dockerHandle) WaitCh() chan *cstructs.WaitResult {
return h.waitCh
}

Expand Down Expand Up @@ -565,8 +566,6 @@ func (h *dockerHandle) run() {
}

close(h.doneCh)
if err != nil {
h.waitCh <- err
}
h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err)
close(h.waitCh)
}
Loading

0 comments on commit c5315d1

Please sign in to comment.