diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go
index 2fd9e1950d34..32ea14faa401 100644
--- a/nomad/eval_endpoint.go
+++ b/nomad/eval_endpoint.go
@@ -8,6 +8,7 @@ import (
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/watch"
+	"github.com/hashicorp/nomad/scheduler"
 )
 
 const (
@@ -77,6 +78,12 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
 		return fmt.Errorf("dequeue requires at least one scheduler type")
 	}
 
+	// Check that there isn't a scheduler version mismatch
+	if args.SchedulerVersion != scheduler.SchedulerVersion {
+		return fmt.Errorf("dequeue disallowed: calling scheduler version is %d; leader version is %d",
+			args.SchedulerVersion, scheduler.SchedulerVersion)
+	}
+
 	// Ensure there is a default timeout
 	if args.Timeout <= 0 {
 		args.Timeout = DefaultDequeueTimeout
diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go
index 6524180b5f13..cf5473ca7a66 100644
--- a/nomad/eval_endpoint_test.go
+++ b/nomad/eval_endpoint_test.go
@@ -2,12 +2,14 @@ package nomad
 
 import (
 	"reflect"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/scheduler"
 	"github.com/hashicorp/nomad/testutil"
 )
 
@@ -142,8 +144,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
 
 	// Dequeue the eval
 	get := &structs.EvalDequeueRequest{
-		Schedulers:   defaultSched,
-		WriteRequest: structs.WriteRequest{Region: "global"},
+		Schedulers:       defaultSched,
+		SchedulerVersion: scheduler.SchedulerVersion,
+		WriteRequest:     structs.WriteRequest{Region: "global"},
 	}
 	var resp structs.EvalDequeueResponse
 	if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
@@ -164,6 +167,33 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
 	}
 }
 
+// TestEvalEndpoint_Dequeue_Version_Mismatch checks that Eval.Dequeue rejects a
+// request whose SchedulerVersion differs from the server's scheduler version.
+func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0 // Prevent automatic dequeue
+	})
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Enqueue an eval so there is something to dequeue
+	eval1 := mock.Eval()
+	s1.evalBroker.Enqueue(eval1)
+
+	// Dequeue the eval
+	get := &structs.EvalDequeueRequest{
+		Schedulers:       defaultSched,
+		SchedulerVersion: 0,
+		WriteRequest:     structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.EvalDequeueResponse
+	err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)
+	if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") {
+		t.Fatalf("err: %v", err)
+	}
+}
+
 func TestEvalEndpoint_Ack(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index c7ea2e9ff90c..9e1aa354eaee 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -305,8 +305,9 @@ type EvalAckRequest struct {
 
 // EvalDequeueRequest is used when we want to dequeue an evaluation
 type EvalDequeueRequest struct {
-	Schedulers []string
-	Timeout    time.Duration
+	Schedulers       []string
+	Timeout          time.Duration
+	SchedulerVersion uint16
 	WriteRequest
 }
 
diff --git a/nomad/worker.go b/nomad/worker.go
index e64a2b0366ee..f4ef1fb5cdba 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -27,6 +27,10 @@ const (
 	// the slower backoff
 	backoffLimitSlow = 10 * time.Second
 
+	// backoffSchedulerVersionMismatch is the backoff between retries when the
+	// scheduler version mismatches that of the leader.
+	backoffSchedulerVersionMismatch = 30 * time.Second
+
 	// dequeueTimeout is used to timeout an evaluation dequeue so that
 	// we can check if there is a shutdown event
 	dequeueTimeout = 500 * time.Millisecond
@@ -134,8 +138,9 @@ func (w *Worker) run() {
 func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
 	// Setup the request
 	req := structs.EvalDequeueRequest{
-		Schedulers: w.srv.config.EnabledSchedulers,
-		Timeout:    timeout,
+		Schedulers:       w.srv.config.EnabledSchedulers,
+		Timeout:          timeout,
+		SchedulerVersion: scheduler.SchedulerVersion,
 		WriteRequest: structs.WriteRequest{
 			Region: w.srv.config.Region,
 		},
@@ -154,7 +159,17 @@ REQ:
 		if time.Since(w.start) > dequeueErrGrace && !w.srv.IsShutdown() {
 			w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err)
 		}
-		if w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
+
+		// Ordinary dequeue errors keep the slow backoff used before this change.
+		// A scheduler version mismatch, recognized from the error text returned
+		// by Eval.Dequeue, is retried at the longer fixed mismatch interval.
+		base, limit := backoffBaselineSlow, backoffLimitSlow
+		if strings.Contains(err.Error(), "calling scheduler version") {
+			base = backoffSchedulerVersionMismatch
+			limit = backoffSchedulerVersionMismatch
+		}
+
+		if w.backoffErr(base, limit) {
 			return nil, "", true
 		}
 		goto REQ
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index d96c6aed7250..6652e281c6c9 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -8,6 +8,14 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+const (
+	// SchedulerVersion is the version of the scheduler. Changes to the
+	// scheduler that are incompatible with prior schedulers will increment this
+	// version. It is used to disallow dequeueing when the versions do not match
+	// across the leader and the dequeueing scheduler.
+	SchedulerVersion uint16 = 1
+)
+
 // BuiltinSchedulers contains the built in registered schedulers
 // which are available
 var BuiltinSchedulers = map[string]Factory{