Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client only pulls update allocations from server #731

Merged
merged 5 commits into from
Feb 2, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,6 @@ func (n *Nodes) Allocations(nodeID string, q *QueryOptions) ([]*Allocation, *Que
return resp, qm, nil
}

// ClientAllocations is used to return a lightweight list of allocations associated with a node.
// It is primarily used by the client in order to determine which allocations actually need
// an update.
func (n *Nodes) ClientAllocations(nodeID string, q *QueryOptions) (map[string]uint64, *QueryMeta, error) {
var resp map[string]uint64
qm, err := n.client.query("/v1/node/"+nodeID+"/clientallocations", &resp, q)
if err != nil {
return nil, nil, err
}
return resp, qm, nil
}

// ForceEvaluate is used to force-evaluate an existing node.
func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp nodeEvalResponse
Expand Down
18 changes: 0 additions & 18 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,6 @@ func TestNodes_Allocations(t *testing.T) {
}
}

func TestNodes_ClientAllocations(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()

// Looking up by a non-existent node returns nothing. We
// don't check the index here because it's possible the node
// has already registered, in which case we will get a non-
// zero result anyways.
allocs, _, err := nodes.ClientAllocations("nope", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if n := len(allocs); n != 0 {
t.Fatalf("expected 0 allocs, got: %d", n)
}
}

func TestNodes_ForceEvaluate(t *testing.T) {
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
c.DevMode = true
Expand Down
35 changes: 29 additions & 6 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type AllocRunner struct {
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation
alloc *structs.Allocation
allocLock sync.Mutex

dirtyCh chan struct{}

Expand Down Expand Up @@ -155,6 +156,8 @@ func (r *AllocRunner) SaveState() error {
func (r *AllocRunner) saveAllocRunnerState() error {
r.taskStatusLock.RLock()
defer r.taskStatusLock.RUnlock()
r.allocLock.Lock()
defer r.allocLock.Unlock()
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
Expand Down Expand Up @@ -184,6 +187,8 @@ func (r *AllocRunner) DestroyContext() error {

// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to acquire a lock here? I don't see us mutating anything here. Does the caller expects the alloc to not change once it has? We should probably make a copy and return if that's the assumption, otherwise the alloc might change under the hood once the caller has it.

defer r.allocLock.Unlock()
return r.alloc
}

Expand Down Expand Up @@ -221,7 +226,8 @@ func (r *AllocRunner) syncStatus() error {
// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
for _, state := range r.alloc.TaskStates {
for _, tr := range r.tasks {
state := tr.state
switch state.State {
case structs.TaskStateRunning:
running = true
Expand All @@ -236,13 +242,17 @@ func (r *AllocRunner) syncStatus() error {
}
}
}
r.taskStatusLock.RUnlock()

// Determine the alloc status
r.allocLock.Lock()
defer r.allocLock.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soon after this, we are calling the updater method to update the alloc and that makes an RPC call. Have we considered the implications of locking the alloc during the time the rpc happens? On a bad day/network it might be several seconds which means a lot of backed up operations on the client.


if len(r.alloc.TaskStates) > 0 {
taskDesc, _ := json.Marshal(r.alloc.TaskStates)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()

// Determine the alloc status
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
Expand Down Expand Up @@ -336,6 +346,11 @@ OUTER:
for {
select {
case update := <-r.updateCh:
// Store the updated allocation.
r.allocLock.Lock()
r.alloc = update
r.allocLock.Unlock()

// Check if we're in a terminal status
if update.TerminalStatus() {
break OUTER
Expand Down Expand Up @@ -371,8 +386,8 @@ OUTER:
}

// Destroy each sub-task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
r.taskLock.Lock()
defer r.taskLock.Unlock()
for _, tr := range r.tasks {
tr.Destroy()
}
Expand Down Expand Up @@ -408,6 +423,14 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}

// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
// checks if the current running allocation is behind and should be updated.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
r.allocLock.Lock()
defer r.allocLock.Unlock()
return r.alloc.AllocModifyIndex < serverIndex
}

// Destroy is used to indicate that the allocation context should be destroyed
func (r *AllocRunner) Destroy() {
r.destroyLock.Lock()
Expand Down
19 changes: 6 additions & 13 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,28 @@ func TestAllocRunner_Destroy(t *testing.T) {

func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
_, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()
start := time.Now()

// Update the alloc definition
newAlloc := new(structs.Allocation)
*newAlloc = *ar.alloc
newAlloc.DesiredStatus = structs.AllocDesiredStatusStop
newAlloc.Name = "FOO"
newAlloc.AllocModifyIndex++
ar.Update(newAlloc)

// Check the alloc runner stores the update allocation.
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
return ar.Alloc().Name == "FOO", nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
t.Fatalf("err: %v %#v", err, ar.Alloc())
})

if time.Since(start) > 15*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState(t *testing.T) {
Expand Down
108 changes: 94 additions & 14 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (c *Client) run() {
}

// Watch for changes in allocations
allocUpdates := make(chan []*structs.Allocation, 1)
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)

// Create a snapshot timer
Expand All @@ -642,8 +642,8 @@ func (c *Client) run() {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}

case allocs := <-allocUpdates:
c.runAllocs(allocs)
case update := <-allocUpdates:
c.runAllocs(update)

case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
Expand Down Expand Up @@ -722,21 +722,47 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) error {
return nil
}

// allocUpdates holds the results of receiving updated allocations from the
// servers.
type allocUpdates struct {
// pulled is the set of allocations that were downloaded from the servers.
pulled map[string]*structs.Allocation

// filtered is the set of allocations that were not pulled because their
// AllocModifyIndex didn't change.
filtered map[string]struct{}
}

// watchAllocations is used to scan for updates to allocations
func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
func (c *Client) watchAllocations(updates chan *allocUpdates) {
// The request and response for getting the map of allocations that should
// be running on the Node to their AllocModifyIndex which is incremented
// when the allocation is updated by the servers.
req := structs.NodeSpecificRequest{
NodeID: c.Node().ID,
QueryOptions: structs.QueryOptions{
Region: c.config.Region,
AllowStale: true,
},
}
var resp structs.NodeAllocsResponse
var resp structs.NodeClientAllocsResponse

// The request and response for pulling down the set of allocations that are
// new, or updated server side.
allocsReq := structs.AllocsGetRequest{
QueryOptions: structs.QueryOptions{
Region: c.config.Region,
AllowStale: true,
},
}
var allocsResp structs.AllocsGetResponse

for {
// Get the allocations, blocking for updates
resp = structs.NodeAllocsResponse{}
err := c.RPC("Node.GetAllocs", &req, &resp)
// Get the allocation modify index map, blocking for updates. We will
// use this to determine exactly what allocations need to be downloaded
// in full.
resp = structs.NodeClientAllocsResponse{}
err := c.RPC("Node.GetClientAllocs", &req, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
Expand All @@ -755,24 +781,78 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
default:
}

// Check for updates
// Filter all allocations whose AllocModifyIndex was not incremented.
// These are the allocations who have either not been updated, or whose
// updates are a result of the client sending an update for the alloc.
// This lets us reduce the network traffic to the server as we don't
// need to pull all the allocations.
var pull []string
filtered := make(map[string]struct{})
c.allocLock.Lock()
for allocID, modifyIndex := range resp.Allocs {
// Pull the allocation if we don't have an alloc runner for the
// allocation or if the alloc runner requires an updated allocation.
runner, ok := c.allocs[allocID]
if !ok || runner.shouldUpdate(modifyIndex) {
pull = append(pull, allocID)
} else {
filtered[allocID] = struct{}{}
}
}
c.allocLock.Unlock()
c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
resp.Index, len(pull), len(filtered))

// Pull the allocations that passed filtering.
allocsResp.Allocs = nil
if len(pull) != 0 {
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-c.shutdownCh:
return
}
}

// Check for shutdown
select {
case <-c.shutdownCh:
return
default:
}
}

// Update the query index.
if resp.Index <= req.MinQueryIndex {
continue
}
req.MinQueryIndex = resp.Index
c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", resp.Index, len(resp.Allocs))

// Push the updates
// Push the updates.
pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulled[alloc.ID] = alloc
}
update := &allocUpdates{
filtered: filtered,
pulled: pulled,
}
select {
case allocUpdates <- resp.Allocs:
case updates <- update:
case <-c.shutdownCh:
return
}
}
}

// runAllocs is invoked when we get an updated set of allocations
func (c *Client) runAllocs(updated []*structs.Allocation) {
func (c *Client) runAllocs(update *allocUpdates) {
// Get the existing allocs
c.allocLock.RLock()
exist := make([]*structs.Allocation, 0, len(c.allocs))
Expand All @@ -782,7 +862,7 @@ func (c *Client) runAllocs(updated []*structs.Allocation) {
c.allocLock.RUnlock()

// Diff the existing and updated allocations
diff := diffAllocs(exist, updated)
diff := diffAllocs(exist, update)
c.logger.Printf("[DEBUG] client: %#v", diff)

// Remove the old allocations
Expand Down
10 changes: 7 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,14 @@ func TestClient_WatchAllocs(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Update the other allocation
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(102,
[]*structs.Allocation{alloc2})
[]*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
Loading