Skip to content

Commit

Permalink
Add types to rosidl_generator_type_description
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carlstrom <rmc@carlstrom.com>
  • Loading branch information
InvincibleRMC committed Nov 22, 2024
1 parent a2590c9 commit 7415db7
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import fastjsonschema


def test_type_hash():
def test_type_hash() -> None:
"""Test all rosidl_generator_type_description output files against defined schemas."""
schema_path = (
Path(get_package_share_directory('rosidl_generator_type_description')) / 'resource' /
Expand Down
1 change: 1 addition & 0 deletions rosidl_generator_type_description/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ament_python_install_package(${PROJECT_NAME})
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
find_package(ament_cmake_pytest REQUIRED)
find_package(ament_cmake_mypy REQUIRED)
ament_lint_auto_find_test_dependencies()
ament_add_pytest_test(pytest_type_hash_generator test)
endif()
Expand Down
1 change: 1 addition & 0 deletions rosidl_generator_type_description/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>ament_cmake_mypy</test_depend>

<member_of_group>rosidl_generator_packages</member_of_group>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,49 @@
from pathlib import Path
import re
import sys
from typing import List, Tuple
from typing import Dict, Final, List, Set, Tuple, TYPE_CHECKING, TypedDict

from rosidl_parser import definition
from rosidl_parser.parser import parse_idl_file


if TYPE_CHECKING:
from typing_extensions import NotRequired

# RIHS: ROS Interface Hashing Standard, per REP-2011
# NOTE: These values and implementations must be updated if
# - type_description_interfaces messsages change, or
# - the hashing algorithm for type descriptions changes
# Both changes require an increment of the RIHS version
RIHS01_PREFIX = 'RIHS01_'
RIHS01_HASH_VALUE_SIZE = 32
RIHS01_PATTERN = re.compile(r'RIHS([0-9a-f]{2})_([0-9a-f]{64})')
RIHS01_PREFIX: Final = 'RIHS01_'
RIHS01_HASH_VALUE_SIZE: Final = 32
RIHS01_PATTERN: Final = re.compile(r'RIHS([0-9a-f]{2})_([0-9a-f]{64})')

# Used by code generators to create variable names
GET_DESCRIPTION_FUNC = 'get_type_description'
GET_HASH_FUNC = 'get_type_hash'
GET_INDIVIDUAL_SOURCE_FUNC = 'get_individual_type_description_source'
GET_SOURCES_FUNC = 'get_type_description_sources'
GET_DESCRIPTION_FUNC: Final = 'get_type_description'
GET_HASH_FUNC: Final = 'get_type_hash'
GET_INDIVIDUAL_SOURCE_FUNC: Final = 'get_individual_type_description_source'
GET_SOURCES_FUNC: Final = 'get_type_description_sources'


def to_type_name(namespaced_type):
def to_type_name(namespaced_type: definition.NamespacedType) -> str:
return '/'.join(namespaced_type.namespaced_name())


class GenericInterface:
def __init__(
self, namespaced_type: definition.NamespacedType, members: List[definition.Member]
):
) -> None:
self.namespaced_type = namespaced_type
self.members = members


def add_msg(msg: definition.Message, to_dict: dict):
def add_msg(msg: definition.Message, to_dict: Dict[str, GenericInterface]) -> None:
to_dict[to_type_name(msg.structure.namespaced_type)] = GenericInterface(
msg.structure.namespaced_type, msg.structure.members)


