Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consul/connect: add support for bridge networks with connect native tasks #8443

Merged
merged 4 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ var (
// AllocGRPCSocket is the path relative to the task dir root for the
// unix socket connected to Consul's gRPC endpoint.
AllocGRPCSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_grpc.sock")

// AllocHTTPSocket is the path relative to the task dir root for the unix
// socket connected to Consul's HTTP endpoint.
AllocHTTPSocket = filepath.Join(SharedAllocName, TmpDirName, "consul_http.sock")
)

// AllocDir allows creating, destroying, and accessing an allocation's
Expand Down
3 changes: 2 additions & 1 deletion client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
logger: hookLogger,
}),
newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newCSIHook(ar, hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,57 @@ import (
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/pkg/errors"
)

// consulSockHook creates Unix sockets to allow communication from inside a
// netns to Consul.
//
// Noop for allocations without a group Connect stanza.
type consulSockHook struct {
alloc *structs.Allocation
const (
consulGRPCSockHookName = "consul_grpc_socket"

proxy *sockProxy
// socketProxyStopWaitTime is the amount of time to wait for a socket proxy
// to stop before assuming something went awry and return a timeout error.
socketProxyStopWaitTime = 3 * time.Second
)

// mu synchronizes group & cancel as they may be mutated and accessed
// concurrently via Prerun, Update, Postrun.
mu sync.Mutex
var (
errSocketProxyTimeout = errors.New("timed out waiting for socket proxy to exit")
)

// consulGRPCSocketHook creates Unix sockets to allow communication from inside a
// netns to Consul gRPC endpoint.
//
// Noop for allocations without a group Connect stanza using bridge networking.
type consulGRPCSocketHook struct {
logger hclog.Logger

// mu synchronizes proxy and alloc which may be mutated and read concurrently
// via Prerun, Update, Postrun.
mu sync.Mutex
alloc *structs.Allocation
proxy *grpcSocketProxy
}

func newConsulSockHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulSockHook {
h := &consulSockHook{
alloc: alloc,
proxy: newSockProxy(logger, allocDir, config),
func newConsulGRPCSocketHook(logger hclog.Logger, alloc *structs.Allocation, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *consulGRPCSocketHook {
return &consulGRPCSocketHook{
alloc: alloc,
proxy: newGRPCSocketProxy(logger, allocDir, config),
logger: logger.Named(consulGRPCSockHookName),
}
h.logger = logger.Named(h.Name())
return h
}

func (*consulSockHook) Name() string {
return "consul_socket"
func (*consulGRPCSocketHook) Name() string {
return consulGRPCSockHookName
}

// shouldRun returns true if the Unix socket should be created and proxied.
// Requires the mutex to be held.
func (h *consulSockHook) shouldRun() bool {
func (h *consulGRPCSocketHook) shouldRun() bool {
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)

// we must be in bridge networking and at least one connect sidecar task
if !tgFirstNetworkIsBridge(tg) {
return false
}

for _, s := range tg.Services {
if s.Connect.HasSidecar() {
return true
Expand All @@ -60,7 +76,7 @@ func (h *consulSockHook) shouldRun() bool {
return false
}

func (h *consulSockHook) Prerun() error {
func (h *consulGRPCSocketHook) Prerun() error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -73,7 +89,7 @@ func (h *consulSockHook) Prerun() error {

// Update creates a gRPC socket file and proxy if there are any Connect
// services.
func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error {
func (h *consulGRPCSocketHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -86,7 +102,7 @@ func (h *consulSockHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.proxy.run(h.alloc)
}

func (h *consulSockHook) Postrun() error {
func (h *consulGRPCSocketHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -98,21 +114,20 @@ func (h *consulSockHook) Postrun() error {
return nil
}

type sockProxy struct {
type grpcSocketProxy struct {
logger hclog.Logger
allocDir *allocdir.AllocDir
config *config.ConsulConfig

ctx context.Context
cancel func()
doneCh chan struct{}
runOnce bool

logger hclog.Logger
}

func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *sockProxy {
func newGRPCSocketProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *config.ConsulConfig) *grpcSocketProxy {
ctx, cancel := context.WithCancel(context.Background())
return &sockProxy{
return &grpcSocketProxy{
allocDir: allocDir,
config: config,
ctx: ctx,
Expand All @@ -126,50 +141,55 @@ func newSockProxy(logger hclog.Logger, allocDir *allocdir.AllocDir, config *conf
// hasn't been told to stop.
//
// NOT safe for concurrent use.
func (s *sockProxy) run(alloc *structs.Allocation) error {
func (p *grpcSocketProxy) run(alloc *structs.Allocation) error {
// Only run once.
if s.runOnce {
if p.runOnce {
return nil
}

// Only run once. Never restart.
select {
case <-s.doneCh:
s.logger.Trace("socket proxy already shutdown; exiting")
case <-p.doneCh:
p.logger.Trace("socket proxy already shutdown; exiting")
return nil
case <-s.ctx.Done():
s.logger.Trace("socket proxy already done; exiting")
case <-p.ctx.Done():
p.logger.Trace("socket proxy already done; exiting")
return nil
default:
}

destAddr := s.config.GRPCAddr
// make sure either grpc or http consul address has been configured
if p.config.GRPCAddr == "" && p.config.Addr == "" {
return errors.New("consul address must be set on nomad client")
}

destAddr := p.config.GRPCAddr
if destAddr == "" {
// No GRPCAddr defined. Use Addr but replace port with the gRPC
// default of 8502.
host, _, err := net.SplitHostPort(s.config.Addr)
host, _, err := net.SplitHostPort(p.config.Addr)
if err != nil {
return fmt.Errorf("error parsing Consul address %q: %v",
s.config.Addr, err)
p.config.Addr, err)
}

destAddr = net.JoinHostPort(host, "8502")
}

hostGRPCSockPath := filepath.Join(s.allocDir.AllocDir, allocdir.AllocGRPCSocket)
hostGRPCSocketPath := filepath.Join(p.allocDir.AllocDir, allocdir.AllocGRPCSocket)

// if the socket already exists we'll try to remove it, but if not then any
// other errors will bubble up to the caller here or when we try to listen
_, err := os.Stat(hostGRPCSockPath)
_, err := os.Stat(hostGRPCSocketPath)
if err == nil {
err := os.Remove(hostGRPCSockPath)
err := os.Remove(hostGRPCSocketPath)
if err != nil {
return fmt.Errorf(
"unable to remove existing unix socket for Consul gRPC endpoint: %v", err)
}
}

listener, err := net.Listen("unix", hostGRPCSockPath)
listener, err := net.Listen("unix", hostGRPCSocketPath)
if err != nil {
return fmt.Errorf("unable to create unix socket for Consul gRPC endpoint: %v", err)
}
Expand All @@ -179,55 +199,55 @@ func (s *sockProxy) run(alloc *structs.Allocation) error {
// socket permissions when creating the file, so we must manually call
// chmod afterwards.
// https://github.com/golang/go/issues/11822
if err := os.Chmod(hostGRPCSockPath, os.ModePerm); err != nil {
if err := os.Chmod(hostGRPCSocketPath, os.ModePerm); err != nil {
return fmt.Errorf("unable to set permissions on unix socket for Consul gRPC endpoint: %v", err)
}

go func() {
proxy(s.ctx, s.logger, destAddr, listener)
s.cancel()
close(s.doneCh)
proxy(p.ctx, p.logger, destAddr, listener)
p.cancel()
close(p.doneCh)
}()

s.runOnce = true
p.runOnce = true
return nil
}

// stop the proxy and blocks until the proxy has stopped. Returns an error if
// the proxy does not exit in a timely fashion.
func (s *sockProxy) stop() error {
s.cancel()
func (p *grpcSocketProxy) stop() error {
p.cancel()

// If proxy was never run, don't wait for anything to shutdown.
if !s.runOnce {
if !p.runOnce {
return nil
}

select {
case <-s.doneCh:
case <-p.doneCh:
return nil
case <-time.After(3 * time.Second):
return fmt.Errorf("timed out waiting for proxy to exit")
case <-time.After(socketProxyStopWaitTime):
return errSocketProxyTimeout
}
}

// Proxy between a listener and dest
func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener) {
// Proxy between a listener and destination.
func proxy(ctx context.Context, logger hclog.Logger, destAddr string, l net.Listener) {
// Wait for all connections to be done before exiting to prevent
// goroutine leaks.
wg := sync.WaitGroup{}
ctx, cancel := context.WithCancel(ctx)
defer func() {
// Must cancel context and close listener before waiting
cancel()
l.Close()
_ = l.Close()
wg.Wait()
}()

// Close Accept() when context is cancelled
go func() {
<-ctx.Done()
l.Close()
_ = l.Close()
}()

for ctx.Err() == nil {
Expand All @@ -237,14 +257,14 @@ func proxy(ctx context.Context, logger hclog.Logger, dest string, l net.Listener
// Accept errors during shutdown are to be expected
return
}
logger.Error("error in grpc proxy; shutting down proxy", "error", err, "dest", dest)
logger.Error("error in socket proxy; shutting down proxy", "error", err, "dest", destAddr)
return
}

wg.Add(1)
go func() {
defer wg.Done()
proxyConn(ctx, logger, dest, conn)
proxyConn(ctx, logger, destAddr, conn)
}()
}
}
Expand Down Expand Up @@ -286,7 +306,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n
wg := sync.WaitGroup{}
defer wg.Wait()

// socket -> gRPC
// socket -> consul
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -305,7 +325,7 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n
)
}()

// gRPC -> socket
// consul -> socket
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -326,6 +346,6 @@ func proxyConn(ctx context.Context, logger hclog.Logger, destAddr string, conn n

// When cancelled close connections to break out of copies goroutines.
<-ctx.Done()
conn.Close()
dest.Close()
_ = conn.Close()
_ = dest.Close()
}
Loading