Skip to content

Commit

Permalink
Implement matched event (ros2#2105)
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Jan 12, 2024
1 parent ba9561a commit 4b8802d
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 10 deletions.
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/event_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ using QOSOfferedIncompatibleQoSInfo = rmw_offered_qos_incompatible_event_status_
using QOSRequestedIncompatibleQoSInfo = rmw_requested_qos_incompatible_event_status_t;

using IncompatibleTypeInfo = rmw_incompatible_type_status_t;
using MatchedInfo = rmw_matched_status_t;

using QOSDeadlineRequestedCallbackType = std::function<void (QOSDeadlineRequestedInfo &)>;
using QOSDeadlineOfferedCallbackType = std::function<void (QOSDeadlineOfferedInfo &)>;
Expand All @@ -58,6 +59,8 @@ using QOSRequestedIncompatibleQoSCallbackType =
std::function<void (QOSRequestedIncompatibleQoSInfo &)>;

using IncompatibleTypeCallbackType = std::function<void (IncompatibleTypeInfo &)>;
using PublisherMatchedCallbackType = std::function<void (MatchedInfo &)>;
using SubscriptionMatchedCallbackType = std::function<void (MatchedInfo &)>;

/// Contains callbacks for various types of events a Publisher can receive from the middleware.
struct PublisherEventCallbacks
Expand All @@ -66,6 +69,7 @@ struct PublisherEventCallbacks
QOSLivelinessLostCallbackType liveliness_callback;
QOSOfferedIncompatibleQoSCallbackType incompatible_qos_callback;
IncompatibleTypeCallbackType incompatible_type_callback;
PublisherMatchedCallbackType matched_callback;
};

/// Contains callbacks for non-message events that a Subscription can receive from the middleware.
Expand All @@ -76,6 +80,7 @@ struct SubscriptionEventCallbacks
QOSRequestedIncompatibleQoSCallbackType incompatible_qos_callback;
QOSMessageLostCallbackType message_lost_callback;
IncompatibleTypeCallbackType incompatible_type_callback;
SubscriptionMatchedCallbackType matched_callback;
};

class UnsupportedEventTypeException : public exceptions::RCLErrorBase, public std::runtime_error
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ PublisherBase::bind_event_callbacks(
rclcpp::get_logger("rclcpp"),
"Failed to add event handler for incompatible type; wrong callback type");
}
if (event_callbacks.matched_callback) {
this->add_event_handler(
event_callbacks.matched_callback,
RCL_PUBLISHER_MATCHED);
}
}

size_t
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ SubscriptionBase::bind_event_callbacks(
event_callbacks.message_lost_callback,
RCL_SUBSCRIPTION_MESSAGE_LOST);
}
if (event_callbacks.matched_callback) {
this->add_event_handler(
event_callbacks.matched_callback,
RCL_SUBSCRIPTION_MATCHED);
}
}

const char *
Expand Down
26 changes: 16 additions & 10 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -444,17 +444,23 @@ function(test_generic_pubsub_for_rmw_implementation)
endif()
endfunction()
call_for_each_rmw_implementation(test_generic_pubsub_for_rmw_implementation)
ament_add_gtest(test_qos_event test_qos_event.cpp)
if(TARGET test_qos_event)
ament_target_dependencies(test_qos_event
"rmw"
"test_msgs"
)
target_link_libraries(test_qos_event
${PROJECT_NAME}
mimick

function(test_qos_event_for_rmw_implementation)
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})
ament_add_gmock(test_qos_event${target_suffix} test_qos_event.cpp
ENV ${rmw_implementation_env_var}
)
endif()
if(TARGET test_qos_event${target_suffix})
target_link_libraries(test_qos_event${target_suffix} ${PROJECT_NAME} mimick)
ament_target_dependencies(test_qos_event${target_suffix}
"rmw"
"rosidl_typesupport_cpp"
"test_msgs"
)
endif()
endfunction()
call_for_each_rmw_implementation(test_qos_event_for_rmw_implementation)

