Skip to content

Commit

Permalink
Fix double pull with introduction of AllocModifyIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Feb 1, 2016
1 parent 1328bb5 commit 410ae59
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 207 deletions.
12 changes: 0 additions & 12 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,6 @@ func (n *Nodes) Allocations(nodeID string, q *QueryOptions) ([]*Allocation, *Que
return resp, qm, nil
}

// ClientAllocations fetches a lightweight view of the allocations placed on
// the node identified by nodeID: a map from allocation ID to a uint64 index.
// Clients use it to work out which allocations actually need to be refreshed
// without downloading every allocation in full. On failure the map and the
// query metadata are both nil.
func (n *Nodes) ClientAllocations(nodeID string, q *QueryOptions) (map[string]uint64, *QueryMeta, error) {
	var indexes map[string]uint64
	endpoint := "/v1/node/" + nodeID + "/clientallocations"

	meta, err := n.client.query(endpoint, &indexes, q)
	if err == nil {
		return indexes, meta, nil
	}
	return nil, nil, err
}

// ForceEvaluate is used to force-evaluate an existing node.
func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp nodeEvalResponse
Expand Down
18 changes: 0 additions & 18 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,6 @@ func TestNodes_Allocations(t *testing.T) {
}
}

// TestNodes_ClientAllocations verifies that asking for the client allocations
// of an unknown node succeeds and yields an empty result.
func TestNodes_ClientAllocations(t *testing.T) {
	client, server := makeClient(t, nil, nil)
	defer server.Stop()

	// A non-existent node has no allocations. The returned index is
	// deliberately ignored: a real node may already have registered, in which
	// case the index would be non-zero anyway.
	allocs, _, err := client.Nodes().ClientAllocations("nope", nil)
	if err != nil {
		t.Fatalf("err: %s", err)
	}

	got := len(allocs)
	if got != 0 {
		t.Fatalf("expected 0 allocs, got: %d", got)
	}
}

func TestNodes_ForceEvaluate(t *testing.T) {
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
c.DevMode = true
Expand Down
18 changes: 17 additions & 1 deletion client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type AllocRunner struct {
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation
alloc *structs.Allocation
allocLock sync.Mutex

dirtyCh chan struct{}

Expand Down Expand Up @@ -184,6 +185,8 @@ func (r *AllocRunner) DestroyContext() error {

// Alloc returns the allocation currently associated with this runner. The
// pointer is read while holding allocLock so it is consistent with the
// assignment the run loop performs when it receives an update.
func (r *AllocRunner) Alloc() *structs.Allocation {
	r.allocLock.Lock()
	current := r.alloc
	r.allocLock.Unlock()

	return current
}

Expand Down Expand Up @@ -336,6 +339,11 @@ OUTER:
for {
select {
case update := <-r.updateCh:
// Store the updated allocation.
r.allocLock.Lock()
r.alloc = update
r.allocLock.Unlock()

// Check if we're in a terminal status
if update.TerminalStatus() {
break OUTER
Expand Down Expand Up @@ -408,6 +416,14 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}

// shouldUpdate reports whether the runner's allocation is stale relative to
// the servers. serverIndex is the AllocModifyIndex the server reported for
// this allocation; the result is true only when that index is strictly
// greater than the AllocModifyIndex of the allocation held by the runner.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
	r.allocLock.Lock()
	defer r.allocLock.Unlock()

	behind := serverIndex > r.alloc.AllocModifyIndex
	return behind
}

// Destroy is used to indicate that the allocation context should be destroyed
func (r *AllocRunner) Destroy() {
r.destroyLock.Lock()
Expand Down
19 changes: 6 additions & 13 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,28 @@ func TestAllocRunner_Destroy(t *testing.T) {

func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
_, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()
start := time.Now()

// Update the alloc definition
newAlloc := new(structs.Allocation)
*newAlloc = *ar.alloc
newAlloc.DesiredStatus = structs.AllocDesiredStatusStop
newAlloc.Name = "FOO"
newAlloc.AllocModifyIndex++
ar.Update(newAlloc)

// Check the alloc runner stores the updated allocation.
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
return ar.Alloc().Name == "FOO", nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
t.Fatalf("err: %v %#v", err, ar.Alloc())
})

if time.Since(start) > 15*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState(t *testing.T) {
Expand Down
107 changes: 93 additions & 14 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (c *Client) run() {
}

// Watch for changes in allocations
allocUpdates := make(chan []*structs.Allocation, 1)
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)

// Create a snapshot timer
Expand All @@ -642,8 +642,8 @@ func (c *Client) run() {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}

case allocs := <-allocUpdates:
c.runAllocs(allocs)
case update := <-allocUpdates:
c.runAllocs(update)

case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
Expand Down Expand Up @@ -722,21 +722,47 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) error {
return nil
}

// allocUpdates holds the results of receiving updated allocations from the
// servers. watchAllocations builds one value per successful poll and sends it
// on the updates channel; diffAllocs then consults both sets, treating an
// existing allocation that appears in neither as removed.
type allocUpdates struct {
// pulled is the set of allocations that were downloaded from the servers,
// keyed by allocation ID.
pulled map[string]*structs.Allocation

// filtered is the set of allocations that were not pulled because their
// AllocModifyIndex didn't change. Keys are allocation IDs; the empty struct
// values carry no information.
// NOTE(review): watchAllocations, as visible here, appears to add every
// allocation ID from the response to this set, including IDs it also pulls.
// Confirm whether that is intended.
filtered map[string]struct{}
}

// watchAllocations is used to scan for updates to allocations
func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
func (c *Client) watchAllocations(updates chan *allocUpdates) {
// The request and response for getting the map of allocations that should
// be running on the Node to their AllocModifyIndex which is incremented
// when the allocation is updated by the servers.
req := structs.NodeSpecificRequest{
NodeID: c.Node().ID,
QueryOptions: structs.QueryOptions{
Region: c.config.Region,
AllowStale: true,
},
}
var resp structs.NodeAllocsResponse
var resp structs.NodeClientAllocsResponse

// The request and response for pulling down the set of allocations that are
// new, or updated server side.
allocsReq := structs.AllocsGetRequest{
QueryOptions: structs.QueryOptions{
Region: c.config.Region,
AllowStale: true,
},
}
var allocsResp structs.AllocsGetResponse

for {
// Get the allocations, blocking for updates
resp = structs.NodeAllocsResponse{}
err := c.RPC("Node.GetAllocs", &req, &resp)
// Get the allocation modify index map, blocking for updates. We will
// use this to determine exactly what allocations need to be downloaded
// in full.
resp = structs.NodeClientAllocsResponse{}
err := c.RPC("Node.GetClientAllocs", &req, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
Expand All @@ -755,24 +781,77 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) {
default:
}

// Check for updates
// Filter all allocations whose AllocModifyIndex was not incremented.
// These are the allocations that have either not been updated, or whose
// updates are a result of the client sending an update for the alloc.
// This lets us reduce the network traffic to the server as we don't
// need to pull all the allocations.
var pull []string
filtered := make(map[string]struct{})
c.allocLock.Lock()
for allocID, modifyIndex := range resp.Allocs {
// Pull the allocation if we don't have an alloc runner for the
// allocation or if the alloc runner requires an updated allocation.
runner, ok := c.allocs[allocID]
if !ok || runner.shouldUpdate(modifyIndex) {
pull = append(pull, allocID)
}
filtered[allocID] = struct{}{}
}
c.allocLock.Unlock()
c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
resp.Index, len(pull), len(filtered))

// Pull the allocations that passed filtering.
allocsResp.Allocs = nil
if len(pull) != 0 {
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-c.shutdownCh:
return
}
}

// Check for shutdown
select {
case <-c.shutdownCh:
return
default:
}
}

// Update the query index.
if resp.Index <= req.MinQueryIndex {
continue
}
req.MinQueryIndex = resp.Index
c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", resp.Index, len(resp.Allocs))

// Push the updates
// Push the updates.
pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulled[alloc.ID] = alloc
}
update := &allocUpdates{
filtered: filtered,
pulled: pulled,
}
select {
case allocUpdates <- resp.Allocs:
case updates <- update:
case <-c.shutdownCh:
return
}
}
}

