Skip to content

Commit

Permalink
Merge pull request #731 from hashicorp/f-reduce-client-alloc-pulls
Browse files Browse the repository at this point in the history
Client only pulls update allocations from server
  • Loading branch information
dadgar committed Feb 2, 2016
2 parents 9e7e214 + 019883c commit 04f48c5
Show file tree
Hide file tree
Showing 20 changed files with 406 additions and 310 deletions.
12 changes: 0 additions & 12 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,6 @@ func (n *Nodes) Allocations(nodeID string, q *QueryOptions) ([]*Allocation, *Que
return resp, qm, nil
}

// ClientAllocations is used to return a lightweight list of allocations associated with a node.
// It is primarily used by the client in order to determine which allocations actually need
// an update.
func (n *Nodes) ClientAllocations(nodeID string, q *QueryOptions) (map[string]uint64, *QueryMeta, error) {
var resp map[string]uint64
qm, err := n.client.query("/v1/node/"+nodeID+"/clientallocations", &resp, q)
if err != nil {
return nil, nil, err
}
return resp, qm, nil
}

// ForceEvaluate is used to force-evaluate an existing node.
func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp nodeEvalResponse
Expand Down
18 changes: 0 additions & 18 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,6 @@ func TestNodes_Allocations(t *testing.T) {
}
}

func TestNodes_ClientAllocations(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()

// Looking up by a non-existent node returns nothing. We
// don't check the index here because it's possible the node
// has already registered, in which case we will get a non-
// zero result anyways.
allocs, _, err := nodes.ClientAllocations("nope", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if n := len(allocs); n != 0 {
t.Fatalf("expected 0 allocs, got: %d", n)
}
}

func TestNodes_ForceEvaluate(t *testing.T) {
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
c.DevMode = true
Expand Down
176 changes: 118 additions & 58 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package client

import (
"encoding/json"
"fmt"
"log"
"os"
Expand All @@ -22,12 +21,6 @@ const (
allocSyncRetryIntv = 15 * time.Second
)

// taskStatus is used to track the status of a task
type taskStatus struct {
Status string
Description string
}

// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation) error

Expand All @@ -38,12 +31,18 @@ type AllocRunner struct {
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation
alloc *structs.Allocation
allocLock sync.Mutex

// Explicit status of allocation. Set when there are failures
allocClientStatus string
allocClientDescription string

dirtyCh chan struct{}

ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
RestartPolicy *structs.RestartPolicy
taskLock sync.RWMutex
Expand All @@ -60,10 +59,12 @@ type AllocRunner struct {

// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Alloc *structs.Allocation
RestartPolicy *structs.RestartPolicy
TaskStatus map[string]taskStatus
Context *driver.ExecContext
Alloc *structs.Allocation
AllocClientStatus string
AllocClientDescription string
RestartPolicy *structs.RestartPolicy
TaskStates map[string]*structs.TaskState
Context *driver.ExecContext
}

// NewAllocRunner is used to create a new allocation context
Expand All @@ -77,6 +78,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
consulService: consulService,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: alloc.TaskStates,
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
Expand All @@ -102,18 +104,20 @@ func (r *AllocRunner) RestoreState() error {
r.alloc = snap.Alloc
r.RestartPolicy = snap.RestartPolicy
r.ctx = snap.Context
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
r.taskStates = snap.TaskStates

// Restore the task runners
var mErr multierror.Error
for name, state := range r.alloc.TaskStates {
for name, state := range r.taskStates {
// Mark the task as restored.
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.alloc, task, restartTracker, r.consulService)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand Down Expand Up @@ -155,10 +159,15 @@ func (r *AllocRunner) SaveState() error {
func (r *AllocRunner) saveAllocRunnerState() error {
r.taskStatusLock.RLock()
defer r.taskStatusLock.RUnlock()
r.allocLock.Lock()
defer r.allocLock.Unlock()
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
Context: r.ctx,
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
Context: r.ctx,
AllocClientStatus: r.allocClientStatus,
AllocClientDescription: r.allocClientDescription,
TaskStates: r.taskStates,
}
return persistState(r.stateFilePath(), &snap)
}
Expand All @@ -184,7 +193,47 @@ func (r *AllocRunner) DestroyContext() error {

// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
r.allocLock.Lock()
alloc := r.alloc.Copy()
r.allocLock.Unlock()

// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
alloc.TaskStates = r.taskStates
for _, state := range r.taskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
failed = true
} else {
dead = true
}
}
}
r.taskStatusLock.RUnlock()

// The status has explicitely been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
alloc.ClientDescription = r.allocClientDescription
return alloc
}

// Determine the alloc status
if failed {
alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
alloc.ClientStatus = structs.AllocClientStatusDead
}
return alloc
}

// dirtySyncState is used to watch for state being marked dirty to sync
Expand Down Expand Up @@ -218,43 +267,13 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {

// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
for _, state := range r.alloc.TaskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
failed = true
} else {
dead = true
}
}
}
if len(r.alloc.TaskStates) > 0 {
taskDesc, _ := json.Marshal(r.alloc.TaskStates)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()

// Determine the alloc status
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
r.alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
r.alloc.ClientStatus = structs.AllocClientStatusDead
}
// Get a copy of our alloc.
alloc := r.Alloc()

