diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 2bf75f9716f0..1e2b5dd358be 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -143,7 +143,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat vclient := vaultclient.NewMockVaultClient() cclient := consul.NewMockAgent() - serviceClient := consul.NewServiceClient(cclient, true, logger) + serviceClient := consul.NewServiceClient(cclient, logger) go serviceClient.Run() tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient) if !restarts { @@ -1854,7 +1854,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { // backed by a mock consul whose checks are always unhealthy. consulAgent := consul.NewMockAgent() consulAgent.SetStatus("critical") - consulClient := consul.NewServiceClient(consulAgent, true, ctx.tr.logger) + consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger) go consulClient.Run() defer consulClient.Shutdown() diff --git a/command/agent/agent.go b/command/agent/agent.go index eff225a3b116..590a5164a116 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -15,7 +15,6 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/api" - version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/client" clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/command/agent/consul" @@ -57,10 +56,6 @@ type Agent struct { // consulCatalog is the subset of Consul's Catalog API Nomad uses. consulCatalog consul.CatalogAPI - // consulSupportsTLSSkipVerify flags whether or not Nomad can register - // checks with TLSSkipVerify - consulSupportsTLSSkipVerify bool - client *client.Client server *nomad.Server @@ -592,10 +587,6 @@ func (a *Agent) agentHTTPCheck(server bool) *structs.ServiceCheck { // No HTTPS, return a plain http check return &check } - if !a.consulSupportsTLSSkipVerify { - a.logger.Printf("[WARN] agent: not registering Nomad HTTPS Health Check because it requires Consul>=0.7.2") - return nil - } if a.config.TLSConfig.VerifyHTTPSClient { a.logger.Printf("[WARN] agent: not registering Nomad HTTPS Health Check because verify_https_client enabled") return nil @@ -837,55 +828,14 @@ func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error { } // Determine version for TLSSkipVerify - if self, err := client.Agent().Self(); err == nil { - a.consulSupportsTLSSkipVerify = consulSupportsTLSSkipVerify(self) - } // Create Consul Catalog client for service discovery. a.consulCatalog = client.Catalog() // Create Consul Service client for service advertisement and checks. - a.consulService = consul.NewServiceClient(client.Agent(), a.consulSupportsTLSSkipVerify, a.logger) + a.consulService = consul.NewServiceClient(client.Agent(), a.logger) // Run the Consul service client's sync'ing main loop go a.consulService.Run() return nil } - -var consulTLSSkipVerifyMinVersion = version.Must(version.NewVersion("0.7.2")) - -// consulSupportsTLSSkipVerify returns true if Consul supports TLSSkipVerify. -func consulSupportsTLSSkipVerify(self map[string]map[string]interface{}) bool { - member, ok := self["Member"] - if !ok { - return false - } - tagsI, ok := member["Tags"] - if !ok { - return false - } - tags, ok := tagsI.(map[string]interface{}) - if !ok { - return false - } - buildI, ok := tags["build"] - if !ok { - return false - } - build, ok := buildI.(string) - if !ok { - return false - } - parts := strings.SplitN(build, ":", 2) - if len(parts) != 2 { - return false - } - v, err := version.NewVersion(parts[0]) - if err != nil { - return false - } - if v.LessThan(consulTLSSkipVerifyMinVersion) { - return false - } - return true -} diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 0cf41604cff3..a88dd2b61871 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1,7 +1,6 @@ package agent import ( - "encoding/json" "io/ioutil" "log" "os" @@ -379,9 +378,8 @@ func TestAgent_HTTPCheck(t *testing.T) { } }) - t.Run("HTTPS + consulSupportsTLSSkipVerify", func(t *testing.T) { + t.Run("HTTPS", func(t *testing.T) { a := agent() - a.consulSupportsTLSSkipVerify = true a.config.TLSConfig.EnableHTTP = true check := a.agentHTTPCheck(false) @@ -396,19 +394,8 @@ func TestAgent_HTTPCheck(t *testing.T) { } }) - t.Run("HTTPS w/o TLSSkipVerify", func(t *testing.T) { - a := agent() - a.consulSupportsTLSSkipVerify = false - a.config.TLSConfig.EnableHTTP = true - - if check := a.agentHTTPCheck(false); check != nil { - t.Fatalf("expected nil check not: %#v", check) - } - }) - t.Run("HTTPS + VerifyHTTPSClient", func(t *testing.T) { a := agent() - a.consulSupportsTLSSkipVerify = true a.config.TLSConfig.EnableHTTP = true a.config.TLSConfig.VerifyHTTPSClient = true @@ -418,111 +405,6 @@ func TestAgent_HTTPCheck(t *testing.T) { }) } -func TestAgent_ConsulSupportsTLSSkipVerify(t *testing.T) { - t.Parallel() - assertSupport := func(expected bool, blob string) { - self := map[string]map[string]interface{}{} - if err := json.Unmarshal([]byte("{"+blob+"}"), &self); err != nil { - t.Fatalf("invalid json: %v", err) - } - actual := consulSupportsTLSSkipVerify(self) - if actual != expected { - t.Errorf("expected %t but got %t for:\n%s\n", expected, actual, blob) - } - } - - // 0.6.4 - assertSupport(false, `"Member": { - "Addr": "127.0.0.1", - "DelegateCur": 4, - "DelegateMax": 4, - "DelegateMin": 2, - "Name": "rusty", - "Port": 8301, - "ProtocolCur": 2, - "ProtocolMax": 3, - "ProtocolMin": 1, - "Status": 1, - "Tags": { - "build": "0.6.4:26a0ef8c", - "dc": "dc1", - "port": "8300", - "role": "consul", - "vsn": "2", - "vsn_max": "3", - "vsn_min": "1" - }}`) - - // 0.7.0 - assertSupport(false, `"Member": { - "Addr": "127.0.0.1", - "DelegateCur": 4, - "DelegateMax": 4, - "DelegateMin": 2, - "Name": "rusty", - "Port": 8301, - "ProtocolCur": 2, - "ProtocolMax": 4, - "ProtocolMin": 1, - "Status": 1, - "Tags": { - "build": "0.7.0:'a189091", - "dc": "dc1", - "port": "8300", - "role": "consul", - "vsn": "2", - "vsn_max": "3", - "vsn_min": "2" - }}`) - - // 0.7.2 - assertSupport(true, `"Member": { - "Addr": "127.0.0.1", - "DelegateCur": 4, - "DelegateMax": 4, - "DelegateMin": 2, - "Name": "rusty", - "Port": 8301, - "ProtocolCur": 2, - "ProtocolMax": 5, - "ProtocolMin": 1, - "Status": 1, - "Tags": { - "build": "0.7.2:'a9afa0c", - "dc": "dc1", - "port": "8300", - "role": "consul", - "vsn": "2", - "vsn_max": "3", - "vsn_min": "2" - }}`) - - // 0.8.1 - assertSupport(true, `"Member": { - "Addr": "127.0.0.1", - "DelegateCur": 4, - "DelegateMax": 5, - "DelegateMin": 2, - "Name": "rusty", - "Port": 8301, - "ProtocolCur": 2, - "ProtocolMax": 5, - "ProtocolMin": 1, - "Status": 1, - "Tags": { - "build": "0.8.1:'e9ca44d", - "dc": "dc1", - "id": "3ddc1b59-460e-a100-1d5c-ce3972122664", - "port": "8300", - "raft_vsn": "2", - "role": "consul", - "vsn": "2", - "vsn_max": "3", - "vsn_min": "2", - "wan_join_port": "8302" - }}`) -} - // TestAgent_HTTPCheckPath asserts clients and servers use different endpoints // for healthchecks. func TestAgent_HTTPCheckPath(t *testing.T) { diff --git a/command/agent/consul/catalog_testing.go b/command/agent/consul/catalog_testing.go index e0a31f770496..4630560d44ff 100644 --- a/command/agent/consul/catalog_testing.go +++ b/command/agent/consul/catalog_testing.go @@ -61,6 +61,27 @@ func (c *MockAgent) SetStatus(s string) string { return old } +func (c *MockAgent) Self() (map[string]map[string]interface{}, error) { + s := map[string]map[string]interface{}{ + "Member": map[string]interface{}{ + "Addr": "127.0.0.1", + "DelegateCur": 4, + "DelegateMax": 5, + "DelegateMin": 2, + "Name": "rusty", + "Port": 8301, + "ProtocolCur": 2, + "ProtocolMax": 5, + "ProtocolMin": 1, + "Status": 1, + "Tags": map[string]interface{}{ + "build": "0.8.1:'e9ca44d", + }, + }, + } + return s, nil +} + func (c *MockAgent) Services() (map[string]*api.AgentService, error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index eb49bcedeb6f..13f44c5a5eda 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -70,6 +70,7 @@ type AgentAPI interface { Checks() (map[string]*api.AgentCheck, error) CheckRegister(check *api.AgentCheckRegistration) error CheckDeregister(checkID string) error + Self() (map[string]map[string]interface{}, error) ServiceRegister(service *api.AgentServiceRegistration) error ServiceDeregister(serviceID string) error UpdateTTL(id, output, status string) error @@ -190,9 +191,6 @@ type ServiceClient struct { retryInterval time.Duration maxRetryInterval time.Duration - // skipVerifySupport is true if the local Consul agent supports TLSSkipVerify - skipVerifySupport bool - // exitCh is closed when the main Run loop exits exitCh chan struct{} @@ -231,10 +229,9 @@ type ServiceClient struct { // NewServiceClient creates a new Consul ServiceClient from an existing Consul API // Client and logger. -func NewServiceClient(consulClient AgentAPI, skipVerifySupport bool, logger *log.Logger) *ServiceClient { +func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient { return &ServiceClient{ client: consulClient, - skipVerifySupport: skipVerifySupport, logger: logger, retryInterval: defaultRetryInterval, maxRetryInterval: defaultMaxRetryInterval, @@ -273,19 +270,48 @@ func (c *ServiceClient) hasSeen() bool { func (c *ServiceClient) Run() { defer close(c.exitCh) - // start checkWatcher - ctx, cancelWatcher := context.WithCancel(context.Background()) - defer cancelWatcher() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // init will be closed when Consul has been contacted + init := make(chan struct{}) + go checkConsulTLSSkipVerify(ctx, c.logger, c.client, init) + + // Process operations while waiting for initial contact with Consul but + // do not sync until contact has been made. + hasOps := false +INIT: + for { + select { + case <-init: + c.markSeen() + break INIT + case <-c.shutdownCh: + return + case ops := <-c.opCh: + hasOps = true + c.merge(ops) + } + } + c.logger.Printf("[TRACE] consul.sync: able to contact Consul") + + // Block until contact with Consul has been established + // Start checkWatcher go c.checkWatcher.Run(ctx) retryTimer := time.NewTimer(0) - <-retryTimer.C // disabled by default + if !hasOps { + // No pending operations so don't immediately sync + <-retryTimer.C + } + failures := 0 for { select { case <-retryTimer.C: case <-c.shutdownCh: - cancelWatcher() + // Cancel check watcher but sync one last time + cancel() case ops := <-c.opCh: c.merge(ops) } @@ -475,9 +501,6 @@ func (c *ServiceClient) sync() error { } } - // A Consul operation has succeeded, mark Consul as having been seen - c.markSeen() - c.logger.Printf("[DEBUG] consul.sync: registered %d services, %d checks; deregistered %d services, %d checks", sreg, creg, sdereg, cdereg) return nil @@ -625,11 +648,6 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se checkIDs := make([]string, 0, numChecks) for _, check := range service.Checks { - if check.TLSSkipVerify && !c.skipVerifySupport { - c.logger.Printf("[WARN] consul.sync: skipping check %q for task %q alloc %q because Consul doesn't support tls_skip_verify. Please upgrade to Consul >= 0.7.2.", - check.Name, task.Name, allocID) - continue - } checkID := makeCheckID(serviceID, check) checkIDs = append(checkIDs, checkID) if check.Type == structs.ServiceCheckScript { diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index ee0fff74819d..77d052cffc89 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -146,7 +146,7 @@ func TestConsul_Integration(t *testing.T) { consulClient, err := consulapi.NewClient(consulConfig) assert.Nil(err) - serviceClient := consul.NewServiceClient(consulClient.Agent(), true, logger) + serviceClient := consul.NewServiceClient(consulClient.Agent(), logger) defer serviceClient.Shutdown() // just-in-case cleanup consulRan := make(chan struct{}) go func() { diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index b189649b7360..91ec3a2b3d05 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -15,8 +15,10 @@ import ( "github.com/hashicorp/consul/api" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -113,7 +115,7 @@ func (t *testFakeCtx) syncOnce() error { func setupFake() *testFakeCtx { fc := NewMockAgent() return &testFakeCtx{ - ServiceClient: NewServiceClient(fc, true, testLogger()), + ServiceClient: NewServiceClient(fc, testLogger()), FakeConsul: fc, Task: testTask(), Restarter: &restartRecorder{}, @@ -775,6 +777,7 @@ func TestConsul_RegServices(t *testing.T) { // TestConsul_ShutdownOK tests the ok path for the shutdown logic in // ServiceClient. func TestConsul_ShutdownOK(t *testing.T) { + require := require.New(t) ctx := setupFake() // Add a script check to make sure its TTL gets updated @@ -808,19 +811,24 @@ func TestConsul_ShutdownOK(t *testing.T) { t.Fatalf("unexpected error registering agent: %v", err) } + testutil.WaitForResult(func() (bool, error) { + return ctx.ServiceClient.hasSeen(), fmt.Errorf("error contacting Consul") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + // Shutdown should block until scripts finish if err := ctx.ServiceClient.Shutdown(); err != nil { t.Errorf("unexpected error shutting down client: %v", err) } - // UpdateTTL should have been called once for the script check + // UpdateTTL should have been called once for the script check and once + // for shutdown if n := len(ctx.FakeConsul.checkTTLs); n != 1 { t.Fatalf("expected 1 checkTTL entry but found: %d", n) } for _, v := range ctx.FakeConsul.checkTTLs { - if v != 1 { - t.Fatalf("expected script check to be updated once but found %d", v) - } + require.Equalf(2, v, "expected 2 updates but foud %d", v) } for _, v := range ctx.FakeConsul.checks { if v.Status != "passing" { @@ -970,51 +978,6 @@ func TestConsul_ShutdownBlocked(t *testing.T) { } } -// TestConsul_NoTLSSkipVerifySupport asserts that checks with -// TLSSkipVerify=true are skipped when Consul doesn't support TLSSkipVerify. -func TestConsul_NoTLSSkipVerifySupport(t *testing.T) { - ctx := setupFake() - ctx.ServiceClient = NewServiceClient(ctx.FakeConsul, false, testLogger()) - ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ - // This check sets TLSSkipVerify so it should get dropped - { - Name: "tls-check-skip", - Type: "http", - Protocol: "https", - Path: "/", - TLSSkipVerify: true, - }, - // This check doesn't set TLSSkipVerify so it should work fine - { - Name: "tls-check-noskip", - Type: "http", - Protocol: "https", - Path: "/", - TLSSkipVerify: false, - }, - } - - if err := ctx.ServiceClient.RegisterTask("allocid", ctx.Task, ctx.Restarter, nil, nil); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if len(ctx.FakeConsul.checks) != 1 { - t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks)) - } - for _, v := range ctx.FakeConsul.checks { - if expected := "tls-check-noskip"; v.Name != expected { - t.Errorf("only expected %q but found: %q", expected, v.Name) - } - if v.TLSSkipVerify { - t.Errorf("TLSSkipVerify=true when TLSSkipVerify not supported!") - } - } -} - // TestConsul_RemoveScript assert removing a script check removes all objects // related to that check. func TestConsul_CancelScript(t *testing.T) { diff --git a/command/agent/consul/version_checker.go b/command/agent/consul/version_checker.go new file mode 100644 index 000000000000..864adfb88445 --- /dev/null +++ b/command/agent/consul/version_checker.go @@ -0,0 +1,86 @@ +package consul + +import ( + "context" + "log" + "strings" + "time" + + version "github.com/hashicorp/go-version" +) + +// checkConsulTLSSkipVerify logs if Consul does not support TLSSkipVerify on +// checks and is intended to be run in a goroutine. +func checkConsulTLSSkipVerify(ctx context.Context, logger *log.Logger, client AgentAPI, done chan struct{}) { + const ( + baseline = time.Second + limit = 20 * time.Second + ) + + defer close(done) + + i := uint64(0) + for { + self, err := client.Self() + if err == nil { + if supportsTLSSkipVerify(self) { + logger.Printf("[TRACE] consul.sync: supports TLSSkipVerify") + } else { + logger.Printf("[WARN] consul.sync: Consul does NOT support TLSSkipVerify; please upgrade to Consul %s or newer", + consulTLSSkipVerifyMinVersion) + } + return + } + + backoff := (1 << (2 * uint64(i))) * baseline + if backoff > limit { + backoff = limit + } else { + i++ + } + + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(backoff)): + } + } +} + +var consulTLSSkipVerifyMinVersion = version.Must(version.NewVersion("0.7.2")) + +// supportsTLSSkipVerify returns true if Consul supports TLSSkipVerify. +func supportsTLSSkipVerify(self map[string]map[string]interface{}) bool { + member, ok := self["Member"] + if !ok { + return false + } + tagsI, ok := member["Tags"] + if !ok { + return false + } + tags, ok := tagsI.(map[string]interface{}) + if !ok { + return false + } + buildI, ok := tags["build"] + if !ok { + return false + } + build, ok := buildI.(string) + if !ok { + return false + } + parts := strings.SplitN(build, ":", 2) + if len(parts) != 2 { + return false + } + v, err := version.NewVersion(parts[0]) + if err != nil { + return false + } + if v.LessThan(consulTLSSkipVerifyMinVersion) { + return false + } + return true +} diff --git a/command/agent/consul/version_checker_test.go b/command/agent/consul/version_checker_test.go new file mode 100644 index 000000000000..351c89702d6e --- /dev/null +++ b/command/agent/consul/version_checker_test.go @@ -0,0 +1,111 @@ +package consul + +import ( + "encoding/json" + "testing" +) + +func TestConsulSupportsTLSSkipVerify(t *testing.T) { + t.Parallel() + assertSupport := func(expected bool, blob string) { + self := map[string]map[string]interface{}{} + if err := json.Unmarshal([]byte("{"+blob+"}"), &self); err != nil { + t.Fatalf("invalid json: %v", err) + } + actual := supportsTLSSkipVerify(self) + if actual != expected { + t.Errorf("expected %t but got %t for:\n%s\n", expected, actual, blob) + } + } + + // 0.6.4 + assertSupport(false, `"Member": { + "Addr": "127.0.0.1", + "DelegateCur": 4, + "DelegateMax": 4, + "DelegateMin": 2, + "Name": "rusty", + "Port": 8301, + "ProtocolCur": 2, + "ProtocolMax": 3, + "ProtocolMin": 1, + "Status": 1, + "Tags": { + "build": "0.6.4:26a0ef8c", + "dc": "dc1", + "port": "8300", + "role": "consul", + "vsn": "2", + "vsn_max": "3", + "vsn_min": "1" + }}`) + + // 0.7.0 + assertSupport(false, `"Member": { + "Addr": "127.0.0.1", + "DelegateCur": 4, + "DelegateMax": 4, + "DelegateMin": 2, + "Name": "rusty", + "Port": 8301, + "ProtocolCur": 2, + "ProtocolMax": 4, + "ProtocolMin": 1, + "Status": 1, + "Tags": { + "build": "0.7.0:'a189091", + "dc": "dc1", + "port": "8300", + "role": "consul", + "vsn": "2", + "vsn_max": "3", + "vsn_min": "2" + }}`) + + // 0.7.2 + assertSupport(true, `"Member": { + "Addr": "127.0.0.1", + "DelegateCur": 4, + "DelegateMax": 4, + "DelegateMin": 2, + "Name": "rusty", + "Port": 8301, + "ProtocolCur": 2, + "ProtocolMax": 5, + "ProtocolMin": 1, + "Status": 1, + "Tags": { + "build": "0.7.2:'a9afa0c", + "dc": "dc1", + "port": "8300", + "role": "consul", + "vsn": "2", + "vsn_max": "3", + "vsn_min": "2" + }}`) + + // 0.8.1 + assertSupport(true, `"Member": { + "Addr": "127.0.0.1", + "DelegateCur": 4, + "DelegateMax": 5, + "DelegateMin": 2, + "Name": "rusty", + "Port": 8301, + "ProtocolCur": 2, + "ProtocolMax": 5, + "ProtocolMin": 1, + "Status": 1, + "Tags": { + "build": "0.8.1:'e9ca44d", + "dc": "dc1", + "id": "3ddc1b59-460e-a100-1d5c-ce3972122664", + "port": "8300", + "raft_vsn": "2", + "role": "consul", + "vsn": "2", + "vsn_max": "3", + "vsn_min": "2", + "wan_join_port": "8302" + }}`) +}