From 58c1fe8209af8f48aaffeb6b6f24c205a151b73b Mon Sep 17 00:00:00 2001 From: Jaison Titus Date: Tue, 22 Oct 2019 18:32:02 -0700 Subject: [PATCH] Implemented rmw_get_publishers_info_by_topic and rmw_get_subscriptions_info_by_topic Signed-off-by: Jaison Titus --- .../custom_participant_info.hpp | 1 - .../rmw_fastrtps_shared_cpp/topic_cache.hpp | 3 +- .../src/rmw_get_topic_info.cpp | 196 +++++++++++++++++- .../test/test_topic_cache.cpp | 28 ++- 4 files changed, 213 insertions(+), 15 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index b66510e37..ca6b3d59a 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -164,7 +164,6 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener { auto & topic_cache = is_reader ? reader_topic_cache : writer_topic_cache; - bool trigger; { std::lock_guard guard(topic_cache.getMutex()); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp index ff37eb6a7..a71a52958 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp @@ -50,7 +50,7 @@ class TopicCache * Map of topic names to TopicData. Where topic data is vector of tuples containing * participant GUID, topic type and the qos policy of the respective participant. * Topics here are represented as one to many, DDS XTypes 1.2 - * specifies application code 'generally' uses a 1-1 relationship. + * specifies application code that 'generally' uses a 1-1 relationship. * However, generic services such as logger and monitor, can discover * multiple types on the same topic. */ @@ -142,6 +142,7 @@ class TopicCache * @param rtpsParticipantKey * @param topic_name * @param type_name + * @param dds_qos the dds qos policy of the participant * @return true if a change has been recorded */ template diff --git a/rmw_fastrtps_shared_cpp/src/rmw_get_topic_info.cpp b/rmw_fastrtps_shared_cpp/src/rmw_get_topic_info.cpp index 851b3dc62..56916410d 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_get_topic_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_get_topic_info.cpp @@ -12,10 +12,188 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include +#include + +#include "rmw/rmw.h" +#include "rmw/types.h" +#include "rmw/topic_info_array.h" + +#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp" namespace rmw_fastrtps_shared_cpp { + +rmw_ret_t +_validate_params( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + rmw_topic_info_array_t * participants_info) +{ + if (!identifier) { + RMW_SET_ERROR_MSG("null implementation identifier provided"); + return RMW_RET_ERROR; + } + + if (!topic_name) { + RMW_SET_ERROR_MSG("null topic_name provided"); + return RMW_RET_ERROR; + } + + if (!allocator) { + RMW_SET_ERROR_MSG("null allocator provided"); + return RMW_RET_ERROR; + } + + if (!node) { + RMW_SET_ERROR_MSG("null node handle"); + return RMW_RET_ERROR; + } + + // Get participant pointer from node + if (node->implementation_identifier != identifier) { + RMW_SET_ERROR_MSG("node handle not from this implementation"); + return RMW_RET_ERROR; + } + + if (!participants_info) { + RMW_SET_ERROR_MSG("null participants_info provided"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +std::vector +_get_topic_fqdns(const char * topic_name, bool no_mangle) +{ + std::vector topic_fqdns; + topic_fqdns.push_back(topic_name); + // if mangle + if (!no_mangle) { + auto ros_prefixes = _get_all_ros_prefixes(); + // Build the list of all possible topic FQDN + std::for_each(ros_prefixes.begin(), ros_prefixes.end(), + [&topic_fqdns, &topic_name](const std::string & prefix) { + topic_fqdns.push_back(prefix + topic_name); + }); + } + return topic_fqdns; +} + +rmw_ret_t +_set_rmw_topic_info( + std::tuple data, + ::ParticipantListener * slave_target, + rmw_topic_info_t * topic_info) +{ + const auto & gid = std::get<0>(data); + // convert gid to const char * and set it inside topic_info + std::ostringstream gid_stream; + gid_stream << gid; + rmw_ret_t ret = rmw_topic_info_set_gid(gid_stream.str().c_str(), topic_info); + if (ret != RMW_RET_OK) { + return ret; + } + // set topic type + ret = rmw_topic_info_set_topic_type(std::get<1>(data).c_str(), topic_info); + if (ret != RMW_RET_OK) { + return ret; + } + // set qos profile + ret = rmw_topic_info_set_qos_profile(&std::get<2>(data), topic_info); + if (ret != RMW_RET_OK) { + return ret; + } + // get node name + const auto & d_name_it = slave_target->discovered_names.find(gid); + if (d_name_it != slave_target->discovered_names.end()) { + ret = rmw_topic_info_set_node_name(d_name_it->second.c_str(), topic_info); + } else { + ret = rmw_topic_info_set_node_name("_NODE_NAME_UNKNOWN_", topic_info); + } + if (ret != RMW_RET_OK) { + return ret; + } + // get node namespace + const auto & d_namespace_it = slave_target->discovered_namespaces.find(gid); + if (d_namespace_it != slave_target->discovered_namespaces.end()) { + ret = rmw_topic_info_set_node_namespace(d_namespace_it->second.c_str(), topic_info); + } else { + ret = rmw_topic_info_set_node_namespace("_NODE_NAMESPACE_UNKNOWN_", topic_info); + } + return ret; +} + + +rmw_ret_t +_get_info_by_topic( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + bool is_publisher, + rmw_topic_info_array_t * participants_info) +{ + rmw_ret_t ret = _validate_params( + identifier, + node, + allocator, + topic_name, + participants_info); + if (ret != RMW_RET_OK) { + return ret; + } + + const auto & topic_fqdns = _get_topic_fqdns(topic_name, no_mangle); + + auto impl = static_cast(node->data); + ::ParticipantListener * slave_target = impl->listener; + auto & topic_cache = + is_publisher ? slave_target->writer_topic_cache : slave_target->reader_topic_cache; + { + std::lock_guard guard(topic_cache.getMutex()); + auto & topic_name_to_data = topic_cache().getTopicNameToTopicData(); + std::vector topic_info_vector; + for (const auto & topic_name : topic_fqdns) { + const auto & it = topic_name_to_data.find(topic_name); + if (it != topic_name_to_data.end()) { + for (const auto & data : it->second) { + rmw_topic_info_t topic_info; + rmw_ret_t ret = _set_rmw_topic_info(data, slave_target, &topic_info); + if (ret != RMW_RET_OK) { + RMW_SET_ERROR_MSG("Failed to create set_rmw_topic_info."); + return ret; + } + // add rmw_topic_info_t to a vector + topic_info_vector.push_back(topic_info); + } + } + } + + // add all the elements from the vector to rmw_topic_info_array_t + auto count = topic_info_vector.size(); + ret = rmw_topic_info_array_init_with_size(allocator, count, participants_info); + if (ret != RMW_RET_OK) { + RMW_SET_ERROR_MSG("rmw_topic_info_array_init_with_size failed to allocate memory."); + return RMW_RET_BAD_ALLOC; + } + for (auto i = 0u; i < count; i++) { + participants_info->info_array[i] = topic_info_vector.at(i); + } + participants_info->count = count; + } + return RMW_RET_OK; +} + rmw_ret_t __rmw_get_publishers_info_by_topic( const char * identifier, @@ -25,7 +203,14 @@ __rmw_get_publishers_info_by_topic( bool no_mangle, rmw_topic_info_array_t * publishers_info) { - return RMW_RET_UNSUPPORTED; + return _get_info_by_topic( + identifier, + node, + allocator, + topic_name, + no_mangle, + true, /*is_publisher*/ + publishers_info); } rmw_ret_t @@ -37,6 +222,13 @@ __rmw_get_subscriptions_info_by_topic( bool no_mangle, rmw_topic_info_array_t * subscriptions_info) { - return RMW_RET_UNSUPPORTED; + return _get_info_by_topic( + identifier, + node, + allocator, + topic_name, + no_mangle, + false, /*is_publisher*/ + subscriptions_info); } } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp b/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp index c49bfcda8..74c54237c 100644 --- a/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp +++ b/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp @@ -41,12 +41,17 @@ class TopicCacheTestFixture : public ::testing::Test GUID_t guid[2]; void SetUp() { - // Create an instance handlers + // Create instance handlers for (int i = 0; i < 2; i++) { guid[i] = GUID_t(GuidPrefix_t(), i + 1); instance_handler[i] = guid[i]; } + // Populating WriterQos -> which is from the DDS layer and + // rmw_qos_profile_t which is from rmw/types.h. + // This is done to test if topic_cache.getTopicNameToTopicData() returns + // the correct value in rmw_qos_profile_t for a given WriterQos + // DDS qos qos[0].m_durability.kind = eprosima::fastrtps::TRANSIENT_DURABILITY_QOS; qos[0].m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; @@ -85,7 +90,7 @@ class TopicCacheTestFixture : public ::testing::Test rmw_qos[1].lifespan.sec = 19u; rmw_qos[1].lifespan.nsec = 5432u; - // Add some topics + // Add data to topic_cache topic_cache.addTopic(instance_handler[0], "topic1", "type1", qos[0]); topic_cache.addTopic(instance_handler[0], "topic2", "type2", qos[0]); topic_cache.addTopic(instance_handler[1], "topic1", "type1", qos[1]); @@ -124,7 +129,7 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_participant_map) const auto & it = participant_topic_map.find(this->guid[0]); ASSERT_TRUE(it != participant_topic_map.end()); // Verify that the topic and respective types are present - auto & topic_type_map = it->second; + const auto & topic_type_map = it->second; const auto & topic1it = topic_type_map.find("topic1"); ASSERT_TRUE(topic1it != topic_type_map.end()); @@ -140,7 +145,7 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_participant_map) const auto & it2 = participant_topic_map.find(this->guid[1]); ASSERT_TRUE(it2 != participant_topic_map.end()); // Verify that the topic and respective types are present - auto & topic_type_map2 = it2->second; + const auto & topic_type_map2 = it2->second; const auto & topic1it2 = topic_type_map2.find("topic1"); ASSERT_TRUE(topic1it2 != topic_type_map2.end()); @@ -163,21 +168,21 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_name_topic_data_map) expected_results["topic2"].push_back(std::make_tuple(guid[0], "type2", rmw_qos[0])); expected_results["topic2"].push_back(std::make_tuple(guid[1], "type1", rmw_qos[1])); for (auto & result_it : expected_results) { - auto & topic_name = result_it.first; - auto & expected_topic_data = result_it.second; + const auto & topic_name = result_it.first; + const auto & expected_topic_data = result_it.second; const auto & it = topic_data_map.find(topic_name); ASSERT_TRUE(it != topic_data_map.end()); // Verify that the topic has all the associated data - auto & topic_data = it->second; + const auto & topic_data = it->second; for (auto i = 0u; i < expected_topic_data.size(); i++) { // GUID EXPECT_EQ(std::get<0>(topic_data.at(i)), std::get<0>(expected_topic_data.at(i))); // TYPE EXPECT_EQ(std::get<1>(topic_data.at(i)), std::get<1>(expected_topic_data.at(i))); // QOS - auto & qos = std::get<2>(topic_data.at(i)); - auto & expected_qos = std::get<2>(expected_topic_data.at(i)); + const auto & qos = std::get<2>(topic_data.at(i)); + const auto & expected_qos = std::get<2>(expected_topic_data.at(i)); EXPECT_EQ(qos.durability, expected_qos.durability); EXPECT_EQ(qos.reliability, expected_qos.reliability); EXPECT_EQ(qos.liveliness, expected_qos.liveliness); @@ -194,7 +199,8 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_name_topic_data_map) TEST_F(TopicCacheTestFixture, test_topic_cache_add_topic) { // Add Topic - bool did_add = this->topic_cache.addTopic(this->instance_handler[1], "TestTopic", "TestType", + const bool did_add = this->topic_cache.addTopic(this->instance_handler[1], "TestTopic", + "TestType", this->qos[1]); // Verify that the returned value was true EXPECT_TRUE(did_add); @@ -242,7 +248,7 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_remove_policy_element_does_not_ex // add topic this->topic_cache.addTopic(this->instance_handler[1], "TestTopic", "TestType", this->qos[1]); // Assert that the return was false - auto const did_remove = this->topic_cache.removeTopic(this->instance_handler[1], "NewTestTopic", + const auto did_remove = this->topic_cache.removeTopic(this->instance_handler[1], "NewTestTopic", "TestType"); ASSERT_FALSE(did_remove); }