Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: emit node evals only for sys jobs in dc #12955

Merged
merged 1 commit into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12955.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core: On node updates, skip creating evaluations for jobs not in the node's datacenter.
```
49 changes: 32 additions & 17 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp

// Check if we should trigger evaluations
if shouldCreateNodeEval(originalNode, args.Node) {
evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
evalIDs, evalIndex, err := n.createNodeEvals(args.Node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -350,15 +350,16 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
return err
}

ws := memdb.NewWatchSet()
nodes := make([]*structs.Node, 0, len(args.NodeIDs))
for _, nodeID := range args.NodeIDs {
node, err := snap.NodeByID(ws, nodeID)
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
}
nodes = append(nodes, node)
}

// Commit this update via Raft
Expand All @@ -368,19 +369,21 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
return err
}

for _, nodeID := range args.NodeIDs {
for _, node := range nodes {
nodeID := node.ID

// Clear the heartbeat timer if any
n.srv.clearHeartbeatTimer(nodeID)

// Create the evaluations for this node
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}

// Determine if there are any Vault accessors on the node
if accessors, err := snap.VaultAccessorsByNode(ws, nodeID); err != nil {
if accessors, err := snap.VaultAccessorsByNode(nil, nodeID); err != nil {
n.logger.Error("looking up vault accessors for node failed", "node_id", nodeID, "error", err)
return err
} else if l := len(accessors); l > 0 {
Expand All @@ -392,7 +395,7 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
}

// Determine if there are any SI token accessors on the node
if accessors, err := snap.SITokenAccessorsByNode(ws, nodeID); err != nil {
if accessors, err := snap.SITokenAccessorsByNode(nil, nodeID); err != nil {
n.logger.Error("looking up si accessors for node failed", "node_id", nodeID, "error", err)
return err
} else if l := len(accessors); l > 0 {
Expand Down Expand Up @@ -490,7 +493,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Check if we should trigger evaluations
if structs.ShouldDrainNode(args.Status) ||
nodeStatusTransitionRequiresEval(args.Status, node.Status) {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -658,7 +661,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
// If the node is transitioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -754,7 +757,7 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
// If the node is transitioning to be eligible, create Node evaluations
// because there may be a System job registered that should be evaluated.
if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
evalIDs, evalIndex, err := n.createNodeEvals(node, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -802,7 +805,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
}

// Create the evaluation
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, node.ModifyIndex)
evalIDs, evalIndex, err := n.createNodeEvals(node, node.ModifyIndex)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
Expand Down Expand Up @@ -1444,28 +1447,40 @@ func (n *Node) List(args *structs.NodeListRequest,

// createNodeEvals is used to create evaluations for each alloc on a node.
// Each Eval is scoped to a job, so we need to potentially trigger many evals.
func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) {
func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, uint64, error) {
nodeID := node.ID

// Snapshot the state
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return nil, 0, fmt.Errorf("failed to snapshot state: %v", err)
}

// Find all the allocations for this node
ws := memdb.NewWatchSet()
allocs, err := snap.AllocsByNode(ws, nodeID)
allocs, err := snap.AllocsByNode(nil, nodeID)
if err != nil {
return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
}

sysJobsIter, err := snap.JobsByScheduler(ws, "system")
sysJobsIter, err := snap.JobsByScheduler(nil, "system")
if err != nil {
return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
}

var sysJobs []*structs.Job
for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() {
sysJobs = append(sysJobs, job.(*structs.Job))
for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() {
job := jobI.(*structs.Job)
// Avoid creating evals for jobs that don't run in this
// datacenter. We could perform an entire feasibility check
// here, but datacenter is a good optimization to start with as
// datacenter cardinality tends to be low so the check
// shouldn't add much work.
for _, dc := range job.Datacenters {
if dc == node.Datacenter {
sysJobs = append(sysJobs, job)
break
}
}
}

// Fast-path if nothing to do
Expand Down
68 changes: 61 additions & 7 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2677,23 +2677,32 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()

idx, err := state.LatestIndex()
require.NoError(t, err)

node := mock.Node()
err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node)
require.NoError(t, err)
idx++

// Inject fake evaluations
alloc := mock.Alloc()
state := s1.fsm.State()
alloc.NodeID = node.ID
state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, idx, []*structs.Allocation{alloc}))
idx++

// Inject a fake system job.
job := mock.SystemJob()
if err := state.UpsertJob(structs.MsgTypeTestSetup, 3, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, idx, job); err != nil {
t.Fatalf("err: %v", err)
}
idx++

// Create some evaluations
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1)
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -2790,7 +2799,7 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
idx++

// Create some evaluations
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node.ID, 1)
evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
require.NoError(t, err)
require.NotZero(t, index)
require.Len(t, evalIDs, 2)
Expand All @@ -2815,6 +2824,51 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) {
require.Equal(t, nsJob.Namespace, otherNSEval.Namespace)
}

// TestClientEndpoint_CreateNodeEvals_MultipleDCes asserts that node evals are
// created only for system jobs whose Datacenters list includes the node's
// datacenter.
func TestClientEndpoint_CreateNodeEvals_MultipleDCes(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	state := s1.fsm.State()

	idx, err := state.LatestIndex()
	require.NoError(t, err)

	node := mock.Node()
	node.Datacenter = "test1"
	err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node)
	require.NoError(t, err)
	idx++

	// Inject a fake system job whose datacenters include the node's dc; an
	// eval is expected for this job.
	matchingJob := mock.SystemJob()
	matchingJob.Datacenters = []string{"test1", "test2"}
	err = state.UpsertJob(structs.MsgTypeTestSetup, idx, matchingJob)
	require.NoError(t, err)
	idx++

	// Inject a fake system job that runs only in other datacenters; no eval
	// should be created for it.
	otherDCJob := mock.SystemJob()
	otherDCJob.Datacenters = []string{"test2", "test3"}
	err = state.UpsertJob(structs.MsgTypeTestSetup, idx, otherDCJob)
	require.NoError(t, err)
	idx++

	// Create evaluations; only the job sharing the node's dc should get one.
	evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1)
	require.NoError(t, err)
	require.NotZero(t, index)
	require.Len(t, evalIDs, 1)

	eval, err := state.EvalByID(nil, evalIDs[0])
	require.NoError(t, err)
	require.Equal(t, matchingJob.ID, eval.JobID)
}

func TestClientEndpoint_Evaluate(t *testing.T) {
ci.Parallel(t)

Expand Down