Skip to content

Commit

Permalink
Updated BRBINFO-4-1 test after timeoutMs added (#35160)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: saurabhst <s.kumar9@samsung.com>
  • Loading branch information
3 people authored and pull[bot] committed Aug 30, 2024
1 parent 0a0bca7 commit 4124265
Showing 1 changed file with 102 additions and 63 deletions.
165 changes: 102 additions & 63 deletions src/python_testing/TC_BRBINFO_4_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, SimpleEventCallback, TestStep, async_test_body, default_matter_test_main
from mobly import asserts

Expand Down Expand Up @@ -57,14 +58,25 @@ def desc_TC_BRBINFO_4_1(self) -> str:

def steps_TC_BRBINFO_4_1(self) -> list[TestStep]:
steps = [
TestStep("0", "DUT commissioned", is_commissioning=True),
TestStep("0a", "Preconditions"),
TestStep("1a", "TH reads from the ICD the A_IDLE_MODE_DURATION, A_ACTIVE_MODE_DURATION, and ACTIVE_MODE_THRESHOLD attributes"),
TestStep("1b", "Simple KeepActive command w/ subscription. ActiveChanged event received by TH contains PromisedActiveDuration"),
TestStep("2", "Sends 3x KeepActive commands w/ subscription. ActiveChanged event received ONCE and contains PromisedActiveDuration"),
TestStep("3", "TH waits for check-in from TH_ICD to confirm no additional ActiveChanged events are recieved"),
TestStep("4", "KeepActive not returned after 60 minutes of offline ICD"),
]
TestStep("0", "DUT commissioned and preconditions", is_commissioning=True),
TestStep("1", "TH reads from the ICD the A_IDLE_MODE_DURATION, A_ACTIVE_MODE_DURATION, and ACTIVE_MODE_THRESHOLD attributes"),
TestStep("2", "Setting up subscribe to ActiveChange event"),
TestStep("3", "Check TimeoutMs too low fails"),
TestStep("4", "Check TimeoutMs too high fails"),
TestStep("5", "Check KeepActive successful with valid command parameters lowest possible TimeoutMs"),
TestStep("6", "Validate previous command results in ActiveChanged event shortly after ICD device checks-in"),
TestStep("7", "Check KeepActive successful with valid command parameters highest possible TimeoutMs"),
TestStep("8", "Validate previous command results in ActiveChanged event shortly after ICD device checks-in"),
TestStep("9", "Send multiple KeepActive commands during window where ICD device will not check in"),
TestStep("10", "Validate previous command results in single ActiveChanged event shortly after ICD device checks-in"),
TestStep("11", "Validate we received no additional ActiveChanged event after subsequent ICD check in"),
TestStep("12", "Send KeepActive command with shortest TimeoutMs value while TH_ICD is prevented from sending check-ins"),
TestStep("13", "TH allows TH_ICD to resume sending check-ins after timeout should have expired"),
TestStep("14", "Wait for TH_ICD to check into TH twice, then confirm we have had no new ActiveChanged events reported from DUT"),
TestStep("15", "Send KeepActive command with shortest TimeoutMs value while TH_ICD is prevented from sending check-ins"),
TestStep("16", "Wait 15 seconds then send second KeepActive command with double the TimeoutMs value of the previous step"),
TestStep("17", "TH allows TH_ICD to resume sending check-ins after timeout from step 15 expired but before second timeout from step 16 still valid"),
TestStep("18", "Wait for TH_ICD to check into TH, then confirm we have received new event from DUT")]
return steps

def _ask_for_vendor_commissioniong_ux_operation(self, discriminator, setupPinCode, setupManualCode, setupQRCode):
Expand All @@ -77,9 +89,9 @@ def _ask_for_vendor_commissioniong_ux_operation(self, discriminator, setupPinCod
f"If using FabricSync Admin test app, you may type:\n"
f">>> pairing onnetwork 111 {setupPinCode} --icd-registration true")

async def _send_keep_active_command(self, duration, endpoint_id) -> int:
async def _send_keep_active_command(self, stay_active_duration_ms, timeout_ms, endpoint_id) -> int:
logging.info("Sending keep active command")
keep_active = await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=endpoint_id, payload=Clusters.Objects.BridgedDeviceBasicInformation.Commands.KeepActive(stayActiveDuration=duration))
keep_active = await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=endpoint_id, payload=Clusters.Objects.BridgedDeviceBasicInformation.Commands.KeepActive(stayActiveDuration=stay_active_duration_ms, timeoutMs=timeout_ms))
return keep_active

async def _wait_for_active_changed_event(self, timeout_s) -> int:
Expand Down Expand Up @@ -189,10 +201,6 @@ async def test_TC_BRBINFO_4_1(self):
logging.info(f"Dynamic endpoint is {dynamic_endpoint_id}")

self.step("0")

# Preconditions
self.step("0a")

logging.info("Ensuring DUT is commissioned to TH")

