Skip to content

Commit

Permalink
use named value to indicate reverse sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Mar 8, 2022
1 parent f7392b8 commit cb8c6d0
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 84 deletions.
38 changes: 19 additions & 19 deletions helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,28 +185,28 @@ func (f *FSMHelper) StateAsMap() map[string][]interface{} {
}

// StateAsMap returns a json-able representation of the state
func StateAsMap(state *state.StateStore) map[string][]interface{} {
func StateAsMap(store *state.StateStore) map[string][]interface{} {
result := map[string][]interface{}{
"ACLPolicies": toArray(state.ACLPolicies(nil)),
"ACLTokens": toArray(state.ACLTokens(nil, false)),
"Allocs": toArray(state.Allocs(nil, false)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),
"Jobs": toArray(state.Jobs(nil)),
"Nodes": toArray(state.Nodes(nil)),
"PeriodicLaunches": toArray(state.PeriodicLaunches(nil)),
"SITokenAccessors": toArray(state.SITokenAccessors(nil)),
"ScalingEvents": toArray(state.ScalingEvents(nil)),
"ScalingPolicies": toArray(state.ScalingPolicies(nil)),
"VaultAccessors": toArray(state.VaultAccessors(nil)),
"ACLPolicies": toArray(store.ACLPolicies(nil)),
"ACLTokens": toArray(store.ACLTokens(nil, state.SortDefault)),
"Allocs": toArray(store.Allocs(nil, state.SortDefault)),
"CSIPlugins": toArray(store.CSIPlugins(nil)),
"CSIVolumes": toArray(store.CSIVolumes(nil)),
"Deployments": toArray(store.Deployments(nil, state.SortDefault)),
"Evals": toArray(store.Evals(nil, state.SortDefault)),
"Indexes": toArray(store.Indexes()),
"JobSummaries": toArray(store.JobSummaries(nil)),
"JobVersions": toArray(store.JobVersions(nil)),
"Jobs": toArray(store.Jobs(nil)),
"Nodes": toArray(store.Nodes(nil)),
"PeriodicLaunches": toArray(store.PeriodicLaunches(nil)),
"SITokenAccessors": toArray(store.SITokenAccessors(nil)),
"ScalingEvents": toArray(store.ScalingEvents(nil)),
"ScalingPolicies": toArray(store.ScalingPolicies(nil)),
"VaultAccessors": toArray(store.VaultAccessors(nil)),
}

insertEnterpriseState(result, state)
insertEnterpriseState(result, store)

return result

Expand Down
3 changes: 2 additions & 1 deletion nomad/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTo
}

// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
Expand All @@ -674,7 +675,7 @@ func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTo
WithID: true,
}
} else {
iter, err = state.ACLTokens(ws, args.Reverse)
iter, err = state.ACLTokens(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
Expand Down
5 changes: 3 additions & 2 deletions nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
}

// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
Expand All @@ -84,13 +85,13 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
WithID: true,
}
} else if namespace != structs.AllNamespacesSentinel {
iter, err = state.AllocsByNamespaceOrdered(ws, namespace, args.Reverse)
iter, err = state.AllocsByNamespaceOrdered(ws, namespace, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
} else {
iter, err = state.Allocs(ws, args.Reverse)
iter, err = state.Allocs(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
Expand Down
2 changes: 1 addition & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws, false)
iter, err := c.snap.Deployments(ws, state.SortDefault)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions nomad/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
}

// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
Expand All @@ -418,13 +419,13 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
WithID: true,
}
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.Reverse)
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
} else {
iter, err = store.Deployments(ws, args.Reverse)
iter, err = store.Deployments(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
Expand Down
6 changes: 3 additions & 3 deletions nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
}

// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {

iter, err := state.Deployments(ws, false)
iter, err := store.Deployments(ws, state.SortDefault)
if err != nil {
return nil, 0, err
}
Expand All @@ -220,7 +220,7 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (in
}

// Use the last index that affected the deployment table
index, err := state.Index("deployment")
index, err := store.Index("deployment")
if err != nil {
return nil, 0, err
}
Expand Down
14 changes: 7 additions & 7 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,9 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
}

