Skip to content

Commit

Permalink
Least intrusive possible thread safety annotation
Browse files Browse the repository at this point in the history
Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Mar 8, 2019
1 parent ea4fb7c commit 5f0da3d
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 41 deletions.
7 changes: 7 additions & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wthread-safety -Werror)
endif()

if(SECURITY)
find_package(OpenSSL REQUIRED)
endif()

find_package(ament_cmake_ros REQUIRED)

find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)

find_package(fastrtps_cmake_module REQUIRED)
Expand Down Expand Up @@ -76,6 +81,7 @@ target_link_libraries(rmw_fastrtps_shared_cpp

# specific order: dependents before dependencies
ament_target_dependencies(rmw_fastrtps_shared_cpp
"rcpputils"
"rcutils"
"rmw"
)
Expand All @@ -89,6 +95,7 @@ PRIVATE "RMW_FASTRTPS_SHARED_CPP_BUILDING_LIBRARY")
ament_export_include_directories(include)
ament_export_libraries(rmw_fastrtps_shared_cpp)

ament_export_dependencies(rcpputils)
ament_export_dependencies(rcutils)
ament_export_dependencies(rmw)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "fastrtps/publisher/Publisher.h"
#include "fastrtps/publisher/PublisherListener.h"

#include "rcpputils/thread_safety_annotations.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ClientListener;
Expand Down Expand Up @@ -108,22 +110,11 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
{
std::lock_guard<std::mutex> lock(internalMutex_);

auto pop_response = [this](CustomClientResponse & response) -> bool
{
if (!list.empty()) {
response = std::move(list.front());
list.pop_front();
list_has_data_.store(!list.empty());
return true;
}
return false;
};

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
return pop_response(response);
return popResponse(response);
}
return pop_response(response);
return popResponse(response);
}

void
Expand Down Expand Up @@ -164,12 +155,23 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
}

private:
bool popResponse(CustomClientResponse & response) RCPPUTILS_REQUIRES(internalMutex_)
{
if (!list.empty()) {
response = std::move(list.front());
list.pop_front();
list_has_data_.store(!list.empty());
return true;
}
return false;
};

CustomClientInfo * info_;
std::mutex internalMutex_;
std::list<CustomClientResponse> list;
std::list<CustomClientResponse> list RCPPUTILS_GUARDED_BY(internalMutex_);
std::atomic_bool list_has_data_;
std::mutex * conditionMutex_;
std::condition_variable * conditionVariable_;
std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_);
};

