Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generic conn pool: directly use thread local cluster #14423

Merged
merged 4 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Envoy {
namespace Upstream {
class ClusterManager;
class LoadBalancerContext;
class ThreadLocalCluster;
} // namespace Upstream

namespace Router {
Expand Down Expand Up @@ -1306,7 +1307,7 @@ class GenericConnPoolFactory : public Envoy::Config::TypedFactory {
* @return may be null
*/
virtual GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const RouteEntry& route_entry,
absl::optional<Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const PURE;
Expand Down
6 changes: 3 additions & 3 deletions include/envoy/tcp/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Envoy {

namespace Upstream {
class LoadBalancerContext;
class ThreadLocalCluster;
} // namespace Upstream

namespace TcpProxy {
Expand Down Expand Up @@ -116,15 +117,14 @@ class GenericConnPoolFactory : public Envoy::Config::TypedFactory {
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

/*
* @param cluster_name the name of the cluster to use
* @param cm the cluster manager to get the connection pool from
* @param thread_local_cluster the thread local cluster to use for conn pool creation.
* @param config the tunneling config, if doing connect tunneling.
* @param context the load balancing context for this connection.
* @param upstream_callbacks the callbacks to provide to the connection if successfully created.
* @return may be null if there is no cluster with the given name.
*/
virtual GenericConnPoolPtr
createGenericConnPool(const std::string& cluster_name, Upstream::ClusterManager& cm,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
const absl::optional<TunnelingConfig>& config,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) const PURE;
Expand Down
16 changes: 12 additions & 4 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,

transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
*callbacks_->streamInfo().filterState());
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool();
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);

if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
Expand Down Expand Up @@ -595,7 +595,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
return Http::FilterHeadersStatus::StopIteration;
}

std::unique_ptr<GenericConnPool> Filter::createConnPool() {
std::unique_ptr<GenericConnPool>
Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) {
GenericConnPoolFactory* factory = nullptr;
if (cluster_->upstreamConfig().has_value()) {
factory = &Envoy::Config::Utility::getAndCheckFactory<GenericConnPoolFactory>(
Expand All @@ -607,7 +608,7 @@ std::unique_ptr<GenericConnPool> Filter::createConnPool() {
const bool should_tcp_proxy =
route_entry_->connectConfig().has_value() &&
downstream_headers_->getMethodValue() == Http::Headers::get().MethodValues.Connect;
return factory->createGenericConnPool(config_.cm_, should_tcp_proxy, *route_entry_,
return factory->createGenericConnPool(thread_local_cluster, should_tcp_proxy, *route_entry_,
callbacks_->streamInfo().protocol(), this);
}

Expand Down Expand Up @@ -1533,7 +1534,14 @@ void Filter::doRetry() {
ASSERT(pending_retries_ > 0);
pending_retries_--;

std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool();
// Clusters can technically get removed by CDS during a retry. Make sure it still exists.
const auto cluster = config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
std::unique_ptr<GenericConnPool> generic_conn_pool;
if (cluster != nullptr) {
cluster_ = cluster->info();
generic_conn_pool = createConnPool(*cluster);
}

if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
cleanup();
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
Event::Dispatcher& dispatcher, TimeSource& time_source,
Upstream::ResourcePriority priority) PURE;

std::unique_ptr<GenericConnPool> createConnPool();
std::unique_ptr<GenericConnPool>
createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster);
UpstreamRequestPtr createUpstreamRequest();

void maybeDoShadowing();
Expand Down
8 changes: 4 additions & 4 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
downstreamConnection()->streamInfo().filterState());
}

if (!maybeTunnel(*thread_local_cluster, cluster_name)) {
if (!maybeTunnel(*thread_local_cluster)) {
// Either cluster is unknown or there are no healthy hosts. tcpConnPool() increments
// cluster->stats().upstream_cx_none_healthy in the latter case.
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
Expand All @@ -443,7 +443,7 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
return Network::FilterStatus::StopIteration;
}

bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster, const std::string& cluster_name) {
bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) {
GenericConnPoolFactory* factory = nullptr;
if (cluster.info()->upstreamConfig().has_value()) {
factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
Expand All @@ -456,8 +456,8 @@ bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster, const std::strin
return false;
}

generic_conn_pool_ = factory->createGenericConnPool(
cluster_name, cluster_manager_, config_->tunnelingConfig(), this, *upstream_callbacks_);
generic_conn_pool_ = factory->createGenericConnPool(cluster, config_->tunnelingConfig(), this,
*upstream_callbacks_);
if (generic_conn_pool_) {
connecting_ = true;
connect_attempts_++;
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class Filter : public Network::ReadFilter,

void initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats);
Network::FilterStatus initializeUpstreamConnection();
bool maybeTunnel(Upstream::ThreadLocalCluster& cluster, const std::string& cluster_name);
bool maybeTunnel(Upstream::ThreadLocalCluster& cluster);
void onConnectTimeout();
void onDownstreamEvent(Network::ConnectionEvent event);
void onUpstreamData(Buffer::Instance& data, bool end_stream);
Expand Down
23 changes: 5 additions & 18 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,11 @@ void HttpUpstream::doneWriting() {
}
}

TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
: upstream_callbacks_(upstream_callbacks) {
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cluster_manager.getThreadLocalCluster(cluster_name);
if (thread_local_cluster != nullptr) {
conn_pool_ = thread_local_cluster->tcpConnPool(Upstream::ResourcePriority::Default, context);
}
conn_pool_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
}

TcpConnPool::~TcpConnPool() {
Expand Down Expand Up @@ -185,20 +179,13 @@ void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data
latched_data->connection().streamInfo().downstreamSslConnection());
}

HttpConnPool::HttpConnPool(const std::string& cluster_name,
Upstream::ClusterManager& cluster_manager,
HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, const TunnelingConfig& config,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
Http::CodecClient::Type type)
: hostname_(config.hostname()), type_(type), upstream_callbacks_(upstream_callbacks) {
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cluster_manager.getThreadLocalCluster(cluster_name);
if (thread_local_cluster != nullptr) {
conn_pool_ = thread_local_cluster->httpConnPool(Upstream::ResourcePriority::Default,
absl::nullopt, context);
}
conn_pool_ = thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, absl::nullopt,
context);
}

HttpConnPool::~HttpConnPool() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/tcp_proxy/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace TcpProxy {

class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks {
public:
TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks);
~TcpConnPool() override;
Expand Down Expand Up @@ -44,7 +44,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba
using TunnelingConfig =
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

HttpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, const TunnelingConfig& config,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
Http::CodecClient::Type type);
Expand Down
11 changes: 6 additions & 5 deletions source/extensions/upstreams/http/generic/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ namespace Http {
namespace Generic {

Router::GenericConnPoolPtr GenericGenericConnPoolFactory::createGenericConnPool(
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
if (is_connect) {
auto ret = std::make_unique<Upstreams::Http::Tcp::TcpConnPool>(cm, is_connect, route_entry,
downstream_protocol, ctx);
auto ret = std::make_unique<Upstreams::Http::Tcp::TcpConnPool>(
thread_local_cluster, is_connect, route_entry, downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}
auto ret = std::make_unique<Upstreams::Http::Http::HttpConnPool>(cm, is_connect, route_entry,
downstream_protocol, ctx);
auto ret = std::make_unique<Upstreams::Http::Http::HttpConnPool>(
thread_local_cluster, is_connect, route_entry, downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/generic/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class GenericGenericConnPoolFactory : public Router::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.http.generic"; }
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/upstreams/http/http/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ namespace Http {
namespace Http {

Router::GenericConnPoolPtr HttpGenericConnPoolFactory::createGenericConnPool(
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
auto ret = std::make_unique<HttpConnPool>(cm, is_connect, route_entry, downstream_protocol, ctx);
auto ret = std::make_unique<HttpConnPool>(thread_local_cluster, is_connect, route_entry,
downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/http/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class HttpGenericConnPoolFactory : public Router::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.http.http"; }
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;
Expand Down
13 changes: 4 additions & 9 deletions source/extensions/upstreams/http/http/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@ namespace Http {
class HttpConnPool : public Router::GenericConnPool, public Envoy::Http::ConnectionPool::Callbacks {
public:
// GenericConnPool
HttpConnPool(Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) {
ASSERT(!is_connect);
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cm.getThreadLocalCluster(route_entry.clusterName());
if (thread_local_cluster != nullptr) {
conn_pool_ =
thread_local_cluster->httpConnPool(route_entry.priority(), downstream_protocol, ctx);
}
conn_pool_ =
thread_local_cluster.httpConnPool(route_entry.priority(), downstream_protocol, ctx);
}
~HttpConnPool() override {
ASSERT(conn_pool_stream_handle_ == nullptr, "conn_pool_stream_handle not null");
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/upstreams/http/tcp/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ namespace Http {
namespace Tcp {

Router::GenericConnPoolPtr TcpGenericConnPoolFactory::createGenericConnPool(
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
auto ret = std::make_unique<TcpConnPool>(cm, is_connect, route_entry, downstream_protocol, ctx);
auto ret = std::make_unique<TcpConnPool>(thread_local_cluster, is_connect, route_entry,
downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/tcp/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TcpGenericConnPoolFactory : public Router::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.http.tcp"; }
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;
Expand Down
11 changes: 3 additions & 8 deletions source/extensions/upstreams/http/tcp/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,11 @@ namespace Tcp {

class TcpConnPool : public Router::GenericConnPool, public Envoy::Tcp::ConnectionPool::Callbacks {
public:
TcpConnPool(Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol>, Upstream::LoadBalancerContext* ctx) {
ASSERT(is_connect);
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cm.getThreadLocalCluster(route_entry.clusterName());
if (thread_local_cluster != nullptr) {
conn_pool_ = thread_local_cluster->tcpConnPool(Upstream::ResourcePriority::Default, ctx);
}
conn_pool_ = thread_local_cluster.tcpConnPool(route_entry.priority(), ctx);
}
void newStream(Router::GenericConnectionPoolCallbacks* callbacks) override {
callbacks_ = callbacks;
Expand Down
19 changes: 8 additions & 11 deletions source/extensions/upstreams/tcp/generic/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,20 @@ namespace Tcp {
namespace Generic {

TcpProxy::GenericConnPoolPtr GenericConnPoolFactory::createGenericConnPool(
const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
Upstream::ThreadLocalCluster& thread_local_cluster,
const absl::optional<TunnelingConfig>& config, Upstream::LoadBalancerContext* context,
Envoy::Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) const {
if (config.has_value()) {
auto* cluster = cluster_manager.getThreadLocalCluster(cluster_name);
if (!cluster) {
return nullptr;
}
auto pool_type = ((cluster->info()->features() & Upstream::ClusterInfo::Features::HTTP2) != 0)
? Http::CodecClient::Type::HTTP2
: Http::CodecClient::Type::HTTP1;
auto pool_type =
((thread_local_cluster.info()->features() & Upstream::ClusterInfo::Features::HTTP2) != 0)
? Http::CodecClient::Type::HTTP2
: Http::CodecClient::Type::HTTP1;
auto ret = std::make_unique<TcpProxy::HttpConnPool>(
cluster_name, cluster_manager, context, config.value(), upstream_callbacks, pool_type);
thread_local_cluster, context, config.value(), upstream_callbacks, pool_type);
return (ret->valid() ? std::move(ret) : nullptr);
}
auto ret = std::make_unique<TcpProxy::TcpConnPool>(cluster_name, cluster_manager, context,
upstream_callbacks);
auto ret =
std::make_unique<TcpProxy::TcpConnPool>(thread_local_cluster, context, upstream_callbacks);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/tcp/generic/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class GenericConnPoolFactory : public TcpProxy::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.tcp.generic"; }
std::string category() const override { return "envoy.upstreams"; }
TcpProxy::GenericConnPoolPtr createGenericConnPool(
const std::string& cluster_name, Upstream::ClusterManager& cm,
Upstream::ThreadLocalCluster& thread_local_cluster,
const absl::optional<TunnelingConfig>& config, Upstream::LoadBalancerContext* context,
Envoy::Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) const override;

Expand Down
4 changes: 2 additions & 2 deletions test/extensions/upstreams/http/tcp/upstream_request_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class TcpConnPoolTest : public ::testing::Test {
NiceMock<Upstream::MockClusterManager> cm;
cm.initializeThreadLocalClusters({"fake_cluster"});
EXPECT_CALL(cm.thread_local_cluster_, tcpConnPool(_, _)).WillOnce(Return(&mock_pool_));
conn_pool_ = std::make_unique<TcpConnPool>(cm, true, route_entry, Envoy::Http::Protocol::Http11,
nullptr);
conn_pool_ = std::make_unique<TcpConnPool>(cm.thread_local_cluster_, true, route_entry,
Envoy::Http::Protocol::Http11, nullptr);
}

std::unique_ptr<TcpConnPool> conn_pool_;
Expand Down
Loading