Skip to content

Commit

Permalink
Replace Consul SkipVerifyTLS handling
Browse files Browse the repository at this point in the history
Instead of checking Consul's version on startup to see if it supports
SkipVerifyTLS, assume that it does and only log in the job service
handler if we discover Consul does not support SkipVerifyTLS.

The old code would break SkipVerifyTLS support if Nomad started before
Consul (such as on system boot) as SkipVerifyTLS would default to false
if Consul wasn't running. Since SkipVerifyTLS has been supported since
Consul 0.7.2, it's safe to relax our handling.
  • Loading branch information
schmichael committed Mar 15, 2018
1 parent 4252ffe commit 14a4d79
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 241 deletions.
4 changes: 2 additions & 2 deletions client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat

vclient := vaultclient.NewMockVaultClient()
cclient := consul.NewMockAgent()
serviceClient := consul.NewServiceClient(cclient, true, logger)
serviceClient := consul.NewServiceClient(cclient, logger)
go serviceClient.Run()
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
if !restarts {
Expand Down Expand Up @@ -1854,7 +1854,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
// backed by a mock consul whose checks are always unhealthy.
consulAgent := consul.NewMockAgent()
consulAgent.SetStatus("critical")
consulClient := consul.NewServiceClient(consulAgent, true, ctx.tr.logger)
consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger)
go consulClient.Run()
defer consulClient.Shutdown()

Expand Down
52 changes: 1 addition & 51 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/client"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/command/agent/consul"
Expand Down Expand Up @@ -57,10 +56,6 @@ type Agent struct {
// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI

// consulSupportsTLSSkipVerify flags whether or not Nomad can register
// checks with TLSSkipVerify
consulSupportsTLSSkipVerify bool

client *client.Client

server *nomad.Server
Expand Down Expand Up @@ -592,10 +587,6 @@ func (a *Agent) agentHTTPCheck(server bool) *structs.ServiceCheck {
// No HTTPS, return a plain http check
return &check
}
if !a.consulSupportsTLSSkipVerify {
a.logger.Printf("[WARN] agent: not registering Nomad HTTPS Health Check because it requires Consul>=0.7.2")
return nil
}
if a.config.TLSConfig.VerifyHTTPSClient {
a.logger.Printf("[WARN] agent: not registering Nomad HTTPS Health Check because verify_https_client enabled")
return nil
Expand Down Expand Up @@ -837,55 +828,14 @@ func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
}

// Determine version for TLSSkipVerify
if self, err := client.Agent().Self(); err == nil {
a.consulSupportsTLSSkipVerify = consulSupportsTLSSkipVerify(self)
}

// Create Consul Catalog client for service discovery.
a.consulCatalog = client.Catalog()

// Create Consul Service client for service advertisement and checks.
a.consulService = consul.NewServiceClient(client.Agent(), a.consulSupportsTLSSkipVerify, a.logger)
a.consulService = consul.NewServiceClient(client.Agent(), a.logger)

// Run the Consul service client's sync'ing main loop
go a.consulService.Run()
return nil
}

var consulTLSSkipVerifyMinVersion = version.Must(version.NewVersion("0.7.2"))

// consulSupportsTLSSkipVerify returns true if Consul supports TLSSkipVerify.
func consulSupportsTLSSkipVerify(self map[string]map[string]interface{}) bool {
member, ok := self["Member"]
if !ok {
return false
}
tagsI, ok := member["Tags"]
if !ok {
return false
}
tags, ok := tagsI.(map[string]interface{})
if !ok {
return false
}
buildI, ok := tags["build"]
if !ok {
return false
}
build, ok := buildI.(string)
if !ok {
return false
}
parts := strings.SplitN(build, ":", 2)
if len(parts) != 2 {
return false
}
v, err := version.NewVersion(parts[0])
if err != nil {
return false
}
if v.LessThan(consulTLSSkipVerifyMinVersion) {
return false
}
return true
}
120 changes: 1 addition & 119 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package agent

import (
"encoding/json"
"io/ioutil"
"log"
"os"
Expand Down Expand Up @@ -379,9 +378,8 @@ func TestAgent_HTTPCheck(t *testing.T) {
}
})

