From 802fb18979ab9d120adb93b6c862b8e8b455cd89 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Mon, 13 Jul 2020 16:53:10 -0500 Subject: [PATCH] consul/connect: add support for bridge networks with connect native tasks Before, Connect Native Tasks needed one of these to work: - To be run in host networking mode - To have the Consul agent configured to listen to a unix socket - To have the Consul agent configured to listen to a public interface None of these are a great experience, though running in host networking is still the best solution for non-Linux hosts. This PR establishes a connection proxy between the Consul HTTP listener and a unix socket inside the alloc fs, bypassing the network namespace for any Connect Native task. Similar to and re-uses a bunch of code from the gRPC listener version for envoy sidecar proxies. Proxy is established only if the alloc is configured for bridge networking and there is at least one Connect Native task in the Task Group. Fixes #8290 --- client/allocdir/alloc_dir.go | 4 + client/allocrunner/alloc_runner_hooks.go | 3 +- ...lsock_hook.go => consul_grpc_sock_hook.go} | 132 ++++++------ ..._test.go => consul_grpc_sock_hook_test.go} | 24 +-- client/allocrunner/consul_http_sock_hook.go | 202 ++++++++++++++++++ .../allocrunner/consul_http_sock_hook_test.go | 122 +++++++++++ .../taskrunner/connect_native_hook.go | 41 +++- .../taskrunner/connect_native_hook_test.go | 40 ++++ nomad/mock/mock.go | 48 +++-- 9 files changed, 526 insertions(+), 90 deletions(-) rename client/allocrunner/{consulsock_hook.go => consul_grpc_sock_hook.go} (67%) rename client/allocrunner/{consulsock_hook_test.go => consul_grpc_sock_hook_test.go} (86%) create mode 100644 client/allocrunner/consul_http_sock_hook.go create mode 100644 client/allocrunner/consul_http_sock_hook_test.go diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index f8ef62efb0ff..2ac23e958224 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ 
-64,6 +64,10 @@ var ( // AllocGRPCSocket is the path relative to the task dir root for the // unix socket connected to Consul's gRPC endpoint. AllocGRPCSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_grpc.sock") + + // AllocHTTPSocket is the path relative to the task dir root for the unix + // socket connected to Consul's HTTP endpoint. + AllocHTTPSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_http.sock") ) // AllocDir allows creating, destroying, and accessing an allocation's diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index d31570d22ef6..e209ff622d6c 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -167,7 +167,8 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir), logger: hookLogger, }), - newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), + newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), + newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), newCSIHook(ar, hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs), } diff --git a/client/allocrunner/consulsock_hook.go b/client/allocrunner/consul_grpc_sock_hook.go similarity index 67% rename from client/allocrunner/consulsock_hook.go rename to client/allocrunner/consul_grpc_sock_hook.go index 827e0b0f0d83..13b4a286383f 100644 --- a/client/allocrunner/consulsock_hook.go +++ b/client/allocrunner/consul_grpc_sock_hook.go @@ -16,43 +16,49 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/pkg/errors" ) -// consulSockHook creates Unix sockets to allow communication from inside a -// netns to Consul. 
-// -// Noop for allocations without a group Connect stanza. -type consulSockHook struct { - alloc *structs.Allocation +const ( + consulGRPCSockHookName = "consul_grpc_socket" +) + +var ( + errSocketProxyTimeout = errors.New("timed out waiting for socket proxy to exit") +) - proxy *sockProxy +// consulGRPCSocketHook creates Unix sockets to allow communication from inside a +// netns to Consul gRPC endpoint. +// +// Noop for allocations without a group Connect stanza using bridge networking. +type consulGRPCSocketHook struct { + logger hclog.Logger + alloc *structs.Allocation + proxy *grpcSocketProxy - // mu synchronizes group & cancel as they may be mutated and accessed + // mu synchronizes proxy as they may be mutated and accessed // concurrently via Prerun, Update, Postrun. mu sync.Mutex - - logger hclog.Logger } -func newConsulSockHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulSockHook { - h := &consulSockHook{ - alloc: alloc, - proxy: newSockProxy(logger, allocDir, config), +func newConsulGRPCSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulGRPCSocketHook { + return &consulGRPCSocketHook{ + alloc: alloc, + proxy: newGRPCSocketProxy(logger, allocDir, config), + logger: logger.Named(consulGRPCSockHookName), } - h.logger = logger.Named(h.Name()) - return h } -func (*consulSockHook) Name() string { - return "consul_socket" +func (*consulGRPCSocketHook) Name() string { + return consulGRPCSockHookName } // shouldRun returns true if the Unix socket should be created and proxied. // Requires the mutex to be held. 
-func (h *consulSockHook) shouldRun() bool { +func (h *consulGRPCSocketHook) shouldRun() bool { tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup) for _, s := range tg.Services { - if s.Connect.HasSidecar() { + if s.Connect.HasSidecar() { // todo(shoenig) check we are in bridge mode return true } } @@ -60,7 +66,7 @@ func (h *consulSockHook) shouldRun() bool { return false } -func (h *consulSockHook) Prerun() error { +func (h *consulGRPCSocketHook) Prerun() error { h.mu.Lock() defer h.mu.Unlock() @@ -73,7 +79,7 @@ func (h *consulSockHook) Prerun() error { // Update creates a gRPC socket file and proxy if there are any Connect // services. -func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error { +func (h *consulGRPCSocketHook) Update(req *interfaces.RunnerUpdateRequest) error { h.mu.Lock() defer h.mu.Unlock() @@ -86,7 +92,7 @@ func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error { return h.proxy.run(h.alloc) } -func (h *consulSockHook) Postrun() error { +func (h *consulGRPCSocketHook) Postrun() error { h.mu.Lock() defer h.mu.Unlock() @@ -98,7 +104,8 @@ func (h *consulSockHook) Postrun() error { return nil } -type sockProxy struct { +type grpcSocketProxy struct { + logger hclog.Logger allocDir *allocdir.AllocDir config *config.ConsulConfig @@ -106,13 +113,11 @@ type sockProxy struct { cancel func() doneCh chan struct{} runOnce bool - - logger hclog.Logger } -func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *sockProxy { +func newGRPCSocketProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *grpcSocketProxy { ctx, cancel := context.WithCancel(context.Background()) - return &sockProxy{ + return &grpcSocketProxy{ allocDir: allocDir, config: config, ctx: ctx, @@ -126,50 +131,55 @@ func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *conf // hasn't been told to stop. // // NOT safe for concurrent use. 
-func (s *sockProxy) run(alloc *structs.Allocation) error { +func (p *grpcSocketProxy) run(alloc *structs.Allocation) error { // Only run once. - if s.runOnce { + if p.runOnce { return nil } // Only run once. Never restart. select { - case <-s.doneCh: - s.logger.Trace("socket proxy already shutdown; exiting") + case <-p.doneCh: + p.logger.Trace("socket proxy already shutdown; exiting") return nil - case <-s.ctx.Done(): - s.logger.Trace("socket proxy already done; exiting") + case <-p.ctx.Done(): + p.logger.Trace("socket proxy already done; exiting") return nil default: } - destAddr := s.config.GRPCAddr + // make sure either grpc or http consul address has been configured + if p.config.GRPCAddr == "" && p.config.Addr == "" { + return errors.New("consul address must be set on nomad client") + } + + destAddr := p.config.GRPCAddr if destAddr == "" { // No GRPCAddr defined. Use Addr but replace port with the gRPC // default of 8502. - host, _, err := net.SplitHostPort(s.config.Addr) + host, _, err := net.SplitHostPort(p.config.Addr) if err != nil { return fmt.Errorf("error parsing Consul address %q: %v", - s.config.Addr, err) + p.config.Addr, err) } destAddr = net.JoinHostPort(host, "8502") } - hostGRPCSockPath := filepath.Join(s.allocDir.AllocDir, allocdir.AllocGRPCSocket) + hostGRPCSocketPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocGRPCSocket) // if the socket already exists we'll try to remove it, but if not then any // other errors will bubble up to the caller here or when we try to listen - _, err := os.Stat(hostGRPCSockPath) + _, err := os.Stat(hostGRPCSocketPath) if err == nil { - err := os.Remove(hostGRPCSockPath) + err := os.Remove(hostGRPCSocketPath) if err != nil { return fmt.Errorf( "unable to remove existing unix socket for Consul gRPC endpoint: %v", err) } } - listener, err := net.Listen("unix", hostGRPCSockPath) + listener, err := net.Listen("unix", hostGRPCSocketPath) if err != nil { return fmt.Errorf("unable to create unix socket for Consul 
gRPC endpoint: %v", err) } @@ -179,40 +189,40 @@ func (s *sockProxy) run(alloc *structs.Allocation) error { // socket permissions when creating the file, so we must manually call // chmod afterwards. // https://github.com/golang/go/issues/11822 - if err := os.Chmod(hostGRPCSockPath, os.ModePerm); err != nil { + if err := os.Chmod(hostGRPCSocketPath, os.ModePerm); err != nil { return fmt.Errorf("unable to set permissions on unix socket for Consul gRPC endpoint: %v", err) } go func() { - proxy(s.ctx, s.logger, destAddr, listener) - s.cancel() - close(s.doneCh) + proxy(p.ctx, p.logger, destAddr, listener) + p.cancel() + close(p.doneCh) }() - s.runOnce = true + p.runOnce = true return nil } // stop the proxy and blocks until the proxy has stopped. Returns an error if // the proxy does not exit in a timely fashion. -func (s *sockProxy) stop() error { - s.cancel() +func (p *grpcSocketProxy) stop() error { + p.cancel() // If proxy was never run, don't wait for anything to shutdown. - if !s.runOnce { + if !p.runOnce { return nil } select { - case <-s.doneCh: + case <-p.doneCh: return nil case <-time.After(3 * time.Second): - return fmt.Errorf("timed out waiting for proxy to exit") + return errSocketProxyTimeout } } -// Proxy between a listener and dest -func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener) { +// Proxy between a listener and destination. +func proxy(ctx context.Context, logger hclog.Logger, destAddr string, l net.Listener) { // Wait for all connections to be done before exiting to prevent // goroutine leaks. 
wg := sync.WaitGroup{} @@ -220,14 +230,14 @@ func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener defer func() { // Must cancel context and close listener before waiting cancel() - l.Close() + _ = l.Close() wg.Wait() }() // Close Accept() when context is cancelled go func() { <-ctx.Done() - l.Close() + _ = l.Close() }() for ctx.Err() == nil { @@ -237,14 +247,14 @@ func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener // Accept errors during shutdown are to be expected return } - logger.Error("error in grpc proxy; shutting down proxy", "error", err, "dest", dest) + logger.Error("error in socket proxy; shutting down proxy", "error", err, "dest", destAddr) return } wg.Add(1) go func() { defer wg.Done() - proxyConn(ctx, logger, dest, conn) + proxyConn(ctx, logger, destAddr, conn) }() } } @@ -286,7 +296,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n wg := sync.WaitGroup{} defer wg.Wait() - // socket -> gRPC + // socket -> consul wg.Add(1) go func() { defer wg.Done() @@ -305,7 +315,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n ) }() - // gRPC -> socket + // consul -> socket wg.Add(1) go func() { defer wg.Done() @@ -326,6 +336,6 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n // When cancelled close connections to break out of copies goroutines. 
<-ctx.Done() - conn.Close() - dest.Close() + _ = conn.Close() + _ = dest.Close() } diff --git a/client/allocrunner/consulsock_hook_test.go b/client/allocrunner/consul_grpc_sock_hook_test.go similarity index 86% rename from client/allocrunner/consulsock_hook_test.go rename to client/allocrunner/consul_grpc_sock_hook_test.go index 4651d3fef875..f700c8ce4ec1 100644 --- a/client/allocrunner/consulsock_hook_test.go +++ b/client/allocrunner/consul_grpc_sock_hook_test.go @@ -20,10 +20,10 @@ import ( "github.com/stretchr/testify/require" ) -// TestConsulSockHook_PrerunPostrun_Ok asserts that a proxy is started when the +// TestConsulGRPCSocketHook_PrerunPostrun_Ok asserts that a proxy is started when the // Consul unix socket hook's Prerun method is called and stopped with the // Postrun method is called. -func TestConsulSockHook_PrerunPostrun_Ok(t *testing.T) { +func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) { t.Parallel() // As of Consul 1.6.0 the test server does not support the gRPC @@ -43,7 +43,7 @@ func TestConsulSockHook_PrerunPostrun_Ok(t *testing.T) { defer cleanup() // Start the unix socket proxy - h := newConsulSockHook(logger, alloc, allocDir, consulConfig) + h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig) require.NoError(t, h.Prerun()) gRPCSock := filepath.Join(allocDir.AllocDir, allocdir.AllocGRPCSocket) @@ -98,9 +98,9 @@ func TestConsulSockHook_PrerunPostrun_Ok(t *testing.T) { require.Zero(t, n) } -// TestConsulSockHook_Prerun_Error asserts that invalid Consul addresses cause +// TestConsulGRPCSocketHook_Prerun_Error asserts that invalid Consul addresses cause // Prerun to return an error if the alloc requires a grpc proxy. -func TestConsulSockHook_Prerun_Error(t *testing.T) { +func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { t.Parallel() logger := testlog.HCLogger(t) @@ -117,7 +117,7 @@ func TestConsulSockHook_Prerun_Error(t *testing.T) { { // An alloc without a Connect proxy sidecar should not return // an error. 
- h := newConsulSockHook(logger, alloc, allocDir, consulConfig) + h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig) require.NoError(t, h.Prerun()) // Postrun should be a noop @@ -127,8 +127,8 @@ func TestConsulSockHook_Prerun_Error(t *testing.T) { { // An alloc *with* a Connect proxy sidecar *should* return an error // when Consul is not configured. - h := newConsulSockHook(logger, connectAlloc, allocDir, consulConfig) - require.Error(t, h.Prerun()) + h := newConsulGRPCSocketHook(logger, connectAlloc, allocDir, consulConfig) + require.EqualError(t, h.Prerun(), "consul address must be set on nomad client") // Postrun should be a noop require.NoError(t, h.Postrun()) @@ -137,22 +137,22 @@ func TestConsulSockHook_Prerun_Error(t *testing.T) { { // Updating an alloc without a sidecar to have a sidecar should // error when the sidecar is added. - h := newConsulSockHook(logger, alloc, allocDir, consulConfig) + h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig) require.NoError(t, h.Prerun()) req := &interfaces.RunnerUpdateRequest{ Alloc: connectAlloc, } - require.Error(t, h.Update(req)) + require.EqualError(t, h.Update(req), "consul address must be set on nomad client") // Postrun should be a noop require.NoError(t, h.Postrun()) } } -// TestConsulSockHook_proxy_Unix asserts that the destination can be a unix +// TestConsulGRPCSocketHook_proxy_Unix asserts that the destination can be a unix // socket path. 
-func TestConsulSockHook_proxy_Unix(t *testing.T) { +func TestConsulGRPCSocketHook_proxy_Unix(t *testing.T) { t.Parallel() dir, err := ioutil.TempDir("", "nomadtest_proxy_Unix") diff --git a/client/allocrunner/consul_http_sock_hook.go b/client/allocrunner/consul_http_sock_hook.go new file mode 100644 index 000000000000..6f33736fccdd --- /dev/null +++ b/client/allocrunner/consul_http_sock_hook.go @@ -0,0 +1,202 @@ +package allocrunner + +import ( + "context" + "net" + "os" + "path/filepath" + "sync" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/pkg/errors" +) + +const ( + consulHTTPSocketHookName = "consul_http_socket" +) + +type consulHTTPSockHook struct { + logger hclog.Logger + alloc *structs.Allocation + proxy *httpSocketProxy + + // lock synchronizes proxy which may be mutated and read concurrently via + // Prerun, Update, and Postrun. + lock sync.Mutex +} + +func newConsulHTTPSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulHTTPSockHook { + return &consulHTTPSockHook{ + alloc: alloc, + proxy: newHTTPSocketProxy(logger, allocDir, config), + logger: logger.Named(consulHTTPSocketHookName), + } +} + +func (*consulHTTPSockHook) Name() string { + return consulHTTPSocketHookName +} + +// shouldRun returns true if the alloc contains at least one connect native +// task and has a network configured in bridge mode +// +// todo(shoenig): what about CNI networks? 
+func (h *consulHTTPSockHook) shouldRun() bool { + // we must be in bridge networking and at least one connect native task + tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup) + if len(tg.Networks) < 1 || tg.Networks[0].Mode != "bridge" { + return false + } + + for _, service := range tg.Services { + if service.Connect.IsNative() { + return true + } + } + return false +} + +func (h *consulHTTPSockHook) Prerun() error { + h.lock.Lock() + defer h.lock.Unlock() + + if !h.shouldRun() { + return nil + } + + return h.proxy.run(h.alloc) +} + +func (h *consulHTTPSockHook) Update(req *interfaces.RunnerUpdateRequest) error { + h.lock.Lock() + defer h.lock.Unlock() + + h.alloc = req.Alloc + + if !h.shouldRun() { + return nil + } + + return h.proxy.run(h.alloc) +} + +func (h *consulHTTPSockHook) Postrun() error { + h.lock.Lock() + defer h.lock.Unlock() + + if err := h.proxy.stop(); err != nil { + // Only log a failure to stop, worst case is the proxy leaks a goroutine. + h.logger.Debug("error stopping Consul HTTP proxy", "error", err) + } + + return nil +} + +type httpSocketProxy struct { + logger hclog.Logger + allocDir *allocdir.AllocDir + config *config.ConsulConfig + + ctx context.Context + cancel func() + doneCh chan struct{} + runOnce bool +} + +func newHTTPSocketProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *httpSocketProxy { + ctx, cancel := context.WithCancel(context.Background()) + return &httpSocketProxy{ + logger: logger, + allocDir: allocDir, + config: config, + ctx: ctx, + cancel: cancel, + doneCh: make(chan struct{}), + } +} + +// run the httpSocketProxy for the given allocation. +// +// Assumes locking done by the calling alloc runner. +func (p *httpSocketProxy) run(alloc *structs.Allocation) error { + // Only run once. + if p.runOnce { + return nil + } + + // Never restart. 
+ select { + case <-p.doneCh: + p.logger.Trace("consul http socket proxy already shutdown; exiting") + return nil + case <-p.ctx.Done(): + p.logger.Trace("consul http socket proxy already done; exiting") + return nil + default: + } + + // consul http dest addr + destAddr := p.config.Addr + if destAddr == "" { + return errors.New("consul address must be set on nomad client") + } + + hostHTTPSockPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocHTTPSocket) + if err := maybeRemoveOldSocket(hostHTTPSockPath); err != nil { + return err + } + + listener, err := net.Listen("unix", hostHTTPSockPath) + if err != nil { + return errors.Wrap(err, "unable to create unix socket for Consul HTTP endpoint") + } + + // The Consul HTTP socket should be usable by all users in case a task is + // running as a non-privileged user. Unix does not allow setting domain + // socket permissions when creating the file, so we must manually call + // chmod afterwards. + if err := os.Chmod(hostHTTPSockPath, os.ModePerm); err != nil { + return errors.Wrap(err, "unable to set permissions on unix socket") + } + + go func() { + proxy(p.ctx, p.logger, destAddr, listener) + p.cancel() + close(p.doneCh) + }() + + p.runOnce = true + return nil +} + +func (p *httpSocketProxy) stop() error { + p.cancel() + + // if proxy was never run, no need to wait before shutdown + if !p.runOnce { + return nil + } + + select { + case <-p.doneCh: + case <-time.After(3 * time.Second): + return errSocketProxyTimeout + } + + return nil +} + +func maybeRemoveOldSocket(socketPath string) error { + _, err := os.Stat(socketPath) + if err == nil { + if err = os.Remove(socketPath); err != nil { + return errors.Wrap(err, "unable to remove existing unix socket") + } + } + return nil +} diff --git a/client/allocrunner/consul_http_sock_hook_test.go b/client/allocrunner/consul_http_sock_hook_test.go new file mode 100644 index 000000000000..d3fd2603d99c --- /dev/null +++ b/client/allocrunner/consul_http_sock_hook_test.go @@ -0,0 
+1,122 @@ +package allocrunner + +import ( + "bytes" + "net" + "path/filepath" + "testing" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/stretchr/testify/require" +) + +func TestConsulSocketHook_PrerunPostrun_Ok(t *testing.T) { + t.Parallel() + + fakeConsul, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer fakeConsul.Close() + + consulConfig := &config.ConsulConfig{ + Addr: fakeConsul.Addr().String(), + } + + alloc := mock.ConnectNativeAlloc() + + logger := testlog.HCLogger(t) + + allocDir, cleanupDir := allocdir.TestAllocDir(t, logger, "ConnectNativeTask") + defer cleanupDir() + + // start unix socket proxy + h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfig) + require.NoError(t, h.Prerun()) + + httpSocket := filepath.Join(allocDir.AllocDir, allocdir.AllocHTTPSocket) + taskCon, err := net.Dial("unix", httpSocket) + require.NoError(t, err) + + // write to consul from task to ensure data is proxied out of the netns + input := bytes.Repeat([]byte{'X'}, 5*1024) + errCh := make(chan error, 1) + go func() { + _, err := taskCon.Write(input) + errCh <- err + }() + + // accept the connection from inside the netns + consulConn, err := fakeConsul.Accept() + require.NoError(t, err) + defer consulConn.Close() + + output := make([]byte, len(input)) + _, err = consulConn.Read(output) + require.NoError(t, err) + require.NoError(t, <-errCh) + require.Equal(t, input, output) + + // read from consul to ensure http response bodies can come back + input = bytes.Repeat([]byte{'Y'}, 5*1024) + go func() { + _, err := consulConn.Write(input) + errCh <- err + }() + + output = make([]byte, len(input)) + _, err = taskCon.Read(output) + require.NoError(t, err) + require.NoError(t, <-errCh) + require.Equal(t, input, output) + + // stop the unix socket proxy + require.NoError(t, h.Postrun()) + + // 
consul reads should now error + n, err := consulConn.Read(output) + require.Error(t, err) + require.Zero(t, n) + + // task reads and writes should error + n, err = taskCon.Write(input) + require.Error(t, err) + require.Zero(t, n) + n, err = taskCon.Read(output) + require.Error(t, err) + require.Zero(t, n) +} + +func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) { + t.Parallel() + + logger := testlog.HCLogger(t) + + allocDir, cleanupDir := allocdir.TestAllocDir(t, logger, "ConnectNativeTask") + defer cleanupDir() + + consulConfig := new(config.ConsulConfig) + + alloc := mock.Alloc() + connectNativeAlloc := mock.ConnectNativeAlloc() + + { + // an alloc without a connect native task should not return an error + h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfig) + require.NoError(t, h.Prerun()) + + // postrun should be a noop + require.NoError(t, h.Postrun()) + } + + { + // an alloc with a native task should return an error when consul is not + // configured + h := newConsulHTTPSocketHook(logger, connectNativeAlloc, allocDir, consulConfig) + require.EqualError(t, h.Prerun(), "consul address must be set on nomad client") + + // Postrun should be a noop + require.NoError(t, h.Postrun()) + } +} diff --git a/client/allocrunner/taskrunner/connect_native_hook.go b/client/allocrunner/taskrunner/connect_native_hook.go index 8adc61137560..4ebd2c504305 100644 --- a/client/allocrunner/taskrunner/connect_native_hook.go +++ b/client/allocrunner/taskrunner/connect_native_hook.go @@ -8,6 +8,7 @@ import ( "path/filepath" hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocdir" ifs "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" @@ -43,6 +44,10 @@ func newConnectNativeHookConfig(alloc *structs.Allocation, consul *config.Consul // // If consul is configured with ACLs enabled, a Service Identity token will be // generated on behalf of 
the native service and supplied to the task. +// +// If the alloc is configured with bridge networking enabled, the standard +// CONSUL_HTTP_ADDR environment variable is defaulted to the unix socket created +// for the alloc by the consul_http_sock_hook alloc runner hook. type connectNativeHook struct { // alloc is the allocation with the connect native task being run alloc *structs.Allocation @@ -73,6 +78,13 @@ func (connectNativeHook) Name() string { return connectNativeHookName } +// merge b into a, overwriting on conflicts +func merge(a, b map[string]string) { + for k, v := range b { + a[k] = v + } +} + func (h *connectNativeHook) Prestart( ctx context.Context, request *ifs.TaskPrestartRequest, @@ -83,6 +95,8 @@ func (h *connectNativeHook) Prestart( return nil } + environment := make(map[string]string) + if h.consulShareTLS { // copy TLS certificates if err := h.copyCertificates(h.consulConfig, request.TaskDir.SecretsDir); err != nil { @@ -92,17 +106,19 @@ func (h *connectNativeHook) Prestart( // set environment variables for communicating with Consul agent, but // only if those environment variables are not already set - response.Env = h.tlsEnv(request.TaskEnv.EnvMap) - + merge(environment, h.tlsEnv(request.TaskEnv.EnvMap)) } - if err := h.maybeSetSITokenEnv(request.TaskDir.SecretsDir, request.Task.Name, response.Env); err != nil { + if err := h.maybeSetSITokenEnv(request.TaskDir.SecretsDir, request.Task.Name, environment); err != nil { h.logger.Error("failed to load Consul Service Identity Token", "error", err, "task", request.Task.Name) return err } + merge(environment, h.bridgeEnv(request.TaskEnv.EnvMap)) + // tls/acl setup for native task done response.Done = true + response.Env = environment return nil } @@ -190,6 +206,25 @@ func (h *connectNativeHook) tlsEnv(env map[string]string) map[string]string { return m } +// bridgeEnv creates a set of additional environment variables to be used when launching +// the connect native task. 
This will enable the task to communicate with Consul +// if the task is running inside an alloc's network namespace (i.e. bridge mode). +// +// Sets CONSUL_HTTP_ADDR if not already set. +func (h *connectNativeHook) bridgeEnv(env map[string]string) map[string]string { + if h.alloc.AllocatedResources.Shared.Networks[0].Mode != "bridge" { + return nil + } + + if _, exists := env["CONSUL_HTTP_ADDR"]; !exists { + return map[string]string{ + "CONSUL_HTTP_ADDR": "unix:///" + allocdir.AllocHTTPSocket, + } + } + + return nil +} + // maybeSetSITokenEnv will set the CONSUL_HTTP_TOKEN environment variable in // the given env map, if the token is found to exist in the task's secrets // directory AND the CONSUL_HTTP_TOKEN environment variable is not already set. diff --git a/client/allocrunner/taskrunner/connect_native_hook_test.go b/client/allocrunner/taskrunner/connect_native_hook_test.go index 4ebbea058a19..9b12c89fa610 100644 --- a/client/allocrunner/taskrunner/connect_native_hook_test.go +++ b/client/allocrunner/taskrunner/connect_native_hook_test.go @@ -162,6 +162,46 @@ func TestConnectNativeHook_tlsEnv(t *testing.T) { }) } +func TestConnectNativeHook_bridgeEnv_bridge(t *testing.T) { + t.Parallel() + + hook := new(connectNativeHook) + hook.alloc = mock.ConnectNativeAlloc("bridge") + + t.Run("consul address env not preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(nil) + require.Equal(t, map[string]string{ + "CONSUL_HTTP_ADDR": "unix:///alloc/tmp/consul_http.sock", + }, result) + }) + + t.Run("consul address env is preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(map[string]string{ + "CONSUL_HTTP_ADDR": "10.1.1.1", + }) + require.Empty(t, result) + }) +} + +func TestConnectNativeHook_bridgeEnv_host(t *testing.T) { + t.Parallel() + + hook := new(connectNativeHook) + hook.alloc = mock.ConnectNativeAlloc("host") + + t.Run("consul address env not preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(nil) + require.Empty(t, result) + }) + + 
t.Run("consul address env is preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(map[string]string{ + "CONSUL_HTTP_ADDR": "10.1.1.1", + }) + require.Empty(t, result) + }) +} + func TestTaskRunner_ConnectNativeHook_Noop(t *testing.T) { t.Parallel() logger := testlog.HCLogger(t) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 30f4b0dfb45f..b4f29dc2f7af 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -636,26 +636,38 @@ func MaxParallelJob() *structs.Job { func ConnectJob() *structs.Job { job := Job() tg := job.TaskGroups[0] - tg.Networks = []*structs.NetworkResource{ - { - Mode: "bridge", - }, - } - tg.Services = []*structs.Service{ - { - Name: "testconnect", - PortLabel: "9999", - Connect: &structs.ConsulConnect{ - SidecarService: &structs.ConsulSidecarService{}, - }, + tg.Services = []*structs.Service{{ + Name: "testconnect", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + SidecarService: new(structs.ConsulSidecarService), }, - } + }} tg.Networks = structs.Networks{{ Mode: "bridge", // always bridge ... for now? }} return job } +func ConnectNativeJob(mode string) *structs.Job { + job := Job() + tg := job.TaskGroups[0] + tg.Networks = []*structs.NetworkResource{{ + Mode: mode, + }} + tg.Services = []*structs.Service{{ + Name: "test_connect_native", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + Native: true, + }, + }} + tg.Tasks = []*structs.Task{{ + Name: "native_task", + }} + return job +} + func BatchJob() *structs.Job { job := &structs.Job{ Region: "global", @@ -920,6 +932,16 @@ func ConnectAlloc() *structs.Allocation { return alloc } +func ConnectNativeAlloc(mode string) *structs.Allocation { + alloc := Alloc() + alloc.Job = ConnectNativeJob(mode) + alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{{ + Mode: mode, + IP: "10.0.0.1", + }} + return alloc +} + func BatchConnectJob() *structs.Job { job := &structs.Job{ Region: "global",