From 4360000822170c4eeb9b5498eee8acb6d56e6cea Mon Sep 17 00:00:00 2001 From: Vivien Nicolas Date: Fri, 17 Feb 2023 15:18:32 +0100 Subject: [PATCH] [matter_yamltests] Generates an error if a cluster definition exists but the attribute/event/command can not be found (#25109) --- .../matter_yamltests/definitions.py | 112 ++++++------ .../matter_yamltests/parser.py | 161 ++++++++++++++---- .../test_spec_definitions.py | 68 ++++++-- 3 files changed, 240 insertions(+), 101 deletions(-) diff --git a/scripts/py_matter_yamltests/matter_yamltests/definitions.py b/scripts/py_matter_yamltests/matter_yamltests/definitions.py index 6995b78946daf0..23995fae5dd408 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/definitions.py +++ b/scripts/py_matter_yamltests/matter_yamltests/definitions.py @@ -109,6 +109,18 @@ def get_event_name(self, cluster_id: int, event_id: int) -> str: event = self.__get_by_id(cluster_id, event_id, _ItemType.Event) return event.name if event else None + def get_cluster_id_by_name(self, cluster_name: str) -> int: + if cluster_name is None: + return None + + # The idl parser remove spaces + cluster_name = cluster_name.replace(' ', '') + return self.__clusters_by_name.get(cluster_name) + + def has_cluster_by_name(self, cluster_name: str) -> bool: + cluster_id = self.get_cluster_id_by_name(cluster_name) + return cluster_id is not None + def get_command_by_name(self, cluster_name: str, command_name: str) -> Command: return self.__get_by_name(cluster_name, command_name, _ItemType.Request) @@ -149,6 +161,21 @@ def get_type_by_name(self, cluster_name: str, target_name: str): return None + def get_command_names(self, cluster_name: str) -> list[str]: + targets = self.__get_targets_by_cluster_name( + cluster_name, _ItemType.Request) + return [] if targets is None else [name for name in targets] + + def get_event_names(self, cluster_name: str) -> list[str]: + targets = self.__get_targets_by_cluster_name( + cluster_name, _ItemType.Event) + return [] if targets is None else [name for name in targets] + + def get_attribute_names(self, cluster_name: str) -> list[str]: + targets = self.__get_targets_by_cluster_name( + cluster_name, _ItemType.Attribute) + return [] if targets is None else [name for name in targets] + def is_fabric_scoped(self, target) -> bool: if isinstance(target, Event): return bool(target.is_fabric_sensitive) @@ -164,82 +191,71 @@ def is_nullable(self, target) -> bool: return False def __get_by_name(self, cluster_name: str, target_name: str, target_type: _ItemType): - if not cluster_name or not target_name: + if target_name is None: return None - # The idl parser remove spaces - cluster_name = cluster_name.replace(' ', '') - - cluster_id = self.__clusters_by_name.get(cluster_name) + cluster_id = self.get_cluster_id_by_name(cluster_name) if cluster_id is None: return None + targets = self.__get_targets_by_cluster_name(cluster_name, target_type) + if targets is None: + return None + target = None if target_type == _ItemType.Request: - self.__enforce_casing( - target_name, self.__commands_by_name.get(cluster_name)) - target_id = self.__commands_by_name.get( - cluster_name).get(target_name) + target_id = targets.get(target_name) target = self.__get_by_id(cluster_id, target_id, target_type) elif target_type == _ItemType.Response: - self.__enforce_casing( - target_name, self.__responses_by_name.get(cluster_name)) - target_id = self.__responses_by_name.get( - cluster_name).get(target_name) + target_id = targets.get(target_name) target = self.__get_by_id(cluster_id, target_id, target_type) elif target_type == _ItemType.Event: - self.__enforce_casing( - target_name, self.__events_by_name.get(cluster_name)) - target_id = self.__events_by_name.get( - cluster_name).get(target_name) + target_id = targets.get(target_name) target = self.__get_by_id(cluster_id, target_id, target_type) elif target_type == _ItemType.Attribute: - self.__enforce_casing( - target_name, self.__attributes_by_name.get(cluster_name)) - target_id = self.__attributes_by_name.get( - cluster_name).get(target_name) + target_id = targets.get(target_name) target = self.__get_by_id(cluster_id, target_id, target_type) elif target_type == _ItemType.Bitmap: - self.__enforce_casing( - target_name, self.__bitmaps_by_name.get(cluster_name)) - target = self.__bitmaps_by_name.get(cluster_name).get(target_name) + target = targets.get(target_name) elif target_type == _ItemType.Enum: - self.__enforce_casing( - target_name, self.__enums_by_name.get(cluster_name)) - target = self.__enums_by_name.get(cluster_name).get(target_name) + target = targets.get(target_name) elif target_type == _ItemType.Struct: - self.__enforce_casing( - target_name, self.__structs_by_name.get(cluster_name)) - target = self.__structs_by_name.get(cluster_name).get(target_name) + target = targets.get(target_name) return target def __get_by_id(self, cluster_id: int, target_id: int, target_type: str): - targets = None - - if target_type == _ItemType.Request: - targets = self.__commands_by_id.get(cluster_id) - elif target_type == _ItemType.Response: - targets = self.__responses_by_id.get(cluster_id) - elif target_type == _ItemType.Event: - targets = self.__events_by_id.get(cluster_id) - elif target_type == _ItemType.Attribute: - targets = self.__attributes_by_id.get(cluster_id) - + target_mapping = { + _ItemType.Request: self.__commands_by_id, + _ItemType.Response: self.__responses_by_id, + _ItemType.Attribute: self.__attributes_by_id, + _ItemType.Event: self.__events_by_id, + } + + targets = target_mapping[target_type].get(cluster_id) if targets is None: return None return targets.get(target_id) - def __enforce_casing(self, target_name: str, targets: list): - if targets.get(target_name) is not None: - return + def __get_targets_by_cluster_name(self, cluster_name: str, target_type: _ItemType): + if not cluster_name: + return None + + target_mapping = { + _ItemType.Request: self.__commands_by_name, + _ItemType.Response: self.__responses_by_name, + _ItemType.Attribute: self.__attributes_by_name, + _ItemType.Event: self.__events_by_name, + _ItemType.Bitmap: self.__bitmaps_by_name, + _ItemType.Enum: self.__enums_by_name, + _ItemType.Struct: self.__structs_by_name, + } - for name in targets: - if name.lower() == target_name.lower(): - raise KeyError( - f'Unknown target {target_name}. Did you mean {name} ?') + # The idl parser remove spaces + cluster_name = cluster_name.replace(' ', '') + return target_mapping[target_type].get(cluster_name) def SpecDefinitionsFromPaths(paths: str, pseudo_clusters: Optional[PseudoClusters] = PseudoClusters([])): diff --git a/scripts/py_matter_yamltests/matter_yamltests/parser.py b/scripts/py_matter_yamltests/matter_yamltests/parser.py index ee525db4e4ab74..8f9b38a85f63f8 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/parser.py +++ b/scripts/py_matter_yamltests/matter_yamltests/parser.py @@ -25,6 +25,45 @@ from .yaml_loader import YamlLoader +class UnknownPathQualifierError(TestStepError): + """Raise when an attribute/command/event name is not found in the definitions.""" + + def __init__(self, content, target_type, target_name, candidate_names=[]): + if candidate_names: + message = f'Unknown {target_type}: "{target_name}". Candidates are: "{candidate_names}"' + + for candidate_name in candidate_names: + if candidate_name.lower() == target_name.lower(): + message = f'Unknown {target_type}: "{target_name}". Did you mean "{candidate_name}" ?' + break + else: + message = f'The cluster does not have any {target_type}s.' + + super().__init__(message) + self.tag_key_with_error(content, target_type) + + +class TestStepAttributeKeyError(UnknownPathQualifierError): + """Raise when an attribute name is not found in the definitions.""" + + def __init__(self, content, target_name, candidate_names=[]): + super().__init__(content, 'attribute', target_name, candidate_names) + + +class TestStepCommandKeyError(UnknownPathQualifierError): + """Raise when a command name is not found in the definitions.""" + + def __init__(self, content, target_name, candidate_names=[]): + super().__init__(content, 'command', target_name, candidate_names) + + +class TestStepEventKeyError(UnknownPathQualifierError): + """Raise when an event name is not found in the definitions.""" + + def __init__(self, content, target_name, candidate_names=[]): + super().__init__(content, 'event', target_name, candidate_names) + + class PostProcessCheckStatus(Enum): '''Indicates the post processing check step status.''' SUCCESS = 'success', @@ -177,42 +216,7 @@ def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_ self._convert_single_value_to_values(response) self.responses_with_placeholders = responses - argument_mapping = None - response_mapping = None - response_mapping_name = None - - if definitions is not None: - if self.is_attribute: - attribute = definitions.get_attribute_by_name( - self.cluster, self.attribute) - if attribute: - attribute_mapping = self._as_mapping(definitions, self.cluster, - attribute.definition.data_type.name) - argument_mapping = attribute_mapping - response_mapping = attribute_mapping - response_mapping_name = attribute.definition.data_type.name - elif self.is_event: - event = definitions.get_event_by_name( - self.cluster, self.event) - if event: - event_mapping = self._as_mapping(definitions, self.cluster, - event.name) - argument_mapping = event_mapping - response_mapping = event_mapping - response_mapping_name = event.name - else: - command = definitions.get_command_by_name( - self.cluster, self.command) - if command: - argument_mapping = self._as_mapping( - definitions, self.cluster, command.input_param) - response_mapping = self._as_mapping( - definitions, self.cluster, command.output_param) - response_mapping_name = command.output_param - - self.argument_mapping = argument_mapping - self.response_mapping = response_mapping - self.response_mapping_name = response_mapping_name + self._update_mappings(test, definitions) self.update_arguments(self.arguments_with_placeholders) self.update_responses(self.responses_with_placeholders) @@ -227,6 +231,89 @@ def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_ continue get_constraints(value['constraints']) + def _update_mappings(self, test: dict, definitions: SpecDefinitions): + cluster_name = self.cluster + if definitions is None or not definitions.has_cluster_by_name(cluster_name): + self.argument_mapping = None + self.response_mapping = None + self.response_mapping_name = None + return + + argument_mapping = None + response_mapping = None + response_mapping_name = None + + if self.is_attribute: + attribute_name = self.attribute + attribute = definitions.get_attribute_by_name( + cluster_name, + attribute_name + ) + + if not attribute: + targets = definitions.get_attribute_names(cluster_name) + raise TestStepAttributeKeyError(test, attribute_name, targets) + + attribute_mapping = self._as_mapping( + definitions, + cluster_name, + attribute.definition.data_type.name + ) + + argument_mapping = attribute_mapping + response_mapping = attribute_mapping + response_mapping_name = attribute.definition.data_type.name + elif self.is_event: + event_name = self.event + event = definitions.get_event_by_name( + cluster_name, + event_name + ) + + if not event: + targets = definitions.get_event_names(cluster_name) + raise TestStepEventKeyError(test, event_name, targets) + + event_mapping = self._as_mapping( + definitions, + cluster_name, + event_name + ) + + argument_mapping = event_mapping + response_mapping = event_mapping + response_mapping_name = event.name + else: + command_name = self.command + command = definitions.get_command_by_name( + cluster_name, + command_name + ) + + if not command: + targets = definitions.get_command_names(cluster_name) + raise TestStepCommandKeyError(test, command_name, targets) + + if command.input_param is None: + argument_mapping = {} + else: + argument_mapping = self._as_mapping( + definitions, + cluster_name, + command.input_param + ) + + response_mapping = self._as_mapping( + definitions, + cluster_name, + command.output_param + ) + response_mapping_name = command.output_param + + self.argument_mapping = argument_mapping + self.response_mapping = response_mapping + self.response_mapping_name = response_mapping_name + def _convert_single_value_to_values(self, container): if container is None or 'values' in container: return @@ -274,7 +361,7 @@ def update_responses(self, responses_with_placeholders): response, self.response_mapping) def _update_with_definition(self, container: dict, mapping_type): - if not container or not mapping_type: + if not container or mapping_type is None: return values = container['values'] diff --git a/scripts/py_matter_yamltests/test_spec_definitions.py b/scripts/py_matter_yamltests/test_spec_definitions.py index 4ef8653dfded96..cbfa95799d50d9 100644 --- a/scripts/py_matter_yamltests/test_spec_definitions.py +++ b/scripts/py_matter_yamltests/test_spec_definitions.py @@ -252,8 +252,8 @@ def test_get_command_by_name(self): 'Test', 'TestCommand'), Command) self.assertIsNone( definitions.get_command_by_name('test', 'TestCommand')) - self.assertRaises(KeyError, definitions.get_command_by_name, - 'Test', 'testcommand') + self.assertIsNone( + definitions.get_command_by_name('Test', 'testcommand')) def test_get_response_by_name(self): definitions = SpecDefinitions( @@ -268,8 +268,8 @@ def test_get_response_by_name(self): 'Test', 'TestCommandResponse'), Struct) self.assertIsNone(definitions.get_response_by_name( 'test', 'TestCommandResponse')) - self.assertRaises(KeyError, definitions.get_response_by_name, - 'Test', 'testcommandresponse') + self.assertIsNone(definitions.get_response_by_name( + 'Test', 'testcommandresponse')) def test_get_attribute_by_name(self): definitions = SpecDefinitions( @@ -288,10 +288,10 @@ def test_get_attribute_by_name(self): 'test', 'TestAttribute')) self.assertIsNone(definitions.get_attribute_by_name( 'test', 'TestGlobalAttribute')) - self.assertRaises(KeyError, definitions.get_attribute_by_name, - 'Test', 'testattribute') - self.assertRaises(KeyError, definitions.get_attribute_by_name, - 'Test', 'testglobalattribute') + self.assertIsNone(definitions.get_attribute_by_name( + 'Test', 'testattribute')) + self.assertIsNone(definitions.get_attribute_by_name( + 'Test', 'testglobalattribute')) def test_get_event_by_name(self): definitions = SpecDefinitions( @@ -303,8 +303,7 @@ def test_get_event_by_name(self): self.assertIsInstance( definitions.get_event_by_name('Test', 'TestEvent'), Event) self.assertIsNone(definitions.get_event_by_name('test', 'TestEvent')) - self.assertRaises( - KeyError, definitions.get_event_by_name, 'Test', 'testevent') + self.assertIsNone(definitions.get_event_by_name('Test', 'testevent')) def test_get_bitmap_by_name(self): definitions = SpecDefinitions( @@ -316,8 +315,7 @@ def test_get_bitmap_by_name(self): self.assertIsInstance(definitions.get_bitmap_by_name( 'Test', 'TestBitmap'), Bitmap) self.assertIsNone(definitions.get_bitmap_by_name('test', 'TestBitmap')) - self.assertRaises(KeyError, definitions.get_bitmap_by_name, - 'Test', 'testbitmap') + self.assertIsNone(definitions.get_bitmap_by_name('Test', 'testbitmap')) def test_get_enum_by_name(self): definitions = SpecDefinitions( @@ -329,8 +327,7 @@ def test_get_enum_by_name(self): self.assertIsInstance( definitions.get_enum_by_name('Test', 'TestEnum'), Enum) self.assertIsNone(definitions.get_enum_by_name('test', 'TestEnum')) - self.assertRaises( - KeyError, definitions.get_enum_by_name, 'Test', 'testenum') + self.assertIsNone(definitions.get_enum_by_name('Test', 'testenum')) def test_get_struct_by_name(self): definitions = SpecDefinitions( @@ -342,8 +339,7 @@ def test_get_struct_by_name(self): self.assertIsInstance(definitions.get_struct_by_name( 'Test', 'TestStruct'), Struct) self.assertIsNone(definitions.get_struct_by_name('test', 'TestStruct')) - self.assertRaises( - KeyError, definitions.get_struct_by_name, 'Test', 'teststruct') + self.assertIsNone(definitions.get_struct_by_name('Test', 'teststruct')) def test_get_type_by_name(self): definitions = SpecDefinitions( @@ -402,6 +398,46 @@ def test_event_is_fabric_scoped(self): 'Test', 'TestEventFabricScoped') self.assertTrue(definitions.is_fabric_scoped(event)) + def test_get_cluster_id_by_name(self): + definitions = SpecDefinitions( + [ParseSource(source=io.StringIO(source_cluster), name='source_cluster')]) + + cluster_id = definitions.get_cluster_id_by_name('Test') + self.assertEqual(cluster_id, 0x1234) + + cluster_id = definitions.get_cluster_id_by_name('test') + self.assertIsNone(cluster_id) + + def test_get_command_names(self): + definitions = SpecDefinitions( + [ParseSource(source=io.StringIO(source_command), name='source_command')]) + + commands = definitions.get_command_names('Test') + self.assertEqual(commands, ['TestCommand']) + + commands = definitions.get_command_names('test') + self.assertEqual(commands, []) + + def test_get_attribute_names(self): + definitions = SpecDefinitions( + [ParseSource(source=io.StringIO(source_attribute), name='source_attribute')]) + + attributes = definitions.get_attribute_names('Test') + self.assertEqual(attributes, ['TestAttribute', 'TestGlobalAttribute']) + + attributes = definitions.get_attribute_names('test') + self.assertEqual(attributes, []) + + def test_get_event_names(self): + definitions = SpecDefinitions( + [ParseSource(source=io.StringIO(source_event), name='source_event')]) + + events = definitions.get_event_names('Test') + self.assertEqual(events, ['TestEvent', 'TestEventFabricScoped']) + + events = definitions.get_event_names('test') + self.assertEqual(events, []) + if __name__ == '__main__': unittest.main()