-
Notifications
You must be signed in to change notification settings - Fork 225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement functions to get publisher and subcription informations like QoS policies from topic name #454
Implement functions to get publisher and subcription informations like QoS policies from topic name #454
Changes from 22 commits
d724c74
f3c9d33
585b41a
95bee0b
292e38e
40d054c
ae24ed4
98fe926
704fb5c
f985c41
128ef54
4995c34
e18a15f
214c113
c768a53
f126ec5
5933310
327554c
536620a
6a4486f
1723927
b34b127
1ac4677
96946b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ | |
from rclpy.time_source import TimeSource | ||
from rclpy.timer import Rate | ||
from rclpy.timer import Timer | ||
from rclpy.topic_endpoint_info import TopicEndpointInfo | ||
from rclpy.type_support import check_for_type_support | ||
from rclpy.utilities import get_default_context | ||
from rclpy.validate_full_topic_name import validate_full_topic_name | ||
|
@@ -1642,7 +1643,7 @@ def get_node_names_and_namespaces(self) -> List[Tuple[str, str]]: | |
|
||
def _count_publishers_or_subscribers(self, topic_name, func): | ||
fq_topic_name = expand_topic_name(topic_name, self.get_name(), self.get_namespace()) | ||
validate_topic_name(fq_topic_name) | ||
validate_full_topic_name(fq_topic_name) | ||
with self.handle as node_capsule: | ||
return func(node_capsule, fq_topic_name) | ||
|
||
|
@@ -1681,3 +1682,76 @@ def assert_liveliness(self) -> None: | |
""" | ||
with self.handle as capsule: | ||
_rclpy.rclpy_assert_liveliness(capsule) | ||
|
||
def _get_info_by_topic( | ||
self, | ||
topic_name: str, | ||
no_mangle: bool, | ||
func: Callable[[object, str, bool], List[Dict]] | ||
) -> List[TopicEndpointInfo]: | ||
with self.handle as node_capsule: | ||
if no_mangle: | ||
fq_topic_name = topic_name | ||
else: | ||
fq_topic_name = expand_topic_name( | ||
topic_name, self.get_name(), self.get_namespace()) | ||
validate_full_topic_name(fq_topic_name) | ||
fq_topic_name = _rclpy.rclpy_remap_topic_name(node_capsule, fq_topic_name) | ||
|
||
info_dicts = func(node_capsule, fq_topic_name, no_mangle) | ||
infos = [TopicEndpointInfo(**x) for x in info_dicts] | ||
return infos | ||
|
||
def get_publishers_info_by_topic( | ||
self, topic_name: str, no_mangle: bool = False) -> List[TopicEndpointInfo]: | ||
""" | ||
Return a list of publishers publishing to a given topic. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: a publisher doesn't need to be "publishing" on the topic to be seen here, technically I'd say "list of publishers on a given topic" or "list of publishers that have advertised on a given topic" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
The returned parameter is a list of TopicEndpointInfo objects, where each will contain | ||
the node name, node namespace, topic type, topic endpoint's GID, and its QoS profile. | ||
wjwwood marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
When the `no_mangle` parameter is `true`, the provided `topic_name` should be a valid topic | ||
name for the middleware (useful when combining ROS with native middleware (e.g. DDS) apps). | ||
When the `no_mangle` parameter is `false`, the provided `topic_name` should follow | ||
ROS topic name conventions. | ||
|
||
`topic_name` may be a relative, private, or fully qualified topic name. | ||
A relative or private topic will be expanded using this node's namespace and name. | ||
The queried `topic_name` is not remapped. | ||
|
||
:param topic_name: the topic_name on which to find the publishers. | ||
:param no_mangle: no_mangle if `true`, `topic_name` needs to be a valid middleware topic | ||
name, otherwise it should be a valid ROS topic name. Defaults to `false`. | ||
:return: a list of TopicEndpointInfo for all the publishers on this topic. | ||
""" | ||
return self._get_info_by_topic( | ||
topic_name, | ||
no_mangle, | ||
_rclpy.rclpy_get_publishers_info_by_topic) | ||
|
||
def get_subscriptions_info_by_topic( | ||
self, topic_name: str, no_mangle: bool = False) -> List[TopicEndpointInfo]: | ||
""" | ||
Return a list of subscriptions to a given topic. | ||
|
||
The returned parameter is a list of TopicEndpointInfo objects, where each will contain | ||
the node name, node namespace, topic type, topic endpoint's GID, and its QoS profile. | ||
|
||
When the `no_mangle` parameter is `true`, the provided `topic_name` should be a valid topic | ||
name for the middleware (useful when combining ROS with native middleware (e.g. DDS) apps). | ||
When the `no_mangle` parameter is `false`, the provided `topic_name` should follow | ||
ROS topic name conventions. | ||
|
||
`topic_name` may be a relative, private, or fully qualified topic name. | ||
A relative or private topic will be expanded using this node's namespace and name. | ||
The queried `topic_name` is not remapped. | ||
|
||
:param topic_name: the topic_name on which to find the subscriptions. | ||
:param no_mangle: no_mangle if `true`, `topic_name` needs to be a valid middleware topic | ||
name, otherwise it should be a valid ROS topic name. Defaults to `false`. | ||
:return: a list of TopicEndpointInfo for all the subscriptions on this topic. | ||
""" | ||
return self._get_info_by_topic( | ||
topic_name, | ||
no_mangle, | ||
_rclpy.rclpy_get_subscriptions_info_by_topic) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from enum import IntEnum | ||
|
||
from rclpy.qos import QoSProfile, QoSPresetProfiles | ||
|
||
|
||
class TopicEndpointTypeEnum(IntEnum): | ||
""" | ||
Enum for possible types of topic endpoints. | ||
|
||
This enum matches the one defined in rmw/types.h | ||
""" | ||
|
||
INVALID = 0 | ||
PUBLISHER = 1 | ||
SUBSCRIPTION = 2 | ||
|
||
|
||
class TopicEndpointInfo: | ||
"""Information on a topic endpoint.""" | ||
|
||
__slots__ = [ | ||
'_node_name', | ||
'_node_namespace', | ||
'_topic_type', | ||
'_endpoint_type', | ||
'_endpoint_gid', | ||
'_qos_profile' | ||
] | ||
|
||
def __init__(self, **kwargs): | ||
assert all('_' + key in self.__slots__ for key in kwargs.keys()), \ | ||
'Invalid arguments passed to constructor: %r' % kwargs.keys() | ||
|
||
self.node_name = kwargs.get('node_name', '') | ||
self.node_namespace = kwargs.get('node_namespace', '') | ||
self.topic_type = kwargs.get('topic_type', '') | ||
self.endpoint_type = kwargs.get('endpoint_type', TopicEndpointTypeEnum.INVALID) | ||
self.endpoint_gid = kwargs.get('endpoint_gid', list()) | ||
self.qos_profile = kwargs.get('qos_profile', QoSPresetProfiles.UNKNOWN.value) | ||
|
||
@property | ||
def node_name(self): | ||
""" | ||
Get field 'node_name'. | ||
|
||
:returns: node_name attribute | ||
:rtype: str | ||
""" | ||
return self._node_name | ||
|
||
@node_name.setter | ||
def node_name(self, value): | ||
assert isinstance(value, str) | ||
self._node_name = value | ||
|
||
@property | ||
def node_namespace(self): | ||
""" | ||
Get field 'node_namespace'. | ||
|
||
:returns: node_namespace attribute | ||
:rtype: str | ||
""" | ||
return self._node_namespace | ||
|
||
@node_namespace.setter | ||
def node_namespace(self, value): | ||
assert isinstance(value, str) | ||
self._node_namespace = value | ||
|
||
@property | ||
def topic_type(self): | ||
""" | ||
Get field 'topic_type'. | ||
|
||
:returns: topic_type attribute | ||
:rtype: str | ||
""" | ||
return self._topic_type | ||
|
||
@topic_type.setter | ||
def topic_type(self, value): | ||
assert isinstance(value, str) | ||
self._topic_type = value | ||
|
||
@property | ||
def endpoint_type(self): | ||
""" | ||
Get field 'endpoint_type'. | ||
|
||
:returns: endpoint_type attribute | ||
:rtype: TopicEndpointTypeEnum | ||
""" | ||
return self._endpoint_type | ||
|
||
@endpoint_type.setter | ||
def endpoint_type(self, value): | ||
if isinstance(value, TopicEndpointTypeEnum): | ||
self._endpoint_type = value | ||
elif isinstance(value, int): | ||
self._endpoint_type = TopicEndpointTypeEnum(value) | ||
else: | ||
assert False | ||
|
||
@property | ||
def endpoint_gid(self): | ||
""" | ||
Get field 'endpoint_gid'. | ||
|
||
:returns: endpoint_gid attribute | ||
:rtype: list | ||
""" | ||
return self._endpoint_gid | ||
|
||
@endpoint_gid.setter | ||
def endpoint_gid(self, value): | ||
assert all(isinstance(x, int) for x in value) | ||
self._endpoint_gid = value | ||
|
||
@property | ||
def qos_profile(self): | ||
""" | ||
Get field 'qos_profile'. | ||
|
||
:returns: qos_profile attribute | ||
:rtype: QoSProfile | ||
""" | ||
return self._qos_profile | ||
|
||
@qos_profile.setter | ||
def qos_profile(self, value): | ||
if isinstance(value, QoSProfile): | ||
self._qos_profile = value | ||
elif isinstance(value, dict): | ||
self._qos_profile = QoSProfile(**value) | ||
else: | ||
assert False | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, TopicEndpointInfo): | ||
return False | ||
return all( | ||
self.__getattribute__(slot) == other.__getattribute__(slot) | ||
for slot in self.__slots__) | ||
|
||
def __str__(self): | ||
result = 'Node name: %s\n' % self.node_name | ||
result += 'Node namespace: %s\n' % self.node_namespace | ||
result += 'Topic type: %s\n' % self.topic_type | ||
result += 'Endpoint type: %s\n' % self.endpoint_type.name | ||
result += 'GID: %s\n' % '.'.join(format(x, '02x') for x in self.endpoint_gid) | ||
result += 'QoS profile:\n' | ||
result += ' Reliability: %s\n' % self.qos_profile.reliability.name | ||
result += ' Durability: %s\n' % self.qos_profile.durability.name | ||
result += ' Lifespan: %d nanoseconds\n' % self.qos_profile.lifespan.nanoseconds | ||
result += ' Deadline: %d nanoseconds\n' % self.qos_profile.deadline.nanoseconds | ||
result += ' Liveliness: %s\n' % self.qos_profile.liveliness.name | ||
result += ' Liveliness lease duration: %d nanoseconds' % \ | ||
self.qos_profile.liveliness_lease_duration.nanoseconds | ||
return result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This style doesn't look correct to me, I'd expect:
Or
There are a few other places this applies in this pull request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.