Skip to content

Commit

Permalink
Backport of Allow overriding gRPC's connection timeout with VAULT_GRP…
Browse files Browse the repository at this point in the history
…C_MIN_CONNECT_TIMEOUT into release/1.11.x (#19678)

* Allow overriding gRPC's connection timeout with VAULT_GRPC_MIN_CONNECT_TIMEOUT (#19676)
* Include change from #19701.
  • Loading branch information
hc-github-team-secure-vault-core authored Mar 23, 2023
1 parent 85fcea4 commit 947beba
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 6 deletions.
4 changes: 4 additions & 0 deletions changelog/19676.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:improvement
core: Allow overriding gRPC connect timeout via VAULT_GRPC_MIN_CONNECT_TIMEOUT. This is an env var rather than a config setting because we don't expect this to ever be needed. It's being added as a last-ditch
option in case all else fails for some replication issues we may not have fully reproduced.
```
3 changes: 2 additions & 1 deletion vault/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ func (c *Core) startClusterListener(ctx context.Context) error {
c.clusterListener.Store(cluster.NewListener(networkLayer,
c.clusterCipherSuites,
listenerLogger,
5*c.clusterHeartbeatInterval))
5*c.clusterHeartbeatInterval,
c.grpcMinConnectTimeout))

c.AddLogger(listenerLogger)

Expand Down
17 changes: 15 additions & 2 deletions vault/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ type Listener struct {
logger log.Logger
l sync.RWMutex
tlsConnectionLoggingLevel log.Level
grpcMinConnectTimeout time.Duration
}

func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener {
func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout, grpcMinConnectTimeout time.Duration) *Listener {
var maxStreams uint32 = math.MaxUint32
if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" {
i, err := strconv.ParseUint(override, 10, 32)
Expand Down Expand Up @@ -111,6 +112,7 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo
cipherSuites: cipherSuites,
logger: logger,
tlsConnectionLoggingLevel: log.LevelFromString(os.Getenv("VAULT_CLUSTER_TLS_SESSION_LOG_LEVEL")),
grpcMinConnectTimeout: grpcMinConnectTimeout,
}
}

Expand Down Expand Up @@ -461,10 +463,21 @@ func (cl *Listener) GetDialerFunc(ctx context.Context, alpn string) func(string,
}

tlsConfig.NextProtos = []string{alpn}
cl.logger.Debug("creating rpc dialer", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName)
args := []interface{}{
"address", addr,
"alpn", alpn,
"host", tlsConfig.ServerName,
"timeout", fmt.Sprintf("%s", timeout),
}
if cl.grpcMinConnectTimeout != 0 {
args = append(args, "timeout_env_override", fmt.Sprintf("%s", cl.grpcMinConnectTimeout))
}
cl.logger.Debug("creating rpc dialer", args...)

start := time.Now()
conn, err := cl.networkLayer.Dial(addr, timeout, tlsConfig)
if err != nil {
cl.logger.Debug("dial failure", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName, "duration", fmt.Sprintf("%s", time.Since(start)), "error", err)
return nil, err
}
cl.logTLSSessionStart(conn.RemoteAddr().String(), conn.ConnectionState())
Expand Down
2 changes: 1 addition & 1 deletion vault/cluster/inmem_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con
if l.forceTimeout == addr {
l.logger.Debug("forcing timeout", "addr", addr, "me", l.addr)

// gRPC sets a deadline of 20 seconds on the dail attempt, so
// gRPC sets a deadline of 20 seconds on the dial attempt, so
// matching that here.
time.Sleep(time.Second * 20)
l.l.Unlock()
Expand Down
13 changes: 13 additions & 0 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,9 @@ type Core struct {
// only the active node will actually write the new version timestamp, a perf
// standby shouldn't rely on the stored version timestamps being present.
versionHistory map[string]VaultVersion

// if populated, override the default gRPC min connect timeout (currently 20s in grpc 1.51)
grpcMinConnectTimeout time.Duration
}

func (c *Core) HAState() consts.HAState {
Expand Down Expand Up @@ -1135,6 +1138,16 @@ func NewCore(conf *CoreConfig) (*Core, error) {
c.versionHistory = make(map[string]VaultVersion)
}

minConnectTimeoutRaw := os.Getenv("VAULT_GRPC_MIN_CONNECT_TIMEOUT")
if minConnectTimeoutRaw != "" {
dur, err := time.ParseDuration(minConnectTimeoutRaw)
if err != nil {
c.logger.Warn("VAULT_GRPC_MIN_CONNECT_TIMEOUT contains non-duration value, ignoring")
} else if dur != 0 {
c.grpcMinConnectTimeout = dur
}
}

return c, nil
}

Expand Down
14 changes: 12 additions & 2 deletions vault/request_forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hashicorp/vault/vault/replication"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -275,7 +276,8 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
// ALPN header right. It's just "insecure" because GRPC isn't managing
// the TLS state.
dctx, cancelFunc := context.WithCancel(ctx)
c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host,

opts := []grpc.DialOption{
grpc.WithDialer(clusterListener.GetDialerFunc(ctx, consts.RequestForwardingALPN)),
grpc.WithInsecure(), // it's not, we handle it in the dialer
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Expand All @@ -284,7 +286,15 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(math.MaxInt32),
grpc.MaxCallSendMsgSize(math.MaxInt32),
))
),
}
if c.grpcMinConnectTimeout != 0 {
opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: c.grpcMinConnectTimeout,
Backoff: backoff.DefaultConfig,
}))
}
c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host, opts...)
if err != nil {
cancelFunc()
c.logger.Error("err setting up forwarding rpc client", "error", err)
Expand Down

0 comments on commit 947beba

Please sign in to comment.