Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PICS checker test implementation #30970

Merged
merged 10 commits into from
Jan 17, 2024
4 changes: 2 additions & 2 deletions src/python_testing/TC_TIMESYNC_2_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ async def test_TC_TIMESYNC_2_1(self):
asserts.assert_true(False, "NTPServerAvailable is mandatory if the NTPS (TIMESYNC.S.F02) feature is supported")

self.print_step(12, "Read TimeZoneListMaxSize")
if self.check_pics("TIMESYNC.S.A000A"):
if self.check_pics("TIMESYNC.S.A000a"):
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZoneListMaxSize)
asserts.assert_greater_equal(size, 1, "TimeZoneListMaxSize must be at least 1")
asserts.assert_less_equal(size, 2, "TimeZoneListMaxSize must be max 2")
elif self.check_pics("TIMESYNC.S.F00"):
asserts.assert_true(False, "TimeZoneListMaxSize is mandatory if the TZ (TIMESYNC.S.F00) feature is supported")

self.print_step(13, "Read DSTOffsetListMaxSize")
if self.check_pics("TIMESYNC.S.A000B"):
if self.check_pics("TIMESYNC.S.A000b"):
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DSTOffsetListMaxSize)
asserts.assert_greater_equal(size, 1, "DSTOffsetListMaxSize must be at least 1")
elif self.check_pics("TIMESYNC.S.F00"):
Expand Down
160 changes: 160 additions & 0 deletions src/python_testing/TC_pics_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math

import chip.clusters as Clusters
from basic_composition_support import BasicCompositionTests
from global_attribute_ids import GlobalAttributeIds
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, FeaturePathLocation,
tcarmelveilleux marked this conversation as resolved.
Show resolved Hide resolved
MatterBaseTest, async_test_body, default_matter_test_main)
from mobly import asserts
from spec_parsing_support import build_xml_clusters


def attribute_pics(pics_base: str, id: int) -> str:
return f'{pics_base}.S.A{id:04x}'


def accepted_cmd_pics(pics_base: str, id: int) -> str:
return f'{pics_base}.S.C{id:02x}.Rsp'


def generated_cmd_pics(pics_base: str, id: int) -> str:
return f'{pics_base}.S.C{id:02x}.Tx'


def feature_pics(pics_base: str, bit: int) -> str:
return f'{pics_base}.S.F{bit:02x}'


class TC_PICS_Checker(MatterBaseTest, BasicCompositionTests):
@async_test_body
async def setup_class(self):
super().setup_class()
await self.setup_class_helper(False)
# build_xml_cluster returns a list of issues found when paring the XML
# Problems in the XML shouldn't cause test failure, but we want them recorded
# so they are added to the list of problems that get output when the test set completes.
self.xml_clusters, self.problems = build_xml_clusters()
cecille marked this conversation as resolved.
Show resolved Hide resolved

def _check_and_record_errors(self, location, required, pics):
if required and not self.check_pics(pics):
self.record_error("PICS check", location=location,
problem=f"An element found on the device, but the corresponding PICS {pics} was not found in pics list")
self.success = False
elif not required and self.check_pics(pics):
self.record_error("PICS check", location=location, problem=f"PICS {pics} found in PICS list, but not on device")
self.success = False

def _add_pics_for_lists(self, cluster_id: int, attribute_id_of_element_list: GlobalAttributeIds) -> None:
try:
if attribute_id_of_element_list == GlobalAttributeIds.ATTRIBUTE_LIST_ID:
all_spec_elements_to_check = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]
pics_mapper = attribute_pics
elif attribute_id_of_element_list == GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID:
all_spec_elements_to_check = Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]
pics_mapper = accepted_cmd_pics

elif attribute_id_of_element_list == GlobalAttributeIds.GENERATED_COMMAND_LIST_ID:
all_spec_elements_to_check = Clusters.ClusterObjects.ALL_GENERATED_COMMANDS[cluster_id]
pics_mapper = generated_cmd_pics
else:
asserts.fail("add_pics_for_list function called for non-list attribute")
except KeyError:
# This cluster does not have any of this element type
return

for element_id in all_spec_elements_to_check:
if element_id > 0xF000:
# No pics for global elements
continue
pics = pics_mapper(self.xml_clusters[cluster_id].pics, element_id)

if cluster_id not in self.endpoint.keys():
# This cluster is not on this endpoint
required = False
elif element_id in self.endpoint[cluster_id][attribute_id_of_element_list]:
# Cluster and element are on the endpoint
required = True
else:
# Cluster is on the endpoint but the element is not in the list
required = False

if attribute_id_of_element_list == GlobalAttributeIds.ATTRIBUTE_LIST_ID:
location = AttributePathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id, attribute_id=element_id)
else:
location = CommandPathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id, command_id=element_id)

self._check_and_record_errors(location, required, pics)

def test_TC_pics_checker(self):
self.endpoint_id = self.matter_test_config.endpoint
self.endpoint = self.endpoints_tlv[self.endpoint_id]
self.success = True

