Skip to content

Commit

Permalink
Add matched & unmatched event support
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Feb 15, 2023
1 parent 29da63f commit 05c0178
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 2 deletions.
51 changes: 51 additions & 0 deletions rclpy/rclpy/qos_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,24 @@
# Payload type for Subscription Incompatible QoS callback.
QoSRequestedIncompatibleQoSInfo = _rclpy.rmw_requested_qos_incompatible_event_status_t

# Payload type for Subscription matched callback.
QoSSubscriptionMatchedInfo = _rclpy.rmw_matched_status_t

# Payload type for Subscription unmatched callback.
QoSSubscriptionUnmatchedInfo = _rclpy.rmw_unmatched_status_t

# Payload type for Publisher Deadline callback.
QoSOfferedDeadlineMissedInfo = _rclpy.rmw_offered_deadline_missed_status_t

# Payload type for Publisher Liveliness callback.
QoSLivelinessLostInfo = _rclpy.rmw_liveliness_lost_status_t

# Payload type for Publisher matched callback.
QoSPublisherMatchedInfo = _rclpy.rmw_matched_status_t

# Payload type for Publisher unmatched callback.
QoSPublisherUnmatchedInfo = _rclpy.rmw_unmatched_status_t

"""
Payload type for Publisher Incompatible QoS callback.
Expand Down Expand Up @@ -136,6 +148,8 @@ def __init__(
incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None,
liveliness: Optional[Callable[[QoSLivelinessChangedInfo], None]] = None,
message_lost: Optional[Callable[[QoSMessageLostInfo], None]] = None,
matched: Optional[Callable[[QoSSubscriptionMatchedInfo], None]] = None,
unmatched: Optional[Callable[[QoSSubscriptionUnmatchedInfo], None]] = None,
use_default_callbacks: bool = True,
) -> None:
"""
Expand All @@ -148,13 +162,17 @@ def __init__(
:param liveliness: A user-defined callback that is called when the Liveliness of
a Publisher on subscribed topic changes.
:param message_lost: A user-defined callback that is called when a messages is lost.
:param matched: A user-defined callback that is called when a Publisher is matched.
:param unmatched: A user-defined callback that is called when a Publisher is unmatched.
:param use_default_callbacks: Whether or not to use default callbacks when the user
doesn't supply one
"""
self.deadline = deadline
self.incompatible_qos = incompatible_qos
self.liveliness = liveliness
self.message_lost = message_lost
self.matched = matched
self.unmatched = unmatched
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
Expand Down Expand Up @@ -211,6 +229,19 @@ def _default_incompatible_qos_callback(event):
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MESSAGE_LOST,
parent_impl=subscription))

if self.matched:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.matched,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED,
parent_impl=subscription))

if self.unmatched:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.unmatched,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_UNMATCHED,
parent_impl=subscription))
return event_handlers


Expand All @@ -223,6 +254,8 @@ def __init__(
deadline: Optional[Callable[[QoSOfferedDeadlineMissedInfo], None]] = None,
liveliness: Optional[Callable[[QoSLivelinessLostInfo], None]] = None,
incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None,
matched: Optional[Callable[[QoSPublisherMatchedInfo], None]] = None,
unmatched: Optional[Callable[[QoSPublisherUnmatchedInfo], None]] = None,
use_default_callbacks: bool = True,
) -> None:
"""
Expand All @@ -234,12 +267,16 @@ def __init__(
fails to signal its Liveliness and is reported as not-alive.
:param incompatible_qos: A user-defined callback that is called when a Subscription
with incompatible QoS policies is discovered on subscribed topic.
:param matched: A user-defined callback that is called when a Subscription is matched.
:param unmatched: A user-defined callback that is called when a Subscription is unmatched.
:param use_default_callbacks: Whether or not to use default callbacks when the user
doesn't supply one
"""
self.deadline = deadline
self.liveliness = liveliness
self.incompatible_qos = incompatible_qos
self.matched = matched
self.unmatched = unmatched
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
Expand Down Expand Up @@ -288,4 +325,18 @@ def _default_incompatible_qos_callback(event):
except UnsupportedEventTypeError:
pass

if self.matched:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.matched,
event_type=QoSPublisherEventType.RCL_PUBLISHER_MATCHED,
parent_impl=publisher))

if self.unmatched:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.unmatched,
event_type=QoSPublisherEventType.RCL_PUBLISHER_UNMATCHED,
parent_impl=publisher))

return event_handlers
47 changes: 45 additions & 2 deletions rclpy/src/rclpy/qos_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,30 @@ typedef union qos_event_callback_data {
rmw_liveliness_changed_status_t liveliness_changed;
rmw_message_lost_status_t message_lost;
rmw_requested_qos_incompatible_event_status_t requested_incompatible_qos;
rmw_matched_status_t subscription_matched;
rmw_unmatched_status_t subscription_unmatched;
// Publisher events
rmw_offered_deadline_missed_status_t offered_deadline_missed;
rmw_liveliness_lost_status_t liveliness_lost;
rmw_offered_qos_incompatible_event_status_t offered_incompatible_qos;
rmw_matched_status_t publisher_matched;
rmw_unmatched_status_t publisher_unmatched;
} qos_event_callback_data_t;

struct rmw_matched_status_python_t : rmw_matched_unmatched_status_s
{
rmw_matched_status_python_t() = default;
explicit rmw_matched_status_python_t(const rmw_matched_unmatched_status_s & status)
: rmw_matched_unmatched_status_s(status) {}
};

struct rmw_unmatched_status_python_t : rmw_matched_unmatched_status_s
{
rmw_unmatched_status_python_t() = default;
explicit rmw_unmatched_status_python_t(const rmw_matched_unmatched_status_s & status)
: rmw_matched_unmatched_status_s(status) {}
};

py::object
QoSEvent::take_event()
{
Expand All @@ -140,6 +158,10 @@ QoSEvent::take_event()
return py::cast(data.message_lost);
case RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS:
return py::cast(data.requested_incompatible_qos);
case RCL_SUBSCRIPTION_MATCHED:
return py::cast(rmw_matched_status_python_t(data.subscription_matched));
case RCL_SUBSCRIPTION_UNMATCHED:
return py::cast(rmw_unmatched_status_python_t(data.subscription_unmatched));
default:
// suggests a misalignment between C and Python interfaces
throw py::value_error("event type for subscriptions not understood");
Expand All @@ -152,6 +174,10 @@ QoSEvent::take_event()
return py::cast(data.liveliness_lost);
case RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS:
return py::cast(data.offered_incompatible_qos);
case RCL_PUBLISHER_MATCHED:
return py::cast(rmw_matched_status_python_t(data.publisher_matched));
case RCL_PUBLISHER_UNMATCHED:
return py::cast(rmw_unmatched_status_python_t(data.publisher_unmatched));
default:
// suggests a misalignment between C and Python interfaces
throw py::value_error("event type for publishers not understood");
Expand Down Expand Up @@ -179,12 +205,16 @@ define_qos_event(py::module module)
.value("RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED", RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED)
.value("RCL_SUBSCRIPTION_LIVELINESS_CHANGED", RCL_SUBSCRIPTION_LIVELINESS_CHANGED)
.value("RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS", RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS)
.value("RCL_SUBSCRIPTION_MESSAGE_LOST", RCL_SUBSCRIPTION_MESSAGE_LOST);
.value("RCL_SUBSCRIPTION_MESSAGE_LOST", RCL_SUBSCRIPTION_MESSAGE_LOST)
.value("RCL_SUBSCRIPTION_MATCHED", RCL_SUBSCRIPTION_MATCHED)
.value("RCL_SUBSCRIPTION_UNMATCHED", RCL_SUBSCRIPTION_UNMATCHED);

py::enum_<rcl_publisher_event_type_t>(module, "rcl_publisher_event_type_t")
.value("RCL_PUBLISHER_OFFERED_DEADLINE_MISSED", RCL_PUBLISHER_OFFERED_DEADLINE_MISSED)
.value("RCL_PUBLISHER_LIVELINESS_LOST", RCL_PUBLISHER_LIVELINESS_LOST)
.value("RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS", RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
.value("RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS", RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS)
.value("RCL_PUBLISHER_MATCHED", RCL_PUBLISHER_MATCHED)
.value("RCL_PUBLISHER_UNMATCHED", RCL_PUBLISHER_UNMATCHED);

py::class_<rmw_requested_deadline_missed_status_t>(
module, "rmw_requested_deadline_missed_status_t")
Expand Down Expand Up @@ -223,6 +253,19 @@ define_qos_event(py::module module)
.def_readonly("total_count", &rmw_liveliness_lost_status_t::total_count)
.def_readonly("total_count_change", &rmw_liveliness_lost_status_t::total_count_change);

py::class_<rmw_matched_unmatched_status_s>(
module, "rmw_matched_unmatched_status_s", py::module_local())
.def_readonly("count_matched_change", &rmw_matched_status_t::current_matched_count)
.def_readonly("current_count_change", &rmw_matched_status_t::current_count_change);

py::class_<rmw_matched_status_python_t, rmw_matched_unmatched_status_s>(
module, "rmw_matched_status_t")
.def(py::init<>());

py::class_<rmw_unmatched_status_python_t, rmw_matched_unmatched_status_s>(
module, "rmw_unmatched_status_t")
.def(py::init<>());

py::enum_<rmw_qos_policy_kind_t>(module, "rmw_qos_policy_kind_t")
.value("RMW_QOS_POLICY_INVALID", RMW_QOS_POLICY_INVALID)
.value("RMW_QOS_POLICY_DURABILITY", RMW_QOS_POLICY_DURABILITY)
Expand Down
148 changes: 148 additions & 0 deletions rclpy/test/test_qos_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
from rclpy.qos_event import QoSOfferedDeadlineMissedInfo
from rclpy.qos_event import QoSOfferedIncompatibleQoSInfo
from rclpy.qos_event import QoSPublisherEventType
from rclpy.qos_event import QoSPublisherMatchedInfo
from rclpy.qos_event import QoSPublisherUnmatchedInfo
from rclpy.qos_event import QoSRequestedDeadlineMissedInfo
from rclpy.qos_event import QoSRequestedIncompatibleQoSInfo
from rclpy.qos_event import QoSSubscriptionEventType
from rclpy.qos_event import QoSSubscriptionMatchedInfo
from rclpy.qos_event import QoSSubscriptionUnmatchedInfo
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.task import Future

Expand Down Expand Up @@ -208,6 +212,10 @@ def test_publisher_event_create_destroy(self):
publisher, QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST)
self._do_create_destroy(
publisher, QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS)
self._do_create_destroy(
publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED)
self._do_create_destroy(
publisher, QoSPublisherEventType.RCL_PUBLISHER_UNMATCHED)
self.node.destroy_publisher(publisher)

def test_subscription_event_create_destroy(self):
Expand All @@ -220,6 +228,10 @@ def test_subscription_event_create_destroy(self):
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED)
self._do_create_destroy(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS)
self._do_create_destroy(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED)
self._do_create_destroy(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_UNMATCHED)
self.node.destroy_subscription(subscription)

def test_call_publisher_rclpy_event_apis(self):
Expand Down Expand Up @@ -354,3 +366,139 @@ def test_call_subscription_rclpy_event_apis(self):
pass

self.node.destroy_subscription(subscription)

def test_call_publisher_rclpy_event_matched_unmatched(self):
publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10)
with self.context.handle:
wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle)

matched_event_handle = self._create_event_handle(
publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED)
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

unmatched_event_handle = self._create_event_handle(
publisher, QoSPublisherEventType.RCL_PUBLISHER_UNMATCHED)
with unmatched_event_handle:
unmatched_event_index = wait_set.add_event(unmatched_event_handle)
self.assertIsNotNone(unmatched_event_index)

wait_set.wait(0)
self.assertFalse(wait_set.is_ready('event', matched_event_index))
self.assertFalse(wait_set.is_ready('event', unmatched_event_index))

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)
with unmatched_event_handle:
unmatched_event_index = wait_set.add_event(unmatched_event_handle)
self.assertIsNotNone(unmatched_event_index)

subscription = self.node.create_subscription(EmptyMsg, self.topic_name, Mock(), 10)
# wait 500ms
wait_set.wait(500000000)
self.assertTrue(wait_set.is_ready('event', matched_event_index))
self.assertFalse(wait_set.is_ready('event', unmatched_event_index))

matched_status = matched_event_handle.take_event()
self.assertIsInstance(matched_status, QoSPublisherMatchedInfo)
self.assertEqual(matched_status.count_matched_change, 1)
self.assertEqual(matched_status.current_count_change, 1)

unmatched_status = unmatched_event_handle.take_event()
self.assertIsInstance(unmatched_status, QoSPublisherUnmatchedInfo)
self.assertEqual(unmatched_status.count_matched_change, 1)
self.assertEqual(unmatched_status.current_count_change, 0)

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)
with unmatched_event_handle:
unmatched_event_index = wait_set.add_event(unmatched_event_handle)
self.assertIsNotNone(unmatched_event_index)

subscription.destroy()
# wait 500ms
wait_set.wait(500000000)
self.assertFalse(wait_set.is_ready('event', matched_event_index))
self.assertTrue(wait_set.is_ready('event', unmatched_event_index))

unmatched_status = unmatched_event_handle.take_event()
self.assertEqual(unmatched_status.count_matched_change, 0)
self.assertEqual(unmatched_status.current_count_change, 1)

matched_status = matched_event_handle.take_event()
self.assertEqual(matched_status.count_matched_change, 0)
self.assertEqual(matched_status.current_count_change, 0)

def test_call_subscription_rclpy_event_matched_unmatched(self):
message_callback = Mock()
subscription = self.node.create_subscription(
EmptyMsg, self.topic_name, message_callback, 10)
with self.context.handle:
wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle)

matched_event_handle = self._create_event_handle(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED)
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

unmatched_event_handle = self._create_event_handle(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_UNMATCHED)
with unmatched_event_handle:
unmatched_event_index = wait_set.add_event(unmatched_event_handle)
self.assertIsNotNone(unmatched_event_index)

wait_set.wait(0)
self.assertFalse(wait_set.is_ready('event', matched_event_index))
self.assertFalse(wait_set.is_ready('event', unmatched_event_index))

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)
with unmatched_event_handle:
unmatched_event_index = wait_set.add_event(unmatched_event_handle)
self.assertIsNotNone(unmatched_event_index)

publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10)
# wait 500ms
wait_set.wait(500000000)
self.assertTrue(wait_set.is_ready('event', matched_event_index))
self.assertFalse(wait_set.is_ready('event', unmatched_event_index))

matched_status = matched_event_handle.take_event()
self.assertIsInstance(matched_status, QoSSubscriptionMatchedInfo)
self.assertEqual(matched_status.count_matched_change, 1)
self.assertEqual(matched_status.current_count_change, 1)

unmatched_status = unmatched_event_handle.take_event()
self.assertIsInstance(unmatched_status, QoSSubscriptionUnmatchedInfo)
self.assertEqual(unmatched_status.count_matched_change, 1)
self.assertEqual(unmatched_status.current_count_change, 0)

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)
with unmatched_event_handle:
unmatched_event_index = wait_set.add_event(unmatched_event_handle)
self.assertIsNotNone(unmatched_event_index)

publisher.destroy()
# wait 500ms
wait_set.wait(500000000)
self.assertFalse(wait_set.is_ready('event', matched_event_index))
self.assertTrue(wait_set.is_ready('event', unmatched_event_index))

unmatched_status = unmatched_event_handle.take_event()
self.assertEqual(unmatched_status.count_matched_change, 0)
self.assertEqual(unmatched_status.current_count_change, 1)

matched_status = matched_event_handle.take_event()
self.assertEqual(matched_status.count_matched_change, 0)
self.assertEqual(matched_status.current_count_change, 0)

0 comments on commit 05c0178

Please sign in to comment.