diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index dc7675b8162..a627329fe6d 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -89,6 +89,26 @@ static bool should_be_intraprocess_only( (ParticipantFilteringFlags::FILTER_DIFFERENT_HOST | ParticipantFilteringFlags::FILTER_DIFFERENT_PROCESS); } +static bool get_unique_flows_parameters( + const RTPSParticipantAttributes& part_att, + const EndpointAttributes& att, + bool& unique_flows, + uint16_t& initial_port, + uint16_t& final_port) +{ + const std::string* value = PropertyPolicyHelper::find_property(att.properties, "fastdds.unique_network_flows"); + + unique_flows = (nullptr != value); + if (unique_flows) + { + // TODO (Miguel C): parse value to get port range + final_port = part_att.port.portBase; + initial_port = part_att.port.portBase - 400; + } + + return true; +} + Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule( Locator_t& loc) { @@ -675,13 +695,6 @@ bool RTPSParticipantImpl::create_reader( return false; } - // Check for unique_network_flows feature - if (nullptr != PropertyPolicyHelper::find_property(param.endpoint.properties, "fastdds.unique_network_flows")) - { - logError(RTPS_PARTICIPANT, "Unique network flows not supported on readers"); - return false; - } - // Special case for DiscoveryProtocol::BACKUP, which abuses persistence guid GUID_t former_persistence_guid = param.endpoint.persistence_guid; if (param.endpoint.persistence_guid == c_Guid_Unknown) @@ -702,6 +715,15 @@ bool RTPSParticipantImpl::create_reader( return false; } + // Check for unique_network_flows feature + bool request_unique_flows = false; + uint16_t initial_port = 0; + uint16_t final_port = 0; + if (!get_unique_flows_parameters(m_att, param.endpoint, request_unique_flows, initial_port, final_port)) + { + return false; + } + normalize_endpoint_locators(param.endpoint); RTPSReader* SReader = nullptr; @@ -750,7 +772,7 @@ bool RTPSParticipantImpl::create_reader( if (enable) { - if (!createAndAssociateReceiverswithEndpoint(SReader)) + if (!createAndAssociateReceiverswithEndpoint(SReader, request_unique_flows, initial_port, final_port)) { delete(SReader); return false; @@ -1091,7 +1113,10 @@ bool RTPSParticipantImpl::assignEndpointListenResources( } bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( - Endpoint* pend) + Endpoint* pend, + bool unique_flows, + uint16_t initial_unique_port, + uint16_t final_unique_port) { /* This function... - Asks the network factory for new resources @@ -1099,18 +1124,53 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( - Associated the endpoint to the new elements in the list - Launches the listener thread */ - // 1 - Ask the network factory to generate the elements that do still not exist - std::vector newItems; //Store the newly created elements - std::vector newItemsBuffer; //Store intermediate results - //Iterate through the list of unicast and multicast locators the endpoint has... unless its empty - //In that case, just use the standard - if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty()) - { - //Default unicast + + if (unique_flows) + { + pend->getAttributes().multicastLocatorList.clear(); pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList; + + uint16_t port = initial_unique_port; + while (port < final_unique_port) + { + // Set port on unicast locators + for (Locator_t& loc : pend->getAttributes().unicastLocatorList) + { + loc.port = port; + } + + // Try creating receiver resources + if (createReceiverResources(pend->getAttributes().unicastLocatorList, false, true)) + { + break; + } + + // Try with next port + ++port; + } + + // Fail when unique ports are exhausted + if (port >= final_unique_port) + { + logError(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: " + << initial_unique_port << "-" << final_unique_port); + return false; + } + } + else + { + // 1 - Ask the network factory to generate the elements that do still not exist + //Iterate through the list of unicast and multicast locators the endpoint has... unless its empty + //In that case, just use the standard + if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty()) + { + // Take default locators from the participant. + pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList; + pend->getAttributes().multicastLocatorList = m_att.defaultMulticastLocatorList; + } + createReceiverResources(pend->getAttributes().unicastLocatorList, false, true); + createReceiverResources(pend->getAttributes().multicastLocatorList, false, true); } - createReceiverResources(pend->getAttributes().unicastLocatorList, false, true); - createReceiverResources(pend->getAttributes().multicastLocatorList, false, true); // Associate the Endpoint with ReceiverControlBlock assignEndpointListenResources(pend); @@ -1177,12 +1237,13 @@ bool RTPSParticipantImpl::createSendResources( return true; } -void RTPSParticipantImpl::createReceiverResources( +bool RTPSParticipantImpl::createReceiverResources( LocatorList_t& Locator_list, bool ApplyMutation, bool RegisterReceiver) { std::vector> newItemsBuffer; + bool ret_val = Locator_list.empty(); #if HAVE_SECURITY // An auxilary buffer is needed in the ReceiverResource to to decrypt the message, @@ -1207,6 +1268,8 @@ void RTPSParticipantImpl::createReceiverResources( } } + ret_val |= !newItemsBuffer.empty(); + for (auto it_buffer = newItemsBuffer.begin(); it_buffer != newItemsBuffer.end(); ++it_buffer) { std::lock_guard lock(m_receiverResourcelistMutex); @@ -1223,6 +1286,8 @@ void RTPSParticipantImpl::createReceiverResources( } newItemsBuffer.clear(); } + + return ret_val; } void RTPSParticipantImpl::createSenderResources( diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index a86e3efbeab..ae49229465a 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -562,10 +562,16 @@ class RTPSParticipantImpl /** Create the new ReceiverResources needed for a new Locator, contains the calls to assignEndpointListenResources and consequently assignEndpoint2LocatorList - @param pend - Pointer to the endpoint which triggered the creation of the Receivers + @param pend - Pointer to the endpoint which triggered the creation of the Receivers. + @param unique_flows - Whether unique listening ports should be created for this endpoint. + @param initial_unique_port - First unique listening port to try. + @param final_unique_port - Unique listening port that will not be tried. */ bool createAndAssociateReceiverswithEndpoint( - Endpoint* pend); + Endpoint* pend, + bool unique_flows = false, + uint16_t initial_unique_port = 0, + uint16_t final_unique_port = 0); /** Create non-existent SendResources based on the Locator list of the entity @param pend - Pointer to the endpoint whose SenderResources are to be created @@ -859,7 +865,7 @@ class RTPSParticipantImpl * @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable * @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled. */ - void createReceiverResources( + bool createReceiverResources( LocatorList_t& Locator_list, bool ApplyMutation, bool RegisterReceiver); diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index 78b80e22e0f..65d10455274 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -46,6 +46,13 @@ template class PubSubParticipant { +public: + + typedef TypeSupport type_support; + typedef typename type_support::type type; + +private: + class PubListener : public eprosima::fastdds::dds::DataWriterListener { friend class PubSubParticipant; @@ -117,6 +124,18 @@ class PubSubParticipant } + void on_data_available( + eprosima::fastdds::dds::DataReader* reader) override + { + type data; + eprosima::fastdds::dds::SampleInfo info; + + while (ReturnCode_t::RETCODE_OK == reader->take_next_sample(&data, &info)) + { + participant_->data_received(); + } + } + private: SubListener& operator =( @@ -127,9 +146,6 @@ class PubSubParticipant public: - typedef TypeSupport type_support; - typedef typename type_support::type type; - PubSubParticipant( unsigned int num_publishers, unsigned int num_subscribers, @@ -310,6 +326,18 @@ class PubSubParticipant return false; } + eprosima::fastdds::dds::DataWriter& get_native_writer( + unsigned int index) + { + return *(std::get<2>(publishers_[index])); + } + + eprosima::fastdds::dds::DataReader& get_native_reader( + unsigned int index) + { + return *(std::get<2>(subscribers_[index])); + } + bool send_sample( type& msg, unsigned int index = 0) @@ -467,6 +495,20 @@ class PubSubParticipant return *this; } + PubSubParticipant& pub_property_policy( + const eprosima::fastrtps::rtps::PropertyPolicy property_policy) + { + datawriter_qos_.properties() = property_policy; + return *this; + } + + PubSubParticipant& sub_property_policy( + const eprosima::fastrtps::rtps::PropertyPolicy property_policy) + { + datareader_qos_.properties() = property_policy; + return *this; + } + PubSubParticipant& pub_topic_name( std::string topicName) { @@ -585,6 +627,13 @@ class PubSubParticipant sub_liveliness_cv_.notify_one(); } + void data_received() + { + std::unique_lock lock(sub_data_mutex_); + sub_times_data_received_++; + sub_data_cv_.notify_one(); + } + unsigned int pub_times_liveliness_lost() { std::unique_lock lock(pub_liveliness_mutex_); @@ -691,6 +740,13 @@ class PubSubParticipant //! A condition variable for liveliness of publisher std::condition_variable pub_liveliness_cv_; + //! A mutex protecting received data + std::mutex sub_data_mutex_; + //! A condition variable for received data + std::condition_variable sub_data_cv_; + //! Number of times a subscriber received data + size_t sub_times_data_received_ = 0; + eprosima::fastdds::dds::TypeSupport type_; }; diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp index dfc40b4e778..8b9af79c544 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp @@ -238,6 +238,18 @@ class PubSubParticipant return false; } + eprosima::fastrtps::Publisher& get_native_writer( + unsigned int index) + { + return *(publishers_[index]); + } + + eprosima::fastrtps::Subscriber& get_native_reader( + unsigned int index) + { + return *(subscribers_[index]); + } + bool send_sample( type& msg, unsigned int index = 0) @@ -395,6 +407,20 @@ class PubSubParticipant return *this; } + PubSubParticipant& pub_property_policy( + const eprosima::fastrtps::rtps::PropertyPolicy property_policy) + { + publisher_attr_.properties = property_policy; + return *this; + } + + PubSubParticipant& sub_property_policy( + const eprosima::fastrtps::rtps::PropertyPolicy property_policy) + { + subscriber_attr_.properties = property_policy; + return *this; + } + PubSubParticipant& pub_topic_name( std::string topicName) { diff --git a/test/blackbox/common/BlackboxTestsNetworkConf.cpp b/test/blackbox/common/BlackboxTestsNetworkConf.cpp index 4f54b31b335..f079b23ccce 100644 --- a/test/blackbox/common/BlackboxTestsNetworkConf.cpp +++ b/test/blackbox/common/BlackboxTestsNetworkConf.cpp @@ -15,6 +15,7 @@ #include "BlackboxTests.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" +#include "PubSubParticipant.hpp" #include #include @@ -60,15 +61,46 @@ TEST(Blackbox, pub_unique_network_flows) TEST(Blackbox, sub_unique_network_flows) { - PubSubReader reader(TEST_TOPIC_NAME); + // Two readers on the same participant requesting unique flows should give different listening locators + { + PubSubParticipant participant(0, 2, 0, 0); - PropertyPolicy properties; - properties.properties().emplace_back("fastdds.unique_network_flows", ""); + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + + participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); - reader.entity_property_policy(properties).init(); + EXPECT_FALSE(locators == locators2); + } + + // Two readers on the same participant not requesting unique flows should give the same listening locators + { + PubSubParticipant participant(0, 2, 0, 0); + + participant.sub_topic_name(TEST_TOPIC_NAME); - // Creation should fail as feature is not implemented for readers - EXPECT_FALSE(reader.isInitialized()); + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); + + EXPECT_TRUE(locators == locators2); + } } //Verify that outLocatorList is used to select the desired output channel diff --git a/test/blackbox/common/BlackboxTestsPubSubBasic.cpp b/test/blackbox/common/BlackboxTestsPubSubBasic.cpp index f19bc1e3fcb..742830e47e2 100644 --- a/test/blackbox/common/BlackboxTestsPubSubBasic.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubBasic.cpp @@ -14,6 +14,7 @@ #include "BlackboxTests.hpp" +#include "PubSubParticipant.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" #include "ReqRepAsReliableHelloWorldRequester.hpp" @@ -620,6 +621,36 @@ TEST_P(PubSubBasic, ReceivedPropertiesDataExceedsSizeLimit) ASSERT_FALSE(reader.is_matched()); } +TEST_P(PubSubBasic, unique_flows_one_writer_two_readers) +{ + PubSubParticipant readers(0, 2, 0, 2); + PubSubWriter writer(TEST_TOPIC_NAME); + + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + + readers.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties).reliability(RELIABLE_RELIABILITY_QOS); + + ASSERT_TRUE(readers.init_participant()); + ASSERT_TRUE(readers.init_subscriber(0)); + ASSERT_TRUE(readers.init_subscriber(1)); + + writer.history_depth(100).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + readers.sub_wait_discovery(); + + // Send data + auto data = default_helloworld_data_generator(); + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block until readers have acknowledged all samples. + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30))); +} #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)