From d6d41c9b235d476fa47d80d926c0b810f64f3871 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 14 Mar 2017 12:28:54 -0400 Subject: [PATCH 1/4] Upstream TCP connection buffer and read buffer limits (#150). As with fd58242, but on the upstream cluster side. --- .../configuration/cluster_manager/cluster.rst | 6 +++- docs/configuration/listeners/listeners.rst | 3 +- include/envoy/network/connection.h | 5 +++ include/envoy/upstream/upstream.h | 5 +++ source/common/http/http1/conn_pool.cc | 1 + source/common/http/http2/conn_pool.cc | 1 + source/common/json/config_schemas.cc | 5 +++ source/common/network/connection_impl.h | 1 + .../common/upstream/cluster_manager_impl.cc | 8 ++++- source/common/upstream/upstream_impl.cc | 2 ++ source/common/upstream/upstream_impl.h | 4 +++ test/common/http/http1/conn_pool_test.cc | 17 +++++++++ test/common/http/http2/conn_pool_test.cc | 20 +++++++++++ test/common/network/connection_impl_test.cc | 1 + test/common/ssl/connection_impl_test.cc | 1 + .../upstream/cluster_manager_impl_test.cc | 28 +++++++++++++++ test/mocks/network/mocks.h | 2 ++ test/mocks/upstream/cluster_info.h | 1 + test/server/configuration_impl_test.cc | 35 +++++++++++++++++++ 19 files changed, 143 insertions(+), 3 deletions(-) diff --git a/docs/configuration/cluster_manager/cluster.rst b/docs/configuration/cluster_manager/cluster.rst index e0337439b825..b1c9595523ed 100644 --- a/docs/configuration/cluster_manager/cluster.rst +++ b/docs/configuration/cluster_manager/cluster.rst @@ -9,6 +9,7 @@ Cluster "name": "...", "type": "...", "connect_timeout_ms": "...", + "per_connection_buffer_limit_bytes": "...", "lb_type": "...", "hosts": [], "service_name": "...", @@ -37,6 +38,10 @@ connect_timeout_ms *(required, integer)* The timeout for new network connections to hosts in the cluster specified in milliseconds. +per_connection_buffer_limit_bytes + *(optional, integer)* Soft limit on size of the cluster's connections read and write buffers. + If unspecified, an implementation defined default is applied (1MB). + lb_type *(required, string)* The :ref:`load balancer type ` to use when picking a host in the cluster. Possible options are *round_robin*, *least_request*, @@ -172,7 +177,6 @@ outlier_detection Each of the above configuration values can be overridden via :ref:`runtime values `. - .. toctree:: :hidden: diff --git a/docs/configuration/listeners/listeners.rst b/docs/configuration/listeners/listeners.rst index 58681ff7ec9b..dd63ace4c960 100644 --- a/docs/configuration/listeners/listeners.rst +++ b/docs/configuration/listeners/listeners.rst @@ -14,7 +14,8 @@ Each individual listener configuration has the following format: "ssl_context": "{...}", "bind_to_port": "...", "use_proxy_proto": "...", - "use_original_dst": "..." + "use_original_dst": "...", + "per_connection_buffer_limit_bytes": "..." } port diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index d0d8a2e5e0f2..f209822605c6 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -152,6 +152,11 @@ class Connection : public Event::DeferredDeletable, public FilterManager { * processing pipeline. */ virtual void setReadBufferLimit(uint32_t limit) PURE; + + /** + * Get the value set with setReadBufferLimit. + */ + virtual uint32_t readBufferLimit() const PURE; }; typedef std::unique_ptr ConnectionPtr; diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 411be2aff55a..b4dbcd0a97e9 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -221,6 +221,11 @@ class ClusterInfo { */ virtual std::chrono::milliseconds connectTimeout() const PURE; + /** + * @return soft limit on size of the cluster's connections read and write buffers. + */ + virtual uint32_t perConnectionBufferLimitBytes() const PURE; + /** * @return uint64_t features supported by the cluster. @see Features. */ diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 78760b7d7bce..2207324caedc 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -264,6 +264,7 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.conn_connect_ms_ = parent_.host_->cluster().stats().upstream_cx_connect_ms_.allocateSpan(); Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(parent_.dispatcher_); + data.connection_->setReadBufferLimit(parent_.host_->cluster().perConnectionBufferLimitBytes()); real_host_description_ = data.host_description_; codec_client_ = parent_.createCodecClient(data); codec_client_->addConnectionCallbacks(*this); diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index 8aee8646425e..33d6907bcfe2 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -212,6 +212,7 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.conn_connect_ms_ = parent_.host_->cluster().stats().upstream_cx_connect_ms_.allocateSpan(); Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(parent_.dispatcher_); + data.connection_->setReadBufferLimit(parent_.host_->cluster().perConnectionBufferLimitBytes()); real_host_description_ = data.host_description_; client_ = parent_.createCodecClient(data); client_->addConnectionCallbacks(*this); diff --git a/source/common/json/config_schemas.cc b/source/common/json/config_schemas.cc index e53a093f13b3..5c5fea6e7f0f 100644 --- a/source/common/json/config_schemas.cc +++ b/source/common/json/config_schemas.cc @@ -1032,6 +1032,11 @@ const std::string Json::Schema::CLUSTER_SCHEMA(R"EOF( "minimum" : 0, "exclusiveMinimum" : true }, + "per_connection_buffer_limit_bytes" : { + "type" : "integer", + "minimum" : 0, + "exclusiveMinimum" : true + }, "lb_type" : { "type" : "string", "enum" : ["round_robin", "least_request", "random", "ring_hash"] diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 51c6438da1cf..a8be3339562a 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -64,6 +64,7 @@ class ConnectionImpl : public virtual Connection, State state() override; void write(Buffer::Instance& data) override; void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; } + uint32_t readBufferLimit() const override { return read_buffer_limit_; } // Network::BufferSource Buffer::Instance& getReadBuffer() override { return read_buffer_; } diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index fc9d61d8767e..66e36f85aec8 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -358,7 +358,13 @@ Host::CreateConnectionData ClusterManagerImpl::tcpConnForCluster(const std::stri ConstHostPtr logical_host = entry->second->lb_->chooseHost(nullptr); if (logical_host) { - return logical_host->createConnection(cluster_manager.thread_local_dispatcher_); + Host::CreateConnectionData conn_data = + logical_host->createConnection(cluster_manager.thread_local_dispatcher_); + if (conn_data.connection_ != nullptr) { + conn_data.connection_->setReadBufferLimit( + entry->second->cluster_info_->perConnectionBufferLimitBytes()); + } + return conn_data; } else { entry->second->cluster_info_->stats().upstream_cx_none_healthy_.inc(); return {nullptr, nullptr}; diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 59b8540259c3..7f4e8d69e145 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -60,6 +60,8 @@ ClusterInfoImpl::ClusterInfoImpl(const Json::Object& config, Runtime::Loader& ru : runtime_(runtime), name_(config.getString("name")), max_requests_per_connection_(config.getInteger("max_requests_per_connection", 0)), connect_timeout_(std::chrono::milliseconds(config.getInteger("connect_timeout_ms"))), + per_connection_buffer_limit_bytes_( + config.getInteger("per_connection_buffer_limit_bytes", 1024 * 1024)), stats_scope_(stats.createScope(fmt::format("cluster.{}.", name_))), stats_(generateStats(*stats_scope_)), features_(parseFeatures(config)), http_codec_options_(Http::Utility::parseCodecOptions(config)), diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 873c03e05192..057a086e1bb4 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -160,6 +160,9 @@ class ClusterInfoImpl : public ClusterInfo { // Upstream::ClusterInfo std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; } + uint32_t perConnectionBufferLimitBytes() const override { + return per_connection_buffer_limit_bytes_; + } uint64_t features() const override { return features_; } uint64_t httpCodecOptions() const override { return http_codec_options_; } LoadBalancerType lbType() const override { return lb_type_; } @@ -189,6 +192,7 @@ class ClusterInfoImpl : public ClusterInfo { const std::string name_; const uint64_t max_requests_per_connection_; const std::chrono::milliseconds connect_timeout_; + const uint32_t per_connection_buffer_limit_bytes_; Stats::ScopePtr stats_scope_; mutable ClusterStats stats_; Ssl::ClientContextPtr ssl_ctx_; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 6a20816f891b..91fd10773b4d 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -188,6 +188,23 @@ TEST_F(Http1ConnPoolImplTest, VerifyTimingStats) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that buffer limits are set. + */ +TEST_F(Http1ConnPoolImplTest, VerifyBufferLimits) { + NiceMock outer_decoder; + ConnPoolCallbacks callbacks; + conn_pool_.expectClientCreate(); + EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192)); + EXPECT_CALL(*conn_pool_.test_clients_.back().connection_, setReadBufferLimit(8192)); + Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); + EXPECT_NE(nullptr, handle); + + EXPECT_CALL(conn_pool_, onClientDestroy()); + conn_pool_.test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); +} + /** * Tests a request that generates a new connection, completes, and then a second request that uses * the same connection. diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index e7406dd5ca6a..9f7d5e674146 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -126,6 +126,26 @@ TEST_F(Http2ConnPoolImplTest, VerifyConnectionTimingStats) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that buffer limits are set. + */ +TEST_F(Http2ConnPoolImplTest, VerifyBufferLimits) { + expectClientCreate(); + EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192)); + EXPECT_CALL(*test_clients_.back().connection_, setReadBufferLimit(8192)); + + ActiveTestRequest r1(*this, 0); + EXPECT_CALL(r1.inner_encoder_, encodeHeaders(_, true)); + r1.callbacks_.outer_encoder_->encodeHeaders(HeaderMapImpl{}, true); + expectClientConnect(0); + EXPECT_CALL(r1.decoder_, decodeHeaders_(_, true)); + r1.inner_decoder_->decodeHeaders(HeaderMapPtr{new HeaderMapImpl{}}, true); + + test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose); + EXPECT_CALL(*this, onClientDestroy()); + dispatcher_.clearDeferredDeleteList(); +} + TEST_F(Http2ConnPoolImplTest, RequestAndResponse) { InSequence s; diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index f843ee0efa7f..5b0b684ec56e 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -157,6 +157,7 @@ class ReadBufferLimitTest : public testing::Test { server_connection = std::move(conn); server_connection->addReadFilter(read_filter); EXPECT_EQ("", server_connection->nextProtocol()); + EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit()); })); uint32_t filter_seen = 0; diff --git a/test/common/ssl/connection_impl_test.cc b/test/common/ssl/connection_impl_test.cc index 33035fb228db..e8ff44fe4a5f 100644 --- a/test/common/ssl/connection_impl_test.cc +++ b/test/common/ssl/connection_impl_test.cc @@ -298,6 +298,7 @@ class SslReadBufferLimitTest : public testing::Test { server_connection = std::move(conn); server_connection->addReadFilter(read_filter); EXPECT_EQ("", server_connection->nextProtocol()); + EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit()); })); uint32_t filter_seen = 0; diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 123f05421712..73d989601bbc 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -319,6 +319,34 @@ TEST_F(ClusterManagerImplTest, UnknownCluster) { factory_.tls_.shutdownThread(); } +/** + * Test that buffer limits are set on new TCP connections. + */ +TEST_F(ClusterManagerImplTest, VerifyBufferLimits) { + std::string json = R"EOF( + { + "clusters": [ + { + "name": "cluster_1", + "connect_timeout_ms": 250, + "per_connection_buffer_limit_bytes": 8192, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://127.0.0.1:11001"}] + }] + } + )EOF"; + + Json::ObjectPtr loader = Json::Factory::LoadFromString(json); + create(*loader); + Network::MockClientConnection* connection = new NiceMock(); + EXPECT_CALL(*connection, setReadBufferLimit(8192)); + EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_)).WillOnce(Return(connection)); + auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1"); + EXPECT_EQ(connection, conn_data.connection_.get()); + factory_.tls_.shutdownThread(); +} + TEST_F(ClusterManagerImplTest, ShutdownOrder) { std::string json = R"EOF( { diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index eebe43a8ddb6..64eb95df44cf 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -58,6 +58,7 @@ class MockConnection : public Connection, public MockConnectionBase { MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit)); + MOCK_CONST_METHOD0(readBufferLimit, uint32_t()); }; /** @@ -89,6 +90,7 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit)); + MOCK_CONST_METHOD0(readBufferLimit, uint32_t()); // Network::ClientConnection MOCK_METHOD0(connect, void()); diff --git a/test/mocks/upstream/cluster_info.h b/test/mocks/upstream/cluster_info.h index e5f96840c863..eb0545fbee7c 100644 --- a/test/mocks/upstream/cluster_info.h +++ b/test/mocks/upstream/cluster_info.h @@ -19,6 +19,7 @@ class MockClusterInfo : public ClusterInfo { // Upstream::ClusterInfo MOCK_CONST_METHOD0(connectTimeout, std::chrono::milliseconds()); + MOCK_CONST_METHOD0(perConnectionBufferLimitBytes, uint32_t()); MOCK_CONST_METHOD0(features, uint64_t()); MOCK_CONST_METHOD0(httpCodecOptions, uint64_t()); MOCK_CONST_METHOD0(lbType, LoadBalancerType()); diff --git a/test/server/configuration_impl_test.cc b/test/server/configuration_impl_test.cc index da67de3ba1d1..404f7546de39 100644 --- a/test/server/configuration_impl_test.cc +++ b/test/server/configuration_impl_test.cc @@ -146,6 +146,41 @@ TEST(ConfigurationImplTest, SetListenerPerConnectionBufferLimit) { EXPECT_EQ(8192U, config.listeners().back()->perConnectionBufferLimitBytes()); } +TEST(ConfigurationImplTest, SetUpstreamClusterPerConnectionBufferLimit) { + std::string json = R"EOF( + { + "listeners" : [], + "cluster_manager": { + "clusters": [ + { + "name": "test_cluster", + "type": "static", + "connect_timeout_ms": 1, + "per_connection_buffer_limit_bytes": 8192, + "lb_type": "round_robin", + "hosts": [] + } + ] + } + } + )EOF"; + + Json::ObjectPtr loader = Json::Factory::LoadFromString(json); + + NiceMock server; + MainImpl config(server); + config.initialize(*loader); + + ASSERT_EQ(1U, config.clusterManager().clusters().count("test_cluster")); + EXPECT_EQ(8192U, config.clusterManager() + .clusters() + .find("test_cluster") + ->second.get() + .info() + ->perConnectionBufferLimitBytes()); + server.thread_local_.shutdownThread(); +} + TEST(ConfigurationImplTest, BadListenerConfig) { std::string json = R"EOF( { From e287c2c8e6add8dcf6542e5fe01ce6250f82fcfa Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Wed, 15 Mar 2017 13:21:16 -0400 Subject: [PATCH 2/4] @mattklein123 review feedback. --- docs/configuration/cluster_manager/cluster.rst | 2 +- docs/configuration/listeners/listeners.rst | 2 +- source/common/http/http1/conn_pool.cc | 1 - source/common/http/http2/conn_pool.cc | 1 - source/common/upstream/cluster_manager_impl.cc | 8 +------- source/common/upstream/upstream_impl.cc | 17 +++++++++-------- source/common/upstream/upstream_impl.h | 6 +++--- 7 files changed, 15 insertions(+), 22 deletions(-) diff --git a/docs/configuration/cluster_manager/cluster.rst b/docs/configuration/cluster_manager/cluster.rst index b1c9595523ed..cebb48b09aa3 100644 --- a/docs/configuration/cluster_manager/cluster.rst +++ b/docs/configuration/cluster_manager/cluster.rst @@ -40,7 +40,7 @@ connect_timeout_ms per_connection_buffer_limit_bytes *(optional, integer)* Soft limit on size of the cluster's connections read and write buffers. - If unspecified, an implementation defined default is applied (1MB). + If unspecified, an implementation defined default is applied (1MiB). lb_type *(required, string)* The :ref:`load balancer type ` to use diff --git a/docs/configuration/listeners/listeners.rst b/docs/configuration/listeners/listeners.rst index dd63ace4c960..1f7cd991ba37 100644 --- a/docs/configuration/listeners/listeners.rst +++ b/docs/configuration/listeners/listeners.rst @@ -55,7 +55,7 @@ use_original_dst per_connection_buffer_limit_bytes *(optional, integer)* Soft limit on size of the listener's new connection read and write buffers. - If unspecified, an implementation defined default is applied (1MB). + If unspecified, an implementation defined default is applied (1MiB). .. toctree:: :hidden: diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 2207324caedc..78760b7d7bce 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -264,7 +264,6 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.conn_connect_ms_ = parent_.host_->cluster().stats().upstream_cx_connect_ms_.allocateSpan(); Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(parent_.dispatcher_); - data.connection_->setReadBufferLimit(parent_.host_->cluster().perConnectionBufferLimitBytes()); real_host_description_ = data.host_description_; codec_client_ = parent_.createCodecClient(data); codec_client_->addConnectionCallbacks(*this); diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index 33d6907bcfe2..8aee8646425e 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -212,7 +212,6 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.conn_connect_ms_ = parent_.host_->cluster().stats().upstream_cx_connect_ms_.allocateSpan(); Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(parent_.dispatcher_); - data.connection_->setReadBufferLimit(parent_.host_->cluster().perConnectionBufferLimitBytes()); real_host_description_ = data.host_description_; client_ = parent_.createCodecClient(data); client_->addConnectionCallbacks(*this); diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 66e36f85aec8..fc9d61d8767e 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -358,13 +358,7 @@ Host::CreateConnectionData ClusterManagerImpl::tcpConnForCluster(const std::stri ConstHostPtr logical_host = entry->second->lb_->chooseHost(nullptr); if (logical_host) { - Host::CreateConnectionData conn_data = - logical_host->createConnection(cluster_manager.thread_local_dispatcher_); - if (conn_data.connection_ != nullptr) { - conn_data.connection_->setReadBufferLimit( - entry->second->cluster_info_->perConnectionBufferLimitBytes()); - } - return conn_data; + return logical_host->createConnection(cluster_manager.thread_local_dispatcher_); } else { entry->second->cluster_info_->stats().upstream_cx_none_healthy_.inc(); return {nullptr, nullptr}; diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 7f4e8d69e145..ef1a0a9e9287 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -27,15 +27,16 @@ Host::CreateConnectionData HostImpl::createConnection(Event::Dispatcher& dispatc return {createConnection(dispatcher, *cluster_, address_), shared_from_this()}; } -Network::ClientConnectionPtr HostImpl::createConnection(Event::Dispatcher& dispatcher, - const ClusterInfo& cluster, - Network::Address::InstancePtr address) { - if (cluster.sslContext()) { - return Network::ClientConnectionPtr{ - dispatcher.createSslClientConnection(*cluster.sslContext(), address)}; - } else { - return Network::ClientConnectionPtr{dispatcher.createClientConnection(address)}; +Network::ClientConnectionPtr +HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& cluster, + Network::Address::InstancePtr address) const { + Network::ClientConnectionPtr connection = + cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address) + : dispatcher.createClientConnection(address); + if (cluster_) { + connection->setReadBufferLimit(cluster_->perConnectionBufferLimitBytes()); } + return connection; } void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, std::min(100U, new_weight)); } diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 057a086e1bb4..19e6d6cf3be1 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -88,9 +88,9 @@ class HostImpl : public HostDescriptionImpl, void weight(uint32_t new_weight) override; protected: - static Network::ClientConnectionPtr createConnection(Event::Dispatcher& dispatcher, - const ClusterInfo& cluster, - Network::Address::InstancePtr address); + Network::ClientConnectionPtr createConnection(Event::Dispatcher& dispatcher, + const ClusterInfo& cluster, + Network::Address::InstancePtr address) const; private: std::atomic health_flags_{}; From 46ada8fa37d221845115a600336e2973d2a6ffe5 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Wed, 15 Mar 2017 14:33:54 -0400 Subject: [PATCH 3/4] Fix cluster_ reference and connection nullptr checks. --- source/common/upstream/upstream_impl.cc | 10 +++++----- source/common/upstream/upstream_impl.h | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index ef1a0a9e9287..e5dac9594633 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -27,14 +27,14 @@ Host::CreateConnectionData HostImpl::createConnection(Event::Dispatcher& dispatc return {createConnection(dispatcher, *cluster_, address_), shared_from_this()}; } -Network::ClientConnectionPtr -HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& cluster, - Network::Address::InstancePtr address) const { +Network::ClientConnectionPtr HostImpl::createConnection(Event::Dispatcher& dispatcher, + const ClusterInfo& cluster, + Network::Address::InstancePtr address) { Network::ClientConnectionPtr connection = cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address) : dispatcher.createClientConnection(address); - if (cluster_) { - connection->setReadBufferLimit(cluster_->perConnectionBufferLimitBytes()); + if (connection) { + connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes()); } return connection; } diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 19e6d6cf3be1..057a086e1bb4 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -88,9 +88,9 @@ class HostImpl : public HostDescriptionImpl, void weight(uint32_t new_weight) override; protected: - Network::ClientConnectionPtr createConnection(Event::Dispatcher& dispatcher, - const ClusterInfo& cluster, - Network::Address::InstancePtr address) const; + static Network::ClientConnectionPtr createConnection(Event::Dispatcher& dispatcher, + const ClusterInfo& cluster, + Network::Address::InstancePtr address); private: std::atomic health_flags_{}; From 359a1efa2fb534958df0aa4d6c3896bee25c9d3d Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Wed, 15 Mar 2017 16:39:11 -0400 Subject: [PATCH 4/4] Fix tests to ensure we never get a null client connection. --- source/common/upstream/upstream_impl.cc | 4 +--- .../upstream/health_checker_impl_test.cc | 21 ++++++++++--------- .../upstream/logical_dns_cluster_test.cc | 12 +++++++---- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index e5dac9594633..88c1ef5b8915 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -33,9 +33,7 @@ Network::ClientConnectionPtr HostImpl::createConnection(Event::Dispatcher& dispa Network::ClientConnectionPtr connection = cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address) : dispatcher.createClientConnection(address); - if (connection) { - connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes()); - } + connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes()); return connection; } diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 5a0197b78059..1d6405f1eadf 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -12,6 +12,7 @@ #include "test/test_common/utility.h" using testing::_; +using testing::Invoke; using testing::NiceMock; using testing::Return; using testing::ReturnRef; @@ -23,12 +24,12 @@ class TestHttpHealthCheckerImpl : public HttpHealthCheckerImpl { public: using HttpHealthCheckerImpl::HttpHealthCheckerImpl; - Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData&) override { - return createCodecClient_(); + Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData& conn_data) override { + return createCodecClient_(conn_data); }; // HttpHealthCheckerImpl - MOCK_METHOD0(createCodecClient_, Http::CodecClient*()); + MOCK_METHOD1(createCodecClient_, Http::CodecClient*(Upstream::Host::CreateConnectionData&)); }; class HttpHealthCheckerImplTest : public testing::Test { @@ -41,7 +42,6 @@ class HttpHealthCheckerImplTest : public testing::Test { Http::MockClientConnection* codec_{}; Stats::IsolatedStoreImpl stats_store_; Network::MockClientConnection* client_connection_{}; - Http::CodecClient* codec_client_{}; NiceMock request_encoder_; Http::StreamDecoder* stream_response_callbacks_{}; }; @@ -104,14 +104,15 @@ class HttpHealthCheckerImplTest : public testing::Test { void expectClientCreate(size_t index) { TestSession& test_session = *test_sessions_[index]; - test_session.codec_ = new NiceMock(); + auto* codec = test_session.codec_ = new NiceMock(); test_session.client_connection_ = new NiceMock(); + auto create_codec_client = [codec](Upstream::Host::CreateConnectionData& conn_data) { + return new CodecClientForTest(std::move(conn_data.connection_), codec, nullptr, nullptr); + }; - Network::ClientConnectionPtr connection{test_session.client_connection_}; - test_session.codec_client_ = - new CodecClientForTest(std::move(connection), test_session.codec_, nullptr, nullptr); - EXPECT_CALL(*health_checker_, createCodecClient_()) - .WillOnce(Return(test_session.codec_client_)); + EXPECT_CALL(dispatcher_, createClientConnection_(_)) + .WillOnce(Return(test_session.client_connection_)); + EXPECT_CALL(*health_checker_, createCodecClient_(_)).WillOnce(Invoke(create_codec_client)); } void expectStreamCreate(size_t index) { diff --git a/test/common/upstream/logical_dns_cluster_test.cc b/test/common/upstream/logical_dns_cluster_test.cc index 0b9e3284b849..649efa8d3289 100644 --- a/test/common/upstream/logical_dns_cluster_test.cc +++ b/test/common/upstream/logical_dns_cluster_test.cc @@ -120,7 +120,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { HostPtr logical_host = cluster_->hosts()[0]; EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))) + .WillOnce(Return(new NiceMock())); logical_host->createConnection(dispatcher_); logical_host->outlierDetector().putHttpResponseCode(200); @@ -133,7 +134,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { EXPECT_EQ(logical_host, cluster_->hosts()[0]); EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")))) + .WillOnce(Return(new NiceMock())); Host::CreateConnectionData data = logical_host->createConnection(dispatcher_); EXPECT_FALSE(data.host_description_->canary()); EXPECT_EQ(&cluster_->hosts()[0]->cluster(), &data.host_description_->cluster()); @@ -152,7 +154,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { EXPECT_EQ(logical_host, cluster_->hosts()[0]); EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))) + .WillOnce(Return(new NiceMock())); logical_host->createConnection(dispatcher_); expectResolve(); @@ -164,7 +167,8 @@ TEST_F(LogicalDnsClusterTest, Basic) { EXPECT_EQ(logical_host, cluster_->hosts()[0]); EXPECT_CALL(dispatcher_, createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))); + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")))) + .WillOnce(Return(new NiceMock())); logical_host->createConnection(dispatcher_); // Make sure we cancel.