From 3597273d3d41c6ad7c59528fbed1e9781bdfb2e6 Mon Sep 17 00:00:00 2001 From: Gibran Vargas Date: Wed, 10 Jul 2024 02:15:31 -0700 Subject: [PATCH] chore(TC_OPCREDS_3.4): implementation until step CSRRequest IsForUpdatedNOC=True --- .../chip/utils/CommissioningBuildingBlocks.py | 7 +- src/python_testing/TC_OPCREDS_3_2.py | 4 +- src/python_testing/TC_OPCREDS_3_4.py | 108 ++++++++++++++++++ 3 files changed, 114 insertions(+), 5 deletions(-) diff --git a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py index b6dac307dfd997..4645e4f7683ea3 100644 --- a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py +++ b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py @@ -183,8 +183,9 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, chainForAddNOC.ipkBytes, newFabricDevCtrl.nodeId, newFabricDevCtrl.fabricAdmin.vendorId)) - - rcacResp = chainForAddNOC.rcacBytes + nocBytes = chainForAddNOC.nocBytes + icacBytes = chainForAddNOC.icacBytes + rcacBytes = chainForAddNOC.rcacBytes if nocResp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk: # Expiring the failsafe timer in an attempt to clean up. @@ -201,7 +202,7 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, if not await _IsNodeInFabricList(newFabricDevCtrl, newNodeId): return False, nocResp - return True, nocResp, rcacResp + return True, nocResp, nocBytes, rcacBytes, icacBytes async def UpdateNOC(devCtrl, existingNodeId, newNodeId): diff --git a/src/python_testing/TC_OPCREDS_3_2.py b/src/python_testing/TC_OPCREDS_3_2.py index 3eab07bc9dae06..8006c5582d3fe1 100644 --- a/src/python_testing/TC_OPCREDS_3_2.py +++ b/src/python_testing/TC_OPCREDS_3_2.py @@ -86,7 +86,7 @@ async def test_TC_OPCREDS_3_2(self): cr2_new_admin_ctrl = cr2_new_fabric_admin.NewController( nodeId=cr2_nodeid) - success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + success, nocResp, nocBytes, rcacResp, icacBytes = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr2_new_admin_ctrl, existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id ) @@ -106,7 +106,7 @@ async def test_TC_OPCREDS_3_2(self): cr3_new_admin_ctrl = cr3_new_fabric_admin.NewController( nodeId=cr3_nodeid) - success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + success, nocResp, nocBytes, rcacResp, icacBytes = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr3_new_admin_ctrl, existingNodeId=self.dut_node_id, newNodeId=cr3_dut_node_id ) diff --git a/src/python_testing/TC_OPCREDS_3_4.py b/src/python_testing/TC_OPCREDS_3_4.py index d42e25a4da9638..f611ced5a7ff4e 100644 --- a/src/python_testing/TC_OPCREDS_3_4.py +++ b/src/python_testing/TC_OPCREDS_3_4.py @@ -15,7 +15,13 @@ # limitations under the License. # +import random + +import chip.clusters as Clusters +from chip.utils import CommissioningBuildingBlocks +from chip.interaction_model import InteractionModelError, Status from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts class TC_OPCREDS_3_4(MatterBaseTest): @@ -26,6 +32,108 @@ async def test_TC_OPCREDS_3_4(self): # TODO: add steps self.print_step(0, "Preconditions") + self.print_step(1, "TH1 fully commissions the DUT") + opcreds = Clusters.OperationalCredentials + dev_ctrl = self.default_controller + + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + th1_vid = 0xFFF1 + th1_fabricId = 1111 + th1_new_fabric_admin = new_certificate_authority.NewFabricAdmin( + vendorId=th1_vid, fabricId=th1_fabricId) + th1_nodeId = self.default_controller.nodeId+1 + th1_dut_node_id = self.dut_node_id+1 + + th1_new_fabric_ctrl = th1_new_fabric_admin.NewController( + nodeId=th1_nodeId) + success, nocResp, noc_original, rcac_original, icac_original = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=th1_new_fabric_ctrl, + existingNodeId=self.dut_node_id, newNodeId=th1_dut_node_id + ) + + self.print_step( + 2, "TH1 reads the NOCs attribute from the Node Operational Credentials cluster using a fabric-filtered read") + nocs = await self.read_single_attribute_check_success(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.NOCs, fabric_filtered=True) + + self.print_step( + 3, "TH1 reads the TrustedRootCertificates attribute from the Node Operational Credentials cluster") + trusted_root_original = await self.read_single_attribute_check_success( + dev_ctrl=th1_new_fabric_ctrl, + node_id=th1_dut_node_id, cluster=opcreds, + attribute=opcreds.Attributes.TrustedRootCertificates) + print("trusted_root_original: ", trusted_root_original) + + self.print_step( + 4, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster with the following fields: NOCValue and ICACValue") + cmd = opcreds.Commands.UpdateNOC( + NOCValue=noc_original, ICACValue=icac_original) + try: + await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd) + asserts.fail("Unexpected error sending UpdateNOC command") + except InteractionModelError as e: + asserts.assert_equal( + e.status, Status.FailsafeRequired, "Unexpected Failsafe status") + + self.print_step( + 5, "TH1 sends ArmFailSafe command to the DUT with the ExpiryLengthSeconds field set to 900") + cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe( + expiryLengthSeconds=900) + resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd) + print(resp) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, + "Failure status returned from arm failsafe") + + self.print_step( + 6, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster with the following fields: NOCValue and ICACValue") + cmd = opcreds.Commands.UpdateNOC( + NOCValue=noc_original, ICACValue=icac_original) + resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd) + asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kMissingCsr, + "Failure status returned from UpdateNOC") + + self.print_step( + 7, "TH1 Sends CSRRequest command with the IsForUpdateNOC field set to false") + cmd = opcreds.Commands.CSRRequest( + CSRNonce=random.randbytes(32), isForUpdateNOC=False) + csr_not_updated = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd) + print("csr_not_updated: ", csr_not_updated) + + self.print_step( + 8, "TH1 generates a new NOC chain with ICAC with the following properties") + new_noc_chain = th1_new_fabric_ctrl.IssueNOCChain( + csr_not_updated, th1_dut_node_id) + noc_not_for_update = csr_not_updated.NOCSRElements + icac_not_for_update = new_noc_chain.icacBytes + # TODO: Ask to @Cecille if ICACvalue should be get from new NOCchain or must be get from somewhere else? + + self.print_step( + 9, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster") + cmd = opcreds.Commands.UpdateNOC( + NOCValue=noc_not_for_update, ICACValue=icac_not_for_update) + try: + resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd) + asserts.fail("Unexpected error sending UpdateNOC command") + except InteractionModelError as e: + asserts.assert_equal( + e.status, Status.ConstraintError, "Failure status returned from UpdateNOC") + + self.print_step( + 10, "TH1 Sends CSRRequest command with the IsForUpdateNOC field set to true") + # resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(300)) + # print("ArmFailSafe 600: ", resp) + cmd = opcreds.Commands.CSRRequest( + CSRNonce=random.randbytes(32), isForUpdateNOC=True) + csr_update = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd) + # I don't know why but I received an FAILURE error if set the isForUpdateNOC with True instead if I use False I get ConstraintError + + # cmd = opcreds.Commands.AddTrustedRootCertificate(rcac_original) + # await self.send_single_cmd(cmd=cmd, dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id) + # temp_certs = th1_new_fabric_ctrl.IssueNOCChain(csr_update, th1_dut_node_id) + + cmd = opcreds.Commands.UpdateNOC( + NOCValue=noc_original, ICACValue=icac_original) + resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd) + if __name__ == "__main__": default_matter_test_main()