t.Run("HTTPS + consulSupportsTLSSkipVerify", func(t *testing.T) {
t.Run("HTTPS", func(t *testing.T) {
a := agent()
a.consulSupportsTLSSkipVerify = true
a.config.TLSConfig.EnableHTTP = true

check := a.agentHTTPCheck(false)
Expand All @@ -396,19 +394,8 @@ func TestAgent_HTTPCheck(t *testing.T) {
}
})

t.Run("HTTPS w/o TLSSkipVerify", func(t *testing.T) {
a := agent()
a.consulSupportsTLSSkipVerify = false
a.config.TLSConfig.EnableHTTP = true

if check := a.agentHTTPCheck(false); check != nil {
t.Fatalf("expected nil check not: %#v", check)
}
})

t.Run("HTTPS + VerifyHTTPSClient", func(t *testing.T) {
a := agent()
a.consulSupportsTLSSkipVerify = true
a.config.TLSConfig.EnableHTTP = true
a.config.TLSConfig.VerifyHTTPSClient = true

Expand All @@ -418,111 +405,6 @@ func TestAgent_HTTPCheck(t *testing.T) {
})
}

func TestAgent_ConsulSupportsTLSSkipVerify(t *testing.T) {
t.Parallel()
assertSupport := func(expected bool, blob string) {
self := map[string]map[string]interface{}{}
if err := json.Unmarshal([]byte("{"+blob+"}"), &self); err != nil {
t.Fatalf("invalid json: %v", err)
}
actual := consulSupportsTLSSkipVerify(self)
if actual != expected {
t.Errorf("expected %t but got %t for:\n%s\n", expected, actual, blob)
}
}

// 0.6.4
assertSupport(false, `"Member": {
"Addr": "127.0.0.1",
"DelegateCur": 4,
"DelegateMax": 4,
"DelegateMin": 2,
"Name": "rusty",
"Port": 8301,
"ProtocolCur": 2,
"ProtocolMax": 3,
"ProtocolMin": 1,
"Status": 1,
"Tags": {
"build": "0.6.4:26a0ef8c",
"dc": "dc1",
"port": "8300",
"role": "consul",
"vsn": "2",
"vsn_max": "3",
"vsn_min": "1"
}}`)

// 0.7.0
assertSupport(false, `"Member": {
"Addr": "127.0.0.1",
"DelegateCur": 4,
"DelegateMax": 4,
"DelegateMin": 2,
"Name": "rusty",
"Port": 8301,
"ProtocolCur": 2,
"ProtocolMax": 4,
"ProtocolMin": 1,
"Status": 1,
"Tags": {
"build": "0.7.0:'a189091",
"dc": "dc1",
"port": "8300",
"role": "consul",
"vsn": "2",
"vsn_max": "3",
"vsn_min": "2"
}}`)

// 0.7.2
assertSupport(true, `"Member": {
"Addr": "127.0.0.1",
"DelegateCur": 4,
"DelegateMax": 4,
"DelegateMin": 2,
"Name": "rusty",
"Port": 8301,
"ProtocolCur": 2,
"ProtocolMax": 5,
"ProtocolMin": 1,
"Status": 1,
"Tags": {
"build": "0.7.2:'a9afa0c",
"dc": "dc1",
"port": "8300",
"role": "consul",
"vsn": "2",
"vsn_max": "3",
"vsn_min": "2"
}}`)

// 0.8.1
assertSupport(true, `"Member": {
"Addr": "127.0.0.1",
"DelegateCur": 4,
"DelegateMax": 5,
"DelegateMin": 2,
"Name": "rusty",
"Port": 8301,
"ProtocolCur": 2,
"ProtocolMax": 5,
"ProtocolMin": 1,
"Status": 1,
"Tags": {
"build": "0.8.1:'e9ca44d",
"dc": "dc1",
"id": "3ddc1b59-460e-a100-1d5c-ce3972122664",
"port": "8300",
"raft_vsn": "2",
"role": "consul",
"vsn": "2",
"vsn_max": "3",
"vsn_min": "2",
"wan_join_port": "8302"
}}`)
}

// TestAgent_HTTPCheckPath asserts clients and servers use different endpoints
// for healthchecks.
func TestAgent_HTTPCheckPath(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions command/agent/consul/catalog_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ func (c *MockAgent) SetStatus(s string) string {
return old
}

func (c *MockAgent) Self() (map[string]map[string]interface{}, error) {
s := map[string]map[string]interface{}{
"Member": map[string]interface{}{
"Addr": "127.0.0.1",
"DelegateCur": 4,
"DelegateMax": 5,
"DelegateMin": 2,
"Name": "rusty",
"Port": 8301,
"ProtocolCur": 2,
"ProtocolMax": 5,
"ProtocolMin": 1,
"Status": 1,
"Tags": map[string]interface{}{
"build": "0.8.1:'e9ca44d",
},
},
}
return s, nil
}

func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
54 changes: 36 additions & 18 deletions command/agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type AgentAPI interface {
Checks() (map[string]*api.AgentCheck, error)
CheckRegister(check *api.AgentCheckRegistration) error
CheckDeregister(checkID string) error
Self() (map[string]map[string]interface{}, error)
ServiceRegister(service *api.AgentServiceRegistration) error
ServiceDeregister(serviceID string) error
UpdateTTL(id, output, status string) error
Expand Down Expand Up @@ -190,9 +191,6 @@ type ServiceClient struct {
retryInterval time.Duration
maxRetryInterval time.Duration

// skipVerifySupport is true if the local Consul agent supports TLSSkipVerify
skipVerifySupport bool

// exitCh is closed when the main Run loop exits
exitCh chan struct{}

Expand Down Expand Up @@ -231,10 +229,9 @@ type ServiceClient struct {

// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client and logger.
func NewServiceClient(consulClient AgentAPI, skipVerifySupport bool, logger *log.Logger) *ServiceClient {
func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient {
return &ServiceClient{
client: consulClient,
skipVerifySupport: skipVerifySupport,
logger: logger,
retryInterval: defaultRetryInterval,
maxRetryInterval: defaultMaxRetryInterval,
Expand Down Expand Up @@ -273,19 +270,48 @@ func (c *ServiceClient) hasSeen() bool {
func (c *ServiceClient) Run() {
defer close(c.exitCh)

// start checkWatcher
ctx, cancelWatcher := context.WithCancel(context.Background())
defer cancelWatcher()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// init will be closed when Consul has been contacted
init := make(chan struct{})
go checkConsulTLSSkipVerify(ctx, c.logger, c.client, init)

// Process operations while waiting for initial contact with Consul but
// do not sync until contact has been made.
hasOps := false
INIT:
for {
select {
case <-init:
c.markSeen()
break INIT
case <-c.shutdownCh:
return
case ops := <-c.opCh:
hasOps = true
c.merge(ops)
}
}
c.logger.Printf("[TRACE] consul.sync: able to contact Consul")

// Block until contact with Consul has been established
// Start checkWatcher
go c.checkWatcher.Run(ctx)

retryTimer := time.NewTimer(0)
<-retryTimer.C // disabled by default
if !hasOps {
// No pending operations so don't immediately sync
<-retryTimer.C
}

failures := 0
for {
select {
case <-retryTimer.C:
case <-c.shutdownCh:
cancelWatcher()
// Cancel check watcher but sync one last time
cancel()
case ops := <-c.opCh:
c.merge(ops)
}
Expand Down Expand Up @@ -475,9 +501,6 @@ func (c *ServiceClient) sync() error {
}
}

// A Consul operation has succeeded, mark Consul as having been seen
c.markSeen()

c.logger.Printf("[DEBUG] consul.sync: registered %d services, %d checks; deregistered %d services, %d checks",
sreg, creg, sdereg, cdereg)
return nil
Expand Down Expand Up @@ -625,11 +648,6 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se

checkIDs := make([]string, 0, numChecks)
for _, check := range service.Checks {
if check.TLSSkipVerify && !c.skipVerifySupport {
c.logger.Printf("[WARN] consul.sync: skipping check %q for task %q alloc %q because Consul doesn't support tls_skip_verify. Please upgrade to Consul >= 0.7.2.",
check.Name, task.Name, allocID)
continue
}
checkID := makeCheckID(serviceID, check)
checkIDs = append(checkIDs, checkID)
if check.Type == structs.ServiceCheckScript {
Expand Down
2 changes: 1 addition & 1 deletion command/agent/consul/int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestConsul_Integration(t *testing.T) {
consulClient, err := consulapi.NewClient(consulConfig)
assert.Nil(err)

serviceClient := consul.NewServiceClient(consulClient.Agent(), true, logger)
serviceClient := consul.NewServiceClient(consulClient.Agent(), logger)
defer serviceClient.Shutdown() // just-in-case cleanup
consulRan := make(chan struct{})
go func() {
Expand Down
Loading

0 comments on commit 14a4d79

Please sign in to comment.