# Confirms commissioning of DUT on TH as it reads its fature map
Expand All @@ -205,15 +213,7 @@ async def test_TC_BRBINFO_4_1(self):

logging.info("Ensuring ICD is commissioned to TH")

# Confirms commissioning of ICD on TH as it reads its feature map
await self._read_attribute_expect_success(
_ROOT_ENDPOINT_ID,
basic_info_cluster,
basic_info_attributes.FeatureMap,
self.icd_nodeid
)

self.step("1a")
self.step("1")

idle_mode_duration_s = await self._read_attribute_expect_success(
_ROOT_ENDPOINT_ID,
Expand All @@ -231,76 +231,115 @@ async def test_TC_BRBINFO_4_1(self):
)
logging.info(f"ActiveModeDurationMs: {active_mode_duration_ms}")

self.step("1b")

# Subscription to ActiveChanged
self.step("2")
event = brb_info_cluster.Events.ActiveChanged
self.q = queue.Queue()
urgent = 1
cb = SimpleEventCallback("ActiveChanged", event.cluster_id, event.event_id, self.q)
subscription = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(dynamic_endpoint_id, event, urgent)], reportInterval=[1, 3])
subscription.SetEventUpdateCallback(callback=cb)

self.step("3")
stay_active_duration_ms = 1000
logging.info(f"Sending KeepActiveCommand({stay_active_duration_ms}ms)")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)

logging.info("Waiting for ActiveChanged from DUT...")
timeout_s = idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms)/1000
promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s)
keep_active_timeout_ms = 29999
try:
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
asserts.fail("KeepActive with invalid TimeoutMs was expected to fail")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.ConstraintError,
"DUT sent back an unexpected error, we were expecting ConstraintError")

self.step("4")
keep_active_timeout_ms = 3600001
try:
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
asserts.fail("KeepActive with invalid TimeoutMs was expected to fail")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.ConstraintError,
"DUT sent back an unexpected error, we were expecting ConstraintError")

self.step("5")
keep_active_timeout_ms = 30000
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)

self.step("6")
wait_for_icd_checkin_timeout_s = idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms)/1000
wait_for_dut_event_subscription_s = 5
# This will throw exception if timeout is exceeded.
await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
asserts.assert_greater_equal(promised_active_duration_ms, stay_active_duration_ms,
"PromisedActiveDuration < StayActiveDuration")
asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT")

self.step("2")
self.step("7")
keep_active_timeout_ms = 3600000
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)

self.step("8")
# This will throw exception if timeout is exceeded.
await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
asserts.assert_greater_equal(promised_active_duration_ms, stay_active_duration_ms,
"PromisedActiveDuration < StayActiveDuration")
asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT")

# Prevent icd app from sending any check-in messages.
self.step("9")
self.pause_th_icd_server(check_state=True)
# sends 3x keep active commands
stay_active_duration_ms = 2000
keep_active_timeout_ms = 60000
logging.info(f"Sending first KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
logging.info(f"Sending second KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
logging.info(f"Sending third KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
self.resume_th_icd_server(check_state=True)
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)

logging.info("Waiting for ActiveChanged from DUT...")
promised_active_duration_ms = await self._wait_for_active_changed_event((idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms))/1000)
self.step("10")
self.resume_th_icd_server(check_state=True)
await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")

self.step("3")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=5000)
self.step("11")
await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")

self.step("4")
self.step("12")
self.pause_th_icd_server(check_state=True)
stay_active_duration_ms = 2000
keep_active_timeout_ms = 30000
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)

self.step("13")
time.sleep(30)
self.resume_th_icd_server(check_state=True)

logging.info("TH waiting for checkin from TH_ICD...")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
stay_active_duration_ms = 10000
logging.info(f"Sending KeepActiveCommand({stay_active_duration_ms})")
await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
self.step("14")
await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT")

self.step("15")
self.pause_th_icd_server(check_state=True)
# If we are seeing assertion below fail test assumption is likely incorrect.
# Test assumes after TH waits for check-in from TH_ICD it has enough time to
# call the KeepActive command and pause the app to prevent it from checking in
# after DUT recieved the KeepActive command. Should this assumption be incorrect
# we could look into using existing ICDTestEventTriggerEvent, or adding test
# event trigger that will help suppress check-ins from the TH_ICD_SERVER.
asserts.assert_equal(self.q.qsize(), 0, "")

if not self.is_ci:
logging.info("Waiting for 60 minutes")
time.sleep(60*60)
stay_active_duration_ms = 2000
keep_active_timeout_ms = 30000
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)

self.step("16")
time.sleep(15)
stay_active_duration_ms = 2000
keep_active_timeout_ms = 60000
await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)

self.step("17")
time.sleep(15)
self.resume_th_icd_server(check_state=True)

logging.info("TH waiting for first checkin from TH_ICD...")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
logging.info("TH waiting for second checkin from TH_ICD...")
await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
self.step("18")
await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")


Expand Down

0 comments on commit 4124265

Please sign in to comment.