From 9bdc763b507b0f93a316d95a9ca7e4d9ca38e273 Mon Sep 17 00:00:00 2001 From: Juan Carlos <40226503+JuanCarlos-eProsima@users.noreply.github.com> Date: Wed, 5 Dec 2018 22:06:11 +0100 Subject: [PATCH 1/2] RMW_FastRTPS configuration from XML only (#243) * Refs #3555 Apply the changes to the current master on ROS2 * Refs #3555 rename the environment variable from "RMW_LEAVE_MIDDLEWARE_DEFAULT_QOS" to "RMW_FASTRTPS_USE_QOS_FROM_XML" * Refs #3555 Fix lint issues. * Refs #3555 Fix lint issues. * Refs #3555 Add an explanation to the attribute to explain it --- rmw_fastrtps_cpp/src/rmw_client.cpp | 16 +++++-- rmw_fastrtps_cpp/src/rmw_publisher.cpp | 9 ++-- rmw_fastrtps_cpp/src/rmw_service.cpp | 16 +++++-- rmw_fastrtps_cpp/src/rmw_subscription.cpp | 7 ++- rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp | 16 +++++-- .../src/rmw_publisher.cpp | 9 ++-- rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp | 16 +++++-- .../src/rmw_subscription.cpp | 7 ++- .../custom_participant_info.hpp | 7 +++ rmw_fastrtps_shared_cpp/src/rmw_node.cpp | 48 +++++++++++++++++-- 10 files changed, 117 insertions(+), 34 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 171a72cfb..409a91f1c 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -120,10 +120,13 @@ rmw_create_client( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = response_type_name; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { @@ -131,11 +134,14 @@ rmw_create_client( } subscriberParam.topic.topicName += "Reply"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = request_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; } else { diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 5907590fd..520afdf6a 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -113,9 +113,12 @@ rmw_create_publisher( _register_type(participant, info->type_support_); } - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index af81304e4..80a8739ff 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -132,9 +132,12 @@ rmw_create_service( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; subscriberParam.topic.topicDataType = request_type_name; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; @@ -143,11 +146,14 @@ rmw_create_service( } subscriberParam.topic.topicName += "Request"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = response_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp index 998b859f8..27ad9e374 100644 --- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp @@ -116,8 +116,11 @@ rmw_create_subscription( _register_type(participant, info->type_support_); } - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index da3c6cda0..7adbf62f0 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -125,10 +125,13 @@ rmw_create_client( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = response_type_name; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { @@ -136,11 +139,14 @@ rmw_create_client( } subscriberParam.topic.topicName += "Reply"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = request_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; } else { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index ae44cfe03..f279dcbfc 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -108,9 +108,12 @@ rmw_create_publisher( _register_type(participant, info->type_support_); } - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 2de1d7645..42495c7d0 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -137,9 +137,12 @@ rmw_create_service( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; subscriberParam.topic.topicDataType = request_type_name; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; @@ -148,11 +151,14 @@ rmw_create_service( } subscriberParam.topic.topicName += "Request"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = response_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp index a421424fe..a5c9e7e52 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp @@ -112,8 +112,11 @@ rmw_create_subscription( _register_type(participant, info->type_support_); } - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index 4e086435d..163eb9484 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -37,6 +37,13 @@ typedef struct CustomParticipantInfo ReaderInfo * secondarySubListener; WriterInfo * secondaryPubListener; rmw_guard_condition_t * graph_guard_condition; + + // Flag to establish if the QoS of the participant, + // its publishers and its subscribers are going + // to be configured only from an XML file or if + // their settings are going to be overwritten by code + // with the default configuration. + bool leave_middleware_default_qos; } CustomParticipantInfo; class ParticipantListener : public eprosima::fastrtps::ParticipantListener diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp index c106f5db6..e9dc8b207 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp @@ -104,6 +104,25 @@ create_node( try { node_impl = new CustomParticipantInfo(); + + node_impl->leave_middleware_default_qos = false; + const char * env_var = "RMW_FASTRTPS_USE_QOS_FROM_XML"; + // Check if the configuration from XML has been enabled from + // the RMW_FASTRTPS_USE_QOS_FROM_XML env variable. + char * config_env_val = nullptr; +#ifndef _WIN32 + config_env_val = getenv(env_var); + if (config_env_val != nullptr) { + node_impl->leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } +#else + size_t config_env_val_size; + _dupenv_s(&config_env_val, &config_env_val_size, env_var); + if (config_env_val != nullptr) { + node_impl->leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } + free(config_env_val); +#endif } catch (std::bad_alloc &) { RMW_SET_ERROR_MSG("failed to allocate node impl struct"); goto fail; @@ -246,11 +265,32 @@ __rmw_create_node( // since the participant name is not part of the DDS spec participantAttrs.rtps.setName(name); + bool leave_middleware_default_qos = false; + const char * env_var = "RMW_FASTRTPS_USE_QOS_FROM_XML"; + // Check if the configuration from XML has been enabled from + // the RMW_FASTRTPS_USE_QOS_FROM_XML env variable. + char * config_env_val = nullptr; +#ifndef _WIN32 + config_env_val = getenv(env_var); + if (config_env_val != nullptr) { + leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } +#else + size_t config_env_val_size; + _dupenv_s(&config_env_val, &config_env_val_size, env_var); + if (config_env_val != nullptr) { + leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } + free(config_env_val); +#endif + // allow reallocation to support discovery messages bigger than 5000 bytes - participantAttrs.rtps.builtin.readerHistoryMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - participantAttrs.rtps.builtin.writerHistoryMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!leave_middleware_default_qos) { + participantAttrs.rtps.builtin.readerHistoryMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + participantAttrs.rtps.builtin.writerHistoryMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } size_t length = strlen(name) + strlen("name=;") + strlen(namespace_) + strlen("namespace=;") + 1; From 0c39c9ed072252933231814be40962b8436a3bb3 Mon Sep 17 00:00:00 2001 From: MiguelCompany Date: Thu, 6 Dec 2018 00:03:44 +0100 Subject: [PATCH 2/2] Fastrtps 1.7.0 (#233) * Changes to adapt to new TopicDataType interface. * Adapting to eProsima/Fast-RTPS#325 * Topic info kept inside participant listener. * Adapting to new listener interfaces. * match style of file * match style of file * match style of file * Fixed clang warnings. --- rmw_fastrtps_cpp/CMakeLists.txt | 4 + rmw_fastrtps_dynamic_cpp/CMakeLists.txt | 4 + rmw_fastrtps_shared_cpp/CMakeLists.txt | 4 + .../rmw_fastrtps_shared_cpp/TypeSupport.hpp | 20 +++- .../custom_participant_info.hpp | 97 ++++++++++++--- rmw_fastrtps_shared_cpp/src/reader_info.hpp | 111 ------------------ rmw_fastrtps_shared_cpp/src/rmw_count.cpp | 14 +-- rmw_fastrtps_shared_cpp/src/rmw_node.cpp | 65 ++-------- .../src/rmw_service_names_and_types.cpp | 56 ++++----- .../src/rmw_topic_names_and_types.cpp | 50 ++++---- rmw_fastrtps_shared_cpp/src/writer_info.hpp | 110 ----------------- 11 files changed, 166 insertions(+), 369 deletions(-) delete mode 100644 rmw_fastrtps_shared_cpp/src/reader_info.hpp delete mode 100644 rmw_fastrtps_shared_cpp/src/writer_info.hpp diff --git a/rmw_fastrtps_cpp/CMakeLists.txt b/rmw_fastrtps_cpp/CMakeLists.txt index 788e57d3e..3a2c4fabb 100644 --- a/rmw_fastrtps_cpp/CMakeLists.txt +++ b/rmw_fastrtps_cpp/CMakeLists.txt @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() +if(SECURITY) + find_package(OpenSSL REQUIRED) +endif() + find_package(ament_cmake_ros REQUIRED) find_package(rcutils REQUIRED) diff --git a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt index 3122a5aca..a24f1d894 100644 --- a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt +++ b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() +if(SECURITY) + find_package(OpenSSL REQUIRED) +endif() + find_package(ament_cmake_ros REQUIRED) find_package(rcutils REQUIRED) diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index df358c6e4..56d261079 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() +if(SECURITY) + find_package(OpenSSL REQUIRED) +endif() + find_package(ament_cmake_ros REQUIRED) find_package(rcutils REQUIRED) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index 212e6120c..85c50dc90 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -47,19 +47,29 @@ class TypeSupport : public eprosima::fastrtps::TopicDataType virtual bool deserializeROSmessage(eprosima::fastcdr::Cdr & deser, void * ros_message) = 0; RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload); + bool getKey( + void * data, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5 = false) override + { + (void)data; (void)ihandle; (void)force_md5; + return false; + } + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool deserialize(eprosima::fastrtps::rtps::SerializedPayload_t * payload, void * data); + bool deserialize(eprosima::fastrtps::rtps::SerializedPayload_t * payload, void * data) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - std::function getSerializedSizeProvider(void * data); + std::function getSerializedSizeProvider(void * data) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void * createData(); + void * createData() override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void deleteData(void * data); + void deleteData(void * data) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC virtual ~TypeSupport() {} diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index 163eb9484..6a74de4a7 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -16,6 +16,7 @@ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_ #include +#include #include #include @@ -23,9 +24,13 @@ #include "fastrtps/participant/Participant.h" #include "fastrtps/participant/ParticipantListener.h" +#include "rcutils/logging_macros.h" + #include "rmw/impl/cpp/key_value.hpp" #include "rmw/rmw.h" +#include "rmw_common.hpp" + class ParticipantListener; class ReaderInfo; class WriterInfo; @@ -34,8 +39,6 @@ typedef struct CustomParticipantInfo { eprosima::fastrtps::Participant * participant; ::ParticipantListener * listener; - ReaderInfo * secondarySubListener; - WriterInfo * secondaryPubListener; rmw_guard_condition_t * graph_guard_condition; // Flag to establish if the QoS of the participant, @@ -49,22 +52,26 @@ typedef struct CustomParticipantInfo class ParticipantListener : public eprosima::fastrtps::ParticipantListener { public: + explicit ParticipantListener(rmw_guard_condition_t * graph_guard_condition) + : graph_guard_condition_(graph_guard_condition) + {} + void onParticipantDiscovery( eprosima::fastrtps::Participant *, - eprosima::fastrtps::ParticipantDiscoveryInfo info) override + eprosima::fastrtps::rtps::ParticipantDiscoveryInfo && info) override { if ( - info.rtps.m_status != eprosima::fastrtps::rtps::DISCOVERED_RTPSPARTICIPANT && - info.rtps.m_status != eprosima::fastrtps::rtps::REMOVED_RTPSPARTICIPANT && - info.rtps.m_status != eprosima::fastrtps::rtps::DROPPED_RTPSPARTICIPANT) + info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT && + info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT && + info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) { return; } - if (eprosima::fastrtps::rtps::DISCOVERED_RTPSPARTICIPANT == info.rtps.m_status) { + if (eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT == info.status) { // ignore already known GUIDs - if (discovered_names.find(info.rtps.m_guid) == discovered_names.end()) { - auto map = rmw::impl::cpp::parse_key_value(info.rtps.m_userData); + if (discovered_names.find(info.info.m_guid) == discovered_names.end()) { + auto map = rmw::impl::cpp::parse_key_value(info.info.m_userData); auto name_found = map.find("name"); auto ns_found = map.find("namespace"); @@ -80,24 +87,24 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener if (name.empty()) { // use participant name if no name was found in the user data - name = info.rtps.m_RTPSParticipantName; + name = info.info.m_participantName; } // ignore discovered participants without a name if (!name.empty()) { - discovered_names[info.rtps.m_guid] = name; - discovered_namespaces[info.rtps.m_guid] = namespace_; + discovered_names[info.info.m_guid] = name; + discovered_namespaces[info.info.m_guid] = namespace_; } } } else { { - auto it = discovered_names.find(info.rtps.m_guid); + auto it = discovered_names.find(info.info.m_guid); // only consider known GUIDs if (it != discovered_names.end()) { discovered_names.erase(it); } } { - auto it = discovered_namespaces.find(info.rtps.m_guid); + auto it = discovered_namespaces.find(info.info.m_guid); // only consider known GUIDs if (it != discovered_namespaces.end()) { discovered_namespaces.erase(it); @@ -126,9 +133,71 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener return namespaces; } + void onSubscriberDiscovery( + eprosima::fastrtps::Participant *, + eprosima::fastrtps::rtps::ReaderDiscoveryInfo && info) override + { + if (eprosima::fastrtps::rtps::ReaderDiscoveryInfo::CHANGED_QOS_READER != info.status) { + bool is_alive = + eprosima::fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER == info.status; + process_discovery_info(info.info, is_alive, true); + } + } + + void onPublisherDiscovery( + eprosima::fastrtps::Participant *, + eprosima::fastrtps::rtps::WriterDiscoveryInfo && info) override + { + if (eprosima::fastrtps::rtps::WriterDiscoveryInfo::CHANGED_QOS_WRITER != info.status) { + bool is_alive = + eprosima::fastrtps::rtps::WriterDiscoveryInfo::DISCOVERED_WRITER == info.status; + process_discovery_info(info.info, is_alive, false); + } + } + + template + void process_discovery_info(T & proxyData, bool is_alive, bool is_reader) + { + std::map> & topicNtypes = + is_reader ? reader_topic_and_types : writer_topic_and_types; + + auto fqdn = proxyData.topicName(); + bool trigger = false; + mapmutex.lock(); + if (is_alive) { + topicNtypes[fqdn].push_back(proxyData.typeName()); + trigger = true; + } else { + auto it = topicNtypes.find(fqdn); + if (it != topicNtypes.end()) { + const auto & loc = + std::find(std::begin(it->second), std::end(it->second), proxyData.typeName()); + if (loc != std::end(it->second)) { + topicNtypes[fqdn].erase(loc, loc + 1); + trigger = true; + } else { + RCUTILS_LOG_DEBUG_NAMED( + "rmw_fastrtps_shared_cpp", + "unexpected removal of subscription on topic '%s' with type '%s'", + fqdn.c_str(), proxyData.typeName().c_str()); + } + } + } + mapmutex.unlock(); + + if (trigger) { + rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition( + graph_guard_condition_->implementation_identifier, + graph_guard_condition_); + } + } std::map discovered_names; std::map discovered_namespaces; + std::map> reader_topic_and_types; + std::map> writer_topic_and_types; + std::mutex mapmutex; + rmw_guard_condition_t * graph_guard_condition_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/reader_info.hpp b/rmw_fastrtps_shared_cpp/src/reader_info.hpp deleted file mode 100644 index 0bfaebc58..000000000 --- a/rmw_fastrtps_shared_cpp/src/reader_info.hpp +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef READER_INFO_HPP_ -#define READER_INFO_HPP_ - -#include -#include -#include -#include -#include -#include -#include - -#include "rcutils/logging_macros.h" - -#include "rmw/error_handling.h" -#include "rmw/rmw.h" - -#include "fastrtps/participant/Participant.h" -#include "fastrtps/rtps/builtin/data/ReaderProxyData.h" -#include "fastrtps/rtps/reader/ReaderListener.h" -#include "fastrtps/rtps/reader/RTPSReader.h" - -#include "types/guard_condition.hpp" - -class ReaderInfo : public eprosima::fastrtps::rtps::ReaderListener -{ -public: - ReaderInfo( - eprosima::fastrtps::Participant * participant, - rmw_guard_condition_t * graph_guard_condition) - : participant_(participant), - graph_guard_condition_(static_cast(graph_guard_condition->data)) - {} - - void - onNewCacheChangeAdded( - eprosima::fastrtps::rtps::RTPSReader *, - const eprosima::fastrtps::rtps::CacheChange_t * const change) - { - eprosima::fastrtps::rtps::ReaderProxyData proxyData; - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - eprosima::fastrtps::rtps::CDRMessage_t tempMsg(0); - tempMsg.wraps = true; - if (PL_CDR_BE == change->serializedPayload.encapsulation) { - tempMsg.msg_endian = eprosima::fastrtps::rtps::BIGEND; - } else { - tempMsg.msg_endian = eprosima::fastrtps::rtps::LITTLEEND; - } - tempMsg.length = change->serializedPayload.length; - tempMsg.max_size = change->serializedPayload.max_size; - tempMsg.buffer = change->serializedPayload.data; - if (!proxyData.readFromCDRMessage(&tempMsg)) { - return; - } - } else { - eprosima::fastrtps::rtps::GUID_t readerGuid; - iHandle2GUID(readerGuid, change->instanceHandle); - if (!participant_->get_remote_reader_info(readerGuid, proxyData)) { - return; - } - } - - auto fqdn = proxyData.topicName(); - - bool trigger = false; - mapmutex.lock(); - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - topicNtypes[fqdn].push_back(proxyData.typeName()); - trigger = true; - } else { - auto it = topicNtypes.find(fqdn); - if (it != topicNtypes.end()) { - const auto & loc = - std::find(std::begin(it->second), std::end(it->second), proxyData.typeName()); - if (loc != std::end(it->second)) { - topicNtypes[fqdn].erase(loc, loc + 1); - trigger = true; - } else { - RCUTILS_LOG_DEBUG_NAMED( - "rmw_fastrtps_shared_cpp", - "unexpected removal of subscription on topic '%s' with type '%s'", - fqdn.c_str(), proxyData.typeName().c_str()); - } - } - } - mapmutex.unlock(); - - if (trigger) { - graph_guard_condition_->trigger(); - } - } - std::map> topicNtypes; - std::mutex mapmutex; - eprosima::fastrtps::Participant * participant_; - GuardCondition * graph_guard_condition_; -}; - -#endif // READER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp index 626d37b78..69b52ffd2 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp @@ -26,8 +26,6 @@ #include "namespace_prefix.hpp" #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" namespace rmw_fastrtps_shared_cpp { @@ -64,13 +62,13 @@ __rmw_count_publishers( } auto impl = static_cast(node->data); - WriterInfo * slave_target = impl->secondaryPubListener; + ::ParticipantListener * slave_target = impl->listener; slave_target->mapmutex.lock(); // Search and sum up the publisher counts for (const auto & topic_fqdn : topic_fqdns) { - const auto & it = slave_target->topicNtypes.find(topic_fqdn); - if (it != slave_target->topicNtypes.end()) { + const auto & it = slave_target->writer_topic_and_types.find(topic_fqdn); + if (it != slave_target->writer_topic_and_types.end()) { *count += it->second.size(); } } @@ -117,13 +115,13 @@ __rmw_count_subscribers( } CustomParticipantInfo * impl = static_cast(node->data); - ReaderInfo * slave_target = impl->secondarySubListener; + ::ParticipantListener * slave_target = impl->listener; slave_target->mapmutex.lock(); // Search and sum up the subscriber counts for (const auto & topic_fqdn : topic_fqdns) { - const auto & it = slave_target->topicNtypes.find(topic_fqdn); - if (it != slave_target->topicNtypes.end()) { + const auto & it = slave_target->reader_topic_and_types.find(topic_fqdn); + if (it != slave_target->reader_topic_and_types.end()) { *count += it->second.size(); } } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp index e9dc8b207..653bf1e4c 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp @@ -46,9 +46,6 @@ #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" - using Domain = eprosima::fastrtps::Domain; using Participant = eprosima::fastrtps::Participant; using ParticipantAttributes = eprosima::fastrtps::ParticipantAttributes; @@ -79,12 +76,15 @@ create_node( rmw_guard_condition_t * graph_guard_condition = nullptr; CustomParticipantInfo * node_impl = nullptr; rmw_node_t * node_handle = nullptr; - ReaderInfo * tnat_1 = nullptr; - WriterInfo * tnat_2 = nullptr; - std::pair edp_readers; + + graph_guard_condition = __rmw_create_guard_condition(identifier); + if (!graph_guard_condition) { + // error already set + goto fail; + } try { - listener = new ::ParticipantListener(); + listener = new ::ParticipantListener(graph_guard_condition); } catch (std::bad_alloc &) { RMW_SET_ERROR_MSG("failed to allocate participant listener"); goto fail; @@ -96,12 +96,6 @@ create_node( return nullptr; } - graph_guard_condition = __rmw_create_guard_condition(identifier); - if (!graph_guard_condition) { - // error already set - goto fail; - } - try { node_impl = new CustomParticipantInfo(); @@ -156,32 +150,8 @@ create_node( } memcpy(const_cast(node_handle->namespace_), namespace_, strlen(namespace_) + 1); - tnat_1 = new ReaderInfo(participant, graph_guard_condition); - tnat_2 = new WriterInfo(participant, graph_guard_condition); - - node_impl->secondarySubListener = tnat_1; - node_impl->secondaryPubListener = tnat_2; - - edp_readers = participant->getEDPReaders(); - if (!edp_readers.first) { - RMW_SET_ERROR_MSG("edp_readers.first is null"); - goto fail; - } - - if (!edp_readers.second) { - RMW_SET_ERROR_MSG("edp_readers.second is null"); - goto fail; - } - - if (!(edp_readers.first->setListener(tnat_1) & edp_readers.second->setListener(tnat_2))) { - RMW_SET_ERROR_MSG("Failed to attach ROS related logic to the Participant"); - goto fail; - } - return node_handle; fail: - delete tnat_2; - delete tnat_1; if (node_handle) { rmw_free(const_cast(node_handle->namespace_)); node_handle->namespace_ = nullptr; @@ -373,36 +343,19 @@ __rmw_destroy_node( Participant * participant = impl->participant; // Begin deleting things in the same order they were created in __rmw_create_node(). - std::pair edp_readers = participant->getEDPReaders(); - if (!edp_readers.first || !edp_readers.second) { - RMW_SET_ERROR_MSG("failed to get EDPReader listener"); - result_ret = RMW_RET_ERROR; - } - - if (edp_readers.first && !edp_readers.first->setListener(nullptr)) { - RMW_SET_ERROR_MSG("failed to unset EDPReader listener"); - result_ret = RMW_RET_ERROR; - } - delete impl->secondarySubListener; - if (edp_readers.second && !edp_readers.second->setListener(nullptr)) { - RMW_SET_ERROR_MSG("failed to unset EDPReader listener"); - result_ret = RMW_RET_ERROR; - } - delete impl->secondaryPubListener; - rmw_free(const_cast(node->name)); node->name = nullptr; rmw_free(const_cast(node->namespace_)); node->namespace_ = nullptr; rmw_node_free(node); + Domain::removeParticipant(participant); + if (RMW_RET_OK != __rmw_destroy_guard_condition(impl->graph_guard_condition)) { RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); result_ret = RMW_RET_ERROR; } - Domain::removeParticipant(participant); - delete impl->listener; impl->listener = nullptr; delete impl; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp index 0ac7c7b5c..6a9d1b88b 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "rcutils/strdup.h" @@ -28,8 +29,6 @@ #include "demangle.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" namespace rmw_fastrtps_shared_cpp { @@ -65,42 +64,29 @@ __rmw_get_service_names_and_types( // Get info from publisher and subscriber // Combined results from the two lists std::map> services; - { - ReaderInfo * slave_target = impl->secondarySubListener; - slave_target->mapmutex.lock(); - for (auto it : slave_target->topicNtypes) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { - // not a service - continue; - } - for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + + // Setup processing function, will be used with two maps + auto map_process = [&services](const std::map> & map) { + for (auto it : map) { + std::string service_name = _demangle_service_from_topic(it.first); + if (!service_name.length()) { + // not a service + continue; } - } - } - slave_target->mapmutex.unlock(); - } - { - WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); - for (auto it : slave_target->topicNtypes) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { - // not a service - continue; - } - for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + for (auto & itt : it.second) { + std::string service_type = _demangle_service_type_only(itt); + if (service_type.length()) { + services[service_name].insert(service_type); + } } } - } - slave_target->mapmutex.unlock(); - } + }; + + ::ParticipantListener * slave_target = impl->listener; + slave_target->mapmutex.lock(); + map_process(slave_target->reader_topic_and_types); + map_process(slave_target->writer_topic_and_types); + slave_target->mapmutex.unlock(); // Fill out service_names_and_types if (services.size()) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp index 4023938b8..4c1f3cb78 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "rcutils/allocator.h" #include "rcutils/error_handling.h" @@ -35,9 +36,6 @@ #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" - namespace rmw_fastrtps_shared_cpp { rmw_ret_t @@ -74,34 +72,26 @@ __rmw_get_topic_names_and_types( // Get info from publisher and subscriber // Combined results from the two lists std::map> topics; - { - ReaderInfo * slave_target = impl->secondarySubListener; - slave_target->mapmutex.lock(); - for (auto it : slave_target->topicNtypes) { - if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { - // if we are demangling and this is not prefixed with rt/, skip it - continue; - } - for (auto & itt : it.second) { - topics[it.first].insert(itt); - } - } - slave_target->mapmutex.unlock(); - } - { - WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); - for (auto it : slave_target->topicNtypes) { - if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { - // if we are demangling and this is not prefixed with rt/, skip it - continue; - } - for (auto & itt : it.second) { - topics[it.first].insert(itt); + + // Setup processing function, will be used with two maps + auto map_process = + [&topics, no_demangle](const std::map> & map) { + for (auto it : map) { + if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { + // if we are demangling and this is not prefixed with rt/, skip it + continue; + } + for (auto & itt : it.second) { + topics[it.first].insert(itt); + } } - } - slave_target->mapmutex.unlock(); - } + }; + + ::ParticipantListener * slave_target = impl->listener; + slave_target->mapmutex.lock(); + map_process(slave_target->reader_topic_and_types); + map_process(slave_target->writer_topic_and_types); + slave_target->mapmutex.unlock(); // Copy data to results handle if (topics.size() > 0) { diff --git a/rmw_fastrtps_shared_cpp/src/writer_info.hpp b/rmw_fastrtps_shared_cpp/src/writer_info.hpp deleted file mode 100644 index 1fb894cb1..000000000 --- a/rmw_fastrtps_shared_cpp/src/writer_info.hpp +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef WRITER_INFO_HPP_ -#define WRITER_INFO_HPP_ - -#include -#include -#include -#include -#include -#include -#include - -#include "fastrtps/participant/Participant.h" -#include "fastrtps/rtps/builtin/data/WriterProxyData.h" -#include "fastrtps/rtps/reader/ReaderListener.h" -#include "fastrtps/rtps/reader/RTPSReader.h" - -#include "rcutils/logging_macros.h" - -#include "rmw/rmw.h" - -#include "types/guard_condition.hpp" - -class WriterInfo : public eprosima::fastrtps::rtps::ReaderListener -{ -public: - WriterInfo( - eprosima::fastrtps::Participant * participant, - rmw_guard_condition_t * graph_guard_condition) - : participant_(participant), - graph_guard_condition_(static_cast(graph_guard_condition->data)) - {} - - void - onNewCacheChangeAdded( - eprosima::fastrtps::rtps::RTPSReader *, - const eprosima::fastrtps::rtps::CacheChange_t * const change) - { - eprosima::fastrtps::rtps::WriterProxyData proxyData; - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - eprosima::fastrtps::rtps::CDRMessage_t tempMsg(0); - tempMsg.wraps = true; - if (PL_CDR_BE == change->serializedPayload.encapsulation) { - tempMsg.msg_endian = eprosima::fastrtps::rtps::BIGEND; - } else { - tempMsg.msg_endian = eprosima::fastrtps::rtps::LITTLEEND; - } - tempMsg.length = change->serializedPayload.length; - tempMsg.max_size = change->serializedPayload.max_size; - tempMsg.buffer = change->serializedPayload.data; - if (!proxyData.readFromCDRMessage(&tempMsg)) { - return; - } - } else { - eprosima::fastrtps::rtps::GUID_t writerGuid; - iHandle2GUID(writerGuid, change->instanceHandle); - if (!participant_->get_remote_writer_info(writerGuid, proxyData)) { - return; - } - } - - auto fqdn = proxyData.topicName(); - - bool trigger = false; - mapmutex.lock(); - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - topicNtypes[fqdn].push_back(proxyData.typeName()); - trigger = true; - } else { - auto it = topicNtypes.find(fqdn); - if (it != topicNtypes.end()) { - const auto & loc = - std::find(std::begin(it->second), std::end(it->second), proxyData.typeName()); - if (loc != std::end(it->second)) { - topicNtypes[fqdn].erase(loc, loc + 1); - trigger = true; - } else { - RCUTILS_LOG_DEBUG_NAMED( - "rmw_fastrtps_shared_cpp", - "unexpected removal of subscription on topic '%s' with type '%s'", - fqdn.c_str(), proxyData.typeName().c_str()); - } - } - } - mapmutex.unlock(); - - if (trigger) { - graph_guard_condition_->trigger(); - } - } - std::map> topicNtypes; - std::mutex mapmutex; - eprosima::fastrtps::Participant * participant_; - GuardCondition * graph_guard_condition_; -}; - -#endif // WRITER_INFO_HPP_