From 79cc8e87842198446f7d8a0b7fe5c8b809739abb Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 9 Feb 2022 21:38:44 +0000 Subject: [PATCH] Add in support for the MESSAGE_LOST event. Fast-DDS (and rmw_fastrtps_cpp) don't fully support this right now, but having it in the list here allows RViz2 to start up. Implementation extracted from https://github.com/ros2/rmw_fastrtps/pull/583, which is the more complete solution. Signed-off-by: Chris Lalancette --- .../custom_subscriber_info.hpp | 10 +++++++ .../src/custom_subscriber_info.cpp | 29 +++++++++++++++++++ rmw_fastrtps_shared_cpp/src/rmw_event.cpp | 3 +- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 85a4ac540..a16435d5e 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -112,6 +112,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::DataReader *, const eprosima::fastrtps::LivelinessChangedStatus &) final; + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_sample_lost( + eprosima::fastdds::dds::DataReader *, + const eprosima::fastdds::dds::SampleLostStatus &) final; + // EventListenerInterface implementation RMW_FASTRTPS_SHARED_CPP_PUBLIC bool @@ -176,6 +182,10 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::atomic_bool sample_lost_changes_; + eprosima::fastdds::dds::SampleLostStatus sample_lost_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 17da1fa79..2dffec498 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -64,6 +64,24 @@ void SubListener::on_liveliness_changed( liveliness_changes_.store(true, std::memory_order_relaxed); } +void SubListener::on_sample_lost( + eprosima::fastdds::dds::DataReader * /* reader */, + const eprosima::fastdds::dds::SampleLostStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to sample_lost_status_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + sample_lost_status_.total_count = status.total_count; + // Accumulate deltas + sample_lost_status_.total_count_change += status.total_count_change; + + sample_lost_changes_.store(true, std::memory_order_relaxed); +} + bool SubListener::hasEvent(rmw_event_type_t event_type) const { assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); @@ -72,6 +90,8 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const return liveliness_changes_.load(std::memory_order_relaxed); case RMW_EVENT_REQUESTED_DEADLINE_MISSED: return deadline_changes_.load(std::memory_order_relaxed); + case RMW_EVENT_MESSAGE_LOST: + return sample_lost_changes_.load(std::memory_order_relaxed); default: break; } @@ -104,6 +124,15 @@ bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) deadline_changes_.store(false, std::memory_order_relaxed); } break; + case RMW_EVENT_MESSAGE_LOST: + { + auto rmw_data = static_cast(event_info); + rmw_data->total_count = sample_lost_status_.total_count; + rmw_data->total_count_change = sample_lost_status_.total_count_change; + sample_lost_status_.total_count_change = 0; + sample_lost_changes_.store(false, std::memory_order_relaxed); + } + break; default: return false; } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 4544d3136..91d1fdaac 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -23,7 +23,8 @@ static const std::unordered_set g_rmw_event_type_set{ RMW_EVENT_LIVELINESS_CHANGED, RMW_EVENT_REQUESTED_DEADLINE_MISSED, RMW_EVENT_LIVELINESS_LOST, - RMW_EVENT_OFFERED_DEADLINE_MISSED + RMW_EVENT_OFFERED_DEADLINE_MISSED, + RMW_EVENT_MESSAGE_LOST }; namespace rmw_fastrtps_shared_cpp