diff --git a/rclpy/rclpy/qos_event.py b/rclpy/rclpy/qos_event.py index 4969816da..f18441285 100644 --- a/rclpy/rclpy/qos_event.py +++ b/rclpy/rclpy/qos_event.py @@ -42,12 +42,24 @@ # Payload type for Subscription Incompatible QoS callback. QoSRequestedIncompatibleQoSInfo = _rclpy.rmw_requested_qos_incompatible_event_status_t +# Payload type for Subscription matched callback. +QoSSubscriptionMatchedInfo = _rclpy.rmw_matched_status_t + +# Payload type for Subscription unmatched callback. +QoSSubscriptionUnmatchedInfo = _rclpy.rmw_unmatched_status_t + # Payload type for Publisher Deadline callback. QoSOfferedDeadlineMissedInfo = _rclpy.rmw_offered_deadline_missed_status_t # Payload type for Publisher Liveliness callback. QoSLivelinessLostInfo = _rclpy.rmw_liveliness_lost_status_t +# Payload type for Publisher matched callback. +QoSPublisherMatchedInfo = _rclpy.rmw_matched_status_t + +# Payload type for Publisher unmatched callback. +QoSPublisherUnmatchedInfo = _rclpy.rmw_unmatched_status_t + """ Payload type for Publisher Incompatible QoS callback. @@ -136,6 +148,8 @@ def __init__( incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None, liveliness: Optional[Callable[[QoSLivelinessChangedInfo], None]] = None, message_lost: Optional[Callable[[QoSMessageLostInfo], None]] = None, + matched: Optional[Callable[[QoSSubscriptionMatchedInfo], None]] = None, + unmatched: Optional[Callable[[QoSSubscriptionUnmatchedInfo], None]] = None, use_default_callbacks: bool = True, ) -> None: """ @@ -148,6 +162,8 @@ def __init__( :param liveliness: A user-defined callback that is called when the Liveliness of a Publisher on subscribed topic changes. :param message_lost: A user-defined callback that is called when a messages is lost. + :param matched: A user-defined callback that is called when a Publisher is matched. + :param unmatched: A user-defined callback that is called when a Publisher is unmatched. :param use_default_callbacks: Whether or not to use default callbacks when the user doesn't supply one """ @@ -155,6 +171,8 @@ def __init__( self.incompatible_qos = incompatible_qos self.liveliness = liveliness self.message_lost = message_lost + self.matched = matched + self.unmatched = unmatched self.use_default_callbacks = use_default_callbacks def create_event_handlers( @@ -211,6 +229,19 @@ def _default_incompatible_qos_callback(event): event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MESSAGE_LOST, parent_impl=subscription)) + if self.matched: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.matched, + event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED, + parent_impl=subscription)) + + if self.unmatched: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.unmatched, + event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_UNMATCHED, + parent_impl=subscription)) return event_handlers @@ -223,6 +254,8 @@ def __init__( deadline: Optional[Callable[[QoSOfferedDeadlineMissedInfo], None]] = None, liveliness: Optional[Callable[[QoSLivelinessLostInfo], None]] = None, incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None, + matched: Optional[Callable[[QoSPublisherMatchedInfo], None]] = None, + unmatched: Optional[Callable[[QoSPublisherUnmatchedInfo], None]] = None, use_default_callbacks: bool = True, ) -> None: """ @@ -234,12 +267,16 @@ def __init__( fails to signal its Liveliness and is reported as not-alive. :param incompatible_qos: A user-defined callback that is called when a Subscription with incompatible QoS policies is discovered on subscribed topic. + :param matched: A user-defined callback that is called when a Subscription is matched. + :param unmatched: A user-defined callback that is called when a Subscription is unmatched. :param use_default_callbacks: Whether or not to use default callbacks when the user doesn't supply one """ self.deadline = deadline self.liveliness = liveliness self.incompatible_qos = incompatible_qos + self.matched = matched + self.unmatched = unmatched self.use_default_callbacks = use_default_callbacks def create_event_handlers( @@ -288,4 +325,18 @@ def _default_incompatible_qos_callback(event): except UnsupportedEventTypeError: pass + if self.matched: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.matched, + event_type=QoSPublisherEventType.RCL_PUBLISHER_MATCHED, + parent_impl=publisher)) + + if self.unmatched: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.unmatched, + event_type=QoSPublisherEventType.RCL_PUBLISHER_UNMATCHED, + parent_impl=publisher)) + return event_handlers diff --git a/rclpy/src/rclpy/qos_event.cpp b/rclpy/src/rclpy/qos_event.cpp index a5a11565c..baa364f28 100644 --- a/rclpy/src/rclpy/qos_event.cpp +++ b/rclpy/src/rclpy/qos_event.cpp @@ -108,12 +108,30 @@ typedef union qos_event_callback_data { rmw_liveliness_changed_status_t liveliness_changed; rmw_message_lost_status_t message_lost; rmw_requested_qos_incompatible_event_status_t requested_incompatible_qos; + rmw_matched_status_t subscription_matched; + rmw_unmatched_status_t subscription_unmatched; // Publisher events rmw_offered_deadline_missed_status_t offered_deadline_missed; rmw_liveliness_lost_status_t liveliness_lost; rmw_offered_qos_incompatible_event_status_t offered_incompatible_qos; + rmw_matched_status_t publisher_matched; + rmw_unmatched_status_t publisher_unmatched; } qos_event_callback_data_t; +struct rmw_matched_status_python_t : rmw_matched_unmatched_status_s +{ + rmw_matched_status_python_t() = default; + explicit rmw_matched_status_python_t(const rmw_matched_unmatched_status_s & status) + : rmw_matched_unmatched_status_s(status) {} +}; + +struct rmw_unmatched_status_python_t : rmw_matched_unmatched_status_s +{ + rmw_unmatched_status_python_t() = default; + explicit rmw_unmatched_status_python_t(const rmw_matched_unmatched_status_s & status) + : rmw_matched_unmatched_status_s(status) {} +}; + py::object QoSEvent::take_event() { @@ -140,6 +158,10 @@ QoSEvent::take_event() return py::cast(data.message_lost); case RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS: return py::cast(data.requested_incompatible_qos); + case RCL_SUBSCRIPTION_MATCHED: + return py::cast(rmw_matched_status_python_t(data.subscription_matched)); + case RCL_SUBSCRIPTION_UNMATCHED: + return py::cast(rmw_unmatched_status_python_t(data.subscription_unmatched)); default: // suggests a misalignment between C and Python interfaces throw py::value_error("event type for subscriptions not understood"); @@ -152,6 +174,10 @@ QoSEvent::take_event() return py::cast(data.liveliness_lost); case RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS: return py::cast(data.offered_incompatible_qos); + case RCL_PUBLISHER_MATCHED: + return py::cast(rmw_matched_status_python_t(data.publisher_matched)); + case RCL_PUBLISHER_UNMATCHED: + return py::cast(rmw_unmatched_status_python_t(data.publisher_unmatched)); default: // suggests a misalignment between C and Python interfaces throw py::value_error("event type for publishers not understood"); @@ -179,12 +205,16 @@ define_qos_event(py::module module) .value("RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED", RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED) .value("RCL_SUBSCRIPTION_LIVELINESS_CHANGED", RCL_SUBSCRIPTION_LIVELINESS_CHANGED) .value("RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS", RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS) - .value("RCL_SUBSCRIPTION_MESSAGE_LOST", RCL_SUBSCRIPTION_MESSAGE_LOST); + .value("RCL_SUBSCRIPTION_MESSAGE_LOST", RCL_SUBSCRIPTION_MESSAGE_LOST) + .value("RCL_SUBSCRIPTION_MATCHED", RCL_SUBSCRIPTION_MATCHED) + .value("RCL_SUBSCRIPTION_UNMATCHED", RCL_SUBSCRIPTION_UNMATCHED); py::enum_(module, "rcl_publisher_event_type_t") .value("RCL_PUBLISHER_OFFERED_DEADLINE_MISSED", RCL_PUBLISHER_OFFERED_DEADLINE_MISSED) .value("RCL_PUBLISHER_LIVELINESS_LOST", RCL_PUBLISHER_LIVELINESS_LOST) - .value("RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS", RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); + .value("RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS", RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS) + .value("RCL_PUBLISHER_MATCHED", RCL_PUBLISHER_MATCHED) + .value("RCL_PUBLISHER_UNMATCHED", RCL_PUBLISHER_UNMATCHED); py::class_( module, "rmw_requested_deadline_missed_status_t") @@ -223,6 +253,19 @@ define_qos_event(py::module module) .def_readonly("total_count", &rmw_liveliness_lost_status_t::total_count) .def_readonly("total_count_change", &rmw_liveliness_lost_status_t::total_count_change); + py::class_( + module, "rmw_matched_unmatched_status_s", py::module_local()) + .def_readonly("count_matched_change", &rmw_matched_status_t::current_matched_count) + .def_readonly("current_count_change", &rmw_matched_status_t::current_count_change); + + py::class_( + module, "rmw_matched_status_t") + .def(py::init<>()); + + py::class_( + module, "rmw_unmatched_status_t") + .def(py::init<>()); + py::enum_(module, "rmw_qos_policy_kind_t") .value("RMW_QOS_POLICY_INVALID", RMW_QOS_POLICY_INVALID) .value("RMW_QOS_POLICY_DURABILITY", RMW_QOS_POLICY_DURABILITY) diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py index 4acfc1b90..1ad43c6e0 100644 --- a/rclpy/test/test_qos_event.py +++ b/rclpy/test/test_qos_event.py @@ -27,9 +27,13 @@ from rclpy.qos_event import QoSOfferedDeadlineMissedInfo from rclpy.qos_event import QoSOfferedIncompatibleQoSInfo from rclpy.qos_event import QoSPublisherEventType +from rclpy.qos_event import QoSPublisherMatchedInfo +from rclpy.qos_event import QoSPublisherUnmatchedInfo from rclpy.qos_event import QoSRequestedDeadlineMissedInfo from rclpy.qos_event import QoSRequestedIncompatibleQoSInfo from rclpy.qos_event import QoSSubscriptionEventType +from rclpy.qos_event import QoSSubscriptionMatchedInfo +from rclpy.qos_event import QoSSubscriptionUnmatchedInfo from rclpy.qos_event import SubscriptionEventCallbacks from rclpy.task import Future @@ -208,6 +212,10 @@ def test_publisher_event_create_destroy(self): publisher, QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST) self._do_create_destroy( publisher, QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS) + self._do_create_destroy( + publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED) + self._do_create_destroy( + publisher, QoSPublisherEventType.RCL_PUBLISHER_UNMATCHED) self.node.destroy_publisher(publisher) def test_subscription_event_create_destroy(self): @@ -220,6 +228,10 @@ def test_subscription_event_create_destroy(self): subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED) self._do_create_destroy( subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS) + self._do_create_destroy( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED) + self._do_create_destroy( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_UNMATCHED) self.node.destroy_subscription(subscription) def test_call_publisher_rclpy_event_apis(self): @@ -354,3 +366,139 @@ def test_call_subscription_rclpy_event_apis(self): pass self.node.destroy_subscription(subscription) + + def test_call_publisher_rclpy_event_matched_unmatched(self): + publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10) + with self.context.handle: + wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle) + + matched_event_handle = self._create_event_handle( + publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED) + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + unmatched_event_handle = self._create_event_handle( + publisher, QoSPublisherEventType.RCL_PUBLISHER_UNMATCHED) + with unmatched_event_handle: + unmatched_event_index = wait_set.add_event(unmatched_event_handle) + self.assertIsNotNone(unmatched_event_index) + + wait_set.wait(0) + self.assertFalse(wait_set.is_ready('event', matched_event_index)) + self.assertFalse(wait_set.is_ready('event', unmatched_event_index)) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + with unmatched_event_handle: + unmatched_event_index = wait_set.add_event(unmatched_event_handle) + self.assertIsNotNone(unmatched_event_index) + + subscription = self.node.create_subscription(EmptyMsg, self.topic_name, Mock(), 10) + # wait 500ms + wait_set.wait(500000000) + self.assertTrue(wait_set.is_ready('event', matched_event_index)) + self.assertFalse(wait_set.is_ready('event', unmatched_event_index)) + + matched_status = matched_event_handle.take_event() + self.assertIsInstance(matched_status, QoSPublisherMatchedInfo) + self.assertEqual(matched_status.count_matched_change, 1) + self.assertEqual(matched_status.current_count_change, 1) + + unmatched_status = unmatched_event_handle.take_event() + self.assertIsInstance(unmatched_status, QoSPublisherUnmatchedInfo) + self.assertEqual(unmatched_status.count_matched_change, 1) + self.assertEqual(unmatched_status.current_count_change, 0) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + with unmatched_event_handle: + unmatched_event_index = wait_set.add_event(unmatched_event_handle) + self.assertIsNotNone(unmatched_event_index) + + subscription.destroy() + # wait 500ms + wait_set.wait(500000000) + self.assertFalse(wait_set.is_ready('event', matched_event_index)) + self.assertTrue(wait_set.is_ready('event', unmatched_event_index)) + + unmatched_status = unmatched_event_handle.take_event() + self.assertEqual(unmatched_status.count_matched_change, 0) + self.assertEqual(unmatched_status.current_count_change, 1) + + matched_status = matched_event_handle.take_event() + self.assertEqual(matched_status.count_matched_change, 0) + self.assertEqual(matched_status.current_count_change, 0) + + def test_call_subscription_rclpy_event_matched_unmatched(self): + message_callback = Mock() + subscription = self.node.create_subscription( + EmptyMsg, self.topic_name, message_callback, 10) + with self.context.handle: + wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle) + + matched_event_handle = self._create_event_handle( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED) + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + unmatched_event_handle = self._create_event_handle( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_UNMATCHED) + with unmatched_event_handle: + unmatched_event_index = wait_set.add_event(unmatched_event_handle) + self.assertIsNotNone(unmatched_event_index) + + wait_set.wait(0) + self.assertFalse(wait_set.is_ready('event', matched_event_index)) + self.assertFalse(wait_set.is_ready('event', unmatched_event_index)) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + with unmatched_event_handle: + unmatched_event_index = wait_set.add_event(unmatched_event_handle) + self.assertIsNotNone(unmatched_event_index) + + publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10) + # wait 500ms + wait_set.wait(500000000) + self.assertTrue(wait_set.is_ready('event', matched_event_index)) + self.assertFalse(wait_set.is_ready('event', unmatched_event_index)) + + matched_status = matched_event_handle.take_event() + self.assertIsInstance(matched_status, QoSSubscriptionMatchedInfo) + self.assertEqual(matched_status.count_matched_change, 1) + self.assertEqual(matched_status.current_count_change, 1) + + unmatched_status = unmatched_event_handle.take_event() + self.assertIsInstance(unmatched_status, QoSSubscriptionUnmatchedInfo) + self.assertEqual(unmatched_status.count_matched_change, 1) + self.assertEqual(unmatched_status.current_count_change, 0) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + with unmatched_event_handle: + unmatched_event_index = wait_set.add_event(unmatched_event_handle) + self.assertIsNotNone(unmatched_event_index) + + publisher.destroy() + # wait 500ms + wait_set.wait(500000000) + self.assertFalse(wait_set.is_ready('event', matched_event_index)) + self.assertTrue(wait_set.is_ready('event', unmatched_event_index)) + + unmatched_status = unmatched_event_handle.take_event() + self.assertEqual(unmatched_status.count_matched_change, 0) + self.assertEqual(unmatched_status.current_count_change, 1) + + matched_status = matched_event_handle.take_event() + self.assertEqual(matched_status.count_matched_change, 0) + self.assertEqual(matched_status.current_count_change, 0)