diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index f8ef62efb0ff..2ac23e958224 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -64,6 +64,10 @@ var ( // AllocGRPCSocket is the path relative to the task dir root for the // unix socket connected to Consul's gRPC endpoint. AllocGRPCSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_grpc.sock") + + // AllocHTTPSocket is the path relative to the task dir root for the unix + // socket connected to Consul's HTTP endpoint. + AllocHTTPSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_http.sock") ) // AllocDir allows creating, destroying, and accessing an allocation's diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index d31570d22ef6..e209ff622d6c 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -167,7 +167,8 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir), logger: hookLogger, }), - newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), + newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), + newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), newCSIHook(ar, hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs), } diff --git a/client/allocrunner/consulsock_hook.go b/client/allocrunner/consul_grpc_sock_hook.go similarity index 67% rename from client/allocrunner/consulsock_hook.go rename to client/allocrunner/consul_grpc_sock_hook.go index 827e0b0f0d83..13b4a286383f 100644 --- a/client/allocrunner/consulsock_hook.go +++ b/client/allocrunner/consul_grpc_sock_hook.go @@ -16,43 +16,49 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/pkg/errors" ) -// consulSockHook creates Unix sockets to allow communication from inside a -// netns to Consul. -// -// Noop for allocations without a group Connect stanza. -type consulSockHook struct { - alloc *structs.Allocation +const ( + consulGRPCSockHookName = "consul_grpc_socket" +) + +var ( + errSocketProxyTimeout = errors.New("timed out waiting for socket proxy to exit") +) - proxy *sockProxy +// consulGRPCSocketHook creates Unix sockets to allow communication from inside a +// netns to Consul gRPC endpoint. +// +// Noop for allocations without a group Connect stanza using bridge networking. +type consulGRPCSocketHook struct { + logger hclog.Logger + alloc *structs.Allocation + proxy *grpcSocketProxy - // mu synchronizes group & cancel as they may be mutated and accessed + // mu synchronizes proxy as they may be mutated and accessed // concurrently via Prerun, Update, Postrun. mu sync.Mutex - - logger hclog.Logger } -func newConsulSockHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulSockHook { - h := &consulSockHook{ - alloc: alloc, - proxy: newSockProxy(logger, allocDir, config), +func newConsulGRPCSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulGRPCSocketHook { + return &consulGRPCSocketHook{ + alloc: alloc, + proxy: newGRPCSocketProxy(logger, allocDir, config), + logger: logger.Named(consulGRPCSockHookName), } - h.logger = logger.Named(h.Name()) - return h } -func (*consulSockHook) Name() string { - return "consul_socket" +func (*consulGRPCSocketHook) Name() string { + return consulGRPCSockHookName } // shouldRun returns true if the Unix socket should be created and proxied. // Requires the mutex to be held. -func (h *consulSockHook) shouldRun() bool { +func (h *consulGRPCSocketHook) shouldRun() bool { tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup) for _, s := range tg.Services { - if s.Connect.HasSidecar() { + if s.Connect.HasSidecar() { // todo(shoenig) check we are in bridge mode return true } } @@ -60,7 +66,7 @@ func (h *consulSockHook) shouldRun() bool { return false } -func (h *consulSockHook) Prerun() error { +func (h *consulGRPCSocketHook) Prerun() error { h.mu.Lock() defer h.mu.Unlock() @@ -73,7 +79,7 @@ func (h *consulSockHook) Prerun() error { // Update creates a gRPC socket file and proxy if there are any Connect // services. -func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error { +func (h *consulGRPCSocketHook) Update(req *interfaces.RunnerUpdateRequest) error { h.mu.Lock() defer h.mu.Unlock() @@ -86,7 +92,7 @@ func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error { return h.proxy.run(h.alloc) } -func (h *consulSockHook) Postrun() error { +func (h *consulGRPCSocketHook) Postrun() error { h.mu.Lock() defer h.mu.Unlock() @@ -98,7 +104,8 @@ func (h *consulSockHook) Postrun() error { return nil } -type sockProxy struct { +type grpcSocketProxy struct { + logger hclog.Logger allocDir *allocdir.AllocDir config *config.ConsulConfig @@ -106,13 +113,11 @@ type sockProxy struct { cancel func() doneCh chan struct{} runOnce bool - - logger hclog.Logger } -func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *sockProxy { +func newGRPCSocketProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *grpcSocketProxy { ctx, cancel := context.WithCancel(context.Background()) - return &sockProxy{ + return &grpcSocketProxy{ allocDir: allocDir, config: config, ctx: ctx, @@ -126,50 +131,55 @@ func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *conf // hasn't been told to stop. // // NOT safe for concurrent use. -func (s *sockProxy) run(alloc *structs.Allocation) error { +func (p *grpcSocketProxy) run(alloc *structs.Allocation) error { // Only run once. - if s.runOnce { + if p.runOnce { return nil } // Only run once. Never restart. select { - case <-s.doneCh: - s.logger.Trace("socket proxy already shutdown; exiting") + case <-p.doneCh: + p.logger.Trace("socket proxy already shutdown; exiting") return nil - case <-s.ctx.Done(): - s.logger.Trace("socket proxy already done; exiting") + case <-p.ctx.Done(): + p.logger.Trace("socket proxy already done; exiting") return nil default: } - destAddr := s.config.GRPCAddr + // make sure either grpc or http consul address has been configured + if p.config.GRPCAddr == "" && p.config.Addr == "" { + return errors.New("consul address must be set on nomad client") + } + + destAddr := p.config.GRPCAddr if destAddr == "" { // No GRPCAddr defined. Use Addr but replace port with the gRPC // default of 8502. - host, _, err := net.SplitHostPort(s.config.Addr) + host, _, err := net.SplitHostPort(p.config.Addr) if err != nil { return fmt.Errorf("error parsing Consul address %q: %v", - s.config.Addr, err) + p.config.Addr, err) } destAddr = net.JoinHostPort(host, "8502") } - hostGRPCSockPath := filepath.Join(s.allocDir.AllocDir, allocdir.AllocGRPCSocket) + hostGRPCSocketPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocGRPCSocket) // if the socket already exists we'll try to remove it, but if not then any // other errors will bubble up to the caller here or when we try to listen - _, err := os.Stat(hostGRPCSockPath) + _, err := os.Stat(hostGRPCSocketPath) if err == nil { - err := os.Remove(hostGRPCSockPath) + err := os.Remove(hostGRPCSocketPath) if err != nil { return fmt.Errorf( "unable to remove existing unix socket for Consul gRPC endpoint: %v", err) } } - listener, err := net.Listen("unix", hostGRPCSockPath) + listener, err := net.Listen("unix", hostGRPCSocketPath) if err != nil { return fmt.Errorf("unable to create unix socket for Consul gRPC endpoint: %v", err) } @@ -179,40 +189,40 @@ func (s *sockProxy) run(alloc *structs.Allocation) error { // socket permissions when creating the file, so we must manually call // chmod afterwards. // https://github.com/golang/go/issues/11822 - if err := os.Chmod(hostGRPCSockPath, os.ModePerm); err != nil { + if err := os.Chmod(hostGRPCSocketPath, os.ModePerm); err != nil { return fmt.Errorf("unable to set permissions on unix socket for Consul gRPC endpoint: %v", err) } go func() { - proxy(s.ctx, s.logger, destAddr, listener) - s.cancel() - close(s.doneCh) + proxy(p.ctx, p.logger, destAddr, listener) + p.cancel() + close(p.doneCh) }() - s.runOnce = true + p.runOnce = true return nil } // stop the proxy and blocks until the proxy has stopped. Returns an error if // the proxy does not exit in a timely fashion. -func (s *sockProxy) stop() error { - s.cancel() +func (p *grpcSocketProxy) stop() error { + p.cancel() // If proxy was never run, don't wait for anything to shutdown. - if !s.runOnce { + if !p.runOnce { return nil } select { - case <-s.doneCh: + case <-p.doneCh: return nil case <-time.After(3 * time.Second): - return fmt.Errorf("timed out waiting for proxy to exit") + return errSocketProxyTimeout } } -// Proxy between a listener and dest -func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener) { +// Proxy between a listener and destination. +func proxy(ctx context.Context, logger hclog.Logger, destAddr string, l net.Listener) { // Wait for all connections to be done before exiting to prevent // goroutine leaks. wg := sync.WaitGroup{} @@ -220,14 +230,14 @@ func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener defer func() { // Must cancel context and close listener before waiting cancel() - l.Close() + _ = l.Close() wg.Wait() }() // Close Accept() when context is cancelled go func() { <-ctx.Done() - l.Close() + _ = l.Close() }() for ctx.Err() == nil { @@ -237,14 +247,14 @@ func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener // Accept errors during shutdown are to be expected return } - logger.Error("error in grpc proxy; shutting down proxy", "error", err, "dest", dest) + logger.Error("error in socket proxy; shutting down proxy", "error", err, "dest", destAddr) return } wg.Add(1) go func() { defer wg.Done() - proxyConn(ctx, logger, dest, conn) + proxyConn(ctx, logger, destAddr, conn) }() } } @@ -286,7 +296,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n wg := sync.WaitGroup{} defer wg.Wait() - // socket -> gRPC + // socket -> consul wg.Add(1) go func() { defer wg.Done() @@ -305,7 +315,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n ) }() - // gRPC -> socket + // consul -> socket wg.Add(1) go func() { defer wg.Done() @@ -326,6 +336,6 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n // When cancelled close connections to break out of copies goroutines. <-ctx.Done() - conn.Close() - dest.Close() + _ = conn.Close() + _ = dest.Close() } diff --git a/client/allocrunner/consulsock_hook_test.go b/client/allocrunner/consul_grpc_sock_hook_test.go similarity index 86% rename from client/allocrunner/consulsock_hook_test.go rename to client/allocrunner/consul_grpc_sock_hook_test.go index 4651d3fef875..f700c8ce4ec1 100644 --- a/client/allocrunner/consulsock_hook_test.go +++ b/client/allocrunner/consul_grpc_sock_hook_test.go @@ -20,10 +20,10 @@ import ( "github.com/stretchr/testify/require" ) -// TestConsulSockHook_PrerunPostrun_Ok asserts that a proxy is started when the +// TestConsulGRPCSocketHook_PrerunPostrun_Ok asserts that a proxy is started when the // Consul unix socket hook's Prerun method is called and stopped with the // Postrun method is called. -func TestConsulSockHook_PrerunPostrun_Ok(t *testing.T) { +func TestConsulGRPCSocketHook_PrerunPostrun_Ok(t *testing.T) { t.Parallel() // As of Consul 1.6.0 the test server does not support the gRPC @@ -43,7 +43,7 @@ func TestConsulSockHook_PrerunPostrun_Ok(t *testing.T) { defer cleanup() // Start the unix socket proxy - h := newConsulSockHook(logger, alloc, allocDir, consulConfig) + h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig) require.NoError(t, h.Prerun()) gRPCSock := filepath.Join(allocDir.AllocDir, allocdir.AllocGRPCSocket) @@ -98,9 +98,9 @@ func TestConsulSockHook_PrerunPostrun_Ok(t *testing.T) { require.Zero(t, n) } -// TestConsulSockHook_Prerun_Error asserts that invalid Consul addresses cause +// TestConsulGRPCSocketHook_Prerun_Error asserts that invalid Consul addresses cause // Prerun to return an error if the alloc requires a grpc proxy. -func TestConsulSockHook_Prerun_Error(t *testing.T) { +func TestConsulGRPCSocketHook_Prerun_Error(t *testing.T) { t.Parallel() logger := testlog.HCLogger(t) @@ -117,7 +117,7 @@ func TestConsulSockHook_Prerun_Error(t *testing.T) { { // An alloc without a Connect proxy sidecar should not return // an error. - h := newConsulSockHook(logger, alloc, allocDir, consulConfig) + h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig) require.NoError(t, h.Prerun()) // Postrun should be a noop @@ -127,8 +127,8 @@ func TestConsulSockHook_Prerun_Error(t *testing.T) { { // An alloc *with* a Connect proxy sidecar *should* return an error // when Consul is not configured. - h := newConsulSockHook(logger, connectAlloc, allocDir, consulConfig) - require.Error(t, h.Prerun()) + h := newConsulGRPCSocketHook(logger, connectAlloc, allocDir, consulConfig) + require.EqualError(t, h.Prerun(), "consul address must be set on nomad client") // Postrun should be a noop require.NoError(t, h.Postrun()) @@ -137,22 +137,22 @@ func TestConsulSockHook_Prerun_Error(t *testing.T) { { // Updating an alloc without a sidecar to have a sidecar should // error when the sidecar is added. - h := newConsulSockHook(logger, alloc, allocDir, consulConfig) + h := newConsulGRPCSocketHook(logger, alloc, allocDir, consulConfig) require.NoError(t, h.Prerun()) req := &interfaces.RunnerUpdateRequest{ Alloc: connectAlloc, } - require.Error(t, h.Update(req)) + require.EqualError(t, h.Update(req), "consul address must be set on nomad client") // Postrun should be a noop require.NoError(t, h.Postrun()) } } -// TestConsulSockHook_proxy_Unix asserts that the destination can be a unix +// TestConsulGRPCSocketHook_proxy_Unix asserts that the destination can be a unix // socket path. -func TestConsulSockHook_proxy_Unix(t *testing.T) { +func TestConsulGRPCSocketHook_proxy_Unix(t *testing.T) { t.Parallel() dir, err := ioutil.TempDir("", "nomadtest_proxy_Unix") diff --git a/client/allocrunner/consul_http_sock_hook.go b/client/allocrunner/consul_http_sock_hook.go new file mode 100644 index 000000000000..6f33736fccdd --- /dev/null +++ b/client/allocrunner/consul_http_sock_hook.go @@ -0,0 +1,202 @@ +package allocrunner + +import ( + "context" + "net" + "os" + "path/filepath" + "sync" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/pkg/errors" +) + +const ( + consulHTTPSocketHookName = "consul_http_socket" +) + +type consulHTTPSockHook struct { + logger hclog.Logger + alloc *structs.Allocation + proxy *httpSocketProxy + + // lock synchronizes proxy which may be mutated and read concurrently via + // Prerun, Update, and Postrun. + lock sync.Mutex +} + +func newConsulHTTPSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulHTTPSockHook { + return &consulHTTPSockHook{ + alloc: alloc, + proxy: newHTTPSocketProxy(logger, allocDir, config), + logger: logger.Named(consulHTTPSocketHookName), + } +} + +func (*consulHTTPSockHook) Name() string { + return consulHTTPSocketHookName +} + +// shouldRun returns true if the alloc contains at least one connect native +// task and has a network configured in bridge mode +// +// todo(shoenig): what about CNI networks? +func (h *consulHTTPSockHook) shouldRun() bool { + // we must be in bridge networking and at least one connect native task + tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup) + if len(tg.Networks) < 1 || tg.Networks[0].Mode != "bridge" { + return false + } + + for _, service := range tg.Services { + if service.Connect.IsNative() { + return true + } + } + return false +} + +func (h *consulHTTPSockHook) Prerun() error { + h.lock.Lock() + defer h.lock.Unlock() + + if !h.shouldRun() { + return nil + } + + return h.proxy.run(h.alloc) +} + +func (h *consulHTTPSockHook) Update(req *interfaces.RunnerUpdateRequest) error { + h.lock.Lock() + defer h.lock.Unlock() + + h.alloc = req.Alloc + + if !h.shouldRun() { + return nil + } + + return h.proxy.run(h.alloc) +} + +func (h *consulHTTPSockHook) Postrun() error { + h.lock.Lock() + defer h.lock.Unlock() + + if err := h.proxy.stop(); err != nil { + // Only log a failure to stop, worst case is the proxy leaks a goroutine. + h.logger.Debug("error stopping Consul HTTP proxy", "error", err) + } + + return nil +} + +type httpSocketProxy struct { + logger hclog.Logger + allocDir *allocdir.AllocDir + config *config.ConsulConfig + + ctx context.Context + cancel func() + doneCh chan struct{} + runOnce bool +} + +func newHTTPSocketProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *httpSocketProxy { + ctx, cancel := context.WithCancel(context.Background()) + return &httpSocketProxy{ + logger: logger, + allocDir: allocDir, + config: config, + ctx: ctx, + cancel: cancel, + doneCh: make(chan struct{}), + } +} + +// run the httpSocketProxy for the given allocation. +// +// Assumes locking done by the calling alloc runner. +func (p *httpSocketProxy) run(alloc *structs.Allocation) error { + // Only run once. + if p.runOnce { + return nil + } + + // Never restart. + select { + case <-p.doneCh: + p.logger.Trace("consul http socket proxy already shutdown; exiting") + return nil + case <-p.ctx.Done(): + p.logger.Trace("consul http socket proxy already done; exiting") + return nil + default: + } + + // consul http dest addr + destAddr := p.config.Addr + if destAddr == "" { + return errors.New("consul address must be set on nomad client") + } + + hostHTTPSockPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocHTTPSocket) + if err := maybeRemoveOldSocket(hostHTTPSockPath); err != nil { + return err + } + + listener, err := net.Listen("unix", hostHTTPSockPath) + if err != nil { + return errors.Wrap(err, "unable to create unix socket for Consul HTTP endpoint") + } + + // The Consul HTTP socket should be usable by all users in case a task is + // running as a non-privileged user. Unix does not allow setting domain + // socket permissions when creating the file, so we must manually call + // chmod afterwords. + if err := os.Chmod(hostHTTPSockPath, os.ModePerm); err != nil { + return errors.Wrap(err, "unable to set permissions on unix socket") + } + + go func() { + proxy(p.ctx, p.logger, destAddr, listener) + p.cancel() + close(p.doneCh) + }() + + p.runOnce = true + return nil +} + +func (p *httpSocketProxy) stop() error { + p.cancel() + + // if proxy was never run, no need to wait before shutdown + if !p.runOnce { + return nil + } + + select { + case <-p.doneCh: + case <-time.After(3 * time.Second): + return errSocketProxyTimeout + } + + return nil +} + +func maybeRemoveOldSocket(socketPath string) error { + _, err := os.Stat(socketPath) + if err == nil { + if err = os.Remove(socketPath); err != nil { + return errors.Wrap(err, "unable to remove existing unix socket") + } + } + return nil +} diff --git a/client/allocrunner/consul_http_sock_hook_test.go b/client/allocrunner/consul_http_sock_hook_test.go new file mode 100644 index 000000000000..d3fd2603d99c --- /dev/null +++ b/client/allocrunner/consul_http_sock_hook_test.go @@ -0,0 +1,122 @@ +package allocrunner + +import ( + "bytes" + "net" + "path/filepath" + "testing" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/stretchr/testify/require" +) + +func TestConsulSocketHook_PrerunPostrun_Ok(t *testing.T) { + t.Parallel() + + fakeConsul, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer fakeConsul.Close() + + consulConfig := &config.ConsulConfig{ + Addr: fakeConsul.Addr().String(), + } + + alloc := mock.ConnectNativeAlloc() + + logger := testlog.HCLogger(t) + + allocDir, cleanupDir := allocdir.TestAllocDir(t, logger, "ConnectNativeTask") + defer cleanupDir() + + // start unix socket proxy + h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfig) + require.NoError(t, h.Prerun()) + + httpSocket := filepath.Join(allocDir.AllocDir, allocdir.AllocHTTPSocket) + taskCon, err := net.Dial("unix", httpSocket) + require.NoError(t, err) + + // write to consul from task to ensure data is proxied out of the netns + input := bytes.Repeat([]byte{'X'}, 5*1024) + errCh := make(chan error, 1) + go func() { + _, err := taskCon.Write(input) + errCh <- err + }() + + // accept the connection from inside the netns + consulConn, err := fakeConsul.Accept() + require.NoError(t, err) + defer consulConn.Close() + + output := make([]byte, len(input)) + _, err = consulConn.Read(output) + require.NoError(t, err) + require.NoError(t, <-errCh) + require.Equal(t, input, output) + + // read from consul to ensure http response bodies can come back + input = bytes.Repeat([]byte{'Y'}, 5*1024) + go func() { + _, err := consulConn.Write(input) + errCh <- err + }() + + output = make([]byte, len(input)) + _, err = taskCon.Read(output) + require.NoError(t, err) + require.NoError(t, <-errCh) + require.Equal(t, input, output) + + // stop the unix socket proxy + require.NoError(t, h.Postrun()) + + // consul reads should now error + n, err := consulConn.Read(output) + require.Error(t, err) + require.Zero(t, n) + + // task reads and writes should error + n, err = taskCon.Write(input) + require.Error(t, err) + require.Zero(t, n) + n, err = taskCon.Read(output) + require.Error(t, err) + require.Zero(t, n) +} + +func TestConsulHTTPSocketHook_Prerun_Error(t *testing.T) { + t.Parallel() + + logger := testlog.HCLogger(t) + + allocDir, cleanupDir := allocdir.TestAllocDir(t, logger, "ConnectNativeTask") + defer cleanupDir() + + consulConfig := new(config.ConsulConfig) + + alloc := mock.Alloc() + connectNativeAlloc := mock.ConnectNativeAlloc() + + { + // an alloc without a connect native task should not return an error + h := newConsulHTTPSocketHook(logger, alloc, allocDir, consulConfig) + require.NoError(t, h.Prerun()) + + // postrun should be a noop + require.NoError(t, h.Postrun()) + } + + { + // an alloc with a native task should return an error when consul is not + // configured + h := newConsulHTTPSocketHook(logger, connectNativeAlloc, allocDir, consulConfig) + require.EqualError(t, h.Prerun(), "consul address must be set on nomad client") + + // Postrun should be a noop + require.NoError(t, h.Postrun()) + } +} diff --git a/client/allocrunner/taskrunner/connect_native_hook.go b/client/allocrunner/taskrunner/connect_native_hook.go index 8adc61137560..4ebd2c504305 100644 --- a/client/allocrunner/taskrunner/connect_native_hook.go +++ b/client/allocrunner/taskrunner/connect_native_hook.go @@ -8,6 +8,7 @@ import ( "path/filepath" hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocdir" ifs "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" @@ -43,6 +44,10 @@ func newConnectNativeHookConfig(alloc *structs.Allocation, consul *config.Consul // // If consul is configured with ACLs enabled, a Service Identity token will be // generated on behalf of the native service and supplied to the task. +// +// If the alloc is configured with bridge networking enabled, the standard +// CONSUL_HTTP_ADDR environment variable is defaulted to the unix socket created +// for the alloc by the consul_grpc_sock_hook alloc runner hook. type connectNativeHook struct { // alloc is the allocation with the connect native task being run alloc *structs.Allocation @@ -73,6 +78,13 @@ func (connectNativeHook) Name() string { return connectNativeHookName } +// merge b into a, overwriting on conflicts +func merge(a, b map[string]string) { + for k, v := range b { + a[k] = v + } +} + func (h *connectNativeHook) Prestart( ctx context.Context, request *ifs.TaskPrestartRequest, @@ -83,6 +95,8 @@ func (h *connectNativeHook) Prestart( return nil } + environment := make(map[string]string) + if h.consulShareTLS { // copy TLS certificates if err := h.copyCertificates(h.consulConfig, request.TaskDir.SecretsDir); err != nil { @@ -92,17 +106,19 @@ func (h *connectNativeHook) Prestart( // set environment variables for communicating with Consul agent, but // only if those environment variables are not already set - response.Env = h.tlsEnv(request.TaskEnv.EnvMap) - + merge(environment, h.tlsEnv(request.TaskEnv.EnvMap)) } - if err := h.maybeSetSITokenEnv(request.TaskDir.SecretsDir, request.Task.Name, response.Env); err != nil { + if err := h.maybeSetSITokenEnv(request.TaskDir.SecretsDir, request.Task.Name, environment); err != nil { h.logger.Error("failed to load Consul Service Identity Token", "error", err, "task", request.Task.Name) return err } + merge(environment, h.bridgeEnv(request.TaskEnv.EnvMap)) + // tls/acl setup for native task done response.Done = true + response.Env = environment return nil } @@ -190,6 +206,25 @@ func (h *connectNativeHook) tlsEnv(env map[string]string) map[string]string { return m } +// bridgeEnv creates a set of additional environment variables to be used when launching +// the connect native task. This will enable the task to communicate with Consul +// if the task is running inside an alloc's network namespace (i.e. bridge mode). +// +// Sets CONSUL_HTTP_ADDR if not already set. +func (h *connectNativeHook) bridgeEnv(env map[string]string) map[string]string { + if h.alloc.AllocatedResources.Shared.Networks[0].Mode != "bridge" { + return nil + } + + if _, exists := env["CONSUL_HTTP_ADDR"]; !exists { + return map[string]string{ + "CONSUL_HTTP_ADDR": "unix:///" + allocdir.AllocHTTPSocket, + } + } + + return nil +} + // maybeSetSITokenEnv will set the CONSUL_HTTP_TOKEN environment variable in // the given env map, if the token is found to exist in the task's secrets // directory AND the CONSUL_HTTP_TOKEN environment variable is not already set. diff --git a/client/allocrunner/taskrunner/connect_native_hook_test.go b/client/allocrunner/taskrunner/connect_native_hook_test.go index 4ebbea058a19..9b12c89fa610 100644 --- a/client/allocrunner/taskrunner/connect_native_hook_test.go +++ b/client/allocrunner/taskrunner/connect_native_hook_test.go @@ -162,6 +162,46 @@ func TestConnectNativeHook_tlsEnv(t *testing.T) { }) } +func TestConnectNativeHook_bridgeEnv_bridge(t *testing.T) { + t.Parallel() + + hook := new(connectNativeHook) + hook.alloc = mock.ConnectNativeAlloc("bridge") + + t.Run("consul address env not preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(nil) + require.Equal(t, map[string]string{ + "CONSUL_HTTP_ADDR": "unix:///alloc/tmp/consul_http.sock", + }, result) + }) + + t.Run("consul address env is preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(map[string]string{ + "CONSUL_HTTP_ADDR": "10.1.1.1", + }) + require.Empty(t, result) + }) +} + +func TestConnectNativeHook_bridgeEnv_host(t *testing.T) { + t.Parallel() + + hook := new(connectNativeHook) + hook.alloc = mock.ConnectNativeAlloc("host") + + t.Run("consul address env not preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(nil) + require.Empty(t, result) + }) + + t.Run("consul address env is preconfigured", func(t *testing.T) { + result := hook.bridgeEnv(map[string]string{ + "CONSUL_HTTP_ADDR": "10.1.1.1", + }) + require.Empty(t, result) + }) +} + func TestTaskRunner_ConnectNativeHook_Noop(t *testing.T) { t.Parallel() logger := testlog.HCLogger(t) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 30f4b0dfb45f..b4f29dc2f7af 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -636,26 +636,38 @@ func MaxParallelJob() *structs.Job { func ConnectJob() *structs.Job { job := Job() tg := job.TaskGroups[0] - tg.Networks = []*structs.NetworkResource{ - { - Mode: "bridge", - }, - } - tg.Services = []*structs.Service{ - { - Name: "testconnect", - PortLabel: "9999", - Connect: &structs.ConsulConnect{ - SidecarService: &structs.ConsulSidecarService{}, - }, + tg.Services = []*structs.Service{{ + Name: "testconnect", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + SidecarService: new(structs.ConsulSidecarService), }, - } + }} tg.Networks = structs.Networks{{ Mode: "bridge", // always bridge ... for now? }} return job } +func ConnectNativeJob(mode string) *structs.Job { + job := Job() + tg := job.TaskGroups[0] + tg.Networks = []*structs.NetworkResource{{ + Mode: mode, + }} + tg.Services = []*structs.Service{{ + Name: "test_connect_native", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + Native: true, + }, + }} + tg.Tasks = []*structs.Task{{ + Name: "native_task", + }} + return job +} + func BatchJob() *structs.Job { job := &structs.Job{ Region: "global", @@ -920,6 +932,16 @@ func ConnectAlloc() *structs.Allocation { return alloc } +func ConnectNativeAlloc(mode string) *structs.Allocation { + alloc := Alloc() + alloc.Job = ConnectNativeJob(mode) + alloc.AllocatedResources.Shared.Networks = []*structs.NetworkResource{{ + Mode: mode, + IP: "10.0.0.1", + }} + return alloc +} + func BatchConnectJob() *structs.Job { job := &structs.Job{ Region: "global",