diff --git a/src/python_testing/TC_AccessChecker.py b/src/python_testing/TC_AccessChecker.py
index 5533b0e3d91b4d..f340f94fd4be17 100644
--- a/src/python_testing/TC_AccessChecker.py
+++ b/src/python_testing/TC_AccessChecker.py
@@ -8,7 +8,7 @@
 from chip.interaction_model import Status
 from chip.tlv import uint
 from global_attribute_ids import GlobalAttributeIds
-from matter_testing_support import (AttributePathLocation, ClusterPathLocation, MatterBaseTest, async_test_body,
+from matter_testing_support import (AttributePathLocation, ClusterPathLocation, MatterBaseTest, TestStep, async_test_body,
 default_matter_test_main)
 from spec_parsing_support import XmlCluster, build_xml_clusters
@@ -18,6 +18,10 @@ class AccessTestType(Enum):
 WRITE = auto()
+def step_number_with_privilege(step: int, substep: str, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum) -> str:
+ return f'{step}{substep}_{privilege.name}'
+
+
 def operation_allowed(spec_requires: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum, acl_set_to: Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]) -> bool:
 ''' Determines if the action is allowed on the device based on the spec_requirements and the current ACL privilege granted.
@@ -60,6 +64,7 @@ async def setup_class(self):
 @async_test_body
 async def setup_test(self):
+ super().setup_test()
 self.success = True
 @async_test_body
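Review bot: `step_number_with_privilege` (second hunk) has no docstring, although the string it builds is a contract between the `steps_TC_ACE_2_*` tables and `run_access_test`, which presumably must produce identical ids for a step to be matched. Suggest adding `'''Return the id of a per-privilege sub-step, e.g. step 3, substep 'a', kView -> "3a_kView".'''` under the signature. The name says "number" while the return value is a string id, which the docstring can also make explicit.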
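Review bot: The added `super().setup_test()` in `setup_test` (third hunk) is called without `await` inside an `async def`, which is only correct if the base-class method is a plain function; the base class is not on this page, so that should be confirmed. A short why-comment would also help, e.g. `# Base setup_test resets the step tracking that self.step() depends on`. That purpose is inferred from the steps this commit introduces, not read from the base class.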
@@ -187,16 +192,26 @@ async def _run_write_access_test_for_cluster_privilege(self, endpoint_id, cluste
 await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(val))])
 async def run_access_test(self, test_type: AccessTestType):
+ # Step 1 and 2 are handled in the class setup, but need to be marked for every test
+ self.step(1)
+ self.step(2)
 # Read all the attributes on TH2 using admin access
+ check_step = 3
 if test_type == AccessTestType.WRITE:
+ self.step(3)
 await self._setup_acl(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister)
+ self.step(4)
 wildcard_read = await self.TH2.Read(self.dut_node_id, [()])
+ check_step = 5
+ self.step(check_step)
 enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
 privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
 for privilege in privilege_enum:
 logging.info(f"Testing for {privilege}")
+ self.step(step_number_with_privilege(check_step, 'a', privilege))
 await self._setup_acl(privilege=privilege)
+ self.step(step_number_with_privilege(check_step, 'b', privilege))
 for endpoint_id, endpoint in self.endpoints_tlv.items():
 for cluster_id, device_cluster_data in endpoint.items():
 if cluster_id > 0x7FFF or cluster_id not in self.xml_clusters or cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
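Review bot: The `b` sub-steps marked inside the privilege loop cannot show which privilege level produced an access violation, as far as the visible lines go: the only failure in `run_access_test` is the single `fail_current_test` after the loop (context at the top of the next hunk), so a violation found under one privilege is presumably reported against the last step. How violations are recorded lies outside this hunk, and the function body continues past it. If a per-step failure is not available, state the behaviour above the loop, e.g. `# Violations only clear self.success; the test fails once, after every privilege step has run`. The literals `3` and `5` assigned to `check_step` also have to match the numbers hard-coded in `steps_TC_ACE_2_1` and `steps_TC_ACE_2_2`, and a comment tying them together would prevent silent drift.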
@@ -212,12 +227,46 @@ async def run_access_test(self, test_type: AccessTestType):
 if not self.success:
 self.fail_current_test("One or more access violations was found")
+ def steps_TC_ACE_2_1(self):
+ steps = [TestStep(1, "TH_commissioner performs a wildcard read"),
+ TestStep(2, "TH_commissioner reads the ACL attribute"),
+ TestStep(3, "Repeat steps 3a and 3b for each permission level")]
+ enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
+ privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
+ for p in privilege_enum:
+ steps.append(TestStep(step_number_with_privilege(3, 'a', p),
+ "TH_commissioner gives TH_second_commissioner the specified privilege"))
+ steps.append(TestStep(step_number_with_privilege(3, 'b', p),
+ "TH_second_controller reads all the attributes and checks for appropriate permission errors"))
+ return steps
+
+ def desc_TC_ACE_2_1(self):
+ return "[TC-ACE-2.1] Attribute read privilege enforcement - [DUT as Server]"
+
 @async_test_body
- async def test_read_access(self):
+ async def test_TC_ACE_2_1(self):
 await self.run_access_test(AccessTestType.READ)
+ def steps_TC_ACE_2_2(self):
+ steps = [TestStep(1, "TH_commissioner performs a wildcard read"),
+ TestStep(2, "TH_commissioner reads the ACL attribute"),
+ TestStep(3, "TH_commissioner grants TH_second_controller admin permission"),
+ TestStep(4, "TH_second_controller performs a wildcard read"),
+ TestStep(5, "Repeat steps 5a and 5b for each permission level")]
+ enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
+ privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
+ for p in privilege_enum:
+ steps.append(TestStep(step_number_with_privilege(5, 'a', p),
+ "TH_commissioner gives TH_second_commissioner the specified privilege"))
+ steps.append(TestStep(step_number_with_privilege(5, 'b', p),
+ "TH_second_commissioner writes all the attributes and checks for appropriate permission errors"))
+ return steps
+
+ def desc_TC_ACE_2_2(self):
+ return "[TC-ACE-2.2] Attribute write privilege enforcement - [DUT as Server]"
+
 @async_test_body
- async def test_write_access(self):
+ async def test_TC_ACE_2_2(self):
 await self.run_access_test(AccessTestType.WRITE)
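Review bot: `steps_TC_ACE_2_1` and `steps_TC_ACE_2_2` repeat the same privilege filter and a/b loop that `run_access_test` also uses, so a change to one copy (for example excluding another enum value) would make the declared steps and the executed steps disagree only at run time. A shared module-level helper, sketched below, keeps the three in step; the enclosing test class starts outside this hunk. The step texts also mix `TH_second_commissioner` and `TH_second_controller` for what appears to be the same node; the sketch uses `TH_second_controller`, but use whichever name the test plan uses. These strings end up in the report of an access-control test, so the actor named in each step should be unambiguous.

```python
def privilege_substeps(step: int, check_description: str) -> list:
    # Single place that decides which privilege levels get an a/b step pair
    enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
    substeps = []
    for p in enum:
        if p == enum.kUnknownEnumValue:
            continue
        substeps.append(TestStep(step_number_with_privilege(step, 'a', p),
                                 "TH_commissioner gives TH_second_controller the specified privilege"))
        substeps.append(TestStep(step_number_with_privilege(step, 'b', p), check_description))
    return substeps

    def steps_TC_ACE_2_1(self):
        # … unchanged: steps = [TestStep(1, …), TestStep(2, …), TestStep(3, …)] …
        return steps + privilege_substeps(
            3, "TH_second_controller reads all the attributes and checks for appropriate permission errors")

    # … steps_TC_ACE_2_2 likewise, with step 5 and its write description …
```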