GetClientAllocs handles garbage collection events #3452

Merged · 2 commits · Oct 27, 2017

Changes from 1 commit
30 changes: 28 additions & 2 deletions nomad/node_endpoint.go
@@ -679,6 +679,12 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
return fmt.Errorf("missing node ID")
}

// numOldAllocs is used to detect if there is a garbage collection event
// that affects the node. When an allocation is garbage collected, the
// modify index does not change, and thus the query won't unblock, even
// though the set of allocations on the node has changed.
var numOldAllocs int

// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
@@ -708,8 +714,17 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
reply.Allocs = make(map[string]uint64)
reply.MigrateTokens = make(map[string]string)

// preferTableIndex is used to determine whether we should build the
// response index based on the full table indexes versus the modify
// indexes of the allocations on the specific node. This is preferred
// in the case that the node doesn't yet have allocations or when we
// detect a GC that affects the node.
preferTableIndex := true

// Setup the output
if len(allocs) != 0 {
if numAllocs := len(allocs); numAllocs != 0 {
preferTableIndex = false

for _, alloc := range allocs {
reply.Allocs[alloc.ID] = alloc.AllocModifyIndex

@@ -742,7 +757,18 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,

reply.Index = maxUint64(reply.Index, alloc.ModifyIndex)
}
} else {

// Determine if we have fewer allocations than before. This
// indicates there was a garbage collection.
if numAllocs < numOldAllocs {
preferTableIndex = true
}

// Store the new number of allocations
numOldAllocs = numAllocs
}

if preferTableIndex {
// Use the last index that affected the nodes table
index, err := state.Index("allocs")
if err != nil {
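The comment blocks above describe the core problem: a client long-polls GetClientAllocs with a MinQueryIndex, and the query only unblocks once the reply index passes that value. The following is a condensed, self-contained restatement of the new index-selection logic, using simplified stand-in types for illustration (the authoritative version is the diff above):

package main

import "fmt"

// alloc is a stripped-down stand-in for structs.Allocation.
type alloc struct {
	ID          string
	ModifyIndex uint64
}

// replyIndex mirrors the logic this PR adds: when the node has
// allocations and no GC was detected, the reply index is the maximum
// allocation modify index; otherwise it falls back to the allocs table
// index, which does advance when allocations are deleted.
func replyIndex(allocs []alloc, numOldAllocs int, tableIndex uint64) (uint64, int) {
	preferTableIndex := true
	var index uint64

	if numAllocs := len(allocs); numAllocs != 0 {
		preferTableIndex = false
		for _, a := range allocs {
			if a.ModifyIndex > index {
				index = a.ModifyIndex
			}
		}

		// Fewer allocations than the previous pass means some were
		// garbage collected; per-alloc indexes alone would never
		// unblock the watcher, so fall back to the table index.
		if numAllocs < numOldAllocs {
			preferTableIndex = true
		}
		numOldAllocs = numAllocs
	}

	if preferTableIndex {
		index = tableIndex
	}
	return index, numOldAllocs
}

func main() {
	allocs := []alloc{{"a1", 100}, {"a2", 100}}
	idx, n := replyIndex(allocs, 0, 100)
	fmt.Println(idx, n) // 100 2

	// a2 is garbage collected at table index 200; the fallback makes
	// the blocking query unblock even though a1's index is unchanged.
	idx, n = replyIndex(allocs[:1], n, 200)
	fmt.Println(idx, n) // 200 1
}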
90 changes: 90 additions & 0 deletions nomad/node_endpoint_test.go
@@ -1328,6 +1328,96 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
}
}

func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
node.CreateIndex = resp.Index
node.ModifyIndex = resp.Index

// Inject fake allocations async
alloc1 := mock.Alloc()
alloc1.NodeID = node.ID
alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
state := s1.fsm.State()
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
start := time.Now()
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Lookup the allocs in a blocking query
req := &structs.NodeSpecificRequest{
NodeID: node.ID,
SecretID: node.SecretID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
MaxQueryTime: time.Second,
},
}
var resp2 structs.NodeClientAllocsResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}

// Should block at least 100ms
if time.Since(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}

if resp2.Index != 100 {
t.Fatalf("Bad index: %d %d", resp2.Index, 100)
}

if len(resp2.Allocs) != 2 || resp2.Allocs[alloc1.ID] != 100 {
t.Fatalf("bad: %#v", resp2.Allocs)
}

// Delete an allocation
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteEval(200, nil, []string{alloc2.ID})
if err != nil {
t.Fatalf("err: %v", err)
}
})

req.QueryOptions.MinQueryIndex = 150
var resp3 structs.NodeClientAllocsResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3); err != nil {
t.Fatalf("err: %v", err)
}

if time.Since(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}
if resp3.Index != 200 {
Review comment (Contributor): Maybe use assert package for continuity.
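As a sketch of what the reviewer's suggestion could look like with the github.com/stretchr/testify/assert package (a hypothetical rewrite for illustration; not necessarily what was merged):

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/assert"
)

// checkGCResponse is a hypothetical helper showing the same checks
// written with testify's assert package.
func checkGCResponse(t *testing.T, resp structs.NodeClientAllocsResponse, allocID string) {
	assert := assert.New(t)
	assert.EqualValues(200, resp.Index, "bad index")
	assert.Len(resp.Allocs, 1, "expected a single allocation after GC")
	assert.EqualValues(100, resp.Allocs[allocID])
}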

t.Fatalf("Bad index: %d %d", resp3.Index, 200)
}
if len(resp3.Allocs) != 1 || resp3.Allocs[alloc1.ID] != 100 {
t.Fatalf("bad: %#v", resp3.Allocs)
}
}
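For readers unfamiliar with Nomad's blocking queries, the test above relies on the convention that a request with MinQueryIndex set blocks until the reply index exceeds that value, then the client re-issues the request with the new index. A minimal client-side polling loop against a hypothetical fetch function (not a real Nomad API) might look like this:

package main

import (
	"fmt"
	"time"
)

// watchAllocs long-polls a fetch function that emulates the
// Node.GetClientAllocs blocking-query contract: it returns only once
// the server-side index exceeds minIndex, or the timeout fires.
func watchAllocs(fetch func(minIndex uint64, timeout time.Duration) (uint64, map[string]uint64)) {
	var minIndex uint64
	for i := 0; i < 2; i++ {
		index, allocs := fetch(minIndex, time.Second)
		fmt.Printf("unblocked at index=%d allocs=%v\n", index, allocs)
		// Bump minIndex so the next call blocks until a newer change,
		// e.g. a GC that only moves the allocs table index.
		minIndex = index
	}
}

func main() {
	indexes := []uint64{100, 200}
	calls := 0
	watchAllocs(func(minIndex uint64, timeout time.Duration) (uint64, map[string]uint64) {
		idx := indexes[calls]
		calls++
		if calls == 2 {
			// Second response: one alloc was GC'd; the index comes
			// from the allocs table, mirroring the test above.
			return idx, map[string]uint64{"alloc1": 100}
		}
		return idx, map[string]uint64{"alloc1": 100, "alloc2": 100}
	})
}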

// A MigrateToken should not be created if an allocation shares the same node
// with its previous allocation
func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) {