Skip to content

Commit

Permalink
Support connect/disconnect event
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Sep 26, 2022
1 parent 5134c5d commit 93e294a
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
, deadline_changes_(false)
, liveliness_changes_(false)
, incompatible_qos_changes_(false)
, matched_changes_(false)
{
}

Expand All @@ -70,15 +71,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
void
on_publication_matched(
eprosima::fastdds::dds::DataWriter * /* writer */,
const eprosima::fastdds::dds::PublicationMatchedStatus & info) final
{
std::lock_guard<std::mutex> lock(discovery_m_);
if (info.current_count_change == 1) {
subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
} else if (info.current_count_change == -1) {
subscriptions_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
}
}
const eprosima::fastdds::dds::PublicationMatchedStatus & info) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
Expand Down Expand Up @@ -144,6 +137,12 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
bool incompatible_qos_changes_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

eprosima::fastdds::dds::PublicationMatchedStatus matched_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

bool matched_changes_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
, liveliness_changes_(false)
, sample_lost_changes_(false)
, incompatible_qos_changes_(false)
, matched_changes_(false)
{
}

Expand All @@ -107,6 +108,31 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
publishers_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
}
}

{
std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

matched_status_.total_count = info.total_count;
matched_status_.total_count_change += info.total_count_change;
matched_status_.current_count = info.current_count;
matched_status_.current_count_change += info.current_count_change;

matched_changes_ = true;

rmw_event_type_t event;
if (info.current_count_change == 1) {
event = RMW_EVENT_SUBSCRIPTION_CONNECT;
} else if (info.current_count_change == -1) {
event = RMW_EVENT_SUBSCRIPTION_DISCONNECT;
} else {
return;
}

if (on_new_event_cb_[event]) {
on_new_event_cb_[event](user_data_[event], 1);
}
event_guard[event].set_trigger_value(true);
}
}

void
Expand Down Expand Up @@ -204,6 +230,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
bool incompatible_qos_changes_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

bool matched_changes_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

Expand Down
79 changes: 79 additions & 0 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ bool PubListener::take_event(
incompatible_qos_status_.total_count_change = 0;
}
break;
case RMW_EVENT_PUBLICATION_CONNECT:
case RMW_EVENT_PUBLICATION_DISCONNECT:
{
auto rmw_data = static_cast<rmw_matched_status_t *>(event_info);
if (matched_changes_) {
rmw_data->total_count = matched_status_.total_count;
rmw_data->total_count_change = matched_status_.total_count_change;
rmw_data->current_count = matched_status_.current_count;
rmw_data->current_count_change = matched_status_.current_count_change;

matched_changes_ = false;
} else {
eprosima::fastdds::dds::PublicationMatchedStatus matched_status;
publisher_info_->data_writer_->get_publication_matched_status(matched_status);
rmw_data->total_count = matched_status.total_count;
rmw_data->total_count_change = matched_status.total_count_change;
rmw_data->current_count = matched_status.current_count;
rmw_data->current_count_change = matched_status.current_count_change;
}
}
break;
default:
return false;
}
Expand Down Expand Up @@ -133,6 +154,32 @@ void PubListener::set_on_new_event_callback(
incompatible_qos_status_.total_count_change);
incompatible_qos_status_.total_count_change = 0;
break;
case RMW_EVENT_PUBLICATION_CONNECT:
{
eprosima::fastdds::dds::PublicationMatchedStatus matched_status;
publisher_info_->data_writer_->get_publication_matched_status(
matched_status);
if (matched_status.current_count_change > 0) {
callback(
user_data,
matched_status.current_count_change);
}
}
break;
case RMW_EVENT_PUBLICATION_DISCONNECT:
{
eprosima::fastdds::dds::PublicationMatchedStatus matched_status;
publisher_info_->data_writer_->get_publication_matched_status(
matched_status);
size_t disconnect_count
= matched_status.total_count_change - matched_status.current_count_change;
if (disconnect_count > 0) {
callback(
user_data,
disconnect_count);
}
}
break;
default:
break;
}
Expand All @@ -155,6 +202,38 @@ void PubListener::set_on_new_event_callback(
}
}

void
PubListener::on_publication_matched(
eprosima::fastdds::dds::DataWriter * /* writer */,
const eprosima::fastdds::dds::PublicationMatchedStatus & info)
{
{
std::lock_guard<std::mutex> lock(discovery_m_);
if (info.current_count_change == 1) {
subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
} else if (info.current_count_change == -1) {
subscriptions_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
}
}
{
std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

matched_status_.total_count = info.total_count;
matched_status_.total_count_change += info.total_count_change;
matched_status_.current_count = info.current_count;
matched_status_.current_count_change += info.current_count_change;

matched_changes_ = true;

if (info.current_count_change == 1) {
trigger_event(RMW_EVENT_PUBLICATION_CONNECT);
} else if (info.current_count_change == -1) {
trigger_event(RMW_EVENT_PUBLICATION_DISCONNECT);
}
}

}

void
PubListener::on_offered_deadline_missed(
eprosima::fastdds::dds::DataWriter * /* writer */,
Expand Down
46 changes: 46 additions & 0 deletions rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,27 @@ bool SubListener::take_event(
incompatible_qos_status_.total_count_change = 0;
}
break;
case RMW_EVENT_SUBSCRIPTION_CONNECT:
case RMW_EVENT_SUBSCRIPTION_DISCONNECT:
{
auto rmw_data = static_cast<rmw_matched_status_t *>(event_info);
if (matched_changes_) {
rmw_data->total_count = matched_status_.total_count;
rmw_data->total_count_change = matched_status_.total_count_change;
rmw_data->current_count = matched_status_.current_count;
rmw_data->current_count_change = matched_status_.current_count_change;

matched_changes_ = false;
} else {
eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status;
subscriber_info_->data_reader_->get_subscription_matched_status(matched_status);
rmw_data->total_count = matched_status.total_count;
rmw_data->total_count_change = matched_status.total_count_change;
rmw_data->current_count = matched_status.current_count;
rmw_data->current_count_change = matched_status.current_count_change;
}
}
break;
default:
return false;
}
Expand Down Expand Up @@ -174,6 +195,31 @@ void SubListener::set_on_new_event_callback(
incompatible_qos_status_.total_count_change = 0;
}
break;
case RMW_EVENT_SUBSCRIPTION_CONNECT:
{
eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status;
subscriber_info_->data_reader_->get_subscription_matched_status(
matched_status);
if (matched_status.current_count_change > 0) {
callback(
user_data,
matched_status.current_count_change);
}
}
break;
case RMW_EVENT_SUBSCRIPTION_DISCONNECT:
{
eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status;
subscriber_info_->data_reader_->get_subscription_matched_status(
matched_status);
size_t disconnect_count
= matched_status.total_count_change - matched_status.current_count_change;
if (disconnect_count > 0) {
callback(
user_data,
disconnect_count);
}
}
default:
break;
}
Expand Down

0 comments on commit 93e294a

Please sign in to comment.