From 28d275b5ccd5a71fd186343f3d1968d081bb277e Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 15 Dec 2023 17:21:33 -0500 Subject: [PATCH 1/3] Centralize all command sending --- zigpy_xbee/api.py | 29 +++++++++++++---------------- zigpy_xbee/zigbee/application.py | 7 +++++-- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/zigpy_xbee/api.py b/zigpy_xbee/api.py index b4a73da..51ba61f 100644 --- a/zigpy_xbee/api.py +++ b/zigpy_xbee/api.py @@ -289,6 +289,7 @@ def __init__(self, device_config: Dict[str, Any]) -> None: self._cmd_mode_future: Optional[asyncio.Future] = None self._reset: asyncio.Event = asyncio.Event() self._running: asyncio.Event = asyncio.Event() + self._send_lock = asyncio.Lock() @property def reset_event(self): @@ -353,12 +354,13 @@ async def _remote_at_command(self, ieee, nwk, options, name, *args): LOGGER.debug("Remote AT command: %s %s", name, args) data = t.serialize(args, (AT_COMMANDS[name],)) try: - return await asyncio.wait_for( - self._command( - "remote_at", ieee, nwk, options, name.encode("ascii"), data - ), - timeout=REMOTE_AT_COMMAND_TIMEOUT, - ) + async with self._send_lock: + return await asyncio.wait_for( + self._command( + "remote_at", ieee, nwk, options, name.encode("ascii"), data + ), + timeout=REMOTE_AT_COMMAND_TIMEOUT, + ) except asyncio.TimeoutError: LOGGER.warning("No response to %s command", name) raise @@ -367,10 +369,11 @@ async def _at_partial(self, cmd_type, name, *args): LOGGER.debug("%s command: %s %s", cmd_type, name, args) data = t.serialize(args, (AT_COMMANDS[name],)) try: - return await asyncio.wait_for( - self._command(cmd_type, name.encode("ascii"), data), - timeout=AT_COMMAND_TIMEOUT, - ) + async with self._send_lock: + return await asyncio.wait_for( + self._command(cmd_type, name.encode("ascii"), data), + timeout=AT_COMMAND_TIMEOUT, + ) except asyncio.TimeoutError: LOGGER.warning("%s: No response to %s command", cmd_type, name) raise @@ -597,9 +600,3 @@ async def _probe(self) -> None: raise APIException("Failed to configure XBee for API mode") finally: self.close() - - def __getattr__(self, item): - """Handle supported command requests.""" - if item in COMMAND_REQUESTS: - return functools.partial(self._command, item) - raise AttributeError(f"Unknown command {item}") diff --git a/zigpy_xbee/zigbee/application.py b/zigpy_xbee/zigbee/application.py index 2158b95..f2cec57 100644 --- a/zigpy_xbee/zigbee/application.py +++ b/zigpy_xbee/zigbee/application.py @@ -302,7 +302,8 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: "Cannot send a packet to a device without a known IEEE address" ) - send_req = self._api.tx_explicit( + send_req = self._api._command( + "tx_explicit", long_addr, short_addr, packet.src_ep or 0, @@ -356,7 +357,9 @@ async def permit_with_link_key( # Key type: # 0 = Pre-configured Link Key (KY command of the joining device) # 1 = Install Code With CRC (I? command of the joining device) - await self._api.register_joining_device(node, reserved, key_type, link_key) + await self._api._command( + "register_joining_device", node, reserved, key_type, link_key + ) def handle_modem_status(self, status): """Handle changed Modem Status of the device.""" From 49def51855d88d25c2db2298b05ffb56c81d0f92 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sun, 10 Dec 2023 18:02:20 -0500 Subject: [PATCH 2/3] Limit request concurrency --- zigpy_xbee/zigbee/application.py | 45 ++++++++++++++++---------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/zigpy_xbee/zigbee/application.py b/zigpy_xbee/zigbee/application.py index f2cec57..55f0ffe 100644 --- a/zigpy_xbee/zigbee/application.py +++ b/zigpy_xbee/zigbee/application.py @@ -302,30 +302,31 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: "Cannot send a packet to a device without a known IEEE address" ) - send_req = self._api._command( - "tx_explicit", - long_addr, - short_addr, - packet.src_ep or 0, - packet.dst_ep or 0, - packet.cluster_id, - packet.profile_id, - packet.radius, - tx_opts, - packet.data.serialize(), - ) - - try: - v = await asyncio.wait_for(send_req, timeout=TIMEOUT_TX_STATUS) - except asyncio.TimeoutError: - raise zigpy.exceptions.DeliveryError( - "Timeout waiting for ACK", status=TXStatus.NETWORK_ACK_FAILURE + async with self._limit_concurrency(): + send_req = self._api._command( + "tx_explicit", + long_addr, + short_addr, + packet.src_ep or 0, + packet.dst_ep or 0, + packet.cluster_id, + packet.profile_id, + packet.radius, + tx_opts, + packet.data.serialize(), ) - if v != TXStatus.SUCCESS: - raise zigpy.exceptions.DeliveryError( - f"Failed to deliver packet: {v!r}", status=v - ) + try: + v = await asyncio.wait_for(send_req, timeout=TIMEOUT_TX_STATUS) + except asyncio.TimeoutError: + raise zigpy.exceptions.DeliveryError( + "Timeout waiting for ACK", status=TXStatus.NETWORK_ACK_FAILURE + ) + + if v != TXStatus.SUCCESS: + raise zigpy.exceptions.DeliveryError( + f"Failed to deliver packet: {v!r}", status=v + ) @zigpy.util.retryable_request() def remote_at_command( From 6059c1e11f521f13670f6a3d2a17942d15ebc657 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 15 Dec 2023 17:29:25 -0500 Subject: [PATCH 3/3] Use zigpy PriorityLock and deprioritize TX commands --- pyproject.toml | 2 +- zigpy_xbee/api.py | 54 ++++++++++++++++++++++++++++------------------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1544f76..fba512b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ readme = "README.md" license = {text = "GPL-3.0"} requires-python = ">=3.8" dependencies = [ - "zigpy>=0.60.0", + "zigpy>=0.60.2", ] [tool.setuptools.packages.find] diff --git a/zigpy_xbee/api.py b/zigpy_xbee/api.py index 51ba61f..ffad765 100644 --- a/zigpy_xbee/api.py +++ b/zigpy_xbee/api.py @@ -8,6 +8,7 @@ import serial from zigpy.config import CONF_DEVICE_PATH, SCHEMA_DEVICE +from zigpy.datastructures import PriorityLock from zigpy.exceptions import APIException, DeliveryError import zigpy.types as t @@ -289,7 +290,7 @@ def __init__(self, device_config: Dict[str, Any]) -> None: self._cmd_mode_future: Optional[asyncio.Future] = None self._reset: asyncio.Event = asyncio.Event() self._running: asyncio.Event = asyncio.Event() - self._send_lock = asyncio.Lock() + self._send_lock = PriorityLock() @property def reset_event(self): @@ -334,33 +335,43 @@ def close(self): self._uart.close() self._uart = None - def _command(self, name, *args, mask_frame_id=False): + def _get_command_priority(self, name: str, *args) -> int: + return { + "tx_explicit": -1, + "remote_at": -1, + }.get(name, 0) + + async def _command(self, name, *args, mask_frame_id=False): """Send API frame to the device.""" - LOGGER.debug("Command %s %s", name, args) if self._uart is None: raise APIException("API is not running") - frame_id = 0 if mask_frame_id else self._seq - data, needs_response = self._api_frame(name, frame_id, *args) - self._uart.send(data) - future = None - if needs_response and frame_id: + + async with self._send_lock(priority=self._get_command_priority(name)): + LOGGER.debug("Command %s %s", name, args) + frame_id = 0 if mask_frame_id else self._seq + data, needs_response = self._api_frame(name, frame_id, *args) + self._uart.send(data) + + if not needs_response or not frame_id: + return + future = asyncio.Future() self._awaiting[frame_id] = (future,) - self._seq = (self._seq % 255) + 1 - return future + self._seq = (self._seq % 255) + 1 + + return await future async def _remote_at_command(self, ieee, nwk, options, name, *args): """Execute AT command on a different XBee module in the network.""" LOGGER.debug("Remote AT command: %s %s", name, args) data = t.serialize(args, (AT_COMMANDS[name],)) try: - async with self._send_lock: - return await asyncio.wait_for( - self._command( - "remote_at", ieee, nwk, options, name.encode("ascii"), data - ), - timeout=REMOTE_AT_COMMAND_TIMEOUT, - ) + return await asyncio.wait_for( + self._command( + "remote_at", ieee, nwk, options, name.encode("ascii"), data + ), + timeout=REMOTE_AT_COMMAND_TIMEOUT, + ) except asyncio.TimeoutError: LOGGER.warning("No response to %s command", name) raise @@ -369,11 +380,10 @@ async def _at_partial(self, cmd_type, name, *args): LOGGER.debug("%s command: %s %s", cmd_type, name, args) data = t.serialize(args, (AT_COMMANDS[name],)) try: - async with self._send_lock: - return await asyncio.wait_for( - self._command(cmd_type, name.encode("ascii"), data), - timeout=AT_COMMAND_TIMEOUT, - ) + return await asyncio.wait_for( + self._command(cmd_type, name.encode("ascii"), data), + timeout=AT_COMMAND_TIMEOUT, + ) except asyncio.TimeoutError: LOGGER.warning("%s: No response to %s command", cmd_type, name) raise