Commit cf278ec

core: track node plan rejections and mark clients as ineligible

lgfa29 committed Jun 17, 2022
1 parent bae04d0 commit cf278ec

Showing 5 changed files with 290 additions and 8 deletions.
44 changes: 40 additions & 4 deletions nomad/plan_apply.go
@@ -25,20 +25,38 @@ type planner struct {
// planQueue is used to manage the submitted allocation
// plans that are waiting to be assessed by the leader
planQueue *PlanQueue

// badNodeTracker keeps a score for nodes that have plan rejections.
// Plan rejections are somewhat expected given Nomad's optimistic
// scheduling, but repeated rejections for the same node may indicate an
// undetected issue, so we need to track rejection history.
badNodeTracker *BadNodeTracker
}

// newPlanner returns a new planner to be used for managing allocation plans.
func newPlanner(s *Server) (*planner, error) {
log := s.logger.Named("planner")

// Create a plan queue
planQueue, err := NewPlanQueue()
if err != nil {
return nil, err
}

// Create the bad node tracker.
size := 50
badNodeTracker, err := NewBadNodeTracker(log, size,
s.config.NodePlanRejectionWindow,
s.config.NodePlanRejectionThreshold)
if err != nil {
return nil, err
}

 	return &planner{
-		Server:    s,
-		log:       s.logger.Named("planner"),
-		planQueue: planQueue,
+		Server:         s,
+		log:            log,
+		planQueue:      planQueue,
+		badNodeTracker: badNodeTracker,
 	}, nil
}
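
newPlanner reads the tracker's window and threshold from the server config. The field names below are taken from the calls above; the struct shape and sample values are illustrative only, since the config plumbing lives in a file not shown in this diff:

	// Illustrative sketch, not the actual Nomad Config struct.
	type planRejectionConfig struct {
		NodePlanRejectionWindow    time.Duration // e.g. 5 * time.Minute
		NodePlanRejectionThreshold int           // e.g. 15
	}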

@@ -209,9 +227,19 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
IneligibleNodes: make([]string, 0),
EvalID: plan.EvalID,
}

// Check the bad node history to decide if the rejected nodes should be
// marked as ineligible.
for _, nodeID := range result.RejectedNodes {
p.badNodeTracker.Add(nodeID)
if p.badNodeTracker.IsBad(nodeID) {
req.IneligibleNodes = append(req.IneligibleNodes, nodeID)
}
}
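	// Nodes collected in req.IneligibleNodes travel with the plan result
	// through Raft; StateStore.UpsertPlanResults then marks them ineligible
	// and records a node event (see nomad/state/state_store.go below).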

preemptedJobIDs := make(map[structs.NamespacedID]struct{})
now := time.Now().UTC().UnixNano()

@@ -466,6 +494,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
// errors since we are processing in parallel.
var mErr multierror.Error
partialCommit := false
rejectedNodes := make(map[string]struct{}, 0)

// handleResult is used to process the result of evaluateNodePlan
handleResult := func(nodeID string, fit bool, reason string, err error) (cancel bool) {
@@ -489,8 +518,11 @@
"node_id", nodeID, "reason", reason, "eval_id", plan.EvalID,
"namespace", plan.Job.Namespace)
}
-			// Set that this is a partial commit
+			// Set that this is a partial commit and store the node that was
+			// rejected so the plan applier can detect repeated plan rejections
+			// for the same node.
 			partialCommit = true
+			rejectedNodes[nodeID] = struct{}{}

// If we require all-at-once scheduling, there is no point
// to continue the evaluation, as we've already failed.
@@ -595,6 +627,10 @@ OUTER:
// placed but wasn't actually placed
correctDeploymentCanaries(result)
}

for n := range rejectedNodes {
result.RejectedNodes = append(result.RejectedNodes, n)
}
return result, mErr.ErrorOrNil()
}

103 changes: 103 additions & 0 deletions nomad/plan_apply_node_tracker.go
@@ -0,0 +1,103 @@
package nomad

import (
"fmt"
"time"

"github.com/hashicorp/go-hclog"
lru "github.com/hashicorp/golang-lru"
)

// BadNodeTracker keeps a record of nodes marked as bad by the plan applier.
//
// It takes a time window and a threshold score value. Node scores are
// incremented every time they are added to the tracker. If the score is above
// the threshold within the time window the node is reported as bad.
//
// The tracker uses a fixed size ARC cache that evicts old entries based on
// access frequency and recency.
type BadNodeTracker struct {
logger hclog.Logger
cache *lru.ARCCache
window time.Duration
threshold int
}

// NewBadNodeTracker returns a new BadNodeTracker.
func NewBadNodeTracker(logger hclog.Logger, size int, window time.Duration, threshold int) (*BadNodeTracker, error) {
cache, err := lru.NewARC(size)
if err != nil {
return nil, fmt.Errorf("failed to create new bad node tracker: %v", err)
}

return &BadNodeTracker{
logger: logger.Named("bad_node_tracker").
With("threshold", threshold).
With("window", window),
cache: cache,
window: window,
threshold: threshold,
}, nil
}

// IsBad returns true if the node has a score above the threshold within the
// time window.
func (t *BadNodeTracker) IsBad(nodeID string) bool {
value, ok := t.cache.Get(nodeID)
if !ok {
return false
}

stats := value.(*badNodeStats)
score := stats.score(time.Now())

t.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score)
return score > t.threshold
}

// Add adds a node to be tracked. If the node is already being tracked its
// score is incremented.
func (t *BadNodeTracker) Add(nodeID string) {
value, ok := t.cache.Get(nodeID)
if !ok {
value = newBadNodeStats(t.window)
t.cache.Add(nodeID, value)
}

stats := value.(*badNodeStats)
score := stats.incrementScore()
t.logger.Debug("marking node as bad", "node_id", nodeID, "score", score)
}

// badNodeStats represents a node being tracked by BadNodeTracker.
type badNodeStats struct {
count int
ts time.Time
window time.Duration
}

func newBadNodeStats(window time.Duration) *badNodeStats {
return &badNodeStats{
count: 0,
ts: time.Now(),
window: window,
}
}

// score returns the current count for the node or 0 if the time window has
// expired.
func (s *badNodeStats) score(t time.Time) int {
expiry := s.ts.Add(s.window)
if t.After(expiry) {
s.count = 0
}
return s.count
}

// incrementScore adds 1 to the score and updates the timestamp to move the
// expiration date forward.
func (s *badNodeStats) incrementScore() int {
s.count += 1
s.ts = time.Now()
return s.count
}
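
A minimal usage sketch of the tracker, written as an example test that could live in this package (the window and threshold values are hypothetical; in the commit they come from the server config):

	func ExampleBadNodeTracker() {
		// Track up to 50 nodes; a node is bad once its score exceeds 2
		// within a one-minute window.
		tracker, _ := NewBadNodeTracker(hclog.NewNullLogger(), 50, time.Minute, 2)

		tracker.Add("node-1") // score 1
		tracker.Add("node-1") // score 2
		fmt.Println(tracker.IsBad("node-1")) // score 2 is not > threshold 2

		tracker.Add("node-1") // score 3; expiry window pushed forward
		fmt.Println(tracker.IsBad("node-1"))

		// Once a full window passes with no further Add calls, score()
		// reports 0 and the node is no longer considered bad.

		// Output:
		// false
		// true
	}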
99 changes: 99 additions & 0 deletions nomad/plan_apply_node_tracker_test.go
@@ -0,0 +1,99 @@
package nomad

import (
"fmt"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestBadNodeTracker(t *testing.T) {
ci.Parallel(t)

cacheSize := 3
tracker, err := NewBadNodeTracker(
hclog.NewNullLogger(), cacheSize, time.Second, 10)
require.NoError(t, err)

for i := 0; i < 10; i++ {
tracker.Add(fmt.Sprintf("node-%d", i+1))
}

require.Equal(t, cacheSize, tracker.cache.Len())

// Only track the most recent values.
expected := []string{"node-8", "node-9", "node-10"}
require.ElementsMatch(t, expected, tracker.cache.Keys())
}

func TestBadNodeTracker_IsBad(t *testing.T) {
ci.Parallel(t)

window := time.Duration(testutil.TestMultiplier()) * time.Second
tracker, err := NewBadNodeTracker(hclog.NewNullLogger(), 3, window, 4)
require.NoError(t, err)

// Populate cache.
tracker.Add("node-1")

tracker.Add("node-2")
tracker.Add("node-2")

tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")
tracker.Add("node-3")

testCases := []struct {
name string
nodeID string
bad bool
}{
{
name: "node-1 is not bad",
nodeID: "node-1",
bad: false,
},
{
name: "node-3 is bad",
nodeID: "node-3",
bad: true,
},
{
name: "node not tracked is not bad",
nodeID: "node-1000",
bad: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := tracker.IsBad(tc.nodeID)
require.Equal(t, tc.bad, got)
})
}

t.Run("cache expires", func(t *testing.T) {
time.Sleep(window)
require.False(t, tracker.IsBad("node-1"))
require.False(t, tracker.IsBad("node-2"))
require.False(t, tracker.IsBad("node-3"))
})

t.Run("IsBad updates cache", func(t *testing.T) {
// Don't access node-3 so it should be evicted when a new value is
// added and the tracker size overflows.
tracker.IsBad("node-1")
tracker.IsBad("node-2")
tracker.Add("node-4")

expected := []string{"node-1", "node-2", "node-4"}
require.ElementsMatch(t, expected, tracker.cache.Keys())
})
}
36 changes: 34 additions & 2 deletions nomad/state/state_store.go
@@ -34,6 +34,17 @@
)

const (
// NodeEligibilityEventPlanRejectThreshold is the message used when the node
// is set to ineligible due to multiple plan failures.
// This is a preventive measure to signal scheduler workers to not consider
// the node for future placements.
// Plan rejections for a node are expected due to the optimistic and
// concurrent nature of the scheduling process, but repeated failures for
// the same node may indicate an underlying issue not detected by Nomad.
// The plan applier keeps track of plan rejection history and will mark
// nodes as ineligible if they cross a given threshold.
NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections"

// NodeRegisterEventRegistered is the message used when the node becomes
// registered.
NodeRegisterEventRegistered = "Node registered"
@@ -359,6 +370,22 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()

// Mark nodes as ineligible.
now := time.Now().Unix()
for _, nodeID := range results.IneligibleNodes {
s.logger.Warn("marking node as ineligible due to multiple plan rejections", "node_id", nodeID)

nodeEvent := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemScheduler).
SetMessage(NodeEligibilityEventPlanRejectThreshold)

err := s.updateNodeEligibilityImpl(index, nodeID,
structs.NodeSchedulingIneligible, now, nodeEvent, txn)
if err != nil {
return err
}
}

// Upsert the newly created or updated deployment
if results.Deployment != nil {
if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil {
@@ -1136,10 +1163,15 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,

 // UpdateNodeEligibility is used to update the scheduling eligibility of a node
 func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
-
 	txn := s.db.WriteTxnMsgT(msgType, index)
 	defer txn.Abort()
+	if err := s.updateNodeEligibilityImpl(index, nodeID, eligibility, updatedAt, event, txn); err != nil {
+		return err
+	}
+	return txn.Commit()
+}
 
+func (s *StateStore) updateNodeEligibilityImpl(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent, txn *txn) error {
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@@ -1176,7 +1208,7 @@ func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index ui
return fmt.Errorf("index update failed: %v", err)
}

-	return txn.Commit()
+	return nil
}

// UpsertNodeEvents adds the node events to the nodes, rotating events as
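
Taken together, the pieces above form one loop: evaluatePlanPlacements records rejected nodes on the plan result, applyPlan scores them with the BadNodeTracker, and UpsertPlanResults flips repeat offenders to ineligible. A condensed sketch of the applier-side step (simplified from applyPlan; the helper name is hypothetical, for illustration only):

	// markRepeatedlyRejected returns the subset of rejected nodes whose
	// rejection score has crossed the threshold within the tracking window.
	func markRepeatedlyRejected(tracker *BadNodeTracker, rejected []string) []string {
		ineligible := make([]string, 0)
		for _, nodeID := range rejected {
			tracker.Add(nodeID)
			if tracker.IsBad(nodeID) {
				ineligible = append(ineligible, nodeID)
			}
		}
		return ineligible
	}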