// runAllocs is invoked when we get an updated set of allocations
func (c *Client) runAllocs(updated []*structs.Allocation) {
func (c *Client) runAllocs(update *allocUpdates) {
// Get the existing allocs
c.allocLock.RLock()
exist := make([]*structs.Allocation, 0, len(c.allocs))
Expand All @@ -782,7 +861,7 @@ func (c *Client) runAllocs(updated []*structs.Allocation) {
c.allocLock.RUnlock()

// Diff the existing and updated allocations
diff := diffAllocs(exist, updated)
diff := diffAllocs(exist, update)
c.logger.Printf("[DEBUG] client: %#v", diff)

// Remove the old allocations
Expand Down
10 changes: 7 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,14 @@ func TestClient_WatchAllocs(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Update the other allocation
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(102,
[]*structs.Allocation{alloc2})
[]*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
31 changes: 13 additions & 18 deletions client/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,28 @@ func (d *diffResult) GoString() string {

// diffAllocs is used to diff the existing and updated allocations
// to see what has happened.
func diffAllocs(existing, updated []*structs.Allocation) *diffResult {
result := &diffResult{}

// Index the updated allocations by id
idx := make(map[string]*structs.Allocation)
for _, update := range updated {
idx[update.ID] = update
}

func diffAllocs(existing []*structs.Allocation, allocs *allocUpdates) *diffResult {
// Scan the existing allocations
result := &diffResult{}
existIdx := make(map[string]struct{})
for _, exist := range existing {
// Mark this as existing
existIdx[exist.ID] = struct{}{}

// Check for presence in the new set
update, ok := idx[exist.ID]
// Check if the alloc was updated or filtered because an update wasn't
// needed.
alloc, pulled := allocs.pulled[exist.ID]
_, filtered := allocs.filtered[exist.ID]

// If not present, removed
if !ok {
// If not updated or filtered, removed
if !pulled && !filtered {
result.removed = append(result.removed, exist)
continue
}

// Check for an update
if update.ModifyIndex > exist.ModifyIndex {
result.updated = append(result.updated, allocTuple{exist, update})
if pulled && alloc.AllocModifyIndex > exist.AllocModifyIndex {
result.updated = append(result.updated, allocTuple{exist, alloc})
continue
}

Expand All @@ -66,9 +61,9 @@ func diffAllocs(existing, updated []*structs.Allocation) *diffResult {
}

// Scan the updated allocations for any that are new
for _, update := range updated {
if _, ok := existIdx[update.ID]; !ok {
result.added = append(result.added, update)
for id, pulled := range allocs.pulled {
if _, ok := existIdx[id]; !ok {
result.added = append(result.added, pulled)
}
}
return result
Expand Down
Loading

0 comments on commit 410ae59

Please sign in to comment.