diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 36fba93eb07..30b998398ef 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -430,6 +431,15 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + // TCP Clients need to handle logical ports + if (mp_RTPSParticipant->has_tcp_transports()) + { + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) + { + mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList); + } + } + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); @@ -828,8 +838,20 @@ void PDPClient::update_remote_servers_list() { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + // TCP Clients need to handle logical ports + bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || + !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) + { + if (set_logicals) + { + mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList); + } + } + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) { match_pdp_writer_nts_(it); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index f4998afc6f8..81778431ba8 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -501,6 +502,15 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( { eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + // TCP Clients need to handle logical ports + if (mp_RTPSParticipant->has_tcp_transports()) + { + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) + { + mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList); + } + } + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList); @@ -1170,8 +1180,20 @@ void PDPServer::update_remote_servers_list() eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + // TCP Clients need to handle logical ports + bool set_logicals = mp_RTPSParticipant->has_tcp_transports(); + for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers) { + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) || + !endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader())) + { + if (set_logicals) + { + mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList); + } + } + if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter())) { match_pdp_writer_nts_(it); @@ -1187,6 +1209,9 @@ void PDPServer::update_remote_servers_list() { discovery_db_.add_server(server.guidPrefix); } + + // Need to reactivate the server thread to send the DATA(p) to the new servers + awake_server_thread(); } bool PDPServer::process_writers_acknowledgements() diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 0fcd6692531..e8377321ea1 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -200,18 +200,68 @@ RTPSParticipantImpl::RTPSParticipantImpl( switch (m_att.builtin.discovery_config.discoveryProtocol) { case DiscoveryProtocol::BACKUP: - case DiscoveryProtocol::CLIENT: case DiscoveryProtocol::SERVER: + // Verify if listening ports are provided + for (auto& transportDescriptor : m_att.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + if (pT->listening_ports.empty()) + { + logError(RTPS_PARTICIPANT, + "Participant " << m_att.getName() << " with GUID " << m_guid << + " tries to create a TCP server for discovery server without providing a proper listening port."); + break; + } + if (!m_att.builtin.metatrafficUnicastLocatorList.empty()) + { + std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(), + m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + // TCP DS default logical port is the same as the physical one + if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6) + { + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + } + }); + } + } + } + break; + case DiscoveryProtocol::CLIENT: case DiscoveryProtocol::SUPER_CLIENT: // Verify if listening ports are provided for (auto& transportDescriptor : m_att.userTransports) { TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); - if (pT && pT->listening_ports.empty()) + if (pT) { - logInfo(RTPS_PARTICIPANT, - "Participant " << m_att.getName() << " with GUID " << m_guid << - " tries to use discovery server over TCP without providing a proper listening port."); + if (pT->listening_ports.empty()) + { + logInfo(RTPS_PARTICIPANT, + "Participant " << m_att.getName() << " with GUID " << m_guid << + " tries to create a TCP client for discovery server without providing a proper listening port." << + " No TCP participants will be able to connect to this participant, but it will be able make connections."); + } + for (fastdds::rtps::RemoteServerAttributes& it : m_att.builtin.discovery_config.m_DiscoveryServers) + { + std::for_each(it.metatrafficUnicastLocatorList.begin(), + it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + // TCP DS default logical port is the same as the physical one + if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6) + { + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + } + }); + } } } default: @@ -1268,8 +1318,37 @@ void RTPSParticipantImpl::update_attributes( auto pdp = mp_builtinProtocols->mp_PDP; + // Check if discovery servers need to be updated + eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers = + patt.builtin.discovery_config.m_DiscoveryServers; + if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers) + { + for (auto& transportDescriptor : m_att.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + for (fastdds::rtps::RemoteServerAttributes& it : converted_discovery_servers) + { + std::for_each(it.metatrafficUnicastLocatorList.begin(), + it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) + { + // TCP DS default logical port is the same as the physical one + if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6) + { + if (IPLocator::getLogicalPort(locator) == 0) + { + IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator)); + } + } + }); + } + } + } + } + // Check if there are changes - if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers + if (converted_discovery_servers != m_att.builtin.discovery_config.m_DiscoveryServers || patt.userData != m_att.userData || update_pdp) { @@ -1291,7 +1370,7 @@ void RTPSParticipantImpl::update_attributes( for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers) { bool contained = false; - for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers) + for (auto incoming_server : converted_discovery_servers) { if (existing_server.guidPrefix == incoming_server.guidPrefix) { @@ -1364,8 +1443,8 @@ void RTPSParticipantImpl::update_attributes( m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) { - // Add incoming servers iff we don't know about them already or the listening locator has been modified - for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers) + // Add incoming servers if we don't know about them already or the listening locator has been modified + for (auto incoming_server : converted_discovery_servers) { eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it; for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin(); @@ -2164,6 +2243,38 @@ fastdds::dds::builtin::TypeLookupManager* RTPSParticipantImpl::typelookup_manage return mp_builtinProtocols->tlm_; } +bool RTPSParticipantImpl::has_tcp_transports() +{ + const RTPSParticipantAttributes& pattr = getRTPSParticipantAttributes(); + bool has_tcp_transports = false; + for (auto& transportDescriptor : pattr.userTransports) + { + TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); + if (pT) + { + has_tcp_transports = true; + break; + } + } + + return has_tcp_transports; +} + +void RTPSParticipantImpl::create_tcp_connections( + const LocatorList_t& locators) +{ + for (const Locator_t& loc : locators) + { + if (loc.kind == LOCATOR_KIND_TCPv4 || loc.kind == LOCATOR_KIND_TCPv6) + { + // Set logical port to 0 and call createSenderResources to allow opening a TCP CONNECT channel in the transport + Locator_t loc_with_logical_zero = loc; + IPLocator::setLogicalPort(loc_with_logical_zero, 0); + createSenderResources(loc_with_logical_zero); + } + } +} + IPersistenceService* RTPSParticipantImpl::get_persistence_service( const EndpointAttributes& param) { @@ -2365,9 +2476,17 @@ bool RTPSParticipantImpl::did_mutation_took_place_on_meta( case LOCATOR_KIND_TCPv4: set_wan_address(ret); IPLocator::setPhysicalPort(ret, Tcp4ListeningPort()); + if (IPLocator::getLogicalPort(ret) == 0) + { + IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret)); + } break; case LOCATOR_KIND_TCPv6: IPLocator::setPhysicalPort(ret, Tcp6ListeningPort()); + if (IPLocator::getLogicalPort(ret) == 0) + { + IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret)); + } break; } return ret; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index f4555154315..3ce0300a8eb 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -440,6 +440,19 @@ class RTPSParticipantImpl return has_shm_transport_; } + //! Check if the participant has at least one TCP transport + bool has_tcp_transports(); + + /** + * This method creates the needed sender resources for a locator list, but forces + * each logical port to be zero. It is used to enforce the proper creation of a + * CONNECT channel in TCP scenarios. + * + * @param locators List of unicast locators. + */ + void create_tcp_connections( + const LocatorList_t& locators); + uint32_t get_min_network_send_buffer_size() { return m_network_Factory.get_min_send_buffer_size(); diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 1ce807907a5..74ca287eee1 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -626,10 +626,14 @@ bool TCPTransportInterface::OpenOutputChannel( return false; } + bool always_connect = false; uint16_t logical_port = IPLocator::getLogicalPort(locator); if (0 == logical_port) { - return false; + // During builtin endpoints setup, a logical port equal to 0 indicates that the locator belongs + // to discovery server remote server. A connect channel is always needed. + // Should only be called once to avoid adding a logical port equal to 0. + always_connect = true; } Locator physical_locator = IPLocator::toPhysicalLocator(locator); @@ -648,6 +652,7 @@ bool TCPTransportInterface::OpenOutputChannel( IPLocator::WanToLanLocator(physical_locator) == tcp_sender_resource->locator()))) { + // If missing, logical port will be added in first send() // Add logical port to channel if it's not there yet auto channel_resource = channel_resources_.find(physical_locator); @@ -658,14 +663,17 @@ bool TCPTransportInterface::OpenOutputChannel( channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator)); } - if (channel_resource != channel_resources_.end()) - { - channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get()); - } - else + if (logical_port != 0) { - std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); - channel_pending_logical_ports_[physical_locator].insert(logical_port); + if (channel_resource != channel_resources_.end()) + { + channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get()); + } + else + { + std::lock_guard channelPendingLock(channel_pending_logical_ports_mutex_); + channel_pending_logical_ports_[physical_locator].insert(logical_port); + } } statistics_info_.add_entry(locator); @@ -738,7 +746,9 @@ bool TCPTransportInterface::OpenOutputChannel( // If the remote physical port is higher than our listening port, a new CONNECT channel needs to be created and connected // and the locator added to the send_resource_list. // If the remote physical port is lower than our listening port, only the locator needs to be added to the send_resource_list. - if (IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface) + // If the ports are equal, the CONNECT channel is created if the local interface is lower. + // If this locator belong to a DS server, a CONNECT channel is always needed. + if (always_connect || IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface) { // Client side (either Server-Client or LARGE_DATA) logInfo(OpenOutputChannel, "OpenOutputChannel: [CONNECT] (physical: " @@ -760,7 +770,11 @@ bool TCPTransportInterface::OpenOutputChannel( channel_resources_[physical_locator] = channel; channel->connect(channel_resources_[physical_locator]); - channel->add_logical_port(logical_port, rtcp_message_manager_.get()); + // Add logical port only if it's not 0 + if (!always_connect) + { + channel->add_logical_port(logical_port, rtcp_message_manager_.get()); + } } else { diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index 507d05aeef4..dc6b84bd94d 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -89,7 +89,7 @@ TEST(DDSDiscovery, IgnoreParticipantFlags) * 2. Then, connect the client to the other server and check discovery again. * 3. Finally connect the two servers by adding one of them to the others list */ -TEST(DDSDiscovery, AddDiscoveryServerToList) +TEST(DDSDiscovery, AddDiscoveryServerToListUDP) { using namespace eprosima; using namespace eprosima::fastdds::dds; @@ -171,7 +171,7 @@ TEST(DDSDiscovery, AddDiscoveryServerToList) // Update client's servers list ASSERT_TRUE(client.update_wire_protocol(client_qos)); - /* Check that the servers only know about the client, and that the client known about both servers */ + /* Check that the servers only know about the client and that the client knows about both servers */ server_1.wait_discovery(std::chrono::seconds::zero(), 1, true); client.wait_discovery(std::chrono::seconds::zero(), 2, true); server_2.wait_discovery(std::chrono::seconds::zero(), 1, true); @@ -186,6 +186,144 @@ TEST(DDSDiscovery, AddDiscoveryServerToList) server_2.wait_discovery(std::chrono::seconds::zero(), 2, true); } +/** + * This test checks that adding servers to the Discovery Server list results in discovering those participants. + * It does so by: + * 1. Creating two servers and two clients that are only connected to the first server. Discovery is checked + * at this state. + * 2. Then, connect client_1 to the second server and check discovery again. + * 3. Finally connect the two servers by adding one of them to the others list and check disvoery again. + */ +TEST(DDSDiscovery, AddDiscoveryServerToListTCP) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastrtps::rtps; + + // TCP default DS port + std::string W_UNICAST_PORT_RANDOM_NUMBER_STR = "42100"; + + /* Create first server */ + PubSubParticipant server_1(0u, 0u, 0u, 0u); + // Set participant as server + WireProtocolConfigQos server_1_qos; + server_1_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SERVER; + // Generate random GUID prefix + srand(static_cast(time(nullptr))); + GuidPrefix_t server_1_prefix; + for (auto i = 0; i < 12; i++) + { + server_1_prefix.value[i] = eprosima::fastrtps::rtps::octet(rand() % 254); + } + server_1_qos.prefix = server_1_prefix; + // Generate server's listening locator + Locator_t locator_server_1; + IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + uint16_t server_1_port = static_cast(stoi(W_UNICAST_PORT_RANDOM_NUMBER_STR)); + IPLocator::setPhysicalPort(locator_server_1, server_1_port); + locator_server_1.kind = LOCATOR_KIND_TCPv4; + // Leave logical port as 0 to use TCP DS default logical port + server_1_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); + // Add TCP transport + auto descriptor_1 = std::make_shared(); + descriptor_1->add_listener_port(server_1_port); + // Init server + ASSERT_TRUE(server_1.wire_protocol(server_1_qos) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_1) + .init_participant()); + + /* Create second server */ + PubSubParticipant server_2(0u, 0u, 0u, 0u); + // Set participant as server + WireProtocolConfigQos server_2_qos; + server_2_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SERVER; + // Generate random GUID prefix + GuidPrefix_t server_2_prefix = server_1_prefix; + server_2_prefix.value[11]++; + server_2_qos.prefix = server_2_prefix; + // Generate server's listening locator + Locator_t locator_server_2; + IPLocator::setIPv4(locator_server_2, 127, 0, 0, 1); + uint16_t server_2_port = server_1_port + 1; + IPLocator::setPhysicalPort(locator_server_2, server_2_port); + locator_server_2.kind = LOCATOR_KIND_TCPv4; + // Leave logical port as 0 to use TCP DS default logical port + server_2_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); + // Add TCP transport + auto descriptor_2 = std::make_shared(); + descriptor_2->add_listener_port(server_2_port); + + // Init server + ASSERT_TRUE(server_2.wire_protocol(server_2_qos) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_2) + .init_participant()); + + + /* Create a client that connects to the first server from the beginning with higher listening_port*/ + PubSubParticipant client_1(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos_1; + client_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::CLIENT; + // Connect to first server + RemoteServerAttributes server_1_att; + server_1_att.guidPrefix = server_1_prefix; + server_1_att.metatrafficUnicastLocatorList.push_back(Locator_t(locator_server_1)); + client_qos_1.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att); + auto descriptor_3 = std::make_shared(); + uint16_t client_1_port = server_1_port + 10; + descriptor_3->add_listener_port(client_1_port); + // Init client + ASSERT_TRUE(client_1.wire_protocol(client_qos_1) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_3) + .init_participant()); + + /* Create a client that connects to the first server from the beginning with lower listening_port*/ + PubSubParticipant client_2(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos_2; + client_qos_2.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::CLIENT; + // Connect to first server + client_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att); + auto descriptor_4 = std::make_shared(); + uint16_t client_2_port = server_1_port - 10; + descriptor_4->add_listener_port(client_2_port); + // Init client + ASSERT_TRUE(client_2.wire_protocol(client_qos_2) + .disable_builtin_transport() + .add_user_transport_to_pparams(descriptor_4) + .init_participant()); + + server_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and client2 + client_1.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + client_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + server_2.wait_discovery(std::chrono::seconds::zero(), 0, true); // Knows no one + + /* Add server_2 to client */ + RemoteServerAttributes server_2_att; + server_2_att.guidPrefix = server_2_prefix; + server_2_att.metatrafficUnicastLocatorList.push_back(Locator_t(locator_server_2)); + client_qos_1.builtin.discovery_config.m_DiscoveryServers.push_back(server_2_att); + // Update client_1's servers list + ASSERT_TRUE(client_1.update_wire_protocol(client_qos_1)); + + server_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and client2 + client_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server1 and server2 + client_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + server_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows client1 + + /* Add server_2 to server_1 */ + server_1_qos.builtin.discovery_config.m_DiscoveryServers.push_back(server_2_att); + ASSERT_TRUE(server_1.update_wire_protocol(server_1_qos)); + + server_1.wait_discovery(std::chrono::seconds::zero(), 3, true); // Knows client1, client2 and server2 + client_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server1 and server2 + client_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1 + server_2.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and server1 +} + /** * This test checks the addition of network interfaces at run-time. *