From 0ad4125c598070fe434cd732489f2f7b3f7e0e69 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 15 Feb 2023 13:30:11 +0800 Subject: [PATCH 1/5] Add matched & unmatched event support Signed-off-by: Barry Xu --- rcl/include/rcl/event.h | 4 + rcl/src/rcl/event.c | 12 ++ rcl/test/rcl/test_events.cpp | 301 +++++++++++++++++++++++++++++++++++ 3 files changed, 317 insertions(+) diff --git a/rcl/include/rcl/event.h b/rcl/include/rcl/event.h index 30858b68d..eaa3243cc 100644 --- a/rcl/include/rcl/event.h +++ b/rcl/include/rcl/event.h @@ -39,6 +39,8 @@ typedef enum rcl_publisher_event_type_e RCL_PUBLISHER_LIVELINESS_LOST, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS, RCL_PUBLISHER_INCOMPATIBLE_TYPE, + RCL_PUBLISHER_MATCHED, + RCL_PUBLISHER_UNMATCHED, } rcl_publisher_event_type_t; /// Enumeration of all of the subscription events that may fire. @@ -49,6 +51,8 @@ typedef enum rcl_subscription_event_type_e RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS, RCL_SUBSCRIPTION_MESSAGE_LOST, RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE, + RCL_SUBSCRIPTION_MATCHED, + RCL_SUBSCRIPTION_UNMATCHED, } rcl_subscription_event_type_t; /// Internal rcl implementation struct. diff --git a/rcl/src/rcl/event.c b/rcl/src/rcl/event.c index d71fafc62..0b8e4ed61 100644 --- a/rcl/src/rcl/event.c +++ b/rcl/src/rcl/event.c @@ -67,6 +67,12 @@ rcl_publisher_event_init( case RCL_PUBLISHER_INCOMPATIBLE_TYPE: rmw_event_type = RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE; break; + case RCL_PUBLISHER_MATCHED: + rmw_event_type = RMW_EVENT_PUBLICATION_MATCHED; + break; + case RCL_PUBLISHER_UNMATCHED: + rmw_event_type = RMW_EVENT_PUBLICATION_UNMATCHED; + break; default: RCL_SET_ERROR_MSG("Event type for publisher not supported"); return RCL_RET_INVALID_ARGUMENT; @@ -124,6 +130,12 @@ rcl_subscription_event_init( case RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE: rmw_event_type = RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE; break; + case RCL_SUBSCRIPTION_MATCHED: + rmw_event_type = RMW_EVENT_SUBSCRIPTION_MATCHED; + break; + case RCL_SUBSCRIPTION_UNMATCHED: + rmw_event_type = RMW_EVENT_SUBSCRIPTION_UNMATCHED; + break; default: RCL_SET_ERROR_MSG("Event type for subscription not supported"); return RCL_RET_INVALID_ARGUMENT; diff --git a/rcl/test/rcl/test_events.cpp b/rcl/test/rcl/test_events.cpp index 25fe795d4..ebf0010de 100644 --- a/rcl/test/rcl/test_events.cpp +++ b/rcl/test/rcl/test_events.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -853,3 +854,303 @@ INSTANTIATE_TEST_SUITE_P( TestEventFixture, ::testing::ValuesIn(get_test_pubsub_incompatible_qos_inputs()), TestEventFixture::PrintToStringParamName()); + +struct EventUserData +{ + std::shared_ptr event_count; +}; + +extern "C" +{ +void event_callback(const void * user_data, size_t number_of_events) +{ + (void)number_of_events; + const struct EventUserData * data = static_cast(user_data); + ASSERT_NE(nullptr, data); + (*data->event_count)++; +} +} + +/* + * Basic test of publisher matched/unmatched event + */ +TEST_F(TestEventFixture, test_pub_matched_unmatched_event) +{ + rcl_ret_t ret; + + // Create one publisher + setup_publisher(default_qos_profile); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // init publisher event + rcl_event_t pub_matched_event = rcl_get_zero_initialized_event(); + ret = rcl_publisher_event_init(&pub_matched_event, &publisher, RCL_PUBLISHER_MATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&pub_matched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + rcl_event_t pub_unmatched_event = rcl_get_zero_initialized_event(); + ret = rcl_publisher_event_init(&pub_unmatched_event, &publisher, RCL_PUBLISHER_UNMATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&pub_unmatched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // init event callback + struct EventUserData matched_data; + matched_data.event_count = std::make_shared(0); + struct EventUserData unmatched_data; + unmatched_data.event_count = std::make_shared(0); + + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { + ret = rcl_event_set_callback(&pub_matched_event, event_callback, &matched_data); + ASSERT_EQ(RMW_RET_OK, ret); + + ret = rcl_event_set_callback(&pub_unmatched_event, event_callback, &unmatched_data); + ASSERT_EQ(RMW_RET_OK, ret); + } + + // to take event if there is no subscription + { + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); + EXPECT_EQ(0, matched_status.current_matched_count); + EXPECT_EQ(0, matched_status.current_count_change); + + rmw_unmatched_status_t unmatched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_unmatched_event, &unmatched_status)); + EXPECT_EQ(0, unmatched_status.current_matched_count); + EXPECT_EQ(0, unmatched_status.current_count_change); + } + + // Create one subscriber + setup_subscriber(default_qos_profile); + + // Wait for connection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + nullptr, + &pub_matched_event, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(pub_event_ready, true); + } + + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { + EXPECT_EQ(*matched_data.event_count, 1); + EXPECT_EQ(*unmatched_data.event_count, 0); + + *matched_data.event_count = 0; + } + + // Check matched & unmatched status + { + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); + EXPECT_EQ(1, matched_status.current_matched_count); + EXPECT_EQ(1, matched_status.current_count_change); + + rmw_unmatched_status_t unmatched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_unmatched_event, &unmatched_status)); + EXPECT_EQ(1, unmatched_status.current_matched_count); + EXPECT_EQ(0, unmatched_status.current_count_change); + } + + // Delete subscriber + ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + // Wait for disconnection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + nullptr, + &pub_unmatched_event, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(pub_event_ready, true); + } + + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { + EXPECT_EQ(*matched_data.event_count, 0); + EXPECT_EQ(*unmatched_data.event_count, 1); + } + + // Check unmatched status + { + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); + EXPECT_EQ(0, matched_status.current_matched_count); + EXPECT_EQ(0, matched_status.current_count_change); + + rmw_unmatched_status_t unmatched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_unmatched_event, &unmatched_status)); + EXPECT_EQ(0, unmatched_status.current_matched_count); + EXPECT_EQ(1, unmatched_status.current_count_change); + } +} + +/* + * Basic test of publisher matched/unmatched event + */ +TEST_F(TestEventFixture, test_sub_matched_unmatched_event) +{ + rcl_ret_t ret; + + // Create one subscriber + setup_subscriber(default_qos_profile); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // init subscriber event + rcl_event_t sub_matched_event = rcl_get_zero_initialized_event(); + ret = rcl_subscription_event_init(&sub_matched_event, &subscription, RCL_SUBSCRIPTION_MATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&sub_matched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + rcl_event_t sub_unmatched_event = rcl_get_zero_initialized_event(); + ret = rcl_subscription_event_init( + &sub_unmatched_event, + &subscription, + RCL_SUBSCRIPTION_UNMATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&sub_unmatched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // init event callback + struct EventUserData matched_data; + matched_data.event_count = std::make_shared(0); + struct EventUserData unmatched_data; + unmatched_data.event_count = std::make_shared(0); + + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { + ret = rcl_event_set_callback(&sub_matched_event, event_callback, &matched_data); + ASSERT_EQ(RMW_RET_OK, ret); + + ret = rcl_event_set_callback(&sub_unmatched_event, event_callback, &unmatched_data); + ASSERT_EQ(RMW_RET_OK, ret); + } + + // to take event if there is no subscription + { + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); + EXPECT_EQ(0, matched_status.current_matched_count); + EXPECT_EQ(0, matched_status.current_count_change); + + rmw_unmatched_status_t unmatched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_unmatched_event, &unmatched_status)); + EXPECT_EQ(0, unmatched_status.current_matched_count); + EXPECT_EQ(0, unmatched_status.current_count_change); + } + + // Create one publisher + setup_publisher(default_qos_profile); + + // Wait for connection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + &sub_matched_event, + nullptr, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(sub_event_ready, true); + } + + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { + EXPECT_EQ(*matched_data.event_count, 1); + EXPECT_EQ(*unmatched_data.event_count, 0); + + *matched_data.event_count = 0; + } + + // Check matched status + { + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); + EXPECT_EQ(1, matched_status.current_matched_count); + EXPECT_EQ(1, matched_status.current_count_change); + + rmw_unmatched_status_t unmatched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_unmatched_event, &unmatched_status)); + EXPECT_EQ(1, unmatched_status.current_matched_count); + EXPECT_EQ(0, unmatched_status.current_count_change); + } + + ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + // Wait for disconnection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + &sub_unmatched_event, + nullptr, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(sub_event_ready, true); + } + + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { + EXPECT_EQ(*matched_data.event_count, 0); + EXPECT_EQ(*unmatched_data.event_count, 1); + } + + // Check unmatched status + { + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); + EXPECT_EQ(0, matched_status.current_matched_count); + EXPECT_EQ(0, matched_status.current_count_change); + + rmw_matched_status_t unmatched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_unmatched_event, &unmatched_status)); + EXPECT_EQ(0, unmatched_status.current_matched_count); + EXPECT_EQ(1, unmatched_status.current_count_change); + } +} From 351b2ecda1156af0ff3750d0c7b539a65158eb51 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 28 Feb 2023 14:37:44 +0800 Subject: [PATCH 2/5] Revise comments and add test cases Signed-off-by: Barry Xu --- rcl/test/rcl/test_events.cpp | 201 ++++++++++++++++++++++++++++++++++- 1 file changed, 200 insertions(+), 1 deletion(-) diff --git a/rcl/test/rcl/test_events.cpp b/rcl/test/rcl/test_events.cpp index ebf0010de..47293344d 100644 --- a/rcl/test/rcl/test_events.cpp +++ b/rcl/test/rcl/test_events.cpp @@ -1013,7 +1013,7 @@ TEST_F(TestEventFixture, test_pub_matched_unmatched_event) } /* - * Basic test of publisher matched/unmatched event + * Basic test of subscription matched/unmatched event */ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) { @@ -1154,3 +1154,202 @@ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) EXPECT_EQ(1, unmatched_status.current_count_change); } } + +TEST_F(TestEventFixture, test_pub_no_matched_unmatched_event) +{ + // Registered callback for matched/unmatched event should not be triggered for previous + // matched/unmatched event. + + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + rcl_ret_t ret; + + // Create one publisher + setup_publisher(default_qos_profile); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // init publisher event + rcl_event_t pub_matched_event = rcl_get_zero_initialized_event(); + ret = rcl_publisher_event_init(&pub_matched_event, &publisher, RCL_PUBLISHER_MATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&pub_matched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + rcl_event_t pub_unmatched_event = rcl_get_zero_initialized_event(); + ret = rcl_publisher_event_init(&pub_unmatched_event, &publisher, RCL_PUBLISHER_UNMATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&pub_unmatched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // Create one subscriber + setup_subscriber(default_qos_profile); + + // Wait for connection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + nullptr, + &pub_matched_event, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(pub_event_ready, true); + } + + // init event callback + struct EventUserData matched_data; + matched_data.event_count = std::make_shared(0); + struct EventUserData unmatched_data; + unmatched_data.event_count = std::make_shared(0); + + ret = rcl_event_set_callback(&pub_matched_event, event_callback, &matched_data); + ASSERT_EQ(RMW_RET_OK, ret); + + std::this_thread::sleep_for(200ms); + EXPECT_EQ(*matched_data.event_count, 0); + + // Delete subscriber + ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + // Wait for disconnection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + nullptr, + &pub_unmatched_event, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(pub_event_ready, true); + } + + ret = rcl_event_set_callback(&pub_unmatched_event, event_callback, &unmatched_data); + ASSERT_EQ(RMW_RET_OK, ret); + + std::this_thread::sleep_for(200ms); + EXPECT_EQ(*unmatched_data.event_count, 0); +} + +TEST_F(TestEventFixture, test_sub_no_matched_unmatched_event) +{ + // Registered callback for matched/unmatched event should not be triggered for previous + // matched/unmatched event. + + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + rcl_ret_t ret; + + // Create one subscriber + setup_subscriber(default_qos_profile); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // init subscriber event + rcl_event_t sub_matched_event = rcl_get_zero_initialized_event(); + ret = rcl_subscription_event_init(&sub_matched_event, &subscription, RCL_SUBSCRIPTION_MATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&sub_matched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + rcl_event_t sub_unmatched_event = rcl_get_zero_initialized_event(); + ret = rcl_subscription_event_init( + &sub_unmatched_event, + &subscription, + RCL_SUBSCRIPTION_UNMATCHED); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + ret = rcl_event_fini(&sub_unmatched_event); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + }); + + // init event callback + struct EventUserData matched_data; + matched_data.event_count = std::make_shared(0); + struct EventUserData unmatched_data; + unmatched_data.event_count = std::make_shared(0); + + // Create one publisher + setup_publisher(default_qos_profile); + + // Wait for connection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + &sub_matched_event, + nullptr, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(sub_event_ready, true); + } + + ret = rcl_event_set_callback(&sub_matched_event, event_callback, &matched_data); + ASSERT_EQ(RMW_RET_OK, ret); + + std::this_thread::sleep_for(200ms); + EXPECT_EQ(*matched_data.event_count, 0); + + // Delete publisher + ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + // Wait for disconnection + { + bool msg_ready = false; + bool sub_event_ready = false; + bool pub_event_ready = false; + ret = wait_for_msgs_and_events( + context_ptr, + nullptr, + &sub_unmatched_event, + nullptr, + &msg_ready, + &sub_event_ready, + &pub_event_ready); + ASSERT_EQ(RMW_RET_OK, ret); + ASSERT_EQ(sub_event_ready, true); + } + + ret = rcl_event_set_callback(&sub_unmatched_event, event_callback, &unmatched_data); + ASSERT_EQ(RMW_RET_OK, ret); + + std::this_thread::sleep_for(200ms); + EXPECT_EQ(*unmatched_data.event_count, 0); +} From 3a482906f67dba3fd7b9d02aca643e678e66e885 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 2 Mar 2023 16:13:55 +0800 Subject: [PATCH 3/5] Update test case since processing previous event while registering callback Signed-off-by: Barry Xu --- rcl/test/rcl/test_events.cpp | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/rcl/test/rcl/test_events.cpp b/rcl/test/rcl/test_events.cpp index 47293344d..5073ec1eb 100644 --- a/rcl/test/rcl/test_events.cpp +++ b/rcl/test/rcl/test_events.cpp @@ -1155,10 +1155,10 @@ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) } } -TEST_F(TestEventFixture, test_pub_no_matched_unmatched_event) +TEST_F(TestEventFixture, test_pub_previous_matched_unmatched_event) { - // Registered callback for matched/unmatched event should not be triggered for previous - // matched/unmatched event. + // While registering callback for matched/unmatched event, exist previous matched/unmatched event + // will trigger callback at once. // rmw_connextdds doesn't support rmw_event_set_callback() interface if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { @@ -1222,8 +1222,7 @@ TEST_F(TestEventFixture, test_pub_no_matched_unmatched_event) ret = rcl_event_set_callback(&pub_matched_event, event_callback, &matched_data); ASSERT_EQ(RMW_RET_OK, ret); - std::this_thread::sleep_for(200ms); - EXPECT_EQ(*matched_data.event_count, 0); + EXPECT_EQ(*matched_data.event_count, 1); // Delete subscriber ret = rcl_subscription_fini(&subscription, this->node_ptr); @@ -1249,14 +1248,13 @@ TEST_F(TestEventFixture, test_pub_no_matched_unmatched_event) ret = rcl_event_set_callback(&pub_unmatched_event, event_callback, &unmatched_data); ASSERT_EQ(RMW_RET_OK, ret); - std::this_thread::sleep_for(200ms); - EXPECT_EQ(*unmatched_data.event_count, 0); + EXPECT_EQ(*unmatched_data.event_count, 1); } -TEST_F(TestEventFixture, test_sub_no_matched_unmatched_event) +TEST_F(TestEventFixture, test_sub_previous_matched_unmatched_event) { - // Registered callback for matched/unmatched event should not be triggered for previous - // matched/unmatched event. + // While registering callback for matched/unmatched event, exist previous matched/unmatched event + // will trigger callback at once. // rmw_connextdds doesn't support rmw_event_set_callback() interface if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { @@ -1323,8 +1321,7 @@ TEST_F(TestEventFixture, test_sub_no_matched_unmatched_event) ret = rcl_event_set_callback(&sub_matched_event, event_callback, &matched_data); ASSERT_EQ(RMW_RET_OK, ret); - std::this_thread::sleep_for(200ms); - EXPECT_EQ(*matched_data.event_count, 0); + EXPECT_EQ(*matched_data.event_count, 1); // Delete publisher ret = rcl_publisher_fini(&publisher, this->node_ptr); @@ -1350,6 +1347,5 @@ TEST_F(TestEventFixture, test_sub_no_matched_unmatched_event) ret = rcl_event_set_callback(&sub_unmatched_event, event_callback, &unmatched_data); ASSERT_EQ(RMW_RET_OK, ret); - std::this_thread::sleep_for(200ms); - EXPECT_EQ(*unmatched_data.event_count, 0); + EXPECT_EQ(*unmatched_data.event_count, 1); } From 4052b24886493a436ae3dd9ddccffab5ae7bc51e Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 16 Mar 2023 20:09:41 +0800 Subject: [PATCH 4/5] Update to support only matched event Signed-off-by: Barry Xu --- rcl/include/rcl/event.h | 2 - rcl/src/rcl/event.c | 6 - rcl/test/rcl/test_events.cpp | 243 +++++++++++------------------------ 3 files changed, 78 insertions(+), 173 deletions(-) diff --git a/rcl/include/rcl/event.h b/rcl/include/rcl/event.h index eaa3243cc..0d57c93c1 100644 --- a/rcl/include/rcl/event.h +++ b/rcl/include/rcl/event.h @@ -40,7 +40,6 @@ typedef enum rcl_publisher_event_type_e RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS, RCL_PUBLISHER_INCOMPATIBLE_TYPE, RCL_PUBLISHER_MATCHED, - RCL_PUBLISHER_UNMATCHED, } rcl_publisher_event_type_t; /// Enumeration of all of the subscription events that may fire. @@ -52,7 +51,6 @@ typedef enum rcl_subscription_event_type_e RCL_SUBSCRIPTION_MESSAGE_LOST, RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE, RCL_SUBSCRIPTION_MATCHED, - RCL_SUBSCRIPTION_UNMATCHED, } rcl_subscription_event_type_t; /// Internal rcl implementation struct. diff --git a/rcl/src/rcl/event.c b/rcl/src/rcl/event.c index 0b8e4ed61..2bb2ffeec 100644 --- a/rcl/src/rcl/event.c +++ b/rcl/src/rcl/event.c @@ -70,9 +70,6 @@ rcl_publisher_event_init( case RCL_PUBLISHER_MATCHED: rmw_event_type = RMW_EVENT_PUBLICATION_MATCHED; break; - case RCL_PUBLISHER_UNMATCHED: - rmw_event_type = RMW_EVENT_PUBLICATION_UNMATCHED; - break; default: RCL_SET_ERROR_MSG("Event type for publisher not supported"); return RCL_RET_INVALID_ARGUMENT; @@ -133,9 +130,6 @@ rcl_subscription_event_init( case RCL_SUBSCRIPTION_MATCHED: rmw_event_type = RMW_EVENT_SUBSCRIPTION_MATCHED; break; - case RCL_SUBSCRIPTION_UNMATCHED: - rmw_event_type = RMW_EVENT_SUBSCRIPTION_UNMATCHED; - break; default: RCL_SET_ERROR_MSG("Event type for subscription not supported"); return RCL_RET_INVALID_ARGUMENT; diff --git a/rcl/test/rcl/test_events.cpp b/rcl/test/rcl/test_events.cpp index 5073ec1eb..47f9b9922 100644 --- a/rcl/test/rcl/test_events.cpp +++ b/rcl/test/rcl/test_events.cpp @@ -872,7 +872,7 @@ void event_callback(const void * user_data, size_t number_of_events) } /* - * Basic test of publisher matched/unmatched event + * Basic test of publisher matched event */ TEST_F(TestEventFixture, test_pub_matched_unmatched_event) { @@ -895,42 +895,24 @@ TEST_F(TestEventFixture, test_pub_matched_unmatched_event) ret = rcl_event_fini(&pub_matched_event); EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; }); - rcl_event_t pub_unmatched_event = rcl_get_zero_initialized_event(); - ret = rcl_publisher_event_init(&pub_unmatched_event, &publisher, RCL_PUBLISHER_UNMATCHED); - ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - ret = rcl_event_fini(&pub_unmatched_event); - EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - }); // init event callback struct EventUserData matched_data; matched_data.event_count = std::make_shared(0); - struct EventUserData unmatched_data; - unmatched_data.event_count = std::make_shared(0); - // rmw_connextdds doesn't support rmw_event_set_callback() interface + // rmw_connextdds doesn't support rmw_event_set_callback() interface. if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { ret = rcl_event_set_callback(&pub_matched_event, event_callback, &matched_data); ASSERT_EQ(RMW_RET_OK, ret); - - ret = rcl_event_set_callback(&pub_unmatched_event, event_callback, &unmatched_data); - ASSERT_EQ(RMW_RET_OK, ret); } - // to take event if there is no subscription - { - rmw_matched_status_t matched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); - EXPECT_EQ(0, matched_status.current_matched_count); - EXPECT_EQ(0, matched_status.current_count_change); - - rmw_unmatched_status_t unmatched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_unmatched_event, &unmatched_status)); - EXPECT_EQ(0, unmatched_status.current_matched_count); - EXPECT_EQ(0, unmatched_status.current_count_change); - } + // to take event while there is no subscription + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); + EXPECT_EQ(0, matched_status.total_count); + EXPECT_EQ(0, matched_status.total_count_change); + EXPECT_EQ(0, matched_status.current_count); + EXPECT_EQ(0, matched_status.current_count_change); // Create one subscriber setup_subscriber(default_qos_profile); @@ -952,25 +934,19 @@ TEST_F(TestEventFixture, test_pub_matched_unmatched_event) ASSERT_EQ(pub_event_ready, true); } + // rmw_connextdds doesn't support rmw_event_set_callback() interface. if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { EXPECT_EQ(*matched_data.event_count, 1); - EXPECT_EQ(*unmatched_data.event_count, 0); - - *matched_data.event_count = 0; } - // Check matched & unmatched status - { - rmw_matched_status_t matched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); - EXPECT_EQ(1, matched_status.current_matched_count); - EXPECT_EQ(1, matched_status.current_count_change); - - rmw_unmatched_status_t unmatched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_unmatched_event, &unmatched_status)); - EXPECT_EQ(1, unmatched_status.current_matched_count); - EXPECT_EQ(0, unmatched_status.current_count_change); - } + *matched_data.event_count = 0; + + // check matched status + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); + EXPECT_EQ(1, matched_status.total_count); + EXPECT_EQ(1, matched_status.total_count_change); + EXPECT_EQ(1, matched_status.current_count); + EXPECT_EQ(1, matched_status.current_count_change); // Delete subscriber ret = rcl_subscription_fini(&subscription, this->node_ptr); @@ -985,7 +961,7 @@ TEST_F(TestEventFixture, test_pub_matched_unmatched_event) context_ptr, nullptr, nullptr, - &pub_unmatched_event, + &pub_matched_event, &msg_ready, &sub_event_ready, &pub_event_ready); @@ -993,27 +969,21 @@ TEST_F(TestEventFixture, test_pub_matched_unmatched_event) ASSERT_EQ(pub_event_ready, true); } + // rmw_connextdds doesn't support rmw_event_set_callback() interface. if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { - EXPECT_EQ(*matched_data.event_count, 0); - EXPECT_EQ(*unmatched_data.event_count, 1); + EXPECT_EQ(*matched_data.event_count, 1); } // Check unmatched status - { - rmw_matched_status_t matched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); - EXPECT_EQ(0, matched_status.current_matched_count); - EXPECT_EQ(0, matched_status.current_count_change); - - rmw_unmatched_status_t unmatched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_unmatched_event, &unmatched_status)); - EXPECT_EQ(0, unmatched_status.current_matched_count); - EXPECT_EQ(1, unmatched_status.current_count_change); - } + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&pub_matched_event, &matched_status)); + EXPECT_EQ(1, matched_status.total_count); + EXPECT_EQ(0, matched_status.total_count_change); + EXPECT_EQ(0, matched_status.current_count); + EXPECT_EQ(-1, matched_status.current_count_change); } /* - * Basic test of subscription matched/unmatched event + * Basic test of subscription matched event */ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) { @@ -1036,45 +1006,24 @@ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) ret = rcl_event_fini(&sub_matched_event); EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; }); - rcl_event_t sub_unmatched_event = rcl_get_zero_initialized_event(); - ret = rcl_subscription_event_init( - &sub_unmatched_event, - &subscription, - RCL_SUBSCRIPTION_UNMATCHED); - ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - ret = rcl_event_fini(&sub_unmatched_event); - EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - }); // init event callback struct EventUserData matched_data; matched_data.event_count = std::make_shared(0); - struct EventUserData unmatched_data; - unmatched_data.event_count = std::make_shared(0); - // rmw_connextdds doesn't support rmw_event_set_callback() interface + // rmw_connextdds doesn't support rmw_event_set_callback() interface. if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { ret = rcl_event_set_callback(&sub_matched_event, event_callback, &matched_data); ASSERT_EQ(RMW_RET_OK, ret); - - ret = rcl_event_set_callback(&sub_unmatched_event, event_callback, &unmatched_data); - ASSERT_EQ(RMW_RET_OK, ret); } // to take event if there is no subscription - { - rmw_matched_status_t matched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); - EXPECT_EQ(0, matched_status.current_matched_count); - EXPECT_EQ(0, matched_status.current_count_change); - - rmw_unmatched_status_t unmatched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_unmatched_event, &unmatched_status)); - EXPECT_EQ(0, unmatched_status.current_matched_count); - EXPECT_EQ(0, unmatched_status.current_count_change); - } + rmw_matched_status_t matched_status; + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); + EXPECT_EQ(0, matched_status.total_count); + EXPECT_EQ(0, matched_status.total_count_change); + EXPECT_EQ(0, matched_status.current_count); + EXPECT_EQ(0, matched_status.current_count_change); // Create one publisher setup_publisher(default_qos_profile); @@ -1096,25 +1045,18 @@ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) ASSERT_EQ(sub_event_ready, true); } + // rmw_connextdds doesn't support rmw_event_set_callback() interface. if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { EXPECT_EQ(*matched_data.event_count, 1); - EXPECT_EQ(*unmatched_data.event_count, 0); - - *matched_data.event_count = 0; } + *matched_data.event_count = 0; // Check matched status - { - rmw_matched_status_t matched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); - EXPECT_EQ(1, matched_status.current_matched_count); - EXPECT_EQ(1, matched_status.current_count_change); - - rmw_unmatched_status_t unmatched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_unmatched_event, &unmatched_status)); - EXPECT_EQ(1, unmatched_status.current_matched_count); - EXPECT_EQ(0, unmatched_status.current_count_change); - } + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); + EXPECT_EQ(1, matched_status.total_count); + EXPECT_EQ(1, matched_status.total_count_change); + EXPECT_EQ(1, matched_status.current_count); + EXPECT_EQ(1, matched_status.total_count_change); ret = rcl_publisher_fini(&publisher, this->node_ptr); EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; @@ -1127,7 +1069,7 @@ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) ret = wait_for_msgs_and_events( context_ptr, nullptr, - &sub_unmatched_event, + &sub_matched_event, nullptr, &msg_ready, &sub_event_ready, @@ -1136,28 +1078,33 @@ TEST_F(TestEventFixture, test_sub_matched_unmatched_event) ASSERT_EQ(sub_event_ready, true); } + // rmw_connextdds doesn't support rmw_event_set_callback() interface. if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != 0) { - EXPECT_EQ(*matched_data.event_count, 0); - EXPECT_EQ(*unmatched_data.event_count, 1); + EXPECT_EQ(*matched_data.event_count, 1); } - // Check unmatched status - { - rmw_matched_status_t matched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); - EXPECT_EQ(0, matched_status.current_matched_count); - EXPECT_EQ(0, matched_status.current_count_change); - - rmw_matched_status_t unmatched_status; - EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_unmatched_event, &unmatched_status)); - EXPECT_EQ(0, unmatched_status.current_matched_count); - EXPECT_EQ(1, unmatched_status.current_count_change); - } + // Check matched status change + EXPECT_EQ(RMW_RET_OK, rcl_take_event(&sub_matched_event, &matched_status)); + EXPECT_EQ(1, matched_status.total_count); + EXPECT_EQ(0, matched_status.total_count_change); + EXPECT_EQ(0, matched_status.current_count); + EXPECT_EQ(-1, matched_status.current_count_change); +} + +extern "C" +{ +void event_callback2(const void * user_data, size_t number_of_events) +{ + (void)number_of_events; + const struct EventUserData * data = static_cast(user_data); + ASSERT_NE(nullptr, data); + *data->event_count = number_of_events; +} } -TEST_F(TestEventFixture, test_pub_previous_matched_unmatched_event) +TEST_F(TestEventFixture, test_pub_previous_matched_event) { - // While registering callback for matched/unmatched event, exist previous matched/unmatched event + // While registering callback for matched event, exist previous matched event // will trigger callback at once. // rmw_connextdds doesn't support rmw_event_set_callback() interface @@ -1184,14 +1131,6 @@ TEST_F(TestEventFixture, test_pub_previous_matched_unmatched_event) ret = rcl_event_fini(&pub_matched_event); EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; }); - rcl_event_t pub_unmatched_event = rcl_get_zero_initialized_event(); - ret = rcl_publisher_event_init(&pub_unmatched_event, &publisher, RCL_PUBLISHER_UNMATCHED); - ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - ret = rcl_event_fini(&pub_unmatched_event); - EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - }); // Create one subscriber setup_subscriber(default_qos_profile); @@ -1213,17 +1152,6 @@ TEST_F(TestEventFixture, test_pub_previous_matched_unmatched_event) ASSERT_EQ(pub_event_ready, true); } - // init event callback - struct EventUserData matched_data; - matched_data.event_count = std::make_shared(0); - struct EventUserData unmatched_data; - unmatched_data.event_count = std::make_shared(0); - - ret = rcl_event_set_callback(&pub_matched_event, event_callback, &matched_data); - ASSERT_EQ(RMW_RET_OK, ret); - - EXPECT_EQ(*matched_data.event_count, 1); - // Delete subscriber ret = rcl_subscription_fini(&subscription, this->node_ptr); EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; @@ -1237,7 +1165,7 @@ TEST_F(TestEventFixture, test_pub_previous_matched_unmatched_event) context_ptr, nullptr, nullptr, - &pub_unmatched_event, + &pub_matched_event, &msg_ready, &sub_event_ready, &pub_event_ready); @@ -1245,15 +1173,19 @@ TEST_F(TestEventFixture, test_pub_previous_matched_unmatched_event) ASSERT_EQ(pub_event_ready, true); } - ret = rcl_event_set_callback(&pub_unmatched_event, event_callback, &unmatched_data); + // init event callback + struct EventUserData matched_data; + matched_data.event_count = std::make_shared(0); + ret = rcl_event_set_callback(&pub_matched_event, event_callback2, &matched_data); ASSERT_EQ(RMW_RET_OK, ret); - EXPECT_EQ(*unmatched_data.event_count, 1); + // matched event happen twice. One for connection and another for disconnection. + EXPECT_EQ(*matched_data.event_count, 2); } -TEST_F(TestEventFixture, test_sub_previous_matched_unmatched_event) +TEST_F(TestEventFixture, test_sub_previous_matched_event) { - // While registering callback for matched/unmatched event, exist previous matched/unmatched event + // While registering callback for matched event, exist previous matched event // will trigger callback at once. // rmw_connextdds doesn't support rmw_event_set_callback() interface @@ -1280,23 +1212,6 @@ TEST_F(TestEventFixture, test_sub_previous_matched_unmatched_event) ret = rcl_event_fini(&sub_matched_event); EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; }); - rcl_event_t sub_unmatched_event = rcl_get_zero_initialized_event(); - ret = rcl_subscription_event_init( - &sub_unmatched_event, - &subscription, - RCL_SUBSCRIPTION_UNMATCHED); - ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( - { - ret = rcl_event_fini(&sub_unmatched_event); - EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; - }); - - // init event callback - struct EventUserData matched_data; - matched_data.event_count = std::make_shared(0); - struct EventUserData unmatched_data; - unmatched_data.event_count = std::make_shared(0); // Create one publisher setup_publisher(default_qos_profile); @@ -1318,11 +1233,6 @@ TEST_F(TestEventFixture, test_sub_previous_matched_unmatched_event) ASSERT_EQ(sub_event_ready, true); } - ret = rcl_event_set_callback(&sub_matched_event, event_callback, &matched_data); - ASSERT_EQ(RMW_RET_OK, ret); - - EXPECT_EQ(*matched_data.event_count, 1); - // Delete publisher ret = rcl_publisher_fini(&publisher, this->node_ptr); EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; @@ -1335,7 +1245,7 @@ TEST_F(TestEventFixture, test_sub_previous_matched_unmatched_event) ret = wait_for_msgs_and_events( context_ptr, nullptr, - &sub_unmatched_event, + &sub_matched_event, nullptr, &msg_ready, &sub_event_ready, @@ -1344,8 +1254,11 @@ TEST_F(TestEventFixture, test_sub_previous_matched_unmatched_event) ASSERT_EQ(sub_event_ready, true); } - ret = rcl_event_set_callback(&sub_unmatched_event, event_callback, &unmatched_data); + // init event callback + struct EventUserData matched_data; + matched_data.event_count = std::make_shared(0); + ret = rcl_event_set_callback(&sub_matched_event, event_callback2, &matched_data); ASSERT_EQ(RMW_RET_OK, ret); - EXPECT_EQ(*unmatched_data.event_count, 1); + EXPECT_EQ(*matched_data.event_count, 2); } From 588c8c436366262d0bb55b04d1d954bb80d07935 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 17 Mar 2023 16:21:05 +0800 Subject: [PATCH 5/5] Update test code Signed-off-by: Barry Xu --- rcl/test/rcl/test_events.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/rcl/test/rcl/test_events.cpp b/rcl/test/rcl/test_events.cpp index 47f9b9922..34f1a0a11 100644 --- a/rcl/test/rcl/test_events.cpp +++ b/rcl/test/rcl/test_events.cpp @@ -1180,7 +1180,10 @@ TEST_F(TestEventFixture, test_pub_previous_matched_event) ASSERT_EQ(RMW_RET_OK, ret); // matched event happen twice. One for connection and another for disconnection. - EXPECT_EQ(*matched_data.event_count, 2); + // Note that different DDS have different implementation. + // This behavior isn't defined in DDS specification. + // So check if event count >= 1 + EXPECT_GE(*matched_data.event_count, 1); } TEST_F(TestEventFixture, test_sub_previous_matched_event) @@ -1260,5 +1263,9 @@ TEST_F(TestEventFixture, test_sub_previous_matched_event) ret = rcl_event_set_callback(&sub_matched_event, event_callback2, &matched_data); ASSERT_EQ(RMW_RET_OK, ret); - EXPECT_EQ(*matched_data.event_count, 2); + // matched event happen twice. One for connection and another for disconnection. + // Note that different DDS have different implementation. + // This behavior isn't defined in DDS specification. + // So check if event count >= 1. + EXPECT_GE(*matched_data.event_count, 1); }