Skip to content

Commit

Permalink
wip deployments busted
Browse files Browse the repository at this point in the history
  • Loading branch information
shoenig committed Feb 14, 2022
1 parent 6f22656 commit 8e0202c
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 40 deletions.
2 changes: 1 addition & 1 deletion helper/raftutil/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func StateAsMap(state *state.StateStore) map[string][]interface{} {
"Allocs": toArray(state.Allocs(nil)),
"CSIPlugins": toArray(state.CSIPlugins(nil)),
"CSIVolumes": toArray(state.CSIVolumes(nil)),
"Deployments": toArray(state.Deployments(nil)),
"Deployments": toArray(state.Deployments(nil, false)),
"Evals": toArray(state.Evals(nil, false)),
"Indexes": toArray(state.Indexes()),
"JobSummaries": toArray(state.JobSummaries(nil)),
Expand Down
2 changes: 1 addition & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws)
iter, err := c.snap.Deployments(ws, false)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions nomad/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,14 @@ func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.De
// Capture all the deployments
var err error
var iter memdb.ResultIterator
namespace := args.RequestNamespace()

if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = store.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if args.RequestNamespace() == structs.AllNamespacesSentinel {
iter, err = store.Deployments(ws)
iter, err = store.DeploymentsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
iter, err = store.DeploymentsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
iter, err = store.DeploymentsByNamespace(ws, args.RequestNamespace())
iter, err = store.Deployments(ws, args.OrderAscending)
}
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (w *Watcher) getDeploys(ctx context.Context, minIndex uint64) ([]*structs.D
// getDeploysImpl retrieves all deployments from the passed state store.
func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {

iter, err := state.Deployments(ws)
iter, err := state.Deployments(ws, false)
if err != nil {
return nil, 0, err
}
Expand Down
5 changes: 1 addition & 4 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,10 @@ func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListRespon
namespace := args.RequestNamespace()

if prefix := args.QueryOptions.Prefix; prefix != "" {
fmt.Println("SH EvalsByIDPrefix", "ns:", namespace, "prefix:", prefix)
iter, err = store.EvalsByIDPrefix(ws, namespace, prefix)
} else if namespace != structs.AllNamespacesSentinel {
fmt.Println("SH EvalsByNamespace", "ns:", namespace)
iter, err = store.EvalsByNamespace(ws, namespace, args.OrderAscending)
iter, err = store.EvalsByNamespaceOrdered(ws, namespace, args.OrderAscending)
} else {
fmt.Println("SH Evals", "ascending:", args.OrderAscending)
iter, err = store.Evals(ws, args.OrderAscending)
}
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
// Scan for deployments that are referencing a job that no longer exists.
// This could happen if multiple deployments were created for a given job
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil)
iter, err := state.Deployments(nil, false)
if err != nil {
return fmt.Errorf("failed to query deployments: %v", err)
}
Expand Down Expand Up @@ -2250,7 +2250,7 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
ws := memdb.NewWatchSet()
deployments, err := s.snap.Deployments(ws)
deployments, err := s.snap.Deployments(ws, false)
if err != nil {
return err
}
Expand Down
84 changes: 67 additions & 17 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func deploymentSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "deployment",
Indexes: map[string]*memdb.IndexSchema{
// id index is used for direct lookup of an deployment by ID.
"id": {
Name: "id",
AllowMissing: false,
Expand All @@ -312,6 +313,20 @@ func deploymentSchema() *memdb.TableSchema {
},
},

// create index is used for listing deploy, ordering them by
// creation chronology. (Use a reverse iterator for newest first).
//
// There may be more than one deployment per CreateIndex.
"create": {
Name: "create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.UUIDFieldIndex{
Field: "CreateIndex",
},
},

// namespace is used to lookup evaluations by namespace.
"namespace": {
Name: "namespace",
AllowMissing: false,
Expand All @@ -321,7 +336,31 @@ func deploymentSchema() *memdb.TableSchema {
},
},

// Job index is used to lookup deployments by job
// namespace_create index is used to lookup deployments by namespace
// in their original chronological order based on CreateIndex.
//
// Use a prefix iterator (namespace_create_prefix) to iterate deployments
// of a Namespace in order of CreateIndex.
//
// There may be more than one deployment per CreateIndex.
"namespace_create": {
Name: "namespace_create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
AllowMissing: false,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
},
},

