From c6b0e0a3da2007f3ef7b3af3e225f802c2f0e133 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 19 Mar 2024 11:00:22 +0100 Subject: [PATCH 01/17] Refs #20628: Add TCP DS blackbox test Signed-off-by: cferreiragonz --- .../common/DDSBlackboxTestsDiscovery.cpp | 140 +++++++++++++++++- 1 file changed, 138 insertions(+), 2 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index 4bc653320c3..9c02d71359a 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -89,7 +89,7 @@ TEST(DDSDiscovery, IgnoreParticipantFlags) * 2. Then, connect the client to the other server and check discovery again. * 3. Finally connect the two servers by adding one of them to the others list */ -TEST(DDSDiscovery, AddDiscoveryServerToList) +TEST(DDSDiscovery, AddDiscoveryServerToListUDP) { using namespace eprosima; using namespace eprosima::fastdds::dds; @@ -178,7 +178,7 @@ TEST(DDSDiscovery, AddDiscoveryServerToList) // Update client's servers list ASSERT_TRUE(client.update_wire_protocol(client_qos)); - /* Check that the servers only know about the client, and that the client known about both servers */ + /* Check that the servers only know about the client and that the client knows about both servers */ server_1.wait_discovery(std::chrono::seconds::zero(), 1, true); client.wait_discovery(std::chrono::seconds::zero(), 2, true); server_2.wait_discovery(std::chrono::seconds::zero(), 1, true); @@ -193,6 +193,142 @@ TEST(DDSDiscovery, AddDiscoveryServerToList) server_2.wait_discovery(std::chrono::seconds::zero(), 2, true); } +/** + * This test checks that adding servers to the Discovery Server list results in discovering those participants. + * It does so by: + * 1. Creating two servers and two clients that are only connected to the first server. Discovery is checked + * at this state. + * 2. Then, connect client_1 to the second server and check discovery again. + * 3. Finally connect the two servers by adding one of them to the others list and check disvoery again. + */ +TEST(DDSDiscovery, AddDiscoveryServerToListTCP) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastrtps::rtps; + + // TCP default DS port + std::string W_UNICAST_PORT_RANDOM_NUMBER_STR = "42100"; + + /* Create first server */ + PubSubParticipant server_1(0u, 0u, 0u, 0u); + // Set participant as server + WireProtocolConfigQos server_1_qos; + server_1_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SERVER; + // Generate random GUID prefix + srand(static_cast(time(nullptr))); + GuidPrefix_t server_1_prefix; + for (auto i = 0; i < 12; i++) + { + server_1_prefix.value[i] = eprosima::fastrtps::rtps::octet(rand() % 254); + } + server_1_qos.prefix = server_1_prefix; + // Generate server's listening locator + Locator_t locator_server_1; + IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + uint32_t server_1_port = stoi(W_UNICAST_PORT_RANDOM_NUMBER_STR); + IPLocator::setPhysicalPort(locator_server_1, server_1_port); + locator_server_1.kind = LOCATOR_KIND_TCPv4; + // Leave logical port as 0 to use TCP DS default logical port + server_1_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); + // Add TCP transport + auto descriptor_1 = std::make_shared(); + descriptor_1->add_listener_port(server_1_port); + // Init server + ASSERT_TRUE(server_1.wire_protocol(server_1_qos) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_1) + .init_participant()); + + /* Create second server */ + PubSubParticipant server_2(0u, 0u, 0u, 0u); + // Set participant as server + WireProtocolConfigQos server_2_qos; + server_2_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SERVER; + // Generate random GUID prefix + GuidPrefix_t server_2_prefix = server_1_prefix; + server_2_prefix.value[11]++; + server_2_qos.prefix = server_2_prefix; + // Generate server's listening locator + Locator_t locator_server_2; + IPLocator::setIPv4(locator_server_2, 127, 0, 0, 1); + uint32_t server_2_port = stoi(W_UNICAST_PORT_RANDOM_NUMBER_STR) + 1; + IPLocator::setPhysicalPort(locator_server_2, server_2_port); + locator_server_2.kind = LOCATOR_KIND_TCPv4; + // Leave logical port as 0 to use TCP DS default logical port + server_2_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); + // Add TCP transport + auto descriptor_2 = std::make_shared(); + descriptor_2->add_listener_port(server_2_port); + + // Init server + ASSERT_TRUE(server_2.wire_protocol(server_2_qos) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_2) + .init_participant()); + + + /* Create a client that connects to the first server from the beginning with higher listening_port*/ + PubSubParticipant client_1(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos_1; + client_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::CLIENT; + // Connect to first server + RemoteServerAttributes server_1_att; + server_1_att.guidPrefix = server_1_prefix; + server_1_att.metatrafficUnicastLocatorList.push_back(Locator_t(locator_server_1)); + client_qos_1.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att); + auto descriptor_3 = std::make_shared(); + descriptor_3->add_listener_port(server_1_port + 10); + // Init client + ASSERT_TRUE(client_1.wire_protocol(client_qos_1) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_3) + .init_participant()); + + /* Create a client that connects to the first server from the beginning with lower listening_port*/ + PubSubParticipant client_2(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos_2; + client_qos_2.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::CLIENT; + // Connect to first server + client_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att); + auto descriptor_4 = std::make_shared(); + descriptor_4->add_listener_port(server_1_port - 10); + // Init client + ASSERT_TRUE(client_2.wire_protocol(client_qos_2) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_4) + .init_participant()); + + server_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and client2 + client_1.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + client_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + server_2.wait_discovery(std::chrono::seconds::zero(), 0, true); // Knows no one + + /* Add server_2 to client */ + RemoteServerAttributes server_2_att; + server_2_att.guidPrefix = server_2_prefix; + server_2_att.metatrafficUnicastLocatorList.push_back(Locator_t(locator_server_2)); + client_qos_1.builtin.discovery_config.m_DiscoveryServers.push_back(server_2_att); + // Update client_1's servers list + ASSERT_TRUE(client_1.update_wire_protocol(client_qos_1)); + + server_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and client2 + client_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server1 and server2 + client_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + server_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows client1 + + /* Add server_2 to server_1 */ + server_1_qos.builtin.discovery_config.m_DiscoveryServers.push_back(server_2_att); + ASSERT_TRUE(server_1.update_wire_protocol(server_1_qos)); + + server_1.wait_discovery(std::chrono::seconds::zero(), 3, true); // Knows client1, client2 and server2 + client_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server1 and server2 + client_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + server_2.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and server1 +} + /** * This test checks the addition of network interfaces at run-time. * From ebf3e14823cca93a2b2d893757671c511a07858e Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Fri, 15 Mar 2024 12:39:53 +0100 Subject: [PATCH 02/17] Refs #20628: Configuration to use logical port 0 as default in DS Signed-off-by: cferreiragonz --- .../rtps/participant/RTPSParticipantImpl.cpp | 91 +++++++++++++++++-- 1 file changed, 82 insertions(+), 9 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 562bb292eb2..b8ca925e8b6 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -298,18 +298,62 @@ RTPSParticipantImpl::RTPSParticipantImpl( switch (m_att.builtin.discovery_config.discoveryProtocol) { case DiscoveryProtocol::BACKUP: - case DiscoveryProtocol::CLIENT: case DiscoveryProtocol::SERVER: + // Verify if listening ports are provided + for (auto& transportDescriptor : m_att.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + if (pT->listening_ports.empty()) + { + EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, + "Participant " << m_att.getName() << " with GUID " << m_guid << + " tries to create a TCP server for discovery server without providing a proper listening port."); + break; + } + if (!m_att.builtin.metatrafficUnicastLocatorList.empty()) + { + std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(), + m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + // TCP DS default logical port is the same as the physical one + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + }); + } + } + } + break; + case DiscoveryProtocol::CLIENT: case DiscoveryProtocol::SUPER_CLIENT: // Verify if listening ports are provided for (auto& transportDescriptor : m_att.userTransports) { TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); - if (pT && pT->listening_ports.empty()) + if (pT) { - EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, - "Participant " << m_att.getName() << " with GUID " << m_guid << - " tries to use discovery server over TCP without providing a proper listening port."); + if (pT->listening_ports.empty()) + { + EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, + "Participant " << m_att.getName() << " with GUID " << m_guid << + " tries to create a TCP client for discovery server without providing a proper listening port." << + " No TCP participants will be able to connect to this participant, but it will be able make connections."); + } + for (fastdds::rtps::RemoteServerAttributes& it : m_att.builtin.discovery_config.m_DiscoveryServers) + { + // TCP DS default logical port is the same as the physical one + std::for_each(it.metatrafficUnicastLocatorList.begin(), + it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + }); + } } } default: @@ -1458,8 +1502,33 @@ void RTPSParticipantImpl::update_attributes( auto pdp = mp_builtinProtocols->mp_PDP; bool update_pdp = false; + // Check if discovery servers need to be updated + eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers = patt.builtin.discovery_config.m_DiscoveryServers; + if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers) + { + for (auto& transportDescriptor : m_att.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + for (fastdds::rtps::RemoteServerAttributes& it : converted_discovery_servers) + { + // TCP DS default logical port is the same as the physical one + std::for_each(it.metatrafficUnicastLocatorList.begin(), + it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + }); + } + } + } + } + // Check if there are changes - if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers + if (converted_discovery_servers != m_att.builtin.discovery_config.m_DiscoveryServers || patt.userData != m_att.userData || local_interfaces_changed) { @@ -1496,7 +1565,7 @@ void RTPSParticipantImpl::update_attributes( for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers) { bool contained = false; - for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers) + for (auto incoming_server : converted_discovery_servers) { if (existing_server.guidPrefix == incoming_server.guidPrefix) { @@ -1569,8 +1638,8 @@ void RTPSParticipantImpl::update_attributes( m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) { - // Add incoming servers iff we don't know about them already or the listening locator has been modified - for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers) + // Add incoming servers if we don't know about them already or the listening locator has been modified + for (auto incoming_server : converted_discovery_servers) { eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it; for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin(); @@ -2593,6 +2662,10 @@ bool RTPSParticipantImpl::did_mutation_took_place_on_meta( case LOCATOR_KIND_TCPv4: set_wan_address(ret); IPLocator::setPhysicalPort(ret, Tcp4ListeningPort()); + if (IPLocator::getLogicalPort(ret) == 0) + { + IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret)); + } break; case LOCATOR_KIND_TCPv6: IPLocator::setPhysicalPort(ret, Tcp6ListeningPort()); From 071a3646b2c3d8377b88f92189c5ff7b9927652b Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 20 Mar 2024 09:55:17 +0100 Subject: [PATCH 03/17] Refs #20628: Check interface change before creating new send resources Signed-off-by: cferreiragonz --- src/cpp/rtps/participant/RTPSParticipantImpl.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index b8ca925e8b6..4e41a22d3e2 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1624,9 +1624,12 @@ void RTPSParticipantImpl::update_attributes( local_participant_proxy_data->default_locators.add_unicast_locator(locator); } - createSenderResources(m_att.builtin.metatrafficMulticastLocatorList); - createSenderResources(m_att.builtin.metatrafficUnicastLocatorList); - createSenderResources(m_att.defaultUnicastLocatorList); + if (local_interfaces_changed) + { + createSenderResources(m_att.builtin.metatrafficMulticastLocatorList); + createSenderResources(m_att.builtin.metatrafficUnicastLocatorList); + createSenderResources(m_att.defaultUnicastLocatorList); + } if (!modified_locators.empty()) { createSenderResources(modified_locators); From 4ab09aabe6f1c8c64d39acfbc699292ba99052f5 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 20 Mar 2024 10:05:27 +0100 Subject: [PATCH 04/17] Refs #20628: Use new OpenOutputChannels in TCP Signed-off-by: cferreiragonz --- .../rtps/transport/TCPTransportInterface.cpp | 137 ++++++++++++++++-- .../rtps/transport/TCPTransportInterface.h | 22 +++ 2 files changed, 149 insertions(+), 10 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 8a8a2bbb069..144459609f4 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -769,10 +769,7 @@ bool TCPTransportInterface::OpenOutputChannel( // At this point, if there is no SenderResource to reuse, this is the first call to OpenOutputChannel for this locator. // Need to check if a channel already exists for this locator. - EPROSIMA_LOG_INFO(RTCP, "Called to OpenOutputChannel (physical: " << IPLocator::getPhysicalPort( - locator) << "; logical: " - << IPLocator::getLogicalPort( - locator) << ") @ " << IPLocator::to_string(locator)); + EPROSIMA_LOG_INFO(RTCP, "Called to OpenOutputChannel @ " << IPLocator::to_string(locator)); auto channel_resource = channel_resources_.find(physical_locator); @@ -835,9 +832,7 @@ bool TCPTransportInterface::OpenOutputChannel( if (IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface) { // Client side (either Server-Client or LARGE_DATA) - EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] (physical: " - << IPLocator::getPhysicalPort(locator) << "; logical: " - << IPLocator::getLogicalPort(locator) << ") @ " << IPLocator::to_string(locator)); + EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] @ " << IPLocator::to_string(locator)); // Create a TCP_CONNECT_TYPE channel std::shared_ptr channel( @@ -860,9 +855,7 @@ bool TCPTransportInterface::OpenOutputChannel( { // Server side LARGE_DATA // Act as server and wait to the other endpoint to connect. Add locator to sender_resource_list - EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [WAIT_CONNECTION] (physical: " - << IPLocator::getPhysicalPort(locator) << "; logical: " - << IPLocator::getLogicalPort(locator) << ") @ " << IPLocator::to_string(locator)); + EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [WAIT_CONNECTION] @ " << IPLocator::to_string(locator)); std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); channel_pending_logical_ports_[physical_locator].insert(logical_port); } @@ -875,6 +868,130 @@ bool TCPTransportInterface::OpenOutputChannel( return true; } +bool TCPTransportInterface::OpenOutputChannels( + SendResourceList& send_resource_list, + const LocatorSelectorEntry& locator_selector_entry) +{ + bool success = false; + if (locator_selector_entry.is_initial_peer_or_ds) + { + for (size_t i = 0; i < locator_selector_entry.state.multicast.size(); ++i) + { + // TODO Carlos: is multicast needed when is initial_peer_or_ds? Or is always zero? + size_t index = locator_selector_entry.state.multicast[i]; + success |= CreateInitialConnect(send_resource_list, locator_selector_entry.multicast[index]); + } + for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i) + { + size_t index = locator_selector_entry.state.unicast[i]; + success |= CreateInitialConnect(send_resource_list, locator_selector_entry.unicast[index]); + } + } + else + { + for (size_t i = 0; i < locator_selector_entry.state.multicast.size(); ++i) + { + size_t index = locator_selector_entry.state.multicast[i]; + success |= OpenOutputChannel(send_resource_list, locator_selector_entry.multicast[index]); + } + for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i) + { + size_t index = locator_selector_entry.state.unicast[i]; + success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]); + } + } + return success; +} + +bool TCPTransportInterface::CreateInitialConnect( + SendResourceList& send_resource_list, + const Locator& locator) +{ + if (!IsLocatorSupported(locator)) + { + return false; + } + + uint16_t logical_port = IPLocator::getLogicalPort(locator); + if (0 == logical_port) + { + return false; + } + + Locator physical_locator = IPLocator::toPhysicalLocator(locator); + + std::lock_guard socketsLock(sockets_map_mutex_); + + // TODO Carlos: verify if it is needed to check the SenderResource + + // We try to find a SenderResource that has this locator. + // Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport + // already reuses a SenderResource. + for (auto& sender_resource : send_resource_list) + { + TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, sender_resource.get()); + + if (tcp_sender_resource && (physical_locator == tcp_sender_resource->locator() || + (IPLocator::hasWan(locator) && + IPLocator::WanToLanLocator(physical_locator) == + tcp_sender_resource->locator()))) + { + // Add logical port to channel if it's not there yet + auto channel_resource = channel_resources_.find(physical_locator); + + // Maybe as WAN? + if (channel_resource == channel_resources_.end() && IPLocator::hasWan(locator)) + { + Locator wan_locator = IPLocator::WanToLanLocator(locator); + channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator)); + } + + if (channel_resource != channel_resources_.end()) + { + channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get()); + } + else + { + std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); + channel_pending_logical_ports_[physical_locator].insert(logical_port); + } + + statistics_info_.add_entry(locator); + return true; + } + } + + // At this point, if there is no SenderResource to reuse, this is the first try to open a channel for this locator. + // There is no need to check if a channel already exists for this locator because this method is called only when + // a new connection is required. + + EPROSIMA_LOG_INFO(RTCP, "Called to CreateInitialConnect @ " << IPLocator::to_string(locator)); + + // Create a TCP_CONNECT_TYPE channel + std::shared_ptr channel( +#if TLS_FOUND + (configuration()->apply_security) ? + static_cast( + new TCPChannelResourceSecure(this, io_service_, ssl_context_, + physical_locator, configuration()->maxMessageSize)) : +#endif // if TLS_FOUND + static_cast( + new TCPChannelResourceBasic(this, io_service_, physical_locator, + configuration()->maxMessageSize)) + ); + + EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] @ " << IPLocator::to_string(locator)); + + channel_resources_[physical_locator] = channel; + channel->connect(channel_resources_[physical_locator]); + channel->add_logical_port(logical_port, rtcp_message_manager_.get()); + statistics_info_.add_entry(locator); + send_resource_list.emplace_back( + static_cast(new TCPSenderResource(*this, physical_locator))); + + return true; +} + bool TCPTransportInterface::OpenInputChannel( const Locator& locator, TransportReceiverInterface* receiver, diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index f1d2c0d5388..a6857d7e846 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -318,6 +318,28 @@ class TCPTransportInterface : public TransportInterface SendResourceList& send_resource_list, const Locator&) override; + /** + * Must open the channel that maps to/from the given locator selector entry. This method must allocate, + * reserve and mark any resources that are needed for said channel. + * + * @param sender_resource_list Participant's send resource list. + * @param locator_selector_entry Locator selector entry with the remote entity locators. + * + * @return true if the channel was correctly opened or if finding an already opened one. + */ + bool OpenOutputChannels( + SendResourceList& sender_resource_list, + const fastrtps::rtps::LocatorSelectorEntry& locator_selector_entry) override; + + /** + * Acts like OpenOutputChannel but ensures that a new CONNECT channel is created for the given locator + * if no other channel is already opened for it. + * It is used with the initial peers and locators belonging to DS servers. + */ + bool CreateInitialConnect( + SendResourceList& send_resource_list, + const Locator&); + /** * Converts a given remote locator (that is, a locator referring to a remote * destination) to the main local locator whose channel can write to that From da790ced207ceedab8550dbd1413877c9ad4b779 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 20 Mar 2024 10:06:17 +0100 Subject: [PATCH 05/17] Refs #20628: Update LocatorSelectorEntry Signed-off-by: cferreiragonz --- .../rtps/common/LocatorSelectorEntry.hpp | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/include/fastdds/rtps/common/LocatorSelectorEntry.hpp b/include/fastdds/rtps/common/LocatorSelectorEntry.hpp index 0196adf84e4..a8f19013372 100644 --- a/include/fastdds/rtps/common/LocatorSelectorEntry.hpp +++ b/include/fastdds/rtps/common/LocatorSelectorEntry.hpp @@ -77,6 +77,7 @@ struct LocatorSelectorEntry , state(max_unicast_locators, max_multicast_locators) , enabled(false) , transport_should_process(false) + , is_initial_peer_or_ds(false) { } @@ -100,6 +101,24 @@ struct LocatorSelectorEntry state.multicast.clear(); } + void fill_unicast(const LocatorList_t& locators) + { + for (const Locator_t& locator : locators) + { + state.unicast.push_back(unicast.size()); + unicast.push_back(locator); + } + } + + void fill_multicast(const LocatorList_t& locators) + { + for (const Locator_t& locator : locators) + { + state.multicast.push_back(multicast.size()); + multicast.push_back(locator); + } + } + //! GUID of the remote entity. GUID_t remote_guid; //! List of unicast locators to send data to the remote entity. @@ -112,6 +131,8 @@ struct LocatorSelectorEntry bool enabled; //! A temporary value for each transport to help optimizing some use cases. bool transport_should_process; + //! True if the locator is an initial peer or DS connection. False otherwise. + bool is_initial_peer_or_ds; }; } /* namespace rtps */ From bda08a66c163ae07d6700365c7d91a24f1541126 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 20 Mar 2024 10:06:53 +0100 Subject: [PATCH 06/17] Refs #20628: Use locator_selector_entry to call OpenOutputChannels Signed-off-by: cferreiragonz --- src/cpp/rtps/network/NetworkFactory.cpp | 14 ++++++++++++++ src/cpp/rtps/network/NetworkFactory.h | 9 +++++++++ src/cpp/rtps/participant/RTPSParticipantImpl.cpp | 8 ++++++++ src/cpp/rtps/participant/RTPSParticipantImpl.h | 3 +++ 4 files changed, 34 insertions(+) diff --git a/src/cpp/rtps/network/NetworkFactory.cpp b/src/cpp/rtps/network/NetworkFactory.cpp index 9d195e6249d..5e3f1cf168e 100644 --- a/src/cpp/rtps/network/NetworkFactory.cpp +++ b/src/cpp/rtps/network/NetworkFactory.cpp @@ -82,6 +82,20 @@ bool NetworkFactory::build_send_resources( return returned_value; } +bool NetworkFactory::build_send_resources( + SendResourceList& sender_resource_list, + const LocatorSelectorEntry& locator_selector_entry) +{ + bool returned_value = false; + + for (auto& transport : mRegisteredTransports) + { + returned_value |= transport->OpenOutputChannels(sender_resource_list, locator_selector_entry); + } + + return returned_value; +} + bool NetworkFactory::BuildReceiverResources( Locator_t& local, std::vector>& returned_resources_list, diff --git a/src/cpp/rtps/network/NetworkFactory.h b/src/cpp/rtps/network/NetworkFactory.h index 9c394cd60ce..20654879388 100644 --- a/src/cpp/rtps/network/NetworkFactory.h +++ b/src/cpp/rtps/network/NetworkFactory.h @@ -84,6 +84,15 @@ class NetworkFactory fastdds::rtps::SendResourceList&, const Locator_t& locator); + /** + * Walk over the list of transports, opening every possible channel that can send through + * the locators contained in @param locator_selector_entry and returning a vector of Sender Resources associated with it. + * @param locator_selector_entry LocatorSelectorEntry containing metadata and the locators through which to send. + */ + bool build_send_resources( + fastdds::rtps::SendResourceList&, + const LocatorSelectorEntry& locator_selector_entry); + /** * Walk over the list of transports, opening every possible channel that we can listen to * from the given locator, and returns a vector of Receiver Resources for this goal. diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 4e41a22d3e2..e9bc641f2a1 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -1999,6 +1999,14 @@ void RTPSParticipantImpl::createSenderResources( m_network_Factory.build_send_resources(send_resource_list_, locator); } +void RTPSParticipantImpl::createSenderResources( + const LocatorSelectorEntry& locator_selector_entry) +{ + std::lock_guard lock(m_send_resources_mutex_); + + m_network_Factory.build_send_resources(send_resource_list_, locator_selector_entry); +} + bool RTPSParticipantImpl::deleteUserEndpoint( const GUID_t& endpoint) { diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index b091f6cbf77..24adef87e52 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -1040,6 +1040,9 @@ class RTPSParticipantImpl void createSenderResources( const Locator_t& locator); + void createSenderResources( + const LocatorSelectorEntry& locator_selector); + bool networkFactoryHasRegisteredTransports() const; /** From 12d0fa2722a12f2785ce3232cb07b8aa6592678a Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 20 Mar 2024 10:07:37 +0100 Subject: [PATCH 07/17] Refs #20628: Refactor PDPClient to handle initial TCP connections Signed-off-by: cferreiragonz --- .../discovery/participant/PDPClient.cpp | 51 ++++++++++++++++++- .../builtin/discovery/participant/PDPClient.h | 5 ++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index d83bb486aab..0cea0f42cc2 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -443,10 +444,23 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + bool set_logicals = handle_logical_ports_required(); + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); - mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList); + if(set_logicals) + { + LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, pattr.allocation.locators.max_multicast_locators); + entry.is_initial_peer_or_ds = true; + entry.fill_multicast(it.metatrafficMulticastLocatorList); + entry.fill_unicast(it.metatrafficUnicastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); + } + else + { + mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); + mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList); + } #if HAVE_SECURITY if (!mp_RTPSParticipant->is_secure()) @@ -841,8 +855,24 @@ void PDPClient::update_remote_servers_list() { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + bool set_logicals = handle_logical_ports_required(); + const RemoteLocatorsAllocationAttributes& rlaa = mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || + !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) + { + if(set_logicals) + { + LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); + entry.is_initial_peer_or_ds = true; + entry.fill_multicast(it.metatrafficMulticastLocatorList); + entry.fill_unicast(it.metatrafficUnicastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); + } + } + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) { match_pdp_writer_nts_(it); @@ -1415,6 +1445,23 @@ bool PDPClient::remove_remote_participant( return false; } +bool PDPClient::handle_logical_ports_required() +{ + const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); + bool set_logicals = false; + for (auto& transportDescriptor : pattr.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + set_logicals = true; + break; + } + } + + return set_logicals; +} + } /* namespace rtps */ } /* namespace fastdds */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h index 4a5d5d7aee4..3acbe362f78 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h @@ -242,6 +242,11 @@ class PDPClient : public PDP void perform_builtin_endpoints_matching( const ParticipantProxyData& pdata); + /** + * Check if the user transports of the RTPSParticipant requires logical ports (only TCP transport). + */ + bool handle_logical_ports_required(); + /** * TimedEvent for server synchronization: * first stage: periodically resend the local RTPSParticipant information until From 1de73609534df603068cfdd98277ef8f5fc19254 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 20 Mar 2024 10:07:44 +0100 Subject: [PATCH 08/17] Refs #20628: Refactor PDPServer to handle initial TCP connections Signed-off-by: cferreiragonz --- .../discovery/participant/PDPServer.cpp | 54 ++++++++++++++++++- .../discovery/participant/PDPServer.hpp | 5 ++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index aa23b230f99..9325a08a932 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -515,10 +516,23 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + bool set_logicals = handle_logical_ports_required(); + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); - mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList); + if(set_logicals) + { + LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, pattr.allocation.locators.max_multicast_locators); + entry.is_initial_peer_or_ds = true; + entry.fill_multicast(it.metatrafficMulticastLocatorList); + entry.fill_unicast(it.metatrafficUnicastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); + } + else + { + mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); + mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList); + } if (!secure) { @@ -1186,8 +1200,24 @@ void PDPServer::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + bool set_logicals = handle_logical_ports_required(); + const RemoteLocatorsAllocationAttributes& rlaa = mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || + !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) + { + if(set_logicals) + { + LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); + entry.is_initial_peer_or_ds = true; + entry.fill_multicast(it.metatrafficMulticastLocatorList); + entry.fill_unicast(it.metatrafficUnicastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); + } + } + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) { match_pdp_writer_nts_(it); @@ -1203,6 +1233,9 @@ void PDPServer::update_remote_servers_list() { discovery_db_.add_server(server.guidPrefix); } + + // Need to reactivate the server thread to send the DATA(p) to the new servers + awake_server_thread(); } bool PDPServer::process_writers_acknowledgements() @@ -2054,6 +2087,23 @@ void PDPServer::release_change_from_writer( endpoints->writer.writer_->release_change(change); } +bool PDPServer::handle_logical_ports_required() +{ + const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); + bool set_logicals = false; + for (auto& transportDescriptor : pattr.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + set_logicals = true; + break; + } + } + + return set_logicals; +} + } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp index c5a30905e6d..09b20dff4cb 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp @@ -365,6 +365,11 @@ class PDPServer : public fastrtps::rtps::PDP void match_reliable_pdp_endpoints( const fastrtps::rtps::ParticipantProxyData& pdata); + /** + * Check if the user transports of the RTPSParticipant requires logical ports (only TCP transport). + */ + bool handle_logical_ports_required(); + //! Server thread eprosima::fastrtps::rtps::ResourceEvent resource_event_thread_; From d11cbaf0624f5c627fcc10b62e26463ca5766ef1 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 20 Mar 2024 10:13:46 +0100 Subject: [PATCH 09/17] Refs #20628: Uncrustify Signed-off-by: cferreiragonz --- .../rtps/common/LocatorSelectorEntry.hpp | 6 ++- .../discovery/participant/PDPClient.cpp | 10 ++-- .../discovery/participant/PDPServer.cpp | 10 ++-- .../rtps/participant/RTPSParticipantImpl.cpp | 47 ++++++++++--------- .../rtps/participant/RTPSParticipantImpl.h | 2 +- .../rtps/transport/TCPTransportInterface.cpp | 7 +-- .../rtps/transport/TCPTransportInterface.h | 2 +- .../common/DDSBlackboxTestsDiscovery.cpp | 24 +++++----- 8 files changed, 58 insertions(+), 50 deletions(-) diff --git a/include/fastdds/rtps/common/LocatorSelectorEntry.hpp b/include/fastdds/rtps/common/LocatorSelectorEntry.hpp index a8f19013372..141e5ca1804 100644 --- a/include/fastdds/rtps/common/LocatorSelectorEntry.hpp +++ b/include/fastdds/rtps/common/LocatorSelectorEntry.hpp @@ -101,7 +101,8 @@ struct LocatorSelectorEntry state.multicast.clear(); } - void fill_unicast(const LocatorList_t& locators) + void fill_unicast( + const LocatorList_t& locators) { for (const Locator_t& locator : locators) { @@ -110,7 +111,8 @@ struct LocatorSelectorEntry } } - void fill_multicast(const LocatorList_t& locators) + void fill_multicast( + const LocatorList_t& locators) { for (const Locator_t& locator : locators) { diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 0cea0f42cc2..4029e27810b 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -448,9 +448,10 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - if(set_logicals) + if (set_logicals) { - LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, pattr.allocation.locators.max_multicast_locators); + LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, + pattr.allocation.locators.max_multicast_locators); entry.is_initial_peer_or_ds = true; entry.fill_multicast(it.metatrafficMulticastLocatorList); entry.fill_unicast(it.metatrafficUnicastLocatorList); @@ -856,14 +857,15 @@ void PDPClient::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); bool set_logicals = handle_logical_ports_required(); - const RemoteLocatorsAllocationAttributes& rlaa = mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; + const RemoteLocatorsAllocationAttributes& rlaa = + mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) { - if(set_logicals) + if (set_logicals) { LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); entry.is_initial_peer_or_ds = true; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 9325a08a932..cd482888815 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -520,9 +520,10 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - if(set_logicals) + if (set_logicals) { - LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, pattr.allocation.locators.max_multicast_locators); + LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, + pattr.allocation.locators.max_multicast_locators); entry.is_initial_peer_or_ds = true; entry.fill_multicast(it.metatrafficMulticastLocatorList); entry.fill_unicast(it.metatrafficUnicastLocatorList); @@ -1201,14 +1202,15 @@ void PDPServer::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); bool set_logicals = handle_logical_ports_required(); - const RemoteLocatorsAllocationAttributes& rlaa = mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; + const RemoteLocatorsAllocationAttributes& rlaa = + mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) { - if(set_logicals) + if (set_logicals) { LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); entry.is_initial_peer_or_ds = true; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index e9bc641f2a1..fecb495c0b5 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -315,14 +315,14 @@ RTPSParticipantImpl::RTPSParticipantImpl( if (!m_att.builtin.metatrafficUnicastLocatorList.empty()) { std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(), - m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) - { - // TCP DS default logical port is the same as the physical one - if (IPLocator::getLogicalPort(locator) == 0) - { - IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); - } - }); + m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + // TCP DS default logical port is the same as the physical one + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + }); } } } @@ -346,13 +346,13 @@ RTPSParticipantImpl::RTPSParticipantImpl( { // TCP DS default logical port is the same as the physical one std::for_each(it.metatrafficUnicastLocatorList.begin(), - it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) - { - if (IPLocator::getLogicalPort(locator) == 0) - { - IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); - } - }); + it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + }); } } } @@ -1503,7 +1503,8 @@ void RTPSParticipantImpl::update_attributes( bool update_pdp = false; // Check if discovery servers need to be updated - eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers = patt.builtin.discovery_config.m_DiscoveryServers; + eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers = + patt.builtin.discovery_config.m_DiscoveryServers; if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers) { for (auto& transportDescriptor : m_att.userTransports) @@ -1515,13 +1516,13 @@ void RTPSParticipantImpl::update_attributes( { // TCP DS default logical port is the same as the physical one std::for_each(it.metatrafficUnicastLocatorList.begin(), - it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) - { - if (IPLocator::getLogicalPort(locator) == 0) - { - IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); - } - }); + it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + }); } } } diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 24adef87e52..12ea1cb9cb3 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -1041,7 +1041,7 @@ class RTPSParticipantImpl const Locator_t& locator); void createSenderResources( - const LocatorSelectorEntry& locator_selector); + const LocatorSelectorEntry& locator_selector); bool networkFactoryHasRegisteredTransports() const; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 144459609f4..f97f860ef0f 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -855,7 +855,8 @@ bool TCPTransportInterface::OpenOutputChannel( { // Server side LARGE_DATA // Act as server and wait to the other endpoint to connect. Add locator to sender_resource_list - EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [WAIT_CONNECTION] @ " << IPLocator::to_string(locator)); + EPROSIMA_LOG_INFO(OpenOutputChannel, + "OpenOutputChannel: [WAIT_CONNECTION] @ " << IPLocator::to_string(locator)); std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); channel_pending_logical_ports_[physical_locator].insert(logical_port); } @@ -904,8 +905,8 @@ bool TCPTransportInterface::OpenOutputChannels( } bool TCPTransportInterface::CreateInitialConnect( - SendResourceList& send_resource_list, - const Locator& locator) + SendResourceList& send_resource_list, + const Locator& locator) { if (!IsLocatorSupported(locator)) { diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index a6857d7e846..0ae629b23ea 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -327,7 +327,7 @@ class TCPTransportInterface : public TransportInterface * * @return true if the channel was correctly opened or if finding an already opened one. */ - bool OpenOutputChannels( + bool OpenOutputChannels( SendResourceList& sender_resource_list, const fastrtps::rtps::LocatorSelectorEntry& locator_selector_entry) override; diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index 9c02d71359a..db2f60f8a72 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -236,9 +236,9 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) descriptor_1->add_listener_port(server_1_port); // Init server ASSERT_TRUE(server_1.wire_protocol(server_1_qos) - .disable_builtin_transport() - .add_user_transport_to_pparams(descriptor_1) - .init_participant()); + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_1) + .init_participant()); /* Create second server */ PubSubParticipant server_2(0u, 0u, 0u, 0u); @@ -263,9 +263,9 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) // Init server ASSERT_TRUE(server_2.wire_protocol(server_2_qos) - .disable_builtin_transport() - .add_user_transport_to_pparams(descriptor_2) - .init_participant()); + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_2) + .init_participant()); /* Create a client that connects to the first server from the beginning with higher listening_port*/ @@ -282,9 +282,9 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) descriptor_3->add_listener_port(server_1_port + 10); // Init client ASSERT_TRUE(client_1.wire_protocol(client_qos_1) - .disable_builtin_transport() - .add_user_transport_to_pparams(descriptor_3) - .init_participant()); + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_3) + .init_participant()); /* Create a client that connects to the first server from the beginning with lower listening_port*/ PubSubParticipant client_2(0u, 0u, 0u, 0u); @@ -297,9 +297,9 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) descriptor_4->add_listener_port(server_1_port - 10); // Init client ASSERT_TRUE(client_2.wire_protocol(client_qos_2) - .disable_builtin_transport() - .add_user_transport_to_pparams(descriptor_4) - .init_participant()); + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_4) + .init_participant()); server_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and client2 client_1.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 From ce27eb905846acf7fffc42a2aa5293865e5effc8 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 26 Mar 2024 15:13:14 +0100 Subject: [PATCH 10/17] Refs #20628: Minor fix Signed-off-by: cferreiragonz --- src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp | 4 ++-- src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 4029e27810b..2210b9392b3 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -857,8 +857,6 @@ void PDPClient::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); bool set_logicals = handle_logical_ports_required(); - const RemoteLocatorsAllocationAttributes& rlaa = - mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { @@ -867,6 +865,8 @@ void PDPClient::update_remote_servers_list() { if (set_logicals) { + const RemoteLocatorsAllocationAttributes& rlaa = + mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); entry.is_initial_peer_or_ds = true; entry.fill_multicast(it.metatrafficMulticastLocatorList); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index cd482888815..a92e3765ba6 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -1202,8 +1202,6 @@ void PDPServer::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); bool set_logicals = handle_logical_ports_required(); - const RemoteLocatorsAllocationAttributes& rlaa = - mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { @@ -1212,6 +1210,8 @@ void PDPServer::update_remote_servers_list() { if (set_logicals) { + const RemoteLocatorsAllocationAttributes& rlaa = + mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); entry.is_initial_peer_or_ds = true; entry.fill_multicast(it.metatrafficMulticastLocatorList); From 1b4008fbd60aaac0fea2bff40e00616275a6bad8 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 27 Mar 2024 11:53:49 +0100 Subject: [PATCH 11/17] Refs #20628: Fix windows Signed-off-by: cferreiragonz --- test/blackbox/common/DDSBlackboxTestsDiscovery.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index db2f60f8a72..3064d25d9e6 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -226,7 +226,7 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) // Generate server's listening locator Locator_t locator_server_1; IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); - uint32_t server_1_port = stoi(W_UNICAST_PORT_RANDOM_NUMBER_STR); + uint16_t server_1_port = static_cast(stoi(W_UNICAST_PORT_RANDOM_NUMBER_STR)); IPLocator::setPhysicalPort(locator_server_1, server_1_port); locator_server_1.kind = LOCATOR_KIND_TCPv4; // Leave logical port as 0 to use TCP DS default logical port @@ -252,7 +252,7 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) // Generate server's listening locator Locator_t locator_server_2; IPLocator::setIPv4(locator_server_2, 127, 0, 0, 1); - uint32_t server_2_port = stoi(W_UNICAST_PORT_RANDOM_NUMBER_STR) + 1; + uint16_t server_2_port = server_1_port + 1; IPLocator::setPhysicalPort(locator_server_2, server_2_port); locator_server_2.kind = LOCATOR_KIND_TCPv4; // Leave logical port as 0 to use TCP DS default logical port @@ -279,7 +279,8 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) server_1_att.metatrafficUnicastLocatorList.push_back(Locator_t(locator_server_1)); client_qos_1.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att); auto descriptor_3 = std::make_shared(); - descriptor_3->add_listener_port(server_1_port + 10); + uint16_t client_1_port = server_1_port + 10; + descriptor_3->add_listener_port(client_1_port); // Init client ASSERT_TRUE(client_1.wire_protocol(client_qos_1) .disable_builtin_transport() @@ -294,7 +295,8 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP) // Connect to first server client_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att); auto descriptor_4 = std::make_shared(); - descriptor_4->add_listener_port(server_1_port - 10); + uint16_t client_2_port = server_1_port - 10; + descriptor_4->add_listener_port(client_2_port); // Init client ASSERT_TRUE(client_2.wire_protocol(client_qos_2) .disable_builtin_transport() From d72fa6be73745e565763ee19efb86e3f54d97f09 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 3 Apr 2024 11:43:02 +0200 Subject: [PATCH 12/17] Refs #20628: Method in RTPSPartImpl Signed-off-by: cferreiragonz --- .../discovery/participant/PDPClient.cpp | 21 ++----------------- .../builtin/discovery/participant/PDPClient.h | 5 ----- .../discovery/participant/PDPServer.cpp | 21 ++----------------- .../discovery/participant/PDPServer.hpp | 5 ----- .../rtps/participant/RTPSParticipantImpl.cpp | 17 +++++++++++++++ .../rtps/participant/RTPSParticipantImpl.h | 3 +++ 6 files changed, 24 insertions(+), 48 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 2210b9392b3..358ff801c7e 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -444,7 +444,7 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = handle_logical_ports_required(); + bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { @@ -856,7 +856,7 @@ void PDPClient::update_remote_servers_list() { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = handle_logical_ports_required(); + bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { @@ -1447,23 +1447,6 @@ bool PDPClient::remove_remote_participant( return false; } -bool PDPClient::handle_logical_ports_required() -{ - const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); - bool set_logicals = false; - for (auto& transportDescriptor : pattr.userTransports) - { - TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); - if (pT) - { - set_logicals = true; - break; - } - } - - return set_logicals; -} - } /* namespace rtps */ } /* namespace fastdds */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h index 3acbe362f78..4a5d5d7aee4 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h @@ -242,11 +242,6 @@ class PDPClient : public PDP void perform_builtin_endpoints_matching( const ParticipantProxyData& pdata); - /** - * Check if the user transports of the RTPSParticipant requires logical ports (only TCP transport). - */ - bool handle_logical_ports_required(); - /** * TimedEvent for server synchronization: * first stage: periodically resend the local RTPSParticipant information until diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index a92e3765ba6..cf91246eb0e 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -516,7 +516,7 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = handle_logical_ports_required(); + bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { @@ -1201,7 +1201,7 @@ void PDPServer::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = handle_logical_ports_required(); + bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { @@ -2089,23 +2089,6 @@ void PDPServer::release_change_from_writer( endpoints->writer.writer_->release_change(change); } -bool PDPServer::handle_logical_ports_required() -{ - const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); - bool set_logicals = false; - for (auto& transportDescriptor : pattr.userTransports) - { - TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); - if (pT) - { - set_logicals = true; - break; - } - } - - return set_logicals; -} - } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp index 09b20dff4cb..c5a30905e6d 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp @@ -365,11 +365,6 @@ class PDPServer : public fastrtps::rtps::PDP void match_reliable_pdp_endpoints( const fastrtps::rtps::ParticipantProxyData& pdata); - /** - * Check if the user transports of the RTPSParticipant requires logical ports (only TCP transport). - */ - bool handle_logical_ports_required(); - //! Server thread eprosima::fastrtps::rtps::ResourceEvent resource_event_thread_; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index fecb495c0b5..0f9fdf707cb 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2472,6 +2472,23 @@ fastdds::dds::builtin::TypeLookupManager* RTPSParticipantImpl::typelookup_manage return mp_builtinProtocols->tlm_; } +bool RTPSParticipantImpl::has_tcp_transports() +{ + const RTPSParticipantAttributes& pattr = getRTPSParticipantAttributes(); + bool has_tcp_transports = false; + for (auto& transportDescriptor : pattr.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + has_tcp_transports = true; + break; + } + } + + return has_tcp_transports; +} + IPersistenceService* RTPSParticipantImpl::get_persistence_service( const EndpointAttributes& param) { diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 12ea1cb9cb3..ba32eba6bcc 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -461,6 +461,9 @@ class RTPSParticipantImpl return has_shm_transport_; } + //! Check if the participant has at least one TCP transport + bool has_tcp_transports(); + uint32_t get_min_network_send_buffer_size() { return m_network_Factory.get_min_send_buffer_size(); From bbc82b5325521ea2e160af5962dd28692031d7ee Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 3 Apr 2024 12:04:10 +0200 Subject: [PATCH 13/17] Refs #20628: Check loc.kind for default logical port Signed-off-by: cferreiragonz --- .../rtps/participant/RTPSParticipantImpl.cpp | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 0f9fdf707cb..25093d3c937 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -318,9 +318,12 @@ RTPSParticipantImpl::RTPSParticipantImpl( m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) { // TCP DS default logical port is the same as the physical one - if (IPLocator::getLogicalPort(locator) == 0) + if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6) { - IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } } }); } @@ -344,13 +347,16 @@ RTPSParticipantImpl::RTPSParticipantImpl( } for (fastdds::rtps::RemoteServerAttributes& it : m_att.builtin.discovery_config.m_DiscoveryServers) { - // TCP DS default logical port is the same as the physical one std::for_each(it.metatrafficUnicastLocatorList.begin(), it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) { - if (IPLocator::getLogicalPort(locator) == 0) + // TCP DS default logical port is the same as the physical one + if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6) { - IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } } }); } @@ -1514,13 +1520,16 @@ void RTPSParticipantImpl::update_attributes( { for (fastdds::rtps::RemoteServerAttributes& it : converted_discovery_servers) { - // TCP DS default logical port is the same as the physical one std::for_each(it.metatrafficUnicastLocatorList.begin(), it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) { - if (IPLocator::getLogicalPort(locator) == 0) + // TCP DS default logical port is the same as the physical one + if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6) { - IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } } }); } @@ -2698,6 +2707,10 @@ bool RTPSParticipantImpl::did_mutation_took_place_on_meta( break; case LOCATOR_KIND_TCPv6: IPLocator::setPhysicalPort(ret, Tcp6ListeningPort()); + if (IPLocator::getLogicalPort(ret) == 0) + { + IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret)); + } break; } return ret; From 446ef025a1308471564b15772fab1e08c21bbde4 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Thu, 4 Apr 2024 09:20:37 +0200 Subject: [PATCH 14/17] Refs #20628: Apply suggestions Signed-off-by: cferreiragonz --- .../rtps/common/LocatorSelectorEntry.hpp | 31 +++++++++---------- .../discovery/participant/PDPClient.cpp | 27 +++------------- .../discovery/participant/PDPServer.cpp | 27 +++------------- src/cpp/rtps/network/NetworkFactory.h | 1 + .../rtps/participant/RTPSParticipantImpl.h | 6 ++++ .../rtps/transport/TCPTransportInterface.cpp | 16 ++-------- 6 files changed, 34 insertions(+), 74 deletions(-) diff --git a/include/fastdds/rtps/common/LocatorSelectorEntry.hpp b/include/fastdds/rtps/common/LocatorSelectorEntry.hpp index 141e5ca1804..75c59df0458 100644 --- a/include/fastdds/rtps/common/LocatorSelectorEntry.hpp +++ b/include/fastdds/rtps/common/LocatorSelectorEntry.hpp @@ -77,7 +77,6 @@ struct LocatorSelectorEntry , state(max_unicast_locators, max_multicast_locators) , enabled(false) , transport_should_process(false) - , is_initial_peer_or_ds(false) { } @@ -101,24 +100,26 @@ struct LocatorSelectorEntry state.multicast.clear(); } - void fill_unicast( - const LocatorList_t& locators) + static LocatorSelectorEntry create_fully_selected_entry( + const LocatorList_t& unicast_locators, + const LocatorList_t& multicast_locators) { - for (const Locator_t& locator : locators) + // Create an entry with space for all locators + LocatorSelectorEntry entry(unicast_locators.size(), multicast_locators.size()); + // Add and select unicast locators + for (const Locator_t& locator : unicast_locators) { - state.unicast.push_back(unicast.size()); - unicast.push_back(locator); + entry.state.unicast.push_back(entry.unicast.size()); + entry.unicast.push_back(locator); } - } - - void fill_multicast( - const LocatorList_t& locators) - { - for (const Locator_t& locator : locators) + // Add and select multicast locators + for (const Locator_t& locator : multicast_locators) { - state.multicast.push_back(multicast.size()); - multicast.push_back(locator); + entry.state.multicast.push_back(entry.multicast.size()); + entry.multicast.push_back(locator); } + // Return created entry + return entry; } //! GUID of the remote entity. @@ -133,8 +134,6 @@ struct LocatorSelectorEntry bool enabled; //! A temporary value for each transport to help optimizing some use cases. bool transport_should_process; - //! True if the locator is an initial peer or DS connection. False otherwise. - bool is_initial_peer_or_ds; }; } /* namespace rtps */ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 358ff801c7e..0a5dccc9b83 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -444,24 +444,11 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); - for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - if (set_logicals) - { - LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, - pattr.allocation.locators.max_multicast_locators); - entry.is_initial_peer_or_ds = true; - entry.fill_multicast(it.metatrafficMulticastLocatorList); - entry.fill_unicast(it.metatrafficUnicastLocatorList); - mp_RTPSParticipant->createSenderResources(entry); - } - else - { - mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); - mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList); - } + auto entry = LocatorSelectorEntry::create_fully_selected_entry( + it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); #if HAVE_SECURITY if (!mp_RTPSParticipant->is_secure()) @@ -865,12 +852,8 @@ void PDPClient::update_remote_servers_list() { if (set_logicals) { - const RemoteLocatorsAllocationAttributes& rlaa = - mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; - LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); - entry.is_initial_peer_or_ds = true; - entry.fill_multicast(it.metatrafficMulticastLocatorList); - entry.fill_unicast(it.metatrafficUnicastLocatorList); + auto entry = LocatorSelectorEntry::create_fully_selected_entry( + it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); mp_RTPSParticipant->createSenderResources(entry); } } diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index cf91246eb0e..49828a401a4 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -516,24 +516,11 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); - for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { - if (set_logicals) - { - LocatorSelectorEntry entry(pattr.allocation.locators.max_unicast_locators, - pattr.allocation.locators.max_multicast_locators); - entry.is_initial_peer_or_ds = true; - entry.fill_multicast(it.metatrafficMulticastLocatorList); - entry.fill_unicast(it.metatrafficUnicastLocatorList); - mp_RTPSParticipant->createSenderResources(entry); - } - else - { - mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); - mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList); - } + auto entry = LocatorSelectorEntry::create_fully_selected_entry( + it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); if (!secure) { @@ -1210,12 +1197,8 @@ void PDPServer::update_remote_servers_list() { if (set_logicals) { - const RemoteLocatorsAllocationAttributes& rlaa = - mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators; - LocatorSelectorEntry entry(rlaa.max_unicast_locators, rlaa.max_multicast_locators); - entry.is_initial_peer_or_ds = true; - entry.fill_multicast(it.metatrafficMulticastLocatorList); - entry.fill_unicast(it.metatrafficUnicastLocatorList); + auto entry = LocatorSelectorEntry::create_fully_selected_entry( + it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); mp_RTPSParticipant->createSenderResources(entry); } } diff --git a/src/cpp/rtps/network/NetworkFactory.h b/src/cpp/rtps/network/NetworkFactory.h index 20654879388..23fc1902773 100644 --- a/src/cpp/rtps/network/NetworkFactory.h +++ b/src/cpp/rtps/network/NetworkFactory.h @@ -88,6 +88,7 @@ class NetworkFactory * Walk over the list of transports, opening every possible channel that can send through * the locators contained in @param locator_selector_entry and returning a vector of Sender Resources associated with it. * @param locator_selector_entry LocatorSelectorEntry containing metadata and the locators through which to send. + * @return true if at least one send resource was created, false otherwise. */ bool build_send_resources( fastdds::rtps::SendResourceList&, diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index ba32eba6bcc..644667d908e 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -1043,6 +1043,12 @@ class RTPSParticipantImpl void createSenderResources( const Locator_t& locator); + /** + * Creates sender resources for the given locator selector entry by calling the NetworkFactory's + * build_send_resources method. + * + * @param locator_selector The locator selector entry for which sender resources need to be created. + */ void createSenderResources( const LocatorSelectorEntry& locator_selector); diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index f97f860ef0f..48a6cfc75e8 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -874,14 +874,9 @@ bool TCPTransportInterface::OpenOutputChannels( const LocatorSelectorEntry& locator_selector_entry) { bool success = false; - if (locator_selector_entry.is_initial_peer_or_ds) + if (locator_selector_entry.remote_guid == fastrtps::rtps::c_Guid_Unknown) { - for (size_t i = 0; i < locator_selector_entry.state.multicast.size(); ++i) - { - // TODO Carlos: is multicast needed when is initial_peer_or_ds? Or is always zero? - size_t index = locator_selector_entry.state.multicast[i]; - success |= CreateInitialConnect(send_resource_list, locator_selector_entry.multicast[index]); - } + // Only unicast is used in TCP for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i) { size_t index = locator_selector_entry.state.unicast[i]; @@ -890,11 +885,6 @@ bool TCPTransportInterface::OpenOutputChannels( } else { - for (size_t i = 0; i < locator_selector_entry.state.multicast.size(); ++i) - { - size_t index = locator_selector_entry.state.multicast[i]; - success |= OpenOutputChannel(send_resource_list, locator_selector_entry.multicast[index]); - } for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i) { size_t index = locator_selector_entry.state.unicast[i]; @@ -923,8 +913,6 @@ bool TCPTransportInterface::CreateInitialConnect( std::lock_guard socketsLock(sockets_map_mutex_); - // TODO Carlos: verify if it is needed to check the SenderResource - // We try to find a SenderResource that has this locator. // Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport // already reuses a SenderResource. From 63173e6dbe5f655448b5c9bfef62f133b6d34224 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Thu, 4 Apr 2024 11:36:49 +0200 Subject: [PATCH 15/17] Refs #20628: Create Resource in update for UDP too Signed-off-by: cferreiragonz --- .../builtin/discovery/participant/PDPClient.cpp | 11 +++-------- .../builtin/discovery/participant/PDPServer.cpp | 11 +++-------- .../rtps/participant/RTPSParticipantImpl.cpp | 17 ----------------- src/cpp/rtps/participant/RTPSParticipantImpl.h | 3 --- 4 files changed, 6 insertions(+), 36 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 0a5dccc9b83..02539de580c 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -843,19 +843,14 @@ void PDPClient::update_remote_servers_list() { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); - for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) { - if (set_logicals) - { - auto entry = LocatorSelectorEntry::create_fully_selected_entry( - it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); - mp_RTPSParticipant->createSenderResources(entry); - } + auto entry = LocatorSelectorEntry::create_fully_selected_entry( + it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); } if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 49828a401a4..7364dca31c2 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -1188,19 +1188,14 @@ void PDPServer::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); - for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) { - if (set_logicals) - { - auto entry = LocatorSelectorEntry::create_fully_selected_entry( - it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); - mp_RTPSParticipant->createSenderResources(entry); - } + auto entry = LocatorSelectorEntry::create_fully_selected_entry( + it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList); + mp_RTPSParticipant->createSenderResources(entry); } if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 25093d3c937..2d924492b33 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2481,23 +2481,6 @@ fastdds::dds::builtin::TypeLookupManager* RTPSParticipantImpl::typelookup_manage return mp_builtinProtocols->tlm_; } -bool RTPSParticipantImpl::has_tcp_transports() -{ - const RTPSParticipantAttributes& pattr = getRTPSParticipantAttributes(); - bool has_tcp_transports = false; - for (auto& transportDescriptor : pattr.userTransports) - { - TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); - if (pT) - { - has_tcp_transports = true; - break; - } - } - - return has_tcp_transports; -} - IPersistenceService* RTPSParticipantImpl::get_persistence_service( const EndpointAttributes& param) { diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 644667d908e..af226589e45 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -461,9 +461,6 @@ class RTPSParticipantImpl return has_shm_transport_; } - //! Check if the participant has at least one TCP transport - bool has_tcp_transports(); - uint32_t get_min_network_send_buffer_size() { return m_network_Factory.get_min_send_buffer_size(); From a163328a91451adf2ffe365006bb674954d126cf Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Thu, 4 Apr 2024 16:34:11 +0200 Subject: [PATCH 16/17] Refs #20628: Remove unnecessary headers Signed-off-by: cferreiragonz --- src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp | 1 - src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp | 1 - 2 files changed, 2 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 02539de580c..49faa1d11fa 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -37,7 +37,6 @@ #include #include #include -#include #include #include #include diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 7364dca31c2..0c77f1e6015 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -34,7 +34,6 @@ #include #include #include -#include #include #include From 318b8ad4eee4f638742ed058e8bf75334a818b6a Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 9 Apr 2024 09:47:02 +0200 Subject: [PATCH 17/17] Refs #20628: Clarify Log_info Signed-off-by: cferreiragonz --- src/cpp/rtps/transport/TCPTransportInterface.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 48a6cfc75e8..bf947089ed0 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -832,7 +832,7 @@ bool TCPTransportInterface::OpenOutputChannel( if (IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface) { // Client side (either Server-Client or LARGE_DATA) - EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] @ " << IPLocator::to_string(locator)); + EPROSIMA_LOG_INFO(RTCP, "OpenOutputChannel: [CONNECT] @ " << IPLocator::to_string(locator)); // Create a TCP_CONNECT_TYPE channel std::shared_ptr channel( @@ -855,7 +855,7 @@ bool TCPTransportInterface::OpenOutputChannel( { // Server side LARGE_DATA // Act as server and wait to the other endpoint to connect. Add locator to sender_resource_list - EPROSIMA_LOG_INFO(OpenOutputChannel, + EPROSIMA_LOG_INFO(RTCP, "OpenOutputChannel: [WAIT_CONNECTION] @ " << IPLocator::to_string(locator)); std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); channel_pending_logical_ports_[physical_locator].insert(logical_port); @@ -969,7 +969,7 @@ bool TCPTransportInterface::CreateInitialConnect( configuration()->maxMessageSize)) ); - EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] @ " << IPLocator::to_string(locator)); + EPROSIMA_LOG_INFO(RTCP, "CreateInitialConnect: [CONNECT] @ " << IPLocator::to_string(locator)); channel_resources_[physical_locator] = channel; channel->connect(channel_resources_[physical_locator]);