for cluster_id, cluster in Clusters.ClusterObjects.ALL_CLUSTERS.items():
# Data model XML is used to get the PICS code for this cluster. If we don't know the PICS
# code, we can't evaluate the PICS list. Clusters that are present on the device but are
# not present in the spec are checked in the IDM tests.
if cluster_id not in self.xml_clusters or self.xml_clusters[cluster_id].pics is None:
cecille marked this conversation as resolved.
Show resolved Hide resolved
continue

# Ensure the PICS.S code is correctly marked
pics_cluster = f'{self.xml_clusters[cluster_id].pics}.S'
location = ClusterPathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id)
self._check_and_record_errors(location, cluster_id in self.endpoint, pics_cluster)

self._add_pics_for_lists(cluster_id, GlobalAttributeIds.ATTRIBUTE_LIST_ID)
self._add_pics_for_lists(cluster_id, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID)
self._add_pics_for_lists(cluster_id, GlobalAttributeIds.GENERATED_COMMAND_LIST_ID)

try:
cluster_features = cluster.Bitmaps.Feature
except AttributeError:
# cluster has no features
continue

pics_base = self.xml_clusters[cluster_id].pics
try:
feature_map = self.endpoint[cluster_id][GlobalAttributeIds.FEATURE_MAP_ID]
except KeyError:
feature_map = 0

for feature_mask in cluster_features:
# Codegen in python uses feature masks (0x01, 0x02, 0x04 etc.)
# PICS uses the mask bit number (1, 2, 3)
# Convert the mask to a bit number so we can check the PICS.
feature_bit = int(math.log2(feature_mask))
pics = feature_pics(pics_base, feature_bit)
if feature_mask & feature_map:
required = True
else:
required = False

try:
location = FeaturePathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id,
feature_code=self.xml_clusters[cluster_id].features[feature_mask].code)
except KeyError:
location = ClusterPathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id)
self._check_and_record_errors(location, required, pics)

if not self.success:
self.fail_current_test("At least one PICS error was found for this endpoint")


if __name__ == "__main__":
default_matter_test_main()
5 changes: 2 additions & 3 deletions src/python_testing/TestMatterTestingSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async def test_type_checking(self):
async def test_pics_support(self):
pics_list = ['TEST.S.A0000=1',
'TEST.S.A0001=0',
'lower.s.a0000=1',
'TEST.S.A000a=1'
'',
' ',
'# comment',
Expand All @@ -148,10 +148,9 @@ async def test_pics_support(self):

asserts.assert_true(self.check_pics("TEST.S.A0000"), "PICS parsed incorrectly for TEST.S.A0000")
asserts.assert_false(self.check_pics("TEST.S.A0001"), "PICS parsed incorrectly for TEST.S.A0001")
asserts.assert_true(self.check_pics("LOWER.S.A0000"), "PICS pased incorrectly for LOWER.S.A0000")
asserts.assert_true(self.check_pics("TEST.S.A000a"), "PICS parsed incorrectly for TEST.S.A000a")
asserts.assert_true(self.check_pics("SPACE.S.A0000"), "PICS parsed incorrectly for SPACE.S.A0000")
asserts.assert_false(self.check_pics("NOT.S.A0000"), "PICS parsed incorrectly for NOT.S.A0000")
asserts.assert_true(self.check_pics(" test.s.a0000"), "PICS checker lowercase handled incorrectly")

# invalid pics file should throw a value error
pics_list.append("BAD.S.A000=5")
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/basic_composition_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ def ConvertValue(value) -> Any:


class BasicCompositionTests:
async def setup_class_helper(self):
async def setup_class_helper(self, default_to_pase: bool = True):
dev_ctrl = self.default_controller
self.problems = []

do_test_over_pase = self.user_params.get("use_pase_only", True)
do_test_over_pase = self.user_params.get("use_pase_only", default_to_pase)
dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None)

if do_test_over_pase:
Expand Down
2 changes: 2 additions & 0 deletions src/python_testing/drlk_2_x_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ async def run_drlk_test_common(self, lockUnlockCommand, lockUnlockCmdRspPICS, lo
self.print_step("1", "TH writes the RequirePINforRemoteOperation attribute value as false on the DUT")
attribute = attributes.RequirePINforRemoteOperation(False)
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
print("---------------------- PICS is true")
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
print("---------------------- PICS is false")
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)

if self.check_pics("DRLK.S.A0033"):
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def parse_pics(lines=typing.List[str]) -> dict[str, bool]:
if val not in ["1", "0"]:
raise ValueError('PICS {} must have a value of 0 or 1'.format(key))

pics[key.strip().upper()] = (val == "1")
pics[key.strip()] = (val == "1")
return pics


Expand Down Expand Up @@ -725,7 +725,7 @@ def teardown_class(self):

