Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Eliminate ZCLReadAttribute/ZCLSend #33428

Merged
merged 11 commits into from
May 16, 2024
189 changes: 82 additions & 107 deletions src/controller/python/test/test_scripts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,29 +178,6 @@ def run(self):
TestFail("Timeout", doCrash=True)


class TestResult:
def __init__(self, operationName, result):
self.operationName = operationName
self.result = result

def assertStatusEqual(self, expected):
if self.result is None:
raise Exception(f"{self.operationName}: no result got")
if self.result.status != expected:
raise Exception(
f"{self.operationName}: expected status {expected}, got {self.result.status}")
return self

def assertValueEqual(self, expected):
self.assertStatusEqual(0)
if self.result is None:
raise Exception(f"{self.operationName}: no result got")
if self.result.value != expected:
raise Exception(
f"{self.operationName}: expected value {expected}, got {self.result.value}")
return self


class BaseTestHelper:
def __init__(self, nodeid: int, paaTrustStorePath: str, testCommissioner: bool = False,
keypair: p256keypair.P256Keypair = None):
Expand Down Expand Up @@ -368,15 +345,16 @@ def TestOnNetworkCommissioning(self, discriminator: int, setuppin: int, nodeid:
def TestUsedTestCommissioner(self):
return self.devCtrl.GetTestCommissionerUsed()

def TestFailsafe(self, nodeid: int):
async def TestFailsafe(self, nodeid: int):
self.logger.info("Testing arm failsafe")

self.logger.info("Setting failsafe on CASE connection")
err, resp = self.devCtrl.ZCLSend("GeneralCommissioning", "ArmFailSafe", nodeid,
0, 0, dict(expiryLengthSeconds=60, breadcrumb=1), blocking=True)
if err != 0:
try:
resp = await self.devCtrl.SendCommand(nodeid, 0,
Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=60, breadcrumb=1))
except IM.InteractionModelError as ex:
self.logger.error(
"Failed to send arm failsafe command error is {} with im response{}".format(err, resp))
"Failed to send arm failsafe command error is {}".format(ex.status))
return False

if resp.errorCode is not Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk:
Expand All @@ -387,17 +365,17 @@ def TestFailsafe(self, nodeid: int):
self.logger.info(
"Attempting to open basic commissioning window - this should fail since the failsafe is armed")
try:
asyncio.run(self.devCtrl.SendCommand(
await self.devCtrl.SendCommand(
nodeid,
0,
Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180),
timedRequestTimeoutMs=10000
))
)
# we actually want the exception here because we want to see a failure, so return False here
self.logger.error(
'Incorrectly succeeded in opening basic commissioning window')
agners marked this conversation as resolved.
Show resolved Hide resolved
return False
except Exception:
except IM.InteractionModelError:
pass

# TODO:
Expand All @@ -413,51 +391,52 @@ def TestFailsafe(self, nodeid: int):
self.logger.info(
"Attempting to open enhanced commissioning window - this should fail since the failsafe is armed")
try:
asyncio.run(self.devCtrl.SendCommand(
await self.devCtrl.SendCommand(
nodeid, 0, Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(
commissioningTimeout=180,
PAKEPasscodeVerifier=verifier,
discriminator=discriminator,
iterations=iterations,
salt=salt), timedRequestTimeoutMs=10000))
salt=salt), timedRequestTimeoutMs=10000)

# we actually want the exception here because we want to see a failure, so return False here
self.logger.error(
'Incorrectly succeeded in opening enhanced commissioning window')
return False
except Exception:
except IM.InteractionModelError:
pass
agners marked this conversation as resolved.
Show resolved Hide resolved

self.logger.info("Disarming failsafe on CASE connection")
err, resp = self.devCtrl.ZCLSend("GeneralCommissioning", "ArmFailSafe", nodeid,
0, 0, dict(expiryLengthSeconds=0, breadcrumb=1), blocking=True)
if err != 0:
try:
resp = await self.devCtrl.SendCommand(nodeid, 0,
Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1))
except IM.InteractionModelError as ex:
self.logger.error(
"Failed to send arm failsafe command error is {} with im response{}".format(err, resp))
"Failed to send arm failsafe command error is {}".format(ex.status))
return False

self.logger.info(
"Opening Commissioning Window - this should succeed since the failsafe was just disarmed")
try:
asyncio.run(
self.devCtrl.SendCommand(
nodeid,
0,
Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180),
timedRequestTimeoutMs=10000
))
await self.devCtrl.SendCommand(
nodeid,
0,
Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180),
timedRequestTimeoutMs=10000
)
except Exception:
self.logger.error(
'Failed to open commissioning window after disarming failsafe')
return False

