From 154264fcd9276b53ea837664bceefd8b6f217291 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 8 Mar 2022 20:54:17 -0500 Subject: [PATCH] Add pagination, filtering and sort to more API endpoints (#12186) --- .changelog/12186.txt | 7 + api/api.go | 10 +- api/api_test.go | 4 +- command/agent/http.go | 8 +- helper/raftutil/fsm.go | 38 +- nomad/acl_endpoint.go | 48 ++- nomad/acl_endpoint_test.go | 280 ++++++++++++++ nomad/alloc_endpoint.go | 144 ++++--- nomad/alloc_endpoint_test.go | 350 +++++++++++++++++- nomad/core_sched.go | 2 +- nomad/csi_endpoint.go | 73 ++-- nomad/csi_endpoint_test.go | 194 ++++++++++ nomad/deployment_endpoint.go | 50 +-- nomad/deployment_endpoint_test.go | 26 +- .../deploymentwatcher/deployments_watcher.go | 6 +- nomad/drainer_int_test.go | 14 +- nomad/eval_endpoint.go | 51 +-- nomad/eval_endpoint_test.go | 66 +--- nomad/fsm.go | 16 +- nomad/job_endpoint.go | 158 ++++---- nomad/job_endpoint_test.go | 178 +++++++++ nomad/search_endpoint.go | 22 +- nomad/state/paginator/filter.go | 41 ++ nomad/state/{ => paginator}/filter_test.go | 136 +++++-- nomad/state/{ => paginator}/paginator.go | 64 ++-- nomad/state/{ => paginator}/paginator_test.go | 48 ++- nomad/state/paginator/tokenizer.go | 82 ++++ nomad/state/paginator/tokenizer_test.go | 67 ++++ nomad/state/schema.go | 62 +++- nomad/state/state_store.go | 145 ++++++-- nomad/state/state_store_test.go | 12 +- nomad/structs/csi.go | 26 ++ nomad/structs/structs.go | 101 ++++- 33 files changed, 2024 insertions(+), 505 deletions(-) create mode 100644 .changelog/12186.txt create mode 100644 nomad/state/paginator/filter.go rename nomad/state/{ => paginator}/filter_test.go (61%) rename nomad/state/{ => paginator}/paginator.go (64%) rename nomad/state/{ => paginator}/paginator_test.go (74%) create mode 100644 nomad/state/paginator/tokenizer.go create mode 100644 nomad/state/paginator/tokenizer_test.go diff --git a/.changelog/12186.txt b/.changelog/12186.txt new file mode 100644 index 000000000000..2e39b210a938 --- /dev/null +++ b/.changelog/12186.txt @@ -0,0 +1,7 @@ +```release-note:improvement +api: Add support for filtering, sorting, and pagination to the ACL tokens and allocations list endpoint +``` + +```release-note:improvement +api: Add support for filtering and pagination to the jobs and volumes list endpoint +``` diff --git a/api/api.go b/api/api.go index 08521179b107..2dc00a3f69ca 100644 --- a/api/api.go +++ b/api/api.go @@ -82,10 +82,10 @@ type QueryOptions struct { // previous response. NextToken string - // Ascending is used to have results sorted in ascending chronological order. + // Reverse is used to reverse the default order of list results. // - // Currently only supported by evaluations.List and deployments.list endpoints. - Ascending bool + // Currently only supported by specific endpoints. + Reverse bool // ctx is an optional context pass through to the underlying HTTP // request layer. Use Context() and WithContext() to manage this. @@ -605,8 +605,8 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.NextToken != "" { r.params.Set("next_token", q.NextToken) } - if q.Ascending { - r.params.Set("ascending", "true") + if q.Reverse { + r.params.Set("reverse", "true") } for k, v := range q.Params { r.params.Set(k, v) diff --git a/api/api_test.go b/api/api_test.go index 0f503c624d3a..6ab82526d28f 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -181,7 +181,7 @@ func TestSetQueryOptions(t *testing.T) { WaitIndex: 1000, WaitTime: 100 * time.Second, AuthToken: "foobar", - Ascending: true, + Reverse: true, } r.setQueryOptions(q) @@ -199,7 +199,7 @@ func TestSetQueryOptions(t *testing.T) { try("stale", "") // should not be present try("index", "1000") try("wait", "100000ms") - try("ascending", "true") + try("reverse", "true") } func TestQueryOptionsContext(t *testing.T) { diff --git a/command/agent/http.go b/command/agent/http.go index 8568a0b0e9d9..7d5fed9c452c 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -788,7 +788,7 @@ func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, r *strin parseNamespace(req, &b.Namespace) parsePagination(req, b) parseFilter(req, b) - parseAscending(req, b) + parseReverse(req, b) return parseWait(resp, req, b) } @@ -814,10 +814,10 @@ func parseFilter(req *http.Request, b *structs.QueryOptions) { } } -// parseAscending parses the ascending query parameter for QueryOptions -func parseAscending(req *http.Request, b *structs.QueryOptions) { +// parseReverse parses the reverse query parameter for QueryOptions +func parseReverse(req *http.Request, b *structs.QueryOptions) { query := req.URL.Query() - b.Ascending = query.Get("ascending") == "true" + b.Reverse = query.Get("reverse") == "true" } // parseWriteRequest is a convenience method for endpoints that need to parse a diff --git a/helper/raftutil/fsm.go b/helper/raftutil/fsm.go index 26539215cb2b..cad222f2e182 100644 --- a/helper/raftutil/fsm.go +++ b/helper/raftutil/fsm.go @@ -185,28 +185,28 @@ func (f *FSMHelper) StateAsMap() map[string][]interface{} { } // StateAsMap returns a json-able representation of the state -func StateAsMap(state *state.StateStore) map[string][]interface{} { +func StateAsMap(store *state.StateStore) map[string][]interface{} { result := map[string][]interface{}{ - "ACLPolicies": toArray(state.ACLPolicies(nil)), - "ACLTokens": toArray(state.ACLTokens(nil)), - "Allocs": toArray(state.Allocs(nil)), - "CSIPlugins": toArray(state.CSIPlugins(nil)), - "CSIVolumes": toArray(state.CSIVolumes(nil)), - "Deployments": toArray(state.Deployments(nil, false)), - "Evals": toArray(state.Evals(nil, false)), - "Indexes": toArray(state.Indexes()), - "JobSummaries": toArray(state.JobSummaries(nil)), - "JobVersions": toArray(state.JobVersions(nil)), - "Jobs": toArray(state.Jobs(nil)), - "Nodes": toArray(state.Nodes(nil)), - "PeriodicLaunches": toArray(state.PeriodicLaunches(nil)), - "SITokenAccessors": toArray(state.SITokenAccessors(nil)), - "ScalingEvents": toArray(state.ScalingEvents(nil)), - "ScalingPolicies": toArray(state.ScalingPolicies(nil)), - "VaultAccessors": toArray(state.VaultAccessors(nil)), + "ACLPolicies": toArray(store.ACLPolicies(nil)), + "ACLTokens": toArray(store.ACLTokens(nil, state.SortDefault)), + "Allocs": toArray(store.Allocs(nil, state.SortDefault)), + "CSIPlugins": toArray(store.CSIPlugins(nil)), + "CSIVolumes": toArray(store.CSIVolumes(nil)), + "Deployments": toArray(store.Deployments(nil, state.SortDefault)), + "Evals": toArray(store.Evals(nil, state.SortDefault)), + "Indexes": toArray(store.Indexes()), + "JobSummaries": toArray(store.JobSummaries(nil)), + "JobVersions": toArray(store.JobVersions(nil)), + "Jobs": toArray(store.Jobs(nil)), + "Nodes": toArray(store.Nodes(nil)), + "PeriodicLaunches": toArray(store.PeriodicLaunches(nil)), + "SITokenAccessors": toArray(store.SITokenAccessors(nil)), + "ScalingEvents": toArray(store.ScalingEvents(nil)), + "ScalingPolicies": toArray(store.ScalingPolicies(nil)), + "VaultAccessors": toArray(store.VaultAccessors(nil)), } - insertEnterpriseState(result, state) + insertEnterpriseState(result, store) return result diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index b9a55cfa06dd..8f303195270f 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -3,6 +3,7 @@ package nomad import ( "fmt" "io/ioutil" + "net/http" "os" "path/filepath" "strings" @@ -14,6 +15,7 @@ import ( policy "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" ) @@ -652,6 +654,7 @@ func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTo } // Setup the blocking query + sort := state.SortOption(args.Reverse) opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, @@ -659,34 +662,59 @@ func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTo // Iterate over all the tokens var err error var iter memdb.ResultIterator + var opts paginator.StructsTokenizerOptions + if prefix := args.QueryOptions.Prefix; prefix != "" { iter, err = state.ACLTokenByAccessorIDPrefix(ws, prefix) + opts = paginator.StructsTokenizerOptions{ + WithID: true, + } } else if args.GlobalOnly { iter, err = state.ACLTokensByGlobal(ws, true) + opts = paginator.StructsTokenizerOptions{ + WithID: true, + } } else { - iter, err = state.ACLTokens(ws) + iter, err = state.ACLTokens(ws, sort) + opts = paginator.StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + } } if err != nil { return err } - // Convert all the tokens to a list stub - reply.Tokens = nil - for { - raw := iter.Next() - if raw == nil { - break - } - token := raw.(*structs.ACLToken) - reply.Tokens = append(reply.Tokens, token.Stub()) + tokenizer := paginator.NewStructsTokenizer(iter, opts) + + var tokens []*structs.ACLTokenListStub + paginator, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions, + func(raw interface{}) error { + token := raw.(*structs.ACLToken) + tokens = append(tokens, token.Stub()) + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to create result paginator: %v", err) + } + + nextToken, err := paginator.Page() + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to read result page: %v", err) } + reply.QueryMeta.NextToken = nextToken + reply.Tokens = tokens + // Use the last index that affected the token table index, err := state.Index("acl_token") if err != nil { return err } reply.Index = index + return nil }} return a.srv.blockingRPC(&opts) diff --git a/nomad/acl_endpoint_test.go b/nomad/acl_endpoint_test.go index 7258aba8853e..6e934b0048ba 100644 --- a/nomad/acl_endpoint_test.go +++ b/nomad/acl_endpoint_test.go @@ -919,6 +919,286 @@ func TestACLEndpoint_ListTokens(t *testing.T) { assert.Equal(t, 2, len(resp3.Tokens)) } +func TestACLEndpoint_ListTokens_PaginationFiltering(t *testing.T) { + t.Parallel() + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.ACLEnabled = true + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // create a set of ACL tokens. these are in the order that the state store + // will return them from the iterator (sorted by key) for ease of writing + // tests + mocks := []struct { + ids []string + typ string + }{ + {ids: []string{"aaaa1111-3350-4b4b-d185-0e1992ed43e9"}, typ: "management"}, // 0 + {ids: []string{"aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}}, // 1 + {ids: []string{"aaaaaa33-3350-4b4b-d185-0e1992ed43e9"}}, // 2 + {ids: []string{"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}}, // 3 + {ids: []string{"aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}}, // 4 + {ids: []string{"aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}}, // 5 + {ids: []string{"aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}}, // 6 + {ids: []string{"00000111-3350-4b4b-d185-0e1992ed43e9"}}, // 7 + {ids: []string{ // 8 + "00000222-3350-4b4b-d185-0e1992ed43e9", + "00000333-3350-4b4b-d185-0e1992ed43e9", + }}, + {}, // 9, index missing + {ids: []string{"bbbb1111-3350-4b4b-d185-0e1992ed43e9"}}, // 10 + } + + state := s1.fsm.State() + + var bootstrapToken string + for i, m := range mocks { + tokensInTx := []*structs.ACLToken{} + for _, id := range m.ids { + token := mock.ACLToken() + token.AccessorID = id + token.Type = m.typ + tokensInTx = append(tokensInTx, token) + } + index := 1000 + uint64(i) + + // bootstrap cluster with the first token + if i == 0 { + token := tokensInTx[0] + bootstrapToken = token.SecretID + err := s1.State().BootstrapACLTokens(structs.MsgTypeTestSetup, index, 0, token) + require.NoError(t, err) + + err = state.UpsertACLTokens(structs.MsgTypeTestSetup, index, tokensInTx[1:]) + require.NoError(t, err) + } else { + err := state.UpsertACLTokens(structs.MsgTypeTestSetup, index, tokensInTx) + require.NoError(t, err) + } + } + + cases := []struct { + name string + prefix string + filter string + nextToken string + pageSize int32 + expectedNextToken string + expectedIDs []string + expectedError string + }{ + { + name: "test01 size-2 page-1", + pageSize: 2, + expectedNextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + expectedIDs: []string{ + "aaaa1111-3350-4b4b-d185-0e1992ed43e9", + "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test02 size-2 page-1 with prefix", + prefix: "aaaa", + pageSize: 2, + expectedNextToken: "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + expectedIDs: []string{ + "aaaa1111-3350-4b4b-d185-0e1992ed43e9", + "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test03 size-2 page-2 default NS", + pageSize: 2, + nextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1004.aaaaaabb-3350-4b4b-d185-0e1992ed43e9", + expectedIDs: []string{ + "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test04 go-bexpr filter", + filter: `AccessorID matches "^a+[123]"`, + expectedIDs: []string{ + "aaaa1111-3350-4b4b-d185-0e1992ed43e9", + "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", + "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test05 go-bexpr filter with pagination", + filter: `AccessorID matches "^a+[123]"`, + pageSize: 2, + expectedNextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + expectedIDs: []string{ + "aaaa1111-3350-4b4b-d185-0e1992ed43e9", + "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test06 go-bexpr invalid expression", + filter: `NotValid`, + expectedError: "failed to read filter expression", + }, + { + name: "test07 go-bexpr invalid field", + filter: `InvalidField == "value"`, + expectedError: "error finding value in datum", + }, + { + name: "test08 non-lexicographic order", + pageSize: 1, + nextToken: "1007.00000111-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1008.00000222-3350-4b4b-d185-0e1992ed43e9", + expectedIDs: []string{ + "00000111-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test09 same index", + pageSize: 1, + nextToken: "1008.00000222-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1008.00000333-3350-4b4b-d185-0e1992ed43e9", + expectedIDs: []string{ + "00000222-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test10 missing index", + pageSize: 1, + nextToken: "1009.e9522802-0cd8-4b1d-9c9e-ab3d97938371", + expectedIDs: []string{ + "bbbb1111-3350-4b4b-d185-0e1992ed43e9", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + req := &structs.ACLTokenListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Prefix: tc.prefix, + Filter: tc.filter, + PerPage: tc.pageSize, + NextToken: tc.nextToken, + }, + } + req.AuthToken = bootstrapToken + var resp structs.ACLTokenListResponse + err := msgpackrpc.CallWithCodec(codec, "ACL.ListTokens", req, &resp) + if tc.expectedError == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedError) + return + } + + gotIDs := []string{} + for _, token := range resp.Tokens { + gotIDs = append(gotIDs, token.AccessorID) + } + require.Equal(t, tc.expectedIDs, gotIDs, "unexpected page of tokens") + require.Equal(t, tc.expectedNextToken, resp.QueryMeta.NextToken, "unexpected NextToken") + }) + } +} + +func TestACLEndpoint_ListTokens_Order(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.ACLEnabled = true + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create register requests + uuid1 := uuid.Generate() + token1 := mock.ACLManagementToken() + token1.AccessorID = uuid1 + + uuid2 := uuid.Generate() + token2 := mock.ACLToken() + token2.AccessorID = uuid2 + + uuid3 := uuid.Generate() + token3 := mock.ACLToken() + token3.AccessorID = uuid3 + + // bootstrap cluster with the first token + bootstrapToken := token1.SecretID + err := s1.State().BootstrapACLTokens(structs.MsgTypeTestSetup, 1000, 0, token1) + require.NoError(t, err) + + err = s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1001, []*structs.ACLToken{token2}) + require.NoError(t, err) + + err = s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1002, []*structs.ACLToken{token3}) + require.NoError(t, err) + + // update token2 again so we can later assert create index order did not change + err = s1.fsm.State().UpsertACLTokens(structs.MsgTypeTestSetup, 1003, []*structs.ACLToken{token2}) + require.NoError(t, err) + + t.Run("default", func(t *testing.T) { + // Lookup the tokens in the default order (oldest first) + get := &structs.ACLTokenListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + get.AuthToken = bootstrapToken + + var resp structs.ACLTokenListResponse + err = msgpackrpc.CallWithCodec(codec, "ACL.ListTokens", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Tokens, 3) + + // Assert returned order is by CreateIndex (ascending) + require.Equal(t, uint64(1000), resp.Tokens[0].CreateIndex) + require.Equal(t, uuid1, resp.Tokens[0].AccessorID) + + require.Equal(t, uint64(1001), resp.Tokens[1].CreateIndex) + require.Equal(t, uuid2, resp.Tokens[1].AccessorID) + + require.Equal(t, uint64(1002), resp.Tokens[2].CreateIndex) + require.Equal(t, uuid3, resp.Tokens[2].AccessorID) + }) + + t.Run("reverse", func(t *testing.T) { + // Lookup the tokens in reverse order (newest first) + get := &structs.ACLTokenListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Reverse: true, + }, + } + get.AuthToken = bootstrapToken + + var resp structs.ACLTokenListResponse + err = msgpackrpc.CallWithCodec(codec, "ACL.ListTokens", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Tokens, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Tokens[0].CreateIndex) + require.Equal(t, uuid3, resp.Tokens[0].AccessorID) + + require.Equal(t, uint64(1001), resp.Tokens[1].CreateIndex) + require.Equal(t, uuid2, resp.Tokens[1].AccessorID) + + require.Equal(t, uint64(1000), resp.Tokens[2].CreateIndex) + require.Equal(t, uuid1, resp.Tokens[2].AccessorID) + }) +} + func TestACLEndpoint_ListTokens_Blocking(t *testing.T) { t.Parallel() diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 92abee62f4a5..65b0ea4b7211 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -2,6 +2,7 @@ package nomad import ( "fmt" + "net/http" "time" metrics "github.com/armon/go-metrics" @@ -13,6 +14,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" ) @@ -32,111 +34,103 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes } defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now()) - if args.RequestNamespace() == structs.AllNamespacesSentinel { - return a.listAllNamespaces(args, reply) - } + namespace := args.RequestNamespace() + var allow func(string) bool // Check namespace read-job permissions aclObj, err := a.srv.ResolveToken(args.AuthToken) - if err != nil { + + switch { + case err != nil: return err - } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { + case aclObj == nil: + allow = func(string) bool { + return true + } + case namespace == structs.AllNamespacesSentinel: + allow = func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob) + } + case !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityReadJob): return structs.ErrPermissionDenied + default: + allow = func(string) bool { + return true + } } // Setup the blocking query + sort := state.SortOption(args.Reverse) opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { - // Capture all the allocations + // Scan all the allocations var err error var iter memdb.ResultIterator + var opts paginator.StructsTokenizerOptions - prefix := args.QueryOptions.Prefix - if prefix != "" { - iter, err = state.AllocsByIDPrefix(ws, args.RequestNamespace(), prefix) - } else { - iter, err = state.AllocsByNamespace(ws, args.RequestNamespace()) - } - if err != nil { - return err - } - - var allocs []*structs.AllocListStub - for { - raw := iter.Next() - if raw == nil { - break - } - alloc := raw.(*structs.Allocation) - allocs = append(allocs, alloc.Stub(args.Fields)) - } - reply.Allocations = allocs - - // Use the last index that affected the jobs table - index, err := state.Index("allocs") - if err != nil { - return err - } - reply.Index = index - - // Set the query response - a.srv.setQueryMeta(&reply.QueryMeta) - return nil - }} - return a.srv.blockingRPC(&opts) -} - -// listAllNamespaces lists all allocations across all namespaces -func (a *Alloc) listAllNamespaces(args *structs.AllocListRequest, reply *structs.AllocListResponse) error { - // Check for read-job permissions - aclObj, err := a.srv.ResolveToken(args.AuthToken) - if err != nil { - return err - } - prefix := args.QueryOptions.Prefix - allow := func(ns string) bool { - return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob) - } - - // Setup the blocking query - opts := blockingOptions{ - queryOpts: &args.QueryOptions, - queryMeta: &reply.QueryMeta, - run: func(ws memdb.WatchSet, state *state.StateStore) error { // get list of accessible namespaces - allowedNSes, err := allowedNSes(aclObj, state, allow) + allowableNamespaces, err := allowedNSes(aclObj, state, allow) if err == structs.ErrPermissionDenied { - // return empty allocations if token isn't authorized for any + // return empty allocation if token is not authorized for any // namespace, matching other endpoints - reply.Allocations = []*structs.AllocListStub{} + reply.Allocations = make([]*structs.AllocListStub, 0) } else if err != nil { return err } else { - var iter memdb.ResultIterator - var err error - if prefix != "" { - iter, err = state.AllocsByIDPrefixAllNSs(ws, prefix) + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = state.AllocsByIDPrefix(ws, namespace, prefix) + opts = paginator.StructsTokenizerOptions{ + WithID: true, + } + } else if namespace != structs.AllNamespacesSentinel { + iter, err = state.AllocsByNamespaceOrdered(ws, namespace, sort) + opts = paginator.StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + } } else { - iter, err = state.Allocs(ws) + iter, err = state.Allocs(ws, sort) + opts = paginator.StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + } } if err != nil { return err } - var allocs []*structs.AllocListStub - for raw := iter.Next(); raw != nil; raw = iter.Next() { - alloc := raw.(*structs.Allocation) - if allowedNSes != nil && !allowedNSes[alloc.Namespace] { - continue - } - allocs = append(allocs, alloc.Stub(args.Fields)) + tokenizer := paginator.NewStructsTokenizer(iter, opts) + filters := []paginator.Filter{ + paginator.NamespaceFilter{ + AllowableNamespaces: allowableNamespaces, + }, + } + + var stubs []*structs.AllocListStub + paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions, + func(raw interface{}) error { + allocation := raw.(*structs.Allocation) + stubs = append(stubs, allocation.Stub(args.Fields)) + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to create result paginator: %v", err) + } + + nextToken, err := paginator.Page() + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to read result page: %v", err) } - reply.Allocations = allocs + + reply.QueryMeta.NextToken = nextToken + reply.Allocations = stubs } - // Use the last index that affected the jobs table + // Use the last index that affected the allocs table index, err := state.Index("allocs") if err != nil { return err diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index d34abfbbf2dc..9efdc06bb5af 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -74,6 +74,330 @@ func TestAllocEndpoint_List(t *testing.T) { require.Equal(t, uint64(1000), resp2.Index) require.Len(t, resp2.Allocations, 1) require.Equal(t, alloc.ID, resp2.Allocations[0].ID) + + // Lookup allocations with a filter + get = &structs.AllocListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.DefaultNamespace, + Filter: "TaskGroup == web", + }, + } + + var resp3 structs.AllocListResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp3)) + require.Equal(t, uint64(1000), resp3.Index) + require.Len(t, resp3.Allocations, 1) + require.Equal(t, alloc.ID, resp3.Allocations[0].ID) +} + +func TestAllocEndpoint_List_PaginationFiltering(t *testing.T) { + t.Parallel() + s1, _, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // create a set of allocs and field values to filter on. these are in the order + // that the state store will return them from the iterator (sorted by create + // index), for ease of writing tests. + mocks := []struct { + ids []string + namespace string + group string + }{ + {ids: []string{"aaaa1111-3350-4b4b-d185-0e1992ed43e9"}}, // 0 + {ids: []string{"aaaaaa22-3350-4b4b-d185-0e1992ed43e9"}}, // 1 + {ids: []string{"aaaaaa33-3350-4b4b-d185-0e1992ed43e9"}, namespace: "non-default"}, // 2 + {ids: []string{"aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"}, group: "bar"}, // 3 + {ids: []string{"aaaaaabb-3350-4b4b-d185-0e1992ed43e9"}, group: "goo"}, // 4 + {ids: []string{"aaaaaacc-3350-4b4b-d185-0e1992ed43e9"}}, // 5 + {ids: []string{"aaaaaadd-3350-4b4b-d185-0e1992ed43e9"}, group: "bar"}, // 6 + {ids: []string{"aaaaaaee-3350-4b4b-d185-0e1992ed43e9"}, group: "goo"}, // 7 + {ids: []string{"aaaaaaff-3350-4b4b-d185-0e1992ed43e9"}, group: "bar"}, // 8 + {ids: []string{"00000111-3350-4b4b-d185-0e1992ed43e9"}}, // 9 + {ids: []string{ // 10 + "00000222-3350-4b4b-d185-0e1992ed43e9", + "00000333-3350-4b4b-d185-0e1992ed43e9", + }}, + {}, // 11, index missing + {ids: []string{"bbbb1111-3350-4b4b-d185-0e1992ed43e9"}}, // 12 + } + + state := s1.fsm.State() + + var allocs []*structs.Allocation + for i, m := range mocks { + allocsInTx := []*structs.Allocation{} + for _, id := range m.ids { + alloc := mock.Alloc() + alloc.ID = id + if m.namespace != "" { + alloc.Namespace = m.namespace + } + if m.group != "" { + alloc.TaskGroup = m.group + } + allocs = append(allocs, alloc) + allocsInTx = append(allocsInTx, alloc) + } + // other fields + index := 1000 + uint64(i) + require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index, allocsInTx)) + } + + require.NoError(t, state.UpsertNamespaces(1099, []*structs.Namespace{ + {Name: "non-default"}, + })) + + aclToken := mock.CreatePolicyAndToken(t, + state, 1100, "test-valid-read", + mock.NamespacePolicy("*", "read", nil), + ).SecretID + + cases := []struct { + name string + namespace string + prefix string + nextToken string + pageSize int32 + filter string + expIDs []string + expNextToken string + expErr string + }{ + { + name: "test01 size-2 page-1 ns-default", + pageSize: 2, + expIDs: []string{ // first two items + "aaaa1111-3350-4b4b-d185-0e1992ed43e9", + "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", + }, + expNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default ns + }, + { + name: "test02 size-2 page-1 ns-default with-prefix", + prefix: "aaaa", + pageSize: 2, + expIDs: []string{ + "aaaa1111-3350-4b4b-d185-0e1992ed43e9", + "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", + }, + expNextToken: "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + }, + { + name: "test03 size-2 page-2 ns-default", + pageSize: 2, + nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + expNextToken: "1005.aaaaaacc-3350-4b4b-d185-0e1992ed43e9", + expIDs: []string{ + "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test04 size-2 page-2 ns-default with prefix", + prefix: "aaaa", + pageSize: 2, + nextToken: "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", + expNextToken: "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", + expIDs: []string{ + "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", + "aaaaaacc-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test05 go-bexpr filter", + filter: `TaskGroup == "goo"`, + nextToken: "", + expIDs: []string{ + "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", + "aaaaaaee-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test06 go-bexpr filter with pagination", + filter: `TaskGroup == "bar"`, + pageSize: 2, + expNextToken: "1008.aaaaaaff-3350-4b4b-d185-0e1992ed43e9", + expIDs: []string{ + "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test07 go-bexpr filter namespace", + namespace: "non-default", + filter: `ID contains "aaa"`, + expIDs: []string{ + "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test08 go-bexpr wrong namespace", + namespace: "default", + filter: `Namespace == "non-default"`, + expIDs: []string(nil), + }, + { + name: "test09 go-bexpr invalid expression", + filter: `NotValid`, + expErr: "failed to read filter expression", + }, + { + name: "test10 go-bexpr invalid field", + filter: `InvalidField == "value"`, + expErr: "error finding value in datum", + }, + { + name: "test11 non-lexicographic order", + pageSize: 1, + nextToken: "1009.00000111-3350-4b4b-d185-0e1992ed43e9", + expNextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9", + expIDs: []string{ + "00000111-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test12 same index", + pageSize: 1, + nextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9", + expNextToken: "1010.00000333-3350-4b4b-d185-0e1992ed43e9", + expIDs: []string{ + "00000222-3350-4b4b-d185-0e1992ed43e9", + }, + }, + { + name: "test13 missing index", + pageSize: 1, + nextToken: "1011.e9522802-0cd8-4b1d-9c9e-ab3d97938371", + expIDs: []string{ + "bbbb1111-3350-4b4b-d185-0e1992ed43e9", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var req = &structs.AllocListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: tc.namespace, + Prefix: tc.prefix, + PerPage: tc.pageSize, + NextToken: tc.nextToken, + Filter: tc.filter, + }, + Fields: &structs.AllocStubFields{ + Resources: false, + TaskStates: false, + }, + } + req.AuthToken = aclToken + var resp structs.AllocListResponse + err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp) + if tc.expErr == "" { + require.NoError(t, err) + } else { + require.Contains(t, err, tc.expErr) + } + + var gotIDs []string + for _, alloc := range resp.Allocations { + gotIDs = append(gotIDs, alloc.ID) + } + require.Equal(t, tc.expIDs, gotIDs) + require.Equal(t, tc.expNextToken, resp.QueryMeta.NextToken) + }) + } +} + +func TestAllocEndpoint_List_order(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create register requests + uuid1 := uuid.Generate() + alloc1 := mock.Alloc() + alloc1.ID = uuid1 + + uuid2 := uuid.Generate() + alloc2 := mock.Alloc() + alloc2.ID = uuid2 + + uuid3 := uuid.Generate() + alloc3 := mock.Alloc() + alloc3.ID = uuid3 + + err := s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1}) + require.NoError(t, err) + + err = s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc2}) + require.NoError(t, err) + + err = s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc3}) + require.NoError(t, err) + + // update alloc2 again so we can later assert create index order did not change + err = s1.fsm.State().UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc2}) + require.NoError(t, err) + + t.Run("default", func(t *testing.T) { + // Lookup the allocations in the default order (oldest first) + get := &structs.AllocListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + }, + } + + var resp structs.AllocListResponse + err = msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Allocations, 3) + + // Assert returned order is by CreateIndex (ascending) + require.Equal(t, uint64(1000), resp.Allocations[0].CreateIndex) + require.Equal(t, uuid1, resp.Allocations[0].ID) + + require.Equal(t, uint64(1001), resp.Allocations[1].CreateIndex) + require.Equal(t, uuid2, resp.Allocations[1].ID) + + require.Equal(t, uint64(1002), resp.Allocations[2].CreateIndex) + require.Equal(t, uuid3, resp.Allocations[2].ID) + }) + + t.Run("reverse", func(t *testing.T) { + // Lookup the allocations in reverse order (newest first) + get := &structs.AllocListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: "*", + Reverse: true, + }, + } + + var resp structs.AllocListResponse + err = msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp) + require.NoError(t, err) + require.Equal(t, uint64(1003), resp.Index) + require.Len(t, resp.Allocations, 3) + + // Assert returned order is by CreateIndex (descending) + require.Equal(t, uint64(1002), resp.Allocations[0].CreateIndex) + require.Equal(t, uuid3, resp.Allocations[0].ID) + + require.Equal(t, uint64(1001), resp.Allocations[1].CreateIndex) + require.Equal(t, uuid2, resp.Allocations[1].ID) + + require.Equal(t, uint64(1000), resp.Allocations[2].CreateIndex) + require.Equal(t, uuid1, resp.Allocations[2].ID) + }) } func TestAllocEndpoint_List_Fields(t *testing.T) { @@ -327,18 +651,23 @@ func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) { require.NoError(t, state.UpsertNamespaces(900, []*structs.Namespace{ns1, ns2})) // Create the allocations + uuid1 := uuid.Generate() alloc1 := mock.Alloc() - alloc1.ID = "a" + alloc1.ID[1:] + alloc1.ID = uuid1 alloc1.Namespace = ns1.Name + + uuid2 := uuid.Generate() alloc2 := mock.Alloc() - alloc2.ID = "b" + alloc2.ID[1:] + alloc2.ID = uuid2 alloc2.Namespace = ns2.Name + summary1 := mock.JobSummary(alloc1.JobID) summary2 := mock.JobSummary(alloc2.JobID) - require.NoError(t, state.UpsertJobSummary(999, summary1)) - require.NoError(t, state.UpsertJobSummary(999, summary2)) - require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) + require.NoError(t, state.UpsertJobSummary(1000, summary1)) + require.NoError(t, state.UpsertJobSummary(1001, summary2)) + require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc1})) + require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc2})) t.Run("looking up all allocations", func(t *testing.T) { get := &structs.AllocListRequest{ @@ -349,7 +678,7 @@ func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) { } var resp structs.AllocListResponse require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)) - require.Equal(t, uint64(1000), resp.Index) + require.Equal(t, uint64(1003), resp.Index) require.Len(t, resp.Allocations, 2) require.ElementsMatch(t, []string{resp.Allocations[0].ID, resp.Allocations[1].ID}, @@ -367,26 +696,23 @@ func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) { } var resp structs.AllocListResponse require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)) - require.Equal(t, uint64(1000), resp.Index) + require.Equal(t, uint64(1003), resp.Index) require.Len(t, resp.Allocations, 1) require.Equal(t, alloc1.ID, resp.Allocations[0].ID) require.Equal(t, alloc1.Namespace, resp.Allocations[0].Namespace) }) t.Run("looking up allocations with mismatch prefix", func(t *testing.T) { - // allocations were constructed above to have prefix starting with "a" or "b" - badPrefix := "cc" - get := &structs.AllocListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", Namespace: "*", - Prefix: badPrefix, + Prefix: "000000", // unlikely to match }, } var resp structs.AllocListResponse require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)) - require.Equal(t, uint64(1000), resp.Index) + require.Equal(t, uint64(1003), resp.Index) require.Empty(t, resp.Allocations) }) } diff --git a/nomad/core_sched.go b/nomad/core_sched.go index e4dbaf82ab69..1819847c7d40 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error { // Iterate over the deployments ws := memdb.NewWatchSet() - iter, err := c.snap.Deployments(ws, false) + iter, err := c.snap.Deployments(ws, state.SortDefault) if err != nil { return err } diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 139ed4b72dfb..e46351bbfa08 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -3,6 +3,7 @@ package nomad import ( "errors" "fmt" + "net/http" "time" metrics "github.com/armon/go-metrics" @@ -12,6 +13,7 @@ import ( "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" ) @@ -136,40 +138,65 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV } else { iter, err = snap.CSIVolumes(ws) } - if err != nil { return err } + tokenizer := paginator.NewStructsTokenizer( + iter, + paginator.StructsTokenizerOptions{ + WithNamespace: true, + WithID: true, + }, + ) + volFilter := paginator.GenericFilter{ + Allow: func(raw interface{}) (bool, error) { + vol := raw.(*structs.CSIVolume) + + // Remove (possibly again) by PluginID to handle passing both + // NodeID and PluginID + if args.PluginID != "" && args.PluginID != vol.PluginID { + return false, nil + } + + // Remove by Namespace, since CSIVolumesByNodeID hasn't used + // the Namespace yet + if ns != structs.AllNamespacesSentinel && vol.Namespace != ns { + return false, nil + } + + return true, nil + }, + } + filters := []paginator.Filter{volFilter} + // Collect results, filter by ACL access vs := []*structs.CSIVolListStub{} - for { - raw := iter.Next() - if raw == nil { - break - } - vol := raw.(*structs.CSIVolume) + paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions, + func(raw interface{}) error { + vol := raw.(*structs.CSIVolume) - // Remove (possibly again) by PluginID to handle passing both - // NodeID and PluginID - if args.PluginID != "" && args.PluginID != vol.PluginID { - continue - } - - // Remove by Namespace, since CSIVolumesByNodeID hasn't used - // the Namespace yet - if ns != structs.AllNamespacesSentinel && vol.Namespace != ns { - continue - } + vol, err := snap.CSIVolumeDenormalizePlugins(ws, vol.Copy()) + if err != nil { + return err + } - vol, err := snap.CSIVolumeDenormalizePlugins(ws, vol.Copy()) - if err != nil { - return err - } + vs = append(vs, vol.Stub()) + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to create result paginator: %v", err) + } - vs = append(vs, vol.Stub()) + nextToken, err := paginator.Page() + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to read result page: %v", err) } + + reply.QueryMeta.NextToken = nextToken reply.Volumes = vs return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta) }} diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 528020bc91b3..a7129e8f4079 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -753,6 +753,200 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) { require.Equal(t, structs.DefaultNamespace, resp2.Volumes[0].Namespace) } +func TestCSIVolumeEndpoint_List_PaginationFiltering(t *testing.T) { + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + nonDefaultNS := "non-default" + + // create a set of volumes. these are in the order that the state store + // will return them from the iterator (sorted by create index), for ease of + // writing tests + mocks := []struct { + id string + namespace string + }{ + {id: "vol-01"}, // 0 + {id: "vol-02"}, // 1 + {id: "vol-03", namespace: nonDefaultNS}, // 2 + {id: "vol-04"}, // 3 + {id: "vol-05"}, // 4 + {id: "vol-06"}, // 5 + {id: "vol-07"}, // 6 + {id: "vol-08"}, // 7 + {}, // 9, missing volume + {id: "vol-10"}, // 10 + } + + state := s1.fsm.State() + plugin := mock.CSIPlugin() + + // Create namespaces. + err := state.UpsertNamespaces(999, []*structs.Namespace{{Name: nonDefaultNS}}) + require.NoError(t, err) + + for i, m := range mocks { + if m.id == "" { + continue + } + + volume := mock.CSIVolume(plugin) + volume.ID = m.id + if m.namespace != "" { // defaults to "default" + volume.Namespace = m.namespace + } + index := 1000 + uint64(i) + require.NoError(t, state.CSIVolumeRegister(index, []*structs.CSIVolume{volume})) + } + + cases := []struct { + name string + namespace string + prefix string + filter string + nextToken string + pageSize int32 + expectedNextToken string + expectedIDs []string + expectedError string + }{ + { + name: "test01 size-2 page-1 default NS", + pageSize: 2, + expectedNextToken: "default.vol-04", + expectedIDs: []string{ + "vol-01", + "vol-02", + }, + }, + { + name: "test02 size-2 page-1 default NS with prefix", + prefix: "vol", + pageSize: 2, + expectedNextToken: "default.vol-04", + expectedIDs: []string{ + "vol-01", + "vol-02", + }, + }, + { + name: "test03 size-2 page-2 default NS", + pageSize: 2, + nextToken: "default.vol-04", + expectedNextToken: "default.vol-06", + expectedIDs: []string{ + "vol-04", + "vol-05", + }, + }, + { + name: "test04 size-2 page-2 default NS with prefix", + prefix: "vol", + pageSize: 2, + nextToken: "default.vol-04", + expectedNextToken: "default.vol-06", + expectedIDs: []string{ + "vol-04", + "vol-05", + }, + }, + { + name: "test05 no valid results with filters and prefix", + prefix: "cccc", + pageSize: 2, + nextToken: "", + expectedIDs: []string{}, + }, + { + name: "test06 go-bexpr filter", + namespace: "*", + filter: `ID matches "^vol-0[123]"`, + expectedIDs: []string{ + "vol-01", + "vol-02", + "vol-03", + }, + }, + { + name: "test07 go-bexpr filter with pagination", + namespace: "*", + filter: `ID matches "^vol-0[123]"`, + pageSize: 2, + expectedNextToken: "non-default.vol-03", + expectedIDs: []string{ + "vol-01", + "vol-02", + }, + }, + { + name: "test08 go-bexpr filter in namespace", + namespace: "non-default", + filter: `Provider == "com.hashicorp:mock"`, + expectedIDs: []string{ + "vol-03", + }, + }, + { + name: "test09 go-bexpr wrong namespace", + namespace: "default", + filter: `Namespace == "non-default"`, + expectedIDs: []string{}, + }, + { + name: "test10 go-bexpr invalid expression", + filter: `NotValid`, + expectedError: "failed to read filter expression", + }, + { + name: "test11 go-bexpr invalid field", + filter: `InvalidField == "value"`, + expectedError: "error finding value in datum", + }, + { + name: "test14 missing volume", + pageSize: 1, + nextToken: "default.vol-09", + expectedIDs: []string{ + "vol-10", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + req := &structs.CSIVolumeListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: tc.namespace, + Prefix: tc.prefix, + Filter: tc.filter, + PerPage: tc.pageSize, + NextToken: tc.nextToken, + }, + } + var resp structs.CSIVolumeListResponse + err := msgpackrpc.CallWithCodec(codec, "CSIVolume.List", req, &resp) + if tc.expectedError == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedError) + return + } + + gotIDs := []string{} + for _, deployment := range resp.Volumes { + gotIDs = append(gotIDs, deployment.ID) + } + require.Equal(t, tc.expectedIDs, gotIDs, "unexpected page of volumes") + require.Equal(t, tc.expectedNextToken, resp.QueryMeta.NextToken, "unexpected NextToken") + }) + } +} + func TestCSIVolumeEndpoint_Create(t *testing.T) { t.Parallel() var err error diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 70f685d4a280..d0217f424efb 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -10,33 +10,10 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" ) -// DeploymentPaginationIterator is a wrapper over a go-memdb iterator that -// implements the paginator Iterator interface. -type DeploymentPaginationIterator struct { - iter memdb.ResultIterator - byCreateIndex bool -} - -func (it DeploymentPaginationIterator) Next() (string, interface{}) { - raw := it.iter.Next() - if raw == nil { - return "", nil - } - - d := raw.(*structs.Deployment) - token := d.ID - - // prefix the pagination token by CreateIndex to keep it properly sorted. - if it.byCreateIndex { - token = fmt.Sprintf("%v-%v", d.CreateIndex, d.ID) - } - - return token, d -} - // Deployment endpoint is used for manipulating deployments type Deployment struct { srv *Server @@ -426,6 +403,7 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De } // Setup the blocking query + sort := state.SortOption(args.Reverse) opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, @@ -433,26 +411,34 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De // Capture all the deployments var err error var iter memdb.ResultIterator - var deploymentIter DeploymentPaginationIterator + var opts paginator.StructsTokenizerOptions if prefix := args.QueryOptions.Prefix; prefix != "" { iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix) - deploymentIter.byCreateIndex = false + opts = paginator.StructsTokenizerOptions{ + WithID: true, + } } else if namespace != structs.AllNamespacesSentinel { - iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.Ascending) - deploymentIter.byCreateIndex = true + iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, sort) + opts = paginator.StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + } } else { - iter, err = store.Deployments(ws, args.Ascending) - deploymentIter.byCreateIndex = true + iter, err = store.Deployments(ws, sort) + opts = paginator.StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + } } if err != nil { return err } - deploymentIter.iter = iter + tokenizer := paginator.NewStructsTokenizer(iter, opts) var deploys []*structs.Deployment - paginator, err := state.NewPaginator(deploymentIter, args.QueryOptions, + paginator, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions, func(raw interface{}) error { deploy := raw.(*structs.Deployment) deploys = append(deploys, deploy) diff --git a/nomad/deployment_endpoint_test.go b/nomad/deployment_endpoint_test.go index e91bc28a8a77..08982aeda75e 100644 --- a/nomad/deployment_endpoint_test.go +++ b/nomad/deployment_endpoint_test.go @@ -1066,13 +1066,12 @@ func TestDeploymentEndpoint_List_order(t *testing.T) { err = s1.fsm.State().UpsertDeployment(1003, dep2) require.NoError(t, err) - t.Run("ascending", func(t *testing.T) { + t.Run("default", func(t *testing.T) { // Lookup the deployments in chronological order (oldest first) get := &structs.DeploymentListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", Namespace: "*", - Ascending: true, }, } @@ -1093,13 +1092,13 @@ func TestDeploymentEndpoint_List_order(t *testing.T) { require.Equal(t, uuid3, resp.Deployments[2].ID) }) - t.Run("descending", func(t *testing.T) { + t.Run("reverse", func(t *testing.T) { // Lookup the deployments in reverse chronological order (newest first) get := &structs.DeploymentListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", Namespace: "*", - Ascending: false, + Reverse: true, }, } @@ -1312,7 +1311,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { { name: "test01 size-2 page-1 default NS", pageSize: 2, - expectedNextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaa1111-3350-4b4b-d185-0e1992ed43e9", "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", @@ -1331,8 +1330,8 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { { name: "test03 size-2 page-2 default NS", pageSize: 2, - nextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", - expectedNextToken: "1005-aaaaaacc-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1005.aaaaaacc-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", @@ -1353,8 +1352,8 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { name: "test05 size-2 page-2 all namespaces", namespace: "*", pageSize: 2, - nextToken: "1002-aaaaaa33-3350-4b4b-d185-0e1992ed43e9", - expectedNextToken: "1004-aaaaaabb-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1004.aaaaaabb-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", @@ -1382,7 +1381,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { namespace: "*", filter: `ID matches "^a+[123]"`, pageSize: 2, - expectedNextToken: "1002-aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaa1111-3350-4b4b-d185-0e1992ed43e9", "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", @@ -1415,8 +1414,8 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { { name: "test13 non-lexicographic order", pageSize: 1, - nextToken: "1007-00000111-3350-4b4b-d185-0e1992ed43e9", - expectedNextToken: "1009-bbbb1111-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1007.00000111-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1009.bbbb1111-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "00000111-3350-4b4b-d185-0e1992ed43e9", }, @@ -1424,7 +1423,7 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { { name: "test14 missing index", pageSize: 1, - nextToken: "1008-e9522802-0cd8-4b1d-9c9e-ab3d97938371", + nextToken: "1008.e9522802-0cd8-4b1d-9c9e-ab3d97938371", expectedIDs: []string{ "bbbb1111-3350-4b4b-d185-0e1992ed43e9", }, @@ -1441,7 +1440,6 @@ func TestDeploymentEndpoint_List_Pagination(t *testing.T) { Filter: tc.filter, PerPage: tc.pageSize, NextToken: tc.nextToken, - Ascending: true, // counting up is easier to think about }, } req.AuthToken = aclToken diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index 8743f2b1274a..56617430fde2 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -202,9 +202,9 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D } // getDeploysImpl retrieves all deployments from the passed state store. -func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { +func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) { - iter, err := state.Deployments(ws, false) + iter, err := store.Deployments(ws, state.SortDefault) if err != nil { return nil, 0, err } @@ -220,7 +220,7 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (in } // Use the last index that affected the deployment table - index, err := state.Index("deployment") + index, err := store.Index("deployment") if err != nil { return nil, 0, err } diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 739e2e14df93..cb8aca2ea9be 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -931,9 +931,9 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { } // Wait for the two allocations to be placed - state := s1.State() + store := s1.State() testutil.WaitForResult(func() (bool, error) { - iter, err := state.Allocs(nil) + iter, err := store.Allocs(nil, state.SortDefault) if err != nil { return false, err } @@ -974,11 +974,11 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocPromoter(errCh, ctx, store, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, store, codec, n2.ID, s1.logger) testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) + allocs, err := store.AllocsByNode(nil, n2.ID) if err != nil { return false, err } @@ -992,7 +992,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { if err := checkAllocPromoter(errCh); err != nil { return false, err } - node, err := state.NodeByID(nil, n1.ID) + node, err := store.NodeByID(nil, n1.ID) if err != nil { return false, err } @@ -1002,7 +1002,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { }) // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) + node, err := store.NodeByID(nil, n1.ID) require.NoError(err) // sometimes test gets a duplicate node drain complete event require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 0b6b26f598aa..daec216d282f 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" ) @@ -21,30 +22,6 @@ const ( DefaultDequeueTimeout = time.Second ) -// EvalPaginationIterator is a wrapper over a go-memdb iterator that implements -// the paginator Iterator interface. -type EvalPaginationIterator struct { - iter memdb.ResultIterator - byCreateIndex bool -} - -func (it EvalPaginationIterator) Next() (string, interface{}) { - raw := it.iter.Next() - if raw == nil { - return "", nil - } - - eval := raw.(*structs.Evaluation) - token := eval.ID - - // prefix the pagination token by CreateIndex to keep it properly sorted. - if it.byCreateIndex { - token = fmt.Sprintf("%v-%v", eval.CreateIndex, eval.ID) - } - - return token, eval -} - // Eval endpoint is used for eval interactions type Eval struct { srv *Server @@ -431,6 +408,7 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon } // Setup the blocking query + sort := state.SortOption(args.Reverse) opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, @@ -438,17 +416,25 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon // Scan all the evaluations var err error var iter memdb.ResultIterator - var evalIter EvalPaginationIterator + var opts paginator.StructsTokenizerOptions if prefix := args.QueryOptions.Prefix; prefix != "" { iter, err = store.EvalsByIDPrefix(ws, namespace, prefix) - evalIter.byCreateIndex = false + opts = paginator.StructsTokenizerOptions{ + WithID: true, + } } else if namespace != structs.AllNamespacesSentinel { - iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.Ascending) - evalIter.byCreateIndex = true + iter, err = store.EvalsByNamespaceOrdered(ws, namespace, sort) + opts = paginator.StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + } } else { - iter, err = store.Evals(ws, args.Ascending) - evalIter.byCreateIndex = true + iter, err = store.Evals(ws, sort) + opts = paginator.StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + } } if err != nil { return err @@ -460,10 +446,11 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon } return false }) - evalIter.iter = iter + + tokenizer := paginator.NewStructsTokenizer(iter, opts) var evals []*structs.Evaluation - paginator, err := state.NewPaginator(evalIter, args.QueryOptions, + paginator, err := paginator.NewPaginator(iter, tokenizer, nil, args.QueryOptions, func(raw interface{}) error { eval := raw.(*structs.Evaluation) evals = append(evals, eval) diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 92463394e7ee..6a2bf4575dd8 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -751,40 +751,12 @@ func TestEvalEndpoint_List_order(t *testing.T) { err = s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2}) require.NoError(t, err) - t.Run("descending", func(t *testing.T) { - // Lookup the evaluations in reverse chronological order + t.Run("default", func(t *testing.T) { + // Lookup the evaluations in the default order (oldest first) get := &structs.EvalListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", Namespace: "*", - Ascending: false, - }, - } - - var resp structs.EvalListResponse - err = msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp) - require.NoError(t, err) - require.Equal(t, uint64(1003), resp.Index) - require.Len(t, resp.Evaluations, 3) - - // Assert returned order is by CreateIndex (descending) - require.Equal(t, uint64(1002), resp.Evaluations[0].CreateIndex) - require.Equal(t, uuid3, resp.Evaluations[0].ID) - - require.Equal(t, uint64(1001), resp.Evaluations[1].CreateIndex) - require.Equal(t, uuid2, resp.Evaluations[1].ID) - - require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex) - require.Equal(t, uuid1, resp.Evaluations[2].ID) - }) - - t.Run("ascending", func(t *testing.T) { - // Lookup the evaluations in reverse chronological order (newest first) - get := &structs.EvalListRequest{ - QueryOptions: structs.QueryOptions{ - Region: "global", - Namespace: "*", - Ascending: true, }, } @@ -805,13 +777,13 @@ func TestEvalEndpoint_List_order(t *testing.T) { require.Equal(t, uuid3, resp.Evaluations[2].ID) }) - t.Run("descending", func(t *testing.T) { - // Lookup the evaluations in chronological order (oldest first) + t.Run("reverse", func(t *testing.T) { + // Lookup the evaluations in reverse order (newest first) get := &structs.EvalListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", Namespace: "*", - Ascending: false, + Reverse: true, }, } @@ -831,7 +803,6 @@ func TestEvalEndpoint_List_order(t *testing.T) { require.Equal(t, uint64(1000), resp.Evaluations[2].CreateIndex) require.Equal(t, uuid1, resp.Evaluations[2].ID) }) - } func TestEvalEndpoint_ListAllNamespaces(t *testing.T) { @@ -1084,7 +1055,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { "aaaa1111-3350-4b4b-d185-0e1992ed43e9", "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", }, - expectedNextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace + expectedNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", // next one in default namespace }, { name: "test02 size-2 page-1 default NS with prefix", @@ -1099,8 +1070,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { { name: "test03 size-2 page-2 default NS", pageSize: 2, - nextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", - expectedNextToken: "1005-aaaaaacc-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1005.aaaaaacc-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", "aaaaaabb-3350-4b4b-d185-0e1992ed43e9", @@ -1123,7 +1094,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { filterJobID: "example", filterStatus: "pending", // aaaaaaaa, bb, and cc are filtered by status - expectedNextToken: "1006-aaaaaadd-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1006.aaaaaadd-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaa1111-3350-4b4b-d185-0e1992ed43e9", "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", @@ -1159,7 +1130,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { pageSize: 3, // reads off the end filterJobID: "example", filterStatus: "pending", - nextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", expectedNextToken: "", expectedIDs: []string{ "aaaaaadd-3350-4b4b-d185-0e1992ed43e9", @@ -1183,8 +1154,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { name: "test10 size-2 page-2 all namespaces", namespace: "*", pageSize: 2, - nextToken: "1002-aaaaaa33-3350-4b4b-d185-0e1992ed43e9", - expectedNextToken: "1004-aaaaaabb-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1002.aaaaaa33-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1004.aaaaaabb-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaaaa33-3350-4b4b-d185-0e1992ed43e9", "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", @@ -1228,7 +1199,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { name: "test16 go-bexpr filter with pagination", filter: `JobID == "example"`, pageSize: 2, - expectedNextToken: "1003-aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1003.aaaaaaaa-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "aaaa1111-3350-4b4b-d185-0e1992ed43e9", "aaaaaa22-3350-4b4b-d185-0e1992ed43e9", @@ -1267,8 +1238,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { { name: "test22 non-lexicographic order", pageSize: 1, - nextToken: "1009-00000111-3350-4b4b-d185-0e1992ed43e9", - expectedNextToken: "1010-00000222-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1009.00000111-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "00000111-3350-4b4b-d185-0e1992ed43e9", }, @@ -1276,8 +1247,8 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { { name: "test23 same index", pageSize: 1, - nextToken: "1010-00000222-3350-4b4b-d185-0e1992ed43e9", - expectedNextToken: "1010-00000333-3350-4b4b-d185-0e1992ed43e9", + nextToken: "1010.00000222-3350-4b4b-d185-0e1992ed43e9", + expectedNextToken: "1010.00000333-3350-4b4b-d185-0e1992ed43e9", expectedIDs: []string{ "00000222-3350-4b4b-d185-0e1992ed43e9", }, @@ -1285,7 +1256,7 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { { name: "test24 missing index", pageSize: 1, - nextToken: "1011-e9522802-0cd8-4b1d-9c9e-ab3d97938371", + nextToken: "1011.e9522802-0cd8-4b1d-9c9e-ab3d97938371", expectedIDs: []string{ "bbbb1111-3350-4b4b-d185-0e1992ed43e9", }, @@ -1304,7 +1275,6 @@ func TestEvalEndpoint_List_PaginationFiltering(t *testing.T) { PerPage: tc.pageSize, NextToken: tc.nextToken, Filter: tc.filter, - Ascending: true, // counting up is easier to think about }, } req.AuthToken = aclToken diff --git a/nomad/fsm.go b/nomad/fsm.go index c288fc09b4c2..a25e46885ab1 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1704,16 +1704,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // failLeakedDeployments is used to fail deployments that do not have a job. // This state is a broken invariant that should not occur since 0.8.X. -func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error { +func (n *nomadFSM) failLeakedDeployments(store *state.StateStore) error { // Scan for deployments that are referencing a job that no longer exists. // This could happen if multiple deployments were created for a given job // and thus the older deployment leaks and then the job is removed. - iter, err := state.Deployments(nil, false) + iter, err := store.Deployments(nil, state.SortDefault) if err != nil { return fmt.Errorf("failed to query deployments: %v", err) } - dindex, err := state.Index("deployment") + dindex, err := store.Index("deployment") if err != nil { return fmt.Errorf("couldn't fetch index of deployments table: %v", err) } @@ -1733,7 +1733,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error { } // Find the job - job, err := state.JobByID(nil, d.Namespace, d.JobID) + job, err := store.JobByID(nil, d.Namespace, d.JobID) if err != nil { return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err) } @@ -1747,7 +1747,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error { failed := d.Copy() failed.Status = structs.DeploymentStatusCancelled failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob - if err := state.UpsertDeployment(dindex, failed); err != nil { + if err := store.UpsertDeployment(dindex, failed); err != nil { return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err) } } @@ -2099,7 +2099,7 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the allocations ws := memdb.NewWatchSet() - allocs, err := s.snap.Allocs(ws) + allocs, err := s.snap.Allocs(ws, state.SortDefault) if err != nil { return err } @@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the jobs ws := memdb.NewWatchSet() - deployments, err := s.snap.Deployments(ws, false) + deployments, err := s.snap.Deployments(ws, state.SortDefault) if err != nil { return err } @@ -2306,7 +2306,7 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the policies ws := memdb.NewWatchSet() - tokens, err := s.snap.ACLTokens(ws) + tokens, err := s.snap.ACLTokens(ws, state.SortDefault) if err != nil { return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 418cbe1afa3f..d6ab290584e4 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -3,6 +3,7 @@ package nomad import ( "context" "fmt" + "net/http" "sort" "strings" "time" @@ -20,6 +21,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" ) @@ -1339,15 +1341,29 @@ func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) } defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now()) - if args.RequestNamespace() == structs.AllNamespacesSentinel { - return j.listAllNamespaces(args, reply) - } + namespace := args.RequestNamespace() + var allow func(string) bool // Check for list-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + aclObj, err := j.srv.ResolveToken(args.AuthToken) + + switch { + case err != nil: return err - } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) { + case aclObj == nil: + allow = func(string) bool { + return true + } + case namespace == structs.AllNamespacesSentinel: + allow = func(ns string) bool { + return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) + } + case !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityListJobs): return structs.ErrPermissionDenied + default: + allow = func(string) bool { + return true + } } // Setup the blocking query @@ -1358,106 +1374,65 @@ func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) // Capture all the jobs var err error var iter memdb.ResultIterator - if prefix := args.QueryOptions.Prefix; prefix != "" { - iter, err = state.JobsByIDPrefix(ws, args.RequestNamespace(), prefix) - } else { - iter, err = state.JobsByNamespace(ws, args.RequestNamespace()) - } - if err != nil { - return err - } - - var jobs []*structs.JobListStub - for { - raw := iter.Next() - if raw == nil { - break - } - job := raw.(*structs.Job) - summary, err := state.JobSummaryByID(ws, args.RequestNamespace(), job.ID) - if err != nil { - return fmt.Errorf("unable to look up summary for job: %v", job.ID) - } - jobs = append(jobs, job.Stub(summary)) - } - reply.Jobs = jobs - - // Use the last index that affected the jobs table or summary - jindex, err := state.Index("jobs") - if err != nil { - return err - } - sindex, err := state.Index("job_summary") - if err != nil { - return err - } - reply.Index = helper.Uint64Max(jindex, sindex) - - // Set the query response - j.srv.setQueryMeta(&reply.QueryMeta) - return nil - }} - return j.srv.blockingRPC(&opts) -} -// listAllNamespaces lists all jobs across all namespaces -func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.JobListResponse) error { - // Check for list-job permissions - aclObj, err := j.srv.ResolveToken(args.AuthToken) - if err != nil { - return err - } - prefix := args.QueryOptions.Prefix - allow := func(ns string) bool { - return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) - } - - // Setup the blocking query - opts := blockingOptions{ - queryOpts: &args.QueryOptions, - queryMeta: &reply.QueryMeta, - run: func(ws memdb.WatchSet, state *state.StateStore) error { // check if user has permission to all namespaces - allowedNSes, err := allowedNSes(aclObj, state, allow) + allowableNamespaces, err := allowedNSes(aclObj, state, allow) if err == structs.ErrPermissionDenied { // return empty jobs if token isn't authorized for any // namespace, matching other endpoints - reply.Jobs = []*structs.JobListStub{} - return nil + reply.Jobs = make([]*structs.JobListStub, 0) } else if err != nil { return err - } - - // Capture all the jobs - iter, err := state.Jobs(ws) - - if err != nil { - return err - } - - var jobs []*structs.JobListStub - for { - raw := iter.Next() - if raw == nil { - break + } else { + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = state.JobsByIDPrefix(ws, namespace, prefix) + } else if namespace != structs.AllNamespacesSentinel { + iter, err = state.JobsByNamespace(ws, namespace) + } else { + iter, err = state.Jobs(ws) } - job := raw.(*structs.Job) - if allowedNSes != nil && !allowedNSes[job.Namespace] { - // not permitted to this name namespace - continue + if err != nil { + return err } - if prefix != "" && !strings.HasPrefix(job.ID, prefix) { - continue + + tokenizer := paginator.NewStructsTokenizer( + iter, + paginator.StructsTokenizerOptions{ + WithNamespace: true, + WithID: true, + }, + ) + filters := []paginator.Filter{ + paginator.NamespaceFilter{ + AllowableNamespaces: allowableNamespaces, + }, + } + + var jobs []*structs.JobListStub + paginator, err := paginator.NewPaginator(iter, tokenizer, filters, args.QueryOptions, + func(raw interface{}) error { + job := raw.(*structs.Job) + summary, err := state.JobSummaryByID(ws, namespace, job.ID) + if err != nil { + return fmt.Errorf("unable to look up summary for job: %v", job.ID) + } + jobs = append(jobs, job.Stub(summary)) + return nil + }) + if err != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to create result paginator: %v", err) } - summary, err := state.JobSummaryByID(ws, job.Namespace, job.ID) + + nextToken, err := paginator.Page() if err != nil { - return fmt.Errorf("unable to look up summary for job: %v", job.ID) + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to read result page: %v", err) } - stub := job.Stub(summary) - jobs = append(jobs, stub) + reply.QueryMeta.NextToken = nextToken + reply.Jobs = jobs } - reply.Jobs = jobs // Use the last index that affected the jobs table or summary jindex, err := state.Index("jobs") @@ -1475,7 +1450,6 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job return nil }} return j.srv.blockingRPC(&opts) - } // Allocations is used to list the allocations for a job diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 3850290fa4f7..8cb93639974e 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5151,6 +5151,184 @@ func TestJobEndpoint_ListJobs_Blocking(t *testing.T) { } } +func TestJobEndpoint_ListJobs_PaginationFiltering(t *testing.T) { + t.Parallel() + s1, _, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // create a set of jobs. these are in the order that the state store will + // return them from the iterator (sorted by key) for ease of writing tests + mocks := []struct { + name string + namespace string + status string + }{ + {name: "job-01"}, // 0 + {name: "job-02"}, // 1 + {name: "job-03", namespace: "non-default"}, // 2 + {name: "job-04"}, // 3 + {name: "job-05", status: structs.JobStatusRunning}, // 4 + {name: "job-06", status: structs.JobStatusRunning}, // 5 + {}, // 6, missing job + {name: "job-08"}, // 7 + {name: "job-03", namespace: "other"}, // 8, same name but in another namespace + } + + state := s1.fsm.State() + require.NoError(t, state.UpsertNamespaces(999, []*structs.Namespace{{Name: "non-default"}, {Name: "other"}})) + + for i, m := range mocks { + if m.name == "" { + continue + } + + index := 1000 + uint64(i) + job := mock.Job() + job.ID = m.name + job.Name = m.name + job.Status = m.status + if m.namespace != "" { // defaults to "default" + job.Namespace = m.namespace + } + job.CreateIndex = index + require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, index, job)) + } + + aclToken := mock.CreatePolicyAndToken(t, state, 1100, "test-valid-read", + mock.NamespacePolicy("*", "read", nil)). + SecretID + + cases := []struct { + name string + namespace string + prefix string + filter string + nextToken string + pageSize int32 + expectedNextToken string + expectedIDs []string + expectedError string + }{ + { + name: "test01 size-2 page-1 default NS", + pageSize: 2, + expectedNextToken: "default.job-04", + expectedIDs: []string{"job-01", "job-02"}, + }, + { + name: "test02 size-2 page-1 default NS with prefix", + prefix: "job", + pageSize: 2, + expectedNextToken: "default.job-04", + expectedIDs: []string{"job-01", "job-02"}, + }, + { + name: "test03 size-2 page-2 default NS", + pageSize: 2, + nextToken: "default.job-04", + expectedNextToken: "default.job-06", + expectedIDs: []string{"job-04", "job-05"}, + }, + { + name: "test04 size-2 page-2 default NS with prefix", + prefix: "job", + pageSize: 2, + nextToken: "default.job-04", + expectedNextToken: "default.job-06", + expectedIDs: []string{"job-04", "job-05"}, + }, + { + name: "test05 no valid results with filters and prefix", + prefix: "not-job", + pageSize: 2, + nextToken: "", + expectedIDs: []string{}, + }, + { + name: "test06 go-bexpr filter", + namespace: "*", + filter: `Name matches "job-0[123]"`, + expectedIDs: []string{"job-01", "job-02", "job-03", "job-03"}, + }, + { + name: "test07 go-bexpr filter with pagination", + namespace: "*", + filter: `Name matches "job-0[123]"`, + pageSize: 2, + expectedNextToken: "non-default.job-03", + expectedIDs: []string{"job-01", "job-02"}, + }, + { + name: "test08 go-bexpr filter in namespace", + namespace: "non-default", + filter: `Status == "pending"`, + expectedIDs: []string{"job-03"}, + }, + { + name: "test09 go-bexpr invalid expression", + filter: `NotValid`, + expectedError: "failed to read filter expression", + }, + { + name: "test10 go-bexpr invalid field", + filter: `InvalidField == "value"`, + expectedError: "error finding value in datum", + }, + { + name: "test11 missing index", + pageSize: 1, + nextToken: "default.job-07", + expectedIDs: []string{ + "job-08", + }, + }, + { + name: "test12 same name but different NS", + namespace: "*", + pageSize: 1, + filter: `Name == "job-03"`, + expectedNextToken: "other.job-03", + expectedIDs: []string{ + "job-03", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + req := &structs.JobListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: tc.namespace, + Prefix: tc.prefix, + Filter: tc.filter, + PerPage: tc.pageSize, + NextToken: tc.nextToken, + }, + } + req.AuthToken = aclToken + var resp structs.JobListResponse + err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp) + if tc.expectedError == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedError) + return + } + + gotIDs := []string{} + for _, job := range resp.Jobs { + gotIDs = append(gotIDs, job.ID) + } + require.Equal(t, tc.expectedIDs, gotIDs, "unexpected page of jobs") + require.Equal(t, tc.expectedNextToken, resp.QueryMeta.NextToken, "unexpected NextToken") + }) + } +} + func TestJobEndpoint_Allocations(t *testing.T) { t.Parallel() diff --git a/nomad/search_endpoint.go b/nomad/search_endpoint.go index 8c4bd25eaacc..73cfb47e2edb 100644 --- a/nomad/search_endpoint.go +++ b/nomad/search_endpoint.go @@ -394,42 +394,42 @@ func wildcard(namespace string) bool { return namespace == structs.AllNamespacesSentinel } -func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, state *state.StateStore) (memdb.ResultIterator, error) { +func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, store *state.StateStore) (memdb.ResultIterator, error) { switch context { case structs.Jobs: if wildcard(namespace) { - iter, err := state.Jobs(ws) + iter, err := store.Jobs(ws) return nsCapIterFilter(iter, err, aclObj) } - return state.JobsByNamespace(ws, namespace) + return store.JobsByNamespace(ws, namespace) case structs.Allocs: if wildcard(namespace) { - iter, err := state.Allocs(ws) + iter, err := store.Allocs(ws, state.SortDefault) return nsCapIterFilter(iter, err, aclObj) } - return state.AllocsByNamespace(ws, namespace) + return store.AllocsByNamespace(ws, namespace) case structs.Nodes: if wildcard(namespace) { - iter, err := state.Nodes(ws) + iter, err := store.Nodes(ws) return nsCapIterFilter(iter, err, aclObj) } - return state.Nodes(ws) + return store.Nodes(ws) case structs.Plugins: if wildcard(namespace) { - iter, err := state.CSIPlugins(ws) + iter, err := store.CSIPlugins(ws) return nsCapIterFilter(iter, err, aclObj) } - return state.CSIPlugins(ws) + return store.CSIPlugins(ws) case structs.Namespaces: - iter, err := state.Namespaces(ws) + iter, err := store.Namespaces(ws) return nsCapIterFilter(iter, err, aclObj) default: - return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, state) + return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, store) } } diff --git a/nomad/state/paginator/filter.go b/nomad/state/paginator/filter.go new file mode 100644 index 000000000000..e4ed1f250f0c --- /dev/null +++ b/nomad/state/paginator/filter.go @@ -0,0 +1,41 @@ +package paginator + +// Filter is the interface that must be implemented to skip values when using +// the Paginator. +type Filter interface { + // Evaluate returns true if the element should be added to the page. + Evaluate(interface{}) (bool, error) +} + +// GenericFilter wraps a function that can be used to provide simple or in +// scope filtering. +type GenericFilter struct { + Allow func(interface{}) (bool, error) +} + +func (f GenericFilter) Evaluate(raw interface{}) (bool, error) { + return f.Allow(raw) +} + +// NamespaceFilter skips elements with a namespace value that is not in the +// allowable set. +type NamespaceFilter struct { + AllowableNamespaces map[string]bool +} + +func (f NamespaceFilter) Evaluate(raw interface{}) (bool, error) { + if raw == nil { + return false, nil + } + + item, _ := raw.(NamespaceGetter) + namespace := item.GetNamespace() + + if f.AllowableNamespaces == nil { + return true, nil + } + if f.AllowableNamespaces[namespace] { + return true, nil + } + return false, nil +} diff --git a/nomad/state/filter_test.go b/nomad/state/paginator/filter_test.go similarity index 61% rename from nomad/state/filter_test.go rename to nomad/state/paginator/filter_test.go index 2fa1b02ad3e9..d94f49a57782 100644 --- a/nomad/state/filter_test.go +++ b/nomad/state/paginator/filter_test.go @@ -1,15 +1,109 @@ -package state +package paginator import ( "testing" "time" "github.com/hashicorp/go-bexpr" - memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) +func TestGenericFilter(t *testing.T) { + t.Parallel() + ids := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"} + + filters := []Filter{GenericFilter{ + Allow: func(raw interface{}) (bool, error) { + result := raw.(*mockObject) + return result.id > "5", nil + }, + }} + iter := newTestIterator(ids) + tokenizer := testTokenizer{} + opts := structs.QueryOptions{ + PerPage: 3, + } + results := []string{} + paginator, err := NewPaginator(iter, tokenizer, filters, opts, + func(raw interface{}) error { + result := raw.(*mockObject) + results = append(results, result.id) + return nil + }, + ) + require.NoError(t, err) + + nextToken, err := paginator.Page() + require.NoError(t, err) + + expected := []string{"6", "7", "8"} + require.Equal(t, "9", nextToken) + require.Equal(t, expected, results) +} + +func TestNamespaceFilter(t *testing.T) { + t.Parallel() + + mocks := []*mockObject{ + {namespace: "default"}, + {namespace: "dev"}, + {namespace: "qa"}, + {namespace: "region-1"}, + } + + cases := []struct { + name string + allowable map[string]bool + expected []string + }{ + { + name: "nil map", + expected: []string{"default", "dev", "qa", "region-1"}, + }, + { + name: "allow default", + allowable: map[string]bool{"default": true}, + expected: []string{"default"}, + }, + { + name: "allow multiple", + allowable: map[string]bool{"default": true, "dev": false, "qa": true}, + expected: []string{"default", "qa"}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + filters := []Filter{NamespaceFilter{ + AllowableNamespaces: tc.allowable, + }} + iter := newTestIteratorWithMocks(mocks) + tokenizer := testTokenizer{} + opts := structs.QueryOptions{ + PerPage: int32(len(mocks)), + } + + results := []string{} + paginator, err := NewPaginator(iter, tokenizer, filters, opts, + func(raw interface{}) error { + result := raw.(*mockObject) + results = append(results, result.namespace) + return nil + }, + ) + require.NoError(t, err) + + nextToken, err := paginator.Page() + require.NoError(t, err) + require.Equal(t, "", nextToken) + require.Equal(t, tc.expected, results) + }) + } +} + func BenchmarkEvalListFilter(b *testing.B) { const evalCount = 100_000 @@ -76,9 +170,10 @@ func BenchmarkEvalListFilter(b *testing.B) { for i := 0; i < b.N; i++ { iter, _ := state.EvalsByNamespace(nil, structs.DefaultNamespace) - evalIter := evalPaginationIterator{iter} + tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true}) + var evals []*structs.Evaluation - paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error { + paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error { eval := raw.(*structs.Evaluation) evals = append(evals, eval) return nil @@ -100,9 +195,10 @@ func BenchmarkEvalListFilter(b *testing.B) { for i := 0; i < b.N; i++ { iter, _ := state.Evals(nil, false) - evalIter := evalPaginationIterator{iter} + tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true}) + var evals []*structs.Evaluation - paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error { + paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error { eval := raw.(*structs.Evaluation) evals = append(evals, eval) return nil @@ -137,9 +233,10 @@ func BenchmarkEvalListFilter(b *testing.B) { for i := 0; i < b.N; i++ { iter, _ := state.EvalsByNamespace(nil, structs.DefaultNamespace) - evalIter := evalPaginationIterator{iter} + tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true}) + var evals []*structs.Evaluation - paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error { + paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error { eval := raw.(*structs.Evaluation) evals = append(evals, eval) return nil @@ -175,9 +272,10 @@ func BenchmarkEvalListFilter(b *testing.B) { for i := 0; i < b.N; i++ { iter, _ := state.Evals(nil, false) - evalIter := evalPaginationIterator{iter} + tokenizer := NewStructsTokenizer(iter, StructsTokenizerOptions{WithID: true}) + var evals []*structs.Evaluation - paginator, err := NewPaginator(evalIter, opts, func(raw interface{}) error { + paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error { eval := raw.(*structs.Evaluation) evals = append(evals, eval) return nil @@ -193,12 +291,12 @@ func BenchmarkEvalListFilter(b *testing.B) { // ----------------- // BENCHMARK HELPER FUNCTIONS -func setupPopulatedState(b *testing.B, evalCount int) *StateStore { +func setupPopulatedState(b *testing.B, evalCount int) *state.StateStore { evals := generateEvals(evalCount) index := uint64(0) var err error - state := TestStateStore(b) + state := state.TestStateStore(b) for _, eval := range evals { index++ err = state.UpsertEvals( @@ -235,17 +333,3 @@ func generateEval(i int, ns string) *structs.Evaluation { ModifyTime: now, } } - -type evalPaginationIterator struct { - iter memdb.ResultIterator -} - -func (it evalPaginationIterator) Next() (string, interface{}) { - raw := it.iter.Next() - if raw == nil { - return "", nil - } - - eval := raw.(*structs.Evaluation) - return eval.ID, eval -} diff --git a/nomad/state/paginator.go b/nomad/state/paginator/paginator.go similarity index 64% rename from nomad/state/paginator.go rename to nomad/state/paginator/paginator.go index 607ff8cde07a..f4aa3c2feb0c 100644 --- a/nomad/state/paginator.go +++ b/nomad/state/paginator/paginator.go @@ -1,4 +1,4 @@ -package state +package paginator import ( "fmt" @@ -7,39 +7,37 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// Iterator is the interface that must be implemented to use the Paginator. +// Iterator is the interface that must be implemented to supply data to the +// Paginator. type Iterator interface { - // Next returns the next element to be considered for pagination along with - // a token string used to uniquely identify elements in the iteration. + // Next returns the next element to be considered for pagination. // The page will end if nil is returned. - // Tokens should have a stable order and the order must match the paginator - // ascending property. - Next() (string, interface{}) + Next() interface{} } -// Paginator is an iterator over a memdb.ResultIterator that returns -// only the expected number of pages. +// Paginator wraps an iterator and returns only the expected number of pages. type Paginator struct { iter Iterator + tokenizer Tokenizer + filters []Filter perPage int32 itemCount int32 seekingToken string nextToken string - ascending bool + reverse bool nextTokenFound bool pageErr error - // filterEvaluator is used to filter results using go-bexpr. It's nil if - // no filter expression is defined. - filterEvaluator *bexpr.Evaluator - // appendFunc is the function the caller should use to append raw // entries to the results set. The object is guaranteed to be // non-nil. appendFunc func(interface{}) error } -func NewPaginator(iter Iterator, opts structs.QueryOptions, appendFunc func(interface{}) error) (*Paginator, error) { +// NewPaginator returns a new Paginator. +func NewPaginator(iter Iterator, tokenizer Tokenizer, filters []Filter, + opts structs.QueryOptions, appendFunc func(interface{}) error) (*Paginator, error) { + var evaluator *bexpr.Evaluator var err error @@ -48,21 +46,23 @@ func NewPaginator(iter Iterator, opts structs.QueryOptions, appendFunc func(inte if err != nil { return nil, fmt.Errorf("failed to read filter expression: %v", err) } + filters = append(filters, evaluator) } return &Paginator{ - iter: iter, - perPage: opts.PerPage, - seekingToken: opts.NextToken, - ascending: opts.Ascending, - nextTokenFound: opts.NextToken == "", - filterEvaluator: evaluator, - appendFunc: appendFunc, + iter: iter, + tokenizer: tokenizer, + filters: filters, + perPage: opts.PerPage, + seekingToken: opts.NextToken, + reverse: opts.Reverse, + nextTokenFound: opts.NextToken == "", + appendFunc: appendFunc, }, nil } // Page populates a page by running the append function -// over all results. Returns the next token +// over all results. Returns the next token. func (p *Paginator) Page() (string, error) { DONE: for { @@ -84,34 +84,36 @@ DONE: } func (p *Paginator) next() (interface{}, paginatorState) { - token, raw := p.iter.Next() + raw := p.iter.Next() if raw == nil { p.nextToken = "" return nil, paginatorComplete } + token := p.tokenizer.GetToken(raw) // have we found the token we're seeking (if any)? p.nextToken = token var passedToken bool - if p.ascending { - passedToken = token < p.seekingToken - } else { + + if p.reverse { passedToken = token > p.seekingToken + } else { + passedToken = token < p.seekingToken } if !p.nextTokenFound && passedToken { return nil, paginatorSkip } - // apply filter if defined - if p.filterEvaluator != nil { - match, err := p.filterEvaluator.Evaluate(raw) + // apply filters if defined + for _, f := range p.filters { + allow, err := f.Evaluate(raw) if err != nil { p.pageErr = err return nil, paginatorComplete } - if !match { + if !allow { return nil, paginatorSkip } } diff --git a/nomad/state/paginator_test.go b/nomad/state/paginator/paginator_test.go similarity index 74% rename from nomad/state/paginator_test.go rename to nomad/state/paginator/paginator_test.go index 0d6f07fdac97..e3678da53ee3 100644 --- a/nomad/state/paginator_test.go +++ b/nomad/state/paginator/paginator_test.go @@ -1,4 +1,4 @@ -package state +package paginator import ( "errors" @@ -58,14 +58,14 @@ func TestPaginator(t *testing.T) { t.Run(tc.name, func(t *testing.T) { iter := newTestIterator(ids) - results := []string{} + tokenizer := testTokenizer{} + opts := structs.QueryOptions{ + PerPage: tc.perPage, + NextToken: tc.nextToken, + } - paginator, err := NewPaginator(iter, - structs.QueryOptions{ - PerPage: tc.perPage, - NextToken: tc.nextToken, - Ascending: true, - }, + results := []string{} + paginator, err := NewPaginator(iter, tokenizer, nil, opts, func(raw interface{}) error { if tc.expectedError != "" { return errors.New(tc.expectedError) @@ -94,27 +94,32 @@ func TestPaginator(t *testing.T) { // helpers for pagination tests -// implements memdb.ResultIterator interface +// implements Iterator interface type testResultIterator struct { results chan interface{} } -func (i testResultIterator) Next() (string, interface{}) { +func (i testResultIterator) Next() interface{} { select { case raw := <-i.results: if raw == nil { - return "", nil + return nil } m := raw.(*mockObject) - return m.id, m + return m default: - return "", nil + return nil } } type mockObject struct { - id string + id string + namespace string +} + +func (m *mockObject) GetNamespace() string { + return m.namespace } func newTestIterator(ids []string) testResultIterator { @@ -124,3 +129,18 @@ func newTestIterator(ids []string) testResultIterator { } return iter } + +func newTestIteratorWithMocks(mocks []*mockObject) testResultIterator { + iter := testResultIterator{results: make(chan interface{}, 20)} + for _, m := range mocks { + iter.results <- m + } + return iter +} + +// implements Tokenizer interface +type testTokenizer struct{} + +func (t testTokenizer) GetToken(raw interface{}) string { + return raw.(*mockObject).id +} diff --git a/nomad/state/paginator/tokenizer.go b/nomad/state/paginator/tokenizer.go new file mode 100644 index 000000000000..527f547a24e6 --- /dev/null +++ b/nomad/state/paginator/tokenizer.go @@ -0,0 +1,82 @@ +package paginator + +import ( + "fmt" + "strings" +) + +// Tokenizer is the interface that must be implemented to provide pagination +// tokens to the Paginator. +type Tokenizer interface { + // GetToken returns the pagination token for the given element. + GetToken(interface{}) string +} + +// IDGetter is the interface that must be implemented by structs that need to +// have their ID as part of the pagination token. +type IDGetter interface { + GetID() string +} + +// NamespaceGetter is the interface that must be implemented by structs that +// need to have their Namespace as part of the pagination token. +type NamespaceGetter interface { + GetNamespace() string +} + +// CreateIndexGetter is the interface that must be implemented by structs that +// need to have their CreateIndex as part of the pagination token. +type CreateIndexGetter interface { + GetCreateIndex() uint64 +} + +// StructsTokenizerOptions is the configuration provided to a StructsTokenizer. +type StructsTokenizerOptions struct { + WithCreateIndex bool + WithNamespace bool + WithID bool +} + +// StructsTokenizer is an pagination token generator that can create different +// formats of pagination tokens based on common fields found in the structs +// package. +type StructsTokenizer struct { + opts StructsTokenizerOptions +} + +// NewStructsTokenizer returns a new StructsTokenizer. +func NewStructsTokenizer(it Iterator, opts StructsTokenizerOptions) StructsTokenizer { + return StructsTokenizer{ + opts: opts, + } +} + +func (it StructsTokenizer) GetToken(raw interface{}) string { + if raw == nil { + return "" + } + + parts := []string{} + + if it.opts.WithCreateIndex { + token := raw.(CreateIndexGetter).GetCreateIndex() + parts = append(parts, fmt.Sprintf("%v", token)) + } + + if it.opts.WithNamespace { + token := raw.(NamespaceGetter).GetNamespace() + parts = append(parts, token) + } + + if it.opts.WithID { + token := raw.(IDGetter).GetID() + parts = append(parts, token) + } + + // Use a character that is not part of validNamespaceName as separator to + // avoid accidentally generating collisions. + // For example, namespace `a` and job `b-c` would collide with namespace + // `a-b` and job `c` into the same token `a-b-c`, since `-` is an allowed + // character in namespace. + return strings.Join(parts, ".") +} diff --git a/nomad/state/paginator/tokenizer_test.go b/nomad/state/paginator/tokenizer_test.go new file mode 100644 index 000000000000..c74fe8a67fd3 --- /dev/null +++ b/nomad/state/paginator/tokenizer_test.go @@ -0,0 +1,67 @@ +package paginator + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/stretchr/testify/require" +) + +func TestStructsTokenizer(t *testing.T) { + j := mock.Job() + + cases := []struct { + name string + opts StructsTokenizerOptions + expected string + }{ + { + name: "ID", + opts: StructsTokenizerOptions{ + WithID: true, + }, + expected: fmt.Sprintf("%v", j.ID), + }, + { + name: "Namespace.ID", + opts: StructsTokenizerOptions{ + WithNamespace: true, + WithID: true, + }, + expected: fmt.Sprintf("%v.%v", j.Namespace, j.ID), + }, + { + name: "CreateIndex.Namespace.ID", + opts: StructsTokenizerOptions{ + WithCreateIndex: true, + WithNamespace: true, + WithID: true, + }, + expected: fmt.Sprintf("%v.%v.%v", j.CreateIndex, j.Namespace, j.ID), + }, + { + name: "CreateIndex.ID", + opts: StructsTokenizerOptions{ + WithCreateIndex: true, + WithID: true, + }, + expected: fmt.Sprintf("%v.%v", j.CreateIndex, j.ID), + }, + { + name: "CreateIndex.Namespace", + opts: StructsTokenizerOptions{ + WithCreateIndex: true, + WithNamespace: true, + }, + expected: fmt.Sprintf("%v.%v", j.CreateIndex, j.Namespace), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tokenizer := StructsTokenizer{opts: tc.opts} + require.Equal(t, tc.expected, tokenizer.GetToken(j)) + }) + } +} diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 5c62ae2fde02..0aafe87d2143 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -530,7 +530,7 @@ func allocTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: "allocs", Indexes: map[string]*memdb.IndexSchema{ - // Primary index is a UUID + // id index is used for direct lookup of allocation by ID. "id": { Name: "id", AllowMissing: false, @@ -540,6 +540,26 @@ func allocTableSchema() *memdb.TableSchema { }, }, + // create index is used for listing allocations, ordering them by + // creation chronology. (Use a reverse iterator for newest first). + "create": { + Name: "create", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + &memdb.StringFieldIndex{ + Field: "ID", + }, + }, + }, + }, + + // namespace is used to lookup evaluations by namespace. + // todo(shoenig): i think we can deprecate this and other like it "namespace": { Name: "namespace", AllowMissing: false, @@ -549,6 +569,31 @@ func allocTableSchema() *memdb.TableSchema { }, }, + // namespace_create index is used to lookup evaluations by namespace + // in their original chronological order based on CreateIndex. + // + // Use a prefix iterator (namespace_prefix) on a Namespace to iterate + // those evaluations in order of CreateIndex. + "namespace_create": { + Name: "namespace_create", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + AllowMissing: false, + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + &memdb.StringFieldIndex{ + Field: "ID", + }, + }, + }, + }, + // Node index is used to lookup allocations by node "node": { Name: "node", @@ -728,6 +773,21 @@ func aclTokenTableSchema() *memdb.TableSchema { Field: "AccessorID", }, }, + "create": { + Name: "create", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.UintFieldIndex{ + Field: "CreateIndex", + }, + &memdb.StringFieldIndex{ + Field: "AccessorID", + }, + }, + }, + }, "secret": { Name: "secret", AllowMissing: false, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1dd9e481490f..60bf36fc1000 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -22,6 +22,19 @@ import ( // This can be a read or write transaction. type Txn = *txn +// SortOption represents how results can be sorted. +type SortOption bool + +const ( + // SortDefault indicates that the result should be returned using the + // default go-memdb ResultIterator order. + SortDefault SortOption = false + + // SortReverse indicates that the result should be returned using the + // reversed go-memdb ResultIterator order. + SortReverse SortOption = true +) + const ( // NodeRegisterEventReregistered is the message used when the node becomes // reregistered. @@ -544,16 +557,17 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl return nil } -func (s *StateStore) Deployments(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) { +func (s *StateStore) Deployments(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() var it memdb.ResultIterator var err error - if ascending { - it, err = txn.Get("deployment", "create") - } else { + switch sort { + case SortReverse: it, err = txn.GetReverse("deployment", "create") + default: + it, err = txn.Get("deployment", "create") } if err != nil { @@ -578,7 +592,7 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) return iter, nil } -func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) { +func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, sort SortOption) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() var ( @@ -587,10 +601,11 @@ func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace exact = terminate(namespace) ) - if ascending { - it, err = txn.Get("deployment", "namespace_create_prefix", exact) - } else { + switch sort { + case SortReverse: it, err = txn.GetReverse("deployment", "namespace_create_prefix", exact) + default: + it, err = txn.Get("deployment", "namespace_create_prefix", exact) } if err != nil { @@ -1919,8 +1934,13 @@ func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn return nil, nil } -// JobsByIDPrefix is used to lookup a job by prefix +// JobsByIDPrefix is used to lookup a job by prefix. If querying all namespaces +// the prefix will not be filtered by an index. func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { + if namespace == structs.AllNamespacesSentinel { + return s.jobsByIDPrefixAllNamespaces(ws, id) + } + txn := s.db.ReadTxn() iter, err := txn.Get("jobs", "id_prefix", namespace, id) @@ -1933,6 +1953,30 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me return iter, nil } +func (s *StateStore) jobsByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire jobs table + iter, err := txn.Get("jobs", "id") + + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + // Filter the iterator by ID prefix + f := func(raw interface{}) bool { + job, ok := raw.(*structs.Job) + if !ok { + return true + } + return !strings.HasPrefix(job.ID, prefix) + } + wrap := memdb.NewFilterIterator(iter, f) + return wrap, nil +} + // JobVersionsByID returns all the tracked versions of a job. func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { txn := s.db.ReadTxn() @@ -3188,17 +3232,18 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]* } // Evals returns an iterator over all the evaluations in ascending or descending -// order of CreationIndex as determined by the ascending parameter. -func (s *StateStore) Evals(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) { +// order of CreationIndex as determined by the reverse parameter. +func (s *StateStore) Evals(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() var it memdb.ResultIterator var err error - if ascending { - it, err = txn.Get("evals", "create") - } else { + switch sort { + case SortReverse: it, err = txn.GetReverse("evals", "create") + default: + it, err = txn.Get("evals", "create") } if err != nil { @@ -3227,7 +3272,7 @@ func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memd return it, nil } -func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) { +func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, sort SortOption) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() var ( @@ -3236,10 +3281,11 @@ func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string exact = terminate(namespace) ) - if ascending { - it, err = txn.Get("evals", "namespace_create_prefix", exact) - } else { + switch sort { + case SortReverse: it, err = txn.GetReverse("evals", "namespace_create_prefix", exact) + default: + it, err = txn.Get("evals", "namespace_create_prefix", exact) } if err != nil { @@ -3609,6 +3655,10 @@ func allocNamespaceFilter(namespace string) func(interface{}) bool { return true } + if namespace == structs.AllNamespacesSentinel { + return false + } + return alloc.Namespace != namespace } } @@ -3766,19 +3816,52 @@ func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) return out, nil } -// Allocs returns an iterator over all the evaluations -func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) { +// Allocs returns an iterator over all the evaluations. +func (s *StateStore) Allocs(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire table - iter, err := txn.Get("allocs", "id") + var it memdb.ResultIterator + var err error + + switch sort { + case SortReverse: + it, err = txn.GetReverse("allocs", "create") + default: + it, err = txn.Get("allocs", "create") + } + if err != nil { return nil, err } - ws.Add(iter.WatchCh()) + ws.Add(it.WatchCh()) - return iter, nil + return it, nil +} + +func (s *StateStore) AllocsByNamespaceOrdered(ws memdb.WatchSet, namespace string, sort SortOption) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + var ( + it memdb.ResultIterator + err error + exact = terminate(namespace) + ) + + switch sort { + case SortReverse: + it, err = txn.GetReverse("allocs", "namespace_create_prefix", exact) + default: + it, err = txn.Get("allocs", "namespace_create_prefix", exact) + } + + if err != nil { + return nil, err + } + + ws.Add(it.WatchCh()) + + return it, nil } // AllocsByNamespace returns an iterator over all the allocations in the @@ -5464,14 +5547,22 @@ func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string } // ACLTokens returns an iterator over all the tokens -func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) { +func (s *StateStore) ACLTokens(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) { txn := s.db.ReadTxn() - // Walk the entire table - iter, err := txn.Get("acl_token", "id") + var iter memdb.ResultIterator + var err error + + switch sort { + case SortReverse: + iter, err = txn.GetReverse("acl_token", "create") + default: + iter, err = txn.Get("acl_token", "create") + } if err != nil { return nil, err } + ws.Add(iter.WatchCh()) return iter, nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 32987465a28c..e9b58b7b871c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -670,7 +670,7 @@ func TestStateStore_Deployments(t *testing.T) { } ws := memdb.NewWatchSet() - it, err := state.Deployments(ws, true) + it, err := state.Deployments(ws, SortDefault) require.NoError(t, err) var out []*structs.Deployment @@ -5432,7 +5432,7 @@ func TestStateStore_Allocs(t *testing.T) { } ws := memdb.NewWatchSet() - iter, err := state.Allocs(ws) + iter, err := state.Allocs(ws, SortDefault) if err != nil { t.Fatalf("err: %v", err) } @@ -5480,7 +5480,7 @@ func TestStateStore_Allocs_PrevAlloc(t *testing.T) { require.Nil(err) ws := memdb.NewWatchSet() - iter, err := state.Allocs(ws) + iter, err := state.Allocs(ws, SortDefault) require.Nil(err) var out []*structs.Allocation @@ -7508,7 +7508,7 @@ func TestStateStore_BootstrapACLTokens(t *testing.T) { t.Fatalf("expected error") } - iter, err := state.ACLTokens(nil) + iter, err := state.ACLTokens(nil, SortDefault) if err != nil { t.Fatalf("err: %v", err) } @@ -7602,7 +7602,7 @@ func TestStateStore_UpsertACLTokens(t *testing.T) { assert.Equal(t, nil, err) assert.Equal(t, tk2, out) - iter, err := state.ACLTokens(ws) + iter, err := state.ACLTokens(ws, SortDefault) if err != nil { t.Fatalf("err: %v", err) } @@ -7669,7 +7669,7 @@ func TestStateStore_DeleteACLTokens(t *testing.T) { t.Fatalf("bad: %#v", out) } - iter, err := state.ACLTokens(ws) + iter, err := state.ACLTokens(ws, SortDefault) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 71b28753e5b1..0a97cd9e55a8 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -318,6 +318,32 @@ type CSIVolume struct { ModifyIndex uint64 } +// GetID implements the IDGetter interface, required for pagination. +func (v *CSIVolume) GetID() string { + if v == nil { + return "" + } + return v.ID +} + +// GetNamespace implements the NamespaceGetter interface, required for +// pagination. +func (v *CSIVolume) GetNamespace() string { + if v == nil { + return "" + } + return v.Namespace +} + +// GetCreateIndex implements the CreateIndexGetter interface, required for +// pagination. +func (v *CSIVolume) GetCreateIndex() uint64 { + if v == nil { + return 0 + } + return v.CreateIndex +} + // CSIVolListStub is partial representation of a CSI Volume for inclusion in lists type CSIVolListStub struct { ID string diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a67606b7ff7f..8f2974a3b883 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -273,8 +273,8 @@ type QueryOptions struct { // previous response. NextToken string - // Ascending is used to have results sorted in ascending chronological order. - Ascending bool + // Reverse is used to reverse the default order of list results. + Reverse bool InternalRpcInfo } @@ -4188,6 +4188,33 @@ func (j *Job) NamespacedID() NamespacedID { } } +// GetID implements the IDGetter interface, required for pagination. +func (j *Job) GetID() string { + if j == nil { + return "" + } + return j.ID +} + +// GetNamespace implements the NamespaceGetter interface, required for +// pagination and filtering namespaces in endpoints that support glob namespace +// requests using tokens with limited access. +func (j *Job) GetNamespace() string { + if j == nil { + return "" + } + return j.Namespace +} + +// GetCreateIndex implements the CreateIndexGetter interface, required for +// pagination. +func (j *Job) GetCreateIndex() uint64 { + if j == nil { + return 0 + } + return j.CreateIndex +} + // Canonicalize is used to canonicalize fields in the Job. This should be // called when registering a Job. func (j *Job) Canonicalize() { @@ -9078,6 +9105,15 @@ func (d *Deployment) GetID() string { return d.ID } +// GetCreateIndex implements the CreateIndexGetter interface, required for +// pagination. +func (d *Deployment) GetCreateIndex() uint64 { + if d == nil { + return 0 + } + return d.CreateIndex +} + // HasPlacedCanaries returns whether the deployment has placed canaries func (d *Deployment) HasPlacedCanaries() bool { if d == nil || len(d.TaskGroups) == 0 { @@ -9467,6 +9503,33 @@ type Allocation struct { ModifyTime int64 } +// GetID implements the IDGetter interface, required for pagination. +func (a *Allocation) GetID() string { + if a == nil { + return "" + } + return a.ID +} + +// GetNamespace implements the NamespaceGetter interface, required for +// pagination and filtering namespaces in endpoints that support glob namespace +// requests using tokens with limited access. +func (a *Allocation) GetNamespace() string { + if a == nil { + return "" + } + return a.Namespace +} + +// GetCreateIndex implements the CreateIndexGetter interface, required for +// pagination. +func (a *Allocation) GetCreateIndex() uint64 { + if a == nil { + return 0 + } + return a.CreateIndex +} + // ConsulNamespace returns the Consul namespace of the task group associated // with this allocation. func (a *Allocation) ConsulNamespace() string { @@ -10569,6 +10632,23 @@ type Evaluation struct { ModifyTime int64 } +// GetID implements the IDGetter interface, required for pagination. +func (e *Evaluation) GetID() string { + if e == nil { + return "" + } + return e.ID +} + +// GetCreateIndex implements the CreateIndexGetter interface, required for +// pagination. +func (e *Evaluation) GetCreateIndex() uint64 { + if e == nil { + return 0 + } + return e.CreateIndex +} + // TerminalStatus returns if the current status is terminal and // will no longer transition. func (e *Evaluation) TerminalStatus() bool { @@ -11339,6 +11419,23 @@ type ACLToken struct { ModifyIndex uint64 } +// GetID implements the IDGetter interface, required for pagination. +func (a *ACLToken) GetID() string { + if a == nil { + return "" + } + return a.AccessorID +} + +// GetCreateIndex implements the CreateIndexGetter interface, required for +// pagination. +func (a *ACLToken) GetCreateIndex() uint64 { + if a == nil { + return 0 + } + return a.CreateIndex +} + func (a *ACLToken) Copy() *ACLToken { c := new(ACLToken) *c = *a