From f755add9443e76d6e70f8242ef8be7356e1ccc44 Mon Sep 17 00:00:00 2001 From: Aaron Batilo Date: Wed, 7 Sep 2022 06:35:56 -0600 Subject: [PATCH] Make gRPC ServerParameters configurable (#499) --- config/config.go | 10 +++++++ config/config_test.go | 46 ++++++++++++++++++++++++++++++ config/file_config.go | 52 ++++++++++++++++++++++++++++++++++ config/mock.go | 65 +++++++++++++++++++++++++++++++++++++++++++ config_complete.toml | 49 ++++++++++++++++++++++++++++++++ route/route.go | 8 ++++-- 6 files changed, 227 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index 0c3c1457a6..4c35458424 100644 --- a/config/config.go +++ b/config/config.go @@ -147,4 +147,14 @@ type Config interface { // GetQueryAuthToken returns the token that must be used to access the /query endpoints GetQueryAuthToken() string + + GetGRPCMaxConnectionIdle() time.Duration + + GetGRPCMaxConnectionAge() time.Duration + + GetGRPCMaxConnectionAgeGrace() time.Duration + + GetGRPCTime() time.Duration + + GetGRPCTimeout() time.Duration } diff --git a/config/config_test.go b/config/config_test.go index 7db30fcd7e..f2cc9b99cc 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -637,3 +637,49 @@ func TestQueryAuthToken(t *testing.T) { assert.Equal(t, "MySeekretToken", c.GetQueryAuthToken()) } + +func TestGRPCServerParameters(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + + configFile, err := ioutil.TempFile(tmpDir, "*.toml") + assert.NoError(t, err) + + _, err = configFile.Write([]byte(` + [GRPCServerParameters] + MaxConnectionIdle = "1m" + MaxConnectionAge = "2m" + MaxConnectionAgeGrace = "3m" + Time = "4m" + Timeout = "5m" + + [InMemCollector] + CacheCapacity=1000 + + [HoneycombMetrics] + MetricsHoneycombAPI="http://honeycomb.io" + MetricsAPIKey="1234" + MetricsDataset="testDatasetName" + MetricsReportingInterval=3 + + [HoneycombLogger] + LoggerHoneycombAPI="http://honeycomb.io" + LoggerAPIKey="1234" + LoggerDataset="loggerDataset" + `)) + assert.NoError(t, err) + configFile.Close() + + rulesFile, err := ioutil.TempFile(tmpDir, "*.toml") + assert.NoError(t, err) + + c, err := NewConfig(configFile.Name(), rulesFile.Name(), func(err error) {}) + assert.NoError(t, err) + + assert.Equal(t, 1*time.Minute, c.GetGRPCMaxConnectionIdle()) + assert.Equal(t, 2*time.Minute, c.GetGRPCMaxConnectionAge()) + assert.Equal(t, 3*time.Minute, c.GetGRPCMaxConnectionAgeGrace()) + assert.Equal(t, 4*time.Minute, c.GetGRPCTime()) + assert.Equal(t, 5*time.Minute, c.GetGRPCTimeout()) +} diff --git a/config/file_config.go b/config/file_config.go index 8488c8afcb..dc59da8c9c 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -51,6 +51,7 @@ type configContents struct { EnvironmentCacheTTL time.Duration DatasetPrefix string QueryAuthToken string + GRPCServerParameters GRPCServerParameters } type InMemoryCollectorCacheCapacity struct { @@ -94,6 +95,17 @@ type PeerManagementConfig struct { RedisIdentifier string } +// GRPCServerParameters allow you to configure the GRPC ServerParameters used +// by refinery's own GRPC server: +// https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters +type GRPCServerParameters struct { + MaxConnectionIdle time.Duration + MaxConnectionAge time.Duration + MaxConnectionAgeGrace time.Duration + Time time.Duration + Timeout time.Duration +} + // NewConfig creates a new config struct func NewConfig(config, rules string, errorCallback func(error)) (Config, error) { c := viper.New() @@ -130,6 +142,11 @@ func NewConfig(config, rules string, errorCallback func(error)) (Config, error) c.SetDefault("HoneycombLogger.LoggerSamplerThroughput", 5) c.SetDefault("AddHostMetadataToTrace", false) c.SetDefault("EnvironmentCacheTTL", time.Hour) + c.SetDefault("GRPCServerParameters.MaxConnectionIdle", 1*time.Minute) + c.SetDefault("GRPCServerParameters.MaxConnectionAge", time.Duration(0)) + c.SetDefault("GRPCServerParameters.MaxConnectionAgeGrace", time.Duration(0)) + c.SetDefault("GRPCServerParameters.Time", 10*time.Second) + c.SetDefault("GRPCServerParameters.Timeout", 2*time.Second) c.SetConfigFile(config) err := c.ReadInConfig() @@ -791,3 +808,38 @@ func (f *fileConfig) GetQueryAuthToken() string { return f.conf.QueryAuthToken } + +func (f *fileConfig) GetGRPCMaxConnectionIdle() time.Duration { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.conf.GRPCServerParameters.MaxConnectionIdle +} + +func (f *fileConfig) GetGRPCMaxConnectionAge() time.Duration { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.conf.GRPCServerParameters.MaxConnectionAge +} + +func (f *fileConfig) GetGRPCMaxConnectionAgeGrace() time.Duration { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.conf.GRPCServerParameters.MaxConnectionAgeGrace +} + +func (f *fileConfig) GetGRPCTime() time.Duration { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.conf.GRPCServerParameters.Time +} + +func (f *fileConfig) GetGRPCTimeout() time.Duration { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.conf.GRPCServerParameters.Timeout +} diff --git a/config/mock.go b/config/mock.go index 1b53b38552..f6a621f1fb 100644 --- a/config/mock.go +++ b/config/mock.go @@ -74,6 +74,11 @@ type MockConfig struct { EnvironmentCacheTTL time.Duration DatasetPrefix string QueryAuthToken string + GRPCMaxConnectionIdle time.Duration + GRPCMaxConnectionAge time.Duration + GRPCMaxConnectionAgeGrace time.Duration + GRPCTime time.Duration + GRPCTimeout time.Duration Mux sync.RWMutex } @@ -86,77 +91,90 @@ func (m *MockConfig) ReloadConfig() { callback() } } + func (m *MockConfig) RegisterReloadCallback(callback func()) { m.Mux.Lock() m.Callbacks = append(m.Callbacks, callback) m.Mux.Unlock() } + func (m *MockConfig) GetAPIKeys() ([]string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetAPIKeysVal, m.GetAPIKeysErr } + func (m *MockConfig) GetCollectorType() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetCollectorTypeVal, m.GetCollectorTypeErr } + func (m *MockConfig) GetInMemCollectorCacheCapacity() (InMemoryCollectorCacheCapacity, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetInMemoryCollectorCacheCapacityVal, m.GetInMemoryCollectorCacheCapacityErr } + func (m *MockConfig) GetHoneycombAPI() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetHoneycombAPIVal, m.GetHoneycombAPIErr } + func (m *MockConfig) GetListenAddr() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetListenAddrVal, m.GetListenAddrErr } + func (m *MockConfig) GetPeerListenAddr() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetPeerListenAddrVal, m.GetPeerListenAddrErr } + func (m *MockConfig) GetCompressPeerCommunication() bool { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetCompressPeerCommunicationsVal } + func (m *MockConfig) GetGRPCListenAddr() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetGRPCListenAddrVal, m.GetGRPCListenAddrErr } + func (m *MockConfig) GetLoggerType() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetLoggerTypeVal, m.GetLoggerTypeErr } + func (m *MockConfig) GetHoneycombLoggerConfig() (HoneycombLoggerConfig, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetHoneycombLoggerConfigVal, m.GetHoneycombLoggerConfigErr } + func (m *MockConfig) GetLoggingLevel() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetLoggingLevelVal, m.GetLoggingLevelErr } + func (m *MockConfig) GetOtherConfig(name string, iface interface{}) error { m.Mux.RLock() defer m.Mux.RUnlock() @@ -167,66 +185,77 @@ func (m *MockConfig) GetOtherConfig(name string, iface interface{}) error { } return m.GetOtherConfigErr } + func (m *MockConfig) GetPeers() ([]string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetPeersVal, m.GetPeersErr } + func (m *MockConfig) GetRedisHost() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetRedisHostVal, m.GetRedisHostErr } + func (m *MockConfig) GetRedisUsername() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetRedisUsernameVal, m.GetRedisUsernameErr } + func (m *MockConfig) GetRedisPassword() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetRedisPasswordVal, m.GetRedisPasswordErr } + func (m *MockConfig) GetUseTLS() (bool, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetUseTLSVal, m.GetUseTLSErr } + func (m *MockConfig) GetUseTLSInsecure() (bool, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetUseTLSInsecureVal, m.GetUseTLSInsecureErr } + func (m *MockConfig) GetMetricsType() (string, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetMetricsTypeVal, m.GetMetricsTypeErr } + func (m *MockConfig) GetHoneycombMetricsConfig() (HoneycombMetricsConfig, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetHoneycombMetricsConfigVal, m.GetHoneycombMetricsConfigErr } + func (m *MockConfig) GetPrometheusMetricsConfig() (PrometheusMetricsConfig, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetPrometheusMetricsConfigVal, m.GetPrometheusMetricsConfigErr } + func (m *MockConfig) GetSendDelay() (time.Duration, error) { m.Mux.RLock() defer m.Mux.RUnlock() return m.GetSendDelayVal, m.GetSendDelayErr } + func (m *MockConfig) GetTraceTimeout() (time.Duration, error) { m.Mux.RLock() defer m.Mux.RUnlock() @@ -264,6 +293,7 @@ func (m *MockConfig) GetUpstreamBufferSize() int { return m.GetUpstreamBufferSizeVal } + func (m *MockConfig) GetPeerBufferSize() int { m.Mux.RLock() defer m.Mux.RUnlock() @@ -354,3 +384,38 @@ func (f *MockConfig) GetQueryAuthToken() string { return f.QueryAuthToken } + +func (f *MockConfig) GetGRPCMaxConnectionIdle() time.Duration { + f.Mux.RLock() + defer f.Mux.RUnlock() + + return f.GRPCMaxConnectionIdle +} + +func (f *MockConfig) GetGRPCMaxConnectionAge() time.Duration { + f.Mux.RLock() + defer f.Mux.RUnlock() + + return f.GRPCMaxConnectionAge +} + +func (f *MockConfig) GetGRPCMaxConnectionAgeGrace() time.Duration { + f.Mux.RLock() + defer f.Mux.RUnlock() + + return f.GRPCMaxConnectionAgeGrace +} + +func (f *MockConfig) GetGRPCTime() time.Duration { + f.Mux.RLock() + defer f.Mux.RUnlock() + + return f.GRPCTime +} + +func (f *MockConfig) GetGRPCTimeout() time.Duration { + f.Mux.RLock() + defer f.Mux.RUnlock() + + return f.GRPCTimeout +} diff --git a/config_complete.toml b/config_complete.toml index d62d8dd95a..a4f08fa870 100644 --- a/config_complete.toml +++ b/config_complete.toml @@ -323,3 +323,52 @@ MetricsReportingInterval = 3 # listener. # Not eligible for live reload. # MetricsListenAddr = "localhost:2112" + +########################### +## gRPC ServerParameters ## +########################### + +# Reflects: https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters + +[GRPCServerParameters] + +# MaxConnectionIdle is a duration for the amount of time after which an +# idle connection would be closed by sending a GoAway. Idleness duration is +# defined since the most recent time the number of outstanding RPCs became +# zero or the connection establishment. +# 0s sets duration to infinity which is the default: +# https://github.com/grpc/grpc-go/blob/60a3a7e969c401ca16dbcd0108ad544fb35aa61c/internal/transport/http2_server.go#L217-L219 +# Not eligible for live reload. +# MaxConnectionIdle = "1m" + +# MaxConnectionAge is a duration for the maximum amount of time a +# connection may exist before it will be closed by sending a GoAway. A +# random jitter of +/-10% will be added to MaxConnectionAge to spread out +# connection storms. +# 0s sets duration to infinity which is the default: +# https://github.com/grpc/grpc-go/blob/60a3a7e969c401ca16dbcd0108ad544fb35aa61c/internal/transport/http2_server.go#L220-L222 +# Not eligible for live reload. +# MaxConnectionAge = "0s" + +# MaxConnectionAgeGrace is an additive period after MaxConnectionAge after +# which the connection will be forcibly closed. +# 0s sets duration to infinity which is the default: +# https://github.com/grpc/grpc-go/blob/60a3a7e969c401ca16dbcd0108ad544fb35aa61c/internal/transport/http2_server.go#L225-L227 +# Not eligible for live reload. +# MaxConnectionAgeGrace = "0s" + +# After a duration of this time if the server doesn't see any activity it +# pings the client to see if the transport is still alive. +# If set below 1s, a minimum value of 1s will be used instead. +# 0s sets duration to 2 hours which is the default: +# https://github.com/grpc/grpc-go/blob/60a3a7e969c401ca16dbcd0108ad544fb35aa61c/internal/transport/http2_server.go#L228-L230 +# Not eligible for live reload. +# Time = "10s" + +# After having pinged for keepalive check, the server waits for a duration +# of Timeout and if no activity is seen even after that the connection is +# closed. +# 0s sets duration to 20 seconds which is the default: +# https://github.com/grpc/grpc-go/blob/60a3a7e969c401ca16dbcd0108ad544fb35aa61c/internal/transport/http2_server.go#L231-L233 +# Not eligible for live reload. +# Timeout = "2s" diff --git a/route/route.go b/route/route.go index 5cfa633bf1..18af0b03a8 100644 --- a/route/route.go +++ b/route/route.go @@ -221,9 +221,11 @@ func (r *Router) LnS(incomingOrPeer string) { grpc.MaxSendMsgSize(GRPCMessageSizeMax), // default is math.MaxInt32 grpc.MaxRecvMsgSize(GRPCMessageSizeMax), // default is 4MB grpc.KeepaliveParams(keepalive.ServerParameters{ - Time: 10 * time.Second, - Timeout: 2 * time.Second, - MaxConnectionIdle: time.Minute, + MaxConnectionIdle: r.Config.GetGRPCMaxConnectionIdle(), + MaxConnectionAge: r.Config.GetGRPCMaxConnectionAge(), + MaxConnectionAgeGrace: r.Config.GetGRPCMaxConnectionAgeGrace(), + Time: r.Config.GetGRPCTime(), + Timeout: r.Config.GetGRPCTimeout(), }), } r.grpcServer = grpc.NewServer(serverOpts...)