self.logger.info(
"Attempting to arm failsafe over CASE - this should fail since the commissioning window is open")
err, resp = self.devCtrl.ZCLSend("GeneralCommissioning", "ArmFailSafe", nodeid,
0, 0, dict(expiryLengthSeconds=60, breadcrumb=1), blocking=True)
if err != 0:
try:
resp = await self.devCtrl.SendCommand(nodeid, 0,
Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=60, breadcrumb=1))
except IM.InteractionModelError as ex:
self.logger.error(
"Failed to send arm failsafe command error is {} with im response{}".format(err, resp))
"Failed to send arm failsafe command error is {}".format(ex.status))
return False
if resp.errorCode is Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kBusyWithOtherAdmin:
return True
Expand Down Expand Up @@ -1095,50 +1074,48 @@ def SetNetworkCommissioningParameters(self, dataset: str):
self.devCtrl.SetThreadOperationalDataset(bytes.fromhex(dataset))
return True

def TestOnOffCluster(self, nodeid: int, endpoint: int, group: int):
async def TestOnOffCluster(self, nodeid: int, endpoint: int):
self.logger.info(
"Sending On/Off commands to device {} endpoint {}".format(nodeid, endpoint))
err, resp = self.devCtrl.ZCLSend("OnOff", "On", nodeid,
endpoint, group, {}, blocking=True)
if err != 0:

try:
await self.devCtrl.SendCommand(nodeid, endpoint,
Clusters.OnOff.Commands.On())
except IM.InteractionModelError as ex:
self.logger.error(
"failed to send OnOff.On: error is {} with im response{}".format(err, resp))
"failed to send OnOff.On: error is {}".format(ex.status))
return False
err, resp = self.devCtrl.ZCLSend("OnOff", "Off", nodeid,
endpoint, group, {}, blocking=True)
if err != 0:

try:
await self.devCtrl.SendCommand(nodeid, endpoint,
Clusters.OnOff.Commands.Off())
except IM.InteractionModelError as ex:
self.logger.error(
"failed to send OnOff.Off: error is {} with im response {}".format(err, resp))
"failed to send OnOff.Off: error is {}".format(ex.status))
return False
return True

def TestLevelControlCluster(self, nodeid: int, endpoint: int, group: int):
async def TestLevelControlCluster(self, nodeid: int, endpoint: int):
self.logger.info(
f"Sending MoveToLevel command to device {nodeid} endpoint {endpoint}")
try:
commonArgs = dict(transitionTime=0, optionsMask=1, optionsOverride=1)

commonArgs = dict(transitionTime=0, optionsMask=1, optionsOverride=1)

async def _moveClusterLevel(setLevel):
await self.devCtrl.SendCommand(nodeid,
endpoint,
Clusters.LevelControl.Commands.MoveToLevel(**commonArgs, level=setLevel))
res = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, Clusters.LevelControl.Attributes.CurrentLevel)])
readVal = res[endpoint][Clusters.LevelControl][Clusters.LevelControl.Attributes.CurrentLevel]
if readVal != setLevel:
raise Exception(f"Read attribute LevelControl.CurrentLevel: expected value {setLevel}, got {readVal}")

try:
# Move to 1
self.devCtrl.ZCLSend("LevelControl", "MoveToLevel", nodeid,
endpoint, group, dict(**commonArgs, level=1), blocking=True)
res = self.devCtrl.ZCLReadAttribute(cluster="LevelControl",
attribute="CurrentLevel",
nodeid=nodeid,
endpoint=endpoint,
groupid=group)
TestResult("Read attribute LevelControl.CurrentLevel",
res).assertValueEqual(1)
await _moveClusterLevel(1)

# Move to 254
self.devCtrl.ZCLSend("LevelControl", "MoveToLevel", nodeid,
endpoint, group, dict(**commonArgs, level=254), blocking=True)
res = self.devCtrl.ZCLReadAttribute(cluster="LevelControl",
attribute="CurrentLevel",
nodeid=nodeid,
endpoint=endpoint,
groupid=group)
TestResult("Read attribute LevelControl.CurrentLevel",
res).assertValueEqual(254)
await _moveClusterLevel(254)

return True
except Exception as ex:
Expand Down Expand Up @@ -1171,29 +1148,27 @@ def TestResolve(self, nodeid):
self.logger.exception("Failed to resolve. {}".format(ex))
return False