// Wait for the two allocations to be placed
state := s1.State()
store := s1.State()
testutil.WaitForResult(func() (bool, error) {
iter, err := state.Allocs(nil, false)
iter, err := store.Allocs(nil, state.SortDefault)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -974,11 +974,11 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
go allocPromoter(errCh, ctx, store, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, store, codec, n2.ID, s1.logger)

testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
allocs, err := store.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
Expand All @@ -992,7 +992,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
}
node, err := state.NodeByID(nil, n1.ID)
node, err := store.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
Expand All @@ -1002,7 +1002,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
node, err := store.NodeByID(nil, n1.ID)
require.NoError(err)
// sometimes test gets a duplicate node drain complete event
require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events)
Expand Down
5 changes: 3 additions & 2 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
}

// Setup the blocking query
sort := state.SortOption(args.Reverse)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
Expand All @@ -423,13 +424,13 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
WithID: true,
}
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.Reverse)
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
}
} else {
iter, err = store.Evals(ws, args.Reverse)
iter, err = store.Evals(ws, sort)
opts = paginator.StructsTokenizerOptions{
WithCreateIndex: true,
WithID: true,
Expand Down
16 changes: 8 additions & 8 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1704,16 +1704,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {

// failLeakedDeployments is used to fail deployments that do not have a job.
// This state is a broken invariant that should not occur since 0.8.X.
func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
func (n *nomadFSM) failLeakedDeployments(store *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil, false)
iter, err := store.Deployments(nil, state.SortDefault)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}

dindex, err := state.Index("deployment")
dindex, err := store.Index("deployment")
if err != nil {
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}
Expand All @@ -1733,7 +1733,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
}

// Find the job
job, err := state.JobByID(nil, d.Namespace, d.JobID)
job, err := store.JobByID(nil, d.Namespace, d.JobID)
if err != nil {
return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err)
}
Expand All @@ -1747,7 +1747,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
failed := d.Copy()
failed.Status = structs.DeploymentStatusCancelled
failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob
if err := state.UpsertDeployment(dindex, failed); err != nil {
if err := store.UpsertDeployment(dindex, failed); err != nil {
return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err)
}
}
Expand Down Expand Up @@ -2099,7 +2099,7 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the allocations
ws := memdb.NewWatchSet()
allocs, err := s.snap.Allocs(ws, false)
allocs, err := s.snap.Allocs(ws, state.SortDefault)
if err != nil {
return err
}
Expand Down Expand Up @@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws, false)
deployments, err := s.snap.Deployments(ws, state.SortDefault)
if err != nil {
return err
}
Expand Down Expand Up @@ -2306,7 +2306,7 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the policies
ws := memdb.NewWatchSet()
tokens, err := s.snap.ACLTokens(ws, false)
tokens, err := s.snap.ACLTokens(ws, state.SortDefault)
if err != nil {
return err
}
Expand Down
22 changes: 11 additions & 11 deletions nomad/search_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,42 +394,42 @@ func wildcard(namespace string) bool {
return namespace == structs.AllNamespacesSentinel
}

func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, state *state.StateStore) (memdb.ResultIterator, error) {
func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, store *state.StateStore) (memdb.ResultIterator, error) {
switch context {
case structs.Jobs:
if wildcard(namespace) {
iter, err := state.Jobs(ws)
iter, err := store.Jobs(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.JobsByNamespace(ws, namespace)
return store.JobsByNamespace(ws, namespace)

case structs.Allocs:
if wildcard(namespace) {
iter, err := state.Allocs(ws, false)
iter, err := store.Allocs(ws, state.SortDefault)
return nsCapIterFilter(iter, err, aclObj)
}
return state.AllocsByNamespace(ws, namespace)
return store.AllocsByNamespace(ws, namespace)

case structs.Nodes:
if wildcard(namespace) {
iter, err := state.Nodes(ws)
iter, err := store.Nodes(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.Nodes(ws)
return store.Nodes(ws)

case structs.Plugins:
if wildcard(namespace) {
iter, err := state.CSIPlugins(ws)
iter, err := store.CSIPlugins(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.CSIPlugins(ws)
return store.CSIPlugins(ws)

case structs.Namespaces:
iter, err := state.Namespaces(ws)
iter, err := store.Namespaces(ws)
return nsCapIterFilter(iter, err, aclObj)

default:
return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, state)
return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, store)
}
}

Expand Down
Loading

0 comments on commit cb8c6d0

Please sign in to comment.