Skip to content

Commit

Permalink
Added TC_CADMIN_1_4_nofreset test module:
Browse files Browse the repository at this point in the history
- Added new test module to verify that if no factory reset occurs between TC_CADMIN_1_3_4 test modules and TC_CADMIN_1_4_nofreset, how to handle that situation for BCM.
  • Loading branch information
j-ororke committed Oct 17, 2024
1 parent 27756cb commit 4775459
Showing 1 changed file with 199 additions and 0 deletions.
199 changes: 199 additions & 0 deletions src/python_testing/TC_CADMIN_1_4_nofreset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
# app: ${ALL_CLUSTERS_APP}
# factory-reset: false
# quiet: false
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# === END CI TEST ARGUMENTS ===

import asyncio
import random
from time import sleep

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.exceptions import ChipStackError
from chip.interaction_model import Status
from chip.tlv import TLVReader
from matter_testing_infrastructure.chip.testing.matter_testing import (MatterBaseTest, TestStep, async_test_body,
default_matter_test_main)
from mdns_discovery import mdns_discovery
from mobly import asserts

opcreds = Clusters.OperationalCredentials
nonce = random.randbytes(32)

class TC_CADMIN_1_4_nofreset(MatterBaseTest):
async def get_fabrics(self, th: ChipDeviceCtrl) -> int:
OC_cluster = Clusters.OperationalCredentials
if th == self.th2:
th2_fabric_info = await th.ReadAttribute(nodeid=self.dut_node_id, fabricFiltered=True, attributes=[(0, OC_cluster.Attributes.Fabrics)])
th2_fabric_data = list(th2_fabric_info.values())[0]
th2_fabric_data = list(th2_fabric_data.values())[0]
fabric_info = vars(list(th2_fabric_data.values())[1][0])

else:
fabric_info = await self.read_single_attribute_check_success(dev_ctrl=th, fabric_filtered=True, cluster=OC_cluster, attribute=OC_cluster.Attributes.Fabrics)
return fabric_info

async def get_rcac_decoded(self, th: str) -> int:
csrResponse = await self.send_single_cmd(dev_ctrl=th, node_id=self.dut_node_id, cmd=opcreds.Commands.CSRRequest(CSRNonce=nonce, isForUpdateNOC=False))
TH_certs_real = await th.IssueNOCChain(csrResponse, self.dut_node_id)
th_rcac_decoded = TLVReader(TH_certs_real.rcacBytes).get()["Any"]
return th_rcac_decoded

async def get_txt_record(self):
discovery = mdns_discovery.MdnsDiscovery(verbose_logging=True)
comm_service = await discovery.get_commissionable_service(
discovery_timeout_sec=240,
log_output=False,
)
return comm_service

async def write_nl_attr(self, th: ChipDeviceCtrl, attr_val: object):
result = await th.WriteAttribute(nodeid=self.dut_node_id, attributes=[(0, attr_val)])
asserts.assert_equal(result[0].Status, Status.Success, f"{th} node label write failed")

async def read_nl_attr(self, th: ChipDeviceCtrl, attr_val: object):
try:
await th.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, attr_val)])
except Exception as e:
asserts.assert_equal(e.err, "Received error message from read attribute attempt")
self.print_step(0, e)

async def read_currentfabricindex(self, th: ChipDeviceCtrl) -> int:
cluster = Clusters.OperationalCredentials
attribute = Clusters.OperationalCredentials.Attributes.CurrentFabricIndex
current_fabric_index = await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute)
return current_fabric_index

def pics_TC_CADMIN_1_4_noreset(self) -> list[str]:
return ["CADMIN.S"]

def steps_TC_CADMIN_1_4_noreset(self) -> list[TestStep]:
return [
TestStep(1, "TH_CR1 starts a commissioning process with DUT_CE", is_commissioning=True),
TestStep(2, "TH_CR1 reads the BasicCommissioningInfo attribute from the General Commissioning cluster and saves the MaxCumulativeFailsafeSeconds field as max_window_duration."),
TestStep("3a", "TH_CR1 opens a commissioning window on DUT_CE using a commissioning timeout of max_window_duration using BCM",
"DUT_CE opens its Commissioning window to allow a second commissioning."),
TestStep("3b", "DNS-SD records shows DUT_CE advertising", "Verify that the DNS-SD advertisement shows CM=1"),
TestStep("3c", "TH_CR1 writes and reads the Basic Information Cluster’s NodeLabel mandatory attribute of DUT_CE",
"Verify DUT_CE responds to both write/read with a success"),
TestStep(4, "TH creates a controller (TH_CR2) on a new fabric and commissions DUT_CE using that controller. TH_CR2 should commission the device using a different NodeID than TH_CR1.",
"Commissioning is successful"),
TestStep(5, "TH_CR1 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read",
"Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device."),
TestStep(6, "TH_CR2 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read",
"Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device."),
TestStep(7, "TH_CR2 reads the CurrentFabricIndex attribute from the Operational Credentials cluster and saves as th2_idx, TH_CR1 sends the RemoveFabric command to the DUT with the FabricIndex set to th2_idx",
"TH_CR1 removes TH_CR2 fabric using th2_idx")
]

