diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index 078c38c55..e1072c89a 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -25,12 +25,17 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wthread-safety -Werror) +endif() + if(SECURITY) find_package(OpenSSL REQUIRED) endif() find_package(ament_cmake_ros REQUIRED) +find_package(rcpputils REQUIRED) find_package(rcutils REQUIRED) find_package(fastrtps_cmake_module REQUIRED) @@ -76,6 +81,7 @@ target_link_libraries(rmw_fastrtps_shared_cpp # specific order: dependents before dependencies ament_target_dependencies(rmw_fastrtps_shared_cpp + "rcpputils" "rcutils" "rmw" ) @@ -89,6 +95,7 @@ PRIVATE "RMW_FASTRTPS_SHARED_CPP_BUILDING_LIBRARY") ament_export_include_directories(include) ament_export_libraries(rmw_fastrtps_shared_cpp) +ament_export_dependencies(rcpputils) ament_export_dependencies(rcutils) ament_export_dependencies(rmw) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 9dde673c4..938219334 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -31,6 +31,8 @@ #include "fastrtps/publisher/Publisher.h" #include "fastrtps/publisher/PublisherListener.h" +#include "rcpputils/thread_safety_annotations.h" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" class ClientListener; @@ -108,22 +110,11 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener { std::lock_guard lock(internalMutex_); - auto pop_response = [this](CustomClientResponse & response) -> bool - { - if (!list.empty()) { - response = std::move(list.front()); - list.pop_front(); - list_has_data_.store(!list.empty()); - return true; - } - return false; - }; - if (conditionMutex_ != nullptr) { std::unique_lock clock(*conditionMutex_); - return pop_response(response); + return popResponse(response); } - return pop_response(response); + return popResponse(response); } void @@ -167,12 +158,23 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener } private: + bool popResponse(CustomClientResponse & response) RCPPUTILS_REQUIRES(internalMutex_) + { + if (!list.empty()) { + response = std::move(list.front()); + list.pop_front(); + list_has_data_.store(!list.empty()); + return true; + } + return false; + }; + CustomClientInfo * info_; std::mutex internalMutex_; - std::list list; + std::list list RCPPUTILS_GUARDED_BY(internalMutex_); std::atomic_bool list_has_data_; - std::mutex * conditionMutex_; - std::condition_variable * conditionVariable_; + std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_); + std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_); std::set publishers_; }; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index fe22ffe68..e3a9875a0 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -166,10 +166,10 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener { std::lock_guard guard(topic_cache.getMutex()); if (is_alive) { - trigger = topic_cache.addTopic(proxyData.RTPSParticipantKey(), + trigger = topic_cache().addTopic(proxyData.RTPSParticipantKey(), proxyData.topicName(), proxyData.typeName()); } else { - trigger = topic_cache.removeTopic(proxyData.RTPSParticipantKey(), + trigger = topic_cache().removeTopic(proxyData.RTPSParticipantKey(), proxyData.topicName(), proxyData.typeName()); } } diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 486f10869..d303398d9 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -21,6 +21,7 @@ #include "fastrtps/publisher/Publisher.h" #include "fastrtps/publisher/PublisherListener.h" +#include "rcpputils/thread_safety_annotations.h" #include "rmw/rmw.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" @@ -65,7 +66,7 @@ class PubListener : public eprosima::fastrtps::PublisherListener private: std::mutex internalMutex_; - std::set subscriptions_; + std::set subscriptions_ RCPPUTILS_GUARDED_BY(internalMutex_); }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 1792591e5..5f16d12df 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -29,6 +29,8 @@ #include "fastrtps/subscriber/SubscriberListener.h" #include "fastrtps/subscriber/SampleInfo.h" +#include "rcpputils/thread_safety_annotations.h" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" class ServiceListener; @@ -148,10 +150,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener private: CustomServiceInfo * info_; std::mutex internalMutex_; - std::list list; + std::list list RCPPUTILS_GUARDED_BY(internalMutex_); std::atomic_bool list_has_data_; - std::mutex * conditionMutex_; - std::condition_variable * conditionVariable_; + std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_); + std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_); }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index f3938a146..f6182e6d7 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -24,6 +24,8 @@ #include "fastrtps/subscriber/Subscriber.h" #include "fastrtps/subscriber/SubscriberListener.h" +#include "rcpputils/thread_safety_annotations.h" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" class SubListener; @@ -123,10 +125,10 @@ class SubListener : public eprosima::fastrtps::SubscriberListener private: std::mutex internalMutex_; std::atomic_size_t data_; - std::mutex * conditionMutex_; - std::condition_variable * conditionVariable_; + std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_); + std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_); - std::set publishers_; + std::set publishers_ RCPPUTILS_GUARDED_BY(internalMutex_); }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp index 77ba29329..6429aa00f 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp @@ -28,6 +28,7 @@ #include "fastrtps/participant/Participant.h" #include "fastrtps/rtps/common/Guid.h" #include "fastrtps/rtps/common/InstanceHandle.h" +#include "rcpputils/thread_safety_annotations.h" #include "rcutils/logging_macros.h" typedef eprosima::fastrtps::rtps::GUID_t GUID_t; @@ -214,18 +215,29 @@ inline std::ostream & operator<<( } template -class LockedObject : public T +class LockedObject { private: - mutable std::mutex cache_mutex_; + mutable std::mutex mutex_; + T object_ RCPPUTILS_GUARDED_BY(mutex_); public: /** * @return a reference to this object to lock. */ - std::mutex & getMutex() const + std::mutex & getMutex() const RCPPUTILS_RETURN_CAPABILITY(mutex_) { - return cache_mutex_; + return mutex_; + } + + T & operator()() + { + return object_; + } + + const T & operator()() const + { + return object_; } }; diff --git a/rmw_fastrtps_shared_cpp/package.xml b/rmw_fastrtps_shared_cpp/package.xml index f18edf02c..6727f813e 100644 --- a/rmw_fastrtps_shared_cpp/package.xml +++ b/rmw_fastrtps_shared_cpp/package.xml @@ -16,12 +16,14 @@ fastcdr fastrtps fastrtps_cmake_module + rcpputils rcutils rmw fastcdr fastrtps fastrtps_cmake_module + rcpputils rcutils rmw diff --git a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp index aadf81f23..d6cc9df3b 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_count.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_count.cpp @@ -70,7 +70,7 @@ __rmw_count_publishers( { std::lock_guard guard(slave_target->writer_topic_cache.getMutex()); // Search and sum up the publisher counts - auto & topic_types = slave_target->writer_topic_cache.getTopicToTypes(); + auto & topic_types = slave_target->writer_topic_cache().getTopicToTypes(); for (const auto & topic_fqdn : topic_fqdns) { const auto & it = topic_types.find(topic_fqdn); if (it != topic_types.end()) { @@ -125,7 +125,7 @@ __rmw_count_subscribers( { std::lock_guard guard(slave_target->reader_topic_cache.getMutex()); // Search and sum up the subscriber counts - auto & topic_types = slave_target->reader_topic_cache.getTopicToTypes(); + auto & topic_types = slave_target->reader_topic_cache().getTopicToTypes(); for (const auto & topic_fqdn : topic_fqdns) { const auto & it = topic_types.find(topic_fqdn); if (it != topic_types.end()) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp index 521ebce6c..2850c1b63 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp @@ -155,8 +155,8 @@ __accumulate_topics( bool no_demangle) { std::lock_guard guard(topic_cache.getMutex()); - const auto & node_topics = topic_cache.getParticipantToTopics().find(node_guid_); - if (node_topics == topic_cache.getParticipantToTopics().end()) { + const auto & node_topics = topic_cache().getParticipantToTopics().find(node_guid_); + if (node_topics == topic_cache().getParticipantToTopics().end()) { RCUTILS_LOG_DEBUG_NAMED( kLoggerTag, "No topics found for node"); @@ -268,7 +268,7 @@ __log_debug_information(const CustomParticipantInfo & impl) auto & topic_cache = impl.listener->writer_topic_cache; std::lock_guard guard(topic_cache.getMutex()); std::stringstream map_ss; - map_ss << topic_cache; + map_ss << topic_cache(); RCUTILS_LOG_DEBUG_NAMED( kLoggerTag, "Publisher Topic cache is: %s", map_ss.str().c_str()); @@ -277,7 +277,7 @@ __log_debug_information(const CustomParticipantInfo & impl) auto & topic_cache = impl.listener->reader_topic_cache; std::lock_guard guard(topic_cache.getMutex()); std::stringstream map_ss; - map_ss << topic_cache; + map_ss << topic_cache(); RCUTILS_LOG_DEBUG_NAMED( kLoggerTag, "Subscriber Topic cache is: %s", map_ss.str().c_str()); @@ -412,8 +412,8 @@ __rmw_get_service_names_and_types_by_node( { auto & topic_cache = impl->listener->reader_topic_cache; std::lock_guard guard(topic_cache.getMutex()); - const auto & node_topics = topic_cache.getParticipantToTopics().find(guid); - if (node_topics != topic_cache.getParticipantToTopics().end()) { + const auto & node_topics = topic_cache().getParticipantToTopics().find(guid); + if (node_topics != topic_cache().getParticipantToTopics().end()) { for (auto & topic_pair : node_topics->second) { std::string service_name = _demangle_service_from_topic(topic_pair.first); if (service_name.empty()) { diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp index f39a68020..a7dd7318e 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service_names_and_types.cpp @@ -70,7 +70,7 @@ __rmw_get_service_names_and_types( // Setup processing function, will be used with two maps auto map_process = [&services](const LockedObject & topic_cache) { std::lock_guard guard(topic_cache.getMutex()); - for (auto it : topic_cache.getTopicToTypes()) { + for (auto it : topic_cache().getTopicToTypes()) { std::string service_name = _demangle_service_from_topic(it.first); if (service_name.empty()) { // not a service diff --git a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp index 5e3d4ef87..7d5da6414 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp @@ -81,7 +81,7 @@ __rmw_get_topic_names_and_types( auto map_process = [&topics, no_demangle](const LockedObject & topic_cache) { std::lock_guard guard(topic_cache.getMutex()); - for (auto it : topic_cache.getTopicToTypes()) { + for (auto it : topic_cache().getTopicToTypes()) { if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { // if we are demangling and this is not prefixed with rt/, skip it continue; diff --git a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp index ba88faa23..202a50bb5 100644 --- a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp +++ b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp @@ -22,6 +22,8 @@ #include #include +#include "rcpputils/thread_safety_annotations.h" + class GuardCondition { public: @@ -78,8 +80,8 @@ class GuardCondition private: std::mutex internalMutex_; std::atomic_bool hasTriggered_; - std::mutex * conditionMutex_; - std::condition_variable * conditionVariable_; + std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_); + std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_); }; #endif // TYPES__GUARD_CONDITION_HPP_