Skip to content

Commit

Permalink
cluster manager: change underlying call chain for drainConnections (#…
Browse files Browse the repository at this point in the history
…17950)

Commit Message: cluster manager - change underlying call chain for drainConnections
Additional Description: previously drainConnections used ConnPool::startDrain which does not guarantee that non-idle connections will not be used for new streams. This change leverages the logic in onHostHealthFailure to close idle connections and mark non-idle connections as draining so that new streams are forced onto new connections.
Risk Level: low - API usage is opt in
Testing: existing tests.

Signed-off-by: Jose Nino <jnino@lyft.com>
  • Loading branch information
junr03 authored Sep 1, 2021
1 parent 702cadc commit 17b24c7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 43 deletions.
100 changes: 58 additions & 42 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ void ClusterManagerImpl::drainConnections(const std::string& cluster) {
tls_.runOnAllThreads([cluster](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
cluster_entry->second->drainConnPools();
cluster_entry->second->drainAllConnPools();
}
});
}
Expand All @@ -966,7 +966,7 @@ void ClusterManagerImpl::drainConnections() {

tls_.runOnAllThreads([](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) {
cluster_entry.second->drainConnPools();
cluster_entry.second->drainAllConnPools();
}
});
}
Expand Down Expand Up @@ -1262,49 +1262,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
const HostSharedPtr& host) {

// Drain all HTTP connection pool connections in the case of a host health failure. If outlier/
// health is due to `ECMP` flow hashing issues for example, a new set of connections might do
// better.
// Drain all HTTP and TCP connection pool connections in the case of a host health failure. If
// outlier/ health is due to `ECMP` flow hashing issues for example, a new set of connections
// might do better.
// TODO(mattklein123): This function is currently very specific, but in the future when we do
// more granular host set changes, we should be able to capture single host changes and make them
// more targeted.
{
const auto container = getHttpConnPoolsContainer(host);
if (container != nullptr) {
container->do_not_delete_ = true;
container->pools_->drainConnections();
container->do_not_delete_ = false;

if (container->pools_->size() == 0) {
host_http_conn_pool_map_.erase(host);
}
}
}
{
// Drain or close any TCP connection pool for the host. Draining a TCP pool doesn't lead to
// connections being closed, it only prevents new connections through the pool. The
// CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE can be used to make the pool close any
// active connections.
const auto& container = host_tcp_conn_pool_map_.find(host);
if (container != host_tcp_conn_pool_map_.end()) {
// Draining pools or closing connections can cause pool deletion if it becomes
// idle. Copy `pools_` so that we aren't iterating through a container that
// gets mutated by callbacks deleting from it.
std::vector<Tcp::ConnectionPool::Instance*> pools;
for (const auto& pair : container->second.pools_) {
pools.push_back(pair.second.get());
}

for (auto* pool : pools) {
if (host->cluster().features() &
ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
pool->closeConnections();
} else {
pool->drainConnections();
}
}
}
}
drainAllConnPoolsWorker(host);

if (host->cluster().features() &
ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
Expand Down Expand Up @@ -1408,6 +1372,58 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnP
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainAllConnPools() {
for (auto& host_set : priority_set_.hostSetsPerPriority()) {
for (const HostSharedPtr& host : host_set->hosts()) {
parent_.drainAllConnPoolsWorker(host);
}
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainAllConnPoolsWorker(
const HostSharedPtr& host) {
// Drain or close any HTTP connection pool for the host. Draining an HTTP pool only leads to
// idle connections being closed. Non-idle connections are marked as draining and prevents new
// streams to go through them, causing new connections to be opened.
{
const auto container = getHttpConnPoolsContainer(host);
if (container != nullptr) {
container->do_not_delete_ = true;
container->pools_->drainConnections();
container->do_not_delete_ = false;

if (container->pools_->size() == 0) {
host_http_conn_pool_map_.erase(host);
}
}
}
{
// Drain or close any TCP connection pool for the host. Draining a TCP pool doesn't lead to
// connections being closed, it only prevents new connections through the pool. The
// CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE can be used to make the pool close any
// active connections.
const auto& container = host_tcp_conn_pool_map_.find(host);
if (container != host_tcp_conn_pool_map_.end()) {
// Draining pools or closing connections can cause pool deletion if it becomes
// idle. Copy `pools_` so that we aren't iterating through a container that
// gets mutated by callbacks deleting from it.
std::vector<Tcp::ConnectionPool::Instance*> pools;
for (const auto& pair : container->second.pools_) {
pools.push_back(pair.second.get());
}

for (auto* pool : pools) {
if (host->cluster().features() &
ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
pool->closeConnections();
} else {
pool->drainConnections();
}
}
}
}
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() {
// We need to drain all connection pools for the cluster being removed. Then we can remove the
// cluster.
Expand Down
10 changes: 9 additions & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

// Drains any connection pools associated with the removed hosts.
void drainConnPools(const HostVector& hosts_removed);
// Drains connection pools for all hosts.
// Drains idle clients in connection pools for all hosts.
void drainConnPools();
// Drain all clients in connection pools for all hosts.
void drainAllConnPools();

private:
Http::ConnectionPool::Instance*
Expand Down Expand Up @@ -465,9 +467,15 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
const absl::optional<LocalClusterParams>& local_cluster_params);
~ThreadLocalClusterManagerImpl() override;
// TODO(junr03): clean up drainConnPools vs drainAllConnPools once ConnPoolImplBase::startDrain
// and
// ConnPoolImplBase::drainConnections() get cleaned up. The code in onHostHealthFailure and the
// code in ThreadLocalClusterManagerImpl::drainConnPools(const HostVector& hosts) is very
// similar and can be merged in a similar fashion to the ConnPoolImplBase case.
void drainConnPools(const HostVector& hosts);
void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container);
void drainTcpConnPools(TcpConnPoolsContainer& container);
void drainAllConnPoolsWorker(const HostSharedPtr& host);
void httpConnPoolIsIdle(HostConstSharedPtr host, ResourcePriority priority,
const std::vector<uint8_t>& hash_key);
void tcpConnPoolIsIdle(HostConstSharedPtr host, const std::vector<uint8_t>& hash_key);
Expand Down

0 comments on commit 17b24c7

Please sign in to comment.