Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: apply consistent behaviour of the reverse query parameter #12244

Merged
merged 8 commits into from
Mar 12, 2022
4 changes: 2 additions & 2 deletions nomad/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,12 +665,12 @@ func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTo
var opts paginator.StructsTokenizerOptions

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ACLTokenByAccessorIDPrefix(ws, prefix)
iter, err = state.ACLTokenByAccessorIDPrefix(ws, prefix, sort)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
} else if args.GlobalOnly {
iter, err = state.ACLTokensByGlobal(ws, true)
iter, err = state.ACLTokensByGlobal(ws, true, sort)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
return err
} else {
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.AllocsByIDPrefix(ws, namespace, prefix)
iter, err = state.AllocsByIDPrefix(ws, namespace, prefix, sort)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
var opts paginator.StructsTokenizerOptions

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix, sort)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
var opts paginator.StructsTokenizerOptions

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix, sort)
opts = paginator.StructsTokenizerOptions{
WithID: true,
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,13 +1524,13 @@ ERR_WAIT:
// diffACLTokens is used to perform a two-way diff between the local
// tokens and the remote tokens to determine which tokens need to
// be deleted or updated.
func diffACLTokens(state *state.StateStore, minIndex uint64, remoteList []*structs.ACLTokenListStub) (delete []string, update []string) {
func diffACLTokens(store *state.StateStore, minIndex uint64, remoteList []*structs.ACLTokenListStub) (delete []string, update []string) {
// Construct a set of the local and remote policies
local := make(map[string][]byte)
remote := make(map[string]struct{})

// Add all the local global tokens
iter, err := state.ACLTokensByGlobal(nil, true)
iter, err := store.ACLTokensByGlobal(nil, true, state.SortDefault)
if err != nil {
panic("failed to iterate local tokens")
}
Expand Down
12 changes: 6 additions & 6 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2063,11 +2063,11 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.ModifyTime = now
state := s1.fsm.State()
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
store := s1.fsm.State()
store.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
start := time.Now()
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
err := store.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -2101,7 +2101,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
t.Fatalf("bad: %#v", resp2.Allocs)
}

iter, err := state.AllocsByIDPrefix(nil, structs.DefaultNamespace, alloc.ID)
iter, err := store.AllocsByIDPrefix(nil, structs.DefaultNamespace, alloc.ID, state.SortDefault)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -2133,8 +2133,8 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
allocUpdate.NodeID = alloc.NodeID
allocUpdate.ID = alloc.ID
allocUpdate.ClientStatus = structs.AllocClientStatusRunning
state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID))
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{allocUpdate})
store.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID))
err := store.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{allocUpdate})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
22 changes: 11 additions & 11 deletions nomad/search_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,26 +355,26 @@ func sortSet(matches []fuzzyMatch) {

// getResourceIter takes a context and returns a memdb iterator specific to
// that context
func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix string, ws memdb.WatchSet, state *state.StateStore) (memdb.ResultIterator, error) {
func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix string, ws memdb.WatchSet, store *state.StateStore) (memdb.ResultIterator, error) {
switch context {
case structs.Jobs:
return state.JobsByIDPrefix(ws, namespace, prefix)
return store.JobsByIDPrefix(ws, namespace, prefix)
case structs.Evals:
return state.EvalsByIDPrefix(ws, namespace, prefix)
return store.EvalsByIDPrefix(ws, namespace, prefix, state.SortDefault)
case structs.Allocs:
return state.AllocsByIDPrefix(ws, namespace, prefix)
return store.AllocsByIDPrefix(ws, namespace, prefix, state.SortDefault)
case structs.Nodes:
return state.NodesByIDPrefix(ws, prefix)
return store.NodesByIDPrefix(ws, prefix)
case structs.Deployments:
return state.DeploymentsByIDPrefix(ws, namespace, prefix)
return store.DeploymentsByIDPrefix(ws, namespace, prefix, state.SortDefault)
case structs.Plugins:
return state.CSIPluginsByIDPrefix(ws, prefix)
return store.CSIPluginsByIDPrefix(ws, prefix)
case structs.ScalingPolicies:
return state.ScalingPoliciesByIDPrefix(ws, namespace, prefix)
return store.ScalingPoliciesByIDPrefix(ws, namespace, prefix)
case structs.Volumes:
return state.CSIVolumesByIDPrefix(ws, namespace, prefix)
return store.CSIVolumesByIDPrefix(ws, namespace, prefix)
case structs.Namespaces:
iter, err := state.NamespacesByNamePrefix(ws, prefix)
iter, err := store.NamespacesByNamePrefix(ws, prefix)
if err != nil {
return nil, err
}
Expand All @@ -383,7 +383,7 @@ func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix
}
return memdb.NewFilterIterator(iter, nsCapFilter(aclObj)), nil
default:
return getEnterpriseResourceIter(context, aclObj, namespace, prefix, ws, state)
return getEnterpriseResourceIter(context, aclObj, namespace, prefix, ws, store)
}
}

