Skip to content

Commit

Permalink
api: return sorted results in certain list endpoints
Browse files Browse the repository at this point in the history
These API endpoints now return results in chronological order. They
can return results in reverse chronological order by setting the
query parameter ascending=true.

- Eval.List
- Deployment.List
  • Loading branch information
shoenig committed Feb 14, 2022
1 parent 282eb10 commit 54e19b4
Show file tree
Hide file tree
Showing 14 changed files with 348 additions and 68 deletions.
3 changes: 3 additions & 0 deletions command/agent/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Requ
return nil, nil
}

query := req.URL.Query()
args.OrderAscending = query.Get("ascending") == "true"

var out structs.DeploymentListResponse
if err := s.agent.RPC("Deployment.List", &args, &out); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions command/agent/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
query := req.URL.Query()
args.FilterEvalStatus = query.Get("status")
args.FilterJobID = query.Get("job")
args.OrderAscending = query.Get("ascending") == "true"

var out structs.EvalListResponse
if err := s.agent.RPC("Eval.List", &args, &out); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} {
"Allocs": toArray(state.Allocs(nil)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil)),
"Evals": toArray(state.Evals(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),
Expand Down
4 changes: 2 additions & 2 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws)
iter, err := c.snap.Evals(ws, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws)
iter, err := c.snap.Deployments(ws, false)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions nomad/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,14 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
// Capture all the deployments
var err error
var iter memdb.ResultIterator
namespace := args.RequestNamespace()

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Deployments(ws)
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.DeploymentsByNamespace(ws, args.RequestNamespace())
iter, err = store.Deployments(ws, args.OrderAscending)
}
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {

iter, err := state.Deployments(ws)
iter, err := state.Deployments(ws, false)
if err != nil {
return nil, 0, err
}
Expand Down
17 changes: 10 additions & 7 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
return structs.ErrPermissionDenied
}

fmt.Println("SH Eval.GetEval, id:", args.EvalID, "ns:", args.RequestNamespace())

// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
Expand Down Expand Up @@ -382,8 +384,7 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest,
}

// List is used to get a list of the evaluations in the system
func (e *Eval) List(args *structs.EvalListRequest,
reply *structs.EvalListResponse) error {
func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error {
if done, err := e.srv.forward("Eval.List", args, args, reply); done {
return err
}
Expand All @@ -404,12 +405,14 @@ func (e *Eval) List(args *structs.EvalListRequest,
// Scan all the evaluations
var err error
var iter memdb.ResultIterator
if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Evals(ws)
} else if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix)
namespace := args.RequestNamespace()

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.EvalsByNamespace(ws, args.RequestNamespace())
iter, err = store.Evals(ws, args.OrderAscending)
}
if err != nil {
return err
Expand Down
169 changes: 146 additions & 23 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,122 @@ func TestEvalEndpoint_List(t *testing.T) {
if len(resp2.Evaluations) != 1 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}

func TestEvalEndpoint_List_order(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create register requests
uuid1 := uuid.Generate()
eval1 := mock.Eval()
eval1.ID = uuid1

uuid2 := uuid.Generate()
eval2 := mock.Eval()
eval2.ID = uuid2

uuid3 := uuid.Generate()
eval3 := mock.Eval()
eval3.ID = uuid3

err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1})
require.NoError(t, err)

err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval2})
require.NoError(t, err)

err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval3})
require.NoError(t, err)

// update eval2 so we can later assert create index order did not change
err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
require.NoError(t, err)

t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}

var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)

// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[0].ID)

require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)

require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})

t.Run("ascending", func(t *testing.T) {
// Lookup the evaluations in reverse chronological order (newest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: true,
}

var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)

// Assert returned order is by CreateIndex (ascending)
require.Equal(t, uint64(1000), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[0].ID)

require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)

require.Equal(t, uint64(1002), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[2].ID)
})

t.Run("descending", func(t *testing.T) {
// Lookup the evaluations in chronological order (oldest first)
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "*",
},
OrderAscending: false,
}

var resp structs.EvalListResponse
err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
require.NoError(t, err)
require.Equal(t, uint64(1003), resp.Index)
require.Len(t, resp.Evaluations, 3)