def check_pics(self, pics_key: str) -> bool:
picsd = self.matter_test_config.pics
pics_key = pics_key.strip().upper()
pics_key = pics_key.strip()
return pics_key in picsd and picsd[pics_key]

async def read_single_attribute(
Expand Down
42 changes: 24 additions & 18 deletions src/python_testing/spec_parsing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class XmlCluster:
accepted_commands: dict[uint, XmlCommand]
generated_commands: dict[uint, XmlCommand]
events: dict[uint, XmlEvent]
pics: str


class CommandType(Enum):
Expand All @@ -124,6 +125,12 @@ def __init__(self, cluster, cluster_id, name, is_alias):
except (KeyError, StopIteration):
self._derived = None

try:
classification = next(cluster.iter('classification'))
self._pics = classification.attrib['picsCode']
except (KeyError, StopIteration):
self._pics = None

self.feature_elements = self.get_all_feature_elements()
self.attribute_elements = self.get_all_attribute_elements()
self.command_elements = self.get_all_command_elements()
Expand Down Expand Up @@ -348,31 +355,29 @@ def create_cluster(self) -> XmlCluster:
attributes=self.parse_attributes(),
accepted_commands=self.parse_commands(CommandType.ACCEPTED),
generated_commands=self.parse_commands(CommandType.GENERATED),
events=self.parse_events())
events=self.parse_events(), pics=self._pics)

def get_problems(self) -> list[ProblemNotice]:
return self._problems


def build_xml_clusters() -> tuple[list[XmlCluster], list[ProblemNotice]]:
# workaround for aliased clusters not appearing in the xml. Remove this once https://github.com/csa-data-model/projects/issues/373 is addressed
conc_clusters = {0x040C: 'Carbon Monoxide Concentration Measurement',
0x040D: 'Carbon Dioxide Concentration Measurement',
0x0413: 'Nitrogen Dioxide Concentration Measurement',
0x0415: 'Ozone Concentration Measurement',
0x042A: 'PM2.5 Concentration Measurement',
0x042B: 'Formaldehyde Concentration Measurement',
0x042C: 'PM1 Concentration Measurement',
0x042D: 'PM10 Concentration Measurement',
0x042E: 'Total Volatile Organic Compounds Concentration Measurement',
0x042F: 'Radon Concentration Measurement'}
conc_clusters = {0x040C: ('Carbon Monoxide Concentration Measurement', 'CMOCONC'),
0x040D: ('Carbon Dioxide Concentration Measurement', 'CDOCONC'),
0x0413: ('Nitrogen Dioxide Concentration Measurement', 'NDOCONC'),
0x0415: ('Ozone Concentration Measurement', 'OZCONC'),
0x042A: ('PM2.5 Concentration Measurement', 'PMICONC'),
0x042B: ('Formaldehyde Concentration Measurement', 'FLDCONC'),
0x042C: ('PM1 Concentration Measurement', 'PMHCONC'),
0x042D: ('PM10 Concentration Measurement', 'PMKCONC'),
0x042E: ('Total Volatile Organic Compounds Concentration Measurement', 'TVOCCONC'),
0x042F: ('Radon Concentration Measurement', 'RNCONC')}
conc_base_name = 'Concentration Measurement Clusters'
resource_clusters = {0x0071: 'HEPA Filter Monitoring',
0x0072: 'Activated Carbon Filter Monitoring'}
resource_clusters = {0x0071: ('HEPA Filter Monitoring', 'HEPAFREMON'),
0x0072: ('Activated Carbon Filter Monitoring', 'ACFREMON')}
resource_base_name = 'Resource Monitoring Clusters'
water_clusters = {0x0405: 'Relative Humidity Measurement',
0x0407: 'Leaf Wetness Measurement',
0x0408: 'Soil Moisture Measurement'}
water_clusters = {0x0405: ('Relative Humidity Measurement', 'RH')}
water_base_name = 'Water Content Measurement Clusters'
aliases = {conc_base_name: conc_clusters, resource_base_name: resource_clusters, water_base_name: water_clusters}

Expand Down Expand Up @@ -482,15 +487,16 @@ def remove_problem(location: typing.Union[CommandPathLocation, FeaturePathLocati
new = XmlCluster(revision=c.revision, derived=c.derived, name=c.name,
feature_map=feature_map, attribute_map=attribute_map, command_map=command_map,
features=features, attributes=attributes, accepted_commands=accepted_commands,
generated_commands=generated_commands, events=events)
generated_commands=generated_commands, events=events, pics=c.pics)
clusters[id] = new

for alias_base_name, aliased_clusters in aliases.items():
for id, alias_name in aliased_clusters.items():
for id, (alias_name, pics) in aliased_clusters.items():
base = derived_clusters[alias_base_name]
new = deepcopy(base)
new.derived = alias_base_name
new.name = alias_name
new.pics = pics
clusters[id] = new

# TODO: All these fixups should be removed BEFORE SVE if at all possible
Expand Down
Loading