Skip to content

Commit

Permalink
Add in support for the MESSAGE_LOST event.
Browse files Browse the repository at this point in the history
Fast-DDS (and rmw_fastrtps_cpp) don't fully support this right
now, but having it in the list here allows RViz2 to start up.

Implementation extracted from
#583, which is the more
complete solution.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
  • Loading branch information
clalancette committed Feb 16, 2022
1 parent 33c54b0 commit 79cc8e8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::DataReader *,
const eprosima::fastrtps::LivelinessChangedStatus &) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_sample_lost(
eprosima::fastdds::dds::DataReader *,
const eprosima::fastdds::dds::SampleLostStatus &) final;

// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
Expand Down Expand Up @@ -176,6 +182,10 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool sample_lost_changes_;
eprosima::fastdds::dds::SampleLostStatus sample_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

Expand Down
29 changes: 29 additions & 0 deletions rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ void SubListener::on_liveliness_changed(
liveliness_changes_.store(true, std::memory_order_relaxed);
}

void SubListener::on_sample_lost(
eprosima::fastdds::dds::DataReader * /* reader */,
const eprosima::fastdds::dds::SampleLostStatus & status)
{
std::lock_guard<std::mutex> lock(internalMutex_);

// the change to sample_lost_status_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

// Assign absolute values
sample_lost_status_.total_count = status.total_count;
// Accumulate deltas
sample_lost_status_.total_count_change += status.total_count_change;

sample_lost_changes_.store(true, std::memory_order_relaxed);
}

bool SubListener::hasEvent(rmw_event_type_t event_type) const
{
assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type));
Expand All @@ -72,6 +90,8 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const
return liveliness_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_REQUESTED_DEADLINE_MISSED:
return deadline_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_MESSAGE_LOST:
return sample_lost_changes_.load(std::memory_order_relaxed);
default:
break;
}
Expand Down Expand Up @@ -104,6 +124,15 @@ bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
deadline_changes_.store(false, std::memory_order_relaxed);
}
break;
case RMW_EVENT_MESSAGE_LOST:
{
auto rmw_data = static_cast<rmw_message_lost_status_t *>(event_info);
rmw_data->total_count = sample_lost_status_.total_count;
rmw_data->total_count_change = sample_lost_status_.total_count_change;
sample_lost_status_.total_count_change = 0;
sample_lost_changes_.store(false, std::memory_order_relaxed);
}
break;
default:
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion rmw_fastrtps_shared_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ static const std::unordered_set<rmw_event_type_t> g_rmw_event_type_set{
RMW_EVENT_LIVELINESS_CHANGED,
RMW_EVENT_REQUESTED_DEADLINE_MISSED,
RMW_EVENT_LIVELINESS_LOST,
RMW_EVENT_OFFERED_DEADLINE_MISSED
RMW_EVENT_OFFERED_DEADLINE_MISSED,
RMW_EVENT_MESSAGE_LOST
};

namespace rmw_fastrtps_shared_cpp
Expand Down

0 comments on commit 79cc8e8

Please sign in to comment.