From 17b24c7ad1fe4bf2e804f63b6d104a5cbece3ddb Mon Sep 17 00:00:00 2001 From: Jose Ulises Nino Rivera Date: Wed, 1 Sep 2021 16:04:58 -0700 Subject: [PATCH] cluster manager: change underlying call chain for drainConnections (#17950) Commit Message: cluster manager - change underlying call chain for drainConnections Additional Description: previously drainConnections used ConnPool::startDrain which does not guarantee that non-idle connections will not be used for new streams. This change leverages the logic in onHostHealthFailure to close idle connections and mark non-idle connections as draining so that new streams are forced onto new connections. Risk Level: low - API usage is opt in Testing: existing tests. Signed-off-by: Jose Nino --- .../common/upstream/cluster_manager_impl.cc | 100 ++++++++++-------- source/common/upstream/cluster_manager_impl.h | 10 +- 2 files changed, 67 insertions(+), 43 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 8017ae014025..f2c9ff1271b9 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -955,7 +955,7 @@ void ClusterManagerImpl::drainConnections(const std::string& cluster) { tls_.runOnAllThreads([cluster](OptRef cluster_manager) { auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster); if (cluster_entry != cluster_manager->thread_local_clusters_.end()) { - cluster_entry->second->drainConnPools(); + cluster_entry->second->drainAllConnPools(); } }); } @@ -966,7 +966,7 @@ void ClusterManagerImpl::drainConnections() { tls_.runOnAllThreads([](OptRef cluster_manager) { for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) { - cluster_entry.second->drainConnPools(); + cluster_entry.second->drainAllConnPools(); } }); } @@ -1262,49 +1262,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( const HostSharedPtr& host) { - // Drain all HTTP connection pool connections in the case of a host health failure. If outlier/ - // health is due to `ECMP` flow hashing issues for example, a new set of connections might do - // better. + // Drain all HTTP and TCP connection pool connections in the case of a host health failure. If + // outlier/ health is due to `ECMP` flow hashing issues for example, a new set of connections + // might do better. // TODO(mattklein123): This function is currently very specific, but in the future when we do // more granular host set changes, we should be able to capture single host changes and make them // more targeted. - { - const auto container = getHttpConnPoolsContainer(host); - if (container != nullptr) { - container->do_not_delete_ = true; - container->pools_->drainConnections(); - container->do_not_delete_ = false; - - if (container->pools_->size() == 0) { - host_http_conn_pool_map_.erase(host); - } - } - } - { - // Drain or close any TCP connection pool for the host. Draining a TCP pool doesn't lead to - // connections being closed, it only prevents new connections through the pool. The - // CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE can be used to make the pool close any - // active connections. - const auto& container = host_tcp_conn_pool_map_.find(host); - if (container != host_tcp_conn_pool_map_.end()) { - // Draining pools or closing connections can cause pool deletion if it becomes - // idle. Copy `pools_` so that we aren't iterating through a container that - // gets mutated by callbacks deleting from it. - std::vector pools; - for (const auto& pair : container->second.pools_) { - pools.push_back(pair.second.get()); - } - - for (auto* pool : pools) { - if (host->cluster().features() & - ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) { - pool->closeConnections(); - } else { - pool->drainConnections(); - } - } - } - } + drainAllConnPoolsWorker(host); if (host->cluster().features() & ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) { @@ -1408,6 +1372,58 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnP } } +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainAllConnPools() { + for (auto& host_set : priority_set_.hostSetsPerPriority()) { + for (const HostSharedPtr& host : host_set->hosts()) { + parent_.drainAllConnPoolsWorker(host); + } + } +} + +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainAllConnPoolsWorker( + const HostSharedPtr& host) { + // Drain or close any HTTP connection pool for the host. Draining an HTTP pool only leads to + // idle connections being closed. Non-idle connections are marked as draining and prevents new + // streams to go through them, causing new connections to be opened. + { + const auto container = getHttpConnPoolsContainer(host); + if (container != nullptr) { + container->do_not_delete_ = true; + container->pools_->drainConnections(); + container->do_not_delete_ = false; + + if (container->pools_->size() == 0) { + host_http_conn_pool_map_.erase(host); + } + } + } + { + // Drain or close any TCP connection pool for the host. Draining a TCP pool doesn't lead to + // connections being closed, it only prevents new connections through the pool. The + // CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE can be used to make the pool close any + // active connections. + const auto& container = host_tcp_conn_pool_map_.find(host); + if (container != host_tcp_conn_pool_map_.end()) { + // Draining pools or closing connections can cause pool deletion if it becomes + // idle. Copy `pools_` so that we aren't iterating through a container that + // gets mutated by callbacks deleting from it. + std::vector pools; + for (const auto& pair : container->second.pools_) { + pools.push_back(pair.second.get()); + } + + for (auto* pool : pools) { + if (host->cluster().features() & + ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) { + pool->closeConnections(); + } else { + pool->drainConnections(); + } + } + } + } +} + ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() { // We need to drain all connection pools for the cluster being removed. Then we can remove the // cluster. diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 3b3cc3e225f3..1540defd9771 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -431,8 +431,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable& local_cluster_params); ~ThreadLocalClusterManagerImpl() override; + // TODO(junr03): clean up drainConnPools vs drainAllConnPools once ConnPoolImplBase::startDrain + // and + // ConnPoolImplBase::drainConnections() get cleaned up. The code in onHostHealthFailure and the + // code in ThreadLocalClusterManagerImpl::drainConnPools(const HostVector& hosts) is very + // similar and can be merged in a similar fashion to the ConnPoolImplBase case. void drainConnPools(const HostVector& hosts); void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container); void drainTcpConnPools(TcpConnPoolsContainer& container); + void drainAllConnPoolsWorker(const HostSharedPtr& host); void httpConnPoolIsIdle(HostConstSharedPtr host, ResourcePriority priority, const std::vector& hash_key); void tcpConnPoolIsIdle(HostConstSharedPtr host, const std::vector& hash_key);