From 962f4d4fe6cd870e6d2dbcc7cb2c21889284a362 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 26 Oct 2016 14:52:48 -0700 Subject: [PATCH] Add scheduler version enforcement --- nomad/config.go | 1 + nomad/eval_endpoint.go | 7 +++++++ nomad/eval_endpoint_test.go | 32 ++++++++++++++++++++++++++++++-- nomad/structs/structs.go | 5 +++-- nomad/worker.go | 5 +++-- scheduler/scheduler.go | 8 ++++++++ 6 files changed, 52 insertions(+), 6 deletions(-) diff --git a/nomad/config.go b/nomad/config.go index 68722f566d14..cfa64a5a5e4c 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -243,6 +243,7 @@ func DefaultConfig() *Config { ConsulConfig: config.DefaultConsulConfig(), VaultConfig: config.DefaultVaultConfig(), RPCHoldTimeout: 5 * time.Second, + TLSConfig: &config.TLSConfig{}, } // Enable all known schedulers by default diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 2fd9e1950d34..32ea14faa401 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/watch" + "github.com/hashicorp/nomad/scheduler" ) const ( @@ -77,6 +78,12 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, return fmt.Errorf("dequeue requires at least one scheduler type") } + // Check that there isn't a scheduler version mismatch + if args.SchedulerVersion != scheduler.SchedulerVersion { + return fmt.Errorf("dequeue disallowed: calling scheduler version is %d; leader version is %d", + args.SchedulerVersion, scheduler.SchedulerVersion) + } + // Ensure there is a default timeout if args.Timeout <= 0 { args.Timeout = DefaultDequeueTimeout diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 6524180b5f13..cf5473ca7a66 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -2,12 +2,14 @@ package nomad import ( "reflect" + "strings" "testing" "time" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/nomad/testutil" ) @@ -142,8 +144,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { // Dequeue the eval get := &structs.EvalDequeueRequest{ - Schedulers: defaultSched, - WriteRequest: structs.WriteRequest{Region: "global"}, + Schedulers: defaultSched, + SchedulerVersion: scheduler.SchedulerVersion, + WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.EvalDequeueResponse if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil { @@ -164,6 +167,31 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { } } +func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + + // Dequeue the eval + get := &structs.EvalDequeueRequest{ + Schedulers: defaultSched, + SchedulerVersion: 0, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDequeueResponse + err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp) + if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") { + t.Fatalf("err: %v", err) + } +} + func TestEvalEndpoint_Ack(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c7ea2e9ff90c..9e1aa354eaee 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -305,8 +305,9 @@ type EvalAckRequest struct { // EvalDequeueRequest is used when we want to dequeue an evaluation type EvalDequeueRequest struct { - Schedulers []string - Timeout time.Duration + Schedulers []string + Timeout time.Duration + SchedulerVersion uint16 WriteRequest } diff --git a/nomad/worker.go b/nomad/worker.go index e64a2b0366ee..7d663479a200 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -134,8 +134,9 @@ func (w *Worker) run() { func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) { // Setup the request req := structs.EvalDequeueRequest{ - Schedulers: w.srv.config.EnabledSchedulers, - Timeout: timeout, + Schedulers: w.srv.config.EnabledSchedulers, + Timeout: timeout, + SchedulerVersion: scheduler.SchedulerVersion, WriteRequest: structs.WriteRequest{ Region: w.srv.config.Region, }, diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index d96c6aed7250..6652e281c6c9 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -8,6 +8,14 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // SchedulerVersion is the version of the scheduler. Changes to the + // scheduler that are incompatible with prior schedulers will increment this + // version. It is used to disallow dequeueing when the versions do not match + // across the leader and the dequeueing scheduler. + SchedulerVersion uint16 = 1 +) + // BuiltinSchedulers contains the built in registered schedulers // which are available var BuiltinSchedulers = map[string]Factory{