Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20120] TCPSendResources cleanup (backport #4300) #4514

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include <fastrtps/types/TypeObjectFactory.h>
#include <fastrtps/types/DynamicPubSubType.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/utils/IPLocator.h>
#include "fastrtps/utils/shared_mutex.hpp"
Expand Down Expand Up @@ -1239,6 +1241,21 @@ bool PDP::remove_remote_participant(

this->mp_mutex->lock();

// Delete from sender resource list (TCP only)
LocatorList_t remote_participant_locators;
for (auto& remote_participant_default_locator : pdata->default_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_default_locator);
}
for (auto& remote_participant_metatraffic_locator : pdata->metatraffic_locators.unicast)
{
remote_participant_locators.push_back(remote_participant_metatraffic_locator);
}
if (!remote_participant_locators.empty())
{
mp_RTPSParticipant->update_removed_participant(remote_participant_locators);
}

// Return reader proxy objects to pool
for (auto pit : *pdata->m_readers)
{
Expand Down Expand Up @@ -1266,6 +1283,7 @@ bool PDP::remove_remote_participant(
participant_proxies_pool_.push_back(pdata);

this->mp_mutex->unlock();

return true;
}

Expand Down
21 changes: 20 additions & 1 deletion src/cpp/rtps/network/NetworkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
#include <utility>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.h>
#include <fastdds/rtps/transport/TransportDescriptorInterface.h>
#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/IPLocator.h>

#include <rtps/transport/UDPv4Transport.h>
#include <rtps/transport/TCPTransportInterface.h>

using namespace std;
using namespace eprosima::fastdds::rtps;
Expand Down Expand Up @@ -471,6 +472,24 @@ void NetworkFactory::update_network_interfaces()
}
}

void NetworkFactory::remove_participant_associated_send_resources(
SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const
{
for (auto& transport : mRegisteredTransports)
{
TCPTransportInterface* tcp_transport = dynamic_cast<TCPTransportInterface*>(transport.get());
if (tcp_transport)
{
tcp_transport->CloseOutputChannel(
send_resource_list,
remote_participant_locators,
participant_initial_peers);
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
13 changes: 13 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>

#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/MessageReceiver.h>
#include <fastdds/rtps/transport/SenderResource.h>
Expand Down Expand Up @@ -246,6 +247,18 @@ class NetworkFactory
*/
void update_network_interfaces();

/**
* Remove the given participants from the send resource list
*
* @param send_resource_list List of send resources associated to the local participant.
* @param remote_participant_locators List of locators associated to the remote participant.
* @param participant_initial_peers List of locators of the initial peers of the local participant.
*/
void remove_participant_associated_send_resources(
fastdds::rtps::SendResourceList& send_resource_list,
const LocatorList_t& remote_participant_locators,
const LocatorList_t& participant_initial_peers) const;

private:

std::vector<std::unique_ptr<fastdds::rtps::TransportInterface>> mRegisteredTransports;
Expand Down
15 changes: 15 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include <fastdds/rtps/writer/StatelessPersistentWriter.h>
#include <fastdds/rtps/writer/StatefulPersistentWriter.h>

#include <fastdds/rtps/common/LocatorList.hpp>

#include <fastrtps/utils/IPFinder.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>
Expand Down Expand Up @@ -2982,6 +2984,19 @@ bool RTPSParticipantImpl::should_match_local_endpoints(
return should_match_local_endpoints;
}

void RTPSParticipantImpl::update_removed_participant(
const LocatorList_t& remote_participant_locators)
{
if (!remote_participant_locators.empty())
{
std::lock_guard<std::timed_mutex> guard(m_send_resources_mutex_);
m_network_Factory.remove_participant_associated_send_resources(
send_resource_list_,
remote_participant_locators,
m_att.builtin.initialPeersList);
}
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
9 changes: 9 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/messages/MessageReceiver.h>
Expand Down Expand Up @@ -1266,6 +1267,14 @@ class RTPSParticipantImpl
return match_local_endpoints_;
}

/**
* Method called on participant removal with the set of locators associated to the participant.
*
* @param remote_participant_locators Set of locators associated to the participant removed.
*/
void update_removed_participant(
const LocatorList_t& remote_participant_locators);

};
} // namespace rtps
} /* namespace rtps */
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(locator_);
transport.SenderResourceHasBeenClosed(locator_);
};

send_lambda_ = [this, &transport](
Expand Down Expand Up @@ -68,7 +68,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
}

static TCPSenderResource* cast(
TransportInterface& transport,
const TransportInterface& transport,
SenderResource* sender_resource)
{
TCPSenderResource* returned_resource = nullptr;
Expand Down
100 changes: 88 additions & 12 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <cstring>
#include <map>
#include <set>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -273,6 +274,19 @@ Locator TCPTransportInterface::remote_endpoint_to_locator(
return locator;
}

Locator TCPTransportInterface::local_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const
{
Locator locator;
asio::error_code ec;
endpoint_to_locator(channel->local_endpoint(ec), locator);
if (ec)
{
LOCATOR_INVALID(locator);
}
return locator;
}

void TCPTransportInterface::bind_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
Expand Down Expand Up @@ -639,11 +653,24 @@ bool TCPTransportInterface::transform_remote_locator(
return false;
}

