Skip to content

Commit

Permalink
Merge pull request OpenDDS#4731 from iguessthislldo/igtd/rtps-udp-mem…
Browse files Browse the repository at this point in the history
…ory-leaks

Fix Locator Memory Leak in RTPS/UDP Transport
  • Loading branch information
jrw972 authored Jul 25, 2024
2 parents c253132 + 0bc9a99 commit 3ac828b
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 133 deletions.
18 changes: 16 additions & 2 deletions dds/DCPS/RTPS/DiscoveredEntities.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ struct DiscoveredParticipant {
, participant_tokens_sent_(false)
#endif
{
const DCPS::GUID_t guid = DCPS::make_part_guid(p.participantProxy.guidPrefix);
assign(location_data_.guid, guid);
assign(location_data_.guid, make_part_guid());
location_data_.location = 0;
location_data_.change_mask = 0;
location_data_.local_timestamp.sec = 0;
Expand Down Expand Up @@ -207,6 +206,21 @@ struct DiscoveredParticipant {
return pdata_.dataKind == Security::DPDK_ENHANCED || pdata_.dataKind == Security::DPDK_SECURE;
}
#endif

const DCPS::GuidPrefix_t& prefix() const
{
return pdata_.participantProxy.guidPrefix;
}

DCPS::GUID_t make_guid(const DCPS::EntityId_t& entity) const
{
return DCPS::make_id(prefix(), entity);
}

DCPS::GUID_t make_part_guid() const
{
return DCPS::make_part_guid(prefix());
}
};

struct DiscoveredSubscription : DCPS::PoolAllocationBase {
Expand Down
134 changes: 65 additions & 69 deletions dds/DCPS/RTPS/Sedp.cpp

Large diffs are not rendered by default.

43 changes: 26 additions & 17 deletions dds/DCPS/transport/framework/DataLink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,8 @@ DataLink::peer_ids(const GUID_t& local_id) const
/// with a simultaneous call (in another thread) to one of this
/// DataLink's make_reservation() methods.
void
DataLink::release_reservations(GUID_t remote_id, GUID_t local_id,
DataLinkSetMap& released_locals)
DataLink::release_reservations(const GUID_t& remote_id, const GUID_t& local_id,
DataLinkSetMap* released_locals)
{
DBG_ENTRY_LVL("DataLink", "release_reservations", 6);

Expand Down Expand Up @@ -541,23 +541,32 @@ DataLink::release_reservations(GUID_t remote_id, GUID_t local_id,

if (this->stopped_) return;

ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
if (rls->size() == 1) {
assoc_by_remote_.erase(remote_id);
release_remote_required = true;
} else {
rls->remove(local_id);
AssocByRemote::iterator remote_it = assoc_by_remote_.find(remote_id);
if (remote_it != assoc_by_remote_.end()) {
ReceiveListenerSet_rch& rls = remote_it->second;
if (rls->size() == 1) {
assoc_by_remote_.erase(remote_id);
release_remote_required = true;
} else {
rls->remove(local_id);
}
}
RepoIdSet& ris = assoc_by_local_[local_id].associated_;
if (ris.size() == 1) {
DataLinkSet_rch& links = released_locals[local_id];
if (links.is_nil()) {
links = make_rch<DataLinkSet>();

AssocByLocal::iterator local_it = assoc_by_local_.find(local_id);
if (local_it != assoc_by_local_.end()) {
RepoIdSet& ris = local_it->second.associated_;
if (ris.size() == 1) {
if (released_locals) {
DataLinkSet_rch& links = (*released_locals)[local_id];
if (links.is_nil()) {
links = make_rch<DataLinkSet>();
}
links->insert_link(rchandle_from(this));
}
assoc_by_local_.erase(local_id);
} else {
ris.erase(remote_id);
}
links->insert_link(rchandle_from(this));
assoc_by_local_.erase(local_id);
} else {
ris.erase(remote_id);
}

if (assoc_by_local_.empty()) {
Expand Down
6 changes: 3 additions & 3 deletions dds/DCPS/transport/framework/DataLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ class OpenDDS_Dcps_Export DataLink
/// make_reservation() methods. All we know is that the supplied
/// GUID_t is considered to be a remote id. It could be a
/// remote subscriber or a remote publisher.
void release_reservations(GUID_t remote_id,
GUID_t local_id,
DataLinkSetMap& released_locals);
void release_reservations(const GUID_t& remote_id,
const GUID_t& local_id,
DataLinkSetMap* released_locals = 0);

void schedule_delayed_release();

Expand Down
2 changes: 1 addition & 1 deletion dds/DCPS/transport/framework/TransportClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ TransportClient::disassociate(const GUID_t& peerId)
}

OPENDDS_ASSERT(guid_ != GUID_UNKNOWN);
link->release_reservations(peerId, guid_, released);
link->release_reservations(peerId, guid_, &released);

if (!released.empty()) {

Expand Down
84 changes: 49 additions & 35 deletions dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@

namespace {

bool compare_and_update_counts(CORBA::Long incoming, CORBA::Long& existing) {
bool compare_and_update_counts(CORBA::Long incoming, CORBA::Long& existing)
{
static const CORBA::Long ONE_QUARTER_MAX_POSITIVE = 0x20000000;
static const CORBA::Long THREE_QUARTER_MAX_POSITIVE = 0x60000000;
if (incoming <= existing &&
Expand Down Expand Up @@ -485,24 +486,41 @@ RtpsUdpDataLink::update_locators(const GUID_t& remote_id,
bool requires_inline_qos,
bool add_ref)
{
if (unicast_addresses.empty() && multicast_addresses.empty()) {
if (DCPS_debug_level > 0) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::update_locators: no addresses for %C\n"), LogGuid(remote_id).c_str()));
}
const bool log_warn = log_level >= LogLevel::Warning;
if (log_warn && unicast_addresses.empty() && multicast_addresses.empty()) {
ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RtpsUdpDataLink::update_locators: "
"no addresses for %C\n", LogGuid(remote_id).c_str()));
}

remove_locator_and_bundling_cache(remote_id);

ACE_GUARD(ACE_Thread_Mutex, g, locators_lock_);

RemoteInfo& info = locators_[remote_id];
const bool log_unicast_change = DCPS_debug_level > 3 && info.unicast_addrs_ != unicast_addresses;
const bool log_multicast_change = DCPS_debug_level > 3 && info.multicast_addrs_ != multicast_addresses;
info.unicast_addrs_.swap(unicast_addresses);
info.multicast_addrs_.swap(multicast_addresses);
info.requires_inline_qos_ = requires_inline_qos;
RemoteInfo* info = 0;
if (add_ref) {
++info.ref_count_;
info = &locators_[remote_id];
} else {
RemoteInfoMap::iterator it = locators_.find(remote_id);
locators_.find(remote_id);
if (it == locators_.end()) {
if (log_warn) {
g.release();
ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: RtpsUdpDataLink::update_locators: "
"no existing locators to update for %C\n", LogGuid(remote_id).c_str()));
}
return;
}
info = &it->second;
}

const bool log_change = DCPS_debug_level >= 4;
const bool log_unicast_change = log_change && info->unicast_addrs_ != unicast_addresses;
const bool log_multicast_change = log_change && info->multicast_addrs_ != multicast_addresses;
info->unicast_addrs_.swap(unicast_addresses);
info->multicast_addrs_.swap(multicast_addresses);
info->requires_inline_qos_ = requires_inline_qos;
if (add_ref) {
++info->ref_count_;
}

g.release();
Expand Down Expand Up @@ -654,29 +672,6 @@ RtpsUdpDataLink::associated(const GUID_t& local_id, const GUID_t& remote_id,
return associated;
}

void
RtpsUdpDataLink::disassociated(const GUID_t& local_id,
const GUID_t& remote_id)
{
release_reservations_i(remote_id, local_id);
remove_locator_and_bundling_cache(remote_id);
sq_.ignore_remote(remote_id);

ACE_GUARD(ACE_Thread_Mutex, g, locators_lock_);

RemoteInfoMap::iterator pos = locators_.find(remote_id);
if (pos != locators_.end()) {
OPENDDS_ASSERT(pos->second.ref_count_ > 0);

--pos->second.ref_count_;
if (pos->second.ref_count_ == 0) {
locators_.erase(pos);
}
} else if (Transport_debug_level > 3) {
ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::disassociated: local id %C does not have any locators\n", LogGuid(local_id).c_str()));
}
}

void
RtpsUdpDataLink::register_for_reader(const GUID_t& writerid,
const GUID_t& readerid,
Expand Down Expand Up @@ -957,6 +952,25 @@ RtpsUdpDataLink::release_reservations_i(const GUID_t& remote_id,

remove_locator_and_bundling_cache(remote_id);

{
ACE_GUARD(ACE_Thread_Mutex, g, locators_lock_);

RemoteInfoMap::iterator pos = locators_.find(remote_id);
if (pos != locators_.end()) {
OPENDDS_ASSERT(pos->second.ref_count_ > 0);
if (--pos->second.ref_count_ == 0) {
locators_.erase(pos);
}
} else if (Transport_debug_level >= 4) {
g.release();
ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::release_reservations_i: "
"%C doesn't not have any locators with remote %C\n",
String(conv).c_str(), LogGuid(remote_id).c_str()));
}
}

sq_.ignore_remote(remote_id);

for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) {
(*drop_it)->data_dropped(true);
}
Expand Down
4 changes: 1 addition & 3 deletions dds/DCPS/transport/rtps_udp/RtpsUdpDataLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink
const NetworkAddress& last_addr_hint,
bool requires_inline_qos);

void disassociated(const GUID_t& local, const GUID_t& remote);

void register_for_reader(const GUID_t& writerid,
const GUID_t& readerid,
const NetworkAddressSet& addresses,
Expand Down Expand Up @@ -290,7 +288,7 @@ class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink
bool requires_inline_qos_;
NetworkAddress last_recv_addr_;
MonotonicTimePoint last_recv_time_;
size_t ref_count_;
DDS::UInt32 ref_count_;
bool insert_recv_addr(NetworkAddressSet& aset) const;
};

Expand Down
2 changes: 1 addition & 1 deletion dds/DCPS/transport/rtps_udp/RtpsUdpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ RtpsUdpTransport::stop_accepting_or_connecting(const TransportClient_wrch& clien
if (link_) {
TransportClient_rch c = client.lock();
if (c) {
link_->disassociated(c->get_guid(), remote_id);
link_->release_reservations(c->get_guid(), remote_id);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions docs/news.d/rtps-udp-memory-leaks.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.. news-prs: 4731
.. news-start-section: Fixes
- Fixed memory leak of remote locators in the RTPS/UDP transport.
.. news-end-section
3 changes: 1 addition & 2 deletions tools/rtpsrelay/GuidAddrSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ GuidAddrSet::record_activity(const AddrPort& remote_address,
auto result = remote_map_.insert(std::make_pair(remote, src_guid));
if (result.second) {
relay_stats_reporter_.remote_map_size(static_cast<uint32_t>(remote_map_.size()), now);
}
if (!result.second && result.first->second != src_guid) {
} else if (result.first->second != src_guid) {
if (config_.log_activity()) {
ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) INFO: GuidAddrSet::record_activity change detected %C -> %C\n"),
guid_to_string(result.first->second).c_str(),
Expand Down

0 comments on commit 3ac828b

Please sign in to comment.