diff --git a/rmw_fastrtps_cpp/CMakeLists.txt b/rmw_fastrtps_cpp/CMakeLists.txt index 1b06b2f14..792a9a32c 100644 --- a/rmw_fastrtps_cpp/CMakeLists.txt +++ b/rmw_fastrtps_cpp/CMakeLists.txt @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() +if(SECURITY) + find_package(OpenSSL REQUIRED) +endif() + find_package(ament_cmake_ros REQUIRED) find_package(rcutils REQUIRED) diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 171a72cfb..409a91f1c 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -120,10 +120,13 @@ rmw_create_client( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = response_type_name; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { @@ -131,11 +134,14 @@ rmw_create_client( } subscriberParam.topic.topicName += "Reply"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = request_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; } else { diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 5907590fd..520afdf6a 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -113,9 +113,12 @@ rmw_create_publisher( _register_type(participant, info->type_support_); } - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index af81304e4..80a8739ff 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -132,9 +132,12 @@ rmw_create_service( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; subscriberParam.topic.topicDataType = request_type_name; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; @@ -143,11 +146,14 @@ rmw_create_service( } subscriberParam.topic.topicName += "Request"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = response_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp index 998b859f8..27ad9e374 100644 --- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp @@ -116,8 +116,11 @@ rmw_create_subscription( _register_type(participant, info->type_support_); } - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt index bb461f589..f2261b6ed 100644 --- a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt +++ b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() +if(SECURITY) + find_package(OpenSSL REQUIRED) +endif() + find_package(ament_cmake_ros REQUIRED) find_package(rcutils REQUIRED) diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index da3c6cda0..7adbf62f0 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -125,10 +125,13 @@ rmw_create_client( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = response_type_name; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { @@ -136,11 +139,14 @@ rmw_create_client( } subscriberParam.topic.topicName += "Reply"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = request_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; } else { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index ae44cfe03..f279dcbfc 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -108,9 +108,12 @@ rmw_create_publisher( _register_type(participant, info->type_support_); } - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 2de1d7645..42495c7d0 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -137,9 +137,12 @@ rmw_create_service( _register_type(participant, info->response_type_support_); } + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; subscriberParam.topic.topicDataType = request_type_name; if (!qos_policies->avoid_ros_namespace_conventions) { subscriberParam.topic.topicName = std::string(ros_service_requester_prefix) + service_name; @@ -148,11 +151,14 @@ rmw_create_service( } subscriberParam.topic.topicName += "Request"; + if (!impl->leave_middleware_default_qos) { + publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + publisherParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; publisherParam.topic.topicDataType = response_type_name; - publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; if (!qos_policies->avoid_ros_namespace_conventions) { publisherParam.topic.topicName = std::string(ros_service_response_prefix) + service_name; } else { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp index a421424fe..a5c9e7e52 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp @@ -112,8 +112,11 @@ rmw_create_subscription( _register_type(participant, info->type_support_); } - subscriberParam.historyMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!impl->leave_middleware_default_qos) { + subscriberParam.historyMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } + subscriberParam.topic.topicKind = eprosima::fastrtps::rtps::NO_KEY; subscriberParam.topic.topicDataType = type_name; if (!qos_policies->avoid_ros_namespace_conventions) { diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index 02375bbef..078c38c55 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() +if(SECURITY) + find_package(OpenSSL REQUIRED) +endif() + find_package(ament_cmake_ros REQUIRED) find_package(rcutils REQUIRED) @@ -63,10 +67,12 @@ add_library(rmw_fastrtps_shared_cpp src/rmw_trigger_guard_condition.cpp src/rmw_wait.cpp src/rmw_wait_set.cpp - src/TypeSupport_impl.cpp) + src/TypeSupport_impl.cpp +) target_link_libraries(rmw_fastrtps_shared_cpp - fastcdr fastrtps) + fastcdr fastrtps +) # specific order: dependents before dependencies ament_target_dependencies(rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index 212e6120c..85c50dc90 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -47,19 +47,29 @@ class TypeSupport : public eprosima::fastrtps::TopicDataType virtual bool deserializeROSmessage(eprosima::fastcdr::Cdr & deser, void * ros_message) = 0; RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload); + bool getKey( + void * data, + eprosima::fastrtps::rtps::InstanceHandle_t * ihandle, + bool force_md5 = false) override + { + (void)data; (void)ihandle; (void)force_md5; + return false; + } + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - bool deserialize(eprosima::fastrtps::rtps::SerializedPayload_t * payload, void * data); + bool deserialize(eprosima::fastrtps::rtps::SerializedPayload_t * payload, void * data) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - std::function getSerializedSizeProvider(void * data); + std::function getSerializedSizeProvider(void * data) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void * createData(); + void * createData() override; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void deleteData(void * data); + void deleteData(void * data) override; RMW_FASTRTPS_SHARED_CPP_PUBLIC virtual ~TypeSupport() {} diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index 4e086435d..fe22ffe68 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -16,6 +16,7 @@ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_ #include +#include #include #include @@ -23,41 +24,54 @@ #include "fastrtps/participant/Participant.h" #include "fastrtps/participant/ParticipantListener.h" +#include "rcutils/logging_macros.h" + #include "rmw/impl/cpp/key_value.hpp" #include "rmw/rmw.h" +#include "rmw_common.hpp" + +#include "topic_cache.hpp" + class ParticipantListener; -class ReaderInfo; -class WriterInfo; typedef struct CustomParticipantInfo { eprosima::fastrtps::Participant * participant; ::ParticipantListener * listener; - ReaderInfo * secondarySubListener; - WriterInfo * secondaryPubListener; rmw_guard_condition_t * graph_guard_condition; + + // Flag to establish if the QoS of the participant, + // its publishers and its subscribers are going + // to be configured only from an XML file or if + // their settings are going to be overwritten by code + // with the default configuration. + bool leave_middleware_default_qos; } CustomParticipantInfo; class ParticipantListener : public eprosima::fastrtps::ParticipantListener { public: + explicit ParticipantListener(rmw_guard_condition_t * graph_guard_condition) + : graph_guard_condition_(graph_guard_condition) + {} + void onParticipantDiscovery( eprosima::fastrtps::Participant *, - eprosima::fastrtps::ParticipantDiscoveryInfo info) override + eprosima::fastrtps::rtps::ParticipantDiscoveryInfo && info) override { if ( - info.rtps.m_status != eprosima::fastrtps::rtps::DISCOVERED_RTPSPARTICIPANT && - info.rtps.m_status != eprosima::fastrtps::rtps::REMOVED_RTPSPARTICIPANT && - info.rtps.m_status != eprosima::fastrtps::rtps::DROPPED_RTPSPARTICIPANT) + info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT && + info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT && + info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) { return; } - if (eprosima::fastrtps::rtps::DISCOVERED_RTPSPARTICIPANT == info.rtps.m_status) { + if (eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT == info.status) { // ignore already known GUIDs - if (discovered_names.find(info.rtps.m_guid) == discovered_names.end()) { - auto map = rmw::impl::cpp::parse_key_value(info.rtps.m_userData); + if (discovered_names.find(info.info.m_guid) == discovered_names.end()) { + auto map = rmw::impl::cpp::parse_key_value(info.info.m_userData); auto name_found = map.find("name"); auto ns_found = map.find("namespace"); @@ -73,24 +87,24 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener if (name.empty()) { // use participant name if no name was found in the user data - name = info.rtps.m_RTPSParticipantName; + name = info.info.m_participantName; } // ignore discovered participants without a name if (!name.empty()) { - discovered_names[info.rtps.m_guid] = name; - discovered_namespaces[info.rtps.m_guid] = namespace_; + discovered_names[info.info.m_guid] = name; + discovered_namespaces[info.info.m_guid] = namespace_; } } } else { { - auto it = discovered_names.find(info.rtps.m_guid); + auto it = discovered_names.find(info.info.m_guid); // only consider known GUIDs if (it != discovered_names.end()) { discovered_names.erase(it); } } { - auto it = discovered_namespaces.find(info.rtps.m_guid); + auto it = discovered_namespaces.find(info.info.m_guid); // only consider known GUIDs if (it != discovered_namespaces.end()) { discovered_namespaces.erase(it); @@ -119,9 +133,58 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener return namespaces; } + void onSubscriberDiscovery( + eprosima::fastrtps::Participant *, + eprosima::fastrtps::rtps::ReaderDiscoveryInfo && info) override + { + if (eprosima::fastrtps::rtps::ReaderDiscoveryInfo::CHANGED_QOS_READER != info.status) { + bool is_alive = + eprosima::fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER == info.status; + process_discovery_info(info.info, is_alive, true); + } + } + + void onPublisherDiscovery( + eprosima::fastrtps::Participant *, + eprosima::fastrtps::rtps::WriterDiscoveryInfo && info) override + { + if (eprosima::fastrtps::rtps::WriterDiscoveryInfo::CHANGED_QOS_WRITER != info.status) { + bool is_alive = + eprosima::fastrtps::rtps::WriterDiscoveryInfo::DISCOVERED_WRITER == info.status; + process_discovery_info(info.info, is_alive, false); + } + } + + template + void process_discovery_info(T & proxyData, bool is_alive, bool is_reader) + { + auto & topic_cache = + is_reader ? reader_topic_cache : writer_topic_cache; + + auto fqdn = proxyData.topicName(); + bool trigger; + { + std::lock_guard guard(topic_cache.getMutex()); + if (is_alive) { + trigger = topic_cache.addTopic(proxyData.RTPSParticipantKey(), + proxyData.topicName(), proxyData.typeName()); + } else { + trigger = topic_cache.removeTopic(proxyData.RTPSParticipantKey(), + proxyData.topicName(), proxyData.typeName()); + } + } + if (trigger) { + rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition( + graph_guard_condition_->implementation_identifier, + graph_guard_condition_); + } + } std::map discovered_names; std::map discovered_namespaces; + LockedObject reader_topic_cache; + LockedObject writer_topic_cache; + rmw_guard_condition_t * graph_guard_condition_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/types/topic_cache.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp similarity index 97% rename from rmw_fastrtps_shared_cpp/src/types/topic_cache.hpp rename to rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp index a62c7ff08..31ddcebea 100644 --- a/rmw_fastrtps_shared_cpp/src/types/topic_cache.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef TYPES__TOPIC_CACHE_HPP_ -#define TYPES__TOPIC_CACHE_HPP_ +#ifndef RMW_FASTRTPS_SHARED_CPP__TOPIC_CACHE_HPP_ +#define RMW_FASTRTPS_SHARED_CPP__TOPIC_CACHE_HPP_ #include #include @@ -32,10 +32,6 @@ typedef eprosima::fastrtps::rtps::GUID_t GUID_t; -class TopicCache; -std::ostream & operator<<( - std::ostream & ostream, - const TopicCache & topic_cache); /** * Topic cache data structure. Manages relationships between participants and topics. */ @@ -233,4 +229,4 @@ class LockedObject : public T } }; -#endif // TYPES__TOPIC_CACHE_HPP_ +#endif // RMW_FASTRTPS_SHARED_CPP__TOPIC_CACHE_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/reader_info.hpp b/rmw_fastrtps_shared_cpp/src/reader_info.hpp deleted file mode 100644 index c01e47f2a..000000000 --- a/rmw_fastrtps_shared_cpp/src/reader_info.hpp +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef READER_INFO_HPP_ -#define READER_INFO_HPP_ - -#include -#include -#include -#include -#include - -#include "rcutils/logging_macros.h" - -#include "rmw/error_handling.h" -#include "rmw/rmw.h" - -#include "fastrtps/participant/Participant.h" -#include "fastrtps/rtps/builtin/data/ReaderProxyData.h" -#include "fastrtps/rtps/reader/ReaderListener.h" -#include "fastrtps/rtps/reader/RTPSReader.h" - -#include "types/guard_condition.hpp" -#include "types/topic_cache.hpp" - -class ReaderInfo : public eprosima::fastrtps::rtps::ReaderListener -{ -public: - ReaderInfo( - eprosima::fastrtps::Participant * participant, - rmw_guard_condition_t * graph_guard_condition) - : participant_(participant), - graph_guard_condition_(static_cast(graph_guard_condition->data)) - {} - - void - onNewCacheChangeAdded( - eprosima::fastrtps::rtps::RTPSReader *, - const eprosima::fastrtps::rtps::CacheChange_t * const change) - { - eprosima::fastrtps::rtps::ReaderProxyData proxyData; - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - eprosima::fastrtps::rtps::CDRMessage_t tempMsg(0); - tempMsg.wraps = true; - if (PL_CDR_BE == change->serializedPayload.encapsulation) { - tempMsg.msg_endian = eprosima::fastrtps::rtps::BIGEND; - } else { - tempMsg.msg_endian = eprosima::fastrtps::rtps::LITTLEEND; - } - tempMsg.length = change->serializedPayload.length; - tempMsg.max_size = change->serializedPayload.max_size; - tempMsg.buffer = change->serializedPayload.data; - if (!proxyData.readFromCDRMessage(&tempMsg)) { - return; - } - } else { - GUID_t readerGuid; - iHandle2GUID(readerGuid, change->instanceHandle); - if (!participant_->get_remote_reader_info(readerGuid, proxyData)) { - return; - } - } - - bool trigger = false; - { - std::lock_guard guard(topic_cache_.getMutex()); - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - trigger = topic_cache_.addTopic(proxyData.RTPSParticipantKey(), - proxyData.topicName(), proxyData.typeName()); - } else { - trigger = topic_cache_.removeTopic(proxyData.RTPSParticipantKey(), - proxyData.topicName(), proxyData.typeName()); - } - } - - if (trigger) { - graph_guard_condition_->trigger(); - } - } - - LockedObject topic_cache_; - eprosima::fastrtps::Participant * participant_; - GuardCondition * graph_guard_condition_; -}; - -#endif // READER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp index 9ac0697c5..eaab76e63 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp @@ -28,8 +28,6 @@ #include "namespace_prefix.hpp" #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" namespace rmw_fastrtps_shared_cpp { @@ -67,12 +65,12 @@ __rmw_count_publishers( } auto impl = static_cast(node->data); - WriterInfo * slave_target = impl->secondaryPubListener; *count = 0; + ::ParticipantListener * slave_target = impl->listener; { - std::lock_guard guard(slave_target->topic_cache_.getMutex()); + std::lock_guard guard(slave_target->writer_topic_cache.getMutex()); // Search and sum up the publisher counts - auto & topic_types = slave_target->topic_cache_.getTopicToTypes(); + auto & topic_types = slave_target->writer_topic_cache.getTopicToTypes(); for (const auto & topic_fqdn : topic_fqdns) { const auto & it = topic_types.find(topic_fqdn); if (it != topic_types.end()) { @@ -122,12 +120,12 @@ __rmw_count_subscribers( } CustomParticipantInfo * impl = static_cast(node->data); - ReaderInfo * slave_target = impl->secondarySubListener; *count = 0; + ::ParticipantListener * slave_target = impl->listener; { - std::lock_guard guard(slave_target->topic_cache_.getMutex()); + std::lock_guard guard(slave_target->reader_topic_cache.getMutex()); // Search and sum up the subscriber counts - auto & topic_types = slave_target->topic_cache_.getTopicToTypes(); + auto & topic_types = slave_target->reader_topic_cache.getTopicToTypes(); for (const auto & topic_fqdn : topic_fqdns) { const auto & it = topic_types.find(topic_fqdn); if (it != topic_types.end()) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp index c106f5db6..653bf1e4c 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_node.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_node.cpp @@ -46,9 +46,6 @@ #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" - using Domain = eprosima::fastrtps::Domain; using Participant = eprosima::fastrtps::Participant; using ParticipantAttributes = eprosima::fastrtps::ParticipantAttributes; @@ -79,12 +76,15 @@ create_node( rmw_guard_condition_t * graph_guard_condition = nullptr; CustomParticipantInfo * node_impl = nullptr; rmw_node_t * node_handle = nullptr; - ReaderInfo * tnat_1 = nullptr; - WriterInfo * tnat_2 = nullptr; - std::pair edp_readers; + + graph_guard_condition = __rmw_create_guard_condition(identifier); + if (!graph_guard_condition) { + // error already set + goto fail; + } try { - listener = new ::ParticipantListener(); + listener = new ::ParticipantListener(graph_guard_condition); } catch (std::bad_alloc &) { RMW_SET_ERROR_MSG("failed to allocate participant listener"); goto fail; @@ -96,14 +96,27 @@ create_node( return nullptr; } - graph_guard_condition = __rmw_create_guard_condition(identifier); - if (!graph_guard_condition) { - // error already set - goto fail; - } - try { node_impl = new CustomParticipantInfo(); + + node_impl->leave_middleware_default_qos = false; + const char * env_var = "RMW_FASTRTPS_USE_QOS_FROM_XML"; + // Check if the configuration from XML has been enabled from + // the RMW_FASTRTPS_USE_QOS_FROM_XML env variable. + char * config_env_val = nullptr; +#ifndef _WIN32 + config_env_val = getenv(env_var); + if (config_env_val != nullptr) { + node_impl->leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } +#else + size_t config_env_val_size; + _dupenv_s(&config_env_val, &config_env_val_size, env_var); + if (config_env_val != nullptr) { + node_impl->leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } + free(config_env_val); +#endif } catch (std::bad_alloc &) { RMW_SET_ERROR_MSG("failed to allocate node impl struct"); goto fail; @@ -137,32 +150,8 @@ create_node( } memcpy(const_cast(node_handle->namespace_), namespace_, strlen(namespace_) + 1); - tnat_1 = new ReaderInfo(participant, graph_guard_condition); - tnat_2 = new WriterInfo(participant, graph_guard_condition); - - node_impl->secondarySubListener = tnat_1; - node_impl->secondaryPubListener = tnat_2; - - edp_readers = participant->getEDPReaders(); - if (!edp_readers.first) { - RMW_SET_ERROR_MSG("edp_readers.first is null"); - goto fail; - } - - if (!edp_readers.second) { - RMW_SET_ERROR_MSG("edp_readers.second is null"); - goto fail; - } - - if (!(edp_readers.first->setListener(tnat_1) & edp_readers.second->setListener(tnat_2))) { - RMW_SET_ERROR_MSG("Failed to attach ROS related logic to the Participant"); - goto fail; - } - return node_handle; fail: - delete tnat_2; - delete tnat_1; if (node_handle) { rmw_free(const_cast(node_handle->namespace_)); node_handle->namespace_ = nullptr; @@ -246,11 +235,32 @@ __rmw_create_node( // since the participant name is not part of the DDS spec participantAttrs.rtps.setName(name); + bool leave_middleware_default_qos = false; + const char * env_var = "RMW_FASTRTPS_USE_QOS_FROM_XML"; + // Check if the configuration from XML has been enabled from + // the RMW_FASTRTPS_USE_QOS_FROM_XML env variable. + char * config_env_val = nullptr; +#ifndef _WIN32 + config_env_val = getenv(env_var); + if (config_env_val != nullptr) { + leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } +#else + size_t config_env_val_size; + _dupenv_s(&config_env_val, &config_env_val_size, env_var); + if (config_env_val != nullptr) { + leave_middleware_default_qos = strcmp(config_env_val, "1") == 0; + } + free(config_env_val); +#endif + // allow reallocation to support discovery messages bigger than 5000 bytes - participantAttrs.rtps.builtin.readerHistoryMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - participantAttrs.rtps.builtin.writerHistoryMemoryPolicy = - eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + if (!leave_middleware_default_qos) { + participantAttrs.rtps.builtin.readerHistoryMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + participantAttrs.rtps.builtin.writerHistoryMemoryPolicy = + eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + } size_t length = strlen(name) + strlen("name=;") + strlen(namespace_) + strlen("namespace=;") + 1; @@ -333,36 +343,19 @@ __rmw_destroy_node( Participant * participant = impl->participant; // Begin deleting things in the same order they were created in __rmw_create_node(). - std::pair edp_readers = participant->getEDPReaders(); - if (!edp_readers.first || !edp_readers.second) { - RMW_SET_ERROR_MSG("failed to get EDPReader listener"); - result_ret = RMW_RET_ERROR; - } - - if (edp_readers.first && !edp_readers.first->setListener(nullptr)) { - RMW_SET_ERROR_MSG("failed to unset EDPReader listener"); - result_ret = RMW_RET_ERROR; - } - delete impl->secondarySubListener; - if (edp_readers.second && !edp_readers.second->setListener(nullptr)) { - RMW_SET_ERROR_MSG("failed to unset EDPReader listener"); - result_ret = RMW_RET_ERROR; - } - delete impl->secondaryPubListener; - rmw_free(const_cast(node->name)); node->name = nullptr; rmw_free(const_cast(node->namespace_)); node->namespace_ = nullptr; rmw_node_free(node); + Domain::removeParticipant(participant); + if (RMW_RET_OK != __rmw_destroy_guard_condition(impl->graph_guard_condition)) { RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); result_ret = RMW_RET_ERROR; } - Domain::removeParticipant(participant); - delete impl->listener; impl->listener = nullptr; delete impl; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp index 754d16598..385727063 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - +#include #include #include #include -#include #include #include "rcutils/allocator.h" @@ -39,9 +37,7 @@ #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" -#include "types/topic_cache.hpp" +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" namespace rmw_fastrtps_shared_cpp { @@ -49,7 +45,7 @@ namespace rmw_fastrtps_shared_cpp constexpr char kLoggerTag[] = "rmw_fastrtps_shared_cpp"; /** - * Get the guid that corresponds to the nodeand namespace. + * Get the guid that corresponds to the node and namespace. * * @param node to discover other participants with * @param node_name of the desired node @@ -76,7 +72,8 @@ rmw_ret_t __get_guid_by_name( auto guid_node_pair = std::find_if(impl->listener->discovered_names.begin(), impl->listener->discovered_names.end(), - [node_name, &nodes_in_desired_namespace](std::pair pair) { + [node_name, &nodes_in_desired_namespace](const std::pair & pair) { return strcmp(pair.second.c_str(), node_name) == 0 && nodes_in_desired_namespace.find(pair.first) != nodes_in_desired_namespace.end(); }); @@ -268,7 +265,7 @@ __log_debug_information(const CustomParticipantInfo & impl) { if (rcutils_logging_logger_is_enabled_for(kLoggerTag, RCUTILS_LOG_SEVERITY_DEBUG)) { { - auto & topic_cache = impl.secondaryPubListener->topic_cache_; + auto & topic_cache = impl.listener->writer_topic_cache; std::lock_guard guard(topic_cache.getMutex()); std::stringstream map_ss; map_ss << topic_cache; @@ -277,7 +274,7 @@ __log_debug_information(const CustomParticipantInfo & impl) "Publisher Topic cache is: %s", map_ss.str().c_str()); } { - auto & topic_cache = impl.secondarySubListener->topic_cache_; + auto & topic_cache = impl.listener->reader_topic_cache; std::lock_guard guard(topic_cache.getMutex()); std::stringstream map_ss; map_ss << topic_cache; @@ -364,7 +361,7 @@ __rmw_get_subscriber_names_and_types_by_node( { RetrieveCache retrieve_sub_cache = [](CustomParticipantInfo & participant_info) -> const LockedObject & { - return participant_info.secondarySubListener->topic_cache_; + return participant_info.listener->reader_topic_cache; }; return __rmw_get_topic_names_and_types_by_node(identifier, node, allocator, node_name, node_namespace, no_demangle, retrieve_sub_cache, topic_names_and_types); @@ -382,7 +379,7 @@ __rmw_get_publisher_names_and_types_by_node( { RetrieveCache retrieve_pub_cache = [](CustomParticipantInfo & participant_info) -> const LockedObject & { - return participant_info.secondaryPubListener->topic_cache_; + return participant_info.listener->writer_topic_cache; }; return __rmw_get_topic_names_and_types_by_node(identifier, node, allocator, node_name, node_namespace, no_demangle, retrieve_pub_cache, topic_names_and_types); @@ -413,7 +410,7 @@ __rmw_get_service_names_and_types_by_node( std::map> services; { - auto & topic_cache = impl->secondaryPubListener->topic_cache_; + auto & topic_cache = impl->listener->reader_topic_cache; std::lock_guard guard(topic_cache.getMutex()); const auto & node_topics = topic_cache.getParticipantToTopics().find(guid); if (node_topics != topic_cache.getParticipantToTopics().end()) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp index 3300428e8..20dae9c92 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "rcutils/strdup.h" @@ -28,8 +29,8 @@ #include "demangle.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" + +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" namespace rmw_fastrtps_shared_cpp { @@ -65,40 +66,28 @@ __rmw_get_service_names_and_types( // Get info from publisher and subscriber // Combined results from the two lists std::map> services; - { - auto & topic_cache = impl->secondarySubListener->topic_cache_; - std::lock_guard guard(topic_cache.getMutex()); - for (auto it : topic_cache.getTopicToTypes()) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { - // not a service - continue; - } - for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + + // Setup processing function, will be used with two maps + auto map_process = [&services](const LockedObject & topic_cache) { + std::lock_guard guard(topic_cache.getMutex()); + for (auto it : topic_cache.getTopicToTypes()) { + std::string service_name = _demangle_service_from_topic(it.first); + if (!service_name.length()) { + // not a service + continue; } - } - } - } - { - auto & topic_cache = impl->secondaryPubListener->topic_cache_; - std::lock_guard guard(topic_cache.getMutex()); - for (auto it : topic_cache.getTopicToTypes()) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { - // not a service - continue; - } - for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + for (auto & itt : it.second) { + std::string service_type = _demangle_service_type_only(itt); + if (service_type.length()) { + services[service_name].insert(service_type); + } } } - } - } + }; + + ::ParticipantListener * slave_target = impl->listener; + map_process(slave_target->reader_topic_cache); + map_process(slave_target->writer_topic_cache); // Fill out service_names_and_types if (services.size()) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp index 6971a4063..88c235253 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp @@ -15,7 +15,9 @@ #include #include #include + #include +#include #include "rcutils/allocator.h" #include "rcutils/error_handling.h" @@ -36,9 +38,7 @@ #include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" -#include "reader_info.hpp" -#include "writer_info.hpp" -#include "types/topic_cache.hpp" +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" namespace rmw_fastrtps_shared_cpp { @@ -76,10 +76,10 @@ __rmw_get_topic_names_and_types( // Get info from publisher and subscriber // Combined results from the two lists std::map> topics; - std::function &, - std::map> &)> accumulate_topics = - [no_demangle](const LockedObject & topic_cache, - std::map> & topics) { + + // Setup processing function, will be used with two maps + auto map_process = + [&topics, no_demangle](const LockedObject & topic_cache) { std::lock_guard guard(topic_cache.getMutex()); for (auto it : topic_cache.getTopicToTypes()) { if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { @@ -92,8 +92,9 @@ __rmw_get_topic_names_and_types( } }; - accumulate_topics(impl->secondarySubListener->topic_cache_, topics); - accumulate_topics(impl->secondaryPubListener->topic_cache_, topics); + ::ParticipantListener * slave_target = impl->listener; + map_process(slave_target->reader_topic_cache); + map_process(slave_target->writer_topic_cache); // Copy data to results handle if (topics.size() > 0) { diff --git a/rmw_fastrtps_shared_cpp/src/writer_info.hpp b/rmw_fastrtps_shared_cpp/src/writer_info.hpp deleted file mode 100644 index 95d9793a4..000000000 --- a/rmw_fastrtps_shared_cpp/src/writer_info.hpp +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef WRITER_INFO_HPP_ -#define WRITER_INFO_HPP_ - -#include -#include -#include -#include -#include -#include -#include - -#include "fastrtps/participant/Participant.h" -#include "fastrtps/rtps/builtin/data/WriterProxyData.h" -#include "fastrtps/rtps/reader/ReaderListener.h" -#include "fastrtps/rtps/reader/RTPSReader.h" - -#include "rcutils/logging_macros.h" - -#include "rmw/rmw.h" - -#include "types/guard_condition.hpp" -#include "types/topic_cache.hpp" - -class WriterInfo : public eprosima::fastrtps::rtps::ReaderListener -{ -public: - WriterInfo( - eprosima::fastrtps::Participant * participant, - rmw_guard_condition_t * graph_guard_condition) - : participant_(participant), - graph_guard_condition_(static_cast(graph_guard_condition->data)) - {} - - void - onNewCacheChangeAdded( - eprosima::fastrtps::rtps::RTPSReader *, - const eprosima::fastrtps::rtps::CacheChange_t * const change) - { - eprosima::fastrtps::rtps::WriterProxyData proxyData; - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - eprosima::fastrtps::rtps::CDRMessage_t tempMsg(0); - tempMsg.wraps = true; - if (PL_CDR_BE == change->serializedPayload.encapsulation) { - tempMsg.msg_endian = eprosima::fastrtps::rtps::BIGEND; - } else { - tempMsg.msg_endian = eprosima::fastrtps::rtps::LITTLEEND; - } - tempMsg.length = change->serializedPayload.length; - tempMsg.max_size = change->serializedPayload.max_size; - tempMsg.buffer = change->serializedPayload.data; - if (!proxyData.readFromCDRMessage(&tempMsg)) { - return; - } - } else { - eprosima::fastrtps::rtps::GUID_t writerGuid; - iHandle2GUID(writerGuid, change->instanceHandle); - if (!participant_->get_remote_writer_info(writerGuid, proxyData)) { - return; - } - } - - bool trigger = false; - { - std::lock_guard guard(topic_cache_.getMutex()); - if (eprosima::fastrtps::rtps::ALIVE == change->kind) { - trigger = topic_cache_.addTopic(proxyData.RTPSParticipantKey(), - proxyData.topicName(), proxyData.typeName()); - } else { - trigger = topic_cache_.removeTopic(proxyData.RTPSParticipantKey(), - proxyData.topicName(), proxyData.typeName()); - } - } - - if (trigger) { - graph_guard_condition_->trigger(); - } - } - - LockedObject topic_cache_; - eprosima::fastrtps::Participant * participant_; - GuardCondition * graph_guard_condition_; -}; - -#endif // WRITER_INFO_HPP_