From 98dc92b61655b532709a86d6f743529f81fd16fb Mon Sep 17 00:00:00 2001 From: Terence Hampson Date: Thu, 9 Feb 2023 15:49:55 -0500 Subject: [PATCH] Add read/subscribe event to chip-repl based yamltests (#24904) * chip-repl read event yamltests working with hacks Need to clean this up * Address PR comments --- scripts/tests/chiptest/__init__.py | 7 - src/controller/python/chip/ChipDeviceCtrl.py | 4 +- src/controller/python/chip/yaml/runner.py | 203 ++++++++++++++++++- 3 files changed, 195 insertions(+), 19 deletions(-) diff --git a/scripts/tests/chiptest/__init__.py b/scripts/tests/chiptest/__init__.py index 98bcf3e4ffee9c..b0f47556f151c1 100644 --- a/scripts/tests/chiptest/__init__.py +++ b/scripts/tests/chiptest/__init__.py @@ -131,13 +131,6 @@ def _GetInDevelopmentTests() -> Set[str]: Goal is for this set to become empty. """ return { - # TODO: Event not yet supported: - "Test_TC_ACL_2_10.yaml", - "Test_TC_ACL_2_7.yaml", - "Test_TC_ACL_2_8.yaml", - "Test_TC_ACL_2_9.yaml", - "TestEvents.yaml", - "TestGroupMessaging.yaml", # Needs group support in repl } diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index c608d598c5fcab..96e86adb090177 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -1164,7 +1164,7 @@ async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[ typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], # Concrete path typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] - ]], eventNumberFilter: typing.Optional[int] = None, reportInterval: typing.Tuple[int, int] = None, keepSubscriptions: bool = False): + ]], eventNumberFilter: typing.Optional[int] = None, fabricFiltered: bool = True, reportInterval: typing.Tuple[int, int] = None, keepSubscriptions: bool = False): ''' Read a list of events from a target node, this is a wrapper of DeviceController.Read() @@ -1188,7 +1188,7 @@ async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[ reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions. When not provided, a read request will be sent. ''' - res = await self.Read(nodeid=nodeid, events=events, eventNumberFilter=eventNumberFilter, reportInterval=reportInterval, keepSubscriptions=keepSubscriptions) + res = await self.Read(nodeid=nodeid, events=events, eventNumberFilter=eventNumberFilter, fabricFiltered=fabricFiltered, reportInterval=reportInterval, keepSubscriptions=keepSubscriptions) if isinstance(res, ClusterAttribute.SubscriptionTransaction): return res else: diff --git a/src/controller/python/chip/yaml/runner.py b/src/controller/python/chip/yaml/runner.py index 6f5704a6e418d9..bf8ecf0371e0f3 100644 --- a/src/controller/python/chip/yaml/runner.py +++ b/src/controller/python/chip/yaml/runner.py @@ -26,7 +26,8 @@ import chip.yaml.format_converter as Converter import stringcase from chip.ChipDeviceCtrl import ChipDeviceController, discovery -from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath, ValueDecodeFailure +from chip.clusters.Attribute import (AttributeStatus, EventReadResult, SubscriptionTransaction, TypedAttributePath, + ValueDecodeFailure) from chip.exceptions import ChipStackError from chip.yaml.errors import ParsingError, UnexpectedParsingError from matter_yamltests.pseudo_clusters.clusters.delay_commands import DelayCommands @@ -56,6 +57,11 @@ class _GetCommissionerNodeIdResult: node_id: int +@dataclass +class EventResponse: + event_result_list: list[EventReadResult] + + @dataclass class _ActionResult: status: _ActionStatus @@ -69,6 +75,12 @@ class _AttributeSubscriptionCallbackResult: result: _ActionResult +@dataclass +class _EventSubscriptionCallbackResult: + name: str + result: _ActionResult + + @dataclass class _ExecutionContext: ''' Objects that is commonly passed around this file that are vital to test execution.''' @@ -78,7 +90,8 @@ class _ExecutionContext: subscriptions: list = field(default_factory=list) # The key is the attribute/event name, and the value is a queue of subscription callback results # that been sent by device under test. For attribute subscription the queue is of type - # _AttributeSubscriptionCallbackResult. + # _AttributeSubscriptionCallbackResult, for event the queue is of type + # _EventSubscriptionCallbackResult. subscription_callback_result_queue: dict = field(default_factory=dict) @@ -266,6 +279,55 @@ def parse_raw_response(self, raw_resp) -> _ActionResult: return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val) +class ReadEventAction(BaseAction): + ''' Read Event action to be executed.''' + + def __init__(self, test_step, cluster: str, context: _ExecutionContext): + '''Converts 'test_step' to read event action that can execute with ChipDeviceController. + + Args: + 'test_step': Step containing information required to run read event action. + 'cluster': Name of cluster read event action is targeting. + 'context': Contains test-wide common objects such as DataModelLookup instance. + Raises: + UnexpectedParsingError: Raised if there is an unexpected parsing error. + ''' + super().__init__(test_step) + self._event_name = stringcase.pascalcase(test_step.event) + self._cluster = cluster + self._endpoint = test_step.endpoint + self._node_id = test_step.node_id + self._cluster_object = None + self._request_object = None + self._event_number_filter = test_step.event_number + self._fabric_filtered = False + + if test_step.fabric_filtered is not None: + self._fabric_filtered = test_step.fabric_filtered + + self._request_object = context.data_model_lookup.get_event(self._cluster, + self._event_name) + if self._request_object is None: + raise UnexpectedParsingError( + f'ReadEvent failed to find cluster:{self._cluster} Event:{self._event_name}') + + if test_step.arguments: + raise UnexpectedParsingError( + f'ReadEvent should not contain arguments. {self.label}') + + def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult: + try: + urgent = 0 + request = [(self._endpoint, self._request_object, urgent)] + resp = asyncio.run(dev_ctrl.ReadEvent(self._node_id, events=request, eventNumberFilter=self._event_number_filter, + fabricFiltered=self._fabric_filtered)) + except chip.interaction_model.InteractionModelError as error: + return _ActionResult(status=_ActionStatus.ERROR, response=error) + + parsed_resp = EventResponse(event_result_list=resp) + return _ActionResult(status=_ActionStatus.SUCCESS, response=parsed_resp) + + class WaitForCommissioneeAction(BaseAction): ''' Wait for commissionee action to be executed.''' @@ -327,6 +389,27 @@ def name(self) -> str: return self._name +class EventChangeAccumulator: + def __init__(self, name: str, expected_event, output_queue: queue.SimpleQueue): + self._name = name + self._expected_event = expected_event + self._output_queue = output_queue + + def __call__(self, event_result: EventReadResult, transaction: SubscriptionTransaction): + if (self._expected_event.cluster_id == event_result.Header.ClusterId and + self._expected_event.event_id == event_result.Header.EventId): + event_response = EventResponse(event_result_list=[event_result]) + result = _ActionResult(status=_ActionStatus.SUCCESS, response=event_response) + + item = _EventSubscriptionCallbackResult(self._name, result) + logging.debug(f'Got subscription report on client {self.name}') + self._output_queue.put(item) + + @property + def name(self) -> str: + return self._name + + class SubscribeAttributeAction(ReadAttributeAction): '''Single subscribe attribute action to be executed.''' @@ -382,6 +465,63 @@ def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult: return self.parse_raw_response(raw_resp) +class SubscribeEventAction(ReadEventAction): + '''Single subscribe event action to be executed.''' + + def __init__(self, test_step, cluster: str, context: _ExecutionContext): + '''Converts 'test_step' to subscribe event action that can execute with ChipDeviceController. + + Args: + 'test_step': Step containing information required to run subscribe event action. + 'cluster': Name of cluster subscribe event action is targeting. + 'context': Contains test-wide common objects such as DataModelLookup instance. + Raises: + ParsingError: Raised if there is a benign error, and there is currently no + action to perform for this subscribe event. + UnexpectedParsingError: Raised if there is an unexpected parsing error. + ''' + super().__init__(test_step, cluster, context) + self._context = context + if test_step.min_interval is None: + raise UnexpectedParsingError( + f'SubscribeEvent action does not have min_interval {self.label}') + self._min_interval = test_step.min_interval + + if test_step.max_interval is None: + raise UnexpectedParsingError( + f'SubscribeEvent action does not have max_interval {self.label}') + self._max_interval = test_step.max_interval + + def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult: + try: + urgent = 0 + request = [(self._endpoint, self._request_object, urgent)] + subscription = asyncio.run( + dev_ctrl.ReadEvent(self._node_id, events=request, eventNumberFilter=self._event_number_filter, + reportInterval=(self._min_interval, self._max_interval), + keepSubscriptions=False)) + except chip.interaction_model.InteractionModelError as error: + return _ActionResult(status=_ActionStatus.ERROR, response=error) + + self._context.subscriptions.append(subscription) + output_queue = self._context.subscription_callback_result_queue.get(self._event_name, + None) + if output_queue is None: + output_queue = queue.SimpleQueue() + self._context.subscription_callback_result_queue[self._event_name] = output_queue + + while not output_queue.empty(): + output_queue.get(block=False) + + subscription_handler = EventChangeAccumulator(self.label, self._request_object, output_queue) + + subscription.SetEventUpdateCallback(subscription_handler) + + events = subscription.GetEvents() + response = EventResponse(event_result_list=events) + return _ActionResult(status=_ActionStatus.SUCCESS, response=response) + + class WriteAttributeAction(BaseAction): '''Single write attribute action to be executed.''' @@ -462,9 +602,15 @@ def __init__(self, test_step, context: _ExecutionContext): UnexpectedParsingError: Raised if the expected queue does not exist. ''' super().__init__(test_step) - self._attribute_name = stringcase.pascalcase(test_step.attribute) - self._output_queue = context.subscription_callback_result_queue.get(self._attribute_name, - None) + if test_step.attribute is not None: + queue_name = stringcase.pascalcase(test_step.attribute) + elif test_step.event is not None: + queue_name = stringcase.pascalcase(test_step.event) + else: + raise UnexpectedParsingError( + f'WaitForReport needs to wait on either attribute or event, neither were provided') + + self._output_queue = context.subscription_callback_result_queue.get(queue_name, None) if self._output_queue is None: raise UnexpectedParsingError(f'Could not find output queue') @@ -477,6 +623,8 @@ def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult: except queue.Empty: return _ActionResult(status=_ActionStatus.ERROR, response=None) + if isinstance(item, _AttributeSubscriptionCallbackResult): + return item.result return item.result @@ -621,14 +769,15 @@ def _attribute_read_action_factory(self, test_step, cluster: str): 'cluster': Name of cluster read attribute action is targeting. Returns: ReadAttributeAction if 'test_step' is a valid read attribute to be executed. - None if we were unable to use the provided 'test_step' for a known reason that is not - fatal to test execution. ''' try: return ReadAttributeAction(test_step, cluster, self._context) except ParsingError: return None + def _event_read_action_factory(self, test_step, cluster: str): + return ReadEventAction(test_step, cluster, self._context) + def _attribute_subscribe_action_factory(self, test_step, cluster: str): '''Creates subscribe attribute command from TestStep provided. @@ -648,6 +797,17 @@ def _attribute_subscribe_action_factory(self, test_step, cluster: str): # propogated. return None + def _attribute_subscribe_event_factory(self, test_step, cluster: str): + '''Creates subscribe event command from TestStep provided. + + Args: + 'test_step': Step containing information required to run subscribe attribute action. + 'cluster': Name of cluster write attribute action is targeting. + Returns: + SubscribeEventAction if 'test_step' is a valid subscribe attribute to be executed. + ''' + return SubscribeEventAction(test_step, cluster, self._context) + def _attribute_write_action_factory(self, test_step, cluster: str): '''Creates write attribute command TestStep. @@ -712,11 +872,11 @@ def encode(self, request) -> BaseAction: elif command == 'readAttribute': action = self._attribute_read_action_factory(request, cluster) elif command == 'readEvent': - # TODO need to implement _event_read_action_factory - # action = self._event_read_action_factory(request, cluster) - pass + action = self._event_read_action_factory(request, cluster) elif command == 'subscribeAttribute': action = self._attribute_subscribe_action_factory(request, cluster) + elif command == 'subscribeEvent': + action = self._attribute_subscribe_event_factory(request, cluster) elif command == 'waitForReport': action = self._wait_for_report_action_factory(request) else: @@ -779,6 +939,29 @@ def decode(self, result: _ActionResult): } return decoded_response + if isinstance(response, EventResponse): + if not response.event_result_list: + # This means that the event result we got back was empty, below is how we + # represent this. + decoded_response = [{}] + return decoded_response + decoded_response = [] + for event in response.event_result_list: + if event.Status != chip.interaction_model.Status.Success: + error_message = stringcase.snakecase(event.Status.name).upper() + decoded_response.append({'error': error_message}) + continue + cluster_id = event.Header.ClusterId + cluster_name = self._test_spec_definition.get_cluster_name(cluster_id) + event_id = event.Header.EventId + event_name = self._test_spec_definition.get_event_name(cluster_id, event_id) + event_definition = self._test_spec_definition.get_event_by_name(cluster_name, event_name) + is_fabric_scoped = bool(event_definition.is_fabric_sensitive) + decoded_event = Converter.from_data_model_to_test_definition( + self._test_spec_definition, cluster_name, event_definition.fields, event.Data, is_fabric_scoped) + decoded_response.append({'value': decoded_event}) + return decoded_response + if isinstance(response, ChipStackError): decoded_response['error'] = 'FAILURE' return decoded_response