Skip to content

Commit

Permalink
Updating TC_CADMIN_1_3_4 test module:
Browse files Browse the repository at this point in the history
- Removed local OpenCommissiongWindow and CommissionAttempt functions
- Started using openCommissioningWindow function from matter_testing support module
  • Loading branch information
j-ororke committed Oct 17, 2024
1 parent eb23bf5 commit 9bc9970
Showing 1 changed file with 13 additions and 39 deletions.
52 changes: 13 additions & 39 deletions src/python_testing/TC_CADMIN_1_3_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,6 @@


class TC_CADMIN_1_3_4(MatterBaseTest):
async def CommissionAttempt(
self, setupPinCode: int, thnum: int, th: str, fail: bool):

if fail:
logging.info(f"-----------------Commissioning with TH_CR{str(thnum)}-------------------------")
try:
await th.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
except ChipStackError as e:
asserts.assert_equal(e.err, 0x0000007E,
"Expected to return Trying to add NOC for fabric that already exists")

elif not fail:
await th.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)

async def get_fabrics(self, th: ChipDeviceCtrl) -> int:
OC_cluster = Clusters.OperationalCredentials
if th == self.th2:
Expand All @@ -95,20 +77,6 @@ async def get_txt_record(self):
)
return comm_service

async def OpenCommissioningWindow(self, th: ChipDeviceCtrl, duration: int) -> CommissioningParameters:
self.discriminator = random.randint(0, 4095)
try:
params = await th.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=duration, iteration=10000, discriminator=self.discriminator, option=1)
return params

except Exception as e:
logging.exception('Error running OpenCommissioningWindow %s', e)
if str(Clusters.AdministratorCommissioning.Enums.StatusCode.kBusy) in e.msg:
asserts.assert_true(False, 'Failed to open commissioning window')
else:
asserts.assert_true(False, 'Failed to verify')

async def write_nl_attr(self, th: ChipDeviceCtrl, attr_val: object):
result = await th.WriteAttribute(nodeid=self.dut_node_id, attributes=[(0, attr_val)])
asserts.assert_equal(result[0].Status, Status.Success, f"{th} node label write failed")
Expand Down Expand Up @@ -174,8 +142,7 @@ async def test_TC_CADMIN_1_3(self):
self.max_window_duration = duration.maxCumulativeFailsafeSeconds

self.step("3a")
params = await self.OpenCommissioningWindow(th=self.th1, duration=self.max_window_duration)
setupPinCode = params.setupPinCode
params = await self.openCommissioningWindow(dev_ctrl=self.th1, node_id=self.dut_node_id)

self.step("3b")
services = await self.get_txt_record()
Expand All @@ -193,7 +160,9 @@ async def test_TC_CADMIN_1_3(self):
th2_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
th2_fabric_admin = th2_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=self.th1.fabricId + 1)
self.th2 = th2_fabric_admin.NewController(nodeId=2)
await self.CommissionAttempt(setupPinCode, th=self.th2, fail=False, thnum=2)
await self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.commissioningParameters.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=params.randomDiscriminator)

self.step(5)
# TH_CR1 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read
Expand Down Expand Up @@ -261,17 +230,23 @@ async def test_TC_CADMIN_1_3(self):
self.step(12)
# TH_CR2 opens a commissioning window on DUT_CE using ECM
self.discriminator = random.randint(0, 4095)
params2 = await self.th2.OpenCommissioningWindow(nodeid=self.dut_node_id, timeout=180, iteration=1000, discriminator=self.discriminator, option=1)
setupPinCode2 = params2.setupPinCode
params2 = await self.openCommissioningWindow(dev_ctrl=self.th2, node_id=self.dut_node_id)

self.step(13)
# TH_CR1 starts a commissioning process with DUT_CE before the timeout from step 12
await self.CommissionAttempt(setupPinCode2, th=self.th1, fail=True, thnum=1)
try:
await self.th1.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params2.commissioningParameters.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=params2.randomDiscriminator)
except ChipStackError as e:
asserts.assert_equal(e.err, 0x0000007E,
"Expected to return Trying to add NOC for fabric that already exists")
"""
expected error:
[2024-10-08 11:57:43.144125][TEST][STDOUT][MatterTest] 10-08 11:57:42.777 INFO Device returned status 9 on receiving the NOC
[2024-10-08 11:57:43.144365][TEST][STDOUT][MatterTest] 10-08 11:57:42.777 INFO Add NOC failed with error src/controller/CHIPDeviceController.cpp:1712: CHIP Error 0x0000007E: Trying to add a NOC for a fabric that already exists
"""

revokeCmd = Clusters.AdministratorCommissioning.Commands.RevokeCommissioning()
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=revokeCmd, timedRequestTimeoutMs=6000)
# The failsafe cleanup is scheduled after the command completes, so give it a bit of time to do that
Expand Down Expand Up @@ -384,6 +359,5 @@ async def test_TC_CADMIN_1_4(self):
removeFabricCmd = Clusters.OperationalCredentials.Commands.RemoveFabric(th2_idx[outer_key][inner_key][attribute_key])
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd)


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 9bc9970

Please sign in to comment.