Skip to content

Commit

Permalink
api: return sorted results in certain list endpoints
Browse files Browse the repository at this point in the history
These API endpoints now return results in chronological order. They
can return results in reverse chronological order by setting the
query parameter ascending=true.

- Eval.List
  • Loading branch information
shoenig committed Feb 11, 2022
1 parent cefc58d commit 2ceea96
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 59 deletions.
1 change: 1 addition & 0 deletions command/agent/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
query := req.URL.Query()
args.FilterEvalStatus = query.Get("status")
args.FilterJobID = query.Get("job")
args.OrderAscending = query.Has("ascending")

var out structs.EvalListResponse
if err := s.agent.RPC("Eval.List", &args, &out); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} {
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil)),
"Evals": toArray(state.Evals(nil)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),
Expand Down
2 changes: 1 addition & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws)
iter, err := c.snap.Evals(ws, false)
if err != nil {
return err
}
Expand Down
18 changes: 11 additions & 7 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,7 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest,
}

// List is used to get a list of the evaluations in the system
func (e *Eval) List(args *structs.EvalListRequest,
reply *structs.EvalListResponse) error {
func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error {
if done, err := e.srv.forward("Eval.List", args, args, reply); done {
return err
}
Expand All @@ -404,12 +403,17 @@ func (e *Eval) List(args *structs.EvalListRequest,
// Scan all the evaluations
var err error
var iter memdb.ResultIterator
if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Evals(ws)
} else if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)
namespace := args.RequestNamespace()

if prefix := args.QueryOptions.Prefix; prefix != "" {
fmt.Println("SH EvalsByIDPrefix", "ns:", namespace, "prefix:", prefix)
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
fmt.Println("SH EvalsByNamespace", "ns:", namespace)
iter, err = store.EvalsByNamespace(ws, namespace, args.OrderAscending)
} else {
iter, err = store.EvalsByNamespace(ws, args.RequestNamespace())
fmt.Println("SH Evals", "ascending:", args.OrderAscending)
iter, err = store.Evals(ws, args.OrderAscending)
}
if err != nil {
return err
Expand Down
169 changes: 146 additions & 23 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,122 @@ func TestEvalEndpoint_List(t *testing.T) {
if len(resp2.Evaluations) != 1 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}

func TestEvalEndpoint_List_order(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create register requests
uuid1 := uuid.Generate()
eval1 := mock.Eval()
eval1.ID = uuid1

uuid2 := uuid.Generate()
eval2 := mock.Eval()
eval2.ID = uuid2

uuid3 := uuid.Generate()
eval3 := mock.Eval()
eval3.ID = uuid3

err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1})
require.NoError(t, err)

err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval2})
require.NoError(t, err)

err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval3})
require.NoError(t, err)

// update eval2 so we can later assert create index order did not change
err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
require.NoError(t, err)

t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}

var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)

// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[0].ID)

require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)

require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})

t.Run("ascending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order (newest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: true,
}

var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)

// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[0].ID)

require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)

require.Equal(t, uint64(1002), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[2].ID)
})

t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in chronological order (oldest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}

var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)

// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[0].ID)

require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)

require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})

}

Expand Down Expand Up @@ -895,26 +1011,29 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {

// create a set of evals and field values to filter on. these are
// in the order that the state store will return them from the
// iterator (sorted by key), for ease of writing tests
// iterator (sorted by create index), for ease of writing tests
mocks := []struct {
id string
create int
namespace string
jobID string
status string
}{
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"},
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"},
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"},
}

mockEvals := []*structs.Evaluation{}
for _, m := range mocks {
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", create: 1000, jobID: "example"}, // 0
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", create: 1001, jobID: "example"}, // 1
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", create: 1002, namespace: "non-default"}, // 2
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", create: 1003, jobID: "example", status: "blocked"}, // 3
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", create: 1004}, // 4
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9", create: 1005}, // 5
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", create: 1006, jobID: "example"}, // 6
{id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", create: 1007, jobID: "example"}, // 7
{id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9", create: 1008}, // 8
}

state := s1.fsm.State()

var evals []*structs.Evaluation
for i, m := range mocks {
eval := mock.Eval()
eval.ID = m.id
if m.namespace != "" { // defaults to "default"
Expand All @@ -926,11 +1045,14 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
if m.status != "" { // defaults to "pending"
eval.Status = m.status
}
mockEvals = append(mockEvals, eval)
evals = append(evals, eval)
index := 1000 + uint64(i)
fmt.Println("upsert index:", index)
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, index, []*structs.Evaluation{eval}))
}

state := s1.fsm.State()
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1000, mockEvals))
fmt.Println("eval0:", evals[0].CreateIndex)
fmt.Println("eval8:", evals[8].CreateIndex)

aclToken := mock.CreatePolicyAndToken(t, state, 1100, "test-valid-read",
mock.NamespacePolicy(structs.DefaultNamespace, "read", nil)).
Expand All @@ -948,13 +1070,13 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
expectedIDs []string
}{
{
name: "test01 size-2 page-1 default NS",
pageSize: 2,
expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
name: "test01 size-2 page-1 default NS",
pageSize: 2,
expectedIDs: []string{ // first two items
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace
},
{
name: "test02 size-2 page-1 default NS with prefix",
Expand Down Expand Up @@ -1025,8 +1147,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
},
},
{
name: "test08 size-2 page-2 filter skip nextToken",
pageSize: 3, // reads off the end
name: "test08 size-2 page-2 filter skip nextToken", //
pageSize: 3, // reads off the end
filterJobID: "example",
filterStatus: "pending",
nextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
Expand Down Expand Up @@ -1084,6 +1206,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
req := &structs.EvalListRequest{
FilterJobID: tc.filterJobID,
FilterEvalStatus: tc.filterStatus,
OrderAscending: true, // counting up is easier to think about
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,7 +2071,7 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the evaluations
ws := memdb.NewWatchSet()
evals, err := s.snap.Evals(ws)
evals, err := s.snap.Evals(ws, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*stru
func (s *Server) restoreEvals() error {
// Get an iterator over every evaluation
ws := memdb.NewWatchSet()
iter, err := s.fsm.State().Evals(ws)
iter, err := s.fsm.State().Evals(ws, false)
if err != nil {
return fmt.Errorf("failed to get evaluations: %v", err)
}
Expand Down
45 changes: 34 additions & 11 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func evalTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "evals",
Indexes: map[string]*memdb.IndexSchema{
// Primary index is used for direct lookup.
// Primary index is used for direct lookup of an evaluation by ID.
"id": {
Name: "id",
AllowMissing: false,
Expand All @@ -394,16 +394,7 @@ func evalTableSchema() *memdb.TableSchema {
},
},

"namespace": {
Name: "namespace",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Namespace",
},
},

// Job index is used to lookup allocations by job
// Job index is used to lookup evaluations by job ID.
"job": {
Name: "job",
AllowMissing: false,
Expand All @@ -426,6 +417,38 @@ func evalTableSchema() *memdb.TableSchema {
},
},
},

// Namespace is used to lookup evaluations by namespace.
//
// Use a prefix iterator (namespace_prefix) on a Namespace to iterate
// those evaluations in order of CreateIndex.
"namespace": {
Name: "namespace",
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
AllowMissing: false,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
},
},

// Creation index is used for listing evaluations, ordering them by
// creation chronology. (Use a reverse iterator for newest first).
"creation": {
Name: "creation",
AllowMissing: false,
Unique: false,
Indexer: &memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
},
}
}
Expand Down
Loading

0 comments on commit 2ceea96

Please sign in to comment.