diff --git a/nomad/config.go b/nomad/config.go index e843ba89503b..92e16b1e71f8 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -158,6 +158,30 @@ type Config struct { // complete eventually fails out of the system. EvalDeliveryLimit int + // EvalNackInitialReenqueueDelay is the delay applied before reenqueuing a + // Nacked evaluation for the first time. This value should be small as the + // initial Nack can be due to a down machine and the eval should be retried + // quickly for liveliness. + EvalNackInitialReenqueueDelay time.Duration + + // EvalNackSubsequentReenqueueDelay is the delay applied before reenqueuing + // an evaluation that has been Nacked more than once. This delay is + // compounding after the first Nack. This value should be significantly + // longer than the initial delay as the purpose it severs is to apply + // back-pressure as evaluatiions are being Nacked either due to scheduler + // failures or because they are hitting their Nack timeout, both of which + // are signs of high server resource usage. + EvalNackSubsequentReenqueueDelay time.Duration + + // EvalFailedFollowupBaselineDelay is the minimum time waited before + // retrying a failed evaluation. + EvalFailedFollowupBaselineDelay time.Duration + + // EvalFailedFollowupDelayRange defines the range of additional time from + // the baseline in which to wait before retrying a failed evaluation. The + // additional delay is selected from this range randomly. + EvalFailedFollowupDelayRange time.Duration + // MinHeartbeatTTL is the minimum time between heartbeats. // This is used as a floor to prevent excessive updates. MinHeartbeatTTL time.Duration @@ -214,33 +238,37 @@ func DefaultConfig() *Config { } c := &Config{ - Region: DefaultRegion, - Datacenter: DefaultDC, - NodeName: hostname, - ProtocolVersion: ProtocolVersionMax, - RaftConfig: raft.DefaultConfig(), - RaftTimeout: 10 * time.Second, - LogOutput: os.Stderr, - RPCAddr: DefaultRPCAddr, - SerfConfig: serf.DefaultConfig(), - NumSchedulers: 1, - ReconcileInterval: 60 * time.Second, - EvalGCInterval: 5 * time.Minute, - EvalGCThreshold: 1 * time.Hour, - JobGCInterval: 5 * time.Minute, - JobGCThreshold: 4 * time.Hour, - NodeGCInterval: 5 * time.Minute, - NodeGCThreshold: 24 * time.Hour, - EvalNackTimeout: 60 * time.Second, - EvalDeliveryLimit: 3, - MinHeartbeatTTL: 10 * time.Second, - MaxHeartbeatsPerSecond: 50.0, - HeartbeatGrace: 10 * time.Second, - FailoverHeartbeatTTL: 300 * time.Second, - ConsulConfig: config.DefaultConsulConfig(), - VaultConfig: config.DefaultVaultConfig(), - RPCHoldTimeout: 5 * time.Second, - TLSConfig: &config.TLSConfig{}, + Region: DefaultRegion, + Datacenter: DefaultDC, + NodeName: hostname, + ProtocolVersion: ProtocolVersionMax, + RaftConfig: raft.DefaultConfig(), + RaftTimeout: 10 * time.Second, + LogOutput: os.Stderr, + RPCAddr: DefaultRPCAddr, + SerfConfig: serf.DefaultConfig(), + NumSchedulers: 1, + ReconcileInterval: 60 * time.Second, + EvalGCInterval: 5 * time.Minute, + EvalGCThreshold: 1 * time.Hour, + JobGCInterval: 5 * time.Minute, + JobGCThreshold: 4 * time.Hour, + NodeGCInterval: 5 * time.Minute, + NodeGCThreshold: 24 * time.Hour, + EvalNackTimeout: 60 * time.Second, + EvalDeliveryLimit: 3, + EvalNackInitialReenqueueDelay: 1 * time.Second, + EvalNackSubsequentReenqueueDelay: 20 * time.Second, + EvalFailedFollowupBaselineDelay: 1 * time.Minute, + EvalFailedFollowupDelayRange: 5 * time.Minute, + MinHeartbeatTTL: 10 * time.Second, + MaxHeartbeatsPerSecond: 50.0, + HeartbeatGrace: 10 * time.Second, + FailoverHeartbeatTTL: 300 * time.Second, + ConsulConfig: config.DefaultConsulConfig(), + VaultConfig: config.DefaultVaultConfig(), + RPCHoldTimeout: 5 * time.Second, + TLSConfig: &config.TLSConfig{}, } // Enable all known schedulers by default diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 783ad84f0ac9..4835ee07dc44 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -76,6 +76,15 @@ type EvalBroker struct { // timeWait has evaluations that are waiting for time to elapse timeWait map[string]*time.Timer + // initialNackDelay is the delay applied before reenqueuing a + // Nacked evaluation for the first time. + initialNackDelay time.Duration + + // subsequentNackDelay is the delay applied before reenqueuing + // an evaluation that has been Nacked more than once. This delay is + // compounding after the first Nack. + subsequentNackDelay time.Duration + l sync.RWMutex } @@ -94,24 +103,29 @@ type PendingEvaluations []*structs.Evaluation // NewEvalBroker creates a new evaluation broker. This is parameterized // with the timeout used for messages that are not acknowledged before we // assume a Nack and attempt to redeliver as well as the deliveryLimit -// which prevents a failing eval from being endlessly delivered. -func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error) { +// which prevents a failing eval from being endlessly delivered. The +// initialNackDelay is the delay before making a Nacked evalution available +// again for the first Nack and subsequentNackDelay is the compounding delay +// after the first Nack. +func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) { if timeout < 0 { return nil, fmt.Errorf("timeout cannot be negative") } b := &EvalBroker{ - nackTimeout: timeout, - deliveryLimit: deliveryLimit, - enabled: false, - stats: new(BrokerStats), - evals: make(map[string]int), - jobEvals: make(map[string]string), - blocked: make(map[string]PendingEvaluations), - ready: make(map[string]PendingEvaluations), - unack: make(map[string]*unackEval), - waiting: make(map[string]chan struct{}), - requeue: make(map[string]*structs.Evaluation), - timeWait: make(map[string]*time.Timer), + nackTimeout: timeout, + deliveryLimit: deliveryLimit, + enabled: false, + stats: new(BrokerStats), + evals: make(map[string]int), + jobEvals: make(map[string]string), + blocked: make(map[string]PendingEvaluations), + ready: make(map[string]PendingEvaluations), + unack: make(map[string]*unackEval), + waiting: make(map[string]chan struct{}), + requeue: make(map[string]*structs.Evaluation), + timeWait: make(map[string]*time.Timer), + initialNackDelay: initialNackDelay, + subsequentNackDelay: subsequentNackDelay, } b.stats.ByScheduler = make(map[string]*SchedulerStats) return b, nil @@ -187,17 +201,23 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { // Check if we need to enforce a wait if eval.Wait > 0 { - timer := time.AfterFunc(eval.Wait, func() { - b.enqueueWaiting(eval) - }) - b.timeWait[eval.ID] = timer - b.stats.TotalWaiting += 1 + b.processWaitingEnqueue(eval) return } b.enqueueLocked(eval, eval.Type) } +// processWaitingEnqueue waits the given duration on the evaluation before +// enqueueing. +func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) { + timer := time.AfterFunc(eval.Wait, func() { + b.enqueueWaiting(eval) + }) + b.timeWait[eval.ID] = timer + b.stats.TotalWaiting += 1 +} + // enqueueWaiting is used to enqueue a waiting evaluation func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { b.l.Lock() @@ -547,14 +567,37 @@ func (b *EvalBroker) Nack(evalID, token string) error { // Check if we've hit the delivery limit, and re-enqueue // in the failedQueue - if b.evals[evalID] >= b.deliveryLimit { + if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit { b.enqueueLocked(unack.Eval, failedQueue) } else { - b.enqueueLocked(unack.Eval, unack.Eval.Type) + e := unack.Eval + e.Wait = b.nackReenqueueDelay(e, dequeues) + + // See if there should be a delay before re-enqueuing + if e.Wait > 0 { + b.processWaitingEnqueue(e) + } else { + b.enqueueLocked(e, e.Type) + } } + return nil } +// nackReenqueueDelay is used to determine the delay that should be applied on +// the evaluation given the number of previous attempts +func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration { + switch { + case prevDequeues <= 0: + return 0 + case prevDequeues == 1: + return b.initialNackDelay + default: + // For each subsequent nack compound a delay + return time.Duration(prevDequeues-1) * b.subsequentNackDelay + } +} + // PauseNackTimeout is used to pause the Nack timeout for an eval that is making // progress but is in a potentially unbounded operation such as the plan queue. func (b *EvalBroker) PauseNackTimeout(evalID, token string) error { diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index ab6ecedf0379..2c1036ef7acd 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,11 +1,13 @@ package nomad import ( + "fmt" "testing" "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" ) var ( @@ -15,14 +17,34 @@ var ( } ) +func testBrokerConfig() *Config { + config := DefaultConfig() + + // Tune the Nack timeout + config.EvalNackTimeout = 5 * time.Second + + // Tune the Nack delay + config.EvalNackInitialReenqueueDelay = 5 * time.Millisecond + config.EvalNackSubsequentReenqueueDelay = 50 * time.Millisecond + return config +} + func testBroker(t *testing.T, timeout time.Duration) *EvalBroker { - if timeout == 0 { - timeout = 5 * time.Second + config := testBrokerConfig() + + if timeout != 0 { + config.EvalNackTimeout = timeout } - b, err := NewEvalBroker(timeout, 3) + + return testBrokerFromConfig(t, config) +} + +func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker { + b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3) if err != nil { t.Fatalf("err: %v", err) } + return b } @@ -125,21 +147,151 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("should not be outstanding") } + // Check the stats + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) + + // Dequeue should work again + out2, token2, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out2 != eval { + t.Fatalf("bad : %#v", out2) + } + if token2 == token { + t.Fatalf("should get a new token") + } + + tokenOut2, ok := b.Outstanding(out.ID) + if !ok { + t.Fatalf("should be outstanding") + } + if tokenOut2 != token2 { + t.Fatalf("Bad: %#v %#v", token2, tokenOut2) + } + + // Ack with wrong token + err = b.Ack(eval.ID, "zip") + if err == nil { + t.Fatalf("should fail to ack") + } + + // Ack finally + err = b.Ack(eval.ID, token2) + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := b.Outstanding(out.ID); ok { + t.Fatalf("should not be outstanding") + } + // Check the stats stats = b.Stats() - if stats.TotalReady != 1 { + if stats.TotalReady != 0 { t.Fatalf("bad: %#v", stats) } if stats.TotalUnacked != 0 { t.Fatalf("bad: %#v", stats) } - if stats.ByScheduler[eval.Type].Ready != 1 { + if stats.ByScheduler[eval.Type].Ready != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + t.Fatalf("bad: %#v", stats) + } +} + +func TestEvalBroker_Nack_Delay(t *testing.T) { + b := testBroker(t, 0) + + // Enqueue, but broker is disabled! + b.SetEnabled(true) + eval := mock.Eval() + b.Enqueue(eval) + + // Dequeue should work + out, token, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != eval { + t.Fatalf("bad : %#v", out) + } + + // Nack back into the queue + err = b.Nack(eval.ID, token) + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := b.Outstanding(out.ID); ok { + t.Fatalf("should not be outstanding") + } + + // Check the stats to ensure that it is waiting + stats := b.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.TotalWaiting != 1 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 0 { t.Fatalf("bad: %#v", stats) } if stats.ByScheduler[eval.Type].Unacked != 0 { t.Fatalf("bad: %#v", stats) } + // Now wait for it to be re-enqueued + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) + // Dequeue should work again out2, token2, err := b.Dequeue(defaultSched, time.Second) if err != nil { @@ -152,22 +304,58 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("should get a new token") } - tokenOut2, ok := b.Outstanding(out.ID) - if !ok { - t.Fatalf("should be outstanding") + // Capture the time + start := time.Now() + + // Nack back into the queue + err = b.Nack(eval.ID, token2) + if err != nil { + t.Fatalf("err: %v", err) } - if tokenOut2 != token2 { - t.Fatalf("Bad: %#v %#v", token2, tokenOut2) + + // Now wait for it to be re-enqueued + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) + + delay := time.Now().Sub(start) + if delay < b.subsequentNackDelay { + t.Fatalf("bad: delay was %v; want at least %v", delay, b.subsequentNackDelay) } - // Ack with wrong token - err = b.Ack(eval.ID, "zip") - if err == nil { - t.Fatalf("should fail to ack") + // Dequeue should work again + out3, token3, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out3 != eval { + t.Fatalf("bad : %#v", out3) + } + if token3 == token || token3 == token2 { + t.Fatalf("should get a new token") } // Ack finally - err = b.Ack(eval.ID, token2) + err = b.Ack(eval.ID, token3) if err != nil { t.Fatalf("err: %v", err) } @@ -472,7 +660,7 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) { func TestEvalBroker_Dequeue_Fairness(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) - NUM := 100 + NUM := 1000 for i := 0; i < NUM; i++ { eval1 := mock.Eval() @@ -503,7 +691,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) { // This will fail randomly at times. It is very hard to // test deterministically that its acting randomly. - if counter >= 25 || counter <= -25 { + if counter >= 250 || counter <= -250 { t.Fatalf("unlikely sequence: %d", counter) } } @@ -584,7 +772,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { // Ensure we nack in a timely manner func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { - b := testBroker(t, 5*time.Millisecond) + b := testBroker(t, 50*time.Millisecond) b.SetEnabled(true) // Enqueue @@ -601,8 +789,8 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { t.Fatalf("bad: %v", out) } - // Reset in 2 milliseconds - time.Sleep(2 * time.Millisecond) + // Reset in 20 milliseconds + time.Sleep(20 * time.Millisecond) if err := b.OutstandingReset(out.ID, token); err != nil { t.Fatalf("err: %v", err) } @@ -618,13 +806,13 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { } // Check the nack timer - if diff := end.Sub(start); diff < 7*time.Millisecond { + if diff := end.Sub(start); diff < 75*time.Millisecond { t.Fatalf("bad: %#v", diff) } } func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { - b := testBroker(t, 5*time.Millisecond) + b := testBroker(t, 50*time.Millisecond) b.SetEnabled(true) // Enqueue @@ -641,14 +829,14 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { t.Fatalf("bad: %v", out) } - // Pause in 2 milliseconds - time.Sleep(2 * time.Millisecond) + // Pause in 20 milliseconds + time.Sleep(20 * time.Millisecond) if err := b.PauseNackTimeout(out.ID, token); err != nil { t.Fatalf("err: %v", err) } go func() { - time.Sleep(2 * time.Millisecond) + time.Sleep(20 * time.Millisecond) if err := b.ResumeNackTimeout(out.ID, token); err != nil { t.Fatalf("err: %v", err) } @@ -665,7 +853,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { } // Check the nack timer - if diff := end.Sub(start); diff < 9*time.Millisecond { + if diff := end.Sub(start); diff < 95*time.Millisecond { t.Fatalf("bad: %#v", diff) } } @@ -820,7 +1008,7 @@ func TestEvalBroker_Wait(t *testing.T) { } // Let the wait elapse - time.Sleep(15 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Verify ready stats = b.Stats() @@ -976,14 +1164,20 @@ func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) { } // Check stats again as this should cause the re-enqueued one to be dropped - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if len(b.requeue) != 0 { - t.Fatalf("bad: %#v", b.requeue) - } + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if len(b.requeue) != 0 { + return false, fmt.Errorf("bad: %#v", b.requeue) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) } diff --git a/nomad/leader.go b/nomad/leader.go index e938bbee5864..275f84de9ac3 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "net" "time" @@ -387,17 +388,24 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { } // Update the status to failed - newEval := eval.Copy() - newEval.Status = structs.EvalStatusFailed - newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit) - s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval) + updateEval := eval.Copy() + updateEval.Status = structs.EvalStatusFailed + updateEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit) + s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", updateEval) + + // Create a follow-up evaluation that will be used to retry the + // scheduling for the job after the cluster is hopefully more stable + // due to the fairly large backoff. + followupEvalWait := s.config.EvalFailedFollowupBaselineDelay + + time.Duration(rand.Int63n(int64(s.config.EvalFailedFollowupDelayRange))) + followupEval := eval.CreateFailedFollowUpEval(followupEvalWait) // Update via Raft req := structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{newEval}, + Evals: []*structs.Evaluation{updateEval, followupEval}, } if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil { - s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err) + s.logger.Printf("[ERR] nomad: failed to update failed eval %#v and create a follow-up: %v", updateEval, err) continue } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 70822eab1444..eda24c2b6319 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -508,7 +508,7 @@ func TestLeader_ReapFailedEval(t *testing.T) { } s1.evalBroker.Nack(out.ID, token) - // Wait updated evaluation + // Wait for an updated and followup evaluation state := s1.fsm.State() testutil.WaitForResult(func() (bool, error) { ws := memdb.NewWatchSet() @@ -516,7 +516,45 @@ func TestLeader_ReapFailedEval(t *testing.T) { if err != nil { return false, err } - return out != nil && out.Status == structs.EvalStatusFailed, nil + if out == nil { + return false, fmt.Errorf("expect original evaluation to exist") + } + if out.Status != structs.EvalStatusFailed { + return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed) + } + + // See if there is a followup + evals, err := state.EvalsByJob(ws, eval.JobID) + if err != nil { + return false, err + } + + if l := len(evals); l != 2 { + return false, fmt.Errorf("got %d evals, want 2", l) + } + + for _, e := range evals { + if e.ID == eval.ID { + continue + } + + if e.Status != structs.EvalStatusPending { + return false, fmt.Errorf("follow up eval has status %v; want %v", + e.Status, structs.EvalStatusPending) + } + + if e.Wait < s1.config.EvalFailedFollowupBaselineDelay || + e.Wait > s1.config.EvalFailedFollowupBaselineDelay+s1.config.EvalFailedFollowupDelayRange { + return false, fmt.Errorf("bad wait: %v", e.Wait) + } + + if e.TriggeredBy != structs.EvalTriggerFailedFollowUp { + return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v", + e.TriggeredBy, structs.EvalTriggerFailedFollowUp) + } + } + + return true, nil }, func(err error) { t.Fatalf("err: %v", err) }) diff --git a/nomad/server.go b/nomad/server.go index 8089f151c01a..efed68cebdf2 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -174,7 +174,11 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger) } // Create an eval broker - evalBroker, err := NewEvalBroker(config.EvalNackTimeout, config.EvalDeliveryLimit) + evalBroker, err := NewEvalBroker( + config.EvalNackTimeout, + config.EvalNackInitialReenqueueDelay, + config.EvalNackSubsequentReenqueueDelay, + config.EvalDeliveryLimit) if err != nil { return nil, err } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 868767a1a1e6..57376279ca12 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3736,13 +3736,14 @@ const ( ) const ( - EvalTriggerJobRegister = "job-register" - EvalTriggerJobDeregister = "job-deregister" - EvalTriggerPeriodicJob = "periodic-job" - EvalTriggerNodeUpdate = "node-update" - EvalTriggerScheduled = "scheduled" - EvalTriggerRollingUpdate = "rolling-update" - EvalTriggerMaxPlans = "max-plan-attempts" + EvalTriggerJobRegister = "job-register" + EvalTriggerJobDeregister = "job-deregister" + EvalTriggerPeriodicJob = "periodic-job" + EvalTriggerNodeUpdate = "node-update" + EvalTriggerScheduled = "scheduled" + EvalTriggerRollingUpdate = "rolling-update" + EvalTriggerFailedFollowUp = "failed-follow-up" + EvalTriggerMaxPlans = "max-plan-attempts" ) const ( @@ -3985,6 +3986,23 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool, escaped } } +// CreateFailedFollowUpEval creates a follow up evaluation when the current one +// has been marked as failed becasue it has hit the delivery limit and will not +// be retried by the eval_broker. +func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation { + return &Evaluation{ + ID: GenerateUUID(), + Priority: e.Priority, + Type: e.Type, + TriggeredBy: EvalTriggerFailedFollowUp, + JobID: e.JobID, + JobModifyIndex: e.JobModifyIndex, + Status: EvalStatusPending, + Wait: wait, + PreviousEval: e.ID, + } +} + // Plan is used to submit a commit plan for task allocations. These // are submitted to the leader which verifies that resources have // not been overcommitted before admiting the plan.