Skip to content

Commit

Permalink
[Python] Eliminate ZCLReadAttribute/ZCLSend (#33428)
Browse files Browse the repository at this point in the history
* Convert TestLevelControlCluster to asyncio

Remove ZCLReadAttribute and ZCLSend API use from the level control
test TestLevelControlCluster and convert to asyncio.

* Convert TestReadBasicAttributes to asyncio

Remove ZCLReadAttribute API use from basic information cluster test
and convert to use asyncio.

* Use SendCommand directly in send_zcl_command

Avoid using ZCLSend API instead use SendCommand directly in the
send_zcl_command helper function.

* Convert TestFailsafe to use asyncio/SendCommand

Remove ZCLSend API usage and call SendCommand directly. Also convert
the test to a test using asyncio.

* Convert TestOnOffCluster to use asyncio/SendCommand

Remove ZCLSend API usage and call SendCommand directly. Also convert
the test to a test using asyncio.

* Drop TestResult helper class

The class is no longer required. Test results are tested directly.

* Fix send_zcl_command argument formatting

* Catch exception more specifically

* Fix TestWriteBasicAttributes for all cases

It seems TestWriteBasicAttributes did not correctly write
the attributes. The broad exception handling seems to have hidden
this issue even.  Make sure the attributes with the correct value
get written, and check for unexpected and expected IM errors
in the per-attribute results specifically.

* Fix TestFailsafe by catching correct exception

* Drop unused import
  • Loading branch information
agners authored May 16, 2024
1 parent 219e198 commit 3e6657c
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 152 deletions.
189 changes: 82 additions & 107 deletions src/controller/python/test/test_scripts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,29 +178,6 @@ def run(self):
TestFail("Timeout", doCrash=True)


class TestResult:
def __init__(self, operationName, result):
self.operationName = operationName
self.result = result

def assertStatusEqual(self, expected):
if self.result is None:
raise Exception(f"{self.operationName}: no result got")
if self.result.status != expected:
raise Exception(
f"{self.operationName}: expected status {expected}, got {self.result.status}")
return self

def assertValueEqual(self, expected):
self.assertStatusEqual(0)
if self.result is None:
raise Exception(f"{self.operationName}: no result got")
if self.result.value != expected:
raise Exception(
f"{self.operationName}: expected value {expected}, got {self.result.value}")
return self


class BaseTestHelper:
def __init__(self, nodeid: int, paaTrustStorePath: str, testCommissioner: bool = False,
keypair: p256keypair.P256Keypair = None):
Expand Down Expand Up @@ -368,15 +345,16 @@ def TestOnNetworkCommissioning(self, discriminator: int, setuppin: int, nodeid:
def TestUsedTestCommissioner(self):
return self.devCtrl.GetTestCommissionerUsed()

def TestFailsafe(self, nodeid: int):
async def TestFailsafe(self, nodeid: int):
self.logger.info("Testing arm failsafe")

self.logger.info("Setting failsafe on CASE connection")
err, resp = self.devCtrl.ZCLSend("GeneralCommissioning", "ArmFailSafe", nodeid,
0, 0, dict(expiryLengthSeconds=60, breadcrumb=1), blocking=True)
if err != 0:
try:
resp = await self.devCtrl.SendCommand(nodeid, 0,
Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=60, breadcrumb=1))
except IM.InteractionModelError as ex:
self.logger.error(
"Failed to send arm failsafe command error is {} with im response{}".format(err, resp))
"Failed to send arm failsafe command error is {}".format(ex.status))
return False

if resp.errorCode is not Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk:
Expand All @@ -387,17 +365,17 @@ def TestFailsafe(self, nodeid: int):
self.logger.info(
"Attempting to open basic commissioning window - this should fail since the failsafe is armed")
try:
asyncio.run(self.devCtrl.SendCommand(
await self.devCtrl.SendCommand(
nodeid,
0,
Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180),
timedRequestTimeoutMs=10000
))
)
# we actually want the exception here because we want to see a failure, so return False here
self.logger.error(
'Incorrectly succeeded in opening basic commissioning window')
return False
except Exception:
except IM.InteractionModelError:
pass

# TODO:
Expand All @@ -413,51 +391,52 @@ def TestFailsafe(self, nodeid: int):
self.logger.info(
"Attempting to open enhanced commissioning window - this should fail since the failsafe is armed")
try:
asyncio.run(self.devCtrl.SendCommand(
await self.devCtrl.SendCommand(
nodeid, 0, Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(
commissioningTimeout=180,
PAKEPasscodeVerifier=verifier,
discriminator=discriminator,
iterations=iterations,
salt=salt), timedRequestTimeoutMs=10000))
salt=salt), timedRequestTimeoutMs=10000)

# we actually want the exception here because we want to see a failure, so return False here
self.logger.error(
'Incorrectly succeeded in opening enhanced commissioning window')
return False
except Exception:
except IM.InteractionModelError:
pass