// Assert returned order is by CreateIndex (descending)
require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex)
require.Equal(t, uuid3, resp.Evaluations[0].ID)

require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex)
require.Equal(t, uuid2, resp.Evaluations[1].ID)

require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex)
require.Equal(t, uuid1, resp.Evaluations[2].ID)
})

}

Expand Down Expand Up @@ -895,26 +1011,29 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {

// create a set of evals and field values to filter on. these are
// in the order that the state store will return them from the
// iterator (sorted by key), for ease of writing tests
// iterator (sorted by create index), for ease of writing tests
mocks := []struct {
id string
create int
namespace string
jobID string
status string
}{
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"},
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"},
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"},
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"},
{id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"},
}

mockEvals := []*structs.Evaluation{}
for _, m := range mocks {
{id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", create: 1000, jobID: "example"}, // 0
{id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", create: 1001, jobID: "example"}, // 1
{id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", create: 1002, namespace: "non-default"}, // 2
{id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", create: 1003, jobID: "example", status: "blocked"}, // 3
{id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", create: 1004}, // 4
{id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9", create: 1005}, // 5
{id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", create: 1006, jobID: "example"}, // 6
{id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", create: 1007, jobID: "example"}, // 7
{id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9", create: 1008}, // 8
}

state := s1.fsm.State()

var evals []*structs.Evaluation
for i, m := range mocks {
eval := mock.Eval()
eval.ID = m.id
if m.namespace != "" { // defaults to "default"
Expand All @@ -926,11 +1045,14 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
if m.status != "" { // defaults to "pending"
eval.Status = m.status
}
mockEvals = append(mockEvals, eval)
evals = append(evals, eval)
index := 1000 + uint64(i)
fmt.Println("upsert index:", index)
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, index, []*structs.Evaluation{eval}))
}

state := s1.fsm.State()
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1000, mockEvals))
fmt.Println("eval0:", evals[0].CreateIndex)
fmt.Println("eval8:", evals[8].CreateIndex)

aclToken := mock.CreatePolicyAndToken(t, state, 1100, "test-valid-read",
mock.NamespacePolicy(structs.DefaultNamespace, "read", nil)).
Expand All @@ -948,13 +1070,13 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
expectedIDs []string
}{
{
name: "test01 size-2 page-1 default NS",
pageSize: 2,
expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
expectedIDs: []string{
name: "test01 size-2 page-1 default NS",
pageSize: 2,
expectedIDs: []string{ // first two items
"aaaa1111-3350-4b4b-d185-0e1992ed43e9",
"aaaaaa22-3350-4b4b-d185-0e1992ed43e9",
},
expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace
},
{
name: "test02 size-2 page-1 default NS with prefix",
Expand Down Expand Up @@ -1025,8 +1147,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
},
},
{
name: "test08 size-2 page-2 filter skip nextToken",
pageSize: 3, // reads off the end
name: "test08 size-2 page-2 filter skip nextToken", //
pageSize: 3, // reads off the end
filterJobID: "example",
filterStatus: "pending",
nextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9",
Expand Down Expand Up @@ -1084,6 +1206,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) {
req := &structs.EvalListRequest{
FilterJobID: tc.filterJobID,
FilterEvalStatus: tc.filterStatus,
OrderAscending: true, // counting up is easier to think about
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: tc.namespace,
Expand Down
6 changes: 3 additions & 3 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil)
iter, err := state.Deployments(nil, false)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}
Expand Down Expand Up @@ -2071,7 +2071,7 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the evaluations
ws := memdb.NewWatchSet()
evals, err := s.snap.Evals(ws)
evals, err := s.snap.Evals(ws, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws)
deployments, err := s.snap.Deployments(ws, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*stru
func (s *Server) restoreEvals() error {
// Get an iterator over every evaluation
ws := memdb.NewWatchSet()
iter, err := s.fsm.State().Evals(ws)
iter, err := s.fsm.State().Evals(ws, false)
if err != nil {
return fmt.Errorf("failed to get evaluations: %v", err)
}
Expand Down
Loading

0 comments on commit 54e19b4

Please sign in to comment.