diff --git a/.changelog/12955.txt b/.changelog/12955.txt new file mode 100644 index 000000000000..cc0dc3c51785 --- /dev/null +++ b/.changelog/12955.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core: On node updates skip creating evaluations for jobs not in the node's datacenter. +``` diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 645747476468..39b870186009 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -171,7 +171,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp // Check if we should trigger evaluations if shouldCreateNodeEval(originalNode, args.Node) { - evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index) + evalIDs, evalIndex, err := n.createNodeEvals(args.Node, index) if err != nil { n.logger.Error("eval creation failed", "error", err) return err @@ -350,15 +350,16 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, return err } - ws := memdb.NewWatchSet() + nodes := make([]*structs.Node, 0, len(args.NodeIDs)) for _, nodeID := range args.NodeIDs { - node, err := snap.NodeByID(ws, nodeID) + node, err := snap.NodeByID(nil, nodeID) if err != nil { return err } if node == nil { return fmt.Errorf("node not found") } + nodes = append(nodes, node) } // Commit this update via Raft @@ -368,19 +369,21 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, return err } - for _, nodeID := range args.NodeIDs { + for _, node := range nodes { + nodeID := node.ID + // Clear the heartbeat timer if any n.srv.clearHeartbeatTimer(nodeID) // Create the evaluations for this node - evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) + evalIDs, evalIndex, err := n.createNodeEvals(node, index) if err != nil { n.logger.Error("eval creation failed", "error", err) return err } // Determine if there are any Vault accessors on the node - if accessors, err := snap.VaultAccessorsByNode(ws, nodeID); err != nil { + if accessors, err := snap.VaultAccessorsByNode(nil, nodeID); err != nil { n.logger.Error("looking up vault accessors for node failed", "node_id", nodeID, "error", err) return err } else if l := len(accessors); l > 0 { @@ -392,7 +395,7 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, } // Determine if there are any SI token accessors on the node - if accessors, err := snap.SITokenAccessorsByNode(ws, nodeID); err != nil { + if accessors, err := snap.SITokenAccessorsByNode(nil, nodeID); err != nil { n.logger.Error("looking up si accessors for node failed", "node_id", nodeID, "error", err) return err } else if l := len(accessors); l > 0 { @@ -490,7 +493,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Check if we should trigger evaluations if structs.ShouldDrainNode(args.Status) || nodeStatusTransitionRequiresEval(args.Status, node.Status) { - evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) + evalIDs, evalIndex, err := n.createNodeEvals(node, index) if err != nil { n.logger.Error("eval creation failed", "error", err) return err @@ -658,7 +661,7 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, // If the node is transitioning to be eligible, create Node evaluations // because there may be a System job registered that should be evaluated. if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil { - evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) + evalIDs, evalIndex, err := n.createNodeEvals(node, index) if err != nil { n.logger.Error("eval creation failed", "error", err) return err @@ -754,7 +757,7 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, // If the node is transitioning to be eligible, create Node evaluations // because there may be a System job registered that should be evaluated. if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible { - evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) + evalIDs, evalIndex, err := n.createNodeEvals(node, index) if err != nil { n.logger.Error("eval creation failed", "error", err) return err @@ -802,7 +805,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp } // Create the evaluation - evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, node.ModifyIndex) + evalIDs, evalIndex, err := n.createNodeEvals(node, node.ModifyIndex) if err != nil { n.logger.Error("eval creation failed", "error", err) return err @@ -1444,7 +1447,9 @@ func (n *Node) List(args *structs.NodeListRequest, // createNodeEvals is used to create evaluations for each alloc on a node. // Each Eval is scoped to a job, so we need to potentially trigger many evals. -func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) { +func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, uint64, error) { + nodeID := node.ID + // Snapshot the state snap, err := n.srv.fsm.State().Snapshot() if err != nil { @@ -1452,20 +1457,30 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6 } // Find all the allocations for this node - ws := memdb.NewWatchSet() - allocs, err := snap.AllocsByNode(ws, nodeID) + allocs, err := snap.AllocsByNode(nil, nodeID) if err != nil { return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err) } - sysJobsIter, err := snap.JobsByScheduler(ws, "system") + sysJobsIter, err := snap.JobsByScheduler(nil, "system") if err != nil { return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err) } var sysJobs []*structs.Job - for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() { - sysJobs = append(sysJobs, job.(*structs.Job)) + for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() { + job := jobI.(*structs.Job) + // Avoid creating evals for jobs that don't run in this + // datacenter. We could perform an entire feasibility check + // here, but datacenter is a good optimization to start with as + // datacenter cardinality tends to be low so the check + // shouldn't add much work. + for _, dc := range job.Datacenters { + if dc == node.Datacenter { + sysJobs = append(sysJobs, job) + break + } + } } // Fast-path if nothing to do diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index b0e35ad283dd..d4f24153a696 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2677,23 +2677,32 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) { s1, cleanupS1 := TestServer(t, nil) defer cleanupS1() testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + idx, err := state.LatestIndex() + require.NoError(t, err) + + node := mock.Node() + err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node) + require.NoError(t, err) + idx++ // Inject fake evaluations alloc := mock.Alloc() - state := s1.fsm.State() + alloc.NodeID = node.ID state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) - if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, idx, []*structs.Allocation{alloc})) + idx++ // Inject a fake system job. job := mock.SystemJob() - if err := state.UpsertJob(structs.MsgTypeTestSetup, 3, job); err != nil { + if err := state.UpsertJob(structs.MsgTypeTestSetup, idx, job); err != nil { t.Fatalf("err: %v", err) } + idx++ // Create some evaluations - ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1) + ids, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1) if err != nil { t.Fatalf("err: %v", err) } @@ -2790,7 +2799,7 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) { idx++ // Create some evaluations - evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node.ID, 1) + evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1) require.NoError(t, err) require.NotZero(t, index) require.Len(t, evalIDs, 2) @@ -2815,6 +2824,51 @@ func TestClientEndpoint_CreateNodeEvals_MultipleNSes(t *testing.T) { require.Equal(t, nsJob.Namespace, otherNSEval.Namespace) } +// TestClientEndpoint_CreateNodeEvals_MultipleDCes asserts that evals are made +// only for the DC the node is in. +func TestClientEndpoint_CreateNodeEvals_MultipleDCes(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + state := s1.fsm.State() + + idx, err := state.LatestIndex() + require.NoError(t, err) + + node := mock.Node() + node.Datacenter = "test1" + err = state.UpsertNode(structs.MsgTypeTestSetup, idx, node) + require.NoError(t, err) + idx++ + + // Inject a fake system job in the same dc + defaultJob := mock.SystemJob() + defaultJob.Datacenters = []string{"test1", "test2"} + err = state.UpsertJob(structs.MsgTypeTestSetup, idx, defaultJob) + require.NoError(t, err) + idx++ + + // Inject a fake system job in a different dc + nsJob := mock.SystemJob() + nsJob.Datacenters = []string{"test2", "test3"} + err = state.UpsertJob(structs.MsgTypeTestSetup, idx, nsJob) + require.NoError(t, err) + idx++ + + // Create evaluations + evalIDs, index, err := s1.staticEndpoints.Node.createNodeEvals(node, 1) + require.NoError(t, err) + require.NotZero(t, index) + require.Len(t, evalIDs, 1) + + eval, err := state.EvalByID(nil, evalIDs[0]) + require.NoError(t, err) + require.Equal(t, defaultJob.ID, eval.JobID) +} + func TestClientEndpoint_Evaluate(t *testing.T) { ci.Parallel(t)