From e51aa98672c0f3cf6cae82291fd634a47455cce0 Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Wed, 22 Mar 2023 14:51:37 -0400 Subject: [PATCH] Allow overriding gRPC's connection timeout with VAULT_GRPC_MIN_CONNECT_TIMEOUT (#19676) --- changelog/19676.txt | 4 ++++ vault/cluster.go | 3 ++- vault/cluster/cluster.go | 17 +++++++++++++++-- vault/cluster/inmem_layer.go | 2 +- vault/core.go | 13 +++++++++++++ vault/request_forwarding.go | 10 ++++++++-- 6 files changed, 43 insertions(+), 6 deletions(-) create mode 100644 changelog/19676.txt diff --git a/changelog/19676.txt b/changelog/19676.txt new file mode 100644 index 000000000000..090dc801b2df --- /dev/null +++ b/changelog/19676.txt @@ -0,0 +1,4 @@ +```release-note:improvement +core: Allow overriding gRPC connect timeout via VAULT_GRPC_MIN_CONNECT_TIMEOUT. This is an env var rather than a config setting because we don't expect this to ever be needed. It's being added as a last-ditch +option in case all else fails for some replication issues we may not have fully reproduced. +``` diff --git a/vault/cluster.go b/vault/cluster.go index bc1d352e5337..191133c121e8 100644 --- a/vault/cluster.go +++ b/vault/cluster.go @@ -327,7 +327,8 @@ func (c *Core) startClusterListener(ctx context.Context) error { c.clusterListener.Store(cluster.NewListener(networkLayer, c.clusterCipherSuites, listenerLogger, - 5*c.clusterHeartbeatInterval)) + 5*c.clusterHeartbeatInterval, + c.grpcMinConnectTimeout)) c.AddLogger(listenerLogger) diff --git a/vault/cluster/cluster.go b/vault/cluster/cluster.go index fca8ca8967ea..1111bf2f47e1 100644 --- a/vault/cluster/cluster.go +++ b/vault/cluster/cluster.go @@ -72,9 +72,10 @@ type Listener struct { logger log.Logger l sync.RWMutex tlsConnectionLoggingLevel log.Level + grpcMinConnectTimeout time.Duration } -func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener { +func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout, grpcMinConnectTimeout time.Duration) *Listener { var maxStreams uint32 = math.MaxUint32 if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" { i, err := strconv.ParseUint(override, 10, 32) @@ -111,6 +112,7 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo cipherSuites: cipherSuites, logger: logger, tlsConnectionLoggingLevel: log.LevelFromString(os.Getenv("VAULT_CLUSTER_TLS_SESSION_LOG_LEVEL")), + grpcMinConnectTimeout: grpcMinConnectTimeout, } } @@ -461,10 +463,21 @@ func (cl *Listener) GetDialerFunc(ctx context.Context, alpn string) func(string, } tlsConfig.NextProtos = []string{alpn} - cl.logger.Debug("creating rpc dialer", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName) + args := []interface{}{ + "address", addr, + "alpn", alpn, + "host", tlsConfig.ServerName, + "timeout", fmt.Sprintf("%s", timeout), + } + if cl.grpcMinConnectTimeout != 0 { + args = append(args, "timeout_env_override", fmt.Sprintf("%s", cl.grpcMinConnectTimeout)) + } + cl.logger.Debug("creating rpc dialer", args...) + start := time.Now() conn, err := cl.networkLayer.Dial(addr, timeout, tlsConfig) if err != nil { + cl.logger.Debug("dial failure", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName, "duration", fmt.Sprintf("%s", time.Since(start)), "error", err) return nil, err } cl.logTLSSessionStart(conn.RemoteAddr().String(), conn.ConnectionState()) diff --git a/vault/cluster/inmem_layer.go b/vault/cluster/inmem_layer.go index 36053ec96bbb..66b1e412b57b 100644 --- a/vault/cluster/inmem_layer.go +++ b/vault/cluster/inmem_layer.go @@ -129,7 +129,7 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con if l.forceTimeout == addr { l.logger.Debug("forcing timeout", "addr", addr, "me", l.addr) - // gRPC sets a deadline of 20 seconds on the dail attempt, so + // gRPC sets a deadline of 20 seconds on the dial attempt, so // matching that here. time.Sleep(time.Second * 20) l.l.Unlock() diff --git a/vault/core.go b/vault/core.go index a9e9ac95f6c4..d4d40f6ba5d3 100644 --- a/vault/core.go +++ b/vault/core.go @@ -645,6 +645,9 @@ type Core struct { pendingRemovalMountsAllowed bool expirationRevokeRetryBase time.Duration + + // if populated, override the default gRPC min connect timeout (currently 20s in grpc 1.51) + grpcMinConnectTimeout time.Duration } func (c *Core) HAState() consts.HAState { @@ -1186,6 +1189,16 @@ func NewCore(conf *CoreConfig) (*Core, error) { c.versionHistory = make(map[string]VaultVersion) } + minConnectTimeoutRaw := os.Getenv("VAULT_GRPC_MIN_CONNECT_TIMEOUT") + if minConnectTimeoutRaw != "" { + dur, err := time.ParseDuration(minConnectTimeoutRaw) + if err != nil { + c.logger.Warn("VAULT_GRPC_MIN_CONNECT_TIMEOUT contains non-duration value, ignoring") + } else if dur != 0 { + c.grpcMinConnectTimeout = dur + } + } + return c, nil } diff --git a/vault/request_forwarding.go b/vault/request_forwarding.go index b68b6a0d6b01..2b436d46f2a9 100644 --- a/vault/request_forwarding.go +++ b/vault/request_forwarding.go @@ -275,7 +275,8 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd // ALPN header right. It's just "insecure" because GRPC isn't managing // the TLS state. dctx, cancelFunc := context.WithCancel(ctx) - c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host, + + opts := []grpc.DialOption{ grpc.WithDialer(clusterListener.GetDialerFunc(ctx, consts.RequestForwardingALPN)), grpc.WithInsecure(), // it's not, we handle it in the dialer grpc.WithKeepaliveParams(keepalive.ClientParameters{ @@ -284,7 +285,12 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(math.MaxInt32), grpc.MaxCallSendMsgSize(math.MaxInt32), - )) + ), + } + if c.grpcMinConnectTimeout != 0 { + opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{MinConnectTimeout: c.grpcMinConnectTimeout})) + } + c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host, opts...) if err != nil { cancelFunc() c.logger.Error("err setting up forwarding rpc client", "error", err)