Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20628] Fix Discovery Server over TCP using LocatorSelectorEntry #4586

Merged
merged 17 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/fastdds/rtps/common/LocatorSelectorEntry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct LocatorSelectorEntry
, state(max_unicast_locators, max_multicast_locators)
, enabled(false)
, transport_should_process(false)
, is_initial_peer_or_ds(false)
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
{
}

Expand All @@ -100,6 +101,26 @@ struct LocatorSelectorEntry
state.multicast.clear();
}

void fill_unicast(
const LocatorList_t& locators)
{
for (const Locator_t& locator : locators)
{
state.unicast.push_back(unicast.size());
unicast.push_back(locator);
}
}

void fill_multicast(
const LocatorList_t& locators)
{
for (const Locator_t& locator : locators)
{
state.multicast.push_back(multicast.size());
multicast.push_back(locator);
}
}

MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
//! GUID of the remote entity.
GUID_t remote_guid;
//! List of unicast locators to send data to the remote entity.
Expand All @@ -112,6 +133,8 @@ struct LocatorSelectorEntry
bool enabled;
//! A temporary value for each transport to help optimizing some use cases.
bool transport_should_process;
//! True if the locator is an initial peer or DS connection. False otherwise.
bool is_initial_peer_or_ds;
};

} /* namespace rtps */
Expand Down
53 changes: 51 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/writer/ReaderProxy.h>
#include <fastdds/rtps/writer/StatefulWriter.h>
#include <fastdds/rtps/transport/TCPTransportDescriptor.h>
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/utils/shared_mutex.hpp>
#include <rtps/builtin/discovery/endpoint/EDPClient.h>
Expand Down Expand Up @@ -443,10 +444,24 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

bool set_logicals = handle_logical_ports_required();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList);
if (set_logicals)
{
LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators,
pattr.allocation.locators.max_multicast_locators);
entry.is_initial_peer_or_ds = true;
entry.fill_multicast(it.metatrafficMulticastLocatorList);
entry.fill_unicast(it.metatrafficUnicastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);
}
else
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList);
}
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved

#if HAVE_SECURITY
if (!mp_RTPSParticipant->is_secure())
Expand Down Expand Up @@ -841,8 +856,25 @@ void PDPClient::update_remote_servers_list()
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

bool set_logicals = handle_logical_ports_required();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
if (set_logicals)
{
const RemoteLocatorsAllocationAttributes& rlaa =
mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators;
LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators);
entry.is_initial_peer_or_ds = true;
entry.fill_multicast(it.metatrafficMulticastLocatorList);
entry.fill_unicast(it.metatrafficUnicastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);
}
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand Down Expand Up @@ -1415,6 +1447,23 @@ bool PDPClient::remove_remote_participant(
return false;
}

bool PDPClient::handle_logical_ports_required()
{
const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes();
bool set_logicals = false;
for (auto& transportDescriptor : pattr.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
set_logicals = true;
break;
}
}

return set_logicals;
}

} /* namespace rtps */
} /* namespace fastdds */
} /* namespace eprosima */
5 changes: 5 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ class PDPClient : public PDP
void perform_builtin_endpoints_matching(
const ParticipantProxyData& pdata);

/**
* Check if the user transports of the RTPSParticipant requires logical ports (only TCP transport).
*/
bool handle_logical_ports_required();

/**
* TimedEvent for server synchronization:
* first stage: periodically resend the local RTPSParticipant information until
Expand Down
56 changes: 54 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <fastdds/rtps/history/WriterHistory.h>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/history/History.h>
#include <fastdds/rtps/transport/TCPTransportDescriptor.h>
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved

#include <fastrtps/utils/TimeConversion.h>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
Expand Down Expand Up @@ -515,10 +516,24 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

bool set_logicals = handle_logical_ports_required();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList);
if (set_logicals)
{
LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators,
pattr.allocation.locators.max_multicast_locators);
entry.is_initial_peer_or_ds = true;
entry.fill_multicast(it.metatrafficMulticastLocatorList);
entry.fill_unicast(it.metatrafficUnicastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);
}
else
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList);
}

if (!secure)
{
Expand Down Expand Up @@ -1186,8 +1201,25 @@ void PDPServer::update_remote_servers_list()

eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

bool set_logicals = handle_logical_ports_required();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
if (set_logicals)
{
const RemoteLocatorsAllocationAttributes& rlaa =
mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators;
LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators);
entry.is_initial_peer_or_ds = true;
entry.fill_multicast(it.metatrafficMulticastLocatorList);
entry.fill_unicast(it.metatrafficUnicastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);
}
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand All @@ -1203,6 +1235,9 @@ void PDPServer::update_remote_servers_list()
{
discovery_db_.add_server(server.guidPrefix);
}

// Need to reactivate the server thread to send the DATA(p) to the new servers
awake_server_thread();
}

bool PDPServer::process_writers_acknowledgements()
Expand Down Expand Up @@ -2054,6 +2089,23 @@ void PDPServer::release_change_from_writer(
endpoints->writer.writer_->release_change(change);
}

bool PDPServer::handle_logical_ports_required()
{
const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes();
bool set_logicals = false;
for (auto& transportDescriptor : pattr.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
set_logicals = true;
break;
}
}

return set_logicals;
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
5 changes: 5 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ class PDPServer : public fastrtps::rtps::PDP
void match_reliable_pdp_endpoints(
const fastrtps::rtps::ParticipantProxyData& pdata);

/**
* Check if the user transports of the RTPSParticipant requires logical ports (only TCP transport).
*/
bool handle_logical_ports_required();

//! Server thread
eprosima::fastrtps::rtps::ResourceEvent resource_event_thread_;

Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ bool NetworkFactory::build_send_resources(
return returned_value;
}

bool NetworkFactory::build_send_resources(
SendResourceList& sender_resource_list,
const LocatorSelectorEntry& locator_selector_entry)
{
bool returned_value = false;

for (auto& transport : mRegisteredTransports)
{
returned_value |= transport->OpenOutputChannels(sender_resource_list, locator_selector_entry);
}

return returned_value;
}

bool NetworkFactory::BuildReceiverResources(
Locator_t& local,
std::vector<std::shared_ptr<ReceiverResource>>& returned_resources_list,
Expand Down
9 changes: 9 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ class NetworkFactory
fastdds::rtps::SendResourceList&,
const Locator_t& locator);

/**
* Walk over the list of transports, opening every possible channel that can send through
* the locators contained in @param locator_selector_entry and returning a vector of Sender Resources associated with it.
* @param locator_selector_entry LocatorSelectorEntry containing metadata and the locators through which to send.
*/
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
bool build_send_resources(
fastdds::rtps::SendResourceList&,
const LocatorSelectorEntry& locator_selector_entry);

/**
* Walk over the list of transports, opening every possible channel that we can listen to
* from the given locator, and returns a vector of Receiver Resources for this goal.
Expand Down
Loading