Skip to content

Commit

Permalink
use named value to indicate reverse sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Mar 7, 2022
1 parent 837884c commit 934c9a8
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 92 deletions.
38 changes: 19 additions & 19 deletions helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,28 +185,28 @@ func (f *FSMHelper) StateAsMap() map[string][]interface{} {
}

// StateAsMap returns a json-able representation of the state
func StateAsMap(state *state.StateStore) map[string][]interface{} {
func StateAsMap(store *state.StateStore) map[string][]interface{} {
result := map[string][]interface{}{
"ACLPolicies": toArray(state.ACLPolicies(nil)),
"ACLTokens": toArray(state.ACLTokens(nil, false)),
"Allocs": toArray(state.Allocs(nil, false)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
"JobVersions": toArray(state.JobVersions(nil)),
"Jobs": toArray(state.Jobs(nil)),
"Nodes": toArray(state.Nodes(nil)),
"PeriodicLaunches": toArray(state.PeriodicLaunches(nil)),
"SITokenAccessors": toArray(state.SITokenAccessors(nil)),
"ScalingEvents": toArray(state.ScalingEvents(nil)),
"ScalingPolicies": toArray(state.ScalingPolicies(nil)),
"VaultAccessors": toArray(state.VaultAccessors(nil)),
"ACLPolicies": toArray(store.ACLPolicies(nil)),
"ACLTokens": toArray(store.ACLTokens(nil, state.SortedDefault)),
"Allocs": toArray(store.Allocs(nil, state.SortedDefault)),
"CSIPlugins": toArray(store.CSIPlugins(nil)),
"CSIVolumes": toArray(store.CSIVolumes(nil)),
"Deployments": toArray(store.Deployments(nil, state.SortedDefault)),
"Evals": toArray(store.Evals(nil, state.SortedDefault)),
"Indexes": toArray(store.Indexes()),
"JobSummaries": toArray(store.JobSummaries(nil)),
"JobVersions": toArray(store.JobVersions(nil)),
"Jobs": toArray(store.Jobs(nil)),
"Nodes": toArray(store.Nodes(nil)),
"PeriodicLaunches": toArray(store.PeriodicLaunches(nil)),
"SITokenAccessors": toArray(store.SITokenAccessors(nil)),
"ScalingEvents": toArray(store.ScalingEvents(nil)),
"ScalingPolicies": toArray(store.ScalingPolicies(nil)),
"VaultAccessors": toArray(store.VaultAccessors(nil)),
}

insertEnterpriseState(result, state)
insertEnterpriseState(result, store)

return result

Expand Down
2 changes: 1 addition & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws, false)
iter, err := c.snap.Deployments(ws, state.SortedDefault)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
}

// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) {

iter, err := state.Deployments(ws, false)
iter, err := store.Deployments(ws, state.SortedDefault)
if err != nil {
return nil, 0, err
}
Expand All @@ -220,7 +220,7 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (in
}

// Use the last index that affected the deployment table
index, err := state.Index("deployment")
index, err := store.Index("deployment")
if err != nil {
return nil, 0, err
}
Expand Down
14 changes: 7 additions & 7 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,9 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
}

// Wait for the two allocations to be placed
state := s1.State()
store := s1.State()
testutil.WaitForResult(func() (bool, error) {
iter, err := state.Allocs(nil, false)
iter, err := store.Allocs(nil, state.SortedDefault)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -974,11 +974,11 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
go allocPromoter(errCh, ctx, store, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, store, codec, n2.ID, s1.logger)

testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
allocs, err := store.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
Expand All @@ -992,7 +992,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
}
node, err := state.NodeByID(nil, n1.ID)
node, err := store.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
Expand All @@ -1002,7 +1002,7 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
node, err := store.NodeByID(nil, n1.ID)
require.NoError(err)
// sometimes test gets a duplicate node drain complete event
require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events)
Expand Down
16 changes: 8 additions & 8 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1704,16 +1704,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {

// failLeakedDeployments is used to fail deployments that do not have a job.
// This state is a broken invariant that should not occur since 0.8.X.
func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
func (n *nomadFSM) failLeakedDeployments(store *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil, false)
iter, err := store.Deployments(nil, state.SortedDefault)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}