void TCPTransportInterface::CloseOutputChannel(
void TCPTransportInterface::SenderResourceHasBeenClosed(
fastrtps::rtps::Locator_t& locator)
{
locator.set_Invalid_Address();
locator.port = 0;
// The TCPSendResource associated channel cannot be removed from the channel_resources_ map. On transport's destruction
// this map is consulted to send the unbind requests. If not sending it, the other participant wouldn't disconnect the
// socket and keep a connection status of eEstablished. This would prevent new connect calls since it thinks it's already
// connected.
// If moving this unbind send with the respective channel disconnection to this point, the following problem arises:
// If receiving a SenderResourceHasBeenClosed call after receiving an unbinding message from a remote participant (our participant
// isn't disconnecting but we want to erase this send resource), the channel cannot be disconnected here since the listening thread has
// taken the read mutex (permanently waiting at read asio layer). This mutex is also needed to disconnect the socket (deadlock).
// Socket disconnection should always be done in the listening thread (or in the transport cleanup, when receiver resources have
// already been destroyed and the listening thread had consequently finished).
// An assert() clause finding the respective channel resource cannot be made since in LARGE DATA scenario, where the PDP discovery is done
// via UDP, a server's send resource can be created with without any associated channel resource until receiving a connection request from
// the client.
// The send resource locator is invalidated to prevent further use of associated channel.
LOCATOR_INVALID(locator);
}

bool TCPTransportInterface::CloseInputChannel(
Expand Down Expand Up @@ -1192,7 +1219,6 @@ bool TCPTransportInterface::Receive(
{
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
Expand Down Expand Up @@ -1441,10 +1467,8 @@ void TCPTransportInterface::SocketAccepted(
create_listening_thread(channel);

EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: "
<< channel->local_endpoint().address() << ":"
<< channel->local_endpoint().port() << "), remote: "
<< channel->remote_endpoint().address() << ":"
<< channel->remote_endpoint().port() << ")");
<< local_endpoint_to_locator(channel) << ", remote: "
<< remote_endpoint_to_locator(channel) << ")");
}
else
{
Expand Down Expand Up @@ -1486,10 +1510,8 @@ void TCPTransportInterface::SecureSocketAccepted(
create_listening_thread(secure_channel);

EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: "
<< socket->lowest_layer().local_endpoint().address() << ":"
<< socket->lowest_layer().local_endpoint().port() << "), remote: "
<< socket->lowest_layer().remote_endpoint().address() << ":"
<< socket->lowest_layer().remote_endpoint().port() << ")");
<< local_endpoint_to_locator(secure_channel) << ", remote: "
<< remote_endpoint_to_locator(secure_channel) << ")");
}
else
{
Expand Down Expand Up @@ -1842,6 +1864,60 @@ void TCPTransportInterface::fill_local_physical_port(
}
}

void TCPTransportInterface::CloseOutputChannel(
SendResourceList& send_resource_list,
const LocatorList& remote_participant_locators,
const LocatorList& participant_initial_peers) const
{
// Since send resources handle physical locators, we need to convert the remote participant locators to physical
std::set<Locator> remote_participant_physical_locators;
for (const Locator& remote_participant_locator : remote_participant_locators)
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator));

// Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario,
//initial peer can also work with the WANtoLANLocator of the remote participant.
if (IPLocator::hasWan(remote_participant_locator))
{
remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator(
remote_participant_locator)));
}
}

// Exlude initial peers.
for (const auto& initial_peer : participant_initial_peers)
{
if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(),
IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end())
{
remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer));
}
}

for (const auto& remote_participant_physical_locator : remote_participant_physical_locators)
{
if (!IsLocatorSupported(remote_participant_physical_locator))
{
continue;
}
// Remove send resources for the associated remote participant locator
for (auto it = send_resource_list.begin(); it != send_resource_list.end();)
{
TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get());

if (tcp_sender_resource)
{
if (tcp_sender_resource->locator() == remote_participant_physical_locator)
{
it = send_resource_list.erase(it);
continue;
}
}
++it;
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
20 changes: 19 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ class TCPTransportInterface : public TransportInterface
Locator remote_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const;

/**
* Converts a local endpoint to a locator if possible. Otherwise, it sets an invalid locator.
*/
Locator local_endpoint_to_locator(
const std::shared_ptr<TCPChannelResource>& channel) const;

/**
* Shutdown method to close the connections of the transports.
*/
Expand Down Expand Up @@ -241,7 +247,7 @@ class TCPTransportInterface : public TransportInterface
const Locator&) override;

//! Resets the locator bound to the sender resource.
void CloseOutputChannel(
void SenderResourceHasBeenClosed(
fastrtps::rtps::Locator_t& locator);

//! Reports whether Locators correspond to the same port.
Expand Down Expand Up @@ -478,6 +484,18 @@ class TCPTransportInterface : public TransportInterface
return non_blocking_send_;
}

/**
* Close the output channel associated to the given remote participant but if its locators belong to the
* given list of initial peers.
*
* @param send_resource_list List of send resources associated to the local participant.
* @param remote_participant_locators Set of locators associated to the remote participant.
* @param participant_initial_peers List of locators associated to the initial peers of the local participant.
*/
void CloseOutputChannel(
SendResourceList& send_resource_list,
const LocatorList& remote_participant_locators,
const LocatorList& participant_initial_peers) const;
};

} // namespace rtps
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(socket_);
transport.SenderResourceHasBeenClosed(socket_);
};

send_lambda_ = [this, &transport](
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ bool UDPTransportInterface::CloseInputChannel(
return true;
}

void UDPTransportInterface::CloseOutputChannel(
void UDPTransportInterface::SenderResourceHasBeenClosed(
eProsimaUDPSocket& socket)
{
socket.cancel();
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class UDPTransportInterface : public TransportInterface
const Locator&) override;

//! Removes all outbound sockets on the given port.
void CloseOutputChannel(
void SenderResourceHasBeenClosed(
eProsimaUDPSocket& socket);

//! Reports whether Locators correspond to the same port.
Expand Down
Loading
Loading