Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TC-IDM-10.5: Device type conformance - Add #34424

Merged
merged 11 commits into from
Jul 26, 2024
99 changes: 95 additions & 4 deletions src/python_testing/TC_DeviceConformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,26 @@
# test-runner-run/run1/script-args: --storage-path admin_storage.json --manual-code 10054912339 --bool-arg ignore_in_progress:True allow_provisional:True --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --tests test_TC_IDM_10_2
# === END CI TEST ARGUMENTS ===

# TODO: Enable 10.5 in CI once the door lock OTA requestor problem is sorted.
from typing import Callable

import chip.clusters as Clusters
from basic_composition_support import BasicCompositionTests
from chip.tlv import uint
from conformance_support import ConformanceDecision, conformance_allowed
from global_attribute_ids import GlobalAttributeIds
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, ProblemNotice,
ProblemSeverity, async_test_body, default_matter_test_main)
from spec_parsing_support import CommandType, build_xml_clusters
from global_attribute_ids import (ClusterIdType, DeviceTypeIdType, GlobalAttributeIds, cluster_id_type, device_type_id_type,
is_valid_device_type_id)
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, DeviceTypePathLocation,
MatterBaseTest, ProblemNotice, ProblemSeverity, async_test_body, default_matter_test_main)
from spec_parsing_support import CommandType, build_xml_clusters, build_xml_device_types


class DeviceConformanceTests(BasicCompositionTests):
async def setup_class_helper(self):
await super().setup_class_helper()
self.xml_clusters, self.problems = build_xml_clusters()
self.xml_device_types, problems = build_xml_device_types()
self.problems.extend(problems)

def check_conformance(self, ignore_in_progress: bool, is_ci: bool):
problems = []
Expand Down Expand Up @@ -233,6 +237,86 @@ def record_warning(location, problem):

return success, problems

def check_device_type(self, fail_on_extra_clusters: bool = True, allow_provisional: bool = False) -> tuple[bool, list[ProblemNotice]]:
success = True
problems = []

def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.5", location, severity, problem, ""))

def record_error(location, problem):
nonlocal success
record_problem(location, problem, ProblemSeverity.ERROR)
success = False

def record_warning(location, problem):
record_problem(location, problem, ProblemSeverity.WARNING)

for endpoint_id, endpoint in self.endpoints.items():
if Clusters.Descriptor not in endpoint:
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=Clusters.Descriptor.id)
record_error(location=location, problem='No descriptor cluster found on endpoint')
continue

device_type_list = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]
invalid_device_types = [x for x in device_type_list if not is_valid_device_type_id(device_type_id_type(x.deviceType))]
standard_device_types = [x for x in endpoint[Clusters.Descriptor]
[Clusters.Descriptor.Attributes.DeviceTypeList] if device_type_id_type(x.deviceType) == DeviceTypeIdType.kStandard]
endpoint_clusters = []
server_clusters = []
for device_type in invalid_device_types:
location = DeviceTypePathLocation(device_type_id=device_type.deviceType)
record_error(location=location, problem='Invalid device type ID (out of valid range)')

for device_type in standard_device_types:
device_type_id = device_type.deviceType
location = DeviceTypePathLocation(device_type_id=device_type_id)
if device_type_id not in self.xml_device_types.keys():
record_error(location=location, problem='Unknown device type ID in standard range')
continue

if device_type_id not in self.xml_device_types.keys():
location = DeviceTypePathLocation(device_type_id=device_type_id)
record_error(location=location, problem='Unknown device type')
continue

# TODO: check revision. Possibly in another test?

xml_device = self.xml_device_types[device_type_id]
# IDM 10.1 checks individual clusters for validity,
# so here we can ignore checks for invalid and manufacturer clusters.
server_clusters = [x for x in endpoint[Clusters.Descriptor]
[Clusters.Descriptor.Attributes.ServerList] if cluster_id_type(x) == ClusterIdType.kStandard]