@async_test_body
async def test_TC_CADMIN_1_4_noreset(self):
setupPayloadInfo = self.get_setup_payload_info()
self.print_step("Setup payload info", setupPayloadInfo)
if not setupPayloadInfo[0].passcode:
asserts.assert_true(False, 'passcode must be a provided value in the test in order for this test to work due to using BCM, please rerun test with providing --passcode <value>')

if not setupPayloadInfo[0].filter_value:
asserts.assert_true(False, 'discriminator must be a provided value in the test in order for this test to work due to using BCM, please rerun test with providing --discriminator <value>')
self.step(1)

# Establishing TH1
self.th1 = self.default_controller

self.step(2)
GC_cluster = Clusters.GeneralCommissioning
attribute = GC_cluster.Attributes.BasicCommissioningInfo
duration = await self.read_single_attribute_check_success(endpoint=0, cluster=GC_cluster, attribute=attribute)
self.max_window_duration = duration.maxCumulativeFailsafeSeconds

self.step("3a")
obcCmd = Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180)
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=obcCmd, timedRequestTimeoutMs=6000)

self.step("3b")
services = await self.get_txt_record()
if services.txt_record['CM'] != "1":
asserts.fail(f"Expected cm record value not found, instead value found was {str(services.txt_record['CM'])}")

self.step("3c")
BI_cluster = Clusters.BasicInformation
nl_attribute = BI_cluster.Attributes.NodeLabel
await self.write_nl_attr(th=self.th1, attr_val=nl_attribute)
await self.read_nl_attr(th=self.th1, attr_val=nl_attribute)

self.step(4)
# Establishing TH2
th2_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
th2_fabric_admin = th2_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=self.th1.fabricId + 1)
self.th2 = th2_fabric_admin.NewController(nodeId=2)
await self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=setupPayloadInfo[0].passcode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=setupPayloadInfo[0].filter_value)

self.step(5)
# TH_CR1 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read
th1_fabric_info = await self.get_fabrics(th=self.th1)

# Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device.
await self.send_single_cmd(dev_ctrl=self.th1, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10))
th1_rcac_decoded = await self.get_rcac_decoded(th=self.th1)
if th1_fabric_info[0].rootPublicKey != th1_rcac_decoded[9]:
asserts.fail("public keys from fabric and certs for TH1 are not the same")
if th1_fabric_info[0].nodeID != self.dut_node_id:
asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning")

# Expiring the failsafe timer in an attempt to clean up before TH2 attempt.
await self.th1.SendCommand(self.dut_node_id, 0, Clusters.GeneralCommissioning.Commands.ArmFailSafe(0))

self.step(6)
# TH_CR2 reads the Fabrics attribute from the Node Operational Credentials cluster using a fabric-filtered read
th2_fabric_info = await self.get_fabrics(th=self.th2)

# Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device.
await self.send_single_cmd(dev_ctrl=self.th2, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(self.max_window_duration))
th2_rcac_decoded = await self.get_rcac_decoded(th=self.th2)
if th2_fabric_info['rootPublicKey'] != th2_rcac_decoded[9]:
asserts.fail("public keys from fabric and certs for TH2 are not the same")
if th2_fabric_info['nodeID'] != self.dut_node_id:
asserts.fail("DUT node ID from fabric does not match DUT node ID for TH2 during commissioning")

await self.th2.SendCommand(self.dut_node_id, 0, Clusters.GeneralCommissioning.Commands.ArmFailSafe(0))

self.step(7)
# TH_CR2 reads the CurrentFabricIndex attribute from the Operational Credentials cluster and saves as th2_idx, TH_CR1 sends the RemoveFabric command to the DUT with the FabricIndex set to th2_idx
th2_idx = await self.th2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.OperationalCredentials.Attributes.CurrentFabricIndex)])
outer_key = list(th2_idx.keys())[0]
inner_key = list(th2_idx[outer_key].keys())[0]
attribute_key = list(th2_idx[outer_key][inner_key].keys())[1]
removeFabricCmd = Clusters.OperationalCredentials.Commands.RemoveFabric(th2_idx[outer_key][inner_key][attribute_key])
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd)

if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 4775459

Please sign in to comment.