Skip to content

Commit

Permalink
allow enabling the node rejection tracker and limit the rate nodes are marked ineligible
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Jul 7, 2022
1 parent ff15de9 commit 67f42b8
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 45 deletions.
1 change: 1 addition & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {

// Set plan rejection tracker configuration.
if planRejectConf := agentConfig.Server.PlanRejectionTracker; planRejectConf != nil {
conf.NodePlanRejectionEnabled = planRejectConf.Enabled
conf.NodePlanRejectionThreshold = planRejectConf.NodeThreshold

if planRejectConf.NodeWindow == 0 {
Expand Down
6 changes: 6 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,12 @@ func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) {
{
name: "valid config",
trackerConfig: &PlanRejectionTracker{
Enabled: true,
NodeThreshold: 123,
NodeWindow: 17 * time.Minute,
},
expectedConfig: &PlanRejectionTracker{
Enabled: true,
NodeThreshold: 123,
NodeWindow: 17 * time.Minute,
},
Expand Down Expand Up @@ -420,6 +422,10 @@ func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) {
require.Contains(t, err.Error(), tc.expectedErr)
} else {
require.NoError(t, err)
require.Equal(t,
tc.expectedConfig.Enabled,
serverConfig.NodePlanRejectionEnabled,
)
require.Equal(t,
tc.expectedConfig.NodeThreshold,
serverConfig.NodePlanRejectionThreshold,
Expand Down
11 changes: 9 additions & 2 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ type RaftBoltConfig struct {
// PlanRejectionTracker is used in servers to configure the plan rejection
// tracker.
type PlanRejectionTracker struct {
// Enabled controls if the plan rejection tracker is active or not.
Enabled bool `hcl:"enabled"`

// NodeThreshold is the number of times a node can have plan rejections
// before it is marked as ineligible.
NodeThreshold int `hcl:"node_threshold"`
Expand All @@ -579,6 +582,10 @@ func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTrac
return &result
}

if b.Enabled {
result.Enabled = true
}

if b.NodeThreshold != 0 {
result.NodeThreshold = b.NodeThreshold
}
Expand Down Expand Up @@ -1030,8 +1037,8 @@ func DefaultConfig() *Config {
RaftProtocol: 3,
StartJoin: []string{},
PlanRejectionTracker: &PlanRejectionTracker{
NodeThreshold: 15,
NodeWindow: 10 * time.Minute,
NodeThreshold: 100,
NodeWindow: 5 * time.Minute,
},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
Expand Down
1 change: 1 addition & 0 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ var basicConfig = &Config{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(200),
PlanRejectionTracker: &PlanRejectionTracker{
Enabled: true,
NodeThreshold: 100,
NodeWindow: 41 * time.Minute,
NodeWindowHCL: "41m",
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestConfig_Merge(t *testing.T) {
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
PlanRejectionTracker: &PlanRejectionTracker{
Enabled: true,
NodeThreshold: 100,
NodeWindow: 11 * time.Minute,
},
Expand Down Expand Up @@ -348,6 +349,7 @@ func TestConfig_Merge(t *testing.T) {
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(100),
PlanRejectionTracker: &PlanRejectionTracker{
Enabled: true,
NodeThreshold: 100,
NodeWindow: 11 * time.Minute,
},
Expand Down
1 change: 1 addition & 0 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ server {
event_buffer_size = 200

plan_rejection_tracker {
enabled = true
node_threshold = 100
node_window = "41m"
}
Expand Down
1 change: 1 addition & 0 deletions command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@
"non_voting_server": true,
"num_schedulers": 2,
"plan_rejection_tracker": {
"enabled": true,
"node_threshold": 100,
"node_window": "41m"
},
Expand Down
4 changes: 4 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ type Config struct {
// additional delay is selected from this range randomly.
EvalFailedFollowupDelayRange time.Duration

// NodePlanRejectionEnabled controls if node rejection tracker is enabled.
NodePlanRejectionEnabled bool

// NodePlanRejectionThreshold is the number of times a node must have a
// plan rejection before it is set as ineligible.
NodePlanRejectionThreshold int
Expand Down Expand Up @@ -403,6 +406,7 @@ func DefaultConfig() *Config {
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,
FailoverHeartbeatTTL: 300 * time.Second,
NodePlanRejectionEnabled: false,
NodePlanRejectionThreshold: 15,
NodePlanRejectionWindow: 10 * time.Minute,
ConsulConfig: config.DefaultConsulConfig(),
Expand Down
21 changes: 14 additions & 7 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type planner struct {
// Plan rejections are somewhat expected given Nomad's optimistic
// scheduling, but repeated rejections for the same node may indicate an
// undetected issue, so we need to track rejection history.
badNodeTracker *BadNodeTracker
badNodeTracker BadNodeTracker
}

// newPlanner returns a new planner to be used for managing allocation plans.
Expand All @@ -44,12 +44,19 @@ func newPlanner(s *Server) (*planner, error) {
}

// Create the bad node tracker.
size := 50
badNodeTracker, err := NewBadNodeTracker(log, size,
s.config.NodePlanRejectionWindow,
s.config.NodePlanRejectionThreshold)
if err != nil {
return nil, err
var badNodeTracker BadNodeTracker
if s.config.NodePlanRejectionEnabled {
config := DefaultCachedBadNodeTrackerConfig()

config.Window = s.config.NodePlanRejectionWindow
config.Threshold = s.config.NodePlanRejectionThreshold

badNodeTracker, err = NewCachedBadNodeTracker(log, config)
if err != nil {
return nil, err
}
} else {
badNodeTracker = &NoopBadNodeTracker{}
}

return &planner{
Expand Down
102 changes: 75 additions & 27 deletions nomad/plan_apply_node_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,73 +8,121 @@ import (
"github.com/hashicorp/go-hclog"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/helper"
"golang.org/x/time/rate"
)

// BadNodeTracker keeps a record of nodes marked as bad by the plan applier.
type BadNodeTracker interface {
	// IsBad reports whether the node with the given ID should currently be
	// treated as bad by the plan applier.
	IsBad(string) bool
	// Add records a plan rejection for the node with the given ID.
	Add(string)
	// EmitStats emits tracker metrics on the given period until the stop
	// channel is closed. Intended to be called in a goroutine.
	EmitStats(time.Duration, <-chan struct{})
}

// NoopBadNodeTracker is a no-op implementation of bad node tracker that is
// used when tracking is disabled.
type NoopBadNodeTracker struct{}

func (n *NoopBadNodeTracker) Add(string) {}
func (n *NoopBadNodeTracker) EmitStats(time.Duration, <-chan struct{}) {}
func (n *NoopBadNodeTracker) IsBad(string) bool {
return false
}

// CachedBadNodeTracker keeps a record of nodes marked as bad by the plan
// applier in a LRU cache.
//
// It takes a time window and a threshold value. Plan rejections for a node
// will be registered with its timestamp. If the number of rejections within
// the time window is greater than the threshold the node is reported as bad.
//
// The tracker uses a fixed size cache that evicts old entries based on access
// frequency and recency.
type BadNodeTracker struct {
type CachedBadNodeTracker struct {
	// logger is a named sub-logger annotated with the configured threshold
	// and window.
	logger hclog.Logger
	// cache maps a node ID to its *badNodeStats rejection history.
	cache *lru.TwoQueueCache
	// limiter bounds how often IsBad may report a node as bad.
	limiter *rate.Limiter
	// window is the sliding time window in which rejections are counted.
	window time.Duration
	// threshold is the rejection count within window above which a node is
	// considered bad.
	threshold int
}

// NewBadNodeTracker returns a new BadNodeTracker.
func NewBadNodeTracker(logger hclog.Logger, size int, window time.Duration, threshold int) (*BadNodeTracker, error) {
cache, err := lru.New2Q(size)
// CachedBadNodeTrackerConfig holds the settings used to create a
// CachedBadNodeTracker.
type CachedBadNodeTrackerConfig struct {
	// CacheSize is the maximum number of nodes kept in the LRU cache.
	CacheSize int
	// RateLimit is the rate, in events per second, at which the tracker is
	// allowed to report nodes as bad (fed to rate.NewLimiter).
	RateLimit float64
	// BurstSize is the rate limiter's burst size.
	BurstSize int
	// Window is the sliding time window in which plan rejections are counted.
	Window time.Duration
	// Threshold is the rejection count within Window above which a node is
	// considered bad.
	Threshold int
}

// DefaultCachedBadNodeTrackerConfig returns a default
// CachedBadNodeTrackerConfig.
func DefaultCachedBadNodeTrackerConfig() CachedBadNodeTrackerConfig {
	return CachedBadNodeTrackerConfig{
		CacheSize: 50,

		// Limit marking 5 nodes per 30min as ineligible with an initial
		// burst of 10 nodes.
		//
		// NOTE: the expression must contain a float operand. The previous
		// all-integer constant expression 5 / (30 * 60) was evaluated with
		// integer constant division and truncated to 0, which silently
		// disabled the limiter's refill rate.
		RateLimit: 5.0 / (30 * 60),
		BurstSize: 10,

		// Consider a node as bad if it is added more than 100 times in a 5min
		// window period.
		Window:    5 * time.Minute,
		Threshold: 100,
	}
}

// NewCachedBadNodeTracker returns a new CachedBadNodeTracker.
func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerConfig) (*CachedBadNodeTracker, error) {
log := logger.Named("bad_node_tracker").
With("threshold", config.Threshold).
With("window", config.Window)

cache, err := lru.New2Q(config.CacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create new bad node tracker: %v", err)
}

return &BadNodeTracker{
logger: logger.Named("bad_node_tracker").
With("threshold", threshold).
With("window", window),
return &CachedBadNodeTracker{
logger: log,
cache: cache,
window: window,
threshold: threshold,
limiter: rate.NewLimiter(rate.Limit(config.RateLimit), config.BurstSize),
window: config.Window,
threshold: config.Threshold,
}, nil
}

// IsBad returns true if the node has more rejections than the threshold within
// the time window.
func (t *BadNodeTracker) IsBad(nodeID string) bool {
value, ok := t.cache.Get(nodeID)
if !ok {
func (c *CachedBadNodeTracker) IsBad(nodeID string) bool {
// Limit the number of nodes we report as bad to avoid mass assigning nodes
// as ineligible, but still call Get to keep the cache entry fresh.
value, ok := c.cache.Get(nodeID)
if !ok || !c.limiter.Allow() {
return false
}

stats := value.(*badNodeStats)
score := stats.score()

t.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score)
return score > t.threshold
c.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score)
return score > c.threshold
}

// Add records a new rejection for node. If it's the first time a node is added
// it will be included in the internal cache. If the cache is full the least
// recently updated or accessed node is evicted.
func (t *BadNodeTracker) Add(nodeID string) {
value, ok := t.cache.Get(nodeID)
func (c *CachedBadNodeTracker) Add(nodeID string) {
value, ok := c.cache.Get(nodeID)
if !ok {
value = newBadNodeStats(t.window)
t.cache.Add(nodeID, value)
value = newBadNodeStats(c.window)
c.cache.Add(nodeID, value)
}

stats := value.(*badNodeStats)
score := stats.record()
t.logger.Debug("adding node plan rejection", "node_id", nodeID, "score", score)
c.logger.Debug("adding node plan rejection", "node_id", nodeID, "score", score)
}

// EmitStats generates metrics for the bad nodes being currently tracked. Must
// be called in a goroutine.
func (t *BadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
func (c *CachedBadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

Expand All @@ -83,16 +131,16 @@ func (t *BadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{})

select {
case <-timer.C:
t.emitStats()
c.emitStats()
case <-stopCh:
return
}
}
}

func (t *BadNodeTracker) emitStats() {
for _, k := range t.cache.Keys() {
value, _ := t.cache.Get(k)
func (c *CachedBadNodeTracker) emitStats() {
for _, k := range c.cache.Keys() {
value, _ := c.cache.Get(k)
stats := value.(*badNodeStats)
score := stats.score()

Expand All @@ -103,7 +151,7 @@ func (t *BadNodeTracker) emitStats() {
}
}

// badNodeStats represents a node being tracked by BadNodeTracker.
// badNodeStats represents a node being tracked by a BadNodeTracker.
type badNodeStats struct {
history []time.Time
window time.Duration
Expand Down
Loading

0 comments on commit 67f42b8

Please sign in to comment.