From f0b5c68283dc2f7e5bee75f8e138bda09a517605 Mon Sep 17 00:00:00 2001 From: C Freeman Date: Thu, 26 Oct 2023 11:03:24 -0400 Subject: [PATCH] Cluster conformance checker script (#29895) * Cluster conformance checker script This script parses the Data Model XML conformance tags into conformance checking functions and uses them to conformance check devices based on the supplied feature, command and attribute global values. This script does NOT yet perform choice conformance checks. This script currently WILL NOT fail on conformance errors because we have conformance errors in our example apps and we want to be able to run this in CI with the other device composition tests. The test failure mode will be turned on once the conformance issues are fixed. * Fix lint errors * Restyled by autopep8 * Remove accidentally added files * Address review comments * Add new test to CI --------- Co-authored-by: Restyled.io --- .github/workflows/tests.yaml | 1 + .../TC_DeviceBasicComposition.py | 128 +++- src/python_testing/TestConformanceSupport.py | 575 ++++++++++++++++++ src/python_testing/conformance_support.py | 258 ++++++++ src/python_testing/matter_testing_support.py | 21 +- src/python_testing/spec_parsing_support.py | 322 ++++++++++ 6 files changed, 1300 insertions(+), 5 deletions(-) create mode 100644 src/python_testing/TestConformanceSupport.py create mode 100644 src/python_testing/conformance_support.py create mode 100644 src/python_testing/spec_parsing_support.py diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 97897e8086c941..d6c580e643e9ff 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -468,6 +468,7 @@ jobs: scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_RVCCLEANM_1_2.py" --script-args "--int-arg PIXIT_ENDPOINT:1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_RVCRUNM_1_2.py" --script-args "--int-arg PIXIT_ENDPOINT:1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' + scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestConformanceSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_12.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' diff --git a/src/python_testing/TC_DeviceBasicComposition.py b/src/python_testing/TC_DeviceBasicComposition.py index b333085fa2eef1..1f5dc38813a975 100644 --- a/src/python_testing/TC_DeviceBasicComposition.py +++ b/src/python_testing/TC_DeviceBasicComposition.py @@ -31,8 +31,11 @@ import chip.clusters.ClusterObjects import chip.tlv from chip.clusters.Attribute import ValueDecodeFailure -from matter_testing_support import AttributePathLocation, MatterBaseTest, async_test_body, default_matter_test_main +from conformance_support import ConformanceDecision, conformance_allowed +from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, + async_test_body, default_matter_test_main) from mobly import asserts +from spec_parsing_support import CommandType, build_xml_clusters def MatterTlvToJson(tlv_data: dict[int, Any]) -> dict[str, Any]: @@ -870,6 +873,129 @@ def test_DESC_2_2(self): if problems or root_problems: self.fail_current_test("Problems with tags lists") + def test_spec_conformance(self): + success = True + # TODO: provisional needs to be an input parameter + allow_provisional = True + clusters, problems = build_xml_clusters() + self.problems = self.problems + problems + for id in sorted(list(clusters.keys())): + print(f'{id} 0x{id:02x}: {clusters[id].name}') + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + if cluster_id not in clusters.keys(): + if (cluster_id & 0xFFFF_0000) != 0: + # manufacturer cluster + continue + location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id) + # TODO: update this from a warning once we have all the data + self.record_warning(self.get_test_name(), location=location, + problem='Standard cluster found on device, but is not present in spec data') + continue + + # TODO: switch to use global FEATURE_MAP_ID etc. once the IDM-10.1 change is merged. + FEATURE_MAP_ID = 0xFFFC + ATTRIBUTE_LIST_ID = 0xFFFB + ACCEPTED_COMMAND_ID = 0xFFF9 + GENERATED_COMMAND_ID = 0xFFF8 + + feature_map = cluster[FEATURE_MAP_ID] + attribute_list = cluster[ATTRIBUTE_LIST_ID] + all_command_list = cluster[ACCEPTED_COMMAND_ID] + cluster[GENERATED_COMMAND_ID] + + # Feature conformance checking + feature_masks = [1 << i for i in range(32) if feature_map & (1 << i)] + for f in feature_masks: + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=FEATURE_MAP_ID) + if f not in clusters[cluster_id].features.keys(): + self.record_error(self.get_test_name(), location=location, problem=f'Unknown feature with mask 0x{f:02x}') + success = False + continue + xml_feature = clusters[cluster_id].features[f] + conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list) + if not conformance_allowed(conformance_decision, allow_provisional): + self.record_error(self.get_test_name(), location=location, + problem=f'Disallowed feature with mask 0x{f:02x}') + success = False + for feature_mask, xml_feature in clusters[cluster_id].features.items(): + conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list) + if conformance_decision == ConformanceDecision.MANDATORY and feature_mask not in feature_masks: + self.record_error(self.get_test_name(), location=location, + problem=f'Required feature with mask 0x{f:02x} is not present in feature map') + success = False + + # Attribute conformance checking + for attribute_id, attribute in cluster.items(): + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + if attribute_id not in clusters[cluster_id].attributes.keys(): + # TODO: Consolidate the range checks with IDM-10.1 once that lands + if attribute_id <= 0x4FFF: + # manufacturer attribute + self.record_error(self.get_test_name(), location=location, + problem='Standard attribute found on device, but not in spec') + success = False + continue + xml_attribute = clusters[cluster_id].attributes[attribute_id] + conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list) + if not conformance_allowed(conformance_decision, allow_provisional): + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + self.record_error(self.get_test_name(), location=location, + problem=f'Attribute 0x{attribute_id:02x} is included, but is disallowed by conformance') + success = False + for attribute_id, xml_attribute in clusters[cluster_id].attributes.items(): + conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list) + if conformance_decision == ConformanceDecision.MANDATORY and attribute_id not in cluster.keys(): + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + self.record_error(self.get_test_name(), location=location, + problem=f'Attribute 0x{attribute_id:02x} is required, but is not present on the DUT') + success = False + + def check_spec_conformance_for_commands(command_type: CommandType) -> bool: + success = True + # TODO: once IDM-10.1 lands, use the globals + global_attribute_id = 0xFFF9 if command_type == CommandType.ACCEPTED else 0xFFF8 + xml_commands_dict = clusters[cluster_id].accepted_commands if command_type == CommandType.ACCEPTED else clusters[cluster_id].generated_commands + command_list = cluster[global_attribute_id] + for command_id in command_list: + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id) + if command_id not in xml_commands_dict: + # TODO: Consolidate range checks with IDM-10.1 once that lands + if command_id <= 0xFF: + # manufacturer command + continue + self.record_error(self.get_test_name(), location=location, + problem='Standard command found on device, but not in spec') + success = False + continue + xml_command = xml_commands_dict[command_id] + conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list) + if not conformance_allowed(conformance_decision, allow_provisional): + self.record_error(self.get_test_name(), location=location, + problem=f'Command 0x{command_id:02x} is included, but disallowed by conformance') + success = False + for command_id, xml_command in xml_commands_dict.items(): + conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list) + if conformance_decision == ConformanceDecision.MANDATORY and command_id not in command_list: + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id) + self.record_error(self.get_test_name(), location=location, + problem=f'Command 0x{command_id:02x} is required, but is not present on the DUT') + success = False + return success + + # Command conformance checking + cmd_success = check_spec_conformance_for_commands(CommandType.ACCEPTED) + success = False if not cmd_success else success + cmd_success = check_spec_conformance_for_commands(CommandType.GENERATED) + success = False if not cmd_success else success + + # TODO: Add choice checkers + + if not success: + # TODO: Right now, we have failures in all-cluster, so we can't fail this test and keep it in CI. For now, just log. + # Issue tracking: #29812 + # self.fail_current_test("Problems with conformance") + logging.error("Problems found with conformance, this should turn into a test failure once #29812 is resolved") + if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/TestConformanceSupport.py b/src/python_testing/TestConformanceSupport.py new file mode 100644 index 00000000000000..53f9e885ff9449 --- /dev/null +++ b/src/python_testing/TestConformanceSupport.py @@ -0,0 +1,575 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import xml.etree.ElementTree as ElementTree + +from conformance_support import ConformanceDecision, ConformanceParseParameters, parse_callable_from_xml +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts + + +class TestConformanceSupport(MatterBaseTest): + @async_test_body + async def setup_class(self): + super().setup_class() + # a small feature map + self.feature_names_to_bits = {'AB': 0x01, 'CD': 0x02} + + # none, AB, CD, AB&CD + self.feature_maps = [0x00, 0x01, 0x02, 0x03] + self.has_ab = [False, True, False, True] + self.has_cd = [False, False, True, True] + + self.attribute_names_to_values = {'attr1': 0x00, 'attr2': 0x01} + self.attribute_lists = [[], [0x00], [0x01], [0x00, 0x01]] + self.has_attr1 = [False, True, False, True] + self.has_attr2 = [False, False, True, True] + + self.command_names_to_values = {'cmd1': 0x00, 'cmd2': 0x01} + self.cmd_lists = [[], [0x00], [0x01], [0x00, 0x01]] + self.has_cmd1 = [False, True, False, True] + self.has_cmd2 = [False, False, True, True] + self.params = ConformanceParseParameters( + feature_map=self.feature_names_to_bits, attribute_map=self.attribute_names_to_values, command_map=self.command_names_to_values) + + @async_test_body + async def test_conformance_mandatory(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + + @async_test_body + async def test_conformance_optional(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + + @async_test_body + async def test_conformance_disallowed(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.DISALLOWED) + + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.DISALLOWED) + + @async_test_body + async def test_conformance_provisional(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.PROVISIONAL) + + @async_test_body + async def test_conformance_mandatory_on_condition(self): + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # single attribute mandatory + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # test command in optional and in boolean - this is the same as attribute essentially, so testing every permutation is overkill + + @async_test_body + async def test_conformance_optional_on_condition(self): + # single feature optional + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # single attribute optional + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # single command optional + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, c in enumerate(self.cmd_lists): + if self.has_cmd1[i]: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, c in enumerate(self.cmd_lists): + if self.has_cmd2[i]: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_not_term_mandatory(self): + # single feature not mandatory + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # single attribute not mandatory + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if not self.has_attr1[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if not self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_not_term_optional(self): + # single feature not optional + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_and_term(self): + # and term for features only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] and self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # and term for attributes only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i] and self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # and term for feature and attribute + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + if self.has_ab[i] and self.has_attr2[j]: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_or_term(self): + # or term feature only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] or self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # or term attribute only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i] or self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # or term feature and attribute + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + if self.has_ab[i] or self.has_attr2[j]: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_and_term_with_not(self): + # and term with not + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_ab[i] and self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_or_term_with_not(self): + # or term with not on second feature + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] or not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # not around or term with + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not (self.has_ab[i] or self.has_cd[i]): + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_and_term_with_three_terms(self): + # and term with three features + xml = ('' + '' + '' + '' + '' + '' + '') + self.feature_names_to_bits['EF'] = 0x04 + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + # no features + asserts.assert_equal(xml_callable(0x00, [], []), ConformanceDecision.NOT_APPLICABLE) + # one feature + asserts.assert_equal(xml_callable(0x01, [], []), ConformanceDecision.NOT_APPLICABLE) + # all features + asserts.assert_equal(xml_callable(0x07, [], []), ConformanceDecision.OPTIONAL) + + # and term with one of each + xml = ('' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + for k, c in enumerate(self.cmd_lists): + if self.has_ab[i] and self.has_attr1[j] and self.has_cmd1[k]: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_or_term_with_three_terms(self): + # or term with three features + xml = ('' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + # no features + asserts.assert_equal(xml_callable(0x00, [], []), ConformanceDecision.NOT_APPLICABLE) + # one feature + asserts.assert_equal(xml_callable(0x01, [], []), ConformanceDecision.OPTIONAL) + # all features + asserts.assert_equal(xml_callable(0x07, [], []), ConformanceDecision.OPTIONAL) + + # or term with one of each + xml = ('' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + for k, c in enumerate(self.cmd_lists): + if self.has_ab[i] or self.has_attr1[j] or self.has_cmd1[k]: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.NOT_APPLICABLE) + + def test_conformance_otherwise(self): + # AB, O + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + + # AB, [CD] + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + elif self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # AB & !CD, P + xml = ('' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] and not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.PROVISIONAL) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/conformance_support.py b/src/python_testing/conformance_support.py new file mode 100644 index 00000000000000..9df852f10df696 --- /dev/null +++ b/src/python_testing/conformance_support.py @@ -0,0 +1,258 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import xml.etree.ElementTree as ElementTree +from dataclasses import dataclass +from enum import Enum, auto +from typing import Callable + +from chip.tlv import uint + +OTHERWISE_CONFORM = 'otherwiseConform' +OPTIONAL_CONFORM = 'optionalConform' +PROVISIONAL_CONFORM = 'provisionalConform' +MANDATORY_CONFORM = 'mandatoryConform' +DEPRECATE_CONFORM = 'deprecateConform' +DISALLOW_CONFORM = 'disallowConform' +AND_TERM = 'andTerm' +OR_TERM = 'orTerm' +NOT_TERM = 'notTerm' +FEATURE_TAG = 'feature' +ATTRIBUTE_TAG = 'attribute' +COMMAND_TAG = 'command' + + +class ConformanceException(Exception): + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return f"ConformanceException({self.msg})" + + +class ConformanceDecision(Enum): + MANDATORY = auto() + OPTIONAL = auto() + NOT_APPLICABLE = auto() + DISALLOWED = auto() + PROVISIONAL = auto() + + +@dataclass +class ConformanceParseParameters: + feature_map: dict[str, uint] + attribute_map: dict[str, uint] + command_map: dict[str, uint] + + +def conformance_allowed(conformance_decision: ConformanceDecision, allow_provisional: bool): + if conformance_decision == ConformanceDecision.NOT_APPLICABLE or conformance_decision == ConformanceDecision.DISALLOWED: + return False + if conformance_decision == ConformanceDecision.PROVISIONAL: + return allow_provisional + return True + + +def mandatory(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.MANDATORY + + +def optional(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.OPTIONAL + + +def deprecated(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.DISALLOWED + + +def disallowed(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.DISALLOWED + + +def provisional(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.PROVISIONAL + + +def feature(requiredFeature: uint) -> Callable: + def feature_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + if requiredFeature & feature_map != 0: + return ConformanceDecision.MANDATORY + return ConformanceDecision.NOT_APPLICABLE + return feature_inner + + +def attribute(requiredAttribute: uint) -> Callable: + def attribute_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + if requiredAttribute in attribute_list: + return ConformanceDecision.MANDATORY + return ConformanceDecision.NOT_APPLICABLE + return attribute_inner + + +def command(requiredCommand: uint) -> Callable: + def command_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + if requiredCommand in all_command_list: + return ConformanceDecision.MANDATORY + return ConformanceDecision.NOT_APPLICABLE + return command_inner + + +def optional_wrapper(op: Callable) -> Callable: + def optional_wrapper_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.MANDATORY or decision == ConformanceDecision.OPTIONAL: + return ConformanceDecision.OPTIONAL + elif decision == ConformanceDecision.NOT_APPLICABLE: + return ConformanceDecision.NOT_APPLICABLE + else: + raise ConformanceException(f'Optional wrapping invalid op {decision}') + return optional_wrapper_inner + + +def mandatory_wrapper(op: Callable) -> Callable: + def mandatory_wrapper_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return op(feature_map, attribute_list, all_command_list) + return mandatory_wrapper_inner + + +def not_operation(op: Callable): + def not_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + # not operations can't be used with anything that returns DISALLOWED + # not operations also can't be used with things that are optional + # ie, ![AB] doesn't make sense, nor does !O + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.OPTIONAL or decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL: + raise ConformanceException('NOT operation on optional or disallowed item') + elif decision == ConformanceDecision.NOT_APPLICABLE: + return ConformanceDecision.MANDATORY + elif decision == ConformanceDecision.MANDATORY: + return ConformanceDecision.NOT_APPLICABLE + else: + raise ConformanceException('NOT called on item with non-conformance value') + return not_operation_inner + + +def and_operation(op_list: list[Callable]) -> Callable: + def and_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + for op in op_list: + decision = op(feature_map, attribute_list, all_command_list) + # and operations can't happen on optional or disallowed + if decision == ConformanceDecision.OPTIONAL or decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL: + raise ConformanceException('AND operation on optional or disallowed item') + elif decision == ConformanceDecision.NOT_APPLICABLE: + return ConformanceDecision.NOT_APPLICABLE + elif decision == ConformanceDecision.MANDATORY: + continue + else: + raise ConformanceException('Oplist item returned non-conformance value') + return ConformanceDecision.MANDATORY + return and_operation_inner + + +def or_operation(op_list: list[Callable]) -> Callable: + def or_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + for op in op_list: + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL: + raise ConformanceException('OR operation on optional or disallowed item') + elif decision == ConformanceDecision.NOT_APPLICABLE: + continue + elif decision == ConformanceDecision.MANDATORY: + return ConformanceDecision.MANDATORY + elif decision == ConformanceDecision.OPTIONAL: + return ConformanceDecision.OPTIONAL + else: + raise ConformanceException('Oplist item returned non-conformance value') + return ConformanceDecision.NOT_APPLICABLE + return or_operation_inner + +# TODO: add xor operation once it's required +# TODO: how would equal and unequal operations work here? + + +def otherwise(op_list: list[Callable]) -> Callable: + def otherwise_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + # Otherwise operations apply from left to right. If any of them + # has a definite decision (optional, mandatory or disallowed), that is the one that applies + # Provisional items are meant to be marked as the first item in the list + # Deprecated items are either on their own, or follow an O as O,D. + # For O,D, optional applies (leftmost), but we should consider some way to warn here as well, + # possibly in another function + for op in op_list: + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.NOT_APPLICABLE: + continue + return decision + return ConformanceDecision.NOT_APPLICABLE + return otherwise_inner + + +def parse_callable_from_xml(element: ElementTree.Element, params: ConformanceParseParameters) -> Callable: + if len(list(element)) == 0: + # no subchildren here, so this can only be mandatory, optional, provisional, deprecated, disallowed, feature or attribute + if element.tag == MANDATORY_CONFORM: + return mandatory + elif element.tag == OPTIONAL_CONFORM: + return optional + elif element.tag == PROVISIONAL_CONFORM: + return provisional + elif element.tag == DEPRECATE_CONFORM: + return deprecated + elif element.tag == DISALLOW_CONFORM: + return disallowed + elif element.tag == FEATURE_TAG: + return feature(params.feature_map[element.get('name')]) + elif element.tag == ATTRIBUTE_TAG: + # Some command conformance tags are marked as attribute, so if this key isn't in attribute, try command + name = element.get('name') + if name in params.attribute_map: + return attribute(params.attribute_map[name]) + else: + return command(params.command_map[name]) + elif element.tag == COMMAND_TAG: + return command(params.command_map[element.get('name')]) + else: + raise ConformanceException( + f'Unexpected xml conformance element with no children {str(element.tag)} {str(element.attrib)}') + + # First build the list, then create the callable for this element + ops = [] + for sub in element: + ops.append(parse_callable_from_xml(sub, params)) + + # optional can be a wrapper as well as a standalone + # This can be any of the boolean operations, optional or otherwise + if element.tag == OPTIONAL_CONFORM: + if len(ops) > 1: + raise ConformanceException(f'OPTIONAL term found with more than one subelement {list(element)}') + return optional_wrapper(ops[0]) + elif element.tag == MANDATORY_CONFORM: + if len(ops) > 1: + raise ConformanceException(f'MANDATORY term found with more than one subelement {list(element)}') + return mandatory_wrapper(ops[0]) + elif element.tag == AND_TERM: + return and_operation(ops) + elif element.tag == OR_TERM: + return or_operation(ops) + elif element.tag == NOT_TERM: + if len(ops) > 1: + raise ConformanceException(f'NOT term found with more than one subelement {list(element)}') + return not_operation(ops[0]) + elif element.tag == OTHERWISE_CONFORM: + return otherwise(ops) + else: + raise ConformanceException(f'Unexpected conformance tag with children {element}') diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index a394952445de60..398a01f6cd9d17 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -333,6 +333,19 @@ class CommandPathLocation: cluster_id: int command_id: int + +@dataclass +class ClusterPathLocation: + endpoint_id: int + cluster_id: int + + +@dataclass +class FeaturePathLocation: + endpoint_id: int + cluster_id: int + feature_code: str + # ProblemSeverity is not using StrEnum, but rather Enum, since StrEnum only # appeared in 3.11. To make it JSON serializable easily, multiple inheritance # from `str` is used. See https://stackoverflow.com/a/51976841. @@ -347,7 +360,7 @@ class ProblemSeverity(str, Enum): @dataclass class ProblemNotice: test_name: str - location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation] + location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation] severity: ProblemSeverity problem: str spec_location: str = "" @@ -551,13 +564,13 @@ async def send_single_cmd( def print_step(self, stepnum: typing.Union[int, str], title: str) -> None: logging.info(f'***** Test Step {stepnum} : {title}') - def record_error(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): + def record_error(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.ERROR, problem, spec_location)) - def record_warning(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): + def record_warning(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.WARNING, problem, spec_location)) - def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): + def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.NOTE, problem, spec_location)) def get_setup_payload_info(self) -> SetupPayloadInfo: diff --git a/src/python_testing/spec_parsing_support.py b/src/python_testing/spec_parsing_support.py new file mode 100644 index 00000000000000..b86387ce23073f --- /dev/null +++ b/src/python_testing/spec_parsing_support.py @@ -0,0 +1,322 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import glob +import os +import xml.etree.ElementTree as ElementTree +from copy import deepcopy +from dataclasses import dataclass +from enum import Enum, auto +from typing import Callable + +from chip.tlv import uint +from conformance_support import (DEPRECATE_CONFORM, DISALLOW_CONFORM, MANDATORY_CONFORM, OPTIONAL_CONFORM, OTHERWISE_CONFORM, + PROVISIONAL_CONFORM, ConformanceDecision, ConformanceParseParameters, or_operation, + parse_callable_from_xml) +from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, EventPathLocation, + FeaturePathLocation, ProblemNotice, ProblemSeverity) + + +@dataclass +class XmlFeature: + code: str + name: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlAttribute: + name: str + datatype: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlCommand: + name: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlEvent: + name: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlCluster: + name: str + revision: int + derived: str + feature_map: dict[str, uint] + attribute_map: dict[str, uint] + command_map: dict[str, uint] + features: dict[str, XmlFeature] + attributes: dict[uint, XmlAttribute] + accepted_commands: dict[uint, XmlCommand] + generated_commands: dict[uint, XmlCommand] + events: dict[uint, XmlEvent] + + +class CommandType(Enum): + ACCEPTED = auto() + GENERATED = auto() + + +def has_zigbee_conformance(conformance: ElementTree.Element) -> bool: + # For clusters, things with zigbee conformance can share IDs with the matter elements, so we don't want them + + # TODO: it's actually possible for a thing to have a zigbee conformance AND to have other conformances, and we should check + # for that, but for now, this is fine because that hasn't happened in the cluster conformances YET. + # It does happen for device types, so we need to be careful there. + condition = conformance.iter('condition') + for c in condition: + try: + c.attrib['name'].lower() == "zigbee" + return True + except KeyError: + continue + return False + + +class ClusterParser: + def __init__(self, cluster, cluster_id, name): + self._problems: list[ProblemNotice] = [] + self._cluster = cluster + self._cluster_id = cluster_id + self._name = name + + self._derived = None + try: + classification = next(cluster.iter('classification')) + hierarchy = classification.attrib['hierarchy'] + if hierarchy.lower() == 'derived': + self._derived = classification.attrib['baseCluster'] + except (KeyError, StopIteration): + self._derived = None + + self.feature_elements = self.get_all_feature_elements() + self.attribute_elements = self.get_all_attribute_elements() + self.command_elements = self.get_all_command_elements() + self.event_elements = self.get_all_event_elements() + self.params = ConformanceParseParameters(feature_map=self.create_feature_map(), attribute_map=self.create_attribute_map(), + command_map=self.create_command_map()) + + def get_conformance(self, element: ElementTree.Element) -> ElementTree.Element: + for sub in element: + if sub.tag == OTHERWISE_CONFORM or sub.tag == MANDATORY_CONFORM or sub.tag == OPTIONAL_CONFORM or sub.tag == PROVISIONAL_CONFORM or sub.tag == DEPRECATE_CONFORM or sub.tag == DISALLOW_CONFORM: + return sub + + # Conformance is missing, so let's record the problem and treat it as optional for lack of a better choice + if element.tag == 'feature': + location = FeaturePathLocation(endpoint_id=0, cluster_id=self._cluster_id, feature_code=element.attrib['code']) + elif element.tag == 'command': + location = CommandPathLocation(endpoint_id=0, cluster_id=self._cluster_id, command_id=element.attrib['id']) + elif element.tag == 'attribute': + location = AttributePathLocation(endpoint_id=0, cluster_id=self._cluster_id, attribute_id=element.attrib['id']) + elif element.tag == 'event': + location = EventPathLocation(endpoint_id=0, cluster_id=self._cluster_id, event_id=element.attrib['id']) + else: + location = ClusterPathLocation(endpoing_id=0, cluster_id=self._cluster_id) + self._problems.append(ProblemNotice(test_name='Spec XML parsing', location=location, + severity=ProblemSeverity.WARNING, problem='Unable to find conformance element')) + + return ElementTree.Element(OPTIONAL_CONFORM) + + def get_all_type(self, type_container: str, type_name: str, key_name: str) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ret = [] + container_tags = self._cluster.iter(type_container) + for container in container_tags: + elements = container.iter(type_name) + for element in elements: + try: + element.attrib[key_name] + except KeyError: + # This is a conformance tag, which uses the same name + continue + conformance = self.get_conformance(element) + if has_zigbee_conformance(conformance): + continue + ret.append((element, conformance)) + return ret + + def get_all_feature_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of features and their conformances''' + return self.get_all_type('features', 'feature', 'code') + + def get_all_attribute_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of attributes and their conformances''' + return self.get_all_type('attributes', 'attribute', 'id') + + def get_all_command_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of commands and their conformances ''' + return self.get_all_type('commands', 'command', 'id') + + def get_all_event_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of events and their conformances''' + return self.get_all_type('events', 'event', 'id') + + def create_feature_map(self) -> dict[str, uint]: + features = {} + for element, conformance in self.feature_elements: + features[element.attrib['code']] = 1 << int(element.attrib['bit'], 0) + return features + + def create_attribute_map(self) -> dict[str, uint]: + attributes = {} + for element, conformance in self.attribute_elements: + attributes[element.attrib['name']] = int(element.attrib['id'], 0) + return attributes + + def create_command_map(self) -> dict[str, uint]: + commands = {} + for element, conformance in self.command_elements: + commands[element.attrib['name']] = int(element.attrib['id'], 0) + return commands + + def parse_features(self) -> dict[uint, XmlFeature]: + features = {} + for element, conformance in self.feature_elements: + mask = 1 << int(element.attrib['bit'], 0) + features[mask] = XmlFeature(code=element.attrib['code'], name=element.attrib['name'], + conformance=parse_callable_from_xml(conformance, self.params)) + return features + + def parse_attributes(self) -> dict[uint, XmlAttribute]: + attributes = {} + for element, conformance_xml in self.attribute_elements: + code = int(element.attrib['id'], 0) + # Some deprecated attributes don't have their types included, for now, lets just fallback to UNKNOWN + try: + datatype = element.attrib['type'] + except KeyError: + datatype = 'UNKNOWN' + conformance = parse_callable_from_xml(conformance_xml, self.params) + if code in attributes: + # This is one of those fun ones where two different rows have the same id and name, but differ in conformance and ranges + # I don't have a good way to relate the ranges to the conformance, but they're both acceptable, so let's just or them. + conformance = or_operation([conformance, attributes[code].conformance]) + attributes[code] = XmlAttribute(name=element.attrib['name'], datatype=datatype, + conformance=conformance) + return attributes + + def parse_commands(self, command_type: CommandType) -> dict[uint, XmlAttribute]: + commands = {} + for element, conformance_xml in self.command_elements: + code = int(element.attrib['id'], 0) + dir = CommandType.ACCEPTED + try: + if element.attrib['direction'].lower() == 'responsefromserver': + dir = CommandType.GENERATED + except KeyError: + pass + if dir != command_type: + continue + code = int(element.attrib['id'], 0) + conformance = parse_callable_from_xml(conformance_xml, self.params) + if code in commands: + conformance = or_operation([conformance, commands[code].conformance]) + commands[code] = XmlCommand(name=element.attrib['name'], conformance=conformance) + return commands + + def parse_events(self) -> dict[uint, XmlAttribute]: + events = {} + for element, conformance_xml in self.event_elements: + code = int(element.attrib['id'], 0) + conformance = parse_callable_from_xml(conformance_xml, self.params) + if code in events: + conformance = or_operation([conformance, events[code].conformance]) + events[code] = XmlEvent(name=element.attrib['name'], conformance=conformance) + return events + + def create_cluster(self) -> XmlCluster: + return XmlCluster(revision=self._cluster.attrib['revision'], derived=self._derived, + name=self._name, feature_map=self.params.feature_map, + attribute_map=self.params.attribute_map, command_map=self.params.command_map, + features=self.parse_features(), + attributes=self.parse_attributes(), + accepted_commands=self.parse_commands(CommandType.ACCEPTED), + generated_commands=self.parse_commands(CommandType.GENERATED), + events=self.parse_events()) + + def get_problems(self) -> list[ProblemNotice]: + return self._problems + + +def build_xml_clusters() -> tuple[list[XmlCluster], list[ProblemNotice]]: + dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', 'data_model', 'clusters') + clusters: dict[int, XmlCluster] = {} + derived_clusters: dict[str, XmlCluster] = {} + ids_by_name = {} + problems = [] + for xml in glob.glob(f"{dir}/*.xml"): + tree = ElementTree.parse(f'{xml}') + root = tree.getroot() + cluster = root.iter('cluster') + for c in cluster: + name = c.attrib['name'] + if not c.attrib['id']: + # Fully derived clusters have no id, but also shouldn't appear on a device. + # We do need to keep them, though, because we need to update the derived + # clusters. We keep them in a special dict by name, so they can be thrown + # away later. + cluster_id = None + else: + cluster_id = int(c.attrib['id'], 0) + ids_by_name[name] = cluster_id + + parser = ClusterParser(c, cluster_id, name) + new = parser.create_cluster() + problems = problems + parser.get_problems() + + if cluster_id: + clusters[cluster_id] = new + else: + derived_clusters[name] = new + + # We have the information now about which clusters are derived, so we need to fix them up. Apply first the base cluster, + # then add the specific cluster overtop + for id, c in clusters.items(): + if c.derived: + base_name = c.derived + if base_name in ids_by_name: + base = clusters[ids_by_name[c.derived]] + else: + base = derived_clusters[base_name] + + feature_map = deepcopy(base.feature_map) + feature_map.update(c.feature_map) + attribute_map = deepcopy(base.attribute_map) + attribute_map.update(c.attribute_map) + command_map = deepcopy(base.command_map) + command_map.update(c.command_map) + features = deepcopy(base.features) + features.update(c.features) + attributes = deepcopy(base.attributes) + attributes.update(c.attributes) + accepted_commands = deepcopy(base.accepted_commands) + accepted_commands.update(c.accepted_commands) + generated_commands = deepcopy(base.generated_commands) + generated_commands.update(c.generated_commands) + events = deepcopy(base.events) + events.update(c.events) + new = XmlCluster(revision=c.revision, derived=c.derived, name=c.name, + feature_map=feature_map, attribute_map=attribute_map, command_map=command_map, + features=features, attributes=attributes, accepted_commands=accepted_commands, + generated_commands=generated_commands, events=events) + clusters[id] = new + return clusters, problems