From 7caddce7e62e91d4763ec9fd9e27d7d57faa9f22 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 19 Sep 2022 14:55:41 -0400 Subject: [PATCH] shed evaluations in the node RPC when eval broker has blocked evals When a node updates its fingerprint or status, we need to create new evaluations to ensure that jobs waiting for resources get a chance to be evaluated. But in the case of a cluster with a large backup of evaluations and flapping nodes, we can get a large backlog of evaluations for the same job. Most of these will be canceled but we still need to write the evaluation in raft and then write its deletion in raft. This changeset proposes that we avoid creating evals for jobs that already have a blocked eval in the eval broker. A blocked eval means that the broker already has work in-flight *and* work waiting to be re-enqueued, so it's safe to drop the evaluation. --- nomad/eval_broker.go | 8 ++++++++ nomad/node_endpoint.go | 18 ++++++++++++++++++ nomad/node_endpoint_test.go | 9 +++++++++ 3 files changed, 35 insertions(+) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e13394b17258..eb76b25639e5 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -876,6 +876,14 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { } } +// HasBlockedEval is used to check if the eval broker already has a blocked eval for this job. +func (b *EvalBroker) HasBlockedEval(namespacedJobID structs.NamespacedID) bool { + b.l.Lock() + defer b.l.Unlock() + _, ok := b.blocked[namespacedJobID] + return ok +} + // BrokerStats returns all the stats about the broker type BrokerStats struct { TotalReady int diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 689b1082e777..e1b6597fc1af 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1455,6 +1455,9 @@ func (n *Node) List(args *structs.NodeListRequest, // createNodeEvals is used to create evaluations for each alloc on a node. // Each Eval is scoped to a job, so we need to potentially trigger many evals. func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, uint64, error) { + + broker := n.srv.evalBroker + nodeID := node.ID // Snapshot the state @@ -1477,6 +1480,14 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, var sysJobs []*structs.Job for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() { job := jobI.(*structs.Job) + + // Avoid creating evals for jobs that already have a blocked eval. A + // blocked eval means that the broker already has work in-flight *and* + // work waiting to be re-enqueued, so it's safe to drop the evaluation. + if broker.HasBlockedEval(structs.NewNamespacedID(job.ID, job.Namespace)) { + continue + } + // Avoid creating evals for jobs that don't run in this // datacenter. We could perform an entire feasibility check // here, but datacenter is a good optimization to start with as @@ -1508,6 +1519,13 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, } jobIDs[alloc.JobNamespacedID()] = struct{}{} + // Avoid creating evals for jobs that already have a blocked eval. A + // blocked eval means that the broker already has work in-flight *and* + // work waiting to be re-enqueued, so it's safe to drop the evaluation. + if broker.HasBlockedEval(alloc.JobNamespacedID()) { + continue + } + // Create a new eval eval := &structs.Evaluation{ ID: uuid.Generate(), diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 899f51470cba..6b50bc3db079 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -654,6 +654,15 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { t.Fatalf("expected one eval; got %#v", resp.EvalIDs) } + // wait until the previous eval has been enqueued + id := structs.NewNamespacedID(job.ID, job.Namespace) + testutil.Wait(t, func() (bool, error) { + if s1.evalBroker.HasBlockedEval(id) { + return false, fmt.Errorf("previous eval is still blocked") + } + return true, nil + }) + node.Status = structs.NodeStatusReady reg = &structs.NodeRegisterRequest{ Node: node,