From 830d5fcb944325dfdc79f9961a21738fe00034c2 Mon Sep 17 00:00:00 2001 From: "tennessee.carmelveilleux@gmail.com" Date: Tue, 30 Jul 2024 16:29:32 -0400 Subject: [PATCH 1/6] WIP --- .../linux/AllClustersCommandDelegate.cpp | 59 ++++++++++- src/python_testing/TC_SWTCH.py | 98 ++++++++++++++++++- 2 files changed, 150 insertions(+), 7 deletions(-) diff --git a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp index 45ab1dac61bebc..1c93d76d91f6c1 100644 --- a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp +++ b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp @@ -64,7 +64,7 @@ bool HasNumericField(Json::Value & jsonValue, const std::string & field) * * JSON Arguments: * - "Name": Must be "SimulateLongPress" - * - "EndpointId": number of endpoint having a switch cluster + * - "EndpointId": ID of endpoint having a switch cluster * - "ButtonId": switch position in the switch cluster for "down" button (not idle) * - "LongPressDelayMillis": Time in milliseconds before the LongPress * - "LongPressDurationMillis": Total duration in milliseconds from start of the press to LongRelease @@ -129,7 +129,7 @@ void HandleSimulateLongPress(Json::Value & jsonValue) * * JSON Arguments: * - "Name": Must be "SimulateActionSwitchMultiPress" - * - "EndpointId": number of endpoint having a switch cluster + * - "EndpointId": ID of endpoint having a switch cluster * - "ButtonId": switch position in the switch cluster for "down" button (not idle) * - "MultiPressPressedTimeMillis": Pressed time in milliseconds for each press * - "MultiPressReleasedTimeMillis": Released time in milliseconds after each press @@ -196,6 +196,57 @@ void HandleSimulateMultiPress(Json::Value & jsonValue) sButtonSimulatorInstance = std::move(buttonSimulator); } +/** + * Named pipe handler for simulating a latched switch movement. + * + * Usage example: + * echo '{"Name": "SimulateLatchedPosition", "EndpointId": 3, "PositionId": 1}' > /tmp/chip_all_clusters_fifo_1146610 + * + * JSON Arguments: + * - "Name": Must be "SimulateLatchedPosition" + * - "EndpointId": ID of endpoint having a switch cluster + * - "PositionId": switch position for new CurrentPosition to set in switch cluster + * + * @param jsonValue - JSON payload from named pipe + */ + +void HandleSimulateLatchedPosition(Json::Value & jsonValue) +{ + bool hasEndpointId = HasNumericField(jsonValue, "EndpointId"); + bool hasPositionId = HasNumericField(jsonValue, "PositionId"); + + if (!hasEndpointId || !hasPositionId) + { + std::string inputJson = jsonValue.toStyledString(); + ChipLogError(NotSpecified, + "Missing or invalid value for one of EndpointId, PositionId in %s", + inputJson.c_str()); + return; + } + + EndpointId endpointId = static_cast(jsonValue["EndpointId"].asUInt()); + uint8_t positionId = static_cast(jsonValue["PositionId"].asUInt()); + + uint8_t previousPositionId = 0; + Protocols::InteractionModel::Status status = Switch::Attributes::CurrentPosition::Get(endpointId, &previousPositionId); + VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, + ChipLogError(NotSpecified, "Failed to get CurrentPosition attribute")); + + if (positionId != previousPositionId) + { + status = Switch::Attributes::CurrentPosition::Set(endpoint, positionId); + VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, + ChipLogError(NotSpecified, "Failed to set CurrentPosition attribute")); + ChipLogDetail(NotSpecified, "The latching switch is moved to a new position: %u", static_cast(positionId)); + + Clusters::SwitchServer::Instance().OnSwitchLatch(endpointId, positionId); + } + else + { + ChipLogDetail(NotSpecified, "Not moving latching switch to a new position, already at %u", static_cast(positionId)); + } +} + } // namespace AllClustersAppCommandHandler * AllClustersAppCommandHandler::FromJSON(const char * json) @@ -353,6 +404,10 @@ void AllClustersAppCommandHandler::HandleCommand(intptr_t context) { HandleSimulateMultiPress(self->mJsonValue); } + else if (name == "SimulateLatchPosition") + { + HandleSimulateLatchedPosition(self->mJsonValue); + } else { ChipLogError(NotSpecified, "Unhandled command: Should never happens"); diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 926379804eb8e3..49bc74e526ac4f 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -79,10 +79,6 @@ def _send_named_pipe_command(self, command_dict: dict[str, Any]): def _use_button_simulator(self) -> bool: return self.check_pics("PICS_SDK_CI_ONLY") or self.user_params.get("use_button_simulator", False) - def _ask_for_switch_idle(self): - if not self._use_button_simulator(): - self.wait_for_user_input(prompt_msg="Ensure switch is idle") - def _send_multi_press_named_pipe_command(self, endpoint_id: int, number_of_presses: int, pressed_position: int, feature_map: uint, multi_press_max: uint): command_dict = {"Name": 'SimulateMultiPress', "EndpointId": endpoint_id, "ButtonId": pressed_position, "MultiPressPressedTimeMillis": 500, "MultiPressReleasedTimeMillis": 500, @@ -94,6 +90,20 @@ def _send_long_press_named_pipe_command(self, endpoint_id: int, pressed_position "ButtonId": pressed_position, "LongPressDelayMillis": 5000, "LongPressDurationMillis": 5500, "FeatureMap": feature_map} self._send_named_pipe_command(command_dict) + def _send_latched_switch_named_pipe_command(self, endpoint_id: int, new_position: int): + command_dict = {"Name": "SimulateLatchedPosition", "EndpointId": endpoint_id, "PositionId": new_position} + self._send_named_pipe_command(command_dict) + + def _ask_for_switch_idle(self): + if not self._use_button_simulator(): + self.wait_for_user_input(prompt_msg="Ensure switch is idle") + + def _ask_for_switch_position(self, endpoint_id: int, new_position: int): + if not self._use_button_simulator(): + self.wait_for_user_input(prompt_msg=f"Move latched switch to position {new_position}, if it is not already there.") + else: + self._send_latched_switch_named_pipe_command(endpoint_id, new_position) + def _ask_for_multi_press_short_long(self, endpoint_id: int, pressed_position: int, feature_map: uint, multi_press_max: uint): if not self._use_button_simulator(): msg = f""" @@ -105,7 +115,7 @@ def _ask_for_multi_press_short_long(self, endpoint_id: int, pressed_position: in self.wait_for_user_input(msg) else: # This is just a simulator, ignore the long press instruction for now, it doesn't matter for the CI. It does for cert. - self._send_multi_press_named_pipe_command(endpoint_id, 2, pressed_position, feature_map, multi_press_max) + self._send_multi_press_named_pipe_command(endpoint_id, number_of_presses=2, pressed_position=pressed_position, feature_map=feature_map, multi_press_max=multi_press_max) def _ask_for_multi_press_long_short(self, endpoint_id, pressed_position, feature_map: int): if not self._use_button_simulator(): @@ -361,6 +371,84 @@ def _received_event(self, event_listener: EventChangeCallback, target_event: Clu remaining = end_time - datetime.now() return False + def steps_TC_SWTCH_2_2(self): + return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), + TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), + TestStep(3, "Operator sets switch to first position on the DUT"), + TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), + TestStep(5, "Operator sets switch to second position (one) on the DUT", + "Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT"), + TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"), + TestStep(7, "If there are more than 2 positions, test subsequent positions of the DUT"), + TestStep(8, "Operator sets switch to first position on the DUT."), + TestStep(9, "Wait 10 seconds for event reports stable." "Verify that last SwitchLatched event received is for NewPosition 0."), + TestStep(10, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), + ] + + @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kLatchingSwitch)) + async def test_TC_SWTCH_2_2(self): + # Commissioning - already done + self.step(1) + + cluster = Clusters.Switch + feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) + + has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 + has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0 + has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 + + endpoint_id = self.matter_test_config.endpoint + + self.step(2) + event_listener = EventChangeCallback(cluster) + await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + + self.step(3) + self._ask_for_switch_position(endpoint_id, new_position=0) + + self.step(4) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(button_val, 0, "Switch position value is not 0") + + self.step(5) + expected_switch_position = 1 + self._ask_for_switch_position(endpoint_id, expected_switch_position) + data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched) + # TODO: Implement check + print(data) + + self.step(6) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}") + + # TODO: Implement loop for > 2 total positions + self.skip_step(7) + + self.step(8) + self._ask_for_switch_position(endpoint_id, new_position=0) + + self.step(9) + # Wait 10 seconds, then check last SwitchLatched event received had position 0 + time.sleep(10.0) + + + + self.step("8b") + if has_msr_feature and has_msl_feature: + asserts.assert_true(self._received_event(event_listener, cluster.Events.LongRelease, 10), + "Did not receive long release") + + self.step("8c") + if has_as_feature: + asserts.assert_false(self._received_event(event_listener, cluster.Events.ShortRelease, 10), "Received short release") + else: + self.mark_current_step_skipped() + + self.step(9) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(button_val, 0, "Button value is not 0") + + def steps_TC_SWTCH_2_3(self): return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), From 0e61d3f72b3af1758d40f719fc793796473b0225 Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Tue, 30 Jul 2024 22:56:29 -0400 Subject: [PATCH 2/6] Fix test TC-SWTCH-2.2 - TC-SWTCH-2.2 is updated from YAML to Python by this PR - TC-SWTCH-2.2 latest test plan steps implemented - Added latching switch simulation to all clusters app - All switch cluster simulation features are now done in all-clusters-app Fixes #34304 Fixes #34305 Testing done: - New test passes with all-clusters - Existing tests also pass --- .../linux/AllClustersCommandDelegate.cpp | 13 ++-- src/python_testing/TC_SWTCH.py | 60 ++++++++++--------- src/python_testing/matter_testing_support.py | 30 +++++++--- 3 files changed, 61 insertions(+), 42 deletions(-) diff --git a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp index 1c93d76d91f6c1..354d60aa8b46ab 100644 --- a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp +++ b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp @@ -200,17 +200,17 @@ void HandleSimulateMultiPress(Json::Value & jsonValue) * Named pipe handler for simulating a latched switch movement. * * Usage example: - * echo '{"Name": "SimulateLatchedPosition", "EndpointId": 3, "PositionId": 1}' > /tmp/chip_all_clusters_fifo_1146610 + * echo '{"Name": "SimulateLatchPosition", "EndpointId": 3, "PositionId": 1}' > /tmp/chip_all_clusters_fifo_1146610 * * JSON Arguments: - * - "Name": Must be "SimulateLatchedPosition" + * - "Name": Must be "SimulateLatchPosition" * - "EndpointId": ID of endpoint having a switch cluster * - "PositionId": switch position for new CurrentPosition to set in switch cluster * * @param jsonValue - JSON payload from named pipe */ -void HandleSimulateLatchedPosition(Json::Value & jsonValue) +void HandleSimulateLatchPosition(Json::Value & jsonValue) { bool hasEndpointId = HasNumericField(jsonValue, "EndpointId"); bool hasPositionId = HasNumericField(jsonValue, "PositionId"); @@ -234,7 +234,7 @@ void HandleSimulateLatchedPosition(Json::Value & jsonValue) if (positionId != previousPositionId) { - status = Switch::Attributes::CurrentPosition::Set(endpoint, positionId); + status = Switch::Attributes::CurrentPosition::Set(endpointId, positionId); VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, ChipLogError(NotSpecified, "Failed to set CurrentPosition attribute")); ChipLogDetail(NotSpecified, "The latching switch is moved to a new position: %u", static_cast(positionId)); @@ -406,11 +406,12 @@ void AllClustersAppCommandHandler::HandleCommand(intptr_t context) } else if (name == "SimulateLatchPosition") { - HandleSimulateLatchedPosition(self->mJsonValue); + HandleSimulateLatchPosition(self->mJsonValue); } else { - ChipLogError(NotSpecified, "Unhandled command: Should never happens"); + ChipLogError(NotSpecified, "Unhandled command '%s': this hould never happen", name.c_str()); + VerifyOrDie(false && "Named pipe command not supported, see log above."); } exit: diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 49bc74e526ac4f..e3b18a7e20c6a5 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -90,8 +90,8 @@ def _send_long_press_named_pipe_command(self, endpoint_id: int, pressed_position "ButtonId": pressed_position, "LongPressDelayMillis": 5000, "LongPressDurationMillis": 5500, "FeatureMap": feature_map} self._send_named_pipe_command(command_dict) - def _send_latched_switch_named_pipe_command(self, endpoint_id: int, new_position: int): - command_dict = {"Name": "SimulateLatchedPosition", "EndpointId": endpoint_id, "PositionId": new_position} + def _send_latching_switch_named_pipe_command(self, endpoint_id: int, new_position: int): + command_dict = {"Name": "SimulateLatchPosition", "EndpointId": endpoint_id, "PositionId": new_position} self._send_named_pipe_command(command_dict) def _ask_for_switch_idle(self): @@ -102,7 +102,7 @@ def _ask_for_switch_position(self, endpoint_id: int, new_position: int): if not self._use_button_simulator(): self.wait_for_user_input(prompt_msg=f"Move latched switch to position {new_position}, if it is not already there.") else: - self._send_latched_switch_named_pipe_command(endpoint_id, new_position) + self._send_latching_switch_named_pipe_command(endpoint_id, new_position) def _ask_for_multi_press_short_long(self, endpoint_id: int, pressed_position: int, feature_map: uint, multi_press_max: uint): if not self._use_button_simulator(): @@ -387,68 +387,72 @@ def steps_TC_SWTCH_2_2(self): @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kLatchingSwitch)) async def test_TC_SWTCH_2_2(self): - # Commissioning - already done + post_prompt_settle_delay_seconds = 10.0 + + # Step 1: Commissioning - already done self.step(1) cluster = Clusters.Switch - feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) - - has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 - has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0 - has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 - endpoint_id = self.matter_test_config.endpoint + # Step 2: Set up subscription to all events of Switch cluster on the endpoint. self.step(2) event_listener = EventChangeCallback(cluster) await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + # Step 3: Operator sets switch to first position on the DUT. self.step(3) self._ask_for_switch_position(endpoint_id, new_position=0) + event_listener.flush_events() + # Step 4: TH reads the CurrentPosition attribute from the DUT. + # Verify that the value is 0. self.step(4) button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) asserts.assert_equal(button_val, 0, "Switch position value is not 0") + # Step 5: Operator sets switch to second position (one) on the DUT", + # Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT self.step(5) expected_switch_position = 1 self._ask_for_switch_position(endpoint_id, expected_switch_position) - data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched) - # TODO: Implement check - print(data) + data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds) + logging.info(f"-> SwitchLatched event last received: {data}") + asserts.assert_equal(data, cluster.Events.SwitchLatched(newPosition=expected_switch_position), "Did not get expected switch position") + + # Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1 self.step(6) button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}") + # Step 7: If there are more than 2 positions, test subsequent positions of the DUT # TODO: Implement loop for > 2 total positions self.skip_step(7) + # Step 8: Operator sets switch to first position on the DUT. self.step(8) + event_listener.flush_events() self._ask_for_switch_position(endpoint_id, new_position=0) + # Step 9: Wait 10 seconds for event reports stable. + # Verify that last SwitchLatched event received is for NewPosition 0. self.step(9) - # Wait 10 seconds, then check last SwitchLatched event received had position 0 time.sleep(10.0) + expected_switch_position = 0 + last_event = event_listener.get_last_event() + asserts.assert_is_not_none(last_event, "Did not get SwitchLatched events since last operator action.") + last_event_data = last_event.Data + logging.info(f"-> SwitchLatched event last received: {last_event_data}") + asserts.assert_equal(last_event_data, cluster.Events.SwitchLatched(newPosition=expected_switch_position), "Did not get expected switch position") - - self.step("8b") - if has_msr_feature and has_msl_feature: - asserts.assert_true(self._received_event(event_listener, cluster.Events.LongRelease, 10), - "Did not receive long release") - - self.step("8c") - if has_as_feature: - asserts.assert_false(self._received_event(event_listener, cluster.Events.ShortRelease, 10), "Received short release") - else: - self.mark_current_step_skipped() - - self.step(9) + # Step 10: TH reads the CurrentPosition attribute from the DUT. + # Verify that the value is 0 + self.step(10) button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) asserts.assert_equal(button_val, 0, "Button value is not 0") - def steps_TC_SWTCH_2_3(self): return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 26437ea54b5b79..273ffe94b5f821 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -257,11 +257,11 @@ def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction): f'Got subscription report for event on cluster {self._expected_cluster}: {res.Data}') self._q.put(res) - def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, timeoutS: int = 10): - """This function allows a test script to block waiting for the specific event to arrive with a timeout - (specified in seconds). It returns the event data so that the values can be checked.""" + def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, timeout_sec: float = 10.0) -> Any: + """This function allows a test script to block waiting for the specific event to be the next event + to arrive within a timeout (specified in seconds). It returns the event data so that the values can be checked.""" try: - res = self._q.get(block=True, timeout=timeoutS) + res = self._q.get(block=True, timeout=timeout_sec) except queue.Empty: asserts.fail("Failed to receive a report for the event {}".format(expected_event)) @@ -269,16 +269,30 @@ def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, tim asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report") return res.Data - def wait_for_event_expect_no_report(self, timeoutS: int = 10): - """This function succceeds/returns if an event does not arrive within the timeout specified in seconds. - If an event does arrive, an assert is called.""" + def wait_for_event_expect_no_report(self, timeout_sec: float = 10.0): + """This function returns if an event does not arrive within the timeout specified in seconds. + If any event does arrive, an assert failure occurs.""" try: - res = self._q.get(block=True, timeout=timeoutS) + res = self._q.get(block=True, timeout=timeout_sec) except queue.Empty: return asserts.fail(f"Event reported when not expected {res}") + def get_last_event(self) -> Optional[Any]: + """Flush entire queue, returning last (newest) event only.""" + last_event: Optional[Any] = None + while True: + try: + last_event = self._q.get(block=False) + except queue.Empty: + return last_event + + def flush_events(self) -> None: + """Flush entire queue, returning nothing.""" + _ = self.get_last_event() + return + @property def event_queue(self) -> queue.Queue: return self._q From a4b5ce3cfeb542dc7e745e9e5314819a9ff4dfde Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Tue, 30 Jul 2024 23:06:43 -0400 Subject: [PATCH 3/6] Force restyle --- src/python_testing/TC_SWTCH.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index e3b18a7e20c6a5..948d20ebad98d7 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -270,7 +270,7 @@ def _expect_no_events_for_cluster(self, event_queue: queue.Queue, endpoint_id: i @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch)) async def test_TC_SWTCH_2_4(self): # TODO: Make this come from PIXIT - switch_pressed_position = 1 + switch_pressed_position = 1 post_prompt_settle_delay_seconds = 10.0 # Commission DUT - already done From 8b990fba38d35e0804a9c00218ce15c348009dfc Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Tue, 30 Jul 2024 23:10:25 -0400 Subject: [PATCH 4/6] Force restyle --- src/python_testing/TC_SWTCH.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 948d20ebad98d7..e3b18a7e20c6a5 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -270,7 +270,7 @@ def _expect_no_events_for_cluster(self, event_queue: queue.Queue, endpoint_id: i @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch)) async def test_TC_SWTCH_2_4(self): # TODO: Make this come from PIXIT - switch_pressed_position = 1 + switch_pressed_position = 1 post_prompt_settle_delay_seconds = 10.0 # Commission DUT - already done From 32342d2e2564cdde2046c7c25574590f5136c47f Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Tue, 30 Jul 2024 23:20:01 -0400 Subject: [PATCH 5/6] Restyled --- .../linux/AllClustersCommandDelegate.cpp | 13 ++++++------- src/python_testing/TC_SWTCH.py | 9 ++++++--- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp index 354d60aa8b46ab..d620731e93dc5c 100644 --- a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp +++ b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp @@ -218,16 +218,14 @@ void HandleSimulateLatchPosition(Json::Value & jsonValue) if (!hasEndpointId || !hasPositionId) { std::string inputJson = jsonValue.toStyledString(); - ChipLogError(NotSpecified, - "Missing or invalid value for one of EndpointId, PositionId in %s", - inputJson.c_str()); + ChipLogError(NotSpecified, "Missing or invalid value for one of EndpointId, PositionId in %s", inputJson.c_str()); return; } EndpointId endpointId = static_cast(jsonValue["EndpointId"].asUInt()); - uint8_t positionId = static_cast(jsonValue["PositionId"].asUInt()); + uint8_t positionId = static_cast(jsonValue["PositionId"].asUInt()); - uint8_t previousPositionId = 0; + uint8_t previousPositionId = 0; Protocols::InteractionModel::Status status = Switch::Attributes::CurrentPosition::Get(endpointId, &previousPositionId); VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, ChipLogError(NotSpecified, "Failed to get CurrentPosition attribute")); @@ -236,14 +234,15 @@ void HandleSimulateLatchPosition(Json::Value & jsonValue) { status = Switch::Attributes::CurrentPosition::Set(endpointId, positionId); VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, - ChipLogError(NotSpecified, "Failed to set CurrentPosition attribute")); + ChipLogError(NotSpecified, "Failed to set CurrentPosition attribute")); ChipLogDetail(NotSpecified, "The latching switch is moved to a new position: %u", static_cast(positionId)); Clusters::SwitchServer::Instance().OnSwitchLatch(endpointId, positionId); } else { - ChipLogDetail(NotSpecified, "Not moving latching switch to a new position, already at %u", static_cast(positionId)); + ChipLogDetail(NotSpecified, "Not moving latching switch to a new position, already at %u", + static_cast(positionId)); } } diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index e3b18a7e20c6a5..5e78cb4bbcda1d 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -115,7 +115,8 @@ def _ask_for_multi_press_short_long(self, endpoint_id: int, pressed_position: in self.wait_for_user_input(msg) else: # This is just a simulator, ignore the long press instruction for now, it doesn't matter for the CI. It does for cert. - self._send_multi_press_named_pipe_command(endpoint_id, number_of_presses=2, pressed_position=pressed_position, feature_map=feature_map, multi_press_max=multi_press_max) + self._send_multi_press_named_pipe_command( + endpoint_id, number_of_presses=2, pressed_position=pressed_position, feature_map=feature_map, multi_press_max=multi_press_max) def _ask_for_multi_press_long_short(self, endpoint_id, pressed_position, feature_map: int): if not self._use_button_simulator(): @@ -419,7 +420,8 @@ async def test_TC_SWTCH_2_2(self): data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds) logging.info(f"-> SwitchLatched event last received: {data}") - asserts.assert_equal(data, cluster.Events.SwitchLatched(newPosition=expected_switch_position), "Did not get expected switch position") + asserts.assert_equal(data, cluster.Events.SwitchLatched( + newPosition=expected_switch_position), "Did not get expected switch position") # Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1 self.step(6) @@ -445,7 +447,8 @@ async def test_TC_SWTCH_2_2(self): asserts.assert_is_not_none(last_event, "Did not get SwitchLatched events since last operator action.") last_event_data = last_event.Data logging.info(f"-> SwitchLatched event last received: {last_event_data}") - asserts.assert_equal(last_event_data, cluster.Events.SwitchLatched(newPosition=expected_switch_position), "Did not get expected switch position") + asserts.assert_equal(last_event_data, cluster.Events.SwitchLatched( + newPosition=expected_switch_position), "Did not get expected switch position") # Step 10: TH reads the CurrentPosition attribute from the DUT. # Verify that the value is 0 From cc7055d1754f37c1eaa3450180480d100b26a5e5 Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Tue, 30 Jul 2024 23:40:57 -0400 Subject: [PATCH 6/6] Add follow-up issue --- src/python_testing/TC_SWTCH.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 5e78cb4bbcda1d..6da5c7a229e0b7 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -53,6 +53,7 @@ def desc_TC_SWTCH_2_4(self) -> str: """Returns a description of this test""" return "[TC-SWTCH-2.4] Momentary Switch Long Press Verification" + # TODO(#34656): Fill test steps # def steps_TC_SWTCH_2_4(self) -> list[TestStep]: # steps = [ # TestStep("0", "Commissioning, already done", is_commissioning=True), @@ -164,7 +165,7 @@ def _ask_for_release(self): time.sleep(self.keep_pressed_delay/1000) def _placeholder_for_step(self, step_id: str): - # TODO: Global search an replace of `self._placeholder_for_step` with `self.step` when done. + # TODO(#34656): Global search an replace of `self._placeholder_for_step` with `self.step` when done. logging.info(f"Step {step_id}") pass @@ -270,7 +271,7 @@ def _expect_no_events_for_cluster(self, event_queue: queue.Queue, endpoint_id: i @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch)) async def test_TC_SWTCH_2_4(self): - # TODO: Make this come from PIXIT + # TODO(#34656): Make this come from PIXIT switch_pressed_position = 1 post_prompt_settle_delay_seconds = 10.0 @@ -429,7 +430,7 @@ async def test_TC_SWTCH_2_2(self): asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}") # Step 7: If there are more than 2 positions, test subsequent positions of the DUT - # TODO: Implement loop for > 2 total positions + # # TODO(#34656): Implement loop for > 2 total positions self.skip_step(7) # Step 8: Operator sets switch to first position on the DUT.