dindex, err := state.Index("deployment")
dindex, err := store.Index("deployment")
if err != nil {
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}
Expand All @@ -1733,7 +1733,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
}

// Find the job
job, err := state.JobByID(nil, d.Namespace, d.JobID)
job, err := store.JobByID(nil, d.Namespace, d.JobID)
if err != nil {
return fmt.Errorf("failed to lookup job %s from deployment %q: %v", d.JobID, d.ID, err)
}
Expand All @@ -1747,7 +1747,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
failed := d.Copy()
failed.Status = structs.DeploymentStatusCancelled
failed.StatusDescription = structs.DeploymentStatusDescriptionStoppedJob
if err := state.UpsertDeployment(dindex, failed); err != nil {
if err := store.UpsertDeployment(dindex, failed); err != nil {
return fmt.Errorf("failed to mark leaked deployment %q as failed: %v", failed.ID, err)
}
}
Expand Down Expand Up @@ -2099,7 +2099,7 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the allocations
ws := memdb.NewWatchSet()
allocs, err := s.snap.Allocs(ws, false)
allocs, err := s.snap.Allocs(ws, state.SortedDefault)
if err != nil {
return err
}
Expand Down Expand Up @@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws, false)
deployments, err := s.snap.Deployments(ws, state.SortedDefault)
if err != nil {
return err
}
Expand Down Expand Up @@ -2306,7 +2306,7 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the policies
ws := memdb.NewWatchSet()
tokens, err := s.snap.ACLTokens(ws, false)
tokens, err := s.snap.ACLTokens(ws, state.SortedDefault)
if err != nil {
return err
}
Expand Down
22 changes: 11 additions & 11 deletions nomad/search_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,42 +394,42 @@ func wildcard(namespace string) bool {
return namespace == structs.AllNamespacesSentinel
}

func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, state *state.StateStore) (memdb.ResultIterator, error) {
func getFuzzyResourceIterator(context structs.Context, aclObj *acl.ACL, namespace string, ws memdb.WatchSet, store *state.StateStore) (memdb.ResultIterator, error) {
switch context {
case structs.Jobs:
if wildcard(namespace) {
iter, err := state.Jobs(ws)
iter, err := store.Jobs(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.JobsByNamespace(ws, namespace)
return store.JobsByNamespace(ws, namespace)

case structs.Allocs:
if wildcard(namespace) {
iter, err := state.Allocs(ws, false)
iter, err := store.Allocs(ws, state.SortedDefault)
return nsCapIterFilter(iter, err, aclObj)
}
return state.AllocsByNamespace(ws, namespace)
return store.AllocsByNamespace(ws, namespace)

case structs.Nodes:
if wildcard(namespace) {
iter, err := state.Nodes(ws)
iter, err := store.Nodes(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.Nodes(ws)
return store.Nodes(ws)

case structs.Plugins:
if wildcard(namespace) {
iter, err := state.CSIPlugins(ws)
iter, err := store.CSIPlugins(ws)
return nsCapIterFilter(iter, err, aclObj)
}
return state.CSIPlugins(ws)
return store.CSIPlugins(ws)

case structs.Namespaces:
iter, err := state.Namespaces(ws)
iter, err := store.Namespaces(ws)
return nsCapIterFilter(iter, err, aclObj)

default:
return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, state)
return getEnterpriseFuzzyResourceIter(context, aclObj, namespace, ws, store)
}
}

Expand Down
5 changes: 4 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventReregistered = "Node re-registered"

SortedDefault = false
SortedReverse = true
)

// terminate appends the go-memdb terminator character to s.
Expand Down Expand Up @@ -3215,7 +3218,7 @@ func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*
}

// Evals returns an iterator over all the evaluations in ascending or descending
// order of CreationIndex as determined by the ascending parameter.
// order of CreationIndex as determined by the reverse parameter.
func (s *StateStore) Evals(ws memdb.WatchSet, reverse bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

Expand Down
Loading

0 comments on commit 934c9a8

Please sign in to comment.