def TestReadBasicAttributes(self, nodeid: int, endpoint: int, group: int):
async def TestReadBasicAttributes(self, nodeid: int, endpoint: int):
attrs = Clusters.BasicInformation.Attributes
basic_cluster_attrs = {
"VendorName": "TEST_VENDOR",
"VendorID": 0xFFF1,
"ProductName": "TEST_PRODUCT",
"ProductID": 0x8001,
"NodeLabel": "Test",
"Location": "XX",
"HardwareVersion": 0,
"HardwareVersionString": "TEST_VERSION",
"SoftwareVersion": 1,
"SoftwareVersionString": "1.0",
attrs.VendorName: "TEST_VENDOR",
attrs.VendorID: 0xFFF1,
attrs.ProductName: "TEST_PRODUCT",
attrs.ProductID: 0x8001,
attrs.NodeLabel: "Test",
attrs.Location: "XX",
attrs.HardwareVersion: 0,
attrs.HardwareVersionString: "TEST_VERSION",
attrs.SoftwareVersion: 1,
attrs.SoftwareVersionString: "1.0",
}
failed_zcl = {}
for basic_attr, expected_value in basic_cluster_attrs.items():
try:
res = self.devCtrl.ZCLReadAttribute(cluster="BasicInformation",
attribute=basic_attr,
nodeid=nodeid,
endpoint=endpoint,
groupid=group)
TestResult(f"Read attribute {basic_attr}", res).assertValueEqual(
expected_value)
res = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, basic_attr)])
readVal = res[endpoint][Clusters.BasicInformation][basic_attr]
if readVal != expected_value:
raise Exception(f"Read attribute: expected value {expected_value}, got {readVal}")
except Exception as ex:
failed_zcl[basic_attr] = str(ex)
if failed_zcl:
Expand All @@ -1217,16 +1192,16 @@ class AttributeWriteRequest:
failed_attribute_write = []
for req in requests:
try:
try:
await self.devCtrl.WriteAttribute(nodeid, [(endpoint, req.attribute, 0)])
if req.expected_status != IM.Status.Success:
raise AssertionError(
f"Write attribute {req.attribute.__qualname__} expects failure but got success response")
except Exception as ex:
if req.expected_status != IM.Status.Success:
continue
else:
raise ex
# Errors tested here is in the per-attribute result list (type AttributeStatus)
write_res = await self.devCtrl.WriteAttribute(nodeid, [(endpoint, req.attribute(req.value))])
status = write_res[0].Status
if req.expected_status != status:
raise AssertionError(
f"Write attribute {req.attribute.__qualname__} expects {req.expected_status} but got {status}")

# Only execute read tests where write is successful.
if req.expected_status != IM.Status.Success:
continue

res = await self.devCtrl.ReadAttribute(nodeid, [(endpoint, req.attribute)])
val = res[endpoint][req.cluster][req.attribute]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Commissioning test.

import asyncio
import os
import sys
from optparse import OptionParser
Expand Down Expand Up @@ -121,9 +122,8 @@ def main():
FailIfNot(test.TestCommissionFailure(1, 0), "Failed to commission device")

logger.info("Testing on off cluster")
FailIfNot(test.TestOnOffCluster(nodeid=1,
endpoint=LIGHTING_ENDPOINT_ID,
group=GROUP_ID), "Failed to test on off cluster")
FailIfNot(asyncio.run(test.TestOnOffCluster(nodeid=1,
endpoint=LIGHTING_ENDPOINT_ID)), "Failed to test on off cluster")

timeoutTicker.stop()

Expand Down
6 changes: 3 additions & 3 deletions src/controller/python/test/test_scripts/commissioning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Commissioning test.

import asyncio
import os
import sys
from optparse import OptionParser
Expand Down Expand Up @@ -146,9 +147,8 @@ def main():
TestFail("Must provide device address or setup payload to commissioning the device")

logger.info("Testing on off cluster")
FailIfNot(test.TestOnOffCluster(nodeid=options.nodeid,
endpoint=LIGHTING_ENDPOINT_ID,
group=GROUP_ID), "Failed to test on off cluster")
FailIfNot(asyncio.run(test.TestOnOffCluster(nodeid=options.nodeid,
endpoint=LIGHTING_ENDPOINT_ID)), "Failed to test on off cluster")

FailIfNot(test.TestUsedTestCommissioner(),
"Test commissioner check failed")
Expand Down
3 changes: 2 additions & 1 deletion src/controller/python/test/test_scripts/failsafe_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Commissioning test.

import asyncio
import os
import sys
from optparse import OptionParser
Expand Down Expand Up @@ -99,7 +100,7 @@ def main():
nodeid=1),
"Failed to finish key exchange")

FailIfNot(test.TestFailsafe(nodeid=1), "Failed failsafe test")
FailIfNot(asyncio.run(test.TestFailsafe(nodeid=1)), "Failed failsafe test")

timeoutTicker.stop()

Expand Down
Loading
Loading