diff --git a/docs/configuration/cluster_manager/cluster.rst b/docs/configuration/cluster_manager/cluster.rst index e0337439b825..cebb48b09aa3 100644 --- a/docs/configuration/cluster_manager/cluster.rst +++ b/docs/configuration/cluster_manager/cluster.rst @@ -9,6 +9,7 @@ Cluster "name": "...", "type": "...", "connect_timeout_ms": "...", + "per_connection_buffer_limit_bytes": "...", "lb_type": "...", "hosts": [], "service_name": "...", @@ -37,6 +38,10 @@ connect_timeout_ms *(required, integer)* The timeout for new network connections to hosts in the cluster specified in milliseconds. +per_connection_buffer_limit_bytes + *(optional, integer)* Soft limit on size of the cluster's connections read and write buffers. + If unspecified, an implementation defined default is applied (1MiB). + lb_type *(required, string)* The :ref:`load balancer type ` to use when picking a host in the cluster. Possible options are *round_robin*, *least_request*, @@ -172,7 +177,6 @@ outlier_detection Each of the above configuration values can be overridden via :ref:`runtime values `. - .. toctree:: :hidden: diff --git a/docs/configuration/listeners/listeners.rst b/docs/configuration/listeners/listeners.rst index 58681ff7ec9b..1f7cd991ba37 100644 --- a/docs/configuration/listeners/listeners.rst +++ b/docs/configuration/listeners/listeners.rst @@ -14,7 +14,8 @@ Each individual listener configuration has the following format: "ssl_context": "{...}", "bind_to_port": "...", "use_proxy_proto": "...", - "use_original_dst": "..." + "use_original_dst": "...", + "per_connection_buffer_limit_bytes": "..." } port @@ -54,7 +55,7 @@ use_original_dst per_connection_buffer_limit_bytes *(optional, integer)* Soft limit on size of the listener's new connection read and write buffers. - If unspecified, an implementation defined default is applied (1MB). + If unspecified, an implementation defined default is applied (1MiB). .. toctree:: :hidden: diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index d0d8a2e5e0f2..f209822605c6 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -152,6 +152,11 @@ class Connection : public Event::DeferredDeletable, public FilterManager { * processing pipeline. */ virtual void setReadBufferLimit(uint32_t limit) PURE; + + /** + * Get the value set with setReadBufferLimit. + */ + virtual uint32_t readBufferLimit() const PURE; }; typedef std::unique_ptr ConnectionPtr; diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 411be2aff55a..b4dbcd0a97e9 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -221,6 +221,11 @@ class ClusterInfo { */ virtual std::chrono::milliseconds connectTimeout() const PURE; + /** + * @return soft limit on size of the cluster's connections read and write buffers. + */ + virtual uint32_t perConnectionBufferLimitBytes() const PURE; + /** * @return uint64_t features supported by the cluster. @see Features. */ diff --git a/source/common/json/config_schemas.cc b/source/common/json/config_schemas.cc index 8546b7a7a449..321c36baf344 100644 --- a/source/common/json/config_schemas.cc +++ b/source/common/json/config_schemas.cc @@ -1037,6 +1037,11 @@ const std::string Json::Schema::CLUSTER_SCHEMA(R"EOF( "minimum" : 0, "exclusiveMinimum" : true }, + "per_connection_buffer_limit_bytes" : { + "type" : "integer", + "minimum" : 0, + "exclusiveMinimum" : true + }, "lb_type" : { "type" : "string", "enum" : ["round_robin", "least_request", "random", "ring_hash"] diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 51c6438da1cf..a8be3339562a 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -64,6 +64,7 @@ class ConnectionImpl : public virtual Connection, State state() override; void write(Buffer::Instance& data) override; void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; } + uint32_t readBufferLimit() const override { return read_buffer_limit_; } // Network::BufferSource Buffer::Instance& getReadBuffer() override { return read_buffer_; } diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 59b8540259c3..88c1ef5b8915 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -30,12 +30,11 @@ Host::CreateConnectionData HostImpl::createConnection(Event::Dispatcher& dispatc Network::ClientConnectionPtr HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& cluster, Network::Address::InstancePtr address) { - if (cluster.sslContext()) { - return Network::ClientConnectionPtr{ - dispatcher.createSslClientConnection(*cluster.sslContext(), address)}; - } else { - return Network::ClientConnectionPtr{dispatcher.createClientConnection(address)}; - } + Network::ClientConnectionPtr connection = + cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address) + : dispatcher.createClientConnection(address); + connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes()); + return connection; } void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, std::min(100U, new_weight)); } @@ -60,6 +59,8 @@ ClusterInfoImpl::ClusterInfoImpl(const Json::Object& config, Runtime::Loader& ru : runtime_(runtime), name_(config.getString("name")), max_requests_per_connection_(config.getInteger("max_requests_per_connection", 0)), connect_timeout_(std::chrono::milliseconds(config.getInteger("connect_timeout_ms"))), + per_connection_buffer_limit_bytes_( + config.getInteger("per_connection_buffer_limit_bytes", 1024 * 1024)), stats_scope_(stats.createScope(fmt::format("cluster.{}.", name_))), stats_(generateStats(*stats_scope_)), features_(parseFeatures(config)), http_codec_options_(Http::Utility::parseCodecOptions(config)), diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 873c03e05192..057a086e1bb4 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -160,6 +160,9 @@ class ClusterInfoImpl : public ClusterInfo { // Upstream::ClusterInfo std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; } + uint32_t perConnectionBufferLimitBytes() const override { + return per_connection_buffer_limit_bytes_; + } uint64_t features() const override { return features_; } uint64_t httpCodecOptions() const override { return http_codec_options_; } LoadBalancerType lbType() const override { return lb_type_; } @@ -189,6 +192,7 @@ class ClusterInfoImpl : public ClusterInfo { const std::string name_; const uint64_t max_requests_per_connection_; const std::chrono::milliseconds connect_timeout_; + const uint32_t per_connection_buffer_limit_bytes_; Stats::ScopePtr stats_scope_; mutable ClusterStats stats_; Ssl::ClientContextPtr ssl_ctx_; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 6a20816f891b..91fd10773b4d 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -188,6 +188,23 @@ TEST_F(Http1ConnPoolImplTest, VerifyTimingStats) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that buffer limits are set. + */ +TEST_F(Http1ConnPoolImplTest, VerifyBufferLimits) { + NiceMock outer_decoder; + ConnPoolCallbacks callbacks; + conn_pool_.expectClientCreate(); + EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192)); + EXPECT_CALL(*conn_pool_.test_clients_.back().connection_, setReadBufferLimit(8192)); + Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); + EXPECT_NE(nullptr, handle); + + EXPECT_CALL(conn_pool_, onClientDestroy()); + conn_pool_.test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); +} + /** * Tests a request that generates a new connection, completes, and then a second request that uses * the same connection. diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index e7406dd5ca6a..9f7d5e674146 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -126,6 +126,26 @@ TEST_F(Http2ConnPoolImplTest, VerifyConnectionTimingStats) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that buffer limits are set. + */ +TEST_F(Http2ConnPoolImplTest, VerifyBufferLimits) { + expectClientCreate(); + EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192)); + EXPECT_CALL(*test_clients_.back().connection_, setReadBufferLimit(8192)); + + ActiveTestRequest r1(*this, 0); + EXPECT_CALL(r1.inner_encoder_, encodeHeaders(_, true)); + r1.callbacks_.outer_encoder_->encodeHeaders(HeaderMapImpl{}, true); + expectClientConnect(0); + EXPECT_CALL(r1.decoder_, decodeHeaders_(_, true)); + r1.inner_decoder_->decodeHeaders(HeaderMapPtr{new HeaderMapImpl{}}, true); + + test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose); + EXPECT_CALL(*this, onClientDestroy()); + dispatcher_.clearDeferredDeleteList(); +} + TEST_F(Http2ConnPoolImplTest, RequestAndResponse) { InSequence s; diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index f843ee0efa7f..5b0b684ec56e 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -157,6 +157,7 @@ class ReadBufferLimitTest : public testing::Test { server_connection = std::move(conn); server_connection->addReadFilter(read_filter); EXPECT_EQ("", server_connection->nextProtocol()); + EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit()); })); uint32_t filter_seen = 0; diff --git a/test/common/ssl/connection_impl_test.cc b/test/common/ssl/connection_impl_test.cc index 33035fb228db..e8ff44fe4a5f 100644 --- a/test/common/ssl/connection_impl_test.cc +++ b/test/common/ssl/connection_impl_test.cc @@ -298,6 +298,7 @@ class SslReadBufferLimitTest : public testing::Test { server_connection = std::move(conn); server_connection->addReadFilter(read_filter); EXPECT_EQ("", server_connection->nextProtocol()); + EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit()); })); uint32_t filter_seen = 0; diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 123f05421712..73d989601bbc 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -319,6 +319,34 @@ TEST_F(ClusterManagerImplTest, UnknownCluster) { factory_.tls_.shutdownThread(); } +/** + * Test that buffer limits are set on new TCP connections. + */ +TEST_F(ClusterManagerImplTest, VerifyBufferLimits) { + std::string json = R"EOF( + { + "clusters": [ + { + "name": "cluster_1", + "connect_timeout_ms": 250, + "per_connection_buffer_limit_bytes": 8192, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://127.0.0.1:11001"}] + }] + } + )EOF"; + + Json::ObjectPtr loader = Json::Factory::LoadFromString(json); + create(*loader); + Network::MockClientConnection* connection = new NiceMock(); + EXPECT_CALL(*connection, setReadBufferLimit(8192)); + EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_)).WillOnce(Return(connection)); + auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1"); + EXPECT_EQ(connection, conn_data.connection_.get()); + factory_.tls_.shutdownThread(); +} + TEST_F(ClusterManagerImplTest, ShutdownOrder) { std::string json = R"EOF( { diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 5a0197b78059..1d6405f1eadf 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -12,6 +12,7 @@ #include "test/test_common/utility.h" using testing::_; +using testing::Invoke; using testing::NiceMock; using testing::Return; using testing::ReturnRef; @@ -23,12 +24,12 @@ class TestHttpHealthCheckerImpl : public HttpHealthCheckerImpl { public: using HttpHealthCheckerImpl::HttpHealthCheckerImpl; - Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData&) override { - return createCodecClient_(); + Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData& conn_data) override { + return createCodecClient_(conn_data); }; // HttpHealthCheckerImpl - MOCK_METHOD0(createCodecClient_, Http::CodecClient*()); + MOCK_METHOD1(createCodecClient_, Http::CodecClient*(Upstream::Host::CreateConnectionData&)); }; class HttpHealthCheckerImplTest : public testing::Test { @@ -41,7 +42,6 @@ class HttpHealthCheckerImplTest : public testing::Test { Http::MockClientConnection* codec_{}; Stats::IsolatedStoreImpl stats_store_; Network::MockClientConnection* client_connection_{}; - Http::CodecClient* codec_client_{}; NiceMock request_encoder_; Http::StreamDecoder* stream_response_callbacks_{}; }; @@ -104,14 +104,15 @@ class HttpHealthCheckerImplTest : public testing::Test { void expectClientCreate(size_t index) { TestSession& test_session = *test_sessions_[index]; - test_session.codec_ = new NiceMock(); + auto* codec = test_session.codec_ = new NiceMock(); test_session.client_connection_ = new NiceMock(); + auto create_codec_client = [codec](Upstream::Host::CreateConnectionData& conn_data) { + return new CodecClientForTest(std::move(conn_data.connection_), codec, nullptr, nullptr); + }; - Network::ClientConnectionPtr connection{test_session.client_connection_}; - test_session.codec_client_ = - new CodecClientForTest(std::move(connection), test_session.codec_, nullptr, nullptr); - EXPECT_CALL(*health_checker_, createCodecClient_()) - .WillOnce(Return(test_session.codec_client_)); + EXPECT_CALL(dispatcher_, createClientConnection_(_)) + .WillOnce(Return(test_session.client_connection_)); + EXPECT_CALL(*health_checker_, createCodecClient_(_)).WillOnce(Invoke(create_codec_client)); } void expectStreamCreate(size_t index) { diff --git a/test/common/upstream/logical_dns_cluster_test.cc b/test/common/upstream/logical_dns_cluster_test.cc index 0b9e3284b849..649efa8d3289 100644 --- a/test/common/upstream/logical_dns_cluster_test.cc +++ b/test/common/upstream/logical_dns_cluster_test.cc @@ -120,7 +120,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { HostPtr logical_host = cluster_->hosts()[0]; EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))) + .WillOnce(Return(new NiceMock())); logical_host->createConnection(dispatcher_); logical_host->outlierDetector().putHttpResponseCode(200); @@ -133,7 +134,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { EXPECT_EQ(logical_host, cluster_->hosts()[0]); EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))) + .WillOnce(Return(new NiceMock())); Host::CreateConnectionData data = logical_host->createConnection(dispatcher_); EXPECT_FALSE(data.host_description_->canary()); EXPECT_EQ(&cluster_->hosts()[0]->cluster(), &data.host_description_->cluster()); @@ -152,7 +154,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { EXPECT_EQ(logical_host, cluster_->hosts()[0]); EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))) + .WillOnce(Return(new NiceMock())); logical_host->createConnection(dispatcher_); expectResolve(); @@ -164,7 +167,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { EXPECT_EQ(logical_host, cluster_->hosts()[0]); EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))) + .WillOnce(Return(new NiceMock())); logical_host->createConnection(dispatcher_); // Make sure we cancel. diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index eebe43a8ddb6..64eb95df44cf 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -58,6 +58,7 @@ class MockConnection : public Connection, public MockConnectionBase { MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit)); + MOCK_CONST_METHOD0(readBufferLimit, uint32_t()); }; /** @@ -89,6 +90,7 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit)); + MOCK_CONST_METHOD0(readBufferLimit, uint32_t()); // Network::ClientConnection MOCK_METHOD0(connect, void()); diff --git a/test/mocks/upstream/cluster_info.h b/test/mocks/upstream/cluster_info.h index e5f96840c863..eb0545fbee7c 100644 --- a/test/mocks/upstream/cluster_info.h +++ b/test/mocks/upstream/cluster_info.h @@ -19,6 +19,7 @@ class MockClusterInfo : public ClusterInfo { // Upstream::ClusterInfo MOCK_CONST_METHOD0(connectTimeout, std::chrono::milliseconds()); + MOCK_CONST_METHOD0(perConnectionBufferLimitBytes, uint32_t()); MOCK_CONST_METHOD0(features, uint64_t()); MOCK_CONST_METHOD0(httpCodecOptions, uint64_t()); MOCK_CONST_METHOD0(lbType, LoadBalancerType()); diff --git a/test/server/configuration_impl_test.cc b/test/server/configuration_impl_test.cc index 7b8acabe5c4f..5190d593a25e 100644 --- a/test/server/configuration_impl_test.cc +++ b/test/server/configuration_impl_test.cc @@ -178,6 +178,41 @@ TEST(ConfigurationImplTest, VerifySubjectAltNameConfig) { EXPECT_TRUE(config.listeners().back()->sslContext() != nullptr); } +TEST(ConfigurationImplTest, SetUpstreamClusterPerConnectionBufferLimit) { + std::string json = R"EOF( + { + "listeners" : [], + "cluster_manager": { + "clusters": [ + { + "name": "test_cluster", + "type": "static", + "connect_timeout_ms": 1, + "per_connection_buffer_limit_bytes": 8192, + "lb_type": "round_robin", + "hosts": [] + } + ] + } + } + )EOF"; + + Json::ObjectPtr loader = Json::Factory::LoadFromString(json); + + NiceMock server; + MainImpl config(server); + config.initialize(*loader); + + ASSERT_EQ(1U, config.clusterManager().clusters().count("test_cluster")); + EXPECT_EQ(8192U, config.clusterManager() + .clusters() + .find("test_cluster") + ->second.get() + .info() + ->perConnectionBufferLimitBytes()); + server.thread_local_.shutdownThread(); +} + TEST(ConfigurationImplTest, BadListenerConfig) { std::string json = R"EOF( {