Skip to content

Commit

Permalink
fix memory leaking in dfp (#31433)
Browse files Browse the repository at this point in the history
Commit Message:
fix #30999

when host is resolved to IP A, A will be added into mutable_cross_priority_host_map_
when host is resolved to IP B, the address saved in logical_host_ will be updated to IP B.
when host is TTL expired, only IP B will be removed in mutable_cross_priority_host_map_, IP A is leaking in mutable_cross_priority_host_map_.

Signed-off-by: doujiang24 <doujiang24@gmail.com>
  • Loading branch information
doujiang24 authored Jan 18, 2024
1 parent 5160151 commit f20b6ca
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 39 deletions.
6 changes: 6 additions & 0 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,8 @@ HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
if (mutable_cross_priority_host_map_ != nullptr) {
const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
ASSERT(mutable_cross_priority_host_map_ == nullptr);
ENVOY_LOG(debug, "cross_priority host map, moving mutable to const, len: {}",
const_cross_priority_host_map_->size());
}
return const_cross_priority_host_map_;
}
Expand All @@ -839,14 +841,18 @@ void MainPrioritySetImpl::updateCrossPriorityHostMap(const HostVector& hosts_add
if (mutable_cross_priority_host_map_ == nullptr) {
// Copy old read only host map to mutable host map.
mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_);
ENVOY_LOG(debug, "cross_priority host map, copying from const, len: {}",
const_cross_priority_host_map_->size());
}

for (const auto& host : hosts_removed) {
mutable_cross_priority_host_map_->erase(addressToString(host->address()));
ENVOY_LOG(debug, "cross_priority host map, removing: {}", addressToString(host->address()));
}

for (const auto& host : hosts_added) {
mutable_cross_priority_host_map_->insert({addressToString(host->address()), host});
ENVOY_LOG(debug, "cross_priority host map, adding: {}", addressToString(host->address()));
}
}

Expand Down
56 changes: 20 additions & 36 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,10 @@ Cluster::~Cluster() {

void Cluster::startPreInit() {
// If we are attaching to a pre-populated cache we need to initialize our hosts.
std::unique_ptr<Upstream::HostVector> hosts_added;
dns_cache_->iterateHostMap(
[&](absl::string_view host, const Common::DynamicForwardProxy::DnsHostInfoSharedPtr& info) {
addOrUpdateHost(host, info, hosts_added);
addOrUpdateHost(host, info);
});
if (hosts_added) {
updatePriorityState(*hosts_added, {});
}
onPreInitComplete();
}

Expand Down Expand Up @@ -237,9 +233,11 @@ bool Cluster::ClusterInfo::checkIdle() {

void Cluster::addOrUpdateHost(
absl::string_view host,
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
std::unique_ptr<Upstream::HostVector>& hosts_added) {
Upstream::LogicalHostSharedPtr emplaced_host;
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
Upstream::HostVector hosts_added, hosts_removed;
Upstream::LogicalHostSharedPtr new_host = std::make_shared<Upstream::LogicalHost>(
info(), std::string{host}, host_info->address(), host_info->addressList(),
dummy_locality_lb_endpoint_, dummy_lb_endpoint_, nullptr, time_source_);
{
absl::WriterMutexLock lock{&host_map_lock_};

Expand All @@ -251,7 +249,6 @@ void Cluster::addOrUpdateHost(
// future.
const auto host_map_it = host_map_.find(host);
if (host_map_it != host_map_.end()) {
// If we only have an address change, we can do that swap inline without any other updates.
// The appropriate R/W locking is in place to allow this. The details of this locking are:
// - Hosts are not thread local, they are global.
// - We take a read lock when reading the address and a write lock when changing it.
Expand All @@ -267,44 +264,31 @@ void Cluster::addOrUpdateHost(
// semantics, meaning the cache would expose multiple addresses and the
// cluster would create multiple logical hosts based on those addresses.
// We will leave this is a follow up depending on need.
ASSERT(host_info == host_map_it->second.shared_host_info_);
ASSERT(host_map_it->second.shared_host_info_->address() !=
host_map_it->second.logical_host_->address());
ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
host_map_it->second.logical_host_->setNewAddresses(
host_info->address(), host_info->addressList(), dummy_lb_endpoint_);
return;
}

ENVOY_LOG(debug, "adding new dfproxy cluster host '{}'", host);
// remove the old host
hosts_removed.emplace_back(host_map_it->second.logical_host_);
ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
host_map_.erase(host_map_it);
host_map_.try_emplace(host, host_info, new_host);

emplaced_host = host_map_
.try_emplace(host, host_info,
std::make_shared<Upstream::LogicalHost>(
info(), std::string{host}, host_info->address(),
host_info->addressList(), dummy_locality_lb_endpoint_,
dummy_lb_endpoint_, nullptr, time_source_))
.first->second.logical_host_;
} else {
ENVOY_LOG(debug, "adding new dfproxy cluster host '{}'", host);
host_map_.try_emplace(host, host_info, new_host);
}
hosts_added.emplace_back(new_host);
}

ASSERT(emplaced_host);
if (hosts_added == nullptr) {
hosts_added = std::make_unique<Upstream::HostVector>();
}
hosts_added->emplace_back(emplaced_host);
ASSERT(hosts_added.size() > 0);
updatePriorityState(hosts_added, hosts_removed);
}

void Cluster::onDnsHostAddOrUpdate(
const std::string& host,
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
ENVOY_LOG(debug, "Adding host info for {}", host);

std::unique_ptr<Upstream::HostVector> hosts_added;
addOrUpdateHost(host, host_info, hosts_added);
if (hosts_added != nullptr) {
ASSERT(!hosts_added->empty());
updatePriorityState(*hosts_added, {});
}
ENVOY_LOG(debug, "Adding/Updating host info for {}", host);
addOrUpdateHost(host, host_info);
}

void Cluster::updatePriorityState(const Upstream::HostVector& hosts_added,
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,

void
addOrUpdateHost(absl::string_view host,
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
std::unique_ptr<Upstream::HostVector>& hosts_added)
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info)
ABSL_LOCKS_EXCLUDED(host_map_lock_);

void updatePriorityState(const Upstream::HostVector& hosts_added,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ClusterTest : public testing::Test,
}
}));
if (!existing_hosts.empty()) {
EXPECT_CALL(*this, onMemberUpdateCb(SizeIs(existing_hosts.size()), SizeIs(0)));
EXPECT_CALL(*this, onMemberUpdateCb(SizeIs(1), SizeIs(0))).Times(existing_hosts.size());
}
cluster_->initialize([] {});
}
Expand Down Expand Up @@ -257,6 +257,7 @@ TEST_F(ClusterTest, BasicFlow) {

// After changing the address, LB will immediately resolve the new address with a refresh.
updateTestHostAddress("host1:0", "2.3.4.5");
EXPECT_CALL(*this, onMemberUpdateCb(SizeIs(1), SizeIs(1)));
update_callbacks_->onDnsHostAddOrUpdate("host1:0", host_map_["host1:0"]);
EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size());
EXPECT_EQ("2.3.4.5:0",
Expand Down

0 comments on commit f20b6ca

Please sign in to comment.