Skip to content

Commit

Permalink
Merge pull request #2177 from hashicorp/b-blocking-getallocs
Browse files Browse the repository at this point in the history
GetAllocs uses a blocking query
  • Loading branch information
dadgar committed Jan 11, 2017
2 parents 1b56b60 + 23e84ec commit 41f8972
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 35 deletions.
36 changes: 28 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,14 +1274,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
resp.Index, len(pull), len(filtered))

// Pull the allocations that passed filtering.
allocsResp.Allocs = nil
var pulledAllocs map[string]*structs.Allocation
if len(pull) != 0 {
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
allocsReq.MinQueryIndex = resp.Index - 1
allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
Expand All @@ -1296,6 +1295,28 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

// Ensure that we received all the allocations we wanted
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulledAllocs[alloc.ID] = alloc
}

for _, desiredID := range pull {
if _, ok := pulledAllocs[desiredID]; !ok {
// We didn't get everything we wanted. Do not update the
// MinQueryIndex, sleep and then retry.
wait := c.retryIntv(2 * time.Second)
select {
case <-time.After(wait):
// Wait for the server we contact to receive the
// allocations
continue
case <-c.shutdownCh:
return
}
}
}

// Check for shutdown
select {
case <-c.shutdownCh:
Expand All @@ -1304,19 +1325,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

c.logger.Printf("[DEBUG] client: updated allocations at index %d (total %d) (pulled %d) (filtered %d)",
resp.Index, len(resp.Allocs), len(allocsResp.Allocs), len(filtered))

// Update the query index.
if resp.Index > req.MinQueryIndex {
req.MinQueryIndex = resp.Index
}

// Push the updates.
pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulled[alloc.ID] = alloc
}
update := &allocUpdates{
filtered: filtered,
pulled: pulled,
pulled: pulledAllocs,
}
select {
case updates <- update:
Expand Down
90 changes: 65 additions & 25 deletions nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nomad

import (
"fmt"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -97,7 +96,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
Expand All @@ -118,32 +117,73 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())

// Lookup the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
// Build the watch
items := make([]watch.Item, 0, len(args.AllocIDs))
for _, allocID := range args.AllocIDs {
items = append(items, watch.Item{Alloc: allocID})
}

allocs := make([]*structs.Allocation, len(args.AllocIDs))
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
if err != nil {
return err
}
if out == nil {
return fmt.Errorf("unknown alloc id %q", alloc)
}

allocs[i] = out
if reply.Index < out.ModifyIndex {
reply.Index = out.ModifyIndex
}
}

// Set the response
a.srv.setQueryMeta(&reply.QueryMeta)
reply.Allocs = allocs
return nil
// Setup the blocking query. We wait for at least one of the requested
// allocations to be above the min query index. This guarantees that the
// server has received that index.
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(items...),
run: func() error {
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}

thresholdMet := false
maxIndex := uint64(0)
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
if err != nil {
return err
}
if out == nil {
// We don't have the alloc yet
thresholdMet = false
break
}

// Store the pointer
allocs[i] = out

// Check if we have passed the minimum index
if out.ModifyIndex > args.QueryOptions.MinQueryIndex {
thresholdMet = true
}

if maxIndex < out.ModifyIndex {
maxIndex = out.ModifyIndex
}
}

// Setup the output
if thresholdMet {
reply.Allocs = allocs
reply.Index = maxIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}

// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
},
}
return a.srv.blockingRPC(&opts)
}
60 changes: 58 additions & 2 deletions nomad/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,10 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {

// Lookup the allocs
get := &structs.AllocsGetRequest{
AllocIDs: []string{alloc.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{Region: "global"},
AllocIDs: []string{alloc.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var resp structs.AllocsGetResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
Expand All @@ -272,3 +274,57 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {
t.Fatalf("expect error")
}
}

func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()

// First create an unrelated alloc
time.AfterFunc(100*time.Millisecond, func() {
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Create the alloc we are watching later
time.AfterFunc(200*time.Millisecond, func() {
state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Lookup the allocs
get := &structs.AllocsGetRequest{
AllocIDs: []string{alloc1.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 150,
},
}
var resp structs.AllocsGetResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Allocs) != 2 {
t.Fatalf("bad: %#v", resp.Allocs)
}
}

0 comments on commit 41f8972

Please sign in to comment.