diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 7f1fffc54f8c..454b3cea2b9e 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -25,20 +25,38 @@ type planner struct {
 	// planQueue is used to manage the submitted allocation
 	// plans that are waiting to be assessed by the leader
 	planQueue *PlanQueue
+
+	// badNodeTracker keeps a score for nodes that have plan rejections.
+	// Plan rejections are somewhat expected given Nomad's optimistic
+	// scheduling, but repeated rejections for the same node may indicate an
+	// undetected issue, so we need to track rejection history.
+	badNodeTracker *BadNodeTracker
 }
 
 // newPlanner returns a new planner to be used for managing allocation plans.
 func newPlanner(s *Server) (*planner, error) {
+	log := s.logger.Named("planner")
+
 	// Create a plan queue
 	planQueue, err := NewPlanQueue()
 	if err != nil {
 		return nil, err
 	}
 
+	// Create the bad node tracker.
+	size := 50
+	badNodeTracker, err := NewBadNodeTracker(log, size,
+		s.config.NodePlanRejectionWindow,
+		s.config.NodePlanRejectionThreshold)
+	if err != nil {
+		return nil, err
+	}
+
 	return &planner{
-		Server:    s,
-		log:       s.logger.Named("planner"),
-		planQueue: planQueue,
+		Server:         s,
+		log:            log,
+		planQueue:      planQueue,
+		badNodeTracker: badNodeTracker,
 	}, nil
 }
 
@@ -209,9 +227,19 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
 		},
 		Deployment:        result.Deployment,
 		DeploymentUpdates: result.DeploymentUpdates,
+		IneligibleNodes:   make([]string, 0),
 		EvalID:            plan.EvalID,
 	}
 
+	// Check the bad node history to decide if the rejected nodes should be
+	// marked as ineligible.
+	for _, nodeID := range result.RejectedNodes {
+		p.badNodeTracker.Add(nodeID)
+		if p.badNodeTracker.IsBad(nodeID) {
+			req.IneligibleNodes = append(req.IneligibleNodes, nodeID)
+		}
+	}
+
 	preemptedJobIDs := make(map[structs.NamespacedID]struct{})
 	now := time.Now().UTC().UnixNano()
 
@@ -466,6 +494,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
 	// errors since we are processing in parallel.
 	var mErr multierror.Error
 	partialCommit := false
+	rejectedNodes := make(map[string]struct{}, 0)
 
 	// handleResult is used to process the result of evaluateNodePlan
 	handleResult := func(nodeID string, fit bool, reason string, err error) (cancel bool) {
@@ -489,8 +518,11 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
 					"node_id", nodeID, "reason", reason, "eval_id", plan.EvalID,
 					"namespace", plan.Job.Namespace)
 			}
-			// Set that this is a partial commit
+			// Set that this is a partial commit and store the node that was
+			// rejected so the plan applier can detect repeated plan rejections
+			// for the same node.
 			partialCommit = true
+			rejectedNodes[nodeID] = struct{}{}
 
 			// If we require all-at-once scheduling, there is no point
 			// to continue the evaluation, as we've already failed.
@@ -595,6 +627,10 @@ OUTER:
 		// placed but wasn't actually placed
 		correctDeploymentCanaries(result)
 	}
+
+	for n := range rejectedNodes {
+		result.RejectedNodes = append(result.RejectedNodes, n)
+	}
 	return result, mErr.ErrorOrNil()
 }
 
diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go
new file mode 100644
index 000000000000..aab1b9333b97
--- /dev/null
+++ b/nomad/plan_apply_node_tracker.go
@@ -0,0 +1,103 @@
+package nomad
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/hashicorp/go-hclog"
+	lru "github.com/hashicorp/golang-lru"
+)
+
+// BadNodeTracker keeps a record of nodes marked as bad by the plan applier.
+//
+// It takes a time window and a threshold score value. A node's score is
+// incremented every time it is added to the tracker. If the score is strictly
+// above the threshold within the time window the node is reported as bad.
+//
+// The tracker uses a fixed size ARC cache that evicts old entries based on
+// access frequency and recency.
+type BadNodeTracker struct {
+	logger    hclog.Logger
+	cache     *lru.ARCCache
+	window    time.Duration
+	threshold int
+}
+
+// NewBadNodeTracker returns a BadNodeTracker backed by an ARC cache of size.
+func NewBadNodeTracker(logger hclog.Logger, size int, window time.Duration, threshold int) (*BadNodeTracker, error) {
+	cache, err := lru.NewARC(size)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create new bad node tracker: %v", err)
+	}
+
+	return &BadNodeTracker{
+		logger: logger.Named("bad_node_tracker").
+			With("threshold", threshold).
+			With("window", window),
+		cache:     cache,
+		window:    window,
+		threshold: threshold,
+	}, nil
+}
+
+// IsBad returns true if the node has a score strictly above the threshold
+// within the time window. Nodes that are not tracked are never bad.
+func (t *BadNodeTracker) IsBad(nodeID string) bool {
+	value, ok := t.cache.Get(nodeID)
+	if !ok {
+		return false
+	}
+
+	stats := value.(*badNodeStats)
+	score := stats.score(time.Now())
+
+	t.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score)
+	return score > t.threshold
+}
+
+// Add starts tracking the node if it is not tracked yet and increments its
+// score, so a node added for the first time has a score of 1.
+func (t *BadNodeTracker) Add(nodeID string) {
+	value, ok := t.cache.Get(nodeID)
+	if !ok {
+		value = newBadNodeStats(t.window)
+		t.cache.Add(nodeID, value)
+	}
+
+	stats := value.(*badNodeStats)
+	score := stats.incrementScore()
+	t.logger.Debug("marking node as bad", "node_id", nodeID, "score", score)
+}
+
+// badNodeStats represents a node being tracked by BadNodeTracker.
+type badNodeStats struct {
+	count  int
+	ts     time.Time
+	window time.Duration
+}
+
+func newBadNodeStats(window time.Duration) *badNodeStats {
+	return &badNodeStats{
+		count:  0,
+		ts:     time.Now(),
+		window: window,
+	}
+}
+
+// score returns the current count for the node. The count is reset to 0
+// first if the time window has expired at time t.
+func (s *badNodeStats) score(t time.Time) int {
+	expiry := s.ts.Add(s.window)
+	if t.After(expiry) {
+		s.count = 0
+	}
+	return s.count
+}
+
+// incrementScore adds 1 to the score and updates the timestamp to move the
+// expiration date forward.
+func (s *badNodeStats) incrementScore() int {
+	s.count += 1
+	s.ts = time.Now()
+	return s.count
+}
diff --git a/nomad/plan_apply_node_tracker_test.go b/nomad/plan_apply_node_tracker_test.go
new file mode 100644
index 000000000000..74bd094427eb
--- /dev/null
+++ b/nomad/plan_apply_node_tracker_test.go
@@ -0,0 +1,99 @@
+package nomad
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBadNodeTracker(t *testing.T) {
+	ci.Parallel(t)
+
+	cacheSize := 3
+	tracker, err := NewBadNodeTracker(
+		hclog.NewNullLogger(), cacheSize, time.Second, 10)
+	require.NoError(t, err)
+
+	for i := 0; i < 10; i++ {
+		tracker.Add(fmt.Sprintf("node-%d", i+1))
+	}
+
+	require.Equal(t, cacheSize, tracker.cache.Len())
+
+	// Only track the most recent values.
+	expected := []string{"node-8", "node-9", "node-10"}
+	require.ElementsMatch(t, expected, tracker.cache.Keys())
+}
+
+func TestBadNodeTracker_IsBad(t *testing.T) {
+	ci.Parallel(t)
+
+	window := time.Duration(testutil.TestMultiplier()) * time.Second
+	tracker, err := NewBadNodeTracker(hclog.NewNullLogger(), 3, window, 4)
+	require.NoError(t, err)
+
+	// Populate cache.
+	tracker.Add("node-1")
+
+	tracker.Add("node-2")
+	tracker.Add("node-2")
+
+	tracker.Add("node-3")
+	tracker.Add("node-3")
+	tracker.Add("node-3")
+	tracker.Add("node-3")
+	tracker.Add("node-3")
+	tracker.Add("node-3")
+
+	testCases := []struct {
+		name   string
+		nodeID string
+		bad    bool
+	}{
+		{
+			name:   "node-1 is not bad",
+			nodeID: "node-1",
+			bad:    false,
+		},
+		{
+			name:   "node-3 is bad",
+			nodeID: "node-3",
+			bad:    true,
+		},
+		{
+			name:   "node not tracked is not bad",
+			nodeID: "node-1000",
+			bad:    false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := tracker.IsBad(tc.nodeID)
+			require.Equal(t, tc.bad, got)
+		})
+	}
+
+	t.Run("cache expires", func(t *testing.T) {
+		time.Sleep(window)
+		require.False(t, tracker.IsBad("node-1"))
+		require.False(t, tracker.IsBad("node-2"))
+		require.False(t, tracker.IsBad("node-3"))
+	})
+
+	t.Run("IsBad updates cache", func(t *testing.T) {
+		// Don't access node-3 so it should be evicted when a new value is
+		// added and the tracker size overflows.
+		tracker.IsBad("node-1")
+		tracker.IsBad("node-2")
+		tracker.Add("node-4")
+
+		expected := []string{"node-1", "node-2", "node-4"}
+		require.ElementsMatch(t, expected, tracker.cache.Keys())
+	})
+}
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 6c5fabdda054..6c02733fb602 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -34,6 +34,17 @@ const (
 )
 
 const (
+	// NodeEligibilityEventPlanRejectThreshold is the message used when the node
+	// is set to ineligible due to multiple plan failures.
+	// This is a preventive measure to signal scheduler workers to not consider
+	// the node for future placements.
+	// Plan rejections for a node are expected due to the optimistic and
+	// concurrent nature of the scheduling process, but repeated failures for
+	// the same node may indicate an underlying issue not detected by Nomad.
+	// The plan applier keeps track of plan rejection history and will mark
+	// nodes as ineligible if they cross a given threshold.
+	NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections"
+
 	// NodeRegisterEventRegistered is the message used when the node becomes
 	// registered.
 	NodeRegisterEventRegistered = "Node registered"
@@ -359,6 +370,22 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
 	txn := s.db.WriteTxnMsgT(msgType, index)
 	defer txn.Abort()
 
+	// Mark nodes as ineligible.
+	now := time.Now().Unix()
+	for _, nodeID := range results.IneligibleNodes {
+		s.logger.Warn("marking node as ineligible due to multiple plan rejections", "node_id", nodeID)
+
+		nodeEvent := structs.NewNodeEvent().
+			SetSubsystem(structs.NodeEventSubsystemScheduler).
+			SetMessage(NodeEligibilityEventPlanRejectThreshold)
+
+		err := s.updateNodeEligibilityImpl(index, nodeID,
+			structs.NodeSchedulingIneligible, now, nodeEvent, txn)
+		if err != nil {
+			return err
+		}
+	}
+
 	// Upsert the newly created or updated deployment
 	if results.Deployment != nil {
 		if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil {
@@ -1136,10 +1163,15 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,
 
 // UpdateNodeEligibility is used to update the scheduling eligibility of a node
 func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
-
 	txn := s.db.WriteTxnMsgT(msgType, index)
 	defer txn.Abort()
+	if err := s.updateNodeEligibilityImpl(index, nodeID, eligibility, updatedAt, event, txn); err != nil {
+		return err
+	}
+	return txn.Commit()
+}
 
+func (s *StateStore) updateNodeEligibilityImpl(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent, txn *txn) error {
 	// Lookup the node
 	existing, err := txn.First("nodes", "id", nodeID)
 	if err != nil {
@@ -1176,7 +1208,7 @@ func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index ui
 		return fmt.Errorf("index update failed: %v", err)
 	}
 
-	return txn.Commit()
+	return nil
 }
 
 // UpsertNodeEvents adds the node events to the nodes, rotating events as
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index b376ebfaa144..4d9c3dc6cfb4 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -913,6 +913,11 @@ type ApplyPlanResultsRequest struct {
 	// PreemptionEvals is a slice of follow up evals for jobs whose allocations
 	// have been preempted to place allocs in this plan
 	PreemptionEvals []*Evaluation
+
+	// IneligibleNodes are nodes the plan applier has repeatedly rejected
+	// placements for and should therefore be considered ineligible by workers
+	// to avoid retrying them repeatedly.
+	IneligibleNodes []string
 }
 
 // AllocUpdateRequest is used to submit changes to allocations, either
@@ -1632,6 +1637,7 @@ const (
 	NodeEventSubsystemDriver    = "Driver"
 	NodeEventSubsystemHeartbeat = "Heartbeat"
 	NodeEventSubsystemCluster   = "Cluster"
+	NodeEventSubsystemScheduler = "Scheduler"
 	NodeEventSubsystemStorage   = "Storage"
 )
 
@@ -11399,12 +11405,18 @@ type PlanResult struct {
 	// AllocIndex is the Raft index in which the evictions and
 	// allocations took place. This is used for the write index.
 	AllocIndex uint64
+
+	// RejectedNodes are nodes the plan applier has rejected placements for and
+	// should therefore be considered for ineligibility to avoid retrying them
+	// repeatedly.
+	RejectedNodes []string
 }
 
 // IsNoOp checks if this plan result would do nothing
 func (p *PlanResult) IsNoOp() bool {
-	return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 &&
-		len(p.DeploymentUpdates) == 0 && p.Deployment == nil
+	return len(p.RejectedNodes) == 0 && len(p.NodeUpdate) == 0 &&
+		len(p.NodeAllocation) == 0 && len(p.DeploymentUpdates) == 0 &&
+		p.Deployment == nil
 }
 
 // FullCommit is used to check if all the allocations in a plan