diff --git a/rmw_fastrtps_cpp/CMakeLists.txt b/rmw_fastrtps_cpp/CMakeLists.txt index 3a2c4fabb..792a9a32c 100644 --- a/rmw_fastrtps_cpp/CMakeLists.txt +++ b/rmw_fastrtps_cpp/CMakeLists.txt @@ -65,6 +65,7 @@ add_library(rmw_fastrtps_cpp src/rmw_guard_condition.cpp src/rmw_init.cpp src/rmw_node.cpp + src/rmw_node_info_and_types.cpp src/rmw_node_names.cpp src/rmw_publish.cpp src/rmw_publisher.cpp diff --git a/rmw_fastrtps_cpp/src/rmw_node_info_and_types.cpp b/rmw_fastrtps_cpp/src/rmw_node_info_and_types.cpp new file mode 100644 index 000000000..7e388ade8 --- /dev/null +++ b/rmw_fastrtps_cpp/src/rmw_node_info_and_types.cpp @@ -0,0 +1,68 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw/allocators.h" +#include "rmw/convert_rcutils_ret_to_rmw_ret.h" +#include "rmw/error_handling.h" +#include "rmw/get_node_info_and_types.h" +#include "rmw/names_and_types.h" +#include "rmw/rmw.h" + +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +#include "rmw_fastrtps_cpp/identifier.hpp" + +extern "C" +{ +rmw_ret_t +rmw_get_subscriber_names_and_types_by_node( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_subscriber_names_and_types_by_node( + eprosima_fastrtps_identifier, node, allocator, node_name, node_namespace, no_demangle, + topic_names_and_types); +} + +rmw_ret_t +rmw_get_publisher_names_and_types_by_node( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_publisher_names_and_types_by_node( + eprosima_fastrtps_identifier, node, allocator, node_name, node_namespace, no_demangle, + topic_names_and_types); +} + +rmw_ret_t +rmw_get_service_names_and_types_by_node( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + rmw_names_and_types_t * service_names_and_types) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_service_names_and_types_by_node( + eprosima_fastrtps_identifier, node, allocator, node_name, node_namespace, + service_names_and_types); +} +} // extern "C" diff --git a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt index a24f1d894..f2261b6ed 100644 --- a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt +++ b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt @@ -66,6 +66,7 @@ add_library(rmw_fastrtps_dynamic_cpp src/rmw_guard_condition.cpp src/rmw_init.cpp src/rmw_node.cpp + src/rmw_node_info_and_types.cpp src/rmw_node_names.cpp src/rmw_publish.cpp src/rmw_publisher.cpp diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_node_info_and_types.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_node_info_and_types.cpp new file mode 100644 index 000000000..2131cc93d --- /dev/null +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_node_info_and_types.cpp @@ -0,0 +1,83 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw/allocators.h" +#include "rmw/convert_rcutils_ret_to_rmw_ret.h" +#include "rmw/error_handling.h" +#include "rmw/get_node_info_and_types.h" +#include "rmw/names_and_types.h" +#include "rmw/rmw.h" + +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +#include "rmw_fastrtps_dynamic_cpp/identifier.hpp" + +// The extern "C" here enforces that overloading is not used. +extern "C" +{ +rmw_ret_t +rmw_get_subscriber_names_and_types_by_node( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_subscriber_names_and_types_by_node( + eprosima_fastrtps_identifier, + node, + allocator, + node_name, + node_namespace, + no_demangle, + topic_names_and_types); +} + +rmw_ret_t +rmw_get_publisher_names_and_types_by_node( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_publisher_names_and_types_by_node( + eprosima_fastrtps_identifier, + node, + allocator, + node_name, + node_namespace, + no_demangle, + topic_names_and_types); +} + +rmw_ret_t +rmw_get_service_names_and_types_by_node( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + rmw_names_and_types_t * service_names_and_types) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_service_names_and_types_by_node( + eprosima_fastrtps_identifier, + node, + allocator, + node_name, + node_namespace, + service_names_and_types); +} +} // extern "C" diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index 56d261079..078c38c55 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -52,6 +52,7 @@ add_library(rmw_fastrtps_shared_cpp src/rmw_guard_condition.cpp src/rmw_logging.cpp src/rmw_node.cpp + src/rmw_node_info_and_types.cpp src/rmw_node_names.cpp src/rmw_publish.cpp src/rmw_publisher.cpp @@ -68,8 +69,10 @@ add_library(rmw_fastrtps_shared_cpp src/rmw_wait_set.cpp src/TypeSupport_impl.cpp ) + target_link_libraries(rmw_fastrtps_shared_cpp - fastcdr fastrtps) + fastcdr fastrtps +) # specific order: dependents before dependencies ament_target_dependencies(rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index 69ee5a5b1..fe22ffe68 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -31,9 +31,9 @@ #include "rmw_common.hpp" +#include "topic_cache.hpp" + class ParticipantListener; -class ReaderInfo; -class WriterInfo; typedef struct CustomParticipantInfo { @@ -158,33 +158,21 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener template void process_discovery_info(T & proxyData, bool is_alive, bool is_reader) { - std::map> & topicNtypes = - is_reader ? reader_topic_and_types : writer_topic_and_types; + auto & topic_cache = + is_reader ? reader_topic_cache : writer_topic_cache; auto fqdn = proxyData.topicName(); - bool trigger = false; - mapmutex.lock(); - if (is_alive) { - topicNtypes[fqdn].push_back(proxyData.typeName()); - trigger = true; - } else { - auto it = topicNtypes.find(fqdn); - if (it != topicNtypes.end()) { - const auto & loc = - std::find(std::begin(it->second), std::end(it->second), proxyData.typeName()); - if (loc != std::end(it->second)) { - topicNtypes[fqdn].erase(loc, loc + 1); - trigger = true; - } else { - RCUTILS_LOG_DEBUG_NAMED( - "rmw_fastrtps_shared_cpp", - "unexpected removal of subscription on topic '%s' with type '%s'", - fqdn.c_str(), proxyData.typeName().c_str()); - } + bool trigger; + { + std::lock_guard guard(topic_cache.getMutex()); + if (is_alive) { + trigger = topic_cache.addTopic(proxyData.RTPSParticipantKey(), + proxyData.topicName(), proxyData.typeName()); + } else { + trigger = topic_cache.removeTopic(proxyData.RTPSParticipantKey(), + proxyData.topicName(), proxyData.typeName()); } } - mapmutex.unlock(); - if (trigger) { rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition( graph_guard_condition_->implementation_identifier, @@ -194,9 +182,8 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener std::map discovered_names; std::map discovered_namespaces; - std::map> reader_topic_and_types; - std::map> writer_topic_and_types; - std::mutex mapmutex; + LockedObject reader_topic_cache; + LockedObject writer_topic_cache; rmw_guard_condition_t * graph_guard_condition_; }; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index a4d968cb3..22a45d2a0 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -184,6 +184,38 @@ __rmw_get_service_names_and_types( rcutils_allocator_t * allocator, rmw_names_and_types_t * service_names_and_types); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_get_publisher_names_and_types_by_node( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_get_service_names_and_types_by_node( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + rmw_names_and_types_t * service_names_and_types); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_get_subscriber_names_and_types_by_node( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_service_server_is_available( diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp new file mode 100644 index 000000000..31ddcebea --- /dev/null +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp @@ -0,0 +1,232 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_FASTRTPS_SHARED_CPP__TOPIC_CACHE_HPP_ +#define RMW_FASTRTPS_SHARED_CPP__TOPIC_CACHE_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "fastrtps/participant/Participant.h" +#include "fastrtps/rtps/common/Guid.h" +#include "fastrtps/rtps/common/InstanceHandle.h" +#include "rcutils/logging_macros.h" + +typedef eprosima::fastrtps::rtps::GUID_t GUID_t; + +/** + * Topic cache data structure. Manages relationships between participants and topics. + */ +class TopicCache +{ +private: + typedef std::map>> ParticipantTopicMap; + typedef std::unordered_map> TopicToTypes; + + /** + * Map of topic names to a vector of types that topic may use. + * Topics here are represented as one to many, DDS XTypes 1.2 + * specifies application code 'generally' uses a 1-1 relationship. + * However, generic services such as logger and monitor, can discover + * multiple types on the same topic. + * + */ + TopicToTypes topic_to_types_; + + /** + * Map of participant GUIDS to a set of topic-type. + */ + ParticipantTopicMap participant_to_topics_; + + /** + * Helper function to initialize a topic vector. + * + * @param topic_name + */ + void initializeTopic(const std::string & topic_name, TopicToTypes & topic_to_types) + { + if (topic_to_types.find(topic_name) == topic_to_types.end()) { + topic_to_types[topic_name] = std::vector(); + } + } + + /** + * Helper function to initialize the set inside a participant map. + * + * @param map + * @param guid + */ + void initializeParticipantMap( + ParticipantTopicMap & map, + GUID_t guid) + { + if (map.find(guid) == map.end()) { + map[guid] = TopicToTypes(); + } + } + +public: + /** + * @return a map of topic name to the vector of topic types used. + */ + const TopicToTypes & getTopicToTypes() const + { + return topic_to_types_; + } + + /** + * @return a map of participant guid to the vector of topic names used. + */ + const ParticipantTopicMap & getParticipantToTopics() const + { + return participant_to_topics_; + } + + /** + * Add a topic based on discovery. + * + * @param rtpsParticipantKey + * @param topic_name + * @param type_name + * @return true if a change has been recorded + */ + bool addTopic( + const eprosima::fastrtps::rtps::InstanceHandle_t & rtpsParticipantKey, + const std::string & topic_name, + const std::string & type_name) + { + initializeTopic(topic_name, topic_to_types_); + auto guid = iHandle2GUID(rtpsParticipantKey); + initializeParticipantMap(participant_to_topics_, guid); + initializeTopic(topic_name, participant_to_topics_[guid]); + if (rcutils_logging_logger_is_enabled_for("rmw_fastrtps_shared_cpp", + RCUTILS_LOG_SEVERITY_DEBUG)) + { + std::stringstream guid_stream; + guid_stream << guid; + RCUTILS_LOG_DEBUG_NAMED( + "rmw_fastrtps_shared_cpp", + "Adding topic '%s' with type '%s' for node '%s'", + topic_name.c_str(), type_name.c_str(), guid_stream.str().c_str()); + } + topic_to_types_[topic_name].push_back(type_name); + participant_to_topics_[guid][topic_name].push_back(type_name); + return true; + } + + /** + * Remove a topic based on discovery. + * + * @param rtpsParticipantKey + * @param topic_name + * @param type_name + * @return true if a change has been recorded + */ + bool removeTopic( + const eprosima::fastrtps::rtps::InstanceHandle_t & rtpsParticipantKey, + const std::string & topic_name, + const std::string & type_name) + { + if (topic_to_types_.find(topic_name) == topic_to_types_.end()) { + RCUTILS_LOG_DEBUG_NAMED( + "rmw_fastrtps_shared_cpp", + "unexpected removal on topic '%s' with type '%s'", + topic_name.c_str(), type_name.c_str()); + return false; + } + { + auto & type_vec = topic_to_types_[topic_name]; + type_vec.erase(std::find(type_vec.begin(), type_vec.end(), type_name)); + if (type_vec.size() == 0) { + topic_to_types_.erase(topic_name); + } + } + + auto guid = iHandle2GUID(rtpsParticipantKey); + auto guid_topics_pair = participant_to_topics_.find(guid); + if (guid_topics_pair != participant_to_topics_.end() && + guid_topics_pair->second.find(topic_name) != guid_topics_pair->second.end()) + { + auto & type_vec = guid_topics_pair->second[topic_name]; + type_vec.erase(std::find(type_vec.begin(), type_vec.end(), type_name)); + if (type_vec.size() == 0) { + participant_to_topics_[guid].erase(topic_name); + } + if (participant_to_topics_[guid].size() == 0) { + participant_to_topics_.erase(guid); + } + } else { + RCUTILS_LOG_DEBUG_NAMED( + "rmw_fastrtps_shared_cpp", + "Unable to remove topic, does not exist '%s' with type '%s'", + topic_name.c_str(), type_name.c_str()); + } + return true; + } +}; + +inline std::ostream & operator<<( + std::ostream & ostream, + const TopicCache & topic_cache) +{ + std::stringstream map_ss; + map_ss << "Participant Info: " << std::endl; + for (auto & elem : topic_cache.getParticipantToTopics()) { + std::ostringstream stream; + stream << " Topics: " << std::endl; + for (auto & types : elem.second) { + stream << " " << types.first << ": "; + std::copy(types.second.begin(), types.second.end(), + std::ostream_iterator(stream, ",")); + stream << std::endl; + } + map_ss << elem.first << std::endl << stream.str(); + } + std::stringstream topics_ss; + topics_ss << "Cumulative TopicToTypes: " << std::endl; + for (auto & elem : topic_cache.getTopicToTypes()) { + std::ostringstream stream; + std::copy(elem.second.begin(), elem.second.end(), std::ostream_iterator(stream, + ",")); + topics_ss << " " << elem.first << " : " << stream.str() << std::endl; + } + ostream << map_ss.str() << topics_ss.str(); + return ostream; +} + +template +class LockedObject : public T +{ +private: + mutable std::mutex cache_mutex_; + +public: + /** + * @return a reference to this object to lock. + */ + std::mutex & getMutex() const + { + return cache_mutex_; + } +}; + +#endif // RMW_FASTRTPS_SHARED_CPP__TOPIC_CACHE_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp index 69b52ffd2..eaab76e63 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include "rcutils/logging_macros.h" @@ -29,6 +31,7 @@ namespace rmw_fastrtps_shared_cpp { + rmw_ret_t __rmw_count_publishers( const char * identifier, @@ -48,7 +51,7 @@ __rmw_count_publishers( return RMW_RET_ERROR; } - *count = 0; + auto ros_prefixes = _get_all_ros_prefixes(); // Build the list of all possible topic FQDN @@ -62,17 +65,19 @@ __rmw_count_publishers( } auto impl = static_cast(node->data); + *count = 0; ::ParticipantListener * slave_target = impl->listener; - - slave_target->mapmutex.lock(); - // Search and sum up the publisher counts - for (const auto & topic_fqdn : topic_fqdns) { - const auto & it = slave_target->writer_topic_and_types.find(topic_fqdn); - if (it != slave_target->writer_topic_and_types.end()) { - *count += it->second.size(); + { + std::lock_guard guard(slave_target->writer_topic_cache.getMutex()); + // Search and sum up the publisher counts + auto & topic_types = slave_target->writer_topic_cache.getTopicToTypes(); + for (const auto & topic_fqdn : topic_fqdns) { + const auto & it = topic_types.find(topic_fqdn); + if (it != topic_types.end()) { + *count += it->second.size(); + } } } - slave_target->mapmutex.unlock(); RCUTILS_LOG_DEBUG_NAMED( "rmw_fastrtps_shared_cpp", @@ -101,7 +106,7 @@ __rmw_count_subscribers( return RMW_RET_ERROR; } - *count = 0; + auto ros_prefixes = _get_all_ros_prefixes(); // Build the list of all possible topic FQDN @@ -115,17 +120,19 @@ __rmw_count_subscribers( } CustomParticipantInfo * impl = static_cast(node->data); + *count = 0; ::ParticipantListener * slave_target = impl->listener; - - slave_target->mapmutex.lock(); - // Search and sum up the subscriber counts - for (const auto & topic_fqdn : topic_fqdns) { - const auto & it = slave_target->reader_topic_and_types.find(topic_fqdn); - if (it != slave_target->reader_topic_and_types.end()) { - *count += it->second.size(); + { + std::lock_guard guard(slave_target->reader_topic_cache.getMutex()); + // Search and sum up the subscriber counts + auto & topic_types = slave_target->reader_topic_cache.getTopicToTypes(); + for (const auto & topic_fqdn : topic_fqdns) { + const auto & it = topic_types.find(topic_fqdn); + if (it != topic_types.end()) { + *count += it->second.size(); + } } } - slave_target->mapmutex.unlock(); RCUTILS_LOG_DEBUG_NAMED( "rmw_fastrtps_shared_cpp", diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp new file mode 100644 index 000000000..df07b4ed8 --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp @@ -0,0 +1,487 @@ +// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "rcutils/allocator.h" +#include "rcutils/error_handling.h" +#include "rcutils/logging_macros.h" +#include "rcutils/strdup.h" +#include "rcutils/types.h" + +#include "rmw/allocators.h" +#include "rmw/convert_rcutils_ret_to_rmw_ret.h" +#include "rmw/error_handling.h" +#include "rmw/get_topic_names_and_types.h" +#include "rmw/impl/cpp/macros.hpp" +#include "rmw/names_and_types.h" +#include "rmw/rmw.h" + +#include "demangle.hpp" +#include "namespace_prefix.hpp" +#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" + +namespace rmw_fastrtps_shared_cpp +{ + +constexpr char kLoggerTag[] = "rmw_fastrtps_shared_cpp"; + +/** + * Get the guid that corresponds to the node and namespace. + * + * @param node to discover other participants with + * @param node_name of the desired node + * @param node_namespace of the desired node + * @param guid [out] result + * @return RMW_RET_ERROR if unable to find guid + * @return RMW_RET_OK if guid is available + */ +rmw_ret_t __get_guid_by_name( + const rmw_node_t * node, const char * node_name, + const char * node_namespace, GUID_t & guid) +{ + auto impl = static_cast(node->data); + if (strcmp(node->name, node_name) == 0) { + guid = impl->participant->getGuid(); + } else { + std::set nodes_in_desired_namespace; + auto namespaces = impl->listener->discovered_namespaces; + for (auto & guid_to_namespace : impl->listener->discovered_namespaces) { + if (strcmp(guid_to_namespace.second.c_str(), node_namespace) == 0) { + nodes_in_desired_namespace.insert(guid_to_namespace.first); + } + } + + auto guid_node_pair = std::find_if(impl->listener->discovered_names.begin(), + impl->listener->discovered_names.end(), + [node_name, &nodes_in_desired_namespace](const std::pair & pair) { + return strcmp(pair.second.c_str(), node_name) == 0 && + nodes_in_desired_namespace.find(pair.first) != nodes_in_desired_namespace.end(); + }); + + if (guid_node_pair == impl->listener->discovered_names.end()) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerTag, + "Unable to find GUID for node: %s", node_name); + RMW_SET_ERROR_MSG("Unable to find GUID for node "); + return RMW_RET_ERROR; + } + guid = guid_node_pair->first; + } + return RMW_RET_OK; +} + +/** + * Validate the input data of node_info_and_types functions. + * + * @return RMW_RET_INVALID_ARGUMENT for null input args + * @return RMW_RET_ERROR if identifier is not the same as the input node + * @return RMW_RET_OK if all input is valid + */ +rmw_ret_t __validate_input( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + rmw_names_and_types_t * topic_names_and_types) +{ + if (!allocator) { + RMW_SET_ERROR_MSG("allocator is null"); + return RMW_RET_INVALID_ARGUMENT; + } + if (!node) { + RMW_SET_ERROR_MSG("null node handle"); + return RMW_RET_INVALID_ARGUMENT; + } + + if (!node_name) { + RMW_SET_ERROR_MSG("null node name"); + return RMW_RET_INVALID_ARGUMENT; + } + + if (!node_namespace) { + RMW_SET_ERROR_MSG("null node namespace"); + return RMW_RET_INVALID_ARGUMENT; + } + + rmw_ret_t ret = rmw_names_and_types_check_zero(topic_names_and_types); + if (ret != RMW_RET_OK) { + return ret; + } + + // Get participant pointer from node + if (node->implementation_identifier != identifier) { + RMW_SET_ERROR_MSG("node handle not from this implementation"); + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +/** + * Access the slave Listeners, which are the ones that have the topicnamesandtypes member + * Get info from publisher and subscriber + * Combined results from the two lists + * + * @param topic_cache cache with topic information + * @param topics [out] resulting topics + * @param node_guid_ to find information for + * @param no_demangle true if demangling will not occur + */ +void +__accumulate_topics( + const LockedObject & topic_cache, + std::map> & topics, + const GUID_t & node_guid_, + bool no_demangle) +{ + std::lock_guard guard(topic_cache.getMutex()); + const auto & node_topics = topic_cache.getParticipantToTopics().find(node_guid_); + if (node_topics == topic_cache.getParticipantToTopics().end()) { + RCUTILS_LOG_DEBUG_NAMED( + kLoggerTag, + "No topics found for node"); + return; + } + for (auto & topic_pair : node_topics->second) { + if (!no_demangle && _get_ros_prefix_if_exists(topic_pair.first) != ros_topic_prefix) { + // if we are demangling and this is not prefixed with rt/, skip it + continue; + } + RCUTILS_LOG_DEBUG_NAMED( + kLoggerTag, + "accumulate_topics: Found topic %s", + topic_pair.first.c_str()); + + topics[topic_pair.first].insert(topic_pair.second.begin(), + topic_pair.second.end()); + } +} + +/** + * Copy topic data to results + * + * @param topics to copy over + * @param allocator to use + * @param no_demangle true if demangling will not occur + * @param topic_names_and_types [out] final rmw result + * @return RMW_RET_OK if successful + */ +rmw_ret_t +__copy_data_to_results( + const std::map> & topics, + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + // Copy data to results handle + if (topics.size() > 0) { + // Setup string array to store names + rmw_ret_t rmw_ret = rmw_names_and_types_init(topic_names_and_types, topics.size(), allocator); + if (rmw_ret != RMW_RET_OK) { + return rmw_ret; + } + // Setup cleanup function, in case of failure below + auto fail_cleanup = [&topic_names_and_types]() { + rmw_ret_t rmw_ret = rmw_names_and_types_fini(topic_names_and_types); + if (rmw_ret != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerTag, + "error during report of error: %s", rmw_get_error_string()); + } + }; + // Setup demangling functions based on no_demangle option + auto demangle_topic = _demangle_if_ros_topic; + auto demangle_type = _demangle_if_ros_type; + if (no_demangle) { + auto noop = [](const std::string & in) { + return in; + }; + demangle_topic = noop; + demangle_type = noop; + } + // For each topic, store the name, initialize the string array for types, and store all types + size_t index = 0; + for (const auto & topic_n_types : topics) { + // Duplicate and store the topic_name + char * topic_name = rcutils_strdup(demangle_topic(topic_n_types.first).c_str(), *allocator); + if (!topic_name) { + RMW_SET_ERROR_MSG("failed to allocate memory for topic name"); + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->names.data[index] = topic_name; + // Setup storage for types + { + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &topic_names_and_types->types[index], + topic_n_types.second.size(), + allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG(rcutils_get_error_string().str); + fail_cleanup(); + return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + } + } + // Duplicate and store each type for the topic + size_t type_index = 0; + for (const auto & type : topic_n_types.second) { + char * type_name = rcutils_strdup(demangle_type(type).c_str(), *allocator); + if (!type_name) { + RMW_SET_ERROR_MSG("failed to allocate memory for type name"); + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->types[index].data[type_index] = type_name; + ++type_index; + } // for each type + ++index; + } // for each topic + } + return RMW_RET_OK; +} + +void +__log_debug_information(const CustomParticipantInfo & impl) +{ + if (rcutils_logging_logger_is_enabled_for(kLoggerTag, RCUTILS_LOG_SEVERITY_DEBUG)) { + { + auto & topic_cache = impl.listener->writer_topic_cache; + std::lock_guard guard(topic_cache.getMutex()); + std::stringstream map_ss; + map_ss << topic_cache; + RCUTILS_LOG_DEBUG_NAMED( + kLoggerTag, + "Publisher Topic cache is: %s", map_ss.str().c_str()); + } + { + auto & topic_cache = impl.listener->reader_topic_cache; + std::lock_guard guard(topic_cache.getMutex()); + std::stringstream map_ss; + map_ss << topic_cache; + RCUTILS_LOG_DEBUG_NAMED( + kLoggerTag, + "Subscriber Topic cache is: %s", map_ss.str().c_str()); + } + { + std::stringstream ss; + for (auto & node_pair : impl.listener->discovered_names) { + ss << node_pair.first << " : " << node_pair.second << " "; + } + RCUTILS_LOG_DEBUG_NAMED(kLoggerTag, "Discovered names: %s", ss.str().c_str()); + } + { + std::stringstream ss; + for (auto & node_pair : impl.listener->discovered_namespaces) { + ss << node_pair.first << " : " << node_pair.second << " "; + } + RCUTILS_LOG_DEBUG_NAMED(kLoggerTag, "Discovered namespaces: %s", ss.str().c_str()); + } + } +} + +/** + * Function to abstract which topic_cache to use when gathering information. + */ +typedef std::function&(CustomParticipantInfo & participant_info)> + RetrieveCache; + +/** + * Get topic names and types for the specific node_name and node_namespace requested. + * + * @param identifier corresponding to the input node + * @param node to use for discovery + * @param allocator for returned value + * @param node_name to search + * @param node_namespace to search + * @param no_demangle true if the topics should not be demangled + * @param retrieve_cache_func getter for topic cache + * @param topic_names_and_types result + * @return RMW_RET_OK if successful + */ +rmw_ret_t +__rmw_get_topic_names_and_types_by_node( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + RetrieveCache & retrieve_cache_func, + rmw_names_and_types_t * topic_names_and_types) +{ + rmw_ret_t valid_input = __validate_input(identifier, node, allocator, node_name, + node_namespace, topic_names_and_types); + if (valid_input != RMW_RET_OK) { + return valid_input; + } + RCUTILS_LOG_DEBUG_NAMED(kLoggerTag, "rmw_get_subscriber_names_and_types_by_node"); + auto impl = static_cast(node->data); + + __log_debug_information(*impl); + + GUID_t guid; + rmw_ret_t valid_guid = __get_guid_by_name(node, node_name, node_namespace, guid); + if (valid_guid != RMW_RET_OK) { + return valid_guid; + } + std::map> topics; + __accumulate_topics(retrieve_cache_func(*impl), topics, guid, no_demangle); + return __copy_data_to_results(topics, allocator, no_demangle, topic_names_and_types); +} + +rmw_ret_t +__rmw_get_subscriber_names_and_types_by_node( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + RetrieveCache retrieve_sub_cache = + [](CustomParticipantInfo & participant_info) -> const LockedObject & { + return participant_info.listener->reader_topic_cache; + }; + return __rmw_get_topic_names_and_types_by_node(identifier, node, allocator, node_name, + node_namespace, no_demangle, retrieve_sub_cache, topic_names_and_types); +} + +rmw_ret_t +__rmw_get_publisher_names_and_types_by_node( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) +{ + RetrieveCache retrieve_pub_cache = + [](CustomParticipantInfo & participant_info) -> const LockedObject & { + return participant_info.listener->writer_topic_cache; + }; + return __rmw_get_topic_names_and_types_by_node(identifier, node, allocator, node_name, + node_namespace, no_demangle, retrieve_pub_cache, topic_names_and_types); +} + +rmw_ret_t +__rmw_get_service_names_and_types_by_node( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + rmw_names_and_types_t * service_names_and_types) +{ + rmw_ret_t valid_input = __validate_input(identifier, node, allocator, node_name, + node_namespace, service_names_and_types); + if (valid_input != RMW_RET_OK) { + return valid_input; + } + auto impl = static_cast(node->data); + __log_debug_information(*impl); + + GUID_t guid; + rmw_ret_t valid_guid = __get_guid_by_name(node, node_name, node_namespace, guid); + if (valid_guid != RMW_RET_OK) { + return valid_guid; + } + + std::map> services; + { + auto & topic_cache = impl->listener->reader_topic_cache; + std::lock_guard guard(topic_cache.getMutex()); + const auto & node_topics = topic_cache.getParticipantToTopics().find(guid); + if (node_topics != topic_cache.getParticipantToTopics().end()) { + for (auto & topic_pair : node_topics->second) { + std::string service_name = _demangle_service_from_topic(topic_pair.first); + if (!service_name.length()) { + // not a service + continue; + } + for (auto & itt : topic_pair.second) { + std::string service_type = _demangle_service_type_only(itt); + if (service_type.length()) { + services[service_name].insert(service_type); + } + } + } + } + } + if (services.empty()) { + return RMW_RET_OK; + } + // Setup string array to store names + rmw_ret_t rmw_ret = + rmw_names_and_types_init(service_names_and_types, services.size(), allocator); + if (rmw_ret != RMW_RET_OK) { + return rmw_ret; + } + // Setup cleanup function, in case of failure below + auto fail_cleanup = [&service_names_and_types]() { + rmw_ret_t rmw_ret = rmw_names_and_types_fini(service_names_and_types); + if (rmw_ret != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + kLoggerTag, + "error during report of error: %s", rmw_get_error_string()); + } + }; + // For each service, store the name, initialize the string array for types, and store all types + size_t index = 0; + for (const auto & service_n_types : services) { + // Duplicate and store the service_name + char * service_name = rcutils_strdup(service_n_types.first.c_str(), *allocator); + if (!service_name) { + RMW_SET_ERROR_MSG("failed to allocate memory for service name"); + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + service_names_and_types->names.data[index] = service_name; + // Setup storage for types + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &service_names_and_types->types[index], + service_n_types.second.size(), + allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG(rcutils_get_error_string().str); + fail_cleanup(); + return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + } + // Duplicate and store each type for the service + size_t type_index = 0; + for (const auto & type : service_n_types.second) { + char * type_name = rcutils_strdup(type.c_str(), *allocator); + if (!type_name) { + RMW_SET_ERROR_MSG("failed to allocate memory for type name"); + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + service_names_and_types->types[index].data[type_index] = type_name; + ++type_index; + } // for each type + ++index; + } // for each service + return RMW_RET_OK; +} +} // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp index 6a9d1b88b..20dae9c92 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp @@ -30,6 +30,8 @@ #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" + namespace rmw_fastrtps_shared_cpp { rmw_ret_t @@ -66,8 +68,9 @@ __rmw_get_service_names_and_types( std::map> services; // Setup processing function, will be used with two maps - auto map_process = [&services](const std::map> & map) { - for (auto it : map) { + auto map_process = [&services](const LockedObject & topic_cache) { + std::lock_guard guard(topic_cache.getMutex()); + for (auto it : topic_cache.getTopicToTypes()) { std::string service_name = _demangle_service_from_topic(it.first); if (!service_name.length()) { // not a service @@ -83,10 +86,8 @@ __rmw_get_service_names_and_types( }; ::ParticipantListener * slave_target = impl->listener; - slave_target->mapmutex.lock(); - map_process(slave_target->reader_topic_and_types); - map_process(slave_target->writer_topic_and_types); - slave_target->mapmutex.unlock(); + map_process(slave_target->reader_topic_cache); + map_process(slave_target->writer_topic_cache); // Fill out service_names_and_types if (services.size()) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp index 4c1f3cb78..88c235253 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp @@ -15,6 +15,8 @@ #include #include #include + +#include #include #include "rcutils/allocator.h" @@ -36,6 +38,8 @@ #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" + namespace rmw_fastrtps_shared_cpp { rmw_ret_t @@ -75,8 +79,9 @@ __rmw_get_topic_names_and_types( // Setup processing function, will be used with two maps auto map_process = - [&topics, no_demangle](const std::map> & map) { - for (auto it : map) { + [&topics, no_demangle](const LockedObject & topic_cache) { + std::lock_guard guard(topic_cache.getMutex()); + for (auto it : topic_cache.getTopicToTypes()) { if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { // if we are demangling and this is not prefixed with rt/, skip it continue; @@ -88,10 +93,8 @@ __rmw_get_topic_names_and_types( }; ::ParticipantListener * slave_target = impl->listener; - slave_target->mapmutex.lock(); - map_process(slave_target->reader_topic_and_types); - map_process(slave_target->writer_topic_and_types); - slave_target->mapmutex.unlock(); + map_process(slave_target->reader_topic_cache); + map_process(slave_target->writer_topic_cache); // Copy data to results handle if (topics.size() > 0) {