// Attempt to update the status
if err := r.updater(r.alloc); err != nil {
if err := r.updater(alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
r.alloc.ID, r.alloc.ClientStatus, err)
alloc.ID, alloc.ClientStatus, err)
return err
}
return nil
Expand All @@ -271,13 +290,42 @@ func (r *AllocRunner) setStatus(status, desc string) {
}

// setTaskState is used to set the status of a task
func (r *AllocRunner) setTaskState(taskName string) {
func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
taskState, ok := r.taskStates[taskName]
if !ok {
r.logger.Printf("[ERR] client: setting task state for unknown task %q", taskName)
return
}

// Set the tasks state.
taskState.State = state
r.appendTaskEvent(taskState, event)

select {
case r.dirtyCh <- struct{}{}:
default:
}
}

// appendTaskEvent updates the task status by appending the new event.
func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
capacity := 10
if state.Events == nil {
state.Events = make([]*structs.TaskEvent, 0, capacity)
}

// If we hit capacity, then shift it.
if len(state.Events) == capacity {
old := state.Events
state.Events = make([]*structs.TaskEvent, 0, capacity)
state.Events = append(state.Events, old[1:]...)
}

state.Events = append(state.Events, event)
}

// Run is a long running goroutine used to manage an allocation
func (r *AllocRunner) Run() {
defer close(r.waitCh)
Expand Down Expand Up @@ -324,8 +372,7 @@ func (r *AllocRunner) Run() {
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.alloc, task, restartTracker, r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand All @@ -336,6 +383,11 @@ OUTER:
for {
select {
case update := <-r.updateCh:
// Store the updated allocation.
r.allocLock.Lock()
r.alloc = update
r.allocLock.Unlock()

// Check if we're in a terminal status
if update.TerminalStatus() {
break OUTER
Expand Down Expand Up @@ -371,8 +423,8 @@ OUTER:
}

// Destroy each sub-task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
r.taskLock.Lock()
defer r.taskLock.Unlock()
for _, tr := range r.tasks {
tr.Destroy()
}
Expand Down Expand Up @@ -408,6 +460,14 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}

// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
// checks if the current running allocation is behind and should be updated.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
r.allocLock.Lock()
defer r.allocLock.Unlock()
return r.alloc.AllocModifyIndex < serverIndex
}

// Destroy is used to indicate that the allocation context should be destroyed
func (r *AllocRunner) Destroy() {
r.destroyLock.Lock()
Expand Down
19 changes: 6 additions & 13 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,28 @@ func TestAllocRunner_Destroy(t *testing.T) {

func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
_, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()
start := time.Now()

// Update the alloc definition
newAlloc := new(structs.Allocation)
*newAlloc = *ar.alloc
newAlloc.DesiredStatus = structs.AllocDesiredStatusStop
newAlloc.Name = "FOO"
newAlloc.AllocModifyIndex++
ar.Update(newAlloc)

// Check the alloc runner stores the update allocation.
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
return ar.Alloc().Name == "FOO", nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
t.Fatalf("err: %v %#v", err, ar.Alloc())
})

if time.Since(start) > 15*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState(t *testing.T) {
Expand Down
Loading

0 comments on commit 04f48c5

Please sign in to comment.