diff --git a/lib/auth/grpcserver.go b/lib/auth/grpcserver.go index f8ebc6f4e1c26..9ae04e00478ee 100644 --- a/lib/auth/grpcserver.go +++ b/lib/auth/grpcserver.go @@ -70,6 +70,7 @@ import ( wanlib "github.com/gravitational/teleport/lib/auth/webauthn" "github.com/gravitational/teleport/lib/authz" "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/httplib" "github.com/gravitational/teleport/lib/joinserver" @@ -5311,6 +5312,7 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) { PermitWithoutStream: true, }, ), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/defaults/defaults.go b/lib/defaults/defaults.go index ea00b8851e9db..43c1c35a88291 100644 --- a/lib/defaults/defaults.go +++ b/lib/defaults/defaults.go @@ -100,6 +100,10 @@ const ( // By default all users use /bin/bash DefaultShell = "/bin/bash" + // GRPCMaxConcurrentStreams is the max GRPC streams that can be active at a time. Once the limit is reached new + // RPC calls will queue until capacity is available. + GRPCMaxConcurrentStreams = 1000 + // HTTPMaxIdleConns is the max idle connections across all hosts. HTTPMaxIdleConns = 2000 diff --git a/lib/observability/tracing/collector.go b/lib/observability/tracing/collector.go index 191cc8ab2c942..aea20c78068f8 100644 --- a/lib/observability/tracing/collector.go +++ b/lib/observability/tracing/collector.go @@ -78,7 +78,7 @@ func NewCollector(cfg CollectorConfig) (*Collector, error) { c := &Collector{ grpcLn: grpcLn, httpLn: httpLn, - grpcServer: grpc.NewServer(grpc.Creds(creds)), + grpcServer: grpc.NewServer(grpc.Creds(creds), grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams)), tlsConfing: tlsConfig, exportedC: make(chan struct{}, 1), } diff --git a/lib/proxy/peer/server.go b/lib/proxy/peer/server.go index 0952e18a341c9..b71a7636a207d 100644 --- a/lib/proxy/peer/server.go +++ b/lib/proxy/peer/server.go @@ -31,6 +31,7 @@ import ( "github.com/gravitational/teleport/api/metadata" "github.com/gravitational/teleport/api/utils/grpc/interceptors" "github.com/gravitational/teleport/lib/auth" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" ) @@ -141,6 +142,7 @@ func NewServer(config ServerConfig) (*Server, error) { MinTime: peerKeepAlive, PermitWithoutStream: true, }), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) proto.RegisterProxyServiceServer(server, config.service) diff --git a/lib/service/service.go b/lib/service/service.go index 81b09ed1fe99e..3cfa0b70cfc7a 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -4267,6 +4267,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { otelgrpc.StreamServerInterceptor(), ), grpc.Creds(creds), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) connMonitor, err := srv.NewConnectionMonitor(srv.ConnectionMonitorConfig{ @@ -5929,6 +5930,7 @@ func (process *TeleportProcess) initPublicGRPCServer( // available for some time. MaxConnectionIdle: 10 * time.Second, }), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) joinServiceServer := joinserver.NewJoinServiceGRPCServer(conn.Client) proto.RegisterJoinServiceServer(server, joinServiceServer) @@ -5988,6 +5990,7 @@ func (process *TeleportProcess) initSecureGRPCServer(cfg initSecureGRPCServerCfg grpc.ChainUnaryInterceptor(authMiddleware.UnaryInterceptors()...), grpc.ChainStreamInterceptor(authMiddleware.StreamInterceptors()...), grpc.Creds(creds), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) kubeServer, err := kubegrpc.New(kubegrpc.Config{ diff --git a/lib/teleterm/apiserver/apiserver.go b/lib/teleterm/apiserver/apiserver.go index 10625b609e951..897105f2ae057 100644 --- a/lib/teleterm/apiserver/apiserver.go +++ b/lib/teleterm/apiserver/apiserver.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc" api "github.com/gravitational/teleport/gen/proto/go/teleport/lib/teleterm/v1" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/teleterm/apiserver/handler" "github.com/gravitational/teleport/lib/utils" ) @@ -41,7 +42,9 @@ func New(cfg Config) (*APIServer, error) { } grpcServer := grpc.NewServer(cfg.TshdServerCreds, - grpc.ChainUnaryInterceptor(withErrorHandling(cfg.Log))) + grpc.ChainUnaryInterceptor(withErrorHandling(cfg.Log)), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), + ) // Create Terminal service.