class ClientPubListener : public eprosima::fastrtps::PublisherListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener
{
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
if (is_alive) {
trigger = topic_cache.addTopic(proxyData.RTPSParticipantKey(),
trigger = topic_cache().addTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName(), proxyData.typeName());
} else {
trigger = topic_cache.removeTopic(proxyData.RTPSParticipantKey(),
trigger = topic_cache().removeTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName(), proxyData.typeName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "fastrtps/publisher/Publisher.h"
#include "fastrtps/publisher/PublisherListener.h"

#include "rcpputils/thread_safety_annotations.h"
#include "rmw/rmw.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
Expand Down Expand Up @@ -65,7 +66,7 @@ class PubListener : public eprosima::fastrtps::PublisherListener

private:
std::mutex internalMutex_;
std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_;
std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_ RCPPUTILS_GUARDED_BY(internalMutex_);
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/subscriber/SampleInfo.h"

#include "rcpputils/thread_safety_annotations.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ServiceListener;
Expand Down Expand Up @@ -148,10 +150,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
private:
CustomServiceInfo * info_;
std::mutex internalMutex_;
std::list<CustomServiceRequest> list;
std::list<CustomServiceRequest> list RCPPUTILS_GUARDED_BY(internalMutex_);
std::atomic_bool list_has_data_;
std::mutex * conditionMutex_;
std::condition_variable * conditionVariable_;
std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_);
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SubscriberListener.h"

#include "rcpputils/thread_safety_annotations.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class SubListener;
Expand Down Expand Up @@ -123,10 +125,10 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
private:
std::mutex internalMutex_;
std::atomic_size_t data_;
std::mutex * conditionMutex_;
std::condition_variable * conditionVariable_;
std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_);

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_GUARDED_BY(internalMutex_);
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "fastrtps/participant/Participant.h"
#include "fastrtps/rtps/common/Guid.h"
#include "fastrtps/rtps/common/InstanceHandle.h"
#include "rcpputils/thread_safety_annotations.h"
#include "rcutils/logging_macros.h"

typedef eprosima::fastrtps::rtps::GUID_t GUID_t;
Expand Down Expand Up @@ -214,18 +215,29 @@ inline std::ostream & operator<<(
}

template<class T>
class LockedObject : public T
class LockedObject
{
private:
mutable std::mutex cache_mutex_;
mutable std::mutex mutex_;
T object_ RCPPUTILS_GUARDED_BY(mutex_);

public:
/**
* @return a reference to this object to lock.
*/
std::mutex & getMutex() const
std::mutex & getMutex() const RCPPUTILS_RETURN_CAPABILITY(mutex_)
{
return cache_mutex_;
return mutex_;
}

T & operator()()
{
return object_;
}

const T & operator()() const
{
return object_;
}
};

Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_shared_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
<build_depend>fastcdr</build_depend>
<build_depend>fastrtps</build_depend>
<build_depend>fastrtps_cmake_module</build_depend>
<build_depend>rcpputils</build_depend>
<build_depend>rcutils</build_depend>
<build_depend>rmw</build_depend>

<build_export_depend>fastcdr</build_export_depend>
<build_export_depend>fastrtps</build_export_depend>
<build_export_depend>fastrtps_cmake_module</build_export_depend>
<build_export_depend>rcpputils</build_export_depend>
<build_export_depend>rcutils</build_export_depend>
<build_export_depend>rmw</build_export_depend>

Expand Down
4 changes: 2 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_count.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ __rmw_count_publishers(
{
std::lock_guard<std::mutex> guard(slave_target->writer_topic_cache.getMutex());
// Search and sum up the publisher counts
auto & topic_types = slave_target->writer_topic_cache.getTopicToTypes();
auto & topic_types = slave_target->writer_topic_cache().getTopicToTypes();
for (const auto & topic_fqdn : topic_fqdns) {
const auto & it = topic_types.find(topic_fqdn);
if (it != topic_types.end()) {
Expand Down Expand Up @@ -125,7 +125,7 @@ __rmw_count_subscribers(
{
std::lock_guard<std::mutex> guard(slave_target->reader_topic_cache.getMutex());
// Search and sum up the subscriber counts
auto & topic_types = slave_target->reader_topic_cache.getTopicToTypes();
auto & topic_types = slave_target->reader_topic_cache().getTopicToTypes();
for (const auto & topic_fqdn : topic_fqdns) {
const auto & it = topic_types.find(topic_fqdn);
if (it != topic_types.end()) {
Expand Down
12 changes: 6 additions & 6 deletions rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ __accumulate_topics(
bool no_demangle)
{
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
const auto & node_topics = topic_cache.getParticipantToTopics().find(node_guid_);
if (node_topics == topic_cache.getParticipantToTopics().end()) {
const auto & node_topics = topic_cache().getParticipantToTopics().find(node_guid_);
if (node_topics == topic_cache().getParticipantToTopics().end()) {
RCUTILS_LOG_DEBUG_NAMED(
kLoggerTag,
"No topics found for node");
Expand Down Expand Up @@ -268,7 +268,7 @@ __log_debug_information(const CustomParticipantInfo & impl)
auto & topic_cache = impl.listener->writer_topic_cache;
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
std::stringstream map_ss;
map_ss << topic_cache;
map_ss << topic_cache();
RCUTILS_LOG_DEBUG_NAMED(
kLoggerTag,
"Publisher Topic cache is: %s", map_ss.str().c_str());
Expand All @@ -277,7 +277,7 @@ __log_debug_information(const CustomParticipantInfo & impl)
auto & topic_cache = impl.listener->reader_topic_cache;
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
std::stringstream map_ss;
map_ss << topic_cache;
map_ss << topic_cache();
RCUTILS_LOG_DEBUG_NAMED(
kLoggerTag,
"Subscriber Topic cache is: %s", map_ss.str().c_str());
Expand Down Expand Up @@ -412,8 +412,8 @@ __rmw_get_service_names_and_types_by_node(
{
auto & topic_cache = impl->listener->reader_topic_cache;
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
const auto & node_topics = topic_cache.getParticipantToTopics().find(guid);
if (node_topics != topic_cache.getParticipantToTopics().end()) {
const auto & node_topics = topic_cache().getParticipantToTopics().find(guid);
if (node_topics != topic_cache().getParticipantToTopics().end()) {
for (auto & topic_pair : node_topics->second) {
std::string service_name = _demangle_service_from_topic(topic_pair.first);
if (service_name.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ __rmw_get_service_names_and_types(
// Setup processing function, will be used with two maps
auto map_process = [&services](const LockedObject<TopicCache> & topic_cache) {
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
for (auto it : topic_cache.getTopicToTypes()) {
for (auto it : topic_cache().getTopicToTypes()) {
std::string service_name = _demangle_service_from_topic(it.first);
if (service_name.empty()) {
// not a service
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ __rmw_get_topic_names_and_types(
auto map_process =
[&topics, no_demangle](const LockedObject<TopicCache> & topic_cache) {
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
for (auto it : topic_cache.getTopicToTypes()) {
for (auto it : topic_cache().getTopicToTypes()) {
if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) {
// if we are demangling and this is not prefixed with rt/, skip it
continue;
Expand Down
6 changes: 4 additions & 2 deletions rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <mutex>
#include <utility>

#include "rcpputils/thread_safety_annotations.h"

class GuardCondition
{
public:
Expand Down Expand Up @@ -78,8 +80,8 @@ class GuardCondition
private:
std::mutex internalMutex_;
std::atomic_bool hasTriggered_;
std::mutex * conditionMutex_;
std::condition_variable * conditionVariable_;
std::mutex * conditionMutex_ RCPPUTILS_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_GUARDED_BY(internalMutex_);
};

#endif // TYPES__GUARD_CONDITION_HPP_

0 comments on commit 5f0da3d

Please sign in to comment.