From 5e4ed107ace4bd587843a933ecfc6b9e64f38642 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 2 Dec 2022 12:38:11 +0100 Subject: [PATCH] Fix communication with asymmetric ignoreParticipantFlags (#3128) * Fix communication with asymmetric ignoreParticipantFlags (#3105) * Refs #16253. Regression test. Signed-off-by: Miguel Company * Refs #16253. Ignoring participants from other processes when they have no locators. Signed-off-by: Miguel Company * Refs #16253. Avoid announcing locators for intraprocess-only participants. Signed-off-by: Miguel Company * Refs #16253. Set TTL to 0 for intraprocess-only participants. Signed-off-by: Miguel Company * Refs #16253. Avoid SHM transport for intraprocess-only participants. Signed-off-by: Miguel Company Signed-off-by: Miguel Company (cherry picked from commit 865702b4496884e866426c38d80da35fba9b991d) # Conflicts: # src/cpp/rtps/builtin/discovery/participant/PDP.cpp * Fix conflicts Signed-off-by: Miguel Company * Added ignore_participant_flags() to Blackbox_FastRTPS PubSubReader. (#3114) Signed-off-by: Miguel Company Signed-off-by: Miguel Company Signed-off-by: Miguel Company Co-authored-by: Miguel Company (cherry picked from commit 6de984284025573ffa8155931178c7f8af8709a5) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../builtin/discovery/participant/PDP.cpp | 38 +++++++---- .../discovery/participant/PDPSimple.cpp | 21 ++++-- .../rtps/participant/RTPSParticipantImpl.cpp | 26 ++++--- .../api/fastrtps_deprecated/PubSubReader.hpp | 7 ++ .../common/BlackboxTestsDiscovery.cpp | 68 +++++++++++++++++++ 5 files changed, 131 insertions(+), 29 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 8f99ef83169..6bc4b117dc2 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -239,7 +239,10 @@ ParticipantProxyData* PDP::add_participant_proxy_data( void PDP::initializeParticipantProxyData( ParticipantProxyData* participant_data) { - participant_data->m_leaseDuration = mp_RTPSParticipant->getAttributes().builtin.discovery_config.leaseDuration; + RTPSParticipantAttributes& attributes = mp_RTPSParticipant->getAttributes(); + bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only(); + + participant_data->m_leaseDuration = attributes.builtin.discovery_config.leaseDuration; //set_VendorId_eProsima(participant_data->m_VendorId); participant_data->m_VendorId = c_VendorId_eProsima; @@ -278,13 +281,16 @@ void PDP::initializeParticipantProxyData( participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints(); #endif // if HAVE_SECURITY - for (const Locator_t& loc : mp_RTPSParticipant->getAttributes().defaultUnicastLocatorList) - { - participant_data->default_locators.add_unicast_locator(loc); - } - for (const Locator_t& loc : mp_RTPSParticipant->getAttributes().defaultMulticastLocatorList) + if (announce_locators) { - participant_data->default_locators.add_multicast_locator(loc); + for (const Locator_t& loc : attributes.defaultUnicastLocatorList) + { + participant_data->default_locators.add_unicast_locator(loc); + } + for (const Locator_t& loc : attributes.defaultMulticastLocatorList) + { + participant_data->default_locators.add_multicast_locator(loc); + } } participant_data->m_expectsInlineQos = false; participant_data->m_guid = mp_RTPSParticipant->getGuid(); @@ -313,21 +319,27 @@ void PDP::initializeParticipantProxyData( } participant_data->metatraffic_locators.unicast.clear(); - for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList) + if (announce_locators) { - participant_data->metatraffic_locators.add_unicast_locator(loc); + for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList) + { + participant_data->metatraffic_locators.add_unicast_locator(loc); + } } participant_data->metatraffic_locators.multicast.clear(); - if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty()) + if (announce_locators) { - for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList) + if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty()) { - participant_data->metatraffic_locators.add_multicast_locator(loc); + for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList) + { + participant_data->metatraffic_locators.add_multicast_locator(loc); + } } } - participant_data->m_participantName = std::string(mp_RTPSParticipant->getAttributes().getName()); + participant_data->m_participantName = std::string(attributes.getName()); participant_data->m_userData = mp_RTPSParticipant->getAttributes().userData; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index ab74b5e9285..6bad54ba93b 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -161,13 +161,22 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData( // decide if we dismiss the participant using the ParticipantFilteringFlags const ParticipantFilteringFlags_t& flags = m_discovery.discovery_config.ignoreParticipantFlags; + const GUID_t& remote = participant_data.m_guid; + const GUID_t& local = getLocalParticipantProxyData()->m_guid; + bool is_same_host = local.is_on_same_host_as(remote); + bool is_same_process = local.is_on_same_process_as(remote); + + // Discard participants on different process when they don't have metatraffic locators + if (participant_data.metatraffic_locators.multicast.empty() && + participant_data.metatraffic_locators.unicast.empty() && + !is_same_process) + { + return nullptr; + } if (flags != ParticipantFilteringFlags_t::NO_FILTER) { - const GUID_t& remote = participant_data.m_guid; - const GUID_t& local = getLocalParticipantProxyData()->m_guid; - - if (!local.is_on_same_host_as(remote)) + if (!is_same_host) { if (flags & ParticipantFilteringFlags::FILTER_DIFFERENT_HOST) { @@ -184,9 +193,7 @@ ParticipantProxyData* PDPSimple::createParticipantProxyData( return nullptr; } - bool is_same = local.is_on_same_process_as(remote); - - if ((filter_same && is_same) || (filter_different && !is_same)) + if ((filter_same && is_same_process) || (filter_different && !is_same_process)) { return nullptr; } diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index e6671509396..19abd813cdd 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -152,18 +152,26 @@ RTPSParticipantImpl::RTPSParticipantImpl( UDPv4TransportDescriptor descriptor; descriptor.sendBufferSize = m_att.sendSocketBufferSize; descriptor.receiveBufferSize = m_att.listenSocketBufferSize; + if (is_intraprocess_only()) + { + // Avoid multicast leaving the host for intraprocess-only participants + descriptor.TTL = 0; + } m_network_Factory.RegisterTransport(&descriptor, &m_att.properties); #ifdef SHM_TRANSPORT_BUILTIN - SharedMemTransportDescriptor shm_transport; - // We assume (Linux) UDP doubles the user socket buffer size in kernel, so - // the equivalent segment size in SHM would be socket buffer size x 2 - auto segment_size_udp_equivalent = - std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2; - shm_transport.segment_size(segment_size_udp_equivalent); - // Use same default max_message_size on both UDP and SHM - shm_transport.max_message_size(descriptor.max_message_size()); - has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport); + if (!is_intraprocess_only()) + { + SharedMemTransportDescriptor shm_transport; + // We assume (Linux) UDP doubles the user socket buffer size in kernel, so + // the equivalent segment size in SHM would be socket buffer size x 2 + auto segment_size_udp_equivalent = + std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2; + shm_transport.segment_size(segment_size_udp_equivalent); + // Use same default max_message_size on both UDP and SHM + shm_transport.max_message_size(descriptor.max_message_size()); + has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport); + } #endif // ifdef SHM_TRANSPORT_BUILTIN } diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 55920e6dd13..f528caa4bbe 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -946,6 +946,13 @@ class PubSubReader return *this; } + PubSubReader& ignore_participant_flags( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t flags) + { + participant_attr_.rtps.builtin.discovery_config.ignoreParticipantFlags = flags; + return *this; + } + PubSubReader& socket_buffer_size( uint32_t sockerBufferSize) { diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 9f707fca4be..4c204d67b7e 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -1282,6 +1282,74 @@ TEST_P(Discovery, EndpointCreationMultithreaded) endpoint_thr.join(); } +// Regression test for redmine issue 16253 +TEST_P(Discovery, AsymmeticIgnoreParticipantFlags) +{ + if (INTRAPROCESS != GetParam()) + { + GTEST_SKIP() << "Only makes sense on INTRAPROCESS"; + return; + } + + // This participant is created with flags to ignore participants which are not on the same process. + // When the announcements of this participant arrive to p2, a single DATA(p) should be sent back. + // No other traffic is expected, since it will take place through intraprocess. + PubSubReader p1(TEST_TOPIC_NAME); + p1.ignore_participant_flags(static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS)); + p1.init(); + EXPECT_TRUE(p1.isInitialized()); + + // This participant is created with the test transport to check that nothing unexpected is sent to the + // multicast metatraffic locators. + // Setting localhost in the interface whitelist ensures that the traffic will not leave the host, and also + // that multicast datagrams are sent only once. + // A very long period for the participant announcement is set, along with 0 initial announcements, so we can + // have a exact expectation on the number of datagrams sent to multicast. + PubSubWriter p2(TEST_TOPIC_NAME); + + // This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set + // its value when the first multicast datagram is sent. + std::atomic multicast_port{ 0 }; + // Only two multicast datagrams are allowed: the initial DATA(p) and the DATA(p) sent in response of the discovery + // of p1. + constexpr uint32_t allowed_messages_on_port = 2; + + auto test_transport = std::make_shared(); + + std::atomic messages_on_port{ 0 }; + test_transport->interfaceWhiteList.push_back("127.0.0.1"); + test_transport->locator_filter_ = [&multicast_port, &messages_on_port]( + const eprosima::fastdds::rtps::Locator& destination) + { + if (IPLocator::isMulticast(destination)) + { + uint32_t port = 0; + multicast_port.compare_exchange_strong(port, destination.port); + if (destination.port == multicast_port) + { + ++messages_on_port; + } + } + return false; + }; + + p2.disable_builtin_transport(). + add_user_transport_to_pparams(test_transport). + lease_duration({ 60 * 60, 0 }, { 50 * 60, 0 }). + initial_announcements(0, {}); + p2.init(); + EXPECT_TRUE(p2.isInitialized()); + + // Wait for participants and endpoints to discover each other + p1.wait_discovery(); + p2.wait_discovery(); + + // Check expectation on the number of multicast datagrams sent by p2 + EXPECT_EQ(messages_on_port, allowed_messages_on_port); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else