self.logger.info("Disarming failsafe on CASE connection")
err, resp = self.devCtrl.ZCLSend("GeneralCommissioning", "ArmFailSafe", nodeid,
0, 0, dict(expiryLengthSeconds=0, breadcrumb=1), blocking=True)
if err != 0:
try:
resp = await self.devCtrl.SendCommand(nodeid, 0,
Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1))
except IM.InteractionModelError as ex:
self.logger.error(
"Failed to send arm failsafe command error is {} with im response{}".format(err, resp))
"Failed to send arm failsafe command error is {}".format(ex.status))
return False

self.logger.info(
"Opening Commissioning Window - this should succeed since the failsafe was just disarmed")
try:
asyncio.run(
self.devCtrl.SendCommand(
nodeid,
0,
Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180),
timedRequestTimeoutMs=10000
))
await self.devCtrl.SendCommand(
nodeid,
0,
Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180),
timedRequestTimeoutMs=10000
)
except Exception:
self.logger.error(
'Failed to open commissioning window after disarming failsafe')
return False

self.logger.info(
"Attempting to arm failsafe over CASE - this should fail since the commissioning window is open")
err, resp = self.devCtrl.ZCLSend("GeneralCommissioning", "ArmFailSafe", nodeid,
0, 0, dict(expiryLengthSeconds=60, breadcrumb=1), blocking=True)
if err != 0:
try:
resp = await self.devCtrl.SendCommand(nodeid, 0,
Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=60, breadcrumb=1))
except IM.InteractionModelError as ex:
self.logger.error(
"Failed to send arm failsafe command error is {} with im response{}".format(err, resp))
"Failed to send arm failsafe command error is {}".format(ex.status))
return False
if resp.errorCode is Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kBusyWithOtherAdmin:
return True
Expand Down Expand Up @@ -1095,50 +1074,48 @@ def SetNetworkCommissioningParameters(self, dataset: str):
self.devCtrl.SetThreadOperationalDataset(bytes.fromhex(dataset))
return True

def TestOnOffCluster(self, nodeid: int, endpoint: int, group: int):
async def TestOnOffCluster(self, nodeid: int, endpoint: int):
self.logger.info(
"Sending On/Off commands to device {} endpoint {}".format(nodeid, endpoint))
err, resp = self.devCtrl.ZCLSend("OnOff", "On", nodeid,
endpoint, group, {}, blocking=True)
if err != 0:

try:
await self.devCtrl.SendCommand(nodeid, endpoint,
Clusters.OnOff.Commands.On())
except IM.InteractionModelError as ex:
self.logger.error(
"failed to send OnOff.On: error is {} with im response{}".format(err, resp))
"failed to send OnOff.On: error is {}".format(ex.status))
return False
err, resp = self.devCtrl.ZCLSend("OnOff", "Off", nodeid,
endpoint, group, {}, blocking=True)
if err != 0:

try:
await self.devCtrl.SendCommand(nodeid, endpoint,
Clusters.OnOff.Commands.Off())
except IM.InteractionModelError as ex:
self.logger.error(
"failed to send OnOff.Off: error is {} with im response {}".format(err, resp))
"failed to send OnOff.Off: error is {}".format(ex.status))
return False
return True

def TestLevelControlCluster(self, nodeid: int, endpoint: int, group: int):
async def TestLevelControlCluster(self, nodeid: int, endpoint: int):
self.logger.info(
f"Sending MoveToLevel command to device {nodeid} endpoint {endpoint}")
try:
commonArgs = dict(transitionTime=0, optionsMask=1, optionsOverride=1)

commonArgs = dict(transitionTime=0, optionsMask=1, optionsOverride=1)

async def _moveClusterLevel(setLevel):
await self.devCtrl.SendCommand(nodeid,
endpoint,
Clusters.LevelControl.Commands.MoveToLevel(**commonArgs, level=setLevel))
res = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, Clusters.LevelControl.Attributes.CurrentLevel)])
readVal = res[endpoint][Clusters.LevelControl][Clusters.LevelControl.Attributes.CurrentLevel]
if readVal != setLevel:
raise Exception(f"Read attribute LevelControl.CurrentLevel: expected value {setLevel}, got {readVal}")

try:
# Move to 1
self.devCtrl.ZCLSend("LevelControl", "MoveToLevel", nodeid,
endpoint, group, dict(**commonArgs, level=1), blocking=True)
res = self.devCtrl.ZCLReadAttribute(cluster="LevelControl",
attribute="CurrentLevel",
nodeid=nodeid,
endpoint=endpoint,
groupid=group)
TestResult("Read attribute LevelControl.CurrentLevel",
res).assertValueEqual(1)
await _moveClusterLevel(1)

# Move to 254
self.devCtrl.ZCLSend("LevelControl", "MoveToLevel", nodeid,
endpoint, group, dict(**commonArgs, level=254), blocking=True)
res = self.devCtrl.ZCLReadAttribute(cluster="LevelControl",
attribute="CurrentLevel",
nodeid=nodeid,
endpoint=endpoint,
groupid=group)
TestResult("Read attribute LevelControl.CurrentLevel",
res).assertValueEqual(254)
await _moveClusterLevel(254)

