-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #469 from canton7/feature/custom-serial
Work around bug in pyserial's PosixPollSerial
- Loading branch information
Showing
7 changed files
with
213 additions
and
124 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
113 changes: 113 additions & 0 deletions
113
custom_components/foxess_modbus/client/custom_modbus_tcp_client.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
import logging | ||
import select | ||
import socket | ||
import time | ||
from typing import Any | ||
from typing import cast | ||
|
||
from pymodbus.client import ModbusTcpClient | ||
from pymodbus.exceptions import ConnectionException | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class CustomModbusTcpClient(ModbusTcpClient): | ||
"""Custom ModbusTcpClient subclass with some hacks""" | ||
|
||
def __init__(self, delay_on_connect: int | None, **kwargs: Any) -> None: | ||
super().__init__(**kwargs) | ||
self._delay_on_connect = delay_on_connect | ||
|
||
def connect(self) -> bool: | ||
was_connected = self.socket is not None | ||
if not was_connected: | ||
_LOGGER.debug("Connecting to %s", self.params) | ||
is_connected = cast(bool, super().connect()) | ||
# pymodbus doesn't disable Nagle's algorithm. This slows down reads quite substantially as the | ||
# TCP stack waits to see if we're going to send anything else. Disable it ourselves. | ||
if not was_connected and is_connected: | ||
assert self.socket is not None | ||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) | ||
if self._delay_on_connect is not None: | ||
time.sleep(self._delay_on_connect) | ||
return is_connected | ||
|
||
# Replacement of ModbusTcpClient to use poll rather than select, see | ||
# https://github.com/nathanmarlor/foxess_modbus/issues/275 | ||
def recv(self, size: int) -> bytes: | ||
"""Read data from the underlying descriptor.""" | ||
super(ModbusTcpClient, self).recv(size) | ||
if not self.socket: | ||
raise ConnectionException(str(self)) | ||
|
||
# socket.recv(size) waits until it gets some data from the host but | ||
# not necessarily the entire response that can be fragmented in | ||
# many packets. | ||
# To avoid split responses to be recognized as invalid | ||
# messages and to be discarded, loops socket.recv until full data | ||
# is received or timeout is expired. | ||
# If timeout expires returns the read data, also if its length is | ||
# less than the expected size. | ||
self.socket.setblocking(0) | ||
|
||
# In the base method this is 'timeout = self.comm_params.timeout', but that changed from 'self.params.timeout' | ||
# in 3.4.1. So we don't have a consistent way to access the timeout. | ||
# However, this just mirrors what we set, which is the default of 3s. So use that. | ||
# Annoyingly 3.4.1 | ||
timeout = 3 | ||
|
||
# If size isn't specified read up to 4096 bytes at a time. | ||
if size is None: | ||
recv_size = 4096 | ||
else: | ||
recv_size = size | ||
|
||
data: list[bytes] = [] | ||
data_length = 0 | ||
time_ = time.time() | ||
end = time_ + timeout | ||
poll = select.poll() | ||
# We don't need to call poll.unregister, since we're deallocing the poll. register just adds the socket to a | ||
# dict owned by the poll object (the underlying syscall has no concept of register/unregister, and just takes an | ||
# array of fds to poll). If we hit a disconnection the socket.fileno() becomes -1 anyway, so unregistering will | ||
# fail | ||
poll.register(self.socket, select.POLLIN) | ||
while recv_size > 0: | ||
poll_res = poll.poll(end - time_) | ||
# We expect a single-element list if this succeeds, or an empty list if it timed out | ||
if len(poll_res) > 0: | ||
if (recv_data := self.socket.recv(recv_size)) == b"": | ||
return self._handle_abrupt_socket_close( # type: ignore[no-any-return] | ||
size, data, time.time() - time_ | ||
) | ||
data.append(recv_data) | ||
data_length += len(recv_data) | ||
time_ = time.time() | ||
|
||
# If size isn't specified continue to read until timeout expires. | ||
if size: | ||
recv_size = size - data_length | ||
|
||
# Timeout is reduced also if some data has been received in order | ||
# to avoid infinite loops when there isn't an expected response | ||
# size and the slave sends noisy data continuously. | ||
if time_ > end: | ||
break | ||
|
||
return b"".join(data) | ||
|
||
# Replacement of ModbusTcpClient to use poll rather than select, see | ||
# https://github.com/nathanmarlor/foxess_modbus/issues/275 | ||
def _check_read_buffer(self) -> bytes | None: | ||
"""Check read buffer.""" | ||
time_ = time.time() | ||
end = time_ + self.params.timeout | ||
data = None | ||
|
||
assert self.socket is not None | ||
poll = select.poll() | ||
poll.register(self.socket, select.POLLIN) | ||
poll_res = poll.poll(end - time_) | ||
if len(poll_res) > 0: | ||
data = self.socket.recv(1024) | ||
return data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
76 changes: 76 additions & 0 deletions
76
custom_components/foxess_modbus/client/protocol_pollserial.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
""" | ||
Custom protocol handler for pyserial, which uses poll but doesn't have | ||
https://github.com/pyserial/pyserial/issues/617 | ||
""" | ||
|
||
import os | ||
import select | ||
from enum import Enum | ||
|
||
import serial | ||
from serial import serialposix | ||
from serial.serialutil import PortNotOpenError | ||
from serial.serialutil import SerialException | ||
from serial.serialutil import Timeout | ||
|
||
|
||
class _PollResult(Enum): | ||
TIMEOUT = 0 | ||
ABORT = 1 | ||
DATA = 2 | ||
|
||
|
||
class Serial(serialposix.Serial): | ||
""" | ||
From https://github.com/pyserial/pyserial/blob/7aeea35429d15f3eefed10bbb659674638903e3a/serial/serialposix.py, | ||
but with https://github.com/pyserial/pyserial/pull/618 applied | ||
""" | ||
|
||
@serial.Serial.port.setter # type: ignore | ||
def port(self, value: str) -> None: | ||
if value is not None: | ||
serial.Serial.port.__set__(self, value.removeprefix("pollserial://")) | ||
|
||
def read(self, size: int = 1) -> bytes: | ||
"""\ | ||
Read size bytes from the serial port. If a timeout is set it may | ||
return less characters as requested. With no timeout it will block | ||
until the requested number of bytes is read. | ||
""" | ||
if not self.is_open: | ||
raise PortNotOpenError() | ||
read = bytearray() | ||
timeout = Timeout(self._timeout) | ||
poll = select.poll() | ||
poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) | ||
poll.register(self.pipe_abort_read_r, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) | ||
if size > 0: | ||
while len(read) < size: | ||
# wait until device becomes ready to read (or something fails) | ||
result = _PollResult.TIMEOUT # In case poll returns an empty list | ||
for fd, event in poll.poll(None if timeout.is_infinite else (timeout.time_left() * 1000)): | ||
if fd == self.pipe_abort_read_r: | ||
os.read(self.pipe_abort_read_r, 1000) | ||
result = _PollResult.ABORT | ||
break | ||
if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL): | ||
raise SerialException("device reports error (poll)") | ||
result = _PollResult.DATA | ||
|
||
if result == _PollResult.DATA: | ||
buf = os.read(self.fd, size - len(read)) | ||
read.extend(buf) | ||
if ( | ||
result == _PollResult.TIMEOUT | ||
or result == _PollResult.ABORT | ||
or timeout.expired() | ||
or (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0) | ||
and not buf | ||
): | ||
break # early abort on timeout | ||
return bytes(read) | ||
|
||
|
||
# This needs to have a very particular name, as it's registered by string in modbus_client | ||
assert Serial.__module__ == "custom_components.foxess_modbus.client.protocol_pollserial" | ||
assert Serial.__name__ == "Serial" |
Oops, something went wrong.