GetClientAllocs handles garbage collection events #3452

Merged: 2 commits, Oct 27, 2017
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -38,6 +38,8 @@ BUG FIXES:
change [GH-3214]
* api: Fix search handling of jobs with more than four hyphens and case where
length could cause lookup error [GH-3203]
* client: Improve the speed at which clients detect garbage collection events
[GH-3452]
* client: Fix lock contention that could cause a node to miss a heartbeat and
be marked as down [GH-3195]
* driver/docker: Fix docker user specified syslogging [GH-3184]
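The entry above describes the failure mode this PR fixes: garbage collection deletes allocations without raising the modify index of the allocations that remain, so a reply index computed as the maximum allocation modify index can only stay flat or drop, and a blocking watcher never wakes. A minimal, self-contained illustration with made-up index values (not Nomad code):

package main

import "fmt"

// maxModifyIndex mirrors how the endpoint derives its reply index from
// the allocations that currently exist on a node.
func maxModifyIndex(indexes []uint64) uint64 {
	var max uint64
	for _, i := range indexes {
		if i > max {
			max = i
		}
	}
	return max
}

func main() {
	before := []uint64{100, 120, 150} // alloc modify indexes on a node
	after := []uint64{100, 120}       // the alloc at index 150 was GC'd

	fmt.Println(maxModifyIndex(before)) // 150
	// 120: the index moved backwards, so a watcher blocked at
	// MinQueryIndex=150 would never wake without the fallback this PR adds.
	fmt.Println(maxModifyIndex(after))
}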
30 changes: 28 additions & 2 deletions nomad/node_endpoint.go
@@ -679,6 +679,12 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
return fmt.Errorf("missing node ID")
}

// numOldAllocs is used to detect if there is a garbage collection event
// that affects the node. When an allocation is garbage collected, the
// modify indexes of the remaining allocations do not change, and thus the
// query won't unblock, even though the set of allocations on the node has
// changed.
var numOldAllocs int

// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
@@ -708,8 +714,17 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
reply.Allocs = make(map[string]uint64)
reply.MigrateTokens = make(map[string]string)

// preferTableIndex is used to determine whether we should build the
// response index based on the full table indexes versus the modify
// indexes of the allocations on the specific node. This is preferred
// when the node doesn't yet have allocations or when we detect a GC
// that affects the node.
preferTableIndex := true

// Setup the output
if len(allocs) != 0 {
if numAllocs := len(allocs); numAllocs != 0 {
preferTableIndex = false

for _, alloc := range allocs {
reply.Allocs[alloc.ID] = alloc.AllocModifyIndex

@@ -742,7 +757,18 @@

reply.Index = maxUint64(reply.Index, alloc.ModifyIndex)
}
} else {

// Determine if we have fewer allocations than before. This
// indicates there was a garbage collection.
if numAllocs < numOldAllocs {
preferTableIndex = true
}

// Store the new number of allocations
numOldAllocs = numAllocs
}

if preferTableIndex {
// Use the last index that affected the allocs table
index, err := state.Index("allocs")
if err != nil {
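Condensed out of the diff above, the new control flow amounts to the sketch below. The alloc type, the tableIndex callback, and the package name are simplified stand-ins for Nomad's state-store types rather than the real API; what matters is that numOldAllocs survives across runs of the blocking query and flips preferTableIndex when the allocation set shrinks:

package sketch

// alloc is a simplified stand-in for structs.Allocation.
type alloc struct {
	ID          string
	ModifyIndex uint64
}

// newIndexFn returns the per-node index computation that runs on every
// pass of the blocking query. numOldAllocs lives in the closure, matching
// the variable the diff declares outside the query's run function.
func newIndexFn(tableIndex func() uint64) func(allocs []alloc) uint64 {
	var numOldAllocs int
	return func(allocs []alloc) uint64 {
		preferTableIndex := true
		var index uint64

		if numAllocs := len(allocs); numAllocs != 0 {
			preferTableIndex = false
			for _, a := range allocs {
				if a.ModifyIndex > index {
					index = a.ModifyIndex
				}
			}

			// Fewer allocations than the previous run means a garbage
			// collection removed some; fall back to the full table index
			// so the blocking query unblocks.
			if numAllocs < numOldAllocs {
				preferTableIndex = true
			}
			numOldAllocs = numAllocs
		}

		if preferTableIndex {
			index = tableIndex()
		}
		return index
	}
}

The fallback trades precision for liveness: the full allocs table index may wake the watcher for changes unrelated to this node, but it is guaranteed to advance when an allocation is deleted, which the per-node maximum of modify indexes is not.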
74 changes: 74 additions & 0 deletions nomad/node_endpoint_test.go
@@ -1328,6 +1328,80 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
}
}

func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) {
t.Parallel()
assert := assert.New(t)
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.GenericResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
node.CreateIndex = resp.Index
node.ModifyIndex = resp.Index

// Inject fake allocations async
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
state := s1.fsm.State()
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
start := time.Now()
time.AfterFunc(100*time.Millisecond, func() {
assert.Nil(state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2}))
})

// Lookup the allocs in a blocking query
req := &structs.NodeSpecificRequest{
NodeID: node.ID,
SecretID: node.SecretID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
MaxQueryTime: time.Second,
},
}
var resp2 structs.NodeClientAllocsResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2))

// Should block at least 100ms
if time.Since(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}

assert.EqualValues(100, resp2.Index)
if assert.Len(resp2.Allocs, 2) {
assert.EqualValues(100, resp2.Allocs[alloc1.ID])
}

// Delete an allocation
time.AfterFunc(100*time.Millisecond, func() {
assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}))
})

req.QueryOptions.MinQueryIndex = 150
var resp3 structs.NodeClientAllocsResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3))

if time.Since(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}
assert.EqualValues(200, resp3.Index)
if assert.Len(resp3.Allocs, 1) {
assert.EqualValues(100, resp3.Allocs[alloc1.ID])
}
}

// A MigrateToken should not be created if an allocation shares the same node
// with its previous allocation
func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) {
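For context on what the blocking tests above exercise, a consumer of this endpoint typically sits in a long-poll loop, feeding each reply's Index back in as the next MinQueryIndex. This is a hedged sketch of such a loop, not Nomad's client code: the rpcCall parameter and onChange callback are hypothetical stand-ins for the msgpack RPC plumbing used in the tests, and the loop runs forever for brevity.

package sketch

import (
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// watchClientAllocs long-polls Node.GetClientAllocs, advancing
// MinQueryIndex after each response.
func watchClientAllocs(rpcCall func(method string, args, reply interface{}) error,
	nodeID, secretID string, onChange func(map[string]uint64)) error {

	req := &structs.NodeSpecificRequest{
		NodeID:   nodeID,
		SecretID: secretID,
		QueryOptions: structs.QueryOptions{
			Region:       "global",
			MaxQueryTime: 10 * time.Second,
		},
	}
	for {
		var resp structs.NodeClientAllocsResponse
		if err := rpcCall("Node.GetClientAllocs", req, &resp); err != nil {
			return err
		}
		// Before this change, garbage collection left resp.Index untouched,
		// so the server held the query for the full MaxQueryTime before
		// returning the shrunken allocation set. Now the index advances and
		// the change is observed promptly.
		if resp.Index > req.QueryOptions.MinQueryIndex {
			onChange(resp.Allocs)
		}
		req.QueryOptions.MinQueryIndex = resp.Index
	}
}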