Expand Down
62 changes: 52 additions & 10 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,11 +617,19 @@ func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace
return it, nil
}

func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) {
func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

var iter memdb.ResultIterator
var err error

// Walk the entire deployments table
iter, err := txn.Get("deployment", "id_prefix", deploymentID)
switch sort {
case SortReverse:
iter, err = txn.GetReverse("deployment", "id_prefix", deploymentID)
default:
iter, err = txn.Get("deployment", "id_prefix", deploymentID)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3171,11 +3179,19 @@ func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation

// EvalsByIDPrefix is used to lookup evaluations by prefix in a particular
// namespace
func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

var iter memdb.ResultIterator
var err error

// Get an iterator over all evals by the id prefix
iter, err := txn.Get("evals", "id_prefix", id)
switch sort {
case SortReverse:
iter, err = txn.GetReverse("evals", "id_prefix", id)
default:
iter, err = txn.Get("evals", "id_prefix", id)
}
if err != nil {
return nil, fmt.Errorf("eval lookup failed: %v", err)
}
Expand Down Expand Up @@ -3631,10 +3647,18 @@ func (s *StateStore) allocByIDImpl(txn Txn, ws memdb.WatchSet, id string) (*stru
}

// AllocsByIDPrefix is used to lookup allocs by prefix
func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, namespace, id string, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("allocs", "id_prefix", id)
var iter memdb.ResultIterator
var err error

switch sort {
case SortReverse:
iter, err = txn.GetReverse("allocs", "id_prefix", id)
default:
iter, err = txn.Get("allocs", "id_prefix", id)
}
if err != nil {
return nil, fmt.Errorf("alloc lookup failed: %v", err)
}
Expand Down Expand Up @@ -5535,13 +5559,22 @@ func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*st
}

// ACLTokenByAccessorIDPrefix is used to lookup tokens by prefix
func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("acl_token", "id_prefix", prefix)
var iter memdb.ResultIterator
var err error

switch sort {
case SortReverse:
iter, err = txn.GetReverse("acl_token", "id_prefix", prefix)
default:
iter, err = txn.Get("acl_token", "id_prefix", prefix)
}
if err != nil {
return nil, fmt.Errorf("acl token lookup failed: %v", err)
}

ws.Add(iter.WatchCh())
return iter, nil
}
Expand All @@ -5568,14 +5601,23 @@ func (s *StateStore) ACLTokens(ws memdb.WatchSet, sort SortOption) (memdb.Result
}

// ACLTokensByGlobal returns an iterator over all the tokens filtered by global value
func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb.ResultIterator, error) {
func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool, sort SortOption) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

var iter memdb.ResultIterator
var err error

// Walk the entire table
iter, err := txn.Get("acl_token", "global", globalVal)
switch sort {
case SortReverse:
iter, err = txn.GetReverse("acl_token", "global", globalVal)
default:
iter, err = txn.Get("acl_token", "global", globalVal)
}
if err != nil {
return nil, err
}

ws.Add(iter.WatchCh())
return iter, nil
}
Expand Down
Loading