From 2179ea3b1daa4fad5dd383d051fabe4947a4e0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=C3=ADaz=20Qu=C3=ADlez?= <33602217+Tempate@users.noreply.github.com> Date: Thu, 22 Feb 2024 12:30:21 +0100 Subject: [PATCH] Fix data race on PDP (#4220) * Add regression test Signed-off-by: Juan Lopez Fernandez * Make a copy of the participant proxy data Signed-off-by: tempate * Use the copy method to ensure all the attributes are being copied correctly Signed-off-by: tempate * Lookup the participant proxy data to pass the discovery-server tests Signed-off-by: tempate * Get rid of proxy reference in RemoteServerAttributes Signed-off-by: Juan Lopez Fernandez * Fix failing test Signed-off-by: Juan Lopez Fernandez * Uncrustify Signed-off-by: Juan Lopez Fernandez --------- Signed-off-by: Juan Lopez Fernandez Signed-off-by: tempate Co-authored-by: Juan Lopez Fernandez (cherry picked from commit 305383295a9bbb9f12c5c0aa303667872d65ea33) --- .../rtps/attributes/ServerAttributes.h | 7 +- .../discovery/participant/PDPClient.cpp | 20 +-- .../discovery/participant/PDPListener.cpp | 22 ++- .../common/DDSBlackboxTestsDiscovery.cpp | 157 ++++++++++++++++-- 4 files changed, 176 insertions(+), 30 deletions(-) diff --git a/include/fastdds/rtps/attributes/ServerAttributes.h b/include/fastdds/rtps/attributes/ServerAttributes.h index c8fb9cd9ed..13f58753c4 100644 --- a/include/fastdds/rtps/attributes/ServerAttributes.h +++ b/include/fastdds/rtps/attributes/ServerAttributes.h @@ -54,7 +54,6 @@ class RemoteServerAttributes return guidPrefix == r.guidPrefix && metatrafficUnicastLocatorList == r.metatrafficUnicastLocatorList && metatrafficMulticastLocatorList == r.metatrafficMulticastLocatorList; - // && proxy == r.proxy; } RTPS_DllAPI void clear() @@ -62,7 +61,7 @@ class RemoteServerAttributes guidPrefix = fastrtps::rtps::GuidPrefix_t::unknown(); metatrafficUnicastLocatorList.clear(); metatrafficMulticastLocatorList.clear(); - proxy = nullptr; + is_connected = false; } RTPS_DllAPI fastrtps::rtps::GUID_t GetParticipant() const; @@ -100,8 +99,8 @@ class RemoteServerAttributes //!Guid prefix fastrtps::rtps::GuidPrefix_t guidPrefix; - // Live participant proxy reference - const fastrtps::rtps::ParticipantProxyData* proxy{}; + // Whether connection has been established + bool is_connected = false; // Check if there are specific transport locators associated // the template parameter is the locator kind (e.g. LOCATOR_KIND_UDPv4) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 82e758ec43..d83bb486aa 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -472,7 +472,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( return true; } -// the ParticipantProxyData* pdata must be the one kept in PDP database void PDPClient::assignRemoteEndpoints( ParticipantProxyData* pdata) { @@ -488,8 +487,7 @@ void PDPClient::assignRemoteEndpoints( { if (data_matches_with_prefix(svr.guidPrefix, *pdata)) { - std::unique_lock lock(*getMutex()); - svr.proxy = pdata; + svr.is_connected = true; } } } @@ -517,11 +515,11 @@ void PDPClient::notifyAboveRemoteEndpoints( { if (data_matches_with_prefix(svr.guidPrefix, pdata)) { - if (nullptr == svr.proxy) + if (!svr.is_connected && nullptr != get_participant_proxy_data(svr.guidPrefix)) { - //! try to retrieve the participant proxy data from an unmangled prefix in case - //! we could not fill svr.proxy in assignRemoteEndpoints() - svr.proxy = get_participant_proxy_data(svr.guidPrefix); + //! mark proxy as connected from an unmangled prefix in case + //! it could not be done in assignRemoteEndpoints() + svr.is_connected = true; } match_pdp_reader_nts_(svr, pdata.m_guid.guidPrefix); @@ -596,7 +594,7 @@ void PDPClient::removeRemoteEndpoints( if (svr.guidPrefix == pdata->m_guid.guidPrefix) { std::unique_lock lock(*getMutex()); - svr.proxy = nullptr; // reasign when we receive again server DATA(p) + svr.is_connected = false; is_server = true; mp_sync->restart_timer(); // enable announcement and sync mechanism till this server reappears } @@ -768,11 +766,11 @@ void PDPClient::announceParticipantState( for (auto& svr : mp_builtin->m_DiscoveryServers) { // if we are matched to a server report demise - if (svr.proxy != nullptr) + if (svr.is_connected) { //locators.push_back(svr.metatrafficMulticastLocatorList); locators.push_back(svr.metatrafficUnicastLocatorList); - remote_readers.emplace_back(svr.proxy->m_guid.guidPrefix, + remote_readers.emplace_back(svr.guidPrefix, endpoints->reader.reader_->getGuid().entityId); } } @@ -805,7 +803,7 @@ void PDPClient::announceParticipantState( { // non-pinging announcements like lease duration ones must be // broadcast to all servers - if (svr.proxy == nullptr || !_serverPing) + if (!svr.is_connected || !_serverPing) { locators.push_back(svr.metatrafficMulticastLocatorList); locators.push_back(svr.metatrafficUnicastLocatorList); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp index 31f803f88a..39a19562a0 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp @@ -181,11 +181,14 @@ void PDPListener::process_alive_data( // Create a new one when not found old_data = parent_pdp_->createParticipantProxyData(new_data, writer_guid); - reader->getMutex().unlock(); - lock.unlock(); - if (old_data != nullptr) { + // Copy proxy to be passed forward before releasing PDP mutex + ParticipantProxyData old_data_copy(*old_data); + + reader->getMutex().unlock(); + lock.unlock(); + // Assigning remote endpoints implies sending a DATA(p) to all matched and fixed readers, since // StatelessWriter::matched_reader_add marks the entire history as unsent if the added reader's // durability is bigger or equal to TRANSIENT_LOCAL_DURABILITY_QOS (TRANSIENT_LOCAL or TRANSIENT), @@ -196,13 +199,19 @@ void PDPListener::process_alive_data( // participant is discovered in the middle of BuiltinProtocols::initBuiltinProtocols, which will // create the first DATA(p) upon finishing, thus triggering the sent to all fixed and matched // readers anyways. - parent_pdp_->assignRemoteEndpoints(old_data); + parent_pdp_->assignRemoteEndpoints(&old_data_copy); + } + else + { + reader->getMutex().unlock(); + lock.unlock(); } } else { old_data->updateData(new_data); old_data->isAlive = true; + reader->getMutex().unlock(); EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "Update participant " @@ -215,6 +224,9 @@ void PDPListener::process_alive_data( parent_pdp_->mp_EDP->assignRemoteEndpoints(*old_data, true); } + // Copy proxy to be passed forward before releasing PDP mutex + ParticipantProxyData old_data_copy(*old_data); + lock.unlock(); RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener(); @@ -224,7 +236,7 @@ void PDPListener::process_alive_data( { std::lock_guard cb_lock(parent_pdp_->callback_mtx_); - ParticipantDiscoveryInfo info(*old_data); + ParticipantDiscoveryInfo info(old_data_copy); info.status = ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT; listener->onParticipantDiscovery( diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index d19b8f2074..dbac548bfe 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -36,9 +37,11 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -438,13 +441,13 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) { delete remote_participant_info; } - remote_participant_info = new ParticipantDiscoveryInfo(info); + remote_participant_info = new ParticipantProxyData(info.info); found_->store(true); cv_->notify_one(); } } - ParticipantDiscoveryInfo* remote_participant_info; + ParticipantProxyData* remote_participant_info; private: @@ -496,7 +499,7 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) participant_found.store(false); // Prevent assertion on spurious discovery of a participant from elsewhere - if (part_1->guid() == listener.remote_participant_info->info.m_guid) + if (part_1->guid() == listener.remote_participant_info->m_guid) { // Check that all three properties are present in the ParticipantProxyData, and that their value // is that of the property in part_1 (the original property value) @@ -504,13 +507,13 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) { // Find property in ParticipantProxyData auto received_property = std::find_if( - listener.remote_participant_info->info.m_properties.begin(), - listener.remote_participant_info->info.m_properties.end(), + listener.remote_participant_info->m_properties.begin(), + listener.remote_participant_info->m_properties.end(), [&](const ParameterProperty_t& property) { return property.first() == physical_property_name; }); - ASSERT_NE(received_property, listener.remote_participant_info->info.m_properties.end()); + ASSERT_NE(received_property, listener.remote_participant_info->m_properties.end()); // Find property in first participant auto part_1_property = PropertyPolicyHelper::find_property( @@ -556,20 +559,20 @@ TEST(DDSDiscovery, ParticipantProxyPhysicalData) participant_found.store(false); // Prevent assertion on spurious discovery of a participant from elsewhere - if (part_1->guid() == listener.remote_participant_info->info.m_guid) + if (part_1->guid() == listener.remote_participant_info->m_guid) { // Check that none of the three properties are present in the ParticipantProxyData. for (auto physical_property_name : physical_property_names) { // Look for property in ParticipantProxyData auto received_property = std::find_if( - listener.remote_participant_info->info.m_properties.begin(), - listener.remote_participant_info->info.m_properties.end(), + listener.remote_participant_info->m_properties.begin(), + listener.remote_participant_info->m_properties.end(), [&](const ParameterProperty_t& property) { return property.first() == physical_property_name; }); - ASSERT_EQ(received_property, listener.remote_participant_info->info.m_properties.end()); + ASSERT_EQ(received_property, listener.remote_participant_info->m_properties.end()); } break; } @@ -1603,3 +1606,137 @@ TEST(DDSDiscovery, WaitSetMatchedStatus) test_DDSDiscovery_WaitSetMatchedStatus(false); test_DDSDiscovery_WaitSetMatchedStatus(true); } + +// Regression test for redmine issue 20409 +TEST(DDSDiscovery, DataracePDP) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastdds::rtps; + + class CustomDomainParticipantListener : public DomainParticipantListener + { + public: + + CustomDomainParticipantListener() + : DomainParticipantListener() + , discovery_future(discovery_promise.get_future()) + , destruction_future(destruction_promise.get_future()) + , undiscovery_future(undiscovery_promise.get_future()) + { + } + + void on_participant_discovery( + DomainParticipant* /*participant*/, + eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info) override + { + if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) + { + try + { + discovery_promise.set_value(); + } + catch (std::future_error&) + { + // do nothing + } + destruction_future.wait(); + } + else if (info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT || + info.status == eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) + { + try + { + undiscovery_promise.set_value(); + } + catch (std::future_error&) + { + // do nothing + } + } + } + + std::promise discovery_promise; + std::future discovery_future; + + std::promise destruction_promise; + std::future destruction_future; + + std::promise undiscovery_promise; + std::future undiscovery_future; + }; + + // Disable intraprocess + auto settings = fastrtps::xmlparser::XMLProfileManager::library_settings(); + auto prev_intraprocess_delivery = settings.intraprocess_delivery; + settings.intraprocess_delivery = fastrtps::INTRAPROCESS_OFF; + fastrtps::xmlparser::XMLProfileManager::library_settings(settings); + + // DDS Domain Id + const unsigned int DOMAIN_ID = (uint32_t)GET_PID() % 230; + + // This is a non deterministic test, so we will run it several times to increase probability of data race detection + // if it exists. + const unsigned int N_ITER = 10; + unsigned int iter_idx = 0; + while (iter_idx < N_ITER) + { + iter_idx++; + + DomainParticipantQos qos; + qos.transport().use_builtin_transports = false; + auto udp_transport = std::make_shared(); + qos.transport().user_transports.push_back(udp_transport); + + // Create discoverer participant (the one where a data race on PDP might occur) + CustomDomainParticipantListener participant_listener; + DomainParticipant* participant = DomainParticipantFactory::get_instance()->create_participant(DOMAIN_ID, qos, + &participant_listener); + + DomainParticipantQos aux_qos; + aux_qos.transport().use_builtin_transports = false; + auto aux_udp_transport = std::make_shared(); + aux_qos.transport().user_transports.push_back(aux_udp_transport); + + // Create auxiliary participant to be discovered + aux_qos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(1, 0); + aux_qos.wire_protocol().builtin.discovery_config.leaseDuration = Duration_t(1, 10); + DomainParticipant* aux_participant = DomainParticipantFactory::get_instance()->create_participant(DOMAIN_ID, + aux_qos); + + // Wait for discovery + participant_listener.discovery_future.wait(); + + // Shutdown auxiliary participant's network, so it will be removed after lease duration + test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true; + DomainParticipantFactory::get_instance()->delete_participant(aux_participant); + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); // Wait for longer than lease duration + + try + { + // NOTE: at this point, the discoverer participant is stuck in a UDP discovery thread (unicast or multicast). + // At the same time, the events thread is stuck at PDP::remove_remote_participant (lease duration expired + // and so the discovered participant is removed), trying to acquire the callback mutex taken by the + // discovery thread. + + // If we now signal the discovery thread to continue, a data race might occur if the received + // ParticipantProxyData, which is further being processed in the discovery thread (assignRemoteEndpoints), + // gets deleted/cleared by the events thread at the same time. + // Note that a similar situation might arise in other scenarios, such as on the concurrent reception of a + // data P and data uP each on a different thread (unicast and multicast), however these are harder to + // reproduce in a regression test. + participant_listener.destruction_promise.set_value(); + } + catch (std::future_error&) + { + // do nothing + } + + participant_listener.undiscovery_future.wait(); + DomainParticipantFactory::get_instance()->delete_participant(participant); + } + + // Reestablish previous intraprocess configuration + settings.intraprocess_delivery = prev_intraprocess_delivery; + fastrtps::xmlparser::XMLProfileManager::library_settings(settings); +}