From 21b0a409ba5c3628425e18856c313402320badfc Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Thu, 10 Feb 2022 11:50:34 -0600 Subject: [PATCH] api: return sorted results in certain list endpoints These API endpoints now return results in chronological order. They can return results in reverse chronological order by setting the query parameter ascending=true. - Eval.List - Deployment.List --- command/agent/deployment_endpoint.go | 3 + command/agent/eval_endpoint.go | 1 + helper/raftutil/fsm.go | 4 +- nomad/core_sched.go | 4 +- nomad/deployment_endpoint.go | 10 +- nomad/deployment_endpoint_test.go | 111 ++++++++++-- .../deploymentwatcher/deployments_watcher.go | 2 +- nomad/eval_endpoint.go | 15 +- nomad/eval_endpoint_test.go | 166 +++++++++++++++--- nomad/fsm.go | 6 +- nomad/leader.go | 2 +- nomad/state/schema.go | 87 ++++++++- nomad/state/state_store.go | 93 ++++++++-- nomad/state/state_store_test.go | 2 +- nomad/structs/structs.go | 14 ++ 15 files changed, 441 insertions(+), 79 deletions(-) diff --git a/command/agent/deployment_endpoint.go b/command/agent/deployment_endpoint.go index 58829adce36a..729f95fd3867 100644 --- a/command/agent/deployment_endpoint.go +++ b/command/agent/deployment_endpoint.go @@ -17,6 +17,9 @@ func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Requ return nil, nil } + query := req.URL.Query() + args.OrderAscending = query.Get("ascending") == "true" + var out structs.DeploymentListResponse if err := s.agent.RPC("Deployment.List", &args, &out); err != nil { return nil, err diff --git a/command/agent/eval_endpoint.go b/command/agent/eval_endpoint.go index a51c9e9407ca..3494b85085cd 100644 --- a/command/agent/eval_endpoint.go +++ b/command/agent/eval_endpoint.go @@ -20,6 +20,7 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) ( query := req.URL.Query() args.FilterEvalStatus = query.Get("status") args.FilterJobID = query.Get("job") + args.OrderAscending = query.Get("ascending") == "true" var out structs.EvalListResponse if err := s.agent.RPC("Eval.List", &args, &out); err != nil { diff --git a/helper/raftutil/fsm.go b/helper/raftutil/fsm.go index 3410b5ffb537..2d27f797298b 100644 --- a/helper/raftutil/fsm.go +++ b/helper/raftutil/fsm.go @@ -192,8 +192,8 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} { "Allocs": toArray(state.Allocs(nil)), "CSIPlugins": toArray(state.CSIPlugins(nil)), "CSIVolumes": toArray(state.CSIVolumes(nil)), - "Deployments": toArray(state.Deployments(nil)), - "Evals": toArray(state.Evals(nil)), + "Deployments": toArray(state.Deployments(nil, false)), + "Evals": toArray(state.Evals(nil, false)), "Indexes": toArray(state.Indexes()), "JobSummaries": toArray(state.JobSummaries(nil)), "JobVersions": toArray(state.JobVersions(nil)), diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 17925ecc7deb..e4dbaf82ab69 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -231,7 +231,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error { // Iterate over the evaluations ws := memdb.NewWatchSet() - iter, err := c.snap.Evals(ws) + iter, err := c.snap.Evals(ws, false) if err != nil { return err } @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error { // Iterate over the deployments ws := memdb.NewWatchSet() - iter, err := c.snap.Deployments(ws) + iter, err := c.snap.Deployments(ws, false) if err != nil { return err } diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 2c18de98d540..7c3f41e8f5d1 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -407,12 +407,14 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De // Capture all the deployments var err error var iter memdb.ResultIterator + namespace := args.RequestNamespace() + if prefix := args.QueryOptions.Prefix; prefix != "" { - iter, err = store.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix) - } else if args.RequestNamespace() == structs.AllNamespacesSentinel { - iter, err = store.Deployments(ws) + iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix) + } else if namespace != structs.AllNamespacesSentinel { + iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.OrderAscending) } else { - iter, err = store.DeploymentsByNamespace(ws, args.RequestNamespace()) + iter, err = store.Deployments(ws, args.OrderAscending) } if err != nil { return err diff --git a/nomad/deployment_endpoint_test.go b/nomad/deployment_endpoint_test.go index 9868fb021bd6..867b3e86671c 100644 --- a/nomad/deployment_endpoint_test.go +++ b/nomad/deployment_endpoint_test.go @@ -8,6 +8,7 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -1031,6 +1032,95 @@ func TestDeploymentEndpoint_List(t *testing.T) { assert.Len(resp.Deployments, 2, "Deployments") } +func TestDeploymentEndpoint_List_order(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create register requests + uuid1 := uuid.Generate() + dep1 := mock.Deployment() + dep1.ID = uuid1 + + uuid2 := uuid.Generate() + dep2 := mock.Deployment() + dep2.ID = uuid2 + + uuid3 := uuid.Generate() + dep3 := mock.Deployment() + dep3.ID = uuid3 + + err := s1.fsm.State().UpsertDeployment(1000, dep1) + require.NoError(t, err) + + err = s1.fsm.State().UpsertDeployment(1001, dep2) + require.NoError(t, err) + + err = s1.fsm.State().UpsertDeployment(1002, dep3) + require.NoError(t, err) + + // update dep2 again so we can later assert create index order did not change + err = s1.fsm.State().UpsertDeployment(1003, dep2) + require.NoError(t, err) + + t.Run("ascending", func(t *testing.T) { + // Lookup the deployments in chronological order (oldest first) + get := &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: true, + } + + var resp structs.DeploymentListResponse + err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Deployments, 3) + + // Assert returned order is by CreateIndex (ascending) + require.Equal(t, uint64(1000), resp.Deployments[0].CreateIndex) + require.Equal(t, uuid1, resp.Deployments[0].ID) + + require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex) + require.Equal(t, uuid2, resp.Deployments[1].ID) + + require.Equal(t, uint64(1002), resp.Deployments[2].CreateIndex) + require.Equal(t, uuid3, resp.Deployments[2].ID) + }) + + t.Run("descending", func(t *testing.T) { + // Lookup the deployments in reverse chronological order (newest first) + get := &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: false, + } + + var resp structs.DeploymentListResponse + err = msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Deployments, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Deployments[0].CreateIndex) + require.Equal(t, uuid3, resp.Deployments[0].ID) + + require.Equal(t, uint64(1001), resp.Deployments[1].CreateIndex) + require.Equal(t, uuid2, resp.Deployments[1].ID) + + require.Equal(t, uint64(1000), resp.Deployments[2].CreateIndex) + require.Equal(t, uuid1, resp.Deployments[2].ID) + }) +} + func TestDeploymentEndpoint_List_ACL(t *testing.T) { t.Parallel() @@ -1174,23 +1264,23 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { jobID string status string }{ - {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, - {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, + {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, // 0 + {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}, // 1 + {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2 + {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, // 3 + {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4 + {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5 + {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, // 6 } state := s1.fsm.State() - index := uint64(1000) - for _, m := range mocks { - index++ + for i, m := range mocks { + index := 1000 + uint64(i) deployment := mock.Deployment() deployment.Status = structs.DeploymentStatusCancelled deployment.ID = m.id + deployment.CreateIndex = index if m.namespace != "" { // defaults to "default" deployment.Namespace = m.namespace } @@ -1262,6 +1352,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { req := &structs.DeploymentListRequest{ + OrderAscending: true, // counting up is easier to think about QueryOptions: structs.QueryOptions{ Region: "global", Namespace: tc.namespace, diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index d42128321f0d..8743f2b1274a 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -204,7 +204,7 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D // getDeploysImpl retrieves all deployments from the passed state store. func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { - iter, err := state.Deployments(ws) + iter, err := state.Deployments(ws, false) if err != nil { return nil, 0, err } diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 8a48e27c1dee..510a1add46af 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -382,8 +382,7 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest, } // List is used to get a list of the evaluations in the system -func (e *Eval) List(args *structs.EvalListRequest, - reply *structs.EvalListResponse) error { +func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error { if done, err := e.srv.forward("Eval.List", args, args, reply); done { return err } @@ -404,12 +403,14 @@ func (e *Eval) List(args *structs.EvalListRequest, // Scan all the evaluations var err error var iter memdb.ResultIterator - if args.RequestNamespace() == structs.AllNamespacesSentinel { - iter, err = store.Evals(ws) - } else if prefix := args.QueryOptions.Prefix; prefix != "" { - iter, err = store.EvalsByIDPrefix(ws, args.RequestNamespace(), prefix) + namespace := args.RequestNamespace() + + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = store.EvalsByIDPrefix(ws, namespace, prefix) + } else if namespace != structs.AllNamespacesSentinel { + iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.OrderAscending) } else { - iter, err = store.EvalsByNamespace(ws, args.RequestNamespace()) + iter, err = store.Evals(ws, args.OrderAscending) } if err != nil { return err diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 2b78c6b72d79..e2cef6904f37 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -715,6 +715,122 @@ func TestEvalEndpoint_List(t *testing.T) { if len(resp2.Evaluations) != 1 { t.Fatalf("bad: %#v", resp2.Evaluations) } +} + +func TestEvalEndpoint_List_order(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create register requests + uuid1 := uuid.Generate() + eval1 := mock.Eval() + eval1.ID = uuid1 + + uuid2 := uuid.Generate() + eval2 := mock.Eval() + eval2.ID = uuid2 + + uuid3 := uuid.Generate() + eval3 := mock.Eval() + eval3.ID = uuid3 + + err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1}) + require.NoError(t, err) + + err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval2}) + require.NoError(t, err) + + err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval3}) + require.NoError(t, err) + + // update eval2 again so we can later assert create index order did not change + err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2}) + require.NoError(t, err) + + t.Run("descending", func(t *testing.T) { + // Lookup the evaluations in reverse chronological order + get := &structs.EvalListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: false, + } + + var resp structs.EvalListResponse + err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Evaluations, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex) + require.Equal(t, uuid3, resp.Evaluations[0].ID) + + require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex) + require.Equal(t, uuid2, resp.Evaluations[1].ID) + + require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex) + require.Equal(t, uuid1, resp.Evaluations[2].ID) + }) + + t.Run("ascending", func(t *testing.T) { + // Lookup the evaluations in reverse chronological order (newest first) + get := &structs.EvalListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: true, + } + + var resp structs.EvalListResponse + err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Evaluations, 3) + + // Assert returned order is by CreateIndex (ascending) + require.Equal(t, uint64(1000), resp.Evaluations[0].CreateIndex) + require.Equal(t, uuid1, resp.Evaluations[0].ID) + + require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex) + require.Equal(t, uuid2, resp.Evaluations[1].ID) + + require.Equal(t, uint64(1002), resp.Evaluations[2].CreateIndex) + require.Equal(t, uuid3, resp.Evaluations[2].ID) + }) + + t.Run("descending", func(t *testing.T) { + // Lookup the evaluations in chronological order (oldest first) + get := &structs.EvalListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + OrderAscending: false, + } + + var resp structs.EvalListResponse + err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Evaluations, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex) + require.Equal(t, uuid3, resp.Evaluations[0].ID) + + require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex) + require.Equal(t, uuid2, resp.Evaluations[1].ID) + + require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex) + require.Equal(t, uuid1, resp.Evaluations[2].ID) + }) } @@ -895,26 +1011,28 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { // create a set of evals and field values to filter on. these are // in the order that the state store will return them from the - // iterator (sorted by key), for ease of writing tests + // iterator (sorted by create index), for ease of writing tests mocks := []struct { id string namespace string jobID string status string }{ - {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, - {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"}, - {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, - {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, - {id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"}, - } - - mockEvals := []*structs.Evaluation{} - for _, m := range mocks { + {id: "aaaa1111-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 0 + {id: "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 1 + {id: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", namespace: "non-default"}, // 2 + {id: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", jobID: "example", status: "blocked"}, // 3 + {id: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, // 4 + {id: "aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}, // 5 + {id: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 6 + {id: "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", jobID: "example"}, // 7 + {id: "aaaaaaff-3350-4b4b-d185-0e1992ed43e9"}, // 8 + } + + state := s1.fsm.State() + + var evals []*structs.Evaluation + for i, m := range mocks { eval := mock.Eval() eval.ID = m.id if m.namespace != "" { // defaults to "default" @@ -926,12 +1044,11 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { if m.status != "" { // defaults to "pending" eval.Status = m.status } - mockEvals = append(mockEvals, eval) + evals = append(evals, eval) + index := 1000 + uint64(i) + require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, index, []*structs.Evaluation{eval})) } - state := s1.fsm.State() - require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1000, mockEvals)) - aclToken := mock.CreatePolicyAndToken(t, state, 1100, "test-valid-read", mock.NamespacePolicy(structs.DefaultNamespace, "read", nil)). SecretID @@ -948,13 +1065,13 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { expectedIDs []string }{ { - name: "test01 size-2 page-1 default NS", - pageSize: 2, - expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", - expectedIDs: []string{ + name: "test01 size-2 page-1 default NS", + pageSize: 2, + expectedIDs: []string{ // first two items "aaaa1111-3350-4b4b-d185-0e1992ed43e9", "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", }, + expectedNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace }, { name: "test02 size-2 page-1 default NS with prefix", @@ -1025,8 +1142,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { }, }, { - name: "test08 size-2 page-2 filter skip nextToken", - pageSize: 3, // reads off the end + name: "test08 size-2 page-2 filter skip nextToken", // + pageSize: 3, // reads off the end filterJobID: "example", filterStatus: "pending", nextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", @@ -1084,6 +1201,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { req := &structs.EvalListRequest{ FilterJobID: tc.filterJobID, FilterEvalStatus: tc.filterStatus, + OrderAscending: true, // counting up is easier to think about QueryOptions: structs.QueryOptions{ Region: "global", Namespace: tc.namespace, diff --git a/nomad/fsm.go b/nomad/fsm.go index 90dba3231586..6fcd0a04478a 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1708,7 +1708,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error { // Scan for deployments that are referencing a job that no longer exists. // This could happen if multiple deployments were created for a given job // and thus the older deployment leaks and then the job is removed. - iter, err := state.Deployments(nil) + iter, err := state.Deployments(nil, false) if err != nil { return fmt.Errorf("failed to query deployments: %v", err) } @@ -2071,7 +2071,7 @@ func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the evaluations ws := memdb.NewWatchSet() - evals, err := s.snap.Evals(ws) + evals, err := s.snap.Evals(ws, false) if err != nil { return err } @@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the jobs ws := memdb.NewWatchSet() - deployments, err := s.snap.Deployments(ws) + deployments, err := s.snap.Deployments(ws, false) if err != nil { return err } diff --git a/nomad/leader.go b/nomad/leader.go index ecffabc85c77..7facb66c7e62 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -503,7 +503,7 @@ func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*stru func (s *Server) restoreEvals() error { // Get an iterator over every evaluation ws := memdb.NewWatchSet() - iter, err := s.fsm.State().Evals(ws) + iter, err := s.fsm.State().Evals(ws, false) if err != nil { return fmt.Errorf("failed to get evaluations: %v", err) } diff --git a/nomad/state/schema.go b/nomad/state/schema.go index da647970bf00..eb6805f04ab7 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -303,6 +303,7 @@ func deploymentSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: "deployment", Indexes: map[string]*memdb.IndexSchema{ + // id index is used for direct lookup of an deployment by ID. "id": { Name: "id", AllowMissing: false, @@ -312,6 +313,20 @@ func deploymentSchema() *memdb.TableSchema { }, }, + // create index is used for listing deploy, ordering them by + // creation chronology. (Use a reverse iterator for newest first). + // + // There may be more than one deployment per CreateIndex. + "create": { + Name: "create", + AllowMissing: false, + Unique: false, + Indexer: &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + }, + + // namespace is used to lookup evaluations by namespace. "namespace": { Name: "namespace", AllowMissing: false, @@ -321,7 +336,31 @@ func deploymentSchema() *memdb.TableSchema { }, }, - // Job index is used to lookup deployments by job + // namespace_create index is used to lookup deployments by namespace + // in their original chronological order based on CreateIndex. + // + // Use a prefix iterator (namespace_create_prefix) to iterate deployments + // of a Namespace in order of CreateIndex. + // + // There may be more than one deployment per CreateIndex. + "namespace_create": { + Name: "namespace_create", + AllowMissing: false, + Unique: false, + Indexer: &memdb.CompoundIndex{ + AllowMissing: false, + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + }, + }, + }, + + // job index is used to lookup deployments by job "job": { Name: "job", AllowMissing: false, @@ -384,7 +423,7 @@ func evalTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: "evals", Indexes: map[string]*memdb.IndexSchema{ - // Primary index is used for direct lookup. + // id index is used for direct lookup of an evaluation by ID. "id": { Name: "id", AllowMissing: false, @@ -394,16 +433,18 @@ func evalTableSchema() *memdb.TableSchema { }, }, - "namespace": { - Name: "namespace", + // create index is used for listing evaluations, ordering them by + // creation chronology. (Use a reverse iterator for newest first). + "create": { + Name: "create", AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "Namespace", + Indexer: &memdb.UintFieldIndex{ + Field: "CreateIndex", }, }, - // Job index is used to lookup allocations by job + // job index is used to lookup evaluations by job ID. "job": { Name: "job", AllowMissing: false, @@ -426,6 +467,38 @@ func evalTableSchema() *memdb.TableSchema { }, }, }, + + // namespace is used to lookup evaluations by namespace. + "namespace": { + Name: "namespace", + AllowMissing: false, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "Namespace", + }, + }, + + // namespace_create index is used to lookup evaluations by namespace + // in their original chronological order based on CreateIndex. + // + // Use a prefix iterator (namespace_prefix) on a Namespace to iterate + // those evaluations in order of CreateIndex. + "namespace_create": { + Name: "namespace_create", + AllowMissing: false, + Unique: false, + Indexer: &memdb.CompoundIndex{ + AllowMissing: false, + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + }, + }, + }, }, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f32fd3a38f6c..02a4f576f0d6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -536,17 +536,25 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl return nil } -func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) { +func (s *StateStore) Deployments(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire deployments table - iter, err := txn.Get("deployment", "id") + var it memdb.ResultIterator + var err error + + if ascending { + it, err = txn.Get("deployment", "create") + } else { + it, err = txn.GetReverse("deployment", "create") + } + if err != nil { return nil, err } - ws.Add(iter.WatchCh()) - return iter, nil + ws.Add(it.WatchCh()) + + return it, nil } func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { @@ -562,6 +570,27 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) return iter, nil } +func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + var it memdb.ResultIterator + var err error + + if ascending { + it, err = txn.Get("deployment", "namespace_create_prefix", namespace) + } else { + it, err = txn.GetReverse("deployment", "namespace_create_prefix") + } + + if err != nil { + return nil, err + } + + ws.Add(it.WatchCh()) + + return it, nil +} + func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() @@ -3112,35 +3141,65 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]* return out, nil } -// Evals returns an iterator over all the evaluations -func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) { +// Evals returns an iterator over all the evaluations in ascending or descending +// order of CreationIndex as determined by the ascending parameter. +func (s *StateStore) Evals(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire table - iter, err := txn.Get("evals", "id") + var it memdb.ResultIterator + var err error + + if ascending { + it, err = txn.Get("evals", "create") + } else { + it, err = txn.GetReverse("evals", "create") + } + if err != nil { return nil, err } - ws.Add(iter.WatchCh()) + ws.Add(it.WatchCh()) - return iter, nil + return it, nil } -// EvalsByNamespace returns an iterator over all the evaluations in the given -// namespace +// EvalsByNamespace returns an iterator over all evaluations in no particular +// order. +// +// todo(shoenig): can this be removed? func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire table - iter, err := txn.Get("evals", "namespace", namespace) + it, err := txn.Get("evals", "namespace", namespace) if err != nil { return nil, err } - ws.Add(iter.WatchCh()) + ws.Add(it.WatchCh()) - return iter, nil + return it, nil +} + +func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + var it memdb.ResultIterator + var err error + + if ascending { + it, err = txn.Get("evals", "namespace_create_prefix", namespace) + } else { + it, err = txn.GetReverse("evals", "namespace_create_prefix") + } + + if err != nil { + return nil, err + } + + ws.Add(it.WatchCh()) + + return it, nil } // UpdateAllocsFromClient is used to update an allocation based on input diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 561765d94138..54d087304cb3 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3827,7 +3827,7 @@ func TestStateStore_Evals(t *testing.T) { } ws := memdb.NewWatchSet() - iter, err := state.Evals(ws) + iter, err := state.Evals(ws, false) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7051a26b4af6..2fa2c7c7ee0b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -859,6 +859,8 @@ type EvalDequeueRequest struct { type EvalListRequest struct { FilterJobID string FilterEvalStatus string + FilterNamespace string + OrderAscending bool QueryOptions } @@ -871,9 +873,20 @@ func (req *EvalListRequest) ShouldBeFiltered(e *Evaluation) bool { if req.FilterEvalStatus != "" && req.FilterEvalStatus != e.Status { return true } + if req.filterNamespace(e.Namespace) { + return true + } return false } +func (req *EvalListRequest) filterNamespace(namespace string) bool { + accept := "default" + if req.FilterNamespace != "" { + accept = req.FilterNamespace + } + return accept != namespace +} + // PlanRequest is used to submit an allocation plan to the leader type PlanRequest struct { Plan *Plan @@ -1093,6 +1106,7 @@ type GenericRequest struct { // DeploymentListRequest is used to list the deployments type DeploymentListRequest struct { + OrderAscending bool QueryOptions }