From 93e294aa2cb2b638f761724ce508f7c99c3b3ab9 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 23 Sep 2022 13:43:14 +0800 Subject: [PATCH] Support connect/disconnect event Signed-off-by: Barry Xu --- .../custom_publisher_info.hpp | 17 ++-- .../custom_subscriber_info.hpp | 32 ++++++++ .../src/custom_publisher_info.cpp | 79 +++++++++++++++++++ .../src/custom_subscriber_info.cpp | 46 +++++++++++ 4 files changed, 165 insertions(+), 9 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 2c6687974..5c8ada572 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -62,6 +62,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds , deadline_changes_(false) , liveliness_changes_(false) , incompatible_qos_changes_(false) + , matched_changes_(false) { } @@ -70,15 +71,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds void on_publication_matched( eprosima::fastdds::dds::DataWriter * /* writer */, - const eprosima::fastdds::dds::PublicationMatchedStatus & info) final - { - std::lock_guard lock(discovery_m_); - if (info.current_count_change == 1) { - subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle)); - } else if (info.current_count_change == -1) { - subscriptions_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle)); - } - } + const eprosima::fastdds::dds::PublicationMatchedStatus & info) final; RMW_FASTRTPS_SHARED_CPP_PUBLIC void @@ -144,6 +137,12 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds bool incompatible_qos_changes_ RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + eprosima::fastdds::dds::PublicationMatchedStatus matched_status_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + + bool matched_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_ RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); }; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index fc77ce92b..7d583f6d1 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -90,6 +90,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds , liveliness_changes_(false) , sample_lost_changes_(false) , incompatible_qos_changes_(false) + , matched_changes_(false) { } @@ -107,6 +108,31 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds publishers_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle)); } } + + { + std::unique_lock lock_mutex(on_new_event_m_); + + matched_status_.total_count = info.total_count; + matched_status_.total_count_change += info.total_count_change; + matched_status_.current_count = info.current_count; + matched_status_.current_count_change += info.current_count_change; + + matched_changes_ = true; + + rmw_event_type_t event; + if (info.current_count_change == 1) { + event = RMW_EVENT_SUBSCRIPTION_CONNECT; + } else if (info.current_count_change == -1) { + event = RMW_EVENT_SUBSCRIPTION_DISCONNECT; + } else { + return; + } + + if (on_new_event_cb_[event]) { + on_new_event_cb_[event](user_data_[event], 1); + } + event_guard[event].set_trigger_value(true); + } } void @@ -204,6 +230,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds bool incompatible_qos_changes_ RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + + bool matched_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_ RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 2cc784513..a616d7616 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -96,6 +96,27 @@ bool PubListener::take_event( incompatible_qos_status_.total_count_change = 0; } break; + case RMW_EVENT_PUBLICATION_CONNECT: + case RMW_EVENT_PUBLICATION_DISCONNECT: + { + auto rmw_data = static_cast(event_info); + if (matched_changes_) { + rmw_data->total_count = matched_status_.total_count; + rmw_data->total_count_change = matched_status_.total_count_change; + rmw_data->current_count = matched_status_.current_count; + rmw_data->current_count_change = matched_status_.current_count_change; + + matched_changes_ = false; + } else { + eprosima::fastdds::dds::PublicationMatchedStatus matched_status; + publisher_info_->data_writer_->get_publication_matched_status(matched_status); + rmw_data->total_count = matched_status.total_count; + rmw_data->total_count_change = matched_status.total_count_change; + rmw_data->current_count = matched_status.current_count; + rmw_data->current_count_change = matched_status.current_count_change; + } + } + break; default: return false; } @@ -133,6 +154,32 @@ void PubListener::set_on_new_event_callback( incompatible_qos_status_.total_count_change); incompatible_qos_status_.total_count_change = 0; break; + case RMW_EVENT_PUBLICATION_CONNECT: + { + eprosima::fastdds::dds::PublicationMatchedStatus matched_status; + publisher_info_->data_writer_->get_publication_matched_status( + matched_status); + if (matched_status.current_count_change > 0) { + callback( + user_data, + matched_status.current_count_change); + } + } + break; + case RMW_EVENT_PUBLICATION_DISCONNECT: + { + eprosima::fastdds::dds::PublicationMatchedStatus matched_status; + publisher_info_->data_writer_->get_publication_matched_status( + matched_status); + size_t disconnect_count + = matched_status.total_count_change - matched_status.current_count_change; + if (disconnect_count > 0) { + callback( + user_data, + disconnect_count); + } + } + break; default: break; } @@ -155,6 +202,38 @@ void PubListener::set_on_new_event_callback( } } +void +PubListener::on_publication_matched( + eprosima::fastdds::dds::DataWriter * /* writer */, + const eprosima::fastdds::dds::PublicationMatchedStatus & info) +{ + { + std::lock_guard lock(discovery_m_); + if (info.current_count_change == 1) { + subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle)); + } else if (info.current_count_change == -1) { + subscriptions_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle)); + } + } + { + std::unique_lock lock_mutex(on_new_event_m_); + + matched_status_.total_count = info.total_count; + matched_status_.total_count_change += info.total_count_change; + matched_status_.current_count = info.current_count; + matched_status_.current_count_change += info.current_count_change; + + matched_changes_ = true; + + if (info.current_count_change == 1) { + trigger_event(RMW_EVENT_PUBLICATION_CONNECT); + } else if (info.current_count_change == -1) { + trigger_event(RMW_EVENT_PUBLICATION_DISCONNECT); + } + } + +} + void PubListener::on_offered_deadline_missed( eprosima::fastdds::dds::DataWriter * /* writer */, diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index fe8a8e8a0..c14189d0a 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -118,6 +118,27 @@ bool SubListener::take_event( incompatible_qos_status_.total_count_change = 0; } break; + case RMW_EVENT_SUBSCRIPTION_CONNECT: + case RMW_EVENT_SUBSCRIPTION_DISCONNECT: + { + auto rmw_data = static_cast(event_info); + if (matched_changes_) { + rmw_data->total_count = matched_status_.total_count; + rmw_data->total_count_change = matched_status_.total_count_change; + rmw_data->current_count = matched_status_.current_count; + rmw_data->current_count_change = matched_status_.current_count_change; + + matched_changes_ = false; + } else { + eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status; + subscriber_info_->data_reader_->get_subscription_matched_status(matched_status); + rmw_data->total_count = matched_status.total_count; + rmw_data->total_count_change = matched_status.total_count_change; + rmw_data->current_count = matched_status.current_count; + rmw_data->current_count_change = matched_status.current_count_change; + } + } + break; default: return false; } @@ -174,6 +195,31 @@ void SubListener::set_on_new_event_callback( incompatible_qos_status_.total_count_change = 0; } break; + case RMW_EVENT_SUBSCRIPTION_CONNECT: + { + eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status; + subscriber_info_->data_reader_->get_subscription_matched_status( + matched_status); + if (matched_status.current_count_change > 0) { + callback( + user_data, + matched_status.current_count_change); + } + } + break; + case RMW_EVENT_SUBSCRIPTION_DISCONNECT: + { + eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status; + subscriber_info_->data_reader_->get_subscription_matched_status( + matched_status); + size_t disconnect_count + = matched_status.total_count_change - matched_status.current_count_change; + if (disconnect_count > 0) { + callback( + user_data, + disconnect_count); + } + } default: break; }