diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 34e4217..82d778b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,7 +38,7 @@ jobs: run: hatch run style:check - name: Check types - run: hatch run types.py${{ matrix.python-version }}:check + run: hatch run types:check - name: Run static analysis run: hatch fmt --check diff --git a/examples/echoserver/__init__.py b/examples/echoserver/__init__.py index c062961..ac28400 100644 --- a/examples/echoserver/__init__.py +++ b/examples/echoserver/__init__.py @@ -56,7 +56,7 @@ .. code:: bash - $ python server_ipv6.py + $ python server.py Slip server listening on localhost, port 59454 Incoming connection from ('127.0.0.1', 59458) Raw data received: b'\\xc0hallo\\xc0' diff --git a/examples/echoserver/client.py b/examples/echoserver/client.py index d5e734b..ad3d9ae 100644 --- a/examples/echoserver/client.py +++ b/examples/echoserver/client.py @@ -20,7 +20,8 @@ import sliplib -if __name__ == "__main__": + +def main() -> None: if len(sys.argv) != 2: # noqa: PLR2004 print("Usage: python client.py ") sys.exit(1) @@ -37,3 +38,7 @@ sock.send_msg(b_message) b_reply = sock.recv_msg() print("Response:", b_reply) + + +if __name__ == "__main__": + main() diff --git a/examples/echoserver/server.py b/examples/echoserver/server.py index 3721dde..1db66fa 100644 --- a/examples/echoserver/server.py +++ b/examples/echoserver/server.py @@ -13,7 +13,7 @@ the raw data that is received and sent. The request handler prints the decoded message, and then reverses the order of the bytes in the encoded message -(so ``abc`` becomes ``cab``), +(so ``abc`` becomes ``cba``), and sends it back to the client. """ @@ -24,44 +24,34 @@ import socket import sys from socketserver import TCPServer -from typing import TYPE_CHECKING, Any - -if TYPE_CHECKING: - if sys.version_info >= (3, 12): # noqa: UP036 - from collections.abc import Buffer - else: - from typing_extensions import Buffer - -from _socket import dup - -from sliplib import SlipRequestHandler +from typing import Any +from sliplib import SlipRequestHandler, SlipSocket -class _ChattySocket(socket.socket): - """A socket subclass that prints the raw data that is received and sent.""" - def __init__(self, sock: socket.socket) -> None: - fd = dup(sock.fileno()) - super().__init__(sock.family, sock.type, sock.proto, fileno=fd) - super().settimeout(sock.gettimeout()) +class _ChattySocket(SlipSocket): + """A SlipSocket subclass that prints the raw data that is received and sent.""" - def recv(self, chunksize: int, *args: Any) -> bytes: - data = super().recv(chunksize, *args) + def recv_bytes(self) -> bytes: + data = super().recv_bytes() print("Raw data received:", data) return data - def sendall(self, data: Buffer, *args: Any) -> None: + def send_bytes(self, data: bytes) -> None: print("Sending raw data:", data) - super().sendall(data, *args) + super().send_bytes(data) class SlipHandler(SlipRequestHandler): """A SlipRequestHandler that echoes the received message with the bytes in reversed order.""" - def setup(self) -> None: - self.request = _ChattySocket(self.request) - print(f"Incoming connection from {self.request.getpeername()}") - super().setup() + def __init__(self, request: socket.socket | SlipSocket, *args: Any) -> None: + if isinstance(request, SlipSocket): + request.__class__ = _ChattySocket + else: + request = _ChattySocket(request) + print(f"Incoming connection from {request.getpeername()}") + super().__init__(request, *args) # Dedicated handler to show the encoded bytes. def handle(self) -> None: @@ -81,10 +71,14 @@ class TCPServerIPv6(TCPServer): address_family = socket.AF_INET6 -if __name__ == "__main__": +def main() -> None: if len(sys.argv) > 1 and sys.argv[1].lower() == "ipv6": server = TCPServerIPv6(("localhost", 0), SlipHandler) # type: TCPServer else: server = TCPServer(("localhost", 0), SlipHandler) print("Slip server listening on localhost, port", server.server_address[1]) server.handle_request() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 3efcb65..2c21e69 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ python = "3.12" dependencies = [ "pytest", "pytest-mock", + "semantic-version-check", ] [[tool.hatch.envs.test.matrix]] @@ -61,13 +62,11 @@ cov = [ "cov-report", ] -[[tool.hatch.envs.types.matrix]] -python = ["3.8", "3.9", "3.10", "3.11", "3.12"] - [tool.hatch.envs.types] +python = "3.12" extra-dependencies = [ "mypy>=1.0.0", - "typing_extensions" + "typing_extensions", ] [tool.hatch.envs.types.scripts] @@ -97,13 +96,17 @@ fix = [ source_pkgs = ["sliplib", "tests"] branch = true parallel = true +concurrency = [ + "multiprocessing", + "thread", +] omit = [ "src/sliplib/__about__.py", ] [tool.coverage.paths] -sliplib = ["src/sliplib", "*/SlipLib/src/sliplib"] -tests = ["tests", "*/SlipLib/tests"] +sliplib = ["src/sliplib"] +tests = ["tests"] [tool.coverage.report] show_missing = true @@ -112,6 +115,7 @@ exclude_lines = [ "if __name__ == .__main__.:", "if TYPE_CHECKING:", ] +fail_under = 100 [tool.hatch.build.targets.sdist] include = [ @@ -136,5 +140,5 @@ line-length = 120 profile = "black" line_length = 120 -[tool.ruff] -target-version = "py312" +[tool.mypy] +python_version = "3.8" diff --git a/src/sliplib/__init__.py b/src/sliplib/__init__.py index 293dc8b..09661d0 100644 --- a/src/sliplib/__init__.py +++ b/src/sliplib/__init__.py @@ -67,7 +67,7 @@ .. automodule:: sliplib.slipwrapper .. automodule:: sliplib.slipstream .. automodule:: sliplib.slipsocket -.. automodule:: sliplib.sliprequesthandler +.. automodule:: sliplib.slipserver Exceptions ---------- @@ -77,7 +77,7 @@ """ from sliplib.slip import END, ESC, ESC_END, ESC_ESC, Driver, ProtocolError, decode, encode, is_valid -from sliplib.sliprequesthandler import SlipRequestHandler +from sliplib.slipserver import SlipRequestHandler, SlipServer from sliplib.slipsocket import SlipSocket from sliplib.slipstream import SlipStream from sliplib.slipwrapper import SlipWrapper @@ -90,6 +90,7 @@ "SlipWrapper", "SlipSocket", "SlipRequestHandler", + "SlipServer", "SlipStream", "ProtocolError", "END", diff --git a/src/sliplib/slip.py b/src/sliplib/slip.py index af0b64a..280bb3e 100644 --- a/src/sliplib/slip.py +++ b/src/sliplib/slip.py @@ -226,7 +226,7 @@ def flush(self) -> list[bytes]: try: msg = decode(packet) except ProtocolError: - # Add any already decoded messages to the exception + # Add any already decoded messages to the internal message buffer self._messages = messages raise messages.append(msg) diff --git a/src/sliplib/sliprequesthandler.py b/src/sliplib/sliprequesthandler.py deleted file mode 100644 index 04d065d..0000000 --- a/src/sliplib/sliprequesthandler.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright (c) 2020. Ruud de Jong -# This file is part of the SlipLib project which is released under the MIT license. -# See https://github.com/rhjdjong/SlipLib for details. - -""" -SlipRequestHandler ------------------- - -.. autoclass:: SlipRequestHandler - :show-inheritance: - - .. automethod:: setup - .. automethod:: handle - .. automethod:: finish -""" -from __future__ import annotations - -from socketserver import BaseRequestHandler - -from sliplib.slipsocket import SlipSocket - - -class SlipRequestHandler(BaseRequestHandler): - """Base class for request handlers for SLIP-based communication - - This class is derived from :class:`socketserver.BaseRequestHandler` - for the purpose of creating TCP server instances - that can handle incoming SLIP-based connections. - - To do anything useful, a derived class must define - a new :meth:`handle` method that uses - :attr:`self.request` to send and receive SLIP-encoded messages. - - Other methods can of course also be overridden if necessary. - """ - - def setup(self) -> None: - """Initializes the request handler. - - The original socket (available via :code:`self.request`) - is wrapped in a :class:`SlipSocket` object. - Derived classes may override this method, - but should call ``super().setup()`` before - accessing any :class:`SlipSocket` - methods or attributes on :code:`self.request`. - """ - if not isinstance(self.request, SlipSocket): - # noinspection PyTypeChecker - self.request = SlipSocket(self.request) - - def handle(self) -> None: - """Services the request. Must be defined by a derived class. - - Note that in general it does not make sense - to use a :class:`SlipRequestHandler` object - to handle a single transmission, as is e.g. common with HTTP. - The purpose of the SLIP protocol is to allow separation of - messages in a continuous byte stream. - As such, it is expected that the :meth:`handle` method of a derived class - is capable of handling multiple SLIP messages, for example: - - .. code:: - - def handle(self): - while True: - msg = self.request.recv_msg() - if msg == b"": - break - # Do something with the message - """ - - def finish(self) -> None: - """Performs any cleanup actions. - - The default implementation does nothing. - """ diff --git a/src/sliplib/slipserver.py b/src/sliplib/slipserver.py new file mode 100644 index 0000000..1b44a42 --- /dev/null +++ b/src/sliplib/slipserver.py @@ -0,0 +1,131 @@ +# Copyright (c) 2020. Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. + +""" +SlipRequestHandler +------------------ + +.. autoclass:: SlipRequestHandler + :show-inheritance: + + .. automethod:: handle + .. automethod:: finish + +.. autoclass:: SlipServer + :show-inheritance: + +""" +from __future__ import annotations + +import socket +from socketserver import BaseRequestHandler, TCPServer +from typing import cast + +from sliplib.slipsocket import SlipSocket, TCPAddress + + +class SlipRequestHandler(BaseRequestHandler): + """Base class for request handlers for SLIP-based communication + + This class is derived from :class:`socketserver.BaseRequestHandler` + for the purpose of creating TCP server instances + that can handle incoming SLIP-based connections. + + To do anything useful, a derived class must define + a new :meth:`handle` method that uses + :attr:`self.request` to send and receive SLIP-encoded messages. + + Other methods can of course also be overridden if necessary. + """ + + def __init__(self, request: socket.socket | SlipSocket, client_address: TCPAddress, server: TCPServer): + """Initializes the request handler. + + The type of the :attr:`request` parameter depends on the type of server + that instantiates the request handler. + If the server is a SlipServer, then :attr:`request` is a SlipSocket. + Otherwise, it is a regular socket, and must be wrapped in a SlipSocket + before it can be used. + + Args: + request: + The socket that is connected to the remote party. + + client_address: + The remote TCP addresss. + + server: + The server instance that instantiated this handler object. + """ + if server.socket_type != socket.SOCK_STREAM: + message = ( + f"{self.__class__.__name__} instance can only be used " + f"with a TCP server (got {server.__class__.__name__})" + ) + raise TypeError(message) + + if not isinstance(request, SlipSocket): + request = SlipSocket(request) + super().__init__(cast(socket.socket, request), client_address, server) + + def handle(self) -> None: + """Services the request. Must be defined by a derived class. + + Note that in general it does not make sense + to use a :class:`SlipRequestHandler` object + to handle a single transmission, as is e.g. common with HTTP. + The purpose of the SLIP protocol is to allow separation of + messages in a continuous byte stream. + As such, it is expected that the :meth:`handle` method of a derived class + is capable of handling multiple SLIP messages, for example: + + .. code:: + + def handle(self): + while True: + msg = self.request.recv_msg() + if msg == b"": + break + # Do something with the message + """ + + def finish(self) -> None: + """Performs any cleanup actions. + + The default implementation does nothing. + """ + + +class SlipServer(TCPServer): + """Base class for SlipSocket based server classes. + + This is a convenience class, that offers a minor enhancement + over the regular :class:`TCPServer` from the standard library. + The class :class:`TCPServer` is hardcoded to use only IPv4 addresses. It must be subclassed + in order to use IPv6 addresses. + The :class:`SlipServer` class uses the address that is provided during instantiation + to determine if it must muse an IPv4 or IPv6 socket. + """ + + def __init__( + self, + server_address: TCPAddress, + handler_class: type[SlipRequestHandler], + bind_and_activate: bool = True, # noqa: FBT001 FBT002 + ): + """ + + Args: + server_address: The address on which the server listens + handler_class: The class that will be instantiated to handle an incoming request + bind_and_activate: Flag to indicate if the server must be bound and activated at creation time. + """ + if self._is_ipv6_address(server_address): + self.address_family = socket.AF_INET6 + super().__init__(server_address[0:2], handler_class, bind_and_activate) + self.socket = cast(socket.socket, SlipSocket(self.socket)) + + @staticmethod + def _is_ipv6_address(server_address: TCPAddress) -> bool: + return ":" in server_address[0] diff --git a/src/sliplib/slipsocket.py b/src/sliplib/slipsocket.py index 6b1ba50..fc2222b 100644 --- a/src/sliplib/slipsocket.py +++ b/src/sliplib/slipsocket.py @@ -2,7 +2,6 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# ruff: noqa: UP007 UP006 UP035 """ SlipSocket ---------- @@ -35,9 +34,12 @@ .. automethod:: close .. automethod:: connect .. automethod:: connect_ex + .. automethod:: fileno .. automethod:: getpeername .. automethod:: getsockname + .. automethod:: getsockopt .. automethod:: listen([backlog]) + .. automethod:: setsockopt .. automethod:: shutdown Since the wrapped socket is available as the :attr:`socket` attribute, @@ -66,13 +68,13 @@ import socket import warnings -from typing import Any, Optional, Tuple, Union, cast +from typing import Any, Tuple, Union, cast from sliplib.slipwrapper import SlipWrapper #: TCPAddress stands for either an IPv4 address, i.e. a (host, port) tuple, #: or an IPv6 address, i.e. a (host, port, flowinfo, scope_id) tuple. -TCPAddress = Union[Tuple[Optional[str], int], Tuple[Optional[str], int, int, int]] +TCPAddress = Union[Tuple[str, int], Tuple[str, int, int, int]] class SlipSocket(SlipWrapper[socket.socket]): @@ -119,7 +121,6 @@ class SlipSocket(SlipWrapper[socket.socket]): _chunk_size = 4096 def __init__(self, sock: socket.SocketType): - # pylint: disable=missing-raises-doc """ To instantiate a :class:`SlipSocket`, the user must provide a pre-constructed TCP `socket`. @@ -149,7 +150,7 @@ def recv_bytes(self) -> bytes: """See base class""" return self.socket.recv(self._chunk_size) - def accept(self) -> Tuple[SlipSocket, TCPAddress]: + def accept(self) -> tuple[SlipSocket, TCPAddress]: """Accepts an incoming connection. Returns: @@ -188,6 +189,14 @@ def connect_ex(self, address: TCPAddress) -> None: """ self.socket.connect_ex(address) + def fileno(self) -> int: + """Get the socket's file descriptor. + + Returns: + The wrapped socket's file descriptor, or -1 on failure. + """ + return self.socket.fileno() + def getpeername(self) -> TCPAddress: """Get the IP address of the remote socket to which `SlipSocket` is connected. @@ -204,6 +213,22 @@ def getsockname(self) -> TCPAddress: """ return cast(TCPAddress, self.socket.getsockname()) + def getsockopt(self, *args: Any) -> int | bytes: + """Get the socket option from the embedded socket. + + Returns: + The integer or bytes representing the value of the socket option. + """ + return self.socket.getsockopt(*args) + + def gettimeout(self) -> float | None: + """Get the socket option from the embedded socket. + + Returns: + The integer or bytes representing the value of the socket option. + """ + return self.socket.gettimeout() + def listen(self, backlog: int | None = None) -> None: """Enable a `SlipSocket` server to accept connections. @@ -215,6 +240,14 @@ def listen(self, backlog: int | None = None) -> None: else: self.socket.listen(backlog) + def setsockopt(self, *args: Any) -> None: + """Get the socket option from the embedded socket. + + Returns: + The integer or bytes representing the value of the socket option. + """ + return self.socket.setsockopt(*args) + def shutdown(self, how: int) -> None: """Shutdown the connection. diff --git a/src/sliplib/slipstream.py b/src/sliplib/slipstream.py index 85773f8..80de1ea 100644 --- a/src/sliplib/slipstream.py +++ b/src/sliplib/slipstream.py @@ -14,6 +14,7 @@ A :class:`SlipStream` instance has the following attributes in addition to the attributes offered by its base class :class:`SlipWrapper`: + .. autoattribute:: chunk_size .. autoattribute:: readable .. autoattribute:: writable """ @@ -99,7 +100,7 @@ def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE): Args: stream: The byte stream that will be wrapped. - chunk_size: the number of bytes to read per read operation. + chunk_size: The number of bytes to read per read operation. The default value for `chunck_size` is `io.DEFAULT_BUFFER_SIZE`. Setting the `chunk_size` is useful when the stream has a low bandwidth @@ -128,7 +129,10 @@ def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE): if hasattr(stream, "encoding"): error_msg = f"{stream.__class__.__name__} object is not a byte stream" raise TypeError(error_msg) - self._chunk_size = chunk_size if chunk_size > 0 else io.DEFAULT_BUFFER_SIZE + + #: The number of bytes to read during each read operation. + self.chunk_size = chunk_size if chunk_size > 0 else io.DEFAULT_BUFFER_SIZE + super().__init__(stream) def send_bytes(self, packet: bytes) -> None: @@ -139,7 +143,7 @@ def send_bytes(self, packet: bytes) -> None: def recv_bytes(self) -> bytes: """See base class""" - return b"" if self._stream_is_closed else self.stream.read(self._chunk_size) + return b"" if self._stream_is_closed else self.stream.read(self.chunk_size) @property def readable(self) -> bool: diff --git a/tests/examples/__init__.py b/tests/examples/__init__.py new file mode 100644 index 0000000..ef97db1 --- /dev/null +++ b/tests/examples/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2024 Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. diff --git a/tests/examples/echoserver/__init__.py b/tests/examples/echoserver/__init__.py new file mode 100644 index 0000000..ef97db1 --- /dev/null +++ b/tests/examples/echoserver/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2024 Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. diff --git a/tests/examples/echoserver/test_echo_server.py b/tests/examples/echoserver/test_echo_server.py new file mode 100644 index 0000000..37db3b6 --- /dev/null +++ b/tests/examples/echoserver/test_echo_server.py @@ -0,0 +1,126 @@ +# Copyright (c) 2020. Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. + + +""" +This module tests SlipSocket using a SLIP echo server, similar to the one in the examples directory. +""" +from __future__ import annotations + +import pathlib +import re +import sys +import threading +from queue import Empty, Queue +from subprocess import PIPE, Popen +from typing import Generator + +import pytest + +import sliplib + + +class TestEchoServer: + def output_reader(self, proc: Popen[str], output_queue: Queue[str]) -> None: + for line in iter(proc.stdout.readline, ""): # type: ignore[union-attr] + output_queue.put(line) + + def get_server_output(self) -> str: + try: + output: str = self.server_queue.get(timeout=5) + except Empty: # no cov + pytest.fail("No output from server") + return output.strip() + + def get_client_output(self) -> str: + try: + output: str = self.client_queue.get(timeout=5) + except Empty: # no cov + pytest.fail("No output from client") + return output.strip() + + def write_client_input(self, msg: str) -> None: + self.client.stdin.write(msg + "\n") # type: ignore[union-attr] + self.client.stdin.flush() # type: ignore[union-attr] + + @pytest.fixture(autouse=True) + def setup(self) -> Generator[None, None, None]: + echoserver_directory = pathlib.Path(sliplib.__file__).parents[2] / "examples" / "echoserver" + self.python = sys.executable + self.server_script = str(echoserver_directory / "server.py") + self.client_script = str(echoserver_directory / "client.py") + self.server: Popen[str] | None = None + self.client: Popen[str] | None = None + self.server_queue: Queue[str] = Queue() + self.client_queue: Queue[str] = Queue() + yield + if self.server and self.server.returncode is None: # no cov + self.server.terminate() + if self.client and self.client.returncode is None: # no cov + self.client.terminate() + + @pytest.mark.parametrize("arg", ["", "ipv6"]) + def test_server_and_client(self, arg: str) -> None: + server_command = [self.python, "-u", self.server_script] + if arg: + server_command.append(arg) + self.server = Popen(server_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True, bufsize=1) + server_output_reader = threading.Thread(target=self.output_reader, args=(self.server, self.server_queue)) + server_output_reader.start() + server_output = self.get_server_output() + m = re.match(r"Slip server listening on localhost, port (\d+)", server_output) + assert m is not None + server_port = m.group(1) + + client_command = [self.python, "-u", self.client_script, server_port] + self.client = Popen(client_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True, bufsize=1) + client_output_reader = threading.Thread(target=self.output_reader, args=(self.client, self.client_queue)) + client_output_reader.start() + client_output = self.get_client_output() + assert client_output == f"Connecting to server on port {server_port}" + + server_output = self.get_server_output() + message = f"Incoming connection from ('{'::1' if arg else '127.0.0.1'}'" + assert server_output.startswith(message) + + client_output = self.get_client_output() + message = f"Connected to ('{'::1' if arg else '127.0.0.1'}'" + assert client_output.startswith(message) + + self.write_client_input("hallo") + server_output = self.get_server_output() + assert server_output == r"Raw data received: b'\xc0hallo\xc0'" + server_output = self.get_server_output() + assert server_output == "Decoded data: b'hallo'" + server_output = self.get_server_output() + assert server_output == r"Sending raw data: b'\xc0ollah\xc0'" + client_output = self.get_client_output() + assert client_output == "Message>Response: b'ollah'" + + self.write_client_input("bye") + server_output = self.get_server_output() + assert server_output == r"Raw data received: b'\xc0bye\xc0'" + server_output = self.get_server_output() + assert server_output == "Decoded data: b'bye'" + server_output = self.get_server_output() + assert server_output == r"Sending raw data: b'\xc0eyb\xc0'" + client_output = self.get_client_output() + assert client_output == "Message>Response: b'eyb'" + + self.write_client_input("") + server_output = self.get_server_output() + assert server_output == "Raw data received: b''" + server_output = self.get_server_output() + assert server_output == "Decoded data: b''" + server_output = self.get_server_output() + assert server_output == "Closing down" + client_output = self.get_client_output() + assert client_output == "Message>" + + assert self.server_queue.empty() + assert self.client_queue.empty() + self.server.wait(2) + self.client.wait(2) + assert self.server.returncode == 0 + assert self.client.returncode == 0 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 45bdcb3..b74e5d8 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -1,3 +1,3 @@ -# Copyright (c) 2020. Ruud de Jong +# Copyright (c) 2024. Ruud de Jong # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. diff --git a/tests/integration/test_data.py b/tests/integration/test_data.py deleted file mode 100644 index 877cb8d..0000000 --- a/tests/integration/test_data.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2020. Ruud de Jong -# This file is part of the SlipLib project which is released under the MIT license. -# See https://github.com/rhjdjong/SlipLib for details. - -# pylint: disable=attribute-defined-outside-init -# pylint: disable=too-few-public-methods - -"""Common data for file-related tests""" -import pathlib - -import pytest - -data = [ - b"line 1", - b"line with embedded\nnewline", - b"last line", -] - - -class BaseFileTest: - """Base class for filebased SLIP tests.""" - - @pytest.fixture(autouse=True) - def setup(self, tmp_path: pathlib.Path) -> None: - """Prepare the test.""" - testdir = tmp_path / "slip" - testdir.mkdir() - self.filepath = testdir / "read.txt" diff --git a/tests/integration/test_server_client.py b/tests/integration/test_server_client.py deleted file mode 100644 index 0d99c92..0000000 --- a/tests/integration/test_server_client.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright (c) 2020. Ruud de Jong -# This file is part of the SlipLib project which is released under the MIT license. -# See https://github.com/rhjdjong/SlipLib for details. - -# ruff: noqa: UP006 UP035 - -""" -This module tests SlipSocket using a SLIP echo server, similar to the one in the examples directory. -""" - -import socket -from multiprocessing import Pipe, Process -from multiprocessing.connection import Connection -from socketserver import TCPServer -from typing import Generator, Mapping, Tuple, Type - -import pytest - -from sliplib import SlipRequestHandler, SlipSocket -from sliplib.slipsocket import TCPAddress - - -class SlipEchoHandler(SlipRequestHandler): - """SLIP request handler that echoes the received message, but with the bytes in reversed order.""" - - def handle(self) -> None: - while True: - message = self.request.recv_msg() - if not message: - break - # Reverse the order of the bytes and send it back. - data_to_send = bytes(reversed(message)) - self.request.send_msg(data_to_send) - - -class SlipEchoServer: - """Execution helper for the echo server. Sends the server address back over the pipe.""" - - server_data: Mapping[int, Tuple[Type[TCPServer], str]] = { - socket.AF_INET: (TCPServer, "127.0.0.1"), - socket.AF_INET6: ( - type("TCPServerIPv6", (TCPServer,), {"address_family": socket.AF_INET6}), - "::1", - ), - } - - def __init__(self, address_family: int, pipe: Connection) -> None: - server_class, localhost = self.server_data[address_family] - self.server = server_class((localhost, 0), SlipEchoHandler) - pipe.send(self.server.server_address) - self.server.handle_request() - - -class SlipEchoClient: - """Client for the SLIP echo server""" - - def __init__(self, address: TCPAddress) -> None: - self.sock = SlipSocket.create_connection(address) - - def echo(self, msg: bytes) -> bytes: - """Send message to the SLIP server and returns the response.""" - self.sock.send_msg(msg) - return self.sock.recv_msg() - - def close(self) -> None: - """Close the SLIP socket""" - self.sock.close() - - -class TestEchoServer: - """Test for the SLIP echo server""" - - @pytest.fixture(autouse=True, params=[socket.AF_INET, socket.AF_INET6]) - def setup(self, request: pytest.FixtureRequest, capfd: pytest.CaptureFixture[str]) -> Generator[None, None, None]: - """Prepare the server and client""" - near, far = Pipe() - address_family = request.param - self.server = Process(target=SlipEchoServer, args=(address_family, far)) - self.server.start() - address_available = near.poll(1.5) # AppVeyor sometimes takes a long time to run the server. - if address_available: - server_address = near.recv() - else: - captured = capfd.readouterr() - pytest.fail(captured.err) - self.client = SlipEchoClient(server_address) - yield - self.client.close() - self.server.join() - - def test_echo_server(self) -> None: - """Test the echo server""" - data = [(b"hallo", b"ollah"), (b"goodbye", b"eybdoog")] - for snd_msg, expected_reply in data: - assert self.client.echo(snd_msg) == expected_reply diff --git a/tests/integration/test_slip_file.py b/tests/integration/test_slip_file.py index 7117d02..4b47962 100644 --- a/tests/integration/test_slip_file.py +++ b/tests/integration/test_slip_file.py @@ -4,28 +4,73 @@ """Test using SlipStream with a buffered file""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Generator + +if TYPE_CHECKING: + import pathlib + +import pytest + from sliplib import SlipStream, encode -from .test_data import BaseFileTest, data +data = [ + b"line 1", + b"line with embedded\nnewline", + b"last line", +] + + +@pytest.fixture +def scrap_file(tmp_path: pathlib.Path) -> Generator[pathlib.Path, None, None]: + testdir = tmp_path / "slip" + testdir.mkdir() + filepath = testdir / "data.txt" + yield filepath + filepath.unlink() + + +@pytest.fixture +def readfile(scrap_file: pathlib.Path) -> pathlib.Path: + scrap_file.write_bytes(b"".join(encode(msg) for msg in data)) + return scrap_file + + +@pytest.fixture +def writefile(scrap_file: pathlib.Path) -> pathlib.Path: + return scrap_file + + +def test_reading_slip_file(readfile: pathlib.Path) -> None: + """Test reading encoded SLIP messages""" + with readfile.open(mode="rb") as f: + slipstream = SlipStream(f) + for expected, actual in zip(data, slipstream): + assert expected == actual -class TestBufferedFileAccess(BaseFileTest): - """Test buffered SLIP file access""" +def test_writing_slip_file(writefile: pathlib.Path) -> None: + """Test writing encoded SLIP messages""" + with writefile.open(mode="wb") as f: + slipstream = SlipStream(f) + for msg in data: + slipstream.send_msg(msg) + assert writefile.read_bytes() == b"".join(encode(msg) for msg in data) - def test_reading_slip_file(self) -> None: - """Test reading encoded SLIP messages""" - self.filepath.write_bytes(b"".join(encode(msg) for msg in data)) - with self.filepath.open(mode="rb") as f: - slipstream = SlipStream(f) - for exp, act in zip(data, slipstream): # noqa: B905 - assert exp == act +def test_reading_slip_file_unbuffered(readfile: pathlib.Path) -> None: + """Test reading SLIP-encoded message""" + with readfile.open(mode="rb", buffering=0) as f: + slipstream = SlipStream(f) + for expected, actual in zip(data, slipstream): + assert expected == actual - def test_writing_slip_file(self) -> None: - """Test writing encoded SLIP messages""" - with self.filepath.open(mode="wb") as f: - slipstream = SlipStream(f) - for msg in data: - slipstream.send_msg(msg) - assert self.filepath.read_bytes() == b"".join(encode(msg) for msg in data) +def test_writing_slip_file_unbuffered(writefile: pathlib.Path) -> None: + """Test writing SLIP-encoded messages""" + with writefile.open(mode="wb", buffering=0) as f: + slipstream = SlipStream(f) + for msg in data: + slipstream.send_msg(msg) + assert writefile.read_bytes() == b"".join(encode(msg) for msg in data) diff --git a/tests/integration/test_slip_file_unbuffered.py b/tests/integration/test_slip_file_unbuffered.py deleted file mode 100644 index dd0be49..0000000 --- a/tests/integration/test_slip_file_unbuffered.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright (c) 2020. Ruud de Jong -# This file is part of the SlipLib project which is released under the MIT license. -# See https://github.com/rhjdjong/SlipLib for details. - -# pylint: disable=relative-beyond-top-level - -"""Test using SlipStream with an unbuffered file""" - -from sliplib import SlipStream, encode - -from .test_data import BaseFileTest, data - - -class TestUnbufferedFileAccess(BaseFileTest): - """Test unbuffered SLIP file access.""" - - def test_reading_slip_file(self) -> None: - """Test reading SLIP-encoded message""" - - self.filepath.write_bytes(b"".join(encode(msg) for msg in data)) - with self.filepath.open(mode="rb", buffering=0) as f: - slipstream = SlipStream(f) - for exp, act in zip(data, slipstream): # noqa: B905 - assert exp == act - - def test_writing_slip_file(self) -> None: - """Test writing SLIP-encoded messages""" - - with self.filepath.open(mode="wb", buffering=0) as f: - slipstream = SlipStream(f) - for msg in data: - slipstream.send_msg(msg) - assert self.filepath.read_bytes() == b"".join(encode(msg) for msg in data) diff --git a/tests/integration/test_slip_in_memory_io.py b/tests/integration/test_slip_in_memory_io.py index ee22ec8..c725975 100644 --- a/tests/integration/test_slip_in_memory_io.py +++ b/tests/integration/test_slip_in_memory_io.py @@ -2,7 +2,6 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# ruff: noqa: UP035 """Test using SlipStream with an in-memory bytestream""" @@ -17,41 +16,41 @@ class TestSlipStreamWithBytesIO: """Test SlipStream with BytesIO.""" - @pytest.fixture(autouse=True) - def setup(self) -> Generator[None, None, None]: - """Setup the test""" - self.basestream = io.BytesIO() - self.slipstream = SlipStream(self.basestream) - yield - self.basestream.close() + msg_list = (b"hallo", b"bye") - def test_stream_reading(self) -> None: - """Test reading from the bytestream""" + @pytest.fixture + def empty_stream(self) -> Generator[io.BytesIO, None, None]: + stream = io.BytesIO() + yield stream + stream.close() + + @pytest.fixture + def filled_stream(self, empty_stream: io.BytesIO) -> io.BytesIO: + empty_stream.seek(0) + empty_stream.write(END + self.msg_list[0] + END + END + self.msg_list[1] + END) + empty_stream.seek(0) + return empty_stream - msg_list = [b"hallo", b"bye"] - self.basestream.write(END + msg_list[0] + END + END + msg_list[1] + END) - self.basestream.seek(0) - assert self.slipstream.recv_msg() == msg_list[0] - assert self.slipstream.recv_msg() == msg_list[1] + def test_stream_reading(self, filled_stream: io.BytesIO) -> None: + """Test reading from the bytestream""" + slipstream = SlipStream(filled_stream) + assert slipstream.recv_msg() == self.msg_list[0] + assert slipstream.recv_msg() == self.msg_list[1] # No more messages - assert self.slipstream.recv_msg() == b"" + assert slipstream.recv_msg() == b"" - def test_stream_reading_single_bytes(self) -> None: + def test_stream_reading_single_bytes(self, filled_stream: io.BytesIO) -> None: """Test reading single bytes from the bytestream""" - msg_list = [b"hallo", b"bye"] - self.basestream.write(END + msg_list[0] + END + END + msg_list[1] + END) - self.basestream.seek(0) - self.slipstream = SlipStream(self.basestream, 1) - assert self.slipstream.recv_msg() == msg_list[0] - assert self.slipstream.recv_msg() == msg_list[1] + slipstream = SlipStream(filled_stream, 1) + assert slipstream.recv_msg() == self.msg_list[0] + assert slipstream.recv_msg() == self.msg_list[1] # No more messages - assert self.slipstream.recv_msg() == b"" + assert slipstream.recv_msg() == b"" - def test_stream_writing(self) -> None: + def test_stream_writing(self, empty_stream: io.BytesIO) -> None: """Test writing to the bytestream""" - - msg_list = [b"hallo", b"bye"] - for msg in msg_list: - self.slipstream.send_msg(msg) - assert self.basestream.getvalue() == END + msg_list[0] + END + END + msg_list[1] + END + slipstream = SlipStream(empty_stream) + for msg in self.msg_list: + slipstream.send_msg(msg) + assert empty_stream.getvalue() == END + self.msg_list[0] + END + END + self.msg_list[1] + END diff --git a/tests/unit/test_sliprequesthandler.py b/tests/integration/test_slip_server_and_handler.py similarity index 66% rename from tests/unit/test_sliprequesthandler.py rename to tests/integration/test_slip_server_and_handler.py index c30fbb2..df2faf1 100644 --- a/tests/unit/test_sliprequesthandler.py +++ b/tests/integration/test_slip_server_and_handler.py @@ -2,19 +2,20 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# ruff: noqa: UP035 """Tests for SlipRequestHandler""" +from __future__ import annotations -import socketserver import threading -from socket import AF_INET, AF_INET6, socket # pylint: disable=no-name-in-module -from typing import Generator +from socket import AF_INET, AF_INET6, socket +from typing import TYPE_CHECKING, Generator import pytest -from sliplib import END, SlipRequestHandler, SlipSocket -from sliplib.slipsocket import TCPAddress +from sliplib import END, SlipRequestHandler, SlipServer, SlipSocket + +if TYPE_CHECKING: + from sliplib.slipsocket import TCPAddress class DummySlipRequestHandler(SlipRequestHandler): @@ -27,31 +28,37 @@ def handle(self) -> None: self.request.send_msg(bytes(reversed(msg))) +@pytest.mark.parametrize( + ("family", "address"), + ( + (AF_INET, ("127.0.0.1", 0)), + (AF_INET6, ("::1", 0, 0, 0)), + ), +) class TestSlipRequestHandler: """Tests for SlipRequestHandler.""" - @pytest.fixture(autouse=True, params=[(AF_INET, ("127.0.0.1", 0)), (AF_INET6, ("::1", 0, 0, 0))]) - def setup(self, request: pytest.FixtureRequest) -> Generator[None, None, None]: + @pytest.fixture(autouse=True) + def setup(self, family: int, address: TCPAddress) -> Generator[None, None, None]: """Prepare the test.""" - self.family = request.param[0] - self.bind_address = request.param[1] + self.family = family + self.bind_address = address # Cannot use standard TCPServer, because that is hardcoded to IPv4 - self.server_class = type("SlipServer", (socketserver.TCPServer,), {"address_family": self.family}) self.client_socket = socket(family=self.family) self.server_is_running = threading.Event() yield self.client_socket.close() - def server(self, bind_address: TCPAddress) -> None: + def server(self, bind_address: TCPAddress, request_handler_class: type[SlipRequestHandler]) -> None: """Create a server.""" - srv = self.server_class(bind_address, DummySlipRequestHandler) + srv = SlipServer(bind_address, request_handler_class) self.server_address = srv.server_address self.server_is_running.set() srv.handle_request() def test_working_of_sliprequesthandler(self) -> None: """Verify that the server returns the message with the bytes in reversed order.""" - server_thread = threading.Thread(target=self.server, args=(self.bind_address,)) + server_thread = threading.Thread(target=self.server, args=(self.bind_address, DummySlipRequestHandler)) server_thread.start() self.server_is_running.wait(0.5) self.client_socket.connect(self.server_address) diff --git a/tests/unit/test_slip.py b/tests/unit/test_slip.py index 9ae2647..a50c6d8 100644 --- a/tests/unit/test_slip.py +++ b/tests/unit/test_slip.py @@ -2,7 +2,6 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# ruff: noqa: UP035 """ This module contains the tests for the low-level SLIP functions and classes. @@ -17,8 +16,6 @@ class TestEncoding: """Test encoding of SLIP messages.""" - # pylint: disable=no-self-use - def test_empty_message_encoding(self) -> None: """Empty message should result in an empty packet.""" msg = b"" @@ -59,12 +56,9 @@ def test_special_character_encoding(self, msg: bytes, packet: bytes) -> None: assert encode(msg) == END + packet + END -# noinspection PyPep8Naming class TestDecoding: """Test decoding of SLIP packets.""" - # pylint: disable=no-self-use - def test_empty_packet_decoding(self) -> None: """An empty packet should result in an empty message.""" packet = END + END @@ -228,7 +222,3 @@ def test_subsequent_packets_with_wrong_escape_sequence(self) -> None: assert self.driver.messages == [msgs[2]] assert exc_info.value.args == (msgs[3],) assert self.driver.flush() == [msgs[4]] - - -if __name__ == "__main__": - pytest.main() diff --git a/tests/unit/test_slipserver.py b/tests/unit/test_slipserver.py new file mode 100644 index 0000000..9da8f29 --- /dev/null +++ b/tests/unit/test_slipserver.py @@ -0,0 +1,160 @@ +# Copyright (c) 2024 Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. + +from __future__ import annotations + +import socket +from socketserver import TCPServer, UDPServer +from typing import TYPE_CHECKING, Generator, cast + +import pytest + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + from sliplib.slipsocket import TCPAddress + + +from sliplib import SlipRequestHandler, SlipServer, SlipSocket + + +@pytest.fixture +def sock(mocker: MockerFixture) -> Generator[socket.socket, None, None]: + address_family = socket.AF_INET + mocked_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, + family=address_family, + type=socket.SOCK_STREAM, + proto=0, + ) + yield mocked_socket + mocked_socket.close() + + +@pytest.fixture +def slipsock(sock: socket.socket) -> SlipSocket: + return SlipSocket(sock) + + +@pytest.fixture +def udpsock(mocker: MockerFixture) -> Generator[socket.socket, None, None]: + address_family = socket.AF_INET + mocked_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, + family=address_family, + type=socket.SOCK_DGRAM, + proto=0, + ) + yield mocked_socket + mocked_socket.close() + + +@pytest.fixture +def tcpserver(mocker: MockerFixture) -> TCPServer: + return mocker.create_autospec(spec=TCPServer, instance=True, socket_type=socket.SOCK_STREAM) + + +@pytest.fixture +def udpserver(mocker: MockerFixture) -> UDPServer: + return mocker.create_autospec(spec=UDPServer, instance=True, socket_type=socket.SOCK_DGRAM) + + +@pytest.fixture +def slipserver(mocker: MockerFixture) -> SlipServer: + return mocker.create_autospec(spec=SlipServer, instance=True, socket_type=socket.SOCK_STREAM) + + +@pytest.fixture +def slip_request_handler(mocker: MockerFixture) -> SlipRequestHandler: + return mocker.create_autospec(SlipRequestHandler, instance=True) + + +@pytest.fixture +def slip_request_handler_class( + slip_request_handler: SlipRequestHandler, mocker: MockerFixture +) -> type[SlipRequestHandler]: + return mocker.patch("sliplib.slipserver.SlipRequestHandler", autospec=True, return_value=slip_request_handler) + + +class TestSlipRequestHandler: + def test_instantiate_with_regular_socket_and_tcpserver(self, sock: socket.socket, tcpserver: TCPServer) -> None: + h = SlipRequestHandler(sock, ("93.184.216.34", 54321), tcpserver) + assert isinstance(h.request, SlipSocket) + assert h.request.socket is sock + + def test_instantiate_with_slip_socket_and_tcpserver(self, slipsock: SlipSocket, tcpserver: TCPServer) -> None: + h = SlipRequestHandler(slipsock, ("93.184.216.34", 54321), tcpserver) + assert h.request is slipsock + + def test_instantiate_with_regular_socket_and_slipserver(self, sock: socket.socket, slipserver: SlipServer) -> None: + h = SlipRequestHandler(sock, ("93.184.216.34", 54321), slipserver) + assert isinstance(h.request, SlipSocket) + assert h.request.socket is sock + + def test_instantiate_with_slip_socket_and_slipserver(self, slipsock: SlipSocket, slipserver: SlipServer) -> None: + h = SlipRequestHandler(slipsock, ("93.184.216.34", 54321), slipserver) + assert h.request is slipsock + + def test_instantiate_with_udp_socket_fails(self, udpsock: socket.socket, tcpserver: TCPServer) -> None: + with pytest.raises(ValueError, match="type SOCK_STREAM"): + SlipRequestHandler(udpsock, ("93.184.216.34", 54321), tcpserver) + + def test_instantiate_with_udp_server_fails(self, sock: socket.socket, udpserver: UDPServer) -> None: + with pytest.raises(TypeError): + SlipRequestHandler(sock, ("93.184.216.34", 54321), udpserver) + + +@pytest.mark.parametrize( + ("address", "family", "remote_address"), + ( + (("127.0.0.1", 12345), socket.AF_INET, ("93.184.216.34", 54321)), + (("::1", 12345, 0, 0), socket.AF_INET6, ("2606:2800:220:1:248:1893:25c8:1946", 54321)), + ), +) +class TestSlipServer: + @pytest.fixture(autouse=True) + def setup( + self, + address: TCPAddress, + family: int, + remote_address: TCPAddress, + slip_request_handler: SlipRequestHandler, + slip_request_handler_class: type[SlipRequestHandler], + mocker: MockerFixture, + ) -> Generator[None, None, None]: + """Spy on the methods of the Request Handler.""" + self.slip_request_handler = slip_request_handler + self.slip_request_handler_class = slip_request_handler_class + self.address = address + self.family = family + self.remote_address = remote_address + + self.tcp_socket = mocker.create_autospec( + spec=socket.socket, instance=True, type=socket.SOCK_STREAM, family=family + ) + socketserver_socket = mocker.patch("socketserver.socket", autospec=True) + socketserver_socket.socket.return_value = self.tcp_socket + yield + self.tcp_socket.close() + + def test_slipserver_instantiation(self) -> None: + server = SlipServer(self.address, self.slip_request_handler_class) + assert server.address_family == self.family + assert isinstance(server.socket, SlipSocket) + assert server.socket.socket is self.tcp_socket + + def test_slipserver_process_request(self, mocker: MockerFixture) -> None: + server = SlipServer(self.address, self.slip_request_handler_class) + handler_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, + family=self.family, + type=socket.SOCK_STREAM, + ) + handler_slip_socket = SlipSocket(handler_socket) + server.process_request(cast(socket.socket, handler_slip_socket), self.remote_address) + self.slip_request_handler_class.assert_called_once_with(handler_slip_socket, self.remote_address, server) # type: ignore[attr-defined] + handler_socket.close() diff --git a/tests/unit/test_slipsocket.py b/tests/unit/test_slipsocket.py index f2a79f7..19c0aee 100644 --- a/tests/unit/test_slipsocket.py +++ b/tests/unit/test_slipsocket.py @@ -1,17 +1,22 @@ -# Copyright (c) 2020 Ruud de Jong +# Copyright (c) 2024 Ruud de Jong # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# ruff: noqa: UP035 """Tests for SlipSocket""" +from __future__ import annotations + import socket import warnings -from typing import Generator +from typing import TYPE_CHECKING, Any, Generator import pytest -from pytest_mock import MockerFixture + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + from sliplib.slipsocket import TCPAddress import sliplib from sliplib import END, ESC, ProtocolError, SlipSocket @@ -26,9 +31,13 @@ "close", "connect", "connect_ex", + "fileno", "getpeername", "getsockname", + "getsockopt", + "gettimeout", "listen", + "setsockopt", "shutdown", ) @@ -47,39 +56,31 @@ ) +@pytest.mark.parametrize( + ("address", "family", "remote_address"), + ( + (("127.0.0.1", 12345), socket.AF_INET, ("93.184.216.34", 54321)), + (("::1", 12345, 0, 0), socket.AF_INET6, ("2606:2800:220:1:248:1893:25c8:1946", 54321, 0, 0)), + ), +) class TestSlipSocket: """Tests for SlipSocket""" - @pytest.fixture( - autouse=True, - params=[ - ( - socket.AF_INET, # pylint: disable=no-member - ("93.184.216.34", 54321), # example.com IPv4 address - ("127.0.0.1", 12345), # localhost IPv4 address - ), - ( - socket.AF_INET6, # pylint: disable=no-member - ( - "2606:2800:220:1:248:1893:25c8:1946", - 54321, - 0, - 0, - ), # example.com IPv6 address - ("::1", 12345, 0, 0), # localhost IPv6 address - ), - ], - ) - def setup(self, request: pytest.FixtureRequest, mocker: MockerFixture) -> Generator[None, None, None]: + @pytest.fixture(autouse=True) + def setup( + self, address: TCPAddress, family: int, remote_address: TCPAddress, mocker: MockerFixture + ) -> Generator[None, None, None]: """Prepare the test.""" - - self.family, self.far_address, self.near_address = request.param - self.sock_mock = mocker.Mock( - spec=socket.socket(family=self.family), + self.near_address = address + self.family = family + self.far_address = remote_address + self.sock_mock = mocker.create_autospec( + spec=socket.socket, + instance=True, family=self.family, type=socket.SOCK_STREAM, proto=0, - ) # pylint: disable=no-member + ) self.slipsocket = SlipSocket(self.sock_mock) yield self.sock_mock.close() @@ -88,7 +89,7 @@ def setup(self, request: pytest.FixtureRequest, mocker: MockerFixture) -> Genera def test_slipsocket_instantiation(self) -> None: """Test that the slipsocket has been created properly.""" assert self.slipsocket.family == self.family - assert self.slipsocket.type == socket.SOCK_STREAM # pylint: disable=no-member + assert self.slipsocket.type == socket.SOCK_STREAM assert self.slipsocket.proto == 0 assert self.slipsocket.socket is self.sock_mock @@ -116,17 +117,16 @@ def socket_data_generator() -> Generator[bytes, None, None]: yield END + END yield b"" - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" - # noinspection PyProtectedMember chunk_size = sliplib.SlipSocket._chunk_size # noqa: SLF001 - expected_calls = [mocker.call.recv(chunk_size)] * 2 + expected_calls = [mocker.call(chunk_size)] * 2 self.sock_mock.recv.assert_has_calls(expected_calls) assert self.slipsocket.recv_msg() == b"bye" - expected_calls = [mocker.call.recv(chunk_size)] * 4 + expected_calls = [mocker.call(chunk_size)] * 4 self.sock_mock.recv.assert_has_calls(expected_calls) assert self.slipsocket.recv_msg() == b"" - expected_calls = [mocker.call.recv(chunk_size)] * 5 + expected_calls = [mocker.call(chunk_size)] * 5 self.sock_mock.recv.assert_has_calls(expected_calls) def test_end_of_data_handling(self, mocker: MockerFixture) -> None: @@ -137,17 +137,17 @@ def socket_data_generator() -> Generator[bytes, None, None]: yield END + END yield b"bye" yield b"" - yield b"" + yield b"" # no cov. Extra byte to ensure that the previous empty byte is enough to signal end of data. - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" assert self.slipsocket.recv_msg() == b"bye" assert self.slipsocket.recv_msg() == b"" chunk_size = sliplib.SlipSocket._chunk_size # noqa: SLF001 - expected_calls = [mocker.call.recv(chunk_size)] * 4 + expected_calls = [mocker.call(chunk_size)] * 4 self.sock_mock.recv.assert_has_calls(expected_calls) - def test_exception_on_protocol_error_in_first_packet(self, mocker: MockerFixture) -> None: + def test_exception_on_protocol_error_in_first_packet(self) -> None: """Test that an invalid packet causes a ProtocolError. Packets after the invalid packet are handled correctly.""" @@ -155,14 +155,14 @@ def test_exception_on_protocol_error_in_first_packet(self, mocker: MockerFixture def socket_data_generator() -> Generator[bytes, None, None]: yield END + ESC + b"error" + END + b"hallo" + END + b"bye" + END - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() with pytest.raises(ProtocolError) as exc: self.slipsocket.recv_msg() assert exc.value.args == (ESC + b"error",) assert self.slipsocket.recv_msg() == b"hallo" assert self.slipsocket.recv_msg() == b"bye" - def test_exception_on_protocol_error_in_subsequent_packet(self, mocker: MockerFixture) -> None: + def test_exception_on_protocol_error_in_subsequent_packet(self) -> None: """Test that an invalid packet causes a ProtocolError Packets before the invalid packet are decoded correctly.""" @@ -170,19 +170,21 @@ def test_exception_on_protocol_error_in_subsequent_packet(self, mocker: MockerFi def socket_data_generator() -> Generator[bytes, None, None]: yield END + b"hallo" + END + ESC + b"error" + END - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" with pytest.raises(ProtocolError) as exc: self.slipsocket.recv_msg() assert exc.value.args == (ESC + b"error",) - def test_exceptions_on_consecutive_invalid_packets(self, mocker: MockerFixture) -> None: + def test_exceptions_on_consecutive_invalid_packets( + self, + ) -> None: """Test that multiple invalid packets result in a ProtocolError for each invalid packet.""" def socket_data_generator() -> Generator[bytes, None, None]: yield END + b"hallo" + END + ESC + b"error" + END + b"another" + ESC + END + b"bye" + END - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" with pytest.raises(ProtocolError) as exc: self.slipsocket.recv_msg() @@ -194,67 +196,82 @@ def socket_data_generator() -> Generator[bytes, None, None]: def test_accept_method(self, mocker: MockerFixture) -> None: """Test that the accept method is delegated to the socket, and that the result is a SlipSocket.""" - new_socket = mocker.Mock( - spec=socket.socket(family=self.family), + new_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, family=self.family, type=socket.SOCK_STREAM, proto=0, - ) # pylint: disable=no-member - self.sock_mock.accept = mocker.Mock(return_value=(new_socket, self.far_address)) + ) + self.sock_mock.accept.return_value = (new_socket, self.far_address) new_slip_socket, address = self.slipsocket.accept() self.sock_mock.accept.assert_called_once_with() assert isinstance(new_slip_socket, SlipSocket) assert new_slip_socket.socket is new_socket assert address == self.far_address + new_socket.close() - def test_bind_method(self, mocker: MockerFixture) -> None: + def test_bind_method(self) -> None: """Test that the bind method is delegated to the socket.""" - self.sock_mock.bind = mocker.Mock() self.slipsocket.bind(self.near_address) self.sock_mock.bind.assert_called_once_with(self.near_address) - def test_close_method(self, mocker: MockerFixture) -> None: + def test_close_method(self) -> None: """Test that the close method is delegated to the socket.""" - self.sock_mock.close = mocker.Mock() self.slipsocket.close() self.sock_mock.close.assert_called_once_with() - def test_connect_method(self, mocker: MockerFixture) -> None: + def test_connect_method(self) -> None: """Test that the connect method is delegated to the socket.""" - self.sock_mock.connect = mocker.Mock() self.slipsocket.connect(self.far_address) self.sock_mock.connect.assert_called_once_with(self.far_address) - def test_connect_ex_method(self, mocker: MockerFixture) -> None: + def test_connect_ex_method(self) -> None: """Test that the connect_ex method is delegated to the socket.""" - self.sock_mock.connect_ex = mocker.Mock() self.slipsocket.connect_ex(self.far_address) self.sock_mock.connect_ex.assert_called_once_with(self.far_address) - def test_getpeername_method(self, mocker: MockerFixture) -> None: + def test_fileno_method(self) -> None: + """Test that the fileno method is delegated to the socket.""" + self.sock_mock.fileno.return_value = 3 + fileno = self.slipsocket.fileno() + self.sock_mock.fileno.assert_called_once_with() + assert fileno == 3 + + def test_getpeername_method(self) -> None: """Test that the getpeername method is delegated to the socket.""" - self.sock_mock.getpeername = mocker.Mock(return_value=self.far_address) + self.sock_mock.getpeername.return_value = self.far_address peername = self.slipsocket.getpeername() self.sock_mock.getpeername.assert_called_once_with() assert peername == self.far_address - def test_getsockname_method(self, mocker: MockerFixture) -> None: + def test_getsockname_method(self) -> None: """Test that the getsockname method is delegated to the socket.""" - self.sock_mock.getsockname = mocker.Mock(return_value=self.near_address) + self.sock_mock.getsockname.return_value = self.near_address sockname = self.slipsocket.getsockname() self.sock_mock.getsockname.assert_called_once_with() assert sockname == self.near_address + def test_getsockopt_method(self) -> None: + """Test that the getsockopt method is delegated to the socket.""" + self.sock_mock.getsockopt.return_value = 5 + option = self.slipsocket.getsockopt(27, 5) + self.sock_mock.getsockopt.assert_called_once_with(27, 5) + assert option == 5 + def test_listen_method(self, mocker: MockerFixture) -> None: """Test that the listen method (with or without arguments) is delegated to the socket.""" - self.sock_mock.listen = mocker.Mock() self.slipsocket.listen() self.slipsocket.listen(5) assert self.sock_mock.listen.mock_calls == [mocker.call(), mocker.call(5)] - def test_shutdown_method(self, mocker: MockerFixture) -> None: + def test_setsockopt_method(self) -> None: + """Test that the getsockopt method is delegated to the socket.""" + self.slipsocket.setsockopt(27, 5) + self.sock_mock.setsockopt.assert_called_once_with(27, 5) + + def test_shutdown_method(self) -> None: """Test that the shutdown method is delegated to the socket.""" - self.sock_mock.shutdown = mocker.Mock() self.slipsocket.shutdown(0) self.sock_mock.shutdown.assert_called_once_with(0) @@ -267,17 +284,19 @@ def test_exception_for_not_supported_operations(self, method: str) -> None: # Testing delegated methods. # This will be removed due to deprecation of delegating methods to the wrapped socket. @pytest.mark.parametrize("method", DELEGATED_METHODS) - def test_delegated_methods(self, method: str, mocker: MockerFixture) -> None: + def test_delegated_methods(self, method: str) -> None: """Test that other delegated methods are delegated to the socket, but also issue a deprecation warning.""" - mock_method = mocker.Mock() - setattr(self.sock_mock, method, mock_method) with warnings.catch_warnings(record=True) as warning: socket_method = getattr(self.slipsocket, method) assert len(warning) == 1 assert issubclass(warning[0].category, DeprecationWarning) assert "will be removed in version 1.0" in str(warning[0].message) - socket_method() - mock_method.assert_called_once_with() + if method == "set_inheritable": + args: tuple[()] | tuple[Any] = (True,) + else: + args = () + socket_method(*args) + getattr(self.sock_mock, method).assert_called_once_with(*args) @pytest.mark.parametrize("attr", ["family", "type", "proto"]) def test_read_only_attribute(self, attr: str) -> None: @@ -288,19 +307,22 @@ def test_read_only_attribute(self, attr: str) -> None: def test_create_connection(self, mocker: MockerFixture) -> None: """Test that create_connection gives a SlipSocket.""" - new_sock_mock = mocker.Mock( - spec=socket.socket(self.family), + new_sock_mock = mocker.create_autospec( + spec=socket.socket, + instance=True, family=self.family, type=socket.SOCK_STREAM, proto=0, - ) # pylint: disable=no-member + ) create_connection_mock = mocker.patch("sliplib.slipsocket.socket.create_connection", return_value=new_sock_mock) sock = SlipSocket.create_connection(self.far_address) assert isinstance(sock, SlipSocket) assert sock.socket is new_sock_mock create_connection_mock.assert_called_once_with(self.far_address[0:2], None, None) + sock.close() + new_sock_mock.close() - def test_slip_socket_iteration(self, mocker: MockerFixture) -> None: + def test_slip_socket_iteration(self) -> None: """Test that a SlipSocket can be iterated over.""" def socket_data_generator() -> Generator[bytes, None, None]: @@ -310,11 +332,7 @@ def socket_data_generator() -> Generator[bytes, None, None]: yield END + END yield b"" - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() expected = (b"hallo", b"bye") - for exp, act in zip(expected, self.slipsocket): # noqa: B905 - assert exp == act - - -if __name__ == "__main__": - pytest.main() + actual = tuple(msg for msg in self.slipsocket) + assert expected == actual diff --git a/tests/unit/test_slipstream.py b/tests/unit/test_slipstream.py index 49bda32..d20b2c0 100644 --- a/tests/unit/test_slipstream.py +++ b/tests/unit/test_slipstream.py @@ -2,19 +2,20 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# ruff: noqa: UP006 UP035 - """Tests for SlipStream.""" +from __future__ import annotations import io import warnings -from typing import Generator, List +from typing import TYPE_CHECKING, Generator import pytest -from pytest_mock import MockerFixture from sliplib import END, ESC, ProtocolError, SlipStream +if TYPE_CHECKING: + from pytest_mock import MockerFixture + def test_slip_stream_fails_if_instantiated_with_non_io_stream_argument() -> None: """SlipStream requires a stream with a read and write method""" @@ -41,7 +42,7 @@ def setup(self, mocker: MockerFixture) -> None: self.slipstream = SlipStream(self.stream_mock) def test_slipstream_creation(self) -> None: - """Verify the creation of the SlipStream instance.""" + """Verify the creation of the self.slipstream instance.""" assert self.slipstream.stream is self.stream_mock @pytest.mark.parametrize(("rbl", "wbl"), [(True, True), (True, False), (False, True), (False, False)]) @@ -68,7 +69,7 @@ def test_slipstream_reading_single_bytes(self, mocker: MockerFixture) -> None: """Verify that receiving messages works when reading the packets byte for byte.""" msg_list = [b"hallo", b"bye"] self.stream_mock.read.side_effect = [END, *msg_list[0], END, END, *msg_list[1], END, b""] - self.slipstream = SlipStream(self.stream_mock, 1) + self.slipstream.chunk_size = 1 assert self.slipstream.recv_msg() == msg_list[0] assert self.slipstream.recv_msg() == msg_list[1] # No more messages @@ -109,7 +110,7 @@ def test_iterating_over_slipstream(self) -> None: for i, msg in enumerate(self.slipstream): assert msg_list[i] == msg - def verify_error_recovery(self, msg_list: List[bytes]) -> None: + def verify_error_recovery(self, msg_list: list[bytes]) -> None: """Helper function to verify error recovery.""" assert self.slipstream.recv_msg() == msg_list[0] with pytest.raises(ProtocolError): @@ -129,19 +130,23 @@ def test_recovery_from_protocol_error_with_unbuffered_reads(self) -> None: """Verify error recovery for unbuffered reads.""" msg_list = [b"hallo", b"bye"] self.stream_mock.read.side_effect = [END, *msg_list[0], END, ESC, END, *msg_list[1], END, b""] - self.slipstream = SlipStream(self.stream_mock, 1) + self.slipstream.chunk_size = 1 self.verify_error_recovery(msg_list) - def verify_error_recovery_during_iteration(self, msg_list: List[bytes]) -> None: + def verify_error_recovery_during_iteration(self, msg_list: list[bytes]) -> None: """Helper function to verify error recovery during iteration.""" - received_message = [] - with pytest.raises(ProtocolError): # noqa: PT012 + received_messages = [] + + def fill_received_messages() -> None: for msg in self.slipstream: - received_message.append(msg) # noqa: PERF402 - assert received_message == msg_list[:1] - for msg in self.slipstream: - received_message.append(msg) # noqa: PERF402 - assert received_message == msg_list + received_messages.append(msg) # noqa: PERF402 + + with pytest.raises(ProtocolError): + fill_received_messages() + + assert received_messages == msg_list[:1] + fill_received_messages() + assert received_messages == msg_list def test_recovery_from_protocol_error_during_iteration(self) -> None: """Verify that error recovery works during iteration with buffered reads.""" @@ -158,7 +163,7 @@ def test_recovery_from_protocol_error_during_iteration_with_unbuffered_reads( """Verify that error recovery works during iteration with unbuffered reads.""" msg_list = [b"hallo", b"bye"] self.stream_mock.read.side_effect = [END, *msg_list[0], END, ESC, END, *msg_list[1], END, b""] - self.slipstream = SlipStream(self.stream_mock, 1) + self.slipstream.chunk_size = 1 self.verify_error_recovery_during_iteration(msg_list) @@ -166,24 +171,23 @@ def test_recovery_from_protocol_error_during_iteration_with_unbuffered_reads( # This will be removed due to deprecation of delegating methods to the wrapped stream. # Use the io.BytesIO methods for testing -NOT_DELEGATED_METHODS = ( - [attr for attr in dir(io.BytesIO) if attr.startswith("read") and attr != "readable"] - + [attr for attr in dir(io.BytesIO) if attr.startswith("write") and attr != "writable"] - + [ - "detach", - "flushInput", - "flushOutput", - "getbuffer", - "getvalue", - "peek", - "raw", - "reset_input_buffer", - "reset_output_buffer", - "seek", - "seekable", - "tell", - "truncate", - ] +NOT_DELEGATED_METHODS = [ + "detach", + "flushInput", + "flushOutput", + "getbuffer", + "getvalue", + "peek", + "raw", + "reset_input_buffer", + "reset_output_buffer", + "seek", + "seekable", + "tell", + "truncate", +] +NOT_DELEGATED_METHODS.extend( + attr for attr in dir(io.BytesIO) if attr.startswith(("read", "write")) and attr not in ("readable", "writable") ) DELEGATED_METHODS = [ diff --git a/tests/unit/test_slipwrapper.py b/tests/unit/test_slipwrapper.py index 88fbe77..92e9bb6 100644 --- a/tests/unit/test_slipwrapper.py +++ b/tests/unit/test_slipwrapper.py @@ -2,7 +2,6 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. -# pylint: disable=attribute-defined-outside-init """Tests for SlipWrapper.""" diff --git a/tests/unit/test_version.py b/tests/unit/test_version.py new file mode 100644 index 0000000..8637f58 --- /dev/null +++ b/tests/unit/test_version.py @@ -0,0 +1,9 @@ +# Copyright (c) 2024 Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. + +from semantic_version_check import version_check + +from sliplib import version + +assert version_check(version.__version__)