From b2da1de3878976ac91427bdb731a3f269853c39b Mon Sep 17 00:00:00 2001 From: Ruud de Jong Date: Mon, 4 Mar 2024 17:08:57 +0100 Subject: [PATCH] Finalized all checks --- .github/workflows/test.yml | 9 +- docs/source/conf.py | 17 ++- examples/echoserver/client.py | 14 +-- examples/echoserver/server.py | 44 +++++--- pyproject.toml | 37 +++++-- src/sliplib/__init__.py | 12 +-- src/sliplib/slip.py | 45 ++++---- src/sliplib/sliprequesthandler.py | 19 ++-- src/sliplib/slipsocket.py | 101 +++++++----------- src/sliplib/slipstream.py | 82 +++++++------- src/sliplib/slipwrapper.py | 25 +++-- src/sliplib/version.py | 2 +- tests/integration/test_server_client.py | 23 ++-- tests/integration/test_slip_file.py | 4 +- .../integration/test_slip_file_unbuffered.py | 2 +- tests/integration/test_slip_in_memory_io.py | 7 +- tests/unit/test_slip.py | 6 +- tests/unit/test_sliprequesthandler.py | 17 ++- tests/unit/test_slipsocket.py | 55 +++------- tests/unit/test_slipstream.py | 51 +++------ tests/unit/test_slipwrapper.py | 4 +- 21 files changed, 258 insertions(+), 318 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 04c9346..11e595d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,6 @@ name: test on: push: - branches: [main, master] pull_request: branches: [main, master] @@ -35,8 +34,14 @@ jobs: - name: Install Hatch run: pip install --upgrade hatch + - name: Run style check + run: hatch run style:check + + - name: Check types + run: hatch run types:check + - name: Run static analysis run: hatch fmt --check - name: Run tests - run: hatch run cov + run: hatch run test:cov diff --git a/docs/source/conf.py b/docs/source/conf.py index b6175d9..75f6332 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -1,3 +1,5 @@ +# ruff: noqa: INP001 + # Configuration file for the Sphinx documentation builder. # # For the full list of built-in configuration values, see the documentation: @@ -10,8 +12,10 @@ project = "SlipLib" -copyright = "2024, Ruud de Jong" +copyright = "2024, Ruud de Jong" # noqa: A001 author = "Ruud de Jong" +github_username = "rhjdjong" +github_repository = "https://github.com/rhjdjong/SlipLib/" # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration @@ -25,7 +29,13 @@ "sphinx.ext.autodoc.typehints", "sphinx.ext.intersphinx", "sphinx.ext.napoleon", + "sphinx.ext.viewcode", "sphinx_rtd_theme", + "sphinx_tabs.tabs", + "sphinx_toolbox.more_autodoc.autoprotocol", + "sphinx_toolbox.more_autodoc.typehints", + "sphinx_toolbox.more_autodoc.typevars", + "sphinx_toolbox.more_autodoc.variables", "sphinx_autodoc_typehints", ] @@ -36,10 +46,7 @@ # napoleon_use_rtype = False autoclass_content = "both" autodoc_typehints = "description" -autodoc_type_aliases = { - "IPAddress": "IPAddress", - "Tuple[SlipSocket, IPAddress]": "Tuple[SlipSocket, IPAddress]", -} +autodoc_type_aliases = {} add_module_names = False intersphinx_mapping = {"python": ("https://docs.python.org/3", None)} diff --git a/examples/echoserver/client.py b/examples/echoserver/client.py index 7f3c54f..d5e734b 100644 --- a/examples/echoserver/client.py +++ b/examples/echoserver/client.py @@ -1,3 +1,5 @@ +# ruff: noqa: T201 + # Copyright (c) 2020 Ruud de Jong # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. @@ -13,25 +15,25 @@ This is repeated until the user enters an empty message. """ -# ruff: noqa: F201 +# ruff: noqa: T201 import sys import sliplib -if __name__ == '__main__': +if __name__ == "__main__": if len(sys.argv) != 2: # noqa: PLR2004 print("Usage: python client.py ") sys.exit(1) port = sys.argv[1] print(f"Connecting to server on port {port}") - sock = sliplib.SlipSocket.create_connection(('localhost', int(port))) + sock = sliplib.SlipSocket.create_connection(("localhost", int(port))) print(f"Connected to {sock.getpeername()}") while True: - message = input('Message>') + message = input("Message>") if not message: break - b_message = bytes(message, 'utf-8') + b_message = bytes(message, "utf-8") sock.send_msg(b_message) b_reply = sock.recv_msg() - print('Response:', b_reply) + print("Response:", b_reply) diff --git a/examples/echoserver/server.py b/examples/echoserver/server.py index 2dbb155..3721dde 100644 --- a/examples/echoserver/server.py +++ b/examples/echoserver/server.py @@ -19,11 +19,20 @@ # ruff: noqa: T201 +from __future__ import annotations + import socket import sys from socketserver import TCPServer +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + if sys.version_info >= (3, 12): # noqa: UP036 + from collections.abc import Buffer + else: + from typing_extensions import Buffer -from _socket import dup # type: ignore +from _socket import dup from sliplib import SlipRequestHandler @@ -31,50 +40,51 @@ class _ChattySocket(socket.socket): """A socket subclass that prints the raw data that is received and sent.""" - def __init__(self, sock): + def __init__(self, sock: socket.socket) -> None: fd = dup(sock.fileno()) super().__init__(sock.family, sock.type, sock.proto, fileno=fd) super().settimeout(sock.gettimeout()) - def recv(self, chunksize): - data = super().recv(chunksize) - print('Raw data received:', data) + def recv(self, chunksize: int, *args: Any) -> bytes: + data = super().recv(chunksize, *args) + print("Raw data received:", data) return data - def sendall(self, data): - print('Sending raw data:', data) - super().sendall(data) + def sendall(self, data: Buffer, *args: Any) -> None: + print("Sending raw data:", data) + super().sendall(data, *args) class SlipHandler(SlipRequestHandler): """A SlipRequestHandler that echoes the received message with the bytes in reversed order.""" - def setup(self): + def setup(self) -> None: self.request = _ChattySocket(self.request) print(f"Incoming connection from {self.request.getpeername()}") super().setup() # Dedicated handler to show the encoded bytes. - def handle(self): + def handle(self) -> None: while True: message = self.request.recv_msg() - print('Decoded data:', message) + print("Decoded data:", message) if message: self.request.send_msg(bytes(reversed(message))) else: - print('Closing down') + print("Closing down") break class TCPServerIPv6(TCPServer): """An IPv6 TCPServer""" + address_family = socket.AF_INET6 -if __name__ == '__main__': - if len(sys.argv) > 1 and sys.argv[1].lower() == 'ipv6': - server = TCPServerIPv6(('localhost', 0), SlipHandler) # type: TCPServer +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1].lower() == "ipv6": + server = TCPServerIPv6(("localhost", 0), SlipHandler) # type: TCPServer else: - server = TCPServer(('localhost', 0), SlipHandler) - print('Slip server listening on localhost, port', server.server_address[1]) + server = TCPServer(("localhost", 0), SlipHandler) + print("Slip server listening on localhost, port", server.server_address[1]) server.handle_request() diff --git a/pyproject.toml b/pyproject.toml index cf8cece..3efcb65 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,13 +35,21 @@ dependencies = [] path = "src/sliplib/version.py" [tool.hatch.envs.default] +python = "3.12" dependencies = [ - "coverage[toml]>=6.5", "pytest", "pytest-mock", ] -[tool.hatch.envs.default.scripts] +[[tool.hatch.envs.test.matrix]] +python = ["3.8", "3.9", "3.10", "3.11", "3.12"] + +[tool.hatch.envs.test] +extra-dependencies = [ + "coverage[toml]>=6.5", +] + +[tool.hatch.envs.test.scripts] test = "pytest {args:tests}" test-cov = "coverage run -m pytest {args:tests}" cov-report = [ @@ -53,18 +61,17 @@ cov = [ "cov-report", ] -[[tool.hatch.envs.all.matrix]] +[[tool.hatch.envs.types.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] [tool.hatch.envs.types] -dependencies = [ - "pytest", - "pytest-mock", +extra-dependencies = [ "mypy>=1.0.0", + "typing_extensions" ] [tool.hatch.envs.types.scripts] -check = "mypy --strict --install-types --non-interactive {args:src/sliplib tests}" +check = "mypy --strict --install-types --non-interactive {args:src tests examples}" [tool.hatch.envs.style] dependencies = [ @@ -73,10 +80,10 @@ dependencies = [ ] [tool.hatch.envs.style.scripts] -isort-check = "isort --check-only {args:src tests}" -black-check = "black --check {args:src tests}" -isort-fix = "isort {args:src tests}" -black-fix = "black {args:src tests}" +isort-check = "isort --check-only {args:src tests examples docs}" +black-check = "black --check {args:src tests examples docs}" +isort-fix = "isort {args:src tests examples docs}" +black-fix = "black {args:src tests examples docs}" check = [ "- isort-check", "black-check", @@ -116,10 +123,18 @@ python = "3.12" dependencies = [ "sphinx", "sphinx_rtd_theme", + "sphinx-toolbox", ] [tool.hatch.envs.doc.scripts] build = "cd {root}/docs && make {args:html}" +[tool.black] +line-length = 120 + [tool.isort] profile = "black" +line_length = 120 + +[tool.ruff] +target-version = "py312" diff --git a/src/sliplib/__init__.py b/src/sliplib/__init__.py index c0cf2aa..293dc8b 100644 --- a/src/sliplib/__init__.py +++ b/src/sliplib/__init__.py @@ -76,12 +76,11 @@ .. autoexception:: ProtocolError """ -from .slip import * -from .sliprequesthandler import * -from .slipsocket import * -from .slipstream import * -from .slipwrapper import * -from .version import __version__ +from sliplib.slip import END, ESC, ESC_END, ESC_ESC, Driver, ProtocolError, decode, encode, is_valid +from sliplib.sliprequesthandler import SlipRequestHandler +from sliplib.slipsocket import SlipSocket +from sliplib.slipstream import SlipStream +from sliplib.slipwrapper import SlipWrapper __all__ = [ "encode", @@ -97,5 +96,4 @@ "ESC", "ESC_END", "ESC_ESC", - "__version__", ] diff --git a/src/sliplib/slip.py b/src/sliplib/slip.py index dcb6546..af0b64a 100644 --- a/src/sliplib/slip.py +++ b/src/sliplib/slip.py @@ -6,13 +6,18 @@ Constants --------- -.. data:: END -.. data:: ESC -.. data:: ESC_END -.. data:: ESC_ESC +The following constants represent the special bytes +used by SLIP for delimiting and encoding messages. + +.. autovariable:: END + :no-type: +.. autovariable:: ESC + :no-type: +.. autovariable:: ESC_END + :no-type: +.. autovariable:: ESC_ESC + :no-type: - These constants represent the special bytes - used by SLIP for delimiting and encoding messages. Functions --------- @@ -44,13 +49,11 @@ import collections import re -from typing import Deque, List, Union -END = b"\xc0" -ESC = b"\xdb" -ESC_END = b"\xdc" -ESC_ESC = b"\xdd" -"""These constants represent the special SLIP bytes""" +END = b"\xc0" #: The SLIP `END` byte. +ESC = b"\xdb" #: The SLIP `ESC` byte. +ESC_END = b"\xdc" #: The SLIP byte that, when preceded by an `ESC` byte, represents an escaped `END` byte. +ESC_ESC = b"\xdd" #: The SLIP byte that, when preceded by an `ESC` byte, represents an escaped `ESC` byte. class ProtocolError(ValueError): @@ -115,11 +118,7 @@ def is_valid(packet: bytes) -> bool: :const:`True` if the packet is valid, :const:`False` otherwise """ packet = packet.strip(END) - return not ( - END in packet - or packet.endswith(ESC) - or re.search(ESC + b"[^" + ESC_END + ESC_ESC + b"]", packet) - ) + return not (END in packet or packet.endswith(ESC) or re.search(ESC + b"[^" + ESC_END + ESC_ESC + b"]", packet)) class Driver: @@ -131,8 +130,8 @@ class Driver: def __init__(self) -> None: self._recv_buffer = b"" - self._packets: Deque[bytes] = collections.deque() - self._messages: List[bytes] = [] + self._packets: collections.deque[bytes] = collections.deque() + self._messages: list[bytes] = [] def send(self, message: bytes) -> bytes: # pylint: disable=no-self-use """Encodes a message into a SLIP-encoded packet. @@ -147,7 +146,7 @@ def send(self, message: bytes) -> bytes: # pylint: disable=no-self-use """ return encode(message) - def receive(self, data: Union[bytes, int]) -> List[bytes]: + def receive(self, data: bytes | int) -> list[bytes]: """Decodes data and gives a list of decoded messages. Processes :obj:`data`, which must be a bytes-like object, @@ -207,7 +206,7 @@ def receive(self, data: Union[bytes, int]) -> List[bytes]: # Process the buffered packets return self.flush() - def flush(self) -> List[bytes]: + def flush(self) -> list[bytes]: """Gives a list of decoded messages. Decodes the packets in the internal buffer. @@ -221,7 +220,7 @@ def flush(self) -> List[bytes]: Raises: ProtocolError: When any of the buffered packets contains an invalid byte sequence. """ - messages = [] # type: List[bytes] + messages: list[bytes] = [] while self._packets: packet = self._packets.popleft() try: @@ -234,7 +233,7 @@ def flush(self) -> List[bytes]: return messages @property - def messages(self) -> List[bytes]: + def messages(self) -> list[bytes]: """A list of decoded messages. The read-only attribute :attr:`messages` contains diff --git a/src/sliplib/sliprequesthandler.py b/src/sliplib/sliprequesthandler.py index 55fcb8d..04d065d 100644 --- a/src/sliplib/sliprequesthandler.py +++ b/src/sliplib/sliprequesthandler.py @@ -9,12 +9,6 @@ .. autoclass:: SlipRequestHandler :show-inheritance: - The interface is identical to that offered by the - :class:`socketserver.BaseRequestHandler` baseclass. - To do anything useful, a derived class must define - a new :meth:`handle` method, and may override any - of the other methods. - .. automethod:: setup .. automethod:: handle .. automethod:: finish @@ -23,7 +17,7 @@ from socketserver import BaseRequestHandler -from .slipsocket import SlipSocket +from sliplib.slipsocket import SlipSocket class SlipRequestHandler(BaseRequestHandler): @@ -33,10 +27,11 @@ class SlipRequestHandler(BaseRequestHandler): for the purpose of creating TCP server instances that can handle incoming SLIP-based connections. - To implement a specific behaviour, all that - is needed is to derive a class that - defines a :meth:`handle` method that uses + To do anything useful, a derived class must define + a new :meth:`handle` method that uses :attr:`self.request` to send and receive SLIP-encoded messages. + + Other methods can of course also be overridden if necessary. """ def setup(self) -> None: @@ -62,14 +57,14 @@ def handle(self) -> None: The purpose of the SLIP protocol is to allow separation of messages in a continuous byte stream. As such, it is expected that the :meth:`handle` method of a derived class - is capable of handling multiple SLIP messages: + is capable of handling multiple SLIP messages, for example: .. code:: def handle(self): while True: msg = self.request.recv_msg() - if msg == b'': + if msg == b"": break # Do something with the message """ diff --git a/src/sliplib/slipsocket.py b/src/sliplib/slipsocket.py index 3d4fd3c..6b1ba50 100644 --- a/src/sliplib/slipsocket.py +++ b/src/sliplib/slipsocket.py @@ -2,11 +2,12 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. +# ruff: noqa: UP007 UP006 UP035 """ SlipSocket ---------- -.. autoclass:: TCPAddress +.. autodata:: TCPAddress .. autoclass:: SlipSocket(sock) :show-inheritance: @@ -28,6 +29,7 @@ The following commonly used :class:`socket.socket` methods are exposed through a :class:`SlipSocket` object. These methods are simply delegated to the wrapped `socket` instance. + See the documentation for :mod:`socket.socket` for more information on these methods. .. automethod:: bind .. automethod:: close @@ -51,14 +53,11 @@ In particular, do not use any of the :meth:`recv*` or :meth:`send*` methods on the :attr:`socket` attribute. - A :class:`SlipSocket` instance has the following attributes in addition to the attributes + A :class:`SlipSocket` instance has the following attributes + and read-only properties in addition to the attributes offered by its base class :class:`SlipWrapper`: - .. attribute:: socket - - The wrapped `socket`. - This is actually just an alias for the :attr:`stream` attribute in the base class. - + .. autoattribute:: socket .. autoattribute:: family .. autoattribute:: type .. autoattribute:: proto @@ -67,15 +66,13 @@ import socket import warnings -from typing import Any, Optional, Tuple, TypeAlias, Union, cast +from typing import Any, Optional, Tuple, Union, cast -from .slipwrapper import SlipWrapper +from sliplib.slipwrapper import SlipWrapper #: TCPAddress stands for either an IPv4 address, i.e. a (host, port) tuple, #: or an IPv6 address, i.e. a (host, port, flowinfo, scope_id) tuple. -TCPAddress: TypeAlias = Union[ - Tuple[Optional[str], int], Tuple[Optional[str], int, int, int] -] +TCPAddress = Union[Tuple[Optional[str], int], Tuple[Optional[str], int, int, int]] class SlipSocket(SlipWrapper[socket.socket]): @@ -100,7 +97,7 @@ class SlipSocket(SlipWrapper[socket.socket]): but the socket that is returned by the :class:`socket`'s :meth:`accept` method is automatically wrapped in a :class:`SlipSocket` object. - In stead of the :class:`socket`'s :meth:`send*` and :meth:`recv*` methods + Instead of the :class:`socket`'s :meth:`send*` and :meth:`recv*` methods a :class:`SlipSocket` provides the method :meth:`send_msg` and :meth:`recv_msg` to send and receive SLIP-encoded messages. @@ -135,8 +132,13 @@ class method :meth:`create_connection`. """ if not isinstance(sock, socket.socket) or sock.type != socket.SOCK_STREAM: - raise ValueError("Only sockets with type SOCK_STREAM are supported") + error_msg = "Only sockets with type SOCK_STREAM are supported." + raise ValueError(error_msg) super().__init__(sock) + + #: The wrapped `socket`. + #: This is actually just an alias for the :attr:`stream` + #: attribute in the base class. self.socket = self.stream def send_bytes(self, packet: bytes) -> None: @@ -154,8 +156,6 @@ def accept(self) -> Tuple[SlipSocket, TCPAddress]: (:class:`SlipSocket`, :class:`TCPAddress`): A tuple with a :class:`SlipSocket` object and the remote IP address. - See Also: - :meth:`socket.socket.accept` """ conn, address = self.socket.accept() return self.__class__(conn), address @@ -165,19 +165,11 @@ def bind(self, address: TCPAddress) -> None: Args: address (:class:`TCPAddress`): The address to bind to. - - See Also: - :meth:`socket.socket.bind` """ self.socket.bind(address) def close(self) -> None: - """Close the `SlipSocket`. - - - See Also: - :meth:`socket.socket.close` - """ + """Close the `SlipSocket`.""" self.socket.close() def connect(self, address: TCPAddress) -> None: @@ -185,9 +177,6 @@ def connect(self, address: TCPAddress) -> None: Args: address (:class:`TCPAddress`): The IP address of the remote socket. - - See Also: - :meth:`socket.socket.connect` """ self.socket.connect(address) @@ -196,9 +185,6 @@ def connect_ex(self, address: TCPAddress) -> None: Args: address (:class:`TCPAddress`): The IP address of the remote socket. - - See Also: - :meth:`socket.socket.connect_ex` """ self.socket.connect_ex(address) @@ -207,9 +193,6 @@ def getpeername(self) -> TCPAddress: Returns: :class:`TCPAddress`: The remote IP address. - - See Also: - :meth:`socket.socket.getpeername` """ return cast(TCPAddress, self.socket.getpeername()) @@ -218,20 +201,14 @@ def getsockname(self) -> TCPAddress: Returns: :class:`TCPAddress`: The local IP address. - - See Also: - :meth:`socket.socket.getsockname` """ return cast(TCPAddress, self.socket.getsockname()) - def listen(self, backlog: Optional[int] = None) -> None: + def listen(self, backlog: int | None = None) -> None: """Enable a `SlipSocket` server to accept connections. Args: backlog (int): The maximum number of waiting connections. - - See Also: - :meth:`socket.socket.listen` """ if backlog is None: self.socket.listen() @@ -243,42 +220,42 @@ def shutdown(self, how: int) -> None: Args: how: Flag to indicate which halves of the connection must be shut down. - - See Also: - :meth:`socket.socket.shutdown` """ self.socket.shutdown(how) @property def family(self) -> int: # pylint: disable=line-too-long - """The wrapped socket's address family. Usually :const:`socket.AF_INET` (IPv4) or :const:`socket.AF_INET6` (IPv6).""" + """The wrapped socket's address family. + + Usually :const:`socket.AF_INET` (IPv4) or :const:`socket.AF_INET6` (IPv6). + """ return self.socket.family @property - def type(self) -> int: - """The wrapped socket's type. Always :const:`socket.SOCK_STREAM`.""" + def type(self) -> int: # noqa: A003 + """The wrapped socket's type. + + Always :const:`socket.SOCK_STREAM`. + """ return self.socket.type @property def proto(self) -> int: - """The wrapped socket's protocol number. Usually 0.""" + """The wrapped socket's protocol number. + + Usually 0. + """ return self.socket.proto def __getattr__(self, attribute: str) -> Any: - if ( - attribute.startswith("recv") - or attribute.startswith("send") - or attribute - in ( - "makefile", - "share", - "dup", - ) + if attribute.startswith(("recv", "send")) or attribute in ( + "makefile", + "share", + "dup", ): - raise AttributeError( - f"'{self.__class__.__name__}' object has no attribute '{attribute}'" - ) + error_msg = f"'{self.__class__.__name__}' object has no attribute '{attribute}'" + raise AttributeError(error_msg) warnings.warn( "Direct access to the enclosed socket attributes and methods will be removed in version 1.0", DeprecationWarning, @@ -290,8 +267,8 @@ def __getattr__(self, attribute: str) -> Any: def create_connection( cls, address: TCPAddress, - timeout: Optional[float] = None, - source_address: Optional[TCPAddress] = None, + timeout: float | None = None, + source_address: TCPAddress | None = None, ) -> SlipSocket: """Create a SlipSocket connection. diff --git a/src/sliplib/slipstream.py b/src/sliplib/slipstream.py index 7d9ab48..85773f8 100644 --- a/src/sliplib/slipstream.py +++ b/src/sliplib/slipstream.py @@ -5,12 +5,9 @@ """ SlipStream ---------- -.. autoclass:: IOStream +.. autoprotocol:: IOStream :show-inheritance: - .. automethod:: read - .. automethod:: write - .. autoclass:: SlipStream(stream, [chunk_size]) :show-inheritance: @@ -26,25 +23,26 @@ import warnings from typing import Any, Protocol -from .slipwrapper import SlipWrapper +from sliplib.slipwrapper import SlipWrapper class IOStream(Protocol): - """Protocol class for wrappable byte streams. + """ + Protocol class for wrappable byte streams. - An IOStream must be a byte stream that has `read()` and `write()` methods. - This can be e.g. a subclass of :class:`io.RawIOBase`, :class:`io.BufferedIOBase`, - :class:`io.FileIO`, but any class that contains the two required methods can be used. + Any object that produces and consumes a byte stream and contains the two required methods can be used. + Typically, an IOStream is a subclass of :class:`io.RawIOBase`, :class:`io.BufferedIOBase`, + :class:`io.FileIO`, or similar classes, but this is not required. """ def read(self, chunksize: int) -> bytes: """Read `chunksize` bytes from the stream Args: - chunksize: The number of bytes to read from the `IOStream`. + chunksize: The number of bytes to read from the :protocol:`IOStream`. Returns: - The bytes read from the `IOStream`. May be less than the number + The bytes read from the :protocol:`IOStream`. May be less than the number specified by `chunksize`. """ @@ -72,7 +70,7 @@ class SlipStream(SlipWrapper[IOStream]): The :class:`SlipStream` class has all the methods and attributes from its base class :class:`SlipWrapper`. - In addition it directly exposes all methods and attributes of + In addition, it directly exposes all methods and attributes of the contained :obj:`stream`, except for the following: * :meth:`read*` and :meth:`write*`. These methods are not @@ -83,7 +81,7 @@ class SlipStream(SlipWrapper[IOStream]): * :meth:`raw`, :meth:`detach` and other methods that provide access to or manipulate the stream's internal data. - In stead of the :meth:`read*` and :meth:`write*` methods + Instead of the :meth:`read*` and :meth:`write*` methods a :class:`SlipStream` object provides the method :meth:`recv_msg` and :meth:`send_msg` to read and write SLIP-encoded messages. @@ -94,7 +92,6 @@ class SlipStream(SlipWrapper[IOStream]): """ def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE): - # pylint: disable=missing-raises-doc """ To instantiate a :class:`SlipStream` object, the user must provide a pre-constructed open byte stream that is ready for reading and/or writing @@ -104,6 +101,7 @@ def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE): chunk_size: the number of bytes to read per read operation. The default value for `chunck_size` is `io.DEFAULT_BUFFER_SIZE`. + Setting the `chunk_size` is useful when the stream has a low bandwidth and/or bursty data (e.g. a serial port interface). In such cases it is useful to have a `chunk_size` of 1, to avoid that the application @@ -125,11 +123,11 @@ def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE): """ for method in ("read", "write"): if not hasattr(stream, method) or not callable(getattr(stream, method)): - raise TypeError( - f"{stream.__class__.__name__} object has no method {method}" - ) + error_msg = f"{stream.__class__.__name__} object has no method {method}" + raise TypeError(error_msg) if hasattr(stream, "encoding"): - raise TypeError(f"{stream.__class__.__name__} object is not a byte stream") + error_msg = f"{stream.__class__.__name__} object is not a byte stream" + raise TypeError(error_msg) self._chunk_size = chunk_size if chunk_size > 0 else io.DEFAULT_BUFFER_SIZE super().__init__(stream) @@ -167,32 +165,26 @@ def _stream_is_closed(self) -> bool: return getattr(self.stream, "closed", False) def __getattr__(self, attribute: str) -> Any: - if ( - attribute.startswith("read") - or attribute.startswith("write") - or attribute - in ( - "detach", - "flushInput", - "flushOutput", - "getbuffer", - "getvalue", - "peek", - "raw", - "reset_input_buffer", - "reset_output_buffer", - "seek", - "seekable", - "tell", - "truncate", - ) + if attribute.startswith(("read", "write")) or attribute in ( + "detach", + "flushInput", + "flushOutput", + "getbuffer", + "getvalue", + "peek", + "raw", + "reset_input_buffer", + "reset_output_buffer", + "seek", + "seekable", + "tell", + "truncate", ): - raise AttributeError( - f"'{self.__class__.__name__}' object has no attribute '{attribute}'" - ) - warnings.warn( - "Direct access to the enclosed stream attributes and methods will be removed in version 1.0", - DeprecationWarning, - stacklevel=2, - ) + error_msg = f"'{self.__class__.__name__}' object has no attribute '{attribute}'" + raise AttributeError(error_msg) + + # Deprecation warning + warning_msg = "Direct access to the enclosed stream attributes and methods will be removed in version 1.0" + warnings.warn(warning_msg, DeprecationWarning, stacklevel=2) + return getattr(self.stream, attribute) diff --git a/src/sliplib/slipwrapper.py b/src/sliplib/slipwrapper.py index 8d69193..2e491e3 100644 --- a/src/sliplib/slipwrapper.py +++ b/src/sliplib/slipwrapper.py @@ -6,7 +6,8 @@ SlipWrapper ----------- -.. autoclass:: ByteStream +.. autotypevar:: ByteStream + :no-type: .. autoclass:: SlipWrapper :show-inheritance: @@ -26,12 +27,16 @@ """ from __future__ import annotations -import collections import sys -from types import TracebackType -from typing import Deque, Generic, Iterator, Optional, TypeVar +from types import TracebackType # noqa: TCH003 +from typing import TYPE_CHECKING, Generic, TypeVar -from .slip import Driver, ProtocolError +if TYPE_CHECKING: + from collections.abc import Iterator + +from collections import deque + +from sliplib.slip import Driver, ProtocolError #: ByteStream is a :class:`TypeVar` that stands for a generic byte stream. ByteStream = TypeVar("ByteStream") @@ -69,9 +74,9 @@ def __init__(self, stream: ByteStream): self.stream = stream #: The :class:`SlipWrapper`'s :class:`Driver` instance. self.driver = Driver() - self._messages: Deque[bytes] = collections.deque() - self._protocol_error: Optional[ProtocolError] = None - self._traceback: Optional[TracebackType] = None + self._messages: deque[bytes] = deque() + self._protocol_error: ProtocolError | None = None + self._traceback: TracebackType | None = None self._flush_needed = False self._stream_closed = False @@ -144,9 +149,7 @@ def recv_msg(self) -> bytes: data = self.recv_bytes() if data == b"": self._stream_closed = True - if isinstance( - data, int - ): # Single byte reads are represented as integers + if isinstance(data, int): # Single byte reads are represented as integers data = bytes([data]) self._messages.extend(self.driver.receive(data)) except ProtocolError as protocol_error: diff --git a/src/sliplib/version.py b/src/sliplib/version.py index a5a3a57..bf74992 100644 --- a/src/sliplib/version.py +++ b/src/sliplib/version.py @@ -4,4 +4,4 @@ """Indicates the version of sliplib.""" -__version__ = "0.6.3" +__version__ = "0.7.0" diff --git a/tests/integration/test_server_client.py b/tests/integration/test_server_client.py index 98a0011..0d99c92 100644 --- a/tests/integration/test_server_client.py +++ b/tests/integration/test_server_client.py @@ -2,7 +2,7 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=attribute-defined-outside-init +# ruff: noqa: UP006 UP035 """ This module tests SlipSocket using a SLIP echo server, similar to the one in the examples directory. @@ -12,13 +12,12 @@ from multiprocessing import Pipe, Process from multiprocessing.connection import Connection from socketserver import TCPServer -from typing import Dict, Generator, Tuple, Type +from typing import Generator, Mapping, Tuple, Type import pytest -from pytest import CaptureFixture, FixtureRequest from sliplib import SlipRequestHandler, SlipSocket -from sliplib.slipsocket import Address +from sliplib.slipsocket import TCPAddress class SlipEchoHandler(SlipRequestHandler): @@ -34,16 +33,16 @@ def handle(self) -> None: self.request.send_msg(data_to_send) -class SlipEchoServer: # pylint: disable=too-few-public-methods +class SlipEchoServer: """Execution helper for the echo server. Sends the server address back over the pipe.""" - server_data = { + server_data: Mapping[int, Tuple[Type[TCPServer], str]] = { socket.AF_INET: (TCPServer, "127.0.0.1"), socket.AF_INET6: ( type("TCPServerIPv6", (TCPServer,), {"address_family": socket.AF_INET6}), "::1", ), - } # type: Dict[int, Tuple[Type[TCPServer], str]] + } def __init__(self, address_family: int, pipe: Connection) -> None: server_class, localhost = self.server_data[address_family] @@ -55,7 +54,7 @@ def __init__(self, address_family: int, pipe: Connection) -> None: class SlipEchoClient: """Client for the SLIP echo server""" - def __init__(self, address: Address) -> None: + def __init__(self, address: TCPAddress) -> None: self.sock = SlipSocket.create_connection(address) def echo(self, msg: bytes) -> bytes: @@ -72,17 +71,13 @@ class TestEchoServer: """Test for the SLIP echo server""" @pytest.fixture(autouse=True, params=[socket.AF_INET, socket.AF_INET6]) - def setup( - self, request: FixtureRequest, capfd: CaptureFixture[str] - ) -> Generator[None, None, None]: + def setup(self, request: pytest.FixtureRequest, capfd: pytest.CaptureFixture[str]) -> Generator[None, None, None]: """Prepare the server and client""" near, far = Pipe() address_family = request.param self.server = Process(target=SlipEchoServer, args=(address_family, far)) self.server.start() - address_available = near.poll( - 1.5 - ) # AppVeyor sometimes takes a long time to run the server. + address_available = near.poll(1.5) # AppVeyor sometimes takes a long time to run the server. if address_available: server_address = near.recv() else: diff --git a/tests/integration/test_slip_file.py b/tests/integration/test_slip_file.py index b485a4c..7117d02 100644 --- a/tests/integration/test_slip_file.py +++ b/tests/integration/test_slip_file.py @@ -2,8 +2,6 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=relative-beyond-top-level - """Test using SlipStream with a buffered file""" from sliplib import SlipStream, encode @@ -20,7 +18,7 @@ def test_reading_slip_file(self) -> None: self.filepath.write_bytes(b"".join(encode(msg) for msg in data)) with self.filepath.open(mode="rb") as f: slipstream = SlipStream(f) - for exp, act in zip(data, slipstream): + for exp, act in zip(data, slipstream): # noqa: B905 assert exp == act def test_writing_slip_file(self) -> None: diff --git a/tests/integration/test_slip_file_unbuffered.py b/tests/integration/test_slip_file_unbuffered.py index 55b49ac..dd0be49 100644 --- a/tests/integration/test_slip_file_unbuffered.py +++ b/tests/integration/test_slip_file_unbuffered.py @@ -20,7 +20,7 @@ def test_reading_slip_file(self) -> None: self.filepath.write_bytes(b"".join(encode(msg) for msg in data)) with self.filepath.open(mode="rb", buffering=0) as f: slipstream = SlipStream(f) - for exp, act in zip(data, slipstream): + for exp, act in zip(data, slipstream): # noqa: B905 assert exp == act def test_writing_slip_file(self) -> None: diff --git a/tests/integration/test_slip_in_memory_io.py b/tests/integration/test_slip_in_memory_io.py index 0c306dd..ee22ec8 100644 --- a/tests/integration/test_slip_in_memory_io.py +++ b/tests/integration/test_slip_in_memory_io.py @@ -2,7 +2,7 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=attribute-defined-outside-init +# ruff: noqa: UP035 """Test using SlipStream with an in-memory bytestream""" @@ -54,7 +54,4 @@ def test_stream_writing(self) -> None: msg_list = [b"hallo", b"bye"] for msg in msg_list: self.slipstream.send_msg(msg) - assert ( - self.basestream.getvalue() - == END + msg_list[0] + END + END + msg_list[1] + END - ) + assert self.basestream.getvalue() == END + msg_list[0] + END + END + msg_list[1] + END diff --git a/tests/unit/test_slip.py b/tests/unit/test_slip.py index cd7a144..9ae2647 100644 --- a/tests/unit/test_slip.py +++ b/tests/unit/test_slip.py @@ -2,7 +2,7 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=attribute-defined-outside-init +# ruff: noqa: UP035 """ This module contains the tests for the low-level SLIP functions and classes. @@ -44,7 +44,7 @@ def test_message_with_zero_byte_decoding(self) -> None: assert encode(msg) == packet @pytest.mark.parametrize( - "msg,packet", + ("msg", "packet"), [ (END, ESC + ESC_END), (ESC, ESC + ESC_ESC), @@ -89,7 +89,7 @@ def test_message_with_zero_byte_decoding(self) -> None: assert decode(packet) == msg @pytest.mark.parametrize( - "packet,msg", + ("packet", "msg"), [ (ESC + ESC_ESC, ESC), (ESC + ESC_END, END), diff --git a/tests/unit/test_sliprequesthandler.py b/tests/unit/test_sliprequesthandler.py index 26ad273..c30fbb2 100644 --- a/tests/unit/test_sliprequesthandler.py +++ b/tests/unit/test_sliprequesthandler.py @@ -2,7 +2,7 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=attribute-defined-outside-init +# ruff: noqa: UP035 """Tests for SlipRequestHandler""" @@ -12,10 +12,9 @@ from typing import Generator import pytest -from pytest import FixtureRequest from sliplib import END, SlipRequestHandler, SlipSocket -from sliplib.slipsocket import Address +from sliplib.slipsocket import TCPAddress class DummySlipRequestHandler(SlipRequestHandler): @@ -31,23 +30,19 @@ def handle(self) -> None: class TestSlipRequestHandler: """Tests for SlipRequestHandler.""" - @pytest.fixture( - autouse=True, params=[(AF_INET, ("127.0.0.1", 0)), (AF_INET6, ("::1", 0, 0, 0))] - ) - def setup(self, request: FixtureRequest) -> Generator[None, None, None]: + @pytest.fixture(autouse=True, params=[(AF_INET, ("127.0.0.1", 0)), (AF_INET6, ("::1", 0, 0, 0))]) + def setup(self, request: pytest.FixtureRequest) -> Generator[None, None, None]: """Prepare the test.""" self.family = request.param[0] self.bind_address = request.param[1] # Cannot use standard TCPServer, because that is hardcoded to IPv4 - self.server_class = type( - "SlipServer", (socketserver.TCPServer,), {"address_family": self.family} - ) + self.server_class = type("SlipServer", (socketserver.TCPServer,), {"address_family": self.family}) self.client_socket = socket(family=self.family) self.server_is_running = threading.Event() yield self.client_socket.close() - def server(self, bind_address: Address) -> None: + def server(self, bind_address: TCPAddress) -> None: """Create a server.""" srv = self.server_class(bind_address, DummySlipRequestHandler) self.server_address = srv.server_address diff --git a/tests/unit/test_slipsocket.py b/tests/unit/test_slipsocket.py index a39d1f7..f2a79f7 100644 --- a/tests/unit/test_slipsocket.py +++ b/tests/unit/test_slipsocket.py @@ -2,8 +2,7 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=attribute-defined-outside-init -# pylint: disable=too-many-public-methods +# ruff: noqa: UP035 """Tests for SlipSocket""" @@ -12,16 +11,13 @@ from typing import Generator import pytest -from pytest import FixtureRequest from pytest_mock import MockerFixture import sliplib from sliplib import END, ESC, ProtocolError, SlipSocket SOCKET_METHODS = [ - attr - for attr in dir(socket.socket) - if callable(getattr(socket.socket, attr)) and not attr.startswith("_") + attr for attr in dir(socket.socket) if callable(getattr(socket.socket, attr)) and not attr.startswith("_") ] EXPLICITLY_EXPOSED_SOCKET_METHODS = ( @@ -40,17 +36,14 @@ attr for attr in SOCKET_METHODS if not ( - attr.startswith("recv") - or attr.startswith("send") + attr.startswith(("recv", "send")) or attr in ("dup", "makefile", "share") or attr in EXPLICITLY_EXPOSED_SOCKET_METHODS ) ) NOT_DELEGATED_METHODS = tuple( - attr - for attr in SOCKET_METHODS - if attr not in DELEGATED_METHODS and attr not in EXPLICITLY_EXPOSED_SOCKET_METHODS + attr for attr in SOCKET_METHODS if attr not in DELEGATED_METHODS and attr not in EXPLICITLY_EXPOSED_SOCKET_METHODS ) @@ -77,9 +70,7 @@ class TestSlipSocket: ), ], ) - def setup( - self, request: FixtureRequest, mocker: MockerFixture - ) -> Generator[None, None, None]: + def setup(self, request: pytest.FixtureRequest, mocker: MockerFixture) -> Generator[None, None, None]: """Prepare the test.""" self.family, self.far_address, self.near_address = request.param @@ -103,19 +94,15 @@ def test_slipsocket_instantiation(self) -> None: def test_slipsocket_requires_tcp_socket(self) -> None: """Test that non-TCP sockets are rejected.""" - self.sock_mock.configure_mock( - type=socket.SOCK_DGRAM - ) # pylint: disable=no-member - with pytest.raises(ValueError): + self.sock_mock.configure_mock(type=socket.SOCK_DGRAM) + with pytest.raises(ValueError, match="type SOCK_STREAM"): SlipSocket(self.sock_mock) def test_sending_data(self, mocker: MockerFixture) -> None: """Test that the sendall method on the socket is called when sending a message.""" self.slipsocket.send_msg(b"hallo") self.slipsocket.send_msg(b"bye") - self.sock_mock.sendall.assert_has_calls( - [mocker.call(END + b"hallo" + END), mocker.call(END + b"bye" + END)] - ) + self.sock_mock.sendall.assert_has_calls([mocker.call(END + b"hallo" + END), mocker.call(END + b"bye" + END)]) def test_receiving_data(self, mocker: MockerFixture) -> None: """Test that the recv method on the socket is called when receiving a message. @@ -132,7 +119,7 @@ def socket_data_generator() -> Generator[bytes, None, None]: self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) assert self.slipsocket.recv_msg() == b"hallo" # noinspection PyProtectedMember - chunk_size = sliplib.SlipSocket._chunk_size # pylint: disable=protected-access + chunk_size = sliplib.SlipSocket._chunk_size # noqa: SLF001 expected_calls = [mocker.call.recv(chunk_size)] * 2 self.sock_mock.recv.assert_has_calls(expected_calls) assert self.slipsocket.recv_msg() == b"bye" @@ -156,13 +143,11 @@ def socket_data_generator() -> Generator[bytes, None, None]: assert self.slipsocket.recv_msg() == b"hallo" assert self.slipsocket.recv_msg() == b"bye" assert self.slipsocket.recv_msg() == b"" - chunk_size = sliplib.SlipSocket._chunk_size # pylint: disable=protected-access + chunk_size = sliplib.SlipSocket._chunk_size # noqa: SLF001 expected_calls = [mocker.call.recv(chunk_size)] * 4 self.sock_mock.recv.assert_has_calls(expected_calls) - def test_exception_on_protocol_error_in_first_packet( - self, mocker: MockerFixture - ) -> None: + def test_exception_on_protocol_error_in_first_packet(self, mocker: MockerFixture) -> None: """Test that an invalid packet causes a ProtocolError. Packets after the invalid packet are handled correctly.""" @@ -177,9 +162,7 @@ def socket_data_generator() -> Generator[bytes, None, None]: assert self.slipsocket.recv_msg() == b"hallo" assert self.slipsocket.recv_msg() == b"bye" - def test_exception_on_protocol_error_in_subsequent_packet( - self, mocker: MockerFixture - ) -> None: + def test_exception_on_protocol_error_in_subsequent_packet(self, mocker: MockerFixture) -> None: """Test that an invalid packet causes a ProtocolError Packets before the invalid packet are decoded correctly.""" @@ -193,9 +176,7 @@ def socket_data_generator() -> Generator[bytes, None, None]: self.slipsocket.recv_msg() assert exc.value.args == (ESC + b"error",) - def test_exceptions_on_consecutive_invalid_packets( - self, mocker: MockerFixture - ) -> None: + def test_exceptions_on_consecutive_invalid_packets(self, mocker: MockerFixture) -> None: """Test that multiple invalid packets result in a ProtocolError for each invalid packet.""" def socket_data_generator() -> Generator[bytes, None, None]: @@ -313,15 +294,11 @@ def test_create_connection(self, mocker: MockerFixture) -> None: type=socket.SOCK_STREAM, proto=0, ) # pylint: disable=no-member - create_connection_mock = mocker.patch( - "sliplib.socket.create_connection", return_value=new_sock_mock - ) + create_connection_mock = mocker.patch("sliplib.slipsocket.socket.create_connection", return_value=new_sock_mock) sock = SlipSocket.create_connection(self.far_address) assert isinstance(sock, SlipSocket) assert sock.socket is new_sock_mock - create_connection_mock.assert_called_once_with( - self.far_address[0:2], None, None - ) + create_connection_mock.assert_called_once_with(self.far_address[0:2], None, None) def test_slip_socket_iteration(self, mocker: MockerFixture) -> None: """Test that a SlipSocket can be iterated over.""" @@ -335,7 +312,7 @@ def socket_data_generator() -> Generator[bytes, None, None]: self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) expected = (b"hallo", b"bye") - for exp, act in zip(expected, self.slipsocket): + for exp, act in zip(expected, self.slipsocket): # noqa: B905 assert exp == act diff --git a/tests/unit/test_slipstream.py b/tests/unit/test_slipstream.py index 6ff6e05..49bda32 100644 --- a/tests/unit/test_slipstream.py +++ b/tests/unit/test_slipstream.py @@ -2,7 +2,7 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=attribute-defined-outside-init +# ruff: noqa: UP006 UP035 """Tests for SlipStream.""" @@ -35,9 +35,7 @@ class TestSlipStreamBasics: def setup(self, mocker: MockerFixture) -> None: """Prepare the test.""" - self.stream_mock = mocker.Mock( - spec_set=("read", "write", "readable", "writable") - ) + self.stream_mock = mocker.Mock(spec_set=("read", "write", "readable", "writable")) self.stream_mock.read = mocker.Mock() self.stream_mock.write = mocker.Mock() self.slipstream = SlipStream(self.stream_mock) @@ -46,12 +44,8 @@ def test_slipstream_creation(self) -> None: """Verify the creation of the SlipStream instance.""" assert self.slipstream.stream is self.stream_mock - @pytest.mark.parametrize( - "rbl, wbl", [(True, True), (True, False), (False, True), (False, False)] - ) - def test_slipstream_readable_and_writable_attributes( - self, rbl: bool, wbl: bool - ) -> None: + @pytest.mark.parametrize(("rbl", "wbl"), [(True, True), (True, False), (False, True), (False, False)]) + def test_slipstream_readable_and_writable_attributes(self, rbl: bool, wbl: bool) -> None: # noqa: FBT001 """Verify the readable and writable attributes.""" self.stream_mock.configure_mock(readable=rbl, writable=wbl) assert self.slipstream.readable == rbl @@ -68,17 +62,12 @@ def test_slipstream_reading(self, mocker: MockerFixture) -> None: assert self.slipstream.recv_msg() == msg_list[1] # No more messages assert self.slipstream.recv_msg() == b"" - assert ( - self.stream_mock.read.mock_calls - == [mocker.call(io.DEFAULT_BUFFER_SIZE)] * 2 - ) + assert self.stream_mock.read.mock_calls == [mocker.call(io.DEFAULT_BUFFER_SIZE)] * 2 def test_slipstream_reading_single_bytes(self, mocker: MockerFixture) -> None: """Verify that receiving messages works when reading the packets byte for byte.""" msg_list = [b"hallo", b"bye"] - self.stream_mock.read.side_effect = list( - END + msg_list[0] + END + END + msg_list[1] + END - ) + [b""] + self.stream_mock.read.side_effect = [END, *msg_list[0], END, END, *msg_list[1], END, b""] self.slipstream = SlipStream(self.stream_mock, 1) assert self.slipstream.recv_msg() == msg_list[0] assert self.slipstream.recv_msg() == msg_list[1] @@ -107,11 +96,7 @@ def test_slipstream_writing_single_bytes(self, mocker: MockerFixture) -> None: for msg in msg_list: self.slipstream.send_msg(msg) encoded_messages = (END + msg_list[0] + END, END + msg_list[1] + END) - call_list = [ - mocker.call(enc_msg[i:]) - for enc_msg in encoded_messages - for i in range(len(enc_msg)) - ] + call_list = [mocker.call(enc_msg[i:]) for enc_msg in encoded_messages for i in range(len(enc_msg))] assert self.stream_mock.write.mock_calls == call_list def test_iterating_over_slipstream(self) -> None: @@ -141,23 +126,21 @@ def test_recovery_from_protocol_error(self) -> None: self.verify_error_recovery(msg_list) def test_recovery_from_protocol_error_with_unbuffered_reads(self) -> None: - """Verify error recover for unbuffered reads.""" + """Verify error recovery for unbuffered reads.""" msg_list = [b"hallo", b"bye"] - self.stream_mock.read.side_effect = list( - END + msg_list[0] + END + ESC + END + msg_list[1] + END - ) + [b""] + self.stream_mock.read.side_effect = [END, *msg_list[0], END, ESC, END, *msg_list[1], END, b""] self.slipstream = SlipStream(self.stream_mock, 1) self.verify_error_recovery(msg_list) def verify_error_recovery_during_iteration(self, msg_list: List[bytes]) -> None: """Helper function to verify error recovery during iteration.""" received_message = [] - with pytest.raises(ProtocolError): + with pytest.raises(ProtocolError): # noqa: PT012 for msg in self.slipstream: - received_message.append(msg) + received_message.append(msg) # noqa: PERF402 assert received_message == msg_list[:1] for msg in self.slipstream: - received_message.append(msg) + received_message.append(msg) # noqa: PERF402 assert received_message == msg_list def test_recovery_from_protocol_error_during_iteration(self) -> None: @@ -174,9 +157,7 @@ def test_recovery_from_protocol_error_during_iteration_with_unbuffered_reads( ) -> None: """Verify that error recovery works during iteration with unbuffered reads.""" msg_list = [b"hallo", b"bye"] - self.stream_mock.read.side_effect = list( - END + msg_list[0] + END + ESC + END + msg_list[1] + END - ) + [b""] + self.stream_mock.read.side_effect = [END, *msg_list[0], END, ESC, END, *msg_list[1], END, b""] self.slipstream = SlipStream(self.stream_mock, 1) self.verify_error_recovery_during_iteration(msg_list) @@ -187,11 +168,7 @@ def test_recovery_from_protocol_error_during_iteration_with_unbuffered_reads( # Use the io.BytesIO methods for testing NOT_DELEGATED_METHODS = ( [attr for attr in dir(io.BytesIO) if attr.startswith("read") and attr != "readable"] - + [ - attr - for attr in dir(io.BytesIO) - if attr.startswith("write") and attr != "writable" - ] + + [attr for attr in dir(io.BytesIO) if attr.startswith("write") and attr != "writable"] + [ "detach", "flushInput", diff --git a/tests/unit/test_slipwrapper.py b/tests/unit/test_slipwrapper.py index 9d0387c..88fbe77 100644 --- a/tests/unit/test_slipwrapper.py +++ b/tests/unit/test_slipwrapper.py @@ -18,9 +18,7 @@ class TestSlipWrapper: def setup(self) -> None: """Prepare the test.""" self.slipwrapper = SlipWrapper("not a valid byte stream") - self.subwrapper = type("SubSlipWrapper", (SlipWrapper,), {})( - None - ) # Dummy subclass without implementation + self.subwrapper = type("SubSlipWrapper", (SlipWrapper,), {})(None) # Dummy subclass without implementation def test_slip_wrapper_recv_msg_is_not_implemented(self) -> None: """Verify that calling recv_msg on a SlipWrapper calls that does not implement read_bytes fails."""