return True
except Exception as ex:
Expand Down Expand Up @@ -1171,29 +1148,27 @@ def TestResolve(self, nodeid):
self.logger.exception("Failed to resolve. {}".format(ex))
return False

def TestReadBasicAttributes(self, nodeid: int, endpoint: int, group: int):
async def TestReadBasicAttributes(self, nodeid: int, endpoint: int):
attrs = Clusters.BasicInformation.Attributes
basic_cluster_attrs = {
"VendorName": "TEST_VENDOR",
"VendorID": 0xFFF1,
"ProductName": "TEST_PRODUCT",
"ProductID": 0x8001,
"NodeLabel": "Test",
"Location": "XX",
"HardwareVersion": 0,
"HardwareVersionString": "TEST_VERSION",
"SoftwareVersion": 1,
"SoftwareVersionString": "1.0",
attrs.VendorName: "TEST_VENDOR",
attrs.VendorID: 0xFFF1,
attrs.ProductName: "TEST_PRODUCT",
attrs.ProductID: 0x8001,
attrs.NodeLabel: "Test",
attrs.Location: "XX",
attrs.HardwareVersion: 0,
attrs.HardwareVersionString: "TEST_VERSION",
attrs.SoftwareVersion: 1,
attrs.SoftwareVersionString: "1.0",
}
failed_zcl = {}
for basic_attr, expected_value in basic_cluster_attrs.items():
try:
res = self.devCtrl.ZCLReadAttribute(cluster="BasicInformation",
attribute=basic_attr,
nodeid=nodeid,
endpoint=endpoint,
groupid=group)
TestResult(f"Read attribute {basic_attr}", res).assertValueEqual(
expected_value)
res = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, basic_attr)])
readVal = res[endpoint][Clusters.BasicInformation][basic_attr]
if readVal != expected_value:
raise Exception(f"Read attribute: expected value {expected_value}, got {readVal}")
except Exception as ex:
failed_zcl[basic_attr] = str(ex)
if failed_zcl:
Expand All @@ -1217,16 +1192,16 @@ class AttributeWriteRequest:
failed_attribute_write = []
for req in requests:
try:
try:
await self.devCtrl.WriteAttribute(nodeid, [(endpoint, req.attribute, 0)])
if req.expected_status != IM.Status.Success:
raise AssertionError(
f"Write attribute {req.attribute.__qualname__} expects failure but got success response")
except Exception as ex:
if req.expected_status != IM.Status.Success:
continue
else:
raise ex
# Errors tested here is in the per-attribute result list (type AttributeStatus)
write_res = await self.devCtrl.WriteAttribute(nodeid, [(endpoint, req.attribute(req.value))])
status = write_res[0].Status
if req.expected_status != status:
raise AssertionError(
f"Write attribute {req.attribute.__qualname__} expects {req.expected_status} but got {status}")

# Only execute read tests where write is successful.
if req.expected_status != IM.Status.Success:
continue

res = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, req.attribute)])
val = res[endpoint][req.cluster][req.attribute]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Commissioning test.

import asyncio
import os
import sys
from optparse import OptionParser
Expand Down Expand Up @@ -121,9 +122,8 @@ def main():
FailIfNot(test.TestCommissionFailure(1, 0), "Failed to commission device")

logger.info("Testing on off cluster")
FailIfNot(test.TestOnOffCluster(nodeid=1,
endpoint=LIGHTING_ENDPOINT_ID,
group=GROUP_ID), "Failed to test on off cluster")
FailIfNot(asyncio.run(test.TestOnOffCluster(nodeid=1,
endpoint=LIGHTING_ENDPOINT_ID)), "Failed to test on off cluster")

timeoutTicker.stop()

Expand Down
6 changes: 3 additions & 3 deletions src/controller/python/test/test_scripts/commissioning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Commissioning test.

import asyncio
import os
import sys
from optparse import OptionParser
Expand Down Expand Up @@ -146,9 +147,8 @@ def main():
TestFail("Must provide device address or setup payload to commissioning the device")

logger.info("Testing on off cluster")
FailIfNot(test.TestOnOffCluster(nodeid=options.nodeid,
endpoint=LIGHTING_ENDPOINT_ID,
group=GROUP_ID), "Failed to test on off cluster")
FailIfNot(asyncio.run(test.TestOnOffCluster(nodeid=options.nodeid,
endpoint=LIGHTING_ENDPOINT_ID)), "Failed to test on off cluster")

FailIfNot(test.TestUsedTestCommissioner(),
"Test commissioner check failed")
Expand Down
3 changes: 2 additions & 1 deletion src/controller/python/test/test_scripts/failsafe_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Commissioning test.

import asyncio
import os
import sys
from optparse import OptionParser
Expand Down Expand Up @@ -99,7 +100,7 @@ def main():
nodeid=1),
"Failed to finish key exchange")

FailIfNot(test.TestFailsafe(nodeid=1), "Failed failsafe test")
FailIfNot(asyncio.run(test.TestFailsafe(nodeid=1)), "Failed failsafe test")

timeoutTicker.stop()

Expand Down
Loading

0 comments on commit 3e6657c

Please sign in to comment.