Skip to content

Commit

Permalink
backport of commit e7d9773 (#13628)
Browse files Browse the repository at this point in the history
This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core committed Jul 6, 2022
1 parent 260ea61 commit 701cd78
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 24 deletions.
3 changes: 3 additions & 0 deletions .changelog/12955.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: On node updates skip creating evaluations for jobs not in the node's datacenter.
```
49 changes: 32 additions & 17 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp

// Check if we should trigger evaluations
if shouldCreateNodeEval(originalNode, args.Node) {
evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
evalIDs, evalIndex, err := n.createNodeEvals(args.Node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -350,15 +350,16 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
return err
}

ws := memdb.NewWatchSet()
nodes := make([]*structs.Node, 0, len(args.NodeIDs))
for _, nodeID := range args.NodeIDs {
node, err := snap.NodeByID(ws, nodeID)
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
}
nodes = append(nodes, node)
}

// Commit this update via Raft
Expand All @@ -368,19 +369,21 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
return err
}

for _, nodeID := range args.NodeIDs {
for _, node := range nodes {
nodeID := node.ID

// Clear the heartbeat timer if any
n.srv.clearHeartbeatTimer(nodeID)

// Create the evaluations for this node
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}

// Determine if there are any Vault accessors on the node
if accessors, err := snap.VaultAccessorsByNode(ws, nodeID); err != nil {
if accessors, err := snap.VaultAccessorsByNode(nil, nodeID); err != nil {
n.logger.Error("looking up vault accessors for node failed", "node_id", nodeID, "error", err)
return err
} else if l := len(accessors); l > 0 {
Expand All @@ -392,7 +395,7 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
}

// Determine if there are any SI token accessors on the node
if accessors, err := snap.SITokenAccessorsByNode(ws, nodeID); err != nil {
if accessors, err := snap.SITokenAccessorsByNode(nil, nodeID); err != nil {
n.logger.Error("looking up si accessors for node failed", "node_id", nodeID, "error", err)
return err
} else if l := len(accessors); l > 0 {
Expand Down Expand Up @@ -490,7 +493,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Check if we should trigger evaluations
if structs.ShouldDrainNode(args.Status) ||
nodeStatusTransitionRequiresEval(args.Status, node.Status) {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -658,7 +661,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
// If the node is transitioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -754,7 +757,7 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
// If the node is transitioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -802,7 +805,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
}

// Create the evaluation
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, node.ModifyIndex)
evalIDs, evalIndex, err := n.createNodeEvals(node, node.ModifyIndex)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -1444,28 +1447,40 @@ func (n *Node) List(args *structs.NodeListRequest,

// createNodeEvals is used to create evaluations for each alloc on a node.
// Each Eval is scoped to a job, so we need to potentially trigger many evals.
func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) {
func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, uint64, error) {
nodeID := node.ID

// Snapshot the state
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return nil, 0, fmt.Errorf("failed to snapshot state: %v", err)
}

// Find all the allocations for this node
ws := memdb.NewWatchSet()
allocs, err := snap.AllocsByNode(ws, nodeID)
allocs, err := snap.AllocsByNode(nil, nodeID)
if err != nil {
return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
}

sysJobsIter, err := snap.JobsByScheduler(ws, "system")
sysJobsIter, err := snap.JobsByScheduler(nil, "system")
if err != nil {
return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
}

var sysJobs []*structs.Job
for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() {
sysJobs = append(sysJobs, job.(*structs.Job))
for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() {
job := jobI.(*structs.Job)
// Avoid creating evals for jobs that don't run in this
// datacenter. We could perform an entire feasibility check
// here, but datacenter is a good optimization to start with as
// datacenter cardinality tends to be low so the check
// shouldn't add much work.
for _, dc := range job.Datacenters {
if dc == node.Datacenter {
sysJobs = append(sysJobs, job)
break
}
}
}

// Fast-path if nothing to do
Expand Down
68 changes: 61 additions & 7 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2677,23 +2677,32 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()

idx, err := state.LatestIndex()
require.NoError(t, err)

node := mock.Node()
err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node)
require.NoError(t, err)
idx++

// Inject fake evaluations
alloc := mock.Alloc()
state := s1.fsm.State()
alloc.NodeID = node.ID
state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, idx, []*structs.Allocation{alloc}))
idx++

// Inject a fake system job.
job := mock.SystemJob()
if err := state.UpsertJob(structs.MsgTypeTestSetup, 3, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, idx, job); err != nil {
t.Fatalf("err: %v", err)
}
idx++

// Create some evaluations
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1)
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -2790,7 +2799,7 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
idx++

// Create some evaluations
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node.ID, 1)
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
require.NoError(t, err)
require.NotZero(t, index)
require.Len(t, evalIDs, 2)
Expand All @@ -2815,6 +2824,51 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
require.Equal(t, nsJob.Namespace, otherNSEval.Namespace)
}

// TestClientEndpoint_CreateNodeEvals_MultipleDCes verifies that createNodeEvals
// only produces an evaluation for a system job whose datacenter list contains
// the node's datacenter, and none for a system job that targets other
// datacenters.
func TestClientEndpoint_CreateNodeEvals_MultipleDCes(t *testing.T) {
	ci.Parallel(t)

	srv, cleanup := TestServer(t, nil)
	defer cleanup()
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()

	nextIdx, err := store.LatestIndex()
	require.NoError(t, err)

	// Register a node that lives in datacenter "test1".
	node := mock.Node()
	node.Datacenter = "test1"
	require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, nextIdx, node))
	nextIdx++

	// System job whose datacenters include the node's datacenter.
	matchingJob := mock.SystemJob()
	matchingJob.Datacenters = []string{"test1", "test2"}
	require.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, nextIdx, matchingJob))
	nextIdx++

	// System job limited to datacenters the node does not belong to.
	otherDCJob := mock.SystemJob()
	otherDCJob.Datacenters = []string{"test2", "test3"}
	require.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, nextIdx, otherDCJob))
	nextIdx++

	// Only the job sharing a datacenter with the node should yield an eval.
	evalIDs, evalIndex, err := srv.staticEndpoints.Node.createNodeEvals(node, 1)
	require.NoError(t, err)
	require.NotZero(t, evalIndex)
	require.Len(t, evalIDs, 1)

	eval, err := store.EvalByID(nil, evalIDs[0])
	require.NoError(t, err)
	require.Equal(t, matchingJob.ID, eval.JobID)
}

func TestClientEndpoint_Evaluate(t *testing.T) {
ci.Parallel(t)

Expand Down

0 comments on commit 701cd78

Please sign in to comment.