// job index is used to lookup deployments by job
"job": {
Name: "job",
AllowMissing: false,
Expand Down Expand Up @@ -384,7 +423,7 @@ func evalTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "evals",
Indexes: map[string]*memdb.IndexSchema{
// Primary index is used for direct lookup of an evaluation by ID.
// id index is used for direct lookup of an evaluation by ID.
"id": {
Name: "id",
AllowMissing: false,
Expand All @@ -394,7 +433,18 @@ func evalTableSchema() *memdb.TableSchema {
},
},

// Job index is used to lookup evaluations by job ID.
// create index is used for listing evaluations, ordering them by
// creation chronology. (Use a reverse iterator for newest first).
"create": {
Name: "create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.UintFieldIndex{
Field: "CreateIndex",
},
},

// job index is used to lookup evaluations by job ID.
"job": {
Name: "job",
AllowMissing: false,
Expand All @@ -418,12 +468,23 @@ func evalTableSchema() *memdb.TableSchema {
},
},

// Namespace is used to lookup evaluations by namespace.
// namespace is used to lookup evaluations by namespace.
"namespace": {
Name: "namespace",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Namespace",
},
},

// namespace_create index is used to lookup evaluations by namespace
// in their original chronological order based on CreateIndex.
//
// Use a prefix iterator (namespace_prefix) on a Namespace to iterate
// those evaluations in order of CreateIndex.
"namespace": {
Name: "namespace",
"namespace_create": {
Name: "namespace_create",
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
Expand All @@ -438,17 +499,6 @@ func evalTableSchema() *memdb.TableSchema {
},
},
},

// Creation index is used for listing evaluations, ordering them by
// creation chronology. (Use a reverse iterator for newest first).
"creation": {
Name: "creation",
AllowMissing: false,
Unique: false,
Indexer: &memdb.UintFieldIndex{
Field: "CreateIndex",
},
},
},
}
}
Expand Down
66 changes: 56 additions & 10 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,17 +536,25 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
return nil
}

func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) {
func (s *StateStore) Deployments(ws memdb.WatchSet, ascending bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

// Walk the entire deployments table
iter, err := txn.Get("deployment", "id")
var it memdb.ResultIterator
var err error

if ascending {
it, err = txn.Get("deployment", "create")
} else {
it, err = txn.GetReverse("deployment", "create")
}

if err != nil {
return nil, err
}

ws.Add(iter.WatchCh())
return iter, nil
ws.Add(it.WatchCh())

return it, nil
}

func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
Expand All @@ -562,6 +570,27 @@ func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string)
return iter, nil
}

func (s *StateStore) DeploymentsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

var it memdb.ResultIterator
var err error

if ascending {
it, err = txn.Get("deployment", "namespace_create_prefix", namespace)
} else {
it, err = txn.GetReverse("deployment", "namespace_create_prefix")
}

if err != nil {
return nil, err
}

ws.Add(it.WatchCh())

return it, nil
}

func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

Expand Down Expand Up @@ -3121,9 +3150,9 @@ func (s *StateStore) Evals(ws memdb.WatchSet, ascending bool) (memdb.ResultItera
var err error

if ascending {
it, err = txn.Get("evals", "creation")
it, err = txn.Get("evals", "create")
} else {
it, err = txn.GetReverse("evals", "creation")
it, err = txn.GetReverse("evals", "create")
}

if err != nil {
Expand All @@ -3135,16 +3164,33 @@ func (s *StateStore) Evals(ws memdb.WatchSet, ascending bool) (memdb.ResultItera
return it, nil
}

func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) {
// EvalsByNamespace returns an iterator over all evaluations in no particular
// order.
//
// todo(shoenig): can this be removed?
func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

it, err := txn.Get("evals", "namespace", namespace)
if err != nil {
return nil, err
}

ws.Add(it.WatchCh())

return it, nil
}

func (s *StateStore) EvalsByNamespaceOrdered(ws memdb.WatchSet, namespace string, ascending bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

var it memdb.ResultIterator
var err error

if ascending {
it, err = txn.Get("evals", "namespace_prefix", namespace)
it, err = txn.Get("evals", "namespace_create_prefix", namespace)
} else {
it, err = txn.GetReverse("evals", "namespace_prefix")
it, err = txn.GetReverse("evals", "namespace_create_prefix")
}

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ type GenericRequest struct {

// DeploymentListRequest is used to list the deployments
type DeploymentListRequest struct {
OrderAscending bool
QueryOptions
}

Expand Down

0 comments on commit 8e0202c

Please sign in to comment.