Skip to content

Commit

Permalink
Implement functions to get publisher and subcription informations lik…
Browse files Browse the repository at this point in the history
…e QoS policies from topic name (#454)

Signed-off-by: Jaison Titus <jaisontj92@gmail.com>
Signed-off-by: Miaofei <miaofei@amazon.com>
Co-authored-by: Miaofei Mei <ameision@hotmail.com>
  • Loading branch information
jaisontj and mm318 authored Feb 14, 2020
1 parent 6d009fb commit 17a0854
Show file tree
Hide file tree
Showing 10 changed files with 772 additions and 13 deletions.
1 change: 1 addition & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ if(BUILD_TESTING)
test/test_time.py
test/test_timer.py
test/test_topic_or_service_is_hidden.py
test/test_topic_endpoint_info.py
test/test_utilities.py
test/test_validate_full_topic_name.py
test/test_validate_namespace.py
Expand Down
82 changes: 81 additions & 1 deletion rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from rclpy.time_source import TimeSource
from rclpy.timer import Rate
from rclpy.timer import Timer
from rclpy.topic_endpoint_info import TopicEndpointInfo
from rclpy.type_support import check_for_type_support
from rclpy.utilities import get_default_context
from rclpy.validate_full_topic_name import validate_full_topic_name
Expand Down Expand Up @@ -1642,7 +1643,7 @@ def get_node_names_and_namespaces(self) -> List[Tuple[str, str]]:

def _count_publishers_or_subscribers(self, topic_name, func):
fq_topic_name = expand_topic_name(topic_name, self.get_name(), self.get_namespace())
validate_topic_name(fq_topic_name)
validate_full_topic_name(fq_topic_name)
with self.handle as node_capsule:
return func(node_capsule, fq_topic_name)

Expand Down Expand Up @@ -1681,3 +1682,82 @@ def assert_liveliness(self) -> None:
"""
with self.handle as capsule:
_rclpy.rclpy_assert_liveliness(capsule)

def _get_info_by_topic(
self,
topic_name: str,
no_mangle: bool,
func: Callable[[object, str, bool], List[Dict]]
) -> List[TopicEndpointInfo]:
with self.handle as node_capsule:
if no_mangle:
fq_topic_name = topic_name
else:
fq_topic_name = expand_topic_name(
topic_name, self.get_name(), self.get_namespace())
validate_full_topic_name(fq_topic_name)
fq_topic_name = _rclpy.rclpy_remap_topic_name(node_capsule, fq_topic_name)

info_dicts = func(node_capsule, fq_topic_name, no_mangle)
infos = [TopicEndpointInfo(**x) for x in info_dicts]
return infos

def get_publishers_info_by_topic(
self,
topic_name: str,
no_mangle: bool = False
) -> List[TopicEndpointInfo]:
"""
Return a list of publishers on a given topic.
The returned parameter is a list of TopicEndpointInfo objects, where each will contain
the node name, node namespace, topic type, topic endpoint's GID, and its QoS profile.
When the `no_mangle` parameter is `true`, the provided `topic_name` should be a valid topic
name for the middleware (useful when combining ROS with native middleware (e.g. DDS) apps).
When the `no_mangle` parameter is `false`, the provided `topic_name` should follow
ROS topic name conventions.
`topic_name` may be a relative, private, or fully qualified topic name.
A relative or private topic will be expanded using this node's namespace and name.
The queried `topic_name` is not remapped.
:param topic_name: the topic_name on which to find the publishers.
:param no_mangle: no_mangle if `true`, `topic_name` needs to be a valid middleware topic
name, otherwise it should be a valid ROS topic name. Defaults to `false`.
:return: a list of TopicEndpointInfo for all the publishers on this topic.
"""
return self._get_info_by_topic(
topic_name,
no_mangle,
_rclpy.rclpy_get_publishers_info_by_topic)

def get_subscriptions_info_by_topic(
self,
topic_name: str,
no_mangle: bool = False
) -> List[TopicEndpointInfo]:
"""
Return a list of subscriptions on a given topic.
The returned parameter is a list of TopicEndpointInfo objects, where each will contain
the node name, node namespace, topic type, topic endpoint's GID, and its QoS profile.
When the `no_mangle` parameter is `true`, the provided `topic_name` should be a valid topic
name for the middleware (useful when combining ROS with native middleware (e.g. DDS) apps).
When the `no_mangle` parameter is `false`, the provided `topic_name` should follow
ROS topic name conventions.
`topic_name` may be a relative, private, or fully qualified topic name.
A relative or private topic will be expanded using this node's namespace and name.
The queried `topic_name` is not remapped.
:param topic_name: the topic_name on which to find the subscriptions.
:param no_mangle: no_mangle if `true`, `topic_name` needs to be a valid middleware topic
name, otherwise it should be a valid ROS topic name. Defaults to `false`.
:return: a list of TopicEndpointInfo for all the subscriptions on this topic.
"""
return self._get_info_by_topic(
topic_name,
no_mangle,
_rclpy.rclpy_get_subscriptions_info_by_topic)
15 changes: 13 additions & 2 deletions rclpy/rclpy/qos.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class HistoryPolicy(QoSPolicyEnum):
KEEP_LAST = RMW_QOS_POLICY_HISTORY_KEEP_LAST
RMW_QOS_POLICY_HISTORY_KEEP_ALL = 2
KEEP_ALL = RMW_QOS_POLICY_HISTORY_KEEP_ALL
RMW_QOS_POLICY_HISTORY_UNKNOWN = 3
UNKNOWN = RMW_QOS_POLICY_HISTORY_UNKNOWN


# Alias with the old name, for retrocompatibility
Expand All @@ -295,6 +297,8 @@ class ReliabilityPolicy(QoSPolicyEnum):
RELIABLE = RMW_QOS_POLICY_RELIABILITY_RELIABLE
RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT = 2
BEST_EFFORT = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT
RMW_QOS_POLICY_RELIABILITY_UNKNOWN = 3
UNKNOWN = RMW_QOS_POLICY_RELIABILITY_UNKNOWN


# Alias with the old name, for retrocompatibility
Expand All @@ -314,6 +318,8 @@ class DurabilityPolicy(QoSPolicyEnum):
TRANSIENT_LOCAL = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL
RMW_QOS_POLICY_DURABILITY_VOLATILE = 2
VOLATILE = RMW_QOS_POLICY_DURABILITY_VOLATILE
RMW_QOS_POLICY_DURABILITY_UNKNOWN = 3
UNKNOWN = RMW_QOS_POLICY_DURABILITY_UNKNOWN


# Alias with the old name, for retrocompatibility
Expand All @@ -335,11 +341,15 @@ class LivelinessPolicy(QoSPolicyEnum):
MANUAL_BY_NODE = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE
RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC = 3
MANUAL_BY_TOPIC = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC
RMW_QOS_POLICY_LIVELINESS_UNKNOWN = 4
UNKNOWN = RMW_QOS_POLICY_LIVELINESS_UNKNOWN


# Alias with the old name, for retrocompatibility
QoSLivelinessPolicy = LivelinessPolicy

qos_profile_unknown = QoSProfile(**_rclpy.rclpy_get_rmw_qos_profile(
'qos_profile_unknown'))
qos_profile_system_default = QoSProfile(**_rclpy.rclpy_get_rmw_qos_profile(
'qos_profile_system_default'))
qos_profile_sensor_data = QoSProfile(**_rclpy.rclpy_get_rmw_qos_profile(
Expand All @@ -350,11 +360,12 @@ class LivelinessPolicy(QoSPolicyEnum):
'qos_profile_parameters'))
qos_profile_parameter_events = QoSProfile(**_rclpy.rclpy_get_rmw_qos_profile(
'qos_profile_parameter_events'))
qos_profile_action_status_default = QoSProfile(
**_rclpy_action.rclpy_action_get_rmw_qos_profile('rcl_action_qos_profile_status_default'))
qos_profile_action_status_default = QoSProfile(**_rclpy_action.rclpy_action_get_rmw_qos_profile(
'rcl_action_qos_profile_status_default'))


class QoSPresetProfiles(Enum):
UNKNOWN = qos_profile_unknown
SYSTEM_DEFAULT = qos_profile_system_default
SENSOR_DATA = qos_profile_sensor_data
SERVICES_DEFAULT = qos_profile_services_default
Expand Down
174 changes: 174 additions & 0 deletions rclpy/rclpy/topic_endpoint_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import IntEnum

from rclpy.qos import QoSPresetProfiles, QoSProfile


class TopicEndpointTypeEnum(IntEnum):
"""
Enum for possible types of topic endpoints.
This enum matches the one defined in rmw/types.h
"""

INVALID = 0
PUBLISHER = 1
SUBSCRIPTION = 2


class TopicEndpointInfo:
"""Information on a topic endpoint."""

__slots__ = [
'_node_name',
'_node_namespace',
'_topic_type',
'_endpoint_type',
'_endpoint_gid',
'_qos_profile'
]

def __init__(self, **kwargs):
assert all('_' + key in self.__slots__ for key in kwargs.keys()), \
'Invalid arguments passed to constructor: %r' % kwargs.keys()

self.node_name = kwargs.get('node_name', '')
self.node_namespace = kwargs.get('node_namespace', '')
self.topic_type = kwargs.get('topic_type', '')
self.endpoint_type = kwargs.get('endpoint_type', TopicEndpointTypeEnum.INVALID)
self.endpoint_gid = kwargs.get('endpoint_gid', [])
self.qos_profile = kwargs.get('qos_profile', QoSPresetProfiles.UNKNOWN.value)

@property
def node_name(self):
"""
Get field 'node_name'.
:returns: node_name attribute
:rtype: str
"""
return self._node_name

@node_name.setter
def node_name(self, value):
assert isinstance(value, str)
self._node_name = value

@property
def node_namespace(self):
"""
Get field 'node_namespace'.
:returns: node_namespace attribute
:rtype: str
"""
return self._node_namespace

@node_namespace.setter
def node_namespace(self, value):
assert isinstance(value, str)
self._node_namespace = value

@property
def topic_type(self):
"""
Get field 'topic_type'.
:returns: topic_type attribute
:rtype: str
"""
return self._topic_type

@topic_type.setter
def topic_type(self, value):
assert isinstance(value, str)
self._topic_type = value

@property
def endpoint_type(self):
"""
Get field 'endpoint_type'.
:returns: endpoint_type attribute
:rtype: TopicEndpointTypeEnum
"""
return self._endpoint_type

@endpoint_type.setter
def endpoint_type(self, value):
if isinstance(value, TopicEndpointTypeEnum):
self._endpoint_type = value
elif isinstance(value, int):
self._endpoint_type = TopicEndpointTypeEnum(value)
else:
assert False

@property
def endpoint_gid(self):
"""
Get field 'endpoint_gid'.
:returns: endpoint_gid attribute
:rtype: list
"""
return self._endpoint_gid

@endpoint_gid.setter
def endpoint_gid(self, value):
assert all(isinstance(x, int) for x in value)
self._endpoint_gid = value

@property
def qos_profile(self):
"""
Get field 'qos_profile'.
:returns: qos_profile attribute
:rtype: QoSProfile
"""
return self._qos_profile

@qos_profile.setter
def qos_profile(self, value):
if isinstance(value, QoSProfile):
self._qos_profile = value
elif isinstance(value, dict):
self._qos_profile = QoSProfile(**value)
else:
assert False

def __eq__(self, other):
if not isinstance(other, TopicEndpointInfo):
return False
return all(
self.__getattribute__(slot) == other.__getattribute__(slot)
for slot in self.__slots__)

def __str__(self):
result = 'Node name: %s\n' % self.node_name
result += 'Node namespace: %s\n' % self.node_namespace
result += 'Topic type: %s\n' % self.topic_type
result += 'Endpoint type: %s\n' % self.endpoint_type.name
result += 'GID: %s\n' % '.'.join(format(x, '02x') for x in self.endpoint_gid)
result += 'QoS profile:\n'
result += ' Reliability: %s\n' % self.qos_profile.reliability.name
result += ' Durability: %s\n' % self.qos_profile.durability.name
result += ' Lifespan: %d nanoseconds\n' % self.qos_profile.lifespan.nanoseconds
result += ' Deadline: %d nanoseconds\n' % self.qos_profile.deadline.nanoseconds
result += ' Liveliness: %s\n' % self.qos_profile.liveliness.name
result += ' Liveliness lease duration: %d nanoseconds' % \
self.qos_profile.liveliness_lease_duration.nanoseconds
return result
Loading

0 comments on commit 17a0854

Please sign in to comment.