ament_add_gmock(test_qos_overriding_options test_qos_overriding_options.cpp)
if(TARGET test_qos_overriding_options)
target_link_libraries(test_qos_overriding_options
Expand Down
190 changes: 190 additions & 0 deletions rclcpp/test/rclcpp/test_qos_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <functional>
#include <future>
Expand Down Expand Up @@ -313,6 +314,11 @@ TEST_F(TestQosEvent, add_to_wait_set) {

TEST_F(TestQosEvent, test_on_new_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

auto offered_deadline = rclcpp::Duration(std::chrono::milliseconds(1));
auto requested_deadline = rclcpp::Duration(std::chrono::milliseconds(2));

Expand Down Expand Up @@ -354,6 +360,11 @@ TEST_F(TestQosEvent, test_on_new_event_callback)

TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

auto pub = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
auto dummy_cb = [](size_t count_events) {(void)count_events;};
Expand All @@ -376,6 +387,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
EXPECT_NO_THROW(
pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS));

EXPECT_NO_THROW(
pub->set_on_new_qos_event_callback(dummy_cb, RCL_PUBLISHER_MATCHED));

EXPECT_NO_THROW(
pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_MATCHED));

EXPECT_NO_THROW(
sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED));

Expand All @@ -394,6 +411,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
EXPECT_NO_THROW(
sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS));

EXPECT_NO_THROW(
sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_MATCHED));

EXPECT_NO_THROW(
sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_MATCHED));

std::function<void(size_t)> invalid_cb;

rclcpp::SubscriptionOptions sub_options;
Expand All @@ -413,3 +436,170 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
pub->set_on_new_qos_event_callback(invalid_cb, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
std::invalid_argument);
}

TEST_F(TestQosEvent, test_pub_matched_event_by_set_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

std::atomic_size_t matched_count = 0;

rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback = [](auto) {};
auto pub = node->create_publisher<test_msgs::msg::Empty>(
topic_name, 10, pub_options);

auto matched_event_callback = [&matched_count](size_t count) {
matched_count += count;
};

pub->set_on_new_qos_event_callback(matched_event_callback, RCL_PUBLISHER_MATCHED);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

const auto timeout = std::chrono::milliseconds(200);

{
auto sub1 = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(1));

{
auto sub2 = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback);
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(2));
}
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(3));
}

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(4));
}

TEST_F(TestQosEvent, test_sub_matched_event_by_set_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

std::atomic_size_t matched_count = 0;

rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.matched_callback = [](auto) {};
auto sub = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, sub_options);

auto matched_event_callback = [&matched_count](size_t count) {
matched_count += count;
};

sub->set_on_new_qos_event_callback(matched_event_callback, RCL_SUBSCRIPTION_MATCHED);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

const auto timeout = std::chrono::milliseconds(200);

{
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(1));

{
auto pub2 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(2));
}

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(3));
}

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(4));
}

TEST_F(TestQosEvent, test_pub_matched_event_by_option_event_callback)
{
rmw_matched_status_t matched_expected_result;

rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[&matched_expected_result](rmw_matched_status_t & s) {
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
};

auto pub = node->create_publisher<test_msgs::msg::Empty>(
topic_name, 10, pub_options);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

// Create a connected subscription
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 1;
matched_expected_result.current_count = 1;
matched_expected_result.current_count_change = 1;

const auto timeout = std::chrono::milliseconds(200);

{
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
ex.spin_some(timeout);

// destroy a connected subscription
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 0;
matched_expected_result.current_count = 0;
matched_expected_result.current_count_change = -1;
}
ex.spin_some(timeout);
}

TEST_F(TestQosEvent, test_sub_matched_event_by_option_event_callback)
{
rmw_matched_status_t matched_expected_result;

rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.matched_callback =
[&matched_expected_result](rmw_matched_status_t & s) {
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
};
auto sub = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, sub_options);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

// Create a connected publisher
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 1;
matched_expected_result.current_count = 1;
matched_expected_result.current_count_change = 1;

const auto timeout = std::chrono::milliseconds(200);
{
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
ex.spin_some(timeout);

// destroy a connected publisher
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 0;
matched_expected_result.current_count = 0;
matched_expected_result.current_count_change = -1;
}
ex.spin_some(timeout);
}

0 comments on commit 4b8802d

Please sign in to comment.