Skip to content

Commit

Permalink
shed evaluations in the node RPC when eval broker has blocked evals
Browse files Browse the repository at this point in the history
When a node updates its fingerprint or status, we need to create new evaluations
to ensure that jobs waiting for resources get a chance to be evaluated. But in
the case of a cluster with a large backup of evaluations and flapping nodes, we
can get a large backlog of evaluations for the same job. Most of these will be
canceled but we still need to write the evaluation in raft and then write its
deletion in raft.

This changeset proposes that we avoid creating evals for jobs that already have
a blocked eval in the eval broker. A blocked eval means that the broker already
has work in-flight *and* work waiting to be re-enqueued, so it's safe to drop
the evaluation.
  • Loading branch information
tgross committed Sep 19, 2022
1 parent 78e2985 commit 7caddce
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 0 deletions.
8 changes: 8 additions & 0 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,14 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
}
}

// HasBlockedEval is used to check if the eval broker already has a blocked eval for this job.
func (b *EvalBroker) HasBlockedEval(namespacedJobID structs.NamespacedID) bool {
b.l.Lock()
defer b.l.Unlock()
_, ok := b.blocked[namespacedJobID]
return ok
}

// BrokerStats returns all the stats about the broker
type BrokerStats struct {
TotalReady int
Expand Down
18 changes: 18 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,9 @@ func (n *Node) List(args *structs.NodeListRequest,
// createNodeEvals is used to create evaluations for each alloc on a node.
// Each Eval is scoped to a job, so we need to potentially trigger many evals.
func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string, uint64, error) {

broker := n.srv.evalBroker

nodeID := node.ID

// Snapshot the state
Expand All @@ -1477,6 +1480,14 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string,
var sysJobs []*structs.Job
for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() {
job := jobI.(*structs.Job)

// Avoid creating evals for jobs that already have a blocked eval. A
// blocked eval means that the broker already has work in-flight *and*
// work waiting to be re-enqueued, so it's safe to drop the evaluation.
if broker.HasBlockedEval(structs.NewNamespacedID(job.ID, job.Namespace)) {
continue
}

// Avoid creating evals for jobs that don't run in this
// datacenter. We could perform an entire feasibility check
// here, but datacenter is a good optimization to start with as
Expand Down Expand Up @@ -1508,6 +1519,13 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string,
}
jobIDs[alloc.JobNamespacedID()] = struct{}{}

// Avoid creating evals for jobs that already have a blocked eval. A
// blocked eval means that the broker already has work in-flight *and*
// work waiting to be re-enqueued, so it's safe to drop the evaluation.
if broker.HasBlockedEval(alloc.JobNamespacedID()) {
continue
}

// Create a new eval
eval := &structs.Evaluation{
ID: uuid.Generate(),
Expand Down
9 changes: 9 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,15 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}

// wait until the previous eval has been enqueued
id := structs.NewNamespacedID(job.ID, job.Namespace)
testutil.Wait(t, func() (bool, error) {
if s1.evalBroker.HasBlockedEval(id) {
return false, fmt.Errorf("previous eval is still blocked")
}
return true, nil
})

node.Status = structs.NodeStatusReady
reg = &structs.NodeRegisterRequest{
Node: node,
Expand Down

0 comments on commit 7caddce

Please sign in to comment.