diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index a8778216ffda..dc7e2d6a0974 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -679,6 +679,12 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, return fmt.Errorf("missing node ID") } + // numOldAllocs is used to detect if there is a garbage collection event + // that affects the node. When an allocation is garbage collected, that does + // not change the modify index and thus the query won't unblock, + // even though the set of allocations on the node has changed. + var numOldAllocs int + // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, @@ -708,8 +714,17 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply.Allocs = make(map[string]uint64) reply.MigrateTokens = make(map[string]string) + // preferTableIndex is used to determine whether we should build the + // response index based on the full table indexes versus the modify + // indexes of the allocations on the specific node. This is preferred + // in the case that the node doesn't yet have allocations or when we + // detect a GC that affects the node. + preferTableIndex := true + // Setup the output - if len(allocs) != 0 { + if numAllocs := len(allocs); numAllocs != 0 { + preferTableIndex = false + for _, alloc := range allocs { reply.Allocs[alloc.ID] = alloc.AllocModifyIndex @@ -742,7 +757,18 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } - } else { + + // Determine if we have fewer allocations than before. 
This + // indicates there was a garbage collection + if numAllocs < numOldAllocs { + preferTableIndex = true + } + + // Store the new number of allocations + numOldAllocs = numAllocs + } + + if preferTableIndex { // Use the last index that affected the nodes table index, err := state.Index("allocs") if err != nil { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 772a40e3e043..f6ee39d98a46 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1328,6 +1328,96 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { } } +func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { + t.Parallel() + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + node.CreateIndex = resp.Index + node.ModifyIndex = resp.Index + + // Inject fake allocations async + alloc1 := mock.Alloc() + alloc1.NodeID = node.ID + alloc2 := mock.Alloc() + alloc2.NodeID = node.ID + state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) + start := time.Now() + time.AfterFunc(100*time.Millisecond, func() { + err := state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Lookup the allocs in a blocking query + req := &structs.NodeSpecificRequest{ + NodeID: node.ID, + SecretID: node.SecretID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 50, + MaxQueryTime: time.Second, + }, + } + var resp2 structs.NodeClientAllocsResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2); err != 
nil { + t.Fatalf("err: %v", err) + } + + // Should block at least 100ms + if time.Since(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + + if resp2.Index != 100 { + t.Fatalf("Bad index: %d %d", resp2.Index, 100) + } + + if len(resp2.Allocs) != 2 || resp2.Allocs[alloc1.ID] != 100 { + t.Fatalf("bad: %#v", resp2.Allocs) + } + + // Delete an allocation + time.AfterFunc(100*time.Millisecond, func() { + err := state.DeleteEval(200, nil, []string{alloc2.ID}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.QueryOptions.MinQueryIndex = 150 + var resp3 structs.NodeClientAllocsResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3); err != nil { + t.Fatalf("err: %v", err) + } + + if time.Since(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + if resp3.Index != 200 { + t.Fatalf("Bad index: %d %d", resp3.Index, 200) + } + if len(resp3.Allocs) != 1 || resp3.Allocs[alloc1.ID] != 100 { + t.Fatalf("bad: %#v", resp3.Allocs) + } +} + // A MigrateToken should not be created if an allocation shares the same node // with its previous allocation func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) {