def add_srv(srv: definition.Service, to_dict: dict):
def add_srv(srv: definition.Service, to_dict: Dict[str, GenericInterface]) -> None:
service_members = [
definition.Member(srv.request_message.structure.namespaced_type, 'request_message'),
definition.Member(srv.response_message.structure.namespaced_type, 'response_message'),
Expand All @@ -69,7 +73,7 @@ def add_srv(srv: definition.Service, to_dict: dict):
add_msg(srv.event_message, to_dict)


def add_action(action, to_dict):
def add_action(action: definition.Action, to_dict: Dict[str, GenericInterface]) -> None:
action_members = [
definition.Member(action.goal.structure.namespaced_type, 'goal'),
definition.Member(action.result.structure.namespaced_type, 'result'),
Expand All @@ -88,7 +92,7 @@ def add_action(action, to_dict):
add_msg(action.feedback_message, to_dict)


def generate_type_hash(generator_arguments_file: str) -> List[str]:
def generate_type_hash(generator_arguments_file: str) -> List[Path]:
with open(generator_arguments_file, 'r') as f:
args = json.load(f)
package_name = args['package_name']
Expand All @@ -107,7 +111,7 @@ def generate_type_hash(generator_arguments_file: str) -> List[str]:
include_map[include_package_name] = Path(include_base_path)

# Define all local IndividualTypeDescriptions
individual_types = {}
individual_types: Dict[str, GenericInterface] = {}
for idl_tuple in idl_tuples:
idl_parts = idl_tuple.rsplit(':', 1)
assert len(idl_parts) == 2
Expand All @@ -131,7 +135,7 @@ def generate_type_hash(generator_arguments_file: str) -> List[str]:
add_action(el, individual_types)

# Determine needed includes for types from other packages
pending_includes = set()
pending_includes: Set[Path] = set()
for individual_type in individual_types.values():
for member in individual_type.members:
if isinstance(member.type, definition.NamespacedType):
Expand Down Expand Up @@ -176,14 +180,14 @@ def generate_type_hash(generator_arguments_file: str) -> List[str]:
serialized_type_lookup[referenced_type['type_name']] = referenced_type

# Create fully-unrolled TypeDescription instances for local full types, and calculate hashes
full_types = []
full_types: List['FullTypeDescriptionDict'] = []
for type_name, individual_type in individual_types.items():
full_type_description = extract_full_type_description(type_name, serialized_type_lookup)
full_types.append(full_type_description)
hash_lookup[type_name] = calculate_type_hash(full_type_description)

# Write JSON output for each full TypeDescription
generated_files = []
generated_files: List[Path] = []
for full_type_description in full_types:
top_type_name = full_type_description['type_description']['type_name']
hashes = [{
Expand Down Expand Up @@ -220,7 +224,7 @@ def parse_rihs_string(rihs_str: str) -> Tuple[int, str]:
# This mapping must match the constants defined in type_description_interfaces/msgs/FieldType.msg
# NOTE: Nonexplicit integer types are not defined in FieldType (short, long, long long).
# If a ROS IDL uses these, this generator will throw a KeyError.
FIELD_VALUE_TYPE_NAMES = {
FIELD_VALUE_TYPE_NAMES: Final = {
None: 'FIELD_TYPE_NOT_SET',
'nested_type': 'FIELD_TYPE_NESTED_TYPE',
'int8': 'FIELD_TYPE_INT8',
Expand All @@ -245,14 +249,14 @@ def parse_rihs_string(rihs_str: str) -> Tuple[int, str]:
definition.BoundedWString: 'FIELD_TYPE_BOUNDED_WSTRING',
}

NESTED_FIELD_TYPE_SUFFIXES = {
NESTED_FIELD_TYPE_SUFFIXES: Final = {
definition.Array: '_ARRAY',
definition.BoundedSequence: '_BOUNDED_SEQUENCE',
definition.UnboundedSequence: '_UNBOUNDED_SEQUENCE',
}

# Copied directly from FieldType.msg, with simple string manipulation to create a dict
FIELD_TYPE_NAME_TO_ID = {
FIELD_TYPE_NAME_TO_ID: Final = {
'FIELD_TYPE_NOT_SET': 0,

# Nested type defined in other .msg/.idl files.
Expand Down Expand Up @@ -369,7 +373,7 @@ def parse_rihs_string(rihs_str: str) -> Tuple[int, str]:
'FIELD_TYPE_BOUNDED_WSTRING_UNBOUNDED_SEQUENCE': 166,
}

FIELD_TYPE_ID_TO_NAME = {
FIELD_TYPE_ID_TO_NAME: Final = {
val: key for key, val in FIELD_TYPE_NAME_TO_ID.items()
}

Expand Down Expand Up @@ -397,7 +401,7 @@ def field_type_type_name(ftype: definition.AbstractType) -> str:
return value_type_name + name_suffix


def field_type_type_id(ftype: definition.AbstractType) -> Tuple[str, int]:
def field_type_type_id(ftype: definition.AbstractType) -> int:
return FIELD_TYPE_NAME_TO_ID[field_type_type_name(ftype)]


Expand Down Expand Up @@ -425,7 +429,7 @@ def field_type_string_capacity(ftype: definition.AbstractType) -> int:
return 0


def field_type_nested_type_name(ftype: definition.AbstractType, joiner='/') -> str:
def field_type_nested_type_name(ftype: definition.AbstractType, joiner: str = '/') -> str:
value_type = ftype
if isinstance(ftype, definition.AbstractNestedType):
value_type = ftype.value_type
Expand All @@ -436,7 +440,14 @@ def field_type_nested_type_name(ftype: definition.AbstractType, joiner='/') -> s
return ''


def serialize_field_type(ftype: definition.AbstractType) -> dict:
class SerializeFieldTypeDict(TypedDict):
type_id: int
capacity: int
string_capacity: int
nested_type_name: str


def serialize_field_type(ftype: definition.AbstractType) -> SerializeFieldTypeDict:
return {
'type_id': field_type_type_id(ftype),
'capacity': field_type_capacity(ftype),
Expand All @@ -445,7 +456,13 @@ def serialize_field_type(ftype: definition.AbstractType) -> dict:
}


def serialize_field(member: definition.Member) -> dict:
class SerializeFieldDict(TypedDict):
name: str
type: SerializeFieldTypeDict # noqa: A003
default_value: 'NotRequired[str]'


def serialize_field(member: definition.Member) -> SerializeFieldDict:
return {
'name': member.name,
'type': serialize_field_type(member.type),
Expand All @@ -455,16 +472,21 @@ def serialize_field(member: definition.Member) -> dict:
}


class SerializeIndividualTypeDescriptionDict(TypedDict):
type_name: str
fields: List[SerializeFieldDict]


def serialize_individual_type_description(
namespaced_type: definition.NamespacedType, members: List[definition.Member]
) -> dict:
) -> SerializeIndividualTypeDescriptionDict:
return {
'type_name': to_type_name(namespaced_type),
'fields': [serialize_field(member) for member in members]
}


def calculate_type_hash(serialized_type_description):
def calculate_type_hash(serialized_type_description: 'FullTypeDescriptionDict') -> str:
# Create a copy of the description, removing all default values
hashable_dict = deepcopy(serialized_type_description)
for field in hashable_dict['type_description']['fields']:
Expand All @@ -490,10 +512,18 @@ def calculate_type_hash(serialized_type_description):
return type_hash


def extract_full_type_description(output_type_name, type_map):
class FullTypeDescriptionDict(TypedDict):
type_description: SerializeIndividualTypeDescriptionDict
referenced_type_descriptions: List[SerializeIndividualTypeDescriptionDict]


def extract_full_type_description(
output_type_name: str,
type_map: dict[str, SerializeIndividualTypeDescriptionDict]
) -> FullTypeDescriptionDict:
# Traverse reference graph to narrow down the references for the output type
output_type = type_map[output_type_name]
output_references = set()
output_references: Set[str] = set()
process_queue = [
field['type']['nested_type_name']
for field in output_type['fields']
Expand All @@ -517,7 +547,10 @@ def extract_full_type_description(output_type_name, type_map):
}


def extract_subinterface(type_description_msg: dict, field_name: str):
def extract_subinterface(
type_description_msg: FullTypeDescriptionDict,
field_name: str
) -> FullTypeDescriptionDict:
"""
Filter full TypeDescription to produce a TypeDescription for one of its fields' types.
Expand Down
Empty file.
4 changes: 2 additions & 2 deletions rosidl_generator_type_description/test/test_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from rosidl_parser import definition


def test_field_type_serializer():
def test_field_type_serializer() -> None:
# Sanity check for the more complex capacity/string_capacity types and nesting
string_limit = 12
array_size = 22
Expand Down Expand Up @@ -55,7 +55,7 @@ def test_field_type_serializer():
assert result == expected


def test_nested_type_serializer():
def test_nested_type_serializer() -> None:
namespaced_type = definition.NamespacedType(['my_pkg', 'msg'], 'TestThing')
referenced_type = definition.NamespacedType(['other_pkg', 'msg'], 'RefThing')
nested_referenced_type = definition.UnboundedSequence(referenced_type)
Expand Down

0 comments on commit 7415db7

Please sign in to comment.