Skip to content

Commit

Permalink
api: paginated results with different ordering (#12128)
Browse files Browse the repository at this point in the history
The paginator logic was built when go-memdb iterators would return items
ordered lexicographically by their ID prefixes, but #12054 added the
option for some tables to return results ordered by their `CreateIndex`
instead, which invalidated the previous paginator assumption.

The iterator used for pagination must still return results in some order
so that the paginator can properly handle requests where the next_token
value is not present in the results anymore (e.g., the eval was GC'ed).

In these situations, the paginator will start the returned page in the
first element right after where the requested token should've been.

This commit moves the logic to generate pagination tokens from the
elements being paginated to the iterator itself so that callers can have
more control over the token format to make sure they are properly
ordered and stable.

It also allows configuring the paginator as being ordered in ascending
or descending order, which is relevant when looking for a token that may
not be present anymore.
  • Loading branch information
lgfa29 committed Mar 1, 2022
1 parent 9cf99ce commit b246227
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 184 deletions.
119 changes: 39 additions & 80 deletions api/evaluations_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package api

import (
"reflect"
"sort"
"strings"
"testing"

"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/require"
)

func TestEvaluations_List(t *testing.T) {
Expand All @@ -17,41 +16,27 @@ func TestEvaluations_List(t *testing.T) {

// Listing when nothing exists returns empty
result, qm, err := e.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if qm.LastIndex != 0 {
t.Fatalf("bad index: %d", qm.LastIndex)
}
if n := len(result); n != 0 {
t.Fatalf("expected 0 evaluations, got: %d", n)
}
require.NoError(t, err)
require.Equal(t, uint64(0), qm.LastIndex, "bad index")
require.Equal(t, 0, len(result), "expected 0 evaluations")

// Register a job. This will create an evaluation.
jobs := c.Jobs()
job := testJob()
resp, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
assertWriteMeta(t, wm)

// Check the evaluations again
result, qm, err = e.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
assertQueryMeta(t, qm)

// if the eval fails fast there can be more than 1
// but they are in order of most recent first, so look at the last one
if len(result) == 0 {
t.Fatalf("expected eval (%s), got none", resp.EvalID)
}
require.Greater(t, len(result), 0, "expected eval (%s), got none", resp.EvalID)
idx := len(result) - 1
if result[idx].ID != resp.EvalID {
t.Fatalf("expected eval (%s), got: %#v", resp.EvalID, result[idx])
}
require.Equal(t, resp.EvalID, result[idx].ID, "expected eval (%s), got: %#v", resp.EvalID, result[idx])

// wait until the 2nd eval shows up before we try paging
results := []*Evaluation{}
Expand All @@ -65,26 +50,26 @@ func TestEvaluations_List(t *testing.T) {
t.Fatalf("err: %s", err)
})

// Check the evaluations again with paging; note that while this
// package sorts by timestamp, the actual HTTP API sorts by ID
// so we need to use that for the NextToken
ids := []string{results[0].ID, results[1].ID}
sort.Strings(ids)
result, qm, err = e.List(&QueryOptions{PerPage: int32(1), NextToken: ids[1]})
if err != nil {
t.Fatalf("err: %s", err)
}
if len(result) != 1 {
t.Fatalf("expected no evals after last one but got %v", result[0])
}
// query first page
result, qm, err = e.List(&QueryOptions{
PerPage: int32(1),
})
require.NoError(t, err)
require.Equal(t, 1, len(result), "expected no evals after last one but got %d: %#v", len(result), result)

// query second page
result, qm, err = e.List(&QueryOptions{
PerPage: int32(1),
NextToken: qm.NextToken,
})
require.NoError(t, err)
require.Equal(t, 1, len(result), "expected no evals after last one but got %d: %#v", len(result), result)

// Query evaluations using a filter.
results, _, err = e.List(&QueryOptions{
Filter: `TriggeredBy == "job-register"`,
})
if len(result) != 1 {
t.Fatalf("expected 1 eval, got %d", len(result))
}
require.Equal(t, 1, len(result), "expected 1 eval, got %d", len(result))
}

func TestEvaluations_PrefixList(t *testing.T) {
Expand All @@ -95,36 +80,25 @@ func TestEvaluations_PrefixList(t *testing.T) {

// Listing when nothing exists returns empty
result, qm, err := e.PrefixList("abcdef")
if err != nil {
t.Fatalf("err: %s", err)
}
if qm.LastIndex != 0 {
t.Fatalf("bad index: %d", qm.LastIndex)
}
if n := len(result); n != 0 {
t.Fatalf("expected 0 evaluations, got: %d", n)
}
require.NoError(t, err)
require.Equal(t, uint64(0), qm.LastIndex, "bad index")
require.Equal(t, 0, len(result), "expected 0 evaluations")

// Register a job. This will create an evaluation.
jobs := c.Jobs()
job := testJob()
resp, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
assertWriteMeta(t, wm)

// Check the evaluations again
result, qm, err = e.PrefixList(resp.EvalID[:4])
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
assertQueryMeta(t, qm)

// Check if we have the right list
if len(result) != 1 || result[0].ID != resp.EvalID {
t.Fatalf("bad: %#v", result)
}
require.Equal(t, 1, len(result))
require.Equal(t, resp.EvalID, result[0].ID)
}

func TestEvaluations_Info(t *testing.T) {
Expand All @@ -135,30 +109,23 @@ func TestEvaluations_Info(t *testing.T) {

// Querying a nonexistent evaluation returns error
_, _, err := e.Info("8E231CF4-CA48-43FF-B694-5801E69E22FA", nil)
if err == nil || !strings.Contains(err.Error(), "not found") {
t.Fatalf("expected not found error, got: %s", err)
}
require.Error(t, err)

// Register a job. Creates a new evaluation.
jobs := c.Jobs()
job := testJob()
resp, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
assertWriteMeta(t, wm)

// Try looking up by the new eval ID
result, qm, err := e.Info(resp.EvalID, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.NoError(t, err)
assertQueryMeta(t, qm)

// Check that we got the right result
if result == nil || result.ID != resp.EvalID {
t.Fatalf("expected eval %q, got: %#v", resp.EvalID, result)
}
require.NotNil(t, result)
require.Equal(t, resp.EvalID, result.ID)
}

func TestEvaluations_Allocations(t *testing.T) {
Expand All @@ -169,15 +136,9 @@ func TestEvaluations_Allocations(t *testing.T) {

// Returns empty if no allocations
allocs, qm, err := e.Allocations("8E231CF4-CA48-43FF-B694-5801E69E22FA", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if qm.LastIndex != 0 {
t.Fatalf("bad index: %d", qm.LastIndex)
}
if n := len(allocs); n != 0 {
t.Fatalf("expected 0 allocs, got: %d", n)
}
require.NoError(t, err)
require.Equal(t, uint64(0), qm.LastIndex, "bad index")
require.Equal(t, 0, len(allocs), "expected 0 evaluations")
}

func TestEvaluations_Sort(t *testing.T) {
Expand All @@ -194,7 +155,5 @@ func TestEvaluations_Sort(t *testing.T) {
{CreateIndex: 2},
{CreateIndex: 1},
}
if !reflect.DeepEqual(evals, expect) {
t.Fatalf("\n\n%#v\n\n%#v", evals, expect)
}
require.Equal(t, expect, evals)
}
32 changes: 31 additions & 1 deletion nomad/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,30 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

// DeploymentPaginationIterator is a wrapper over a go-memdb iterator that
// implements the paginator Iterator interface.
type DeploymentPaginationIterator struct {
iter memdb.ResultIterator
byCreateIndex bool
}

func (it DeploymentPaginationIterator) Next() (string, interface{}) {
raw := it.iter.Next()
if raw == nil {
return "", nil
}

d := raw.(*structs.Deployment)
token := d.ID

// prefix the pagination token by CreateIndex to keep it properly sorted.
if it.byCreateIndex {
token = fmt.Sprintf("%v-%v", d.CreateIndex, d.ID)
}

return token, d
}

// Deployment endpoint is used for manipulating deployments
type Deployment struct {
srv *Server
Expand Down Expand Up @@ -409,20 +433,26 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
// Capture all the deployments
var err error
var iter memdb.ResultIterator
var deploymentIter DeploymentPaginationIterator

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
deploymentIter.byCreateIndex = false
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.Ascending)
deploymentIter.byCreateIndex = true
} else {
iter, err = store.Deployments(ws, args.Ascending)
deploymentIter.byCreateIndex = true
}
if err != nil {
return err
}

deploymentIter.iter = iter

var deploys []*structs.Deployment
paginator, err := state.NewPaginator(iter, args.QueryOptions,
paginator, err := state.NewPaginator(deploymentIter, args.QueryOptions,
func(raw interface{}) error {
deploy := raw.(*structs.Deployment)
deploys = append(deploys, deploy)
Expand Down
Loading

0 comments on commit b246227

Please sign in to comment.