# As a start, we are only checking server clusters
# TODO: check client clusters too?
for cluster_id, cluster_requirement in xml_device.server_clusters.items():
# Device type cluster conformances do not include any conformances based on cluster elements
conformance_decision_with_choice = cluster_requirement.conformance(0, [], [])
location = DeviceTypePathLocation(device_type_id=device_type_id, cluster_id=cluster_id)
if conformance_decision_with_choice.decision == ConformanceDecision.MANDATORY and cluster_id not in server_clusters:
record_error(location=location,
problem=f"Mandatory cluster {cluster_requirement.name} for device type {xml_device.name} is not present in the server list")
success = False

if cluster_id in server_clusters and not conformance_allowed(conformance_decision_with_choice, allow_provisional):
record_error(location=location,
problem=f"Disallowed cluster {cluster_requirement.name} found in server list for device type {xml_device.name}")
success = False
# If we want to check for extra clusters on the endpoint, we need to know the entire set of clusters in all the device type
# lists across all the device types on the endpoint.
endpoint_clusters += xml_device.server_clusters.keys()
if fail_on_extra_clusters:
fn = record_error
else:
fn = record_warning
extra_clusters = set(server_clusters) - set(endpoint_clusters)
for extra in extra_clusters:
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=extra)
fn(location=location, problem=f"Extra cluster found on endpoint with device types {device_type_list}")

return success, problems


class TC_DeviceConformance(MatterBaseTest, DeviceConformanceTests):
@async_test_body
Expand All @@ -255,6 +339,13 @@ def test_TC_IDM_10_3(self):
if not success:
self.fail_current_test("Problems with cluster revision on at least one cluster")

def test_TC_IDM_10_5(self):
fail_on_extra_clusters = self.user_params.get("fail_on_extra_clusters", True)
success, problems = self.check_device_type(fail_on_extra_clusters)
self.problems.extend(problems)
if not success:
self.fail_current_test("Problems with Device type conformance on one or more endpoints")


if __name__ == "__main__":
default_matter_test_main()
149 changes: 144 additions & 5 deletions src/python_testing/TestSpecParsingDeviceType.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@
#
import xml.etree.ElementTree as ElementTree

import chip.clusters as Clusters
from chip.clusters import Attribute
from chip.tlv import uint
from conformance_support import conformance_allowed
from jinja2 import Template
from matter_testing_support import MatterBaseTest, default_matter_test_main
from mobly import asserts
from spec_parsing_support import build_xml_device_types, parse_single_device_type
from spec_parsing_support import build_xml_clusters, build_xml_device_types, parse_single_device_type
from TC_DeviceConformance import DeviceConformanceTests


class TestSpecParsingDeviceType(MatterBaseTest):

# This just tests that the current spec can be parsed without failures
def test_spec_device_parsing(self):
device_types, problems = build_xml_device_types()
self.problems += problems
for id, d in device_types.items():
for id, d in self.xml_device_types.items():
print(str(d))

def setup_class(self):
self.xml_clusters, self.xml_cluster_problems = build_xml_clusters()
self.xml_device_types, self.xml_device_types_problems = build_xml_device_types()

self.device_type_id = 0xBBEF
self.revision = 2
self.classification_class = "simple"
Expand Down Expand Up @@ -106,6 +111,140 @@ def test_bad_scope(self):
device_type, problems = parse_single_device_type(et)
asserts.assert_equal(len(problems), 1, "Device with no scope did not generate a problem notice")

# All these tests are based on the temp sensor device type because it is very simple
# it requires temperature measurement, identify and the base devices.
# Right now I'm not testing for binding condition.
# The test is entirely based on the descriptor cluster so that's all I'm populating here
# because it makes the test less complex to write.
def create_test(self, server_list: list[uint], no_descriptor: bool = False, bad_device_id: bool = False) -> DeviceConformanceTests:
self.test = DeviceConformanceTests()
self.test.xml_device_types = self.xml_device_types
self.test.xml_clusters = self.xml_clusters

if bad_device_id:
known_ids = list(self.test.xml_device_types.keys())
device_type_id = [a for a in range(min(known_ids), max(known_ids)) if a not in known_ids][0]
else:
device_type_id = 0x0302

resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {})
if no_descriptor:
resp.attributes = {1: {}}
else:
desc = Clusters.Descriptor
server_list_attr = Clusters.Descriptor.Attributes.ServerList
device_type_list_attr = Clusters.Descriptor.Attributes.DeviceTypeList
device_type_list = [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=device_type_id, revision=2)]
resp.attributes = {1: {desc: {device_type_list_attr: device_type_list, server_list_attr: server_list}}}
self.test.endpoints = resp.attributes

def create_good_device(self, device_type_id: int) -> DeviceConformanceTests:
self.test = DeviceConformanceTests()
self.test.xml_device_types = self.xml_device_types
self.test.xml_clusters = self.xml_clusters

resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {})
desc = Clusters.Descriptor
server_list_attr = Clusters.Descriptor.Attributes.ServerList
device_type_list_attr = Clusters.Descriptor.Attributes.DeviceTypeList
device_type_list = [Clusters.Descriptor.Structs.DeviceTypeStruct(
deviceType=device_type_id, revision=self.xml_device_types[device_type_id].revision)]
server_list = [k for k, v in self.xml_device_types[device_type_id].server_clusters.items(
) if conformance_allowed(v.conformance(0, [], []), False)]
resp.attributes = {1: {desc: {device_type_list_attr: device_type_list, server_list_attr: server_list}}}

self.test.endpoints = resp.attributes

# Test with temp sensor with temp sensor, identify and descriptor
def test_ts_minimal_clusters(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_true(success, "Failure on Temperature Sensor device type test")

# Temp sensor with temp sensor, identify, descriptor, binding
def test_ts_minimal_with_binding(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Binding.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_true(success, "Failure on Temperature Sensor device type test")
asserts.assert_false(problems, "Found problems on Temperature sensor device type test")

# Temp sensor with temp sensor, identify, descriptor, fixed label
def test_ts_minimal_with_label(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.FixedLabel.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_true(success, "Failure on Temperature Sensor device type test")
asserts.assert_false(problems, "Found problems on Temperature sensor device type test")

# Temp sensor with temp sensor, descriptor
def test_ts_missing_identify(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

# endpoint 1 empty
def test_endpoint_missing_descriptor(self):
self.create_test([], no_descriptor=True)
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

# Temp sensor with temp sensor, descriptor, identify, onoff
def test_ts_extra_cluster(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Descriptor.id, Clusters.OnOff.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

success, problems = self.test.check_device_type(fail_on_extra_clusters=False)
asserts.assert_equal(len(problems), 1, "Did not receive expected warning for extra clusters")
asserts.assert_true(success, "Unexpected failure")

def test_bad_device_type_id_device_type_test(self):
self.create_test([], bad_device_id=True)
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

def test_all_device_types(self):
for id in self.xml_device_types.keys():
self.create_good_device(id)
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_false(problems, f"Unexpected problems on device type {id}")
asserts.assert_true(success, f"Unexpected failure on device type {id}")

def test_disallowed_cluster(self):
for id, dt in self.xml_device_types.items():
expected_problems = 0
self.create_good_device(id)
for cluster_id, cluster in dt.server_clusters.items():
if not conformance_allowed(cluster.conformance(0, [], []), False):
self.test.endpoints[1][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList].append(cluster_id)
expected_problems += 1
if expected_problems == 0:
continue
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), expected_problems, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")


if __name__ == "__main__":
default_matter_test_main()
4 changes: 3 additions & 1 deletion src/python_testing/conformance_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ def __call__(self, feature_map: uint, attribute_list: list[uint], all_command_li
for op in self.op_list:
decision_with_choice = op(feature_map, attribute_list, all_command_list)
# and operations can't happen on optional or disallowed
if decision_with_choice.decision in [ConformanceDecision.OPTIONAL, ConformanceDecision.DISALLOWED, ConformanceDecision.PROVISIONAL]:
if decision_with_choice.decision == ConformanceDecision.OPTIONAL and all([type(op) == device_feature for op in self.op_list]):
return decision_with_choice
elif decision_with_choice.decision in [ConformanceDecision.OPTIONAL, ConformanceDecision.DISALLOWED, ConformanceDecision.PROVISIONAL]:
raise ConformanceException('AND operation on optional or disallowed item')
elif decision_with_choice.decision == ConformanceDecision.NOT_APPLICABLE:
return decision_with_choice
Expand Down
Loading