Skip to content

Commit

Permalink
Merge pull request #2555 from hashicorp/f-nack-delay
Browse files Browse the repository at this point in the history
Back-pressure on Nacks and ensure scheduling progress on failures
  • Loading branch information
dadgar committed Apr 14, 2017
2 parents 75098df + d3807db commit 477c97e
Show file tree
Hide file tree
Showing 7 changed files with 434 additions and 101 deletions.
82 changes: 55 additions & 27 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,30 @@ type Config struct {
// complete eventually fails out of the system.
EvalDeliveryLimit int

// EvalNackInitialReenqueueDelay is the delay applied before reenqueuing a
// Nacked evaluation for the first time. This value should be small as the
// initial Nack can be due to a down machine and the eval should be retried
// quickly for liveliness.
EvalNackInitialReenqueueDelay time.Duration

// EvalNackSubsequentReenqueueDelay is the delay applied before reenqueuing
// an evaluation that has been Nacked more than once. This delay is
// compounding after the first Nack. This value should be significantly
// longer than the initial delay as the purpose it severs is to apply
// back-pressure as evaluatiions are being Nacked either due to scheduler
// failures or because they are hitting their Nack timeout, both of which
// are signs of high server resource usage.
EvalNackSubsequentReenqueueDelay time.Duration

// EvalFailedFollowupBaselineDelay is the minimum time waited before
// retrying a failed evaluation.
EvalFailedFollowupBaselineDelay time.Duration

// EvalFailedFollowupDelayRange defines the range of additional time from
// the baseline in which to wait before retrying a failed evaluation. The
// additional delay is selected from this range randomly.
EvalFailedFollowupDelayRange time.Duration

// MinHeartbeatTTL is the minimum time between heartbeats.
// This is used as a floor to prevent excessive updates.
MinHeartbeatTTL time.Duration
Expand Down Expand Up @@ -214,33 +238,37 @@ func DefaultConfig() *Config {
}

c := &Config{
Region: DefaultRegion,
Datacenter: DefaultDC,
NodeName: hostname,
ProtocolVersion: ProtocolVersionMax,
RaftConfig: raft.DefaultConfig(),
RaftTimeout: 10 * time.Second,
LogOutput: os.Stderr,
RPCAddr: DefaultRPCAddr,
SerfConfig: serf.DefaultConfig(),
NumSchedulers: 1,
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
NodeGCThreshold: 24 * time.Hour,
EvalNackTimeout: 60 * time.Second,
EvalDeliveryLimit: 3,
MinHeartbeatTTL: 10 * time.Second,
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,
FailoverHeartbeatTTL: 300 * time.Second,
ConsulConfig: config.DefaultConsulConfig(),
VaultConfig: config.DefaultVaultConfig(),
RPCHoldTimeout: 5 * time.Second,
TLSConfig: &config.TLSConfig{},
Region: DefaultRegion,
Datacenter: DefaultDC,
NodeName: hostname,
ProtocolVersion: ProtocolVersionMax,
RaftConfig: raft.DefaultConfig(),
RaftTimeout: 10 * time.Second,
LogOutput: os.Stderr,
RPCAddr: DefaultRPCAddr,
SerfConfig: serf.DefaultConfig(),
NumSchedulers: 1,
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
NodeGCThreshold: 24 * time.Hour,
EvalNackTimeout: 60 * time.Second,
EvalDeliveryLimit: 3,
EvalNackInitialReenqueueDelay: 1 * time.Second,
EvalNackSubsequentReenqueueDelay: 20 * time.Second,
EvalFailedFollowupBaselineDelay: 1 * time.Minute,
EvalFailedFollowupDelayRange: 5 * time.Minute,
MinHeartbeatTTL: 10 * time.Second,
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,
FailoverHeartbeatTTL: 300 * time.Second,
ConsulConfig: config.DefaultConsulConfig(),
VaultConfig: config.DefaultVaultConfig(),
RPCHoldTimeout: 5 * time.Second,
TLSConfig: &config.TLSConfig{},
}

// Enable all known schedulers by default
Expand Down
85 changes: 64 additions & 21 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ type EvalBroker struct {
// timeWait has evaluations that are waiting for time to elapse
timeWait map[string]*time.Timer

// initialNackDelay is the delay applied before reenqueuing a
// Nacked evaluation for the first time.
initialNackDelay time.Duration

// subsequentNackDelay is the delay applied before reenqueuing
// an evaluation that has been Nacked more than once. This delay is
// compounding after the first Nack.
subsequentNackDelay time.Duration

l sync.RWMutex
}

Expand All @@ -94,24 +103,29 @@ type PendingEvaluations []*structs.Evaluation
// NewEvalBroker creates a new evaluation broker. This is parameterized
// with the timeout used for messages that are not acknowledged before we
// assume a Nack and attempt to redeliver as well as the deliveryLimit
// which prevents a failing eval from being endlessly delivered.
func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error) {
// which prevents a failing eval from being endlessly delivered. The
// initialNackDelay is the delay before making a Nacked evalution available
// again for the first Nack and subsequentNackDelay is the compounding delay
// after the first Nack.
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
if timeout < 0 {
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[string]string),
blocked: make(map[string]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
requeue: make(map[string]*structs.Evaluation),
timeWait: make(map[string]*time.Timer),
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[string]string),
blocked: make(map[string]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
requeue: make(map[string]*structs.Evaluation),
timeWait: make(map[string]*time.Timer),
initialNackDelay: initialNackDelay,
subsequentNackDelay: subsequentNackDelay,
}
b.stats.ByScheduler = make(map[string]*SchedulerStats)
return b, nil
Expand Down Expand Up @@ -187,17 +201,23 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {

// Check if we need to enforce a wait
if eval.Wait > 0 {
timer := time.AfterFunc(eval.Wait, func() {
b.enqueueWaiting(eval)
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
b.processWaitingEnqueue(eval)
return
}

b.enqueueLocked(eval, eval.Type)
}

// processWaitingEnqueue waits the given duration on the evaluation before
// enqueueing.
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
timer := time.AfterFunc(eval.Wait, func() {
b.enqueueWaiting(eval)
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
}

// enqueueWaiting is used to enqueue a waiting evaluation
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
b.l.Lock()
Expand Down Expand Up @@ -547,14 +567,37 @@ func (b *EvalBroker) Nack(evalID, token string) error {

// Check if we've hit the delivery limit, and re-enqueue
// in the failedQueue
if b.evals[evalID] >= b.deliveryLimit {
if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
b.enqueueLocked(unack.Eval, failedQueue)
} else {
b.enqueueLocked(unack.Eval, unack.Eval.Type)
e := unack.Eval
e.Wait = b.nackReenqueueDelay(e, dequeues)

// See if there should be a delay before re-enqueuing
if e.Wait > 0 {
b.processWaitingEnqueue(e)
} else {
b.enqueueLocked(e, e.Type)
}
}

return nil
}

// nackReenqueueDelay is used to determine the delay that should be applied on
// the evaluation given the number of previous attempts
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
switch {
case prevDequeues <= 0:
return 0
case prevDequeues == 1:
return b.initialNackDelay
default:
// For each subsequent nack compound a delay
return time.Duration(prevDequeues-1) * b.subsequentNackDelay
}
}

// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
// progress but is in a potentially unbounded operation such as the plan queue.
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
Expand Down
Loading

0 comments on commit 477c97e

Please sign in to comment.