From e6d2e57629e88e2c23355005db554c8ad8a6371c Mon Sep 17 00:00:00 2001 From: Ruud de Jong Date: Fri, 15 Mar 2024 16:46:27 +0100 Subject: [PATCH] Extended test scripts to get 100 % coverage --- examples/echoserver/__init__.py | 2 +- examples/echoserver/client.py | 7 +- examples/echoserver/server.py | 48 ++--- pyproject.toml | 6 +- src/sliplib/__init__.py | 2 +- src/sliplib/slipserver.py | 42 +++- src/sliplib/slipsocket.py | 9 +- src/sliplib/slipstream.py | 10 +- tests/examples/__init__.py | 3 + tests/examples/echoserver/__init__.py | 3 + tests/examples/echoserver/test_echo_server.py | 126 +++++++++++ tests/integration/__init__.py | 2 +- tests/integration/test_data.py | 28 --- tests/integration/test_server_client.py | 98 --------- tests/integration/test_slip_file.py | 79 +++++-- .../integration/test_slip_file_unbuffered.py | 32 --- tests/integration/test_slip_in_memory_io.py | 60 +++--- ...ler.py => test_slip_server_and_handler.py} | 27 +-- tests/unit/test_slip.py | 9 - tests/unit/test_slipserver.py | 196 ++++++++++++------ tests/unit/test_slipsocket.py | 158 +++++++------- tests/unit/test_slipstream.py | 8 +- 22 files changed, 533 insertions(+), 422 deletions(-) create mode 100644 tests/examples/__init__.py create mode 100644 tests/examples/echoserver/__init__.py create mode 100644 tests/examples/echoserver/test_echo_server.py delete mode 100644 tests/integration/test_data.py delete mode 100644 tests/integration/test_server_client.py delete mode 100644 tests/integration/test_slip_file_unbuffered.py rename tests/integration/{test_sliprequesthandler.py => test_slip_server_and_handler.py} (71%) diff --git a/examples/echoserver/__init__.py b/examples/echoserver/__init__.py index c062961..ac28400 100644 --- a/examples/echoserver/__init__.py +++ b/examples/echoserver/__init__.py @@ -56,7 +56,7 @@ .. code:: bash - $ python server_ipv6.py + $ python server.py Slip server listening on localhost, port 59454 Incoming connection from ('127.0.0.1', 59458) Raw data received: b'\\xc0hallo\\xc0' diff --git a/examples/echoserver/client.py b/examples/echoserver/client.py index d5e734b..ad3d9ae 100644 --- a/examples/echoserver/client.py +++ b/examples/echoserver/client.py @@ -20,7 +20,8 @@ import sliplib -if __name__ == "__main__": + +def main() -> None: if len(sys.argv) != 2: # noqa: PLR2004 print("Usage: python client.py ") sys.exit(1) @@ -37,3 +38,7 @@ sock.send_msg(b_message) b_reply = sock.recv_msg() print("Response:", b_reply) + + +if __name__ == "__main__": + main() diff --git a/examples/echoserver/server.py b/examples/echoserver/server.py index b203205..1db66fa 100644 --- a/examples/echoserver/server.py +++ b/examples/echoserver/server.py @@ -13,7 +13,7 @@ the raw data that is received and sent. The request handler prints the decoded message, and then reverses the order of the bytes in the encoded message -(so ``abc`` becomes ``cab``), +(so ``abc`` becomes ``cba``), and sends it back to the client. """ @@ -24,44 +24,34 @@ import socket import sys from socketserver import TCPServer -from typing import TYPE_CHECKING, Any - -if TYPE_CHECKING: - if sys.version_info >= (3, 12): - from collections.abc import Buffer - else: - from typing_extensions import Buffer - -from _socket import dup - -from sliplib import SlipRequestHandler +from typing import Any +from sliplib import SlipRequestHandler, SlipSocket -class _ChattySocket(socket.socket): - """A socket subclass that prints the raw data that is received and sent.""" - def __init__(self, sock: socket.socket) -> None: - fd = dup(sock.fileno()) - super().__init__(sock.family, sock.type, sock.proto, fileno=fd) - super().settimeout(sock.gettimeout()) +class _ChattySocket(SlipSocket): + """A SlipSocket subclass that prints the raw data that is received and sent.""" - def recv(self, chunksize: int, *args: Any) -> bytes: - data = super().recv(chunksize, *args) + def recv_bytes(self) -> bytes: + data = super().recv_bytes() print("Raw data received:", data) return data - def sendall(self, data: Buffer, *args: Any) -> None: + def send_bytes(self, data: bytes) -> None: print("Sending raw data:", data) - super().sendall(data, *args) + super().send_bytes(data) class SlipHandler(SlipRequestHandler): """A SlipRequestHandler that echoes the received message with the bytes in reversed order.""" - def setup(self) -> None: - self.request = _ChattySocket(self.request) - print(f"Incoming connection from {self.request.getpeername()}") - super().setup() + def __init__(self, request: socket.socket | SlipSocket, *args: Any) -> None: + if isinstance(request, SlipSocket): + request.__class__ = _ChattySocket + else: + request = _ChattySocket(request) + print(f"Incoming connection from {request.getpeername()}") + super().__init__(request, *args) # Dedicated handler to show the encoded bytes. def handle(self) -> None: @@ -81,10 +71,14 @@ class TCPServerIPv6(TCPServer): address_family = socket.AF_INET6 -if __name__ == "__main__": +def main() -> None: if len(sys.argv) > 1 and sys.argv[1].lower() == "ipv6": server = TCPServerIPv6(("localhost", 0), SlipHandler) # type: TCPServer else: server = TCPServer(("localhost", 0), SlipHandler) print("Slip server listening on localhost, port", server.server_address[1]) server.handle_request() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 308dca6..b53e6bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ cov = [ python = "3.12" extra-dependencies = [ "mypy>=1.0.0", - "typing_extensions" + "typing_extensions", ] [tool.hatch.envs.types.scripts] @@ -105,8 +105,8 @@ omit = [ ] [tool.coverage.paths] -sliplib = ["src/sliplib", "*/SlipLib/src/sliplib"] -tests = ["tests", "*/SlipLib/tests"] +sliplib = ["src/sliplib"] +tests = ["tests"] [tool.coverage.report] show_missing = true diff --git a/src/sliplib/__init__.py b/src/sliplib/__init__.py index 5893203..09661d0 100644 --- a/src/sliplib/__init__.py +++ b/src/sliplib/__init__.py @@ -67,7 +67,7 @@ .. automodule:: sliplib.slipwrapper .. automodule:: sliplib.slipstream .. automodule:: sliplib.slipsocket -.. automodule:: sliplib.sliprequesthandler +.. automodule:: sliplib.slipserver Exceptions ---------- diff --git a/src/sliplib/slipserver.py b/src/sliplib/slipserver.py index a9e4cd0..1b44a42 100644 --- a/src/sliplib/slipserver.py +++ b/src/sliplib/slipserver.py @@ -9,9 +9,12 @@ .. autoclass:: SlipRequestHandler :show-inheritance: - .. automethod:: setup .. automethod:: handle .. automethod:: finish + +.. autoclass:: SlipServer + :show-inheritance: + """ from __future__ import annotations @@ -39,9 +42,9 @@ class SlipRequestHandler(BaseRequestHandler): def __init__(self, request: socket.socket | SlipSocket, client_address: TCPAddress, server: TCPServer): """Initializes the request handler. - The type of the :arg:`request` parameter depends on the type of server + The type of the :attr:`request` parameter depends on the type of server that instantiates the request handler. - If the server is a SlipServer, then :arg:`request` is a SlipSocket. + If the server is a SlipServer, then :attr:`request` is a SlipSocket. Otherwise, it is a regular socket, and must be wrapped in a SlipSocket before it can be used. @@ -53,8 +56,15 @@ def __init__(self, request: socket.socket | SlipSocket, client_address: TCPAddre The remote TCP addresss. server: - The TCPServer or SlipServer instance that instantiated this handler object. + The server instance that instantiated this handler object. """ + if server.socket_type != socket.SOCK_STREAM: + message = ( + f"{self.__class__.__name__} instance can only be used " + f"with a TCP server (got {server.__class__.__name__})" + ) + raise TypeError(message) + if not isinstance(request, SlipSocket): request = SlipSocket(request) super().__init__(cast(socket.socket, request), client_address, server) @@ -88,7 +98,15 @@ def finish(self) -> None: class SlipServer(TCPServer): - """Base class for SlipSocket based server classes.""" + """Base class for SlipSocket based server classes. + + This is a convenience class, that offers a minor enhancement + over the regular :class:`TCPServer` from the standard library. + The class :class:`TCPServer` is hardcoded to use only IPv4 addresses. It must be subclassed + in order to use IPv6 addresses. + The :class:`SlipServer` class uses the address that is provided during instantiation + to determine if it must muse an IPv4 or IPv6 socket. + """ def __init__( self, @@ -96,15 +114,17 @@ def __init__( handler_class: type[SlipRequestHandler], bind_and_activate: bool = True, # noqa: FBT001 FBT002 ): + """ + + Args: + server_address: The address on which the server listens + handler_class: The class that will be instantiated to handle an incoming request + bind_and_activate: Flag to indicate if the server must be bound and activated at creation time. + """ if self._is_ipv6_address(server_address): self.address_family = socket.AF_INET6 super().__init__(server_address[0:2], handler_class, bind_and_activate) - - def server_bind(self) -> None: - """Make the server socket into a SLIP socket and bind it to the server address.""" - if not isinstance(self.socket, SlipSocket): - self.socket = cast(socket.socket, SlipSocket(self.socket)) - super().server_bind() + self.socket = cast(socket.socket, SlipSocket(self.socket)) @staticmethod def _is_ipv6_address(server_address: TCPAddress) -> bool: diff --git a/src/sliplib/slipsocket.py b/src/sliplib/slipsocket.py index e1185c7..fc2222b 100644 --- a/src/sliplib/slipsocket.py +++ b/src/sliplib/slipsocket.py @@ -121,7 +121,6 @@ class SlipSocket(SlipWrapper[socket.socket]): _chunk_size = 4096 def __init__(self, sock: socket.SocketType): - # pylint: disable=missing-raises-doc """ To instantiate a :class:`SlipSocket`, the user must provide a pre-constructed TCP `socket`. @@ -222,6 +221,14 @@ def getsockopt(self, *args: Any) -> int | bytes: """ return self.socket.getsockopt(*args) + def gettimeout(self) -> float | None: + """Get the socket option from the embedded socket. + + Returns: + The integer or bytes representing the value of the socket option. + """ + return self.socket.gettimeout() + def listen(self, backlog: int | None = None) -> None: """Enable a `SlipSocket` server to accept connections. diff --git a/src/sliplib/slipstream.py b/src/sliplib/slipstream.py index 85773f8..80de1ea 100644 --- a/src/sliplib/slipstream.py +++ b/src/sliplib/slipstream.py @@ -14,6 +14,7 @@ A :class:`SlipStream` instance has the following attributes in addition to the attributes offered by its base class :class:`SlipWrapper`: + .. autoattribute:: chunk_size .. autoattribute:: readable .. autoattribute:: writable """ @@ -99,7 +100,7 @@ def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE): Args: stream: The byte stream that will be wrapped. - chunk_size: the number of bytes to read per read operation. + chunk_size: The number of bytes to read per read operation. The default value for `chunck_size` is `io.DEFAULT_BUFFER_SIZE`. Setting the `chunk_size` is useful when the stream has a low bandwidth @@ -128,7 +129,10 @@ def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE): if hasattr(stream, "encoding"): error_msg = f"{stream.__class__.__name__} object is not a byte stream" raise TypeError(error_msg) - self._chunk_size = chunk_size if chunk_size > 0 else io.DEFAULT_BUFFER_SIZE + + #: The number of bytes to read during each read operation. + self.chunk_size = chunk_size if chunk_size > 0 else io.DEFAULT_BUFFER_SIZE + super().__init__(stream) def send_bytes(self, packet: bytes) -> None: @@ -139,7 +143,7 @@ def send_bytes(self, packet: bytes) -> None: def recv_bytes(self) -> bytes: """See base class""" - return b"" if self._stream_is_closed else self.stream.read(self._chunk_size) + return b"" if self._stream_is_closed else self.stream.read(self.chunk_size) @property def readable(self) -> bool: diff --git a/tests/examples/__init__.py b/tests/examples/__init__.py new file mode 100644 index 0000000..ef97db1 --- /dev/null +++ b/tests/examples/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2024 Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. diff --git a/tests/examples/echoserver/__init__.py b/tests/examples/echoserver/__init__.py new file mode 100644 index 0000000..ef97db1 --- /dev/null +++ b/tests/examples/echoserver/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2024 Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. diff --git a/tests/examples/echoserver/test_echo_server.py b/tests/examples/echoserver/test_echo_server.py new file mode 100644 index 0000000..37db3b6 --- /dev/null +++ b/tests/examples/echoserver/test_echo_server.py @@ -0,0 +1,126 @@ +# Copyright (c) 2020. Ruud de Jong +# This file is part of the SlipLib project which is released under the MIT license. +# See https://github.com/rhjdjong/SlipLib for details. + + +""" +This module tests SlipSocket using a SLIP echo server, similar to the one in the examples directory. +""" +from __future__ import annotations + +import pathlib +import re +import sys +import threading +from queue import Empty, Queue +from subprocess import PIPE, Popen +from typing import Generator + +import pytest + +import sliplib + + +class TestEchoServer: + def output_reader(self, proc: Popen[str], output_queue: Queue[str]) -> None: + for line in iter(proc.stdout.readline, ""): # type: ignore[union-attr] + output_queue.put(line) + + def get_server_output(self) -> str: + try: + output: str = self.server_queue.get(timeout=5) + except Empty: # no cov + pytest.fail("No output from server") + return output.strip() + + def get_client_output(self) -> str: + try: + output: str = self.client_queue.get(timeout=5) + except Empty: # no cov + pytest.fail("No output from client") + return output.strip() + + def write_client_input(self, msg: str) -> None: + self.client.stdin.write(msg + "\n") # type: ignore[union-attr] + self.client.stdin.flush() # type: ignore[union-attr] + + @pytest.fixture(autouse=True) + def setup(self) -> Generator[None, None, None]: + echoserver_directory = pathlib.Path(sliplib.__file__).parents[2] / "examples" / "echoserver" + self.python = sys.executable + self.server_script = str(echoserver_directory / "server.py") + self.client_script = str(echoserver_directory / "client.py") + self.server: Popen[str] | None = None + self.client: Popen[str] | None = None + self.server_queue: Queue[str] = Queue() + self.client_queue: Queue[str] = Queue() + yield + if self.server and self.server.returncode is None: # no cov + self.server.terminate() + if self.client and self.client.returncode is None: # no cov + self.client.terminate() + + @pytest.mark.parametrize("arg", ["", "ipv6"]) + def test_server_and_client(self, arg: str) -> None: + server_command = [self.python, "-u", self.server_script] + if arg: + server_command.append(arg) + self.server = Popen(server_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True, bufsize=1) + server_output_reader = threading.Thread(target=self.output_reader, args=(self.server, self.server_queue)) + server_output_reader.start() + server_output = self.get_server_output() + m = re.match(r"Slip server listening on localhost, port (\d+)", server_output) + assert m is not None + server_port = m.group(1) + + client_command = [self.python, "-u", self.client_script, server_port] + self.client = Popen(client_command, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True, bufsize=1) + client_output_reader = threading.Thread(target=self.output_reader, args=(self.client, self.client_queue)) + client_output_reader.start() + client_output = self.get_client_output() + assert client_output == f"Connecting to server on port {server_port}" + + server_output = self.get_server_output() + message = f"Incoming connection from ('{'::1' if arg else '127.0.0.1'}'" + assert server_output.startswith(message) + + client_output = self.get_client_output() + message = f"Connected to ('{'::1' if arg else '127.0.0.1'}'" + assert client_output.startswith(message) + + self.write_client_input("hallo") + server_output = self.get_server_output() + assert server_output == r"Raw data received: b'\xc0hallo\xc0'" + server_output = self.get_server_output() + assert server_output == "Decoded data: b'hallo'" + server_output = self.get_server_output() + assert server_output == r"Sending raw data: b'\xc0ollah\xc0'" + client_output = self.get_client_output() + assert client_output == "Message>Response: b'ollah'" + + self.write_client_input("bye") + server_output = self.get_server_output() + assert server_output == r"Raw data received: b'\xc0bye\xc0'" + server_output = self.get_server_output() + assert server_output == "Decoded data: b'bye'" + server_output = self.get_server_output() + assert server_output == r"Sending raw data: b'\xc0eyb\xc0'" + client_output = self.get_client_output() + assert client_output == "Message>Response: b'eyb'" + + self.write_client_input("") + server_output = self.get_server_output() + assert server_output == "Raw data received: b''" + server_output = self.get_server_output() + assert server_output == "Decoded data: b''" + server_output = self.get_server_output() + assert server_output == "Closing down" + client_output = self.get_client_output() + assert client_output == "Message>" + + assert self.server_queue.empty() + assert self.client_queue.empty() + self.server.wait(2) + self.client.wait(2) + assert self.server.returncode == 0 + assert self.client.returncode == 0 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 45bdcb3..b74e5d8 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -1,3 +1,3 @@ -# Copyright (c) 2020. Ruud de Jong +# Copyright (c) 2024. Ruud de Jong # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. diff --git a/tests/integration/test_data.py b/tests/integration/test_data.py deleted file mode 100644 index 877cb8d..0000000 --- a/tests/integration/test_data.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2020. Ruud de Jong -# This file is part of the SlipLib project which is released under the MIT license. -# See https://github.com/rhjdjong/SlipLib for details. - -# pylint: disable=attribute-defined-outside-init -# pylint: disable=too-few-public-methods - -"""Common data for file-related tests""" -import pathlib - -import pytest - -data = [ - b"line 1", - b"line with embedded\nnewline", - b"last line", -] - - -class BaseFileTest: - """Base class for filebased SLIP tests.""" - - @pytest.fixture(autouse=True) - def setup(self, tmp_path: pathlib.Path) -> None: - """Prepare the test.""" - testdir = tmp_path / "slip" - testdir.mkdir() - self.filepath = testdir / "read.txt" diff --git a/tests/integration/test_server_client.py b/tests/integration/test_server_client.py deleted file mode 100644 index 089a940..0000000 --- a/tests/integration/test_server_client.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (c) 2020. Ruud de Jong -# This file is part of the SlipLib project which is released under the MIT license. -# See https://github.com/rhjdjong/SlipLib for details. - - -""" -This module tests SlipSocket using a SLIP echo server, similar to the one in the examples directory. -""" -from __future__ import annotations - -import socket -from multiprocessing import Pipe, Process -from socketserver import TCPServer -from typing import TYPE_CHECKING, Generator, Mapping - -import pytest - -from sliplib import SlipRequestHandler, SlipSocket - -if TYPE_CHECKING: - from multiprocessing.connection import Connection - - from sliplib.slipsocket import TCPAddress - - -class SlipEchoHandler(SlipRequestHandler): - """SLIP request handler that echoes the received message, but with the bytes in reversed order.""" - - def handle(self) -> None: - while True: - message = self.request.recv_msg() - if not message: - break - # Reverse the order of the bytes and send it back. - data_to_send = bytes(reversed(message)) - self.request.send_msg(data_to_send) - - -class SlipEchoServer: - """Execution helper for the echo server. Sends the server address back over the pipe.""" - - server_data: Mapping[int, tuple[type[TCPServer], str]] = { - socket.AF_INET: (TCPServer, "127.0.0.1"), - socket.AF_INET6: ( - type("TCPServerIPv6", (TCPServer,), {"address_family": socket.AF_INET6}), - "::1", - ), - } - - def __init__(self, address_family: int, pipe: Connection) -> None: - server_class, localhost = self.server_data[address_family] - self.server = server_class((localhost, 0), SlipEchoHandler) - pipe.send(self.server.server_address) - self.server.handle_request() - - -class SlipEchoClient: - """Client for the SLIP echo server""" - - def __init__(self, address: TCPAddress) -> None: - self.sock = SlipSocket.create_connection(address) - - def echo(self, msg: bytes) -> bytes: - """Send message to the SLIP server and returns the response.""" - self.sock.send_msg(msg) - return self.sock.recv_msg() - - def close(self) -> None: - """Close the SLIP socket""" - self.sock.close() - - -class TestEchoServer: - """Test for the SLIP echo server""" - - @pytest.fixture(autouse=True, params=[socket.AF_INET, socket.AF_INET6]) - def setup(self, request: pytest.FixtureRequest, capfd: pytest.CaptureFixture[str]) -> Generator[None, None, None]: - """Prepare the server and client""" - near, far = Pipe() - address_family = request.param - self.server = Process(target=SlipEchoServer, args=(address_family, far)) - self.server.start() - address_available = near.poll(1.5) # AppVeyor sometimes takes a long time to run the server. - if address_available: - server_address = near.recv() - else: # no cov - captured = capfd.readouterr() - pytest.fail(captured.err) - self.client = SlipEchoClient(server_address) - yield - self.client.close() - self.server.join() - - def test_echo_server(self) -> None: - """Test the echo server""" - data = [(b"hallo", b"ollah"), (b"goodbye", b"eybdoog")] - for snd_msg, expected_reply in data: - assert self.client.echo(snd_msg) == expected_reply diff --git a/tests/integration/test_slip_file.py b/tests/integration/test_slip_file.py index d3d07fa..4b47962 100644 --- a/tests/integration/test_slip_file.py +++ b/tests/integration/test_slip_file.py @@ -4,28 +4,73 @@ """Test using SlipStream with a buffered file""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Generator + +if TYPE_CHECKING: + import pathlib + +import pytest + from sliplib import SlipStream, encode -from .test_data import BaseFileTest, data +data = [ + b"line 1", + b"line with embedded\nnewline", + b"last line", +] + + +@pytest.fixture +def scrap_file(tmp_path: pathlib.Path) -> Generator[pathlib.Path, None, None]: + testdir = tmp_path / "slip" + testdir.mkdir() + filepath = testdir / "data.txt" + yield filepath + filepath.unlink() + + +@pytest.fixture +def readfile(scrap_file: pathlib.Path) -> pathlib.Path: + scrap_file.write_bytes(b"".join(encode(msg) for msg in data)) + return scrap_file + + +@pytest.fixture +def writefile(scrap_file: pathlib.Path) -> pathlib.Path: + return scrap_file + + +def test_reading_slip_file(readfile: pathlib.Path) -> None: + """Test reading encoded SLIP messages""" + with readfile.open(mode="rb") as f: + slipstream = SlipStream(f) + for expected, actual in zip(data, slipstream): + assert expected == actual -class TestBufferedFileAccess(BaseFileTest): - """Test buffered SLIP file access""" +def test_writing_slip_file(writefile: pathlib.Path) -> None: + """Test writing encoded SLIP messages""" + with writefile.open(mode="wb") as f: + slipstream = SlipStream(f) + for msg in data: + slipstream.send_msg(msg) + assert writefile.read_bytes() == b"".join(encode(msg) for msg in data) - def test_reading_slip_file(self) -> None: - """Test reading encoded SLIP messages""" - self.filepath.write_bytes(b"".join(encode(msg) for msg in data)) - with self.filepath.open(mode="rb") as f: - slipstream = SlipStream(f) - for exp, act in zip(data, slipstream): - assert exp == act +def test_reading_slip_file_unbuffered(readfile: pathlib.Path) -> None: + """Test reading SLIP-encoded message""" + with readfile.open(mode="rb", buffering=0) as f: + slipstream = SlipStream(f) + for expected, actual in zip(data, slipstream): + assert expected == actual - def test_writing_slip_file(self) -> None: - """Test writing encoded SLIP messages""" - with self.filepath.open(mode="wb") as f: - slipstream = SlipStream(f) - for msg in data: - slipstream.send_msg(msg) - assert self.filepath.read_bytes() == b"".join(encode(msg) for msg in data) +def test_writing_slip_file_unbuffered(writefile: pathlib.Path) -> None: + """Test writing SLIP-encoded messages""" + with writefile.open(mode="wb", buffering=0) as f: + slipstream = SlipStream(f) + for msg in data: + slipstream.send_msg(msg) + assert writefile.read_bytes() == b"".join(encode(msg) for msg in data) diff --git a/tests/integration/test_slip_file_unbuffered.py b/tests/integration/test_slip_file_unbuffered.py deleted file mode 100644 index 234f0f7..0000000 --- a/tests/integration/test_slip_file_unbuffered.py +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (c) 2020. Ruud de Jong -# This file is part of the SlipLib project which is released under the MIT license. -# See https://github.com/rhjdjong/SlipLib for details. - - -"""Test using SlipStream with an unbuffered file""" - -from sliplib import SlipStream, encode - -from .test_data import BaseFileTest, data - - -class TestUnbufferedFileAccess(BaseFileTest): - """Test unbuffered SLIP file access.""" - - def test_reading_slip_file(self) -> None: - """Test reading SLIP-encoded message""" - - self.filepath.write_bytes(b"".join(encode(msg) for msg in data)) - with self.filepath.open(mode="rb", buffering=0) as f: - slipstream = SlipStream(f) - for exp, act in zip(data, slipstream): - assert exp == act - - def test_writing_slip_file(self) -> None: - """Test writing SLIP-encoded messages""" - - with self.filepath.open(mode="wb", buffering=0) as f: - slipstream = SlipStream(f) - for msg in data: - slipstream.send_msg(msg) - assert self.filepath.read_bytes() == b"".join(encode(msg) for msg in data) diff --git a/tests/integration/test_slip_in_memory_io.py b/tests/integration/test_slip_in_memory_io.py index fe00697..c725975 100644 --- a/tests/integration/test_slip_in_memory_io.py +++ b/tests/integration/test_slip_in_memory_io.py @@ -16,41 +16,41 @@ class TestSlipStreamWithBytesIO: """Test SlipStream with BytesIO.""" - @pytest.fixture(autouse=True) - def setup(self) -> Generator[None, None, None]: - """Setup the test""" - self.basestream = io.BytesIO() - self.slipstream = SlipStream(self.basestream) - yield - self.basestream.close() - - def test_stream_reading(self) -> None: + msg_list = (b"hallo", b"bye") + + @pytest.fixture + def empty_stream(self) -> Generator[io.BytesIO, None, None]: + stream = io.BytesIO() + yield stream + stream.close() + + @pytest.fixture + def filled_stream(self, empty_stream: io.BytesIO) -> io.BytesIO: + empty_stream.seek(0) + empty_stream.write(END + self.msg_list[0] + END + END + self.msg_list[1] + END) + empty_stream.seek(0) + return empty_stream + + def test_stream_reading(self, filled_stream: io.BytesIO) -> None: """Test reading from the bytestream""" - - msg_list = [b"hallo", b"bye"] - self.basestream.write(END + msg_list[0] + END + END + msg_list[1] + END) - self.basestream.seek(0) - assert self.slipstream.recv_msg() == msg_list[0] - assert self.slipstream.recv_msg() == msg_list[1] + slipstream = SlipStream(filled_stream) + assert slipstream.recv_msg() == self.msg_list[0] + assert slipstream.recv_msg() == self.msg_list[1] # No more messages - assert self.slipstream.recv_msg() == b"" + assert slipstream.recv_msg() == b"" - def test_stream_reading_single_bytes(self) -> None: + def test_stream_reading_single_bytes(self, filled_stream: io.BytesIO) -> None: """Test reading single bytes from the bytestream""" - msg_list = [b"hallo", b"bye"] - self.basestream.write(END + msg_list[0] + END + END + msg_list[1] + END) - self.basestream.seek(0) - self.slipstream = SlipStream(self.basestream, 1) - assert self.slipstream.recv_msg() == msg_list[0] - assert self.slipstream.recv_msg() == msg_list[1] + slipstream = SlipStream(filled_stream, 1) + assert slipstream.recv_msg() == self.msg_list[0] + assert slipstream.recv_msg() == self.msg_list[1] # No more messages - assert self.slipstream.recv_msg() == b"" + assert slipstream.recv_msg() == b"" - def test_stream_writing(self) -> None: + def test_stream_writing(self, empty_stream: io.BytesIO) -> None: """Test writing to the bytestream""" - - msg_list = [b"hallo", b"bye"] - for msg in msg_list: - self.slipstream.send_msg(msg) - assert self.basestream.getvalue() == END + msg_list[0] + END + END + msg_list[1] + END + slipstream = SlipStream(empty_stream) + for msg in self.msg_list: + slipstream.send_msg(msg) + assert empty_stream.getvalue() == END + self.msg_list[0] + END + END + self.msg_list[1] + END diff --git a/tests/integration/test_sliprequesthandler.py b/tests/integration/test_slip_server_and_handler.py similarity index 71% rename from tests/integration/test_sliprequesthandler.py rename to tests/integration/test_slip_server_and_handler.py index 5e6c18e..df2faf1 100644 --- a/tests/integration/test_sliprequesthandler.py +++ b/tests/integration/test_slip_server_and_handler.py @@ -6,14 +6,13 @@ """Tests for SlipRequestHandler""" from __future__ import annotations -import socketserver import threading from socket import AF_INET, AF_INET6, socket from typing import TYPE_CHECKING, Generator import pytest -from sliplib import END, SlipRequestHandler, SlipSocket +from sliplib import END, SlipRequestHandler, SlipServer, SlipSocket if TYPE_CHECKING: from sliplib.slipsocket import TCPAddress @@ -29,33 +28,37 @@ def handle(self) -> None: self.request.send_msg(bytes(reversed(msg))) +@pytest.mark.parametrize( + ("family", "address"), + ( + (AF_INET, ("127.0.0.1", 0)), + (AF_INET6, ("::1", 0, 0, 0)), + ), +) class TestSlipRequestHandler: """Tests for SlipRequestHandler.""" - @pytest.fixture(autouse=True, params=[(AF_INET, ("127.0.0.1", 0)), (AF_INET6, ("::1", 0, 0, 0))]) - def setup(self, request: pytest.FixtureRequest) -> Generator[None, None, None]: + @pytest.fixture(autouse=True) + def setup(self, family: int, address: TCPAddress) -> Generator[None, None, None]: """Prepare the test.""" - self.family = request.param[0] - self.bind_address = request.param[1] + self.family = family + self.bind_address = address # Cannot use standard TCPServer, because that is hardcoded to IPv4 - self.server_class = type("SlipServer", (socketserver.TCPServer,), {"address_family": self.family}) self.client_socket = socket(family=self.family) self.server_is_running = threading.Event() yield self.client_socket.close() - def server( - self, bind_address: TCPAddress, request_handler_class: type[SlipRequestHandler] = DummySlipRequestHandler - ) -> None: + def server(self, bind_address: TCPAddress, request_handler_class: type[SlipRequestHandler]) -> None: """Create a server.""" - srv = self.server_class(bind_address, request_handler_class) + srv = SlipServer(bind_address, request_handler_class) self.server_address = srv.server_address self.server_is_running.set() srv.handle_request() def test_working_of_sliprequesthandler(self) -> None: """Verify that the server returns the message with the bytes in reversed order.""" - server_thread = threading.Thread(target=self.server, args=(self.bind_address,)) + server_thread = threading.Thread(target=self.server, args=(self.bind_address, DummySlipRequestHandler)) server_thread.start() self.server_is_running.wait(0.5) self.client_socket.connect(self.server_address) diff --git a/tests/unit/test_slip.py b/tests/unit/test_slip.py index 931a915..a50c6d8 100644 --- a/tests/unit/test_slip.py +++ b/tests/unit/test_slip.py @@ -16,8 +16,6 @@ class TestEncoding: """Test encoding of SLIP messages.""" - # pylint: disable=no-self-use - def test_empty_message_encoding(self) -> None: """Empty message should result in an empty packet.""" msg = b"" @@ -58,12 +56,9 @@ def test_special_character_encoding(self, msg: bytes, packet: bytes) -> None: assert encode(msg) == END + packet + END -# noinspection PyPep8Naming class TestDecoding: """Test decoding of SLIP packets.""" - # pylint: disable=no-self-use - def test_empty_packet_decoding(self) -> None: """An empty packet should result in an empty message.""" packet = END + END @@ -227,7 +222,3 @@ def test_subsequent_packets_with_wrong_escape_sequence(self) -> None: assert self.driver.messages == [msgs[2]] assert exc_info.value.args == (msgs[3],) assert self.driver.flush() == [msgs[4]] - - -if __name__ == "__main__": - pytest.main() diff --git a/tests/unit/test_slipserver.py b/tests/unit/test_slipserver.py index a33ac5e..9da8f29 100644 --- a/tests/unit/test_slipserver.py +++ b/tests/unit/test_slipserver.py @@ -2,87 +2,159 @@ # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. +from __future__ import annotations import socket -from socketserver import TCPServer -from typing import Generator, cast +from socketserver import TCPServer, UDPServer +from typing import TYPE_CHECKING, Generator, cast import pytest -from pytest_mock import MockerFixture + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + from sliplib.slipsocket import TCPAddress + from sliplib import SlipRequestHandler, SlipServer, SlipSocket +@pytest.fixture +def sock(mocker: MockerFixture) -> Generator[socket.socket, None, None]: + address_family = socket.AF_INET + mocked_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, + family=address_family, + type=socket.SOCK_STREAM, + proto=0, + ) + yield mocked_socket + mocked_socket.close() + + +@pytest.fixture +def slipsock(sock: socket.socket) -> SlipSocket: + return SlipSocket(sock) + + +@pytest.fixture +def udpsock(mocker: MockerFixture) -> Generator[socket.socket, None, None]: + address_family = socket.AF_INET + mocked_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, + family=address_family, + type=socket.SOCK_DGRAM, + proto=0, + ) + yield mocked_socket + mocked_socket.close() + + +@pytest.fixture +def tcpserver(mocker: MockerFixture) -> TCPServer: + return mocker.create_autospec(spec=TCPServer, instance=True, socket_type=socket.SOCK_STREAM) + + +@pytest.fixture +def udpserver(mocker: MockerFixture) -> UDPServer: + return mocker.create_autospec(spec=UDPServer, instance=True, socket_type=socket.SOCK_DGRAM) + + +@pytest.fixture +def slipserver(mocker: MockerFixture) -> SlipServer: + return mocker.create_autospec(spec=SlipServer, instance=True, socket_type=socket.SOCK_STREAM) + + +@pytest.fixture +def slip_request_handler(mocker: MockerFixture) -> SlipRequestHandler: + return mocker.create_autospec(SlipRequestHandler, instance=True) + + +@pytest.fixture +def slip_request_handler_class( + slip_request_handler: SlipRequestHandler, mocker: MockerFixture +) -> type[SlipRequestHandler]: + return mocker.patch("sliplib.slipserver.SlipRequestHandler", autospec=True, return_value=slip_request_handler) + + class TestSlipRequestHandler: - @pytest.fixture(autouse=True) - def setup(self, mocker: MockerFixture) -> Generator[None, None, None]: - self.sock_mock = mocker.Mock( - spec=socket.socket(socket.AF_INET), - family=socket.AF_INET, - type=socket.SOCK_STREAM, - proto=0, - ) - self.slipsocket = SlipSocket(self.sock_mock) - self.server = mocker.Mock(spec=TCPServer) - yield - self.sock_mock.close() - del self.slipsocket + def test_instantiate_with_regular_socket_and_tcpserver(self, sock: socket.socket, tcpserver: TCPServer) -> None: + h = SlipRequestHandler(sock, ("93.184.216.34", 54321), tcpserver) + assert isinstance(h.request, SlipSocket) + assert h.request.socket is sock + + def test_instantiate_with_slip_socket_and_tcpserver(self, slipsock: SlipSocket, tcpserver: TCPServer) -> None: + h = SlipRequestHandler(slipsock, ("93.184.216.34", 54321), tcpserver) + assert h.request is slipsock - def test_instantiation_with_regular_socket(self) -> None: - h = SlipRequestHandler(self.sock_mock, ("93.184.216.34", 54321), self.server) + def test_instantiate_with_regular_socket_and_slipserver(self, sock: socket.socket, slipserver: SlipServer) -> None: + h = SlipRequestHandler(sock, ("93.184.216.34", 54321), slipserver) assert isinstance(h.request, SlipSocket) - assert h.request.socket is self.sock_mock + assert h.request.socket is sock + + def test_instantiate_with_slip_socket_and_slipserver(self, slipsock: SlipSocket, slipserver: SlipServer) -> None: + h = SlipRequestHandler(slipsock, ("93.184.216.34", 54321), slipserver) + assert h.request is slipsock + + def test_instantiate_with_udp_socket_fails(self, udpsock: socket.socket, tcpserver: TCPServer) -> None: + with pytest.raises(ValueError, match="type SOCK_STREAM"): + SlipRequestHandler(udpsock, ("93.184.216.34", 54321), tcpserver) - def test_instantiation_with_slip_socket(self) -> None: - h = SlipRequestHandler(self.slipsocket, ("93.184.216.34", 54321), self.server) - assert h.request is self.slipsocket + def test_instantiate_with_udp_server_fails(self, sock: socket.socket, udpserver: UDPServer) -> None: + with pytest.raises(TypeError): + SlipRequestHandler(sock, ("93.184.216.34", 54321), udpserver) +@pytest.mark.parametrize( + ("address", "family", "remote_address"), + ( + (("127.0.0.1", 12345), socket.AF_INET, ("93.184.216.34", 54321)), + (("::1", 12345, 0, 0), socket.AF_INET6, ("2606:2800:220:1:248:1893:25c8:1946", 54321)), + ), +) class TestSlipServer: - @pytest.fixture( - autouse=True, - params=[ - ( - socket.AF_INET, - ("93.184.216.34", 54321), # example.com IPv4 address - ("127.0.0.1", 12345), # localhost IPv4 address - ), - ( - socket.AF_INET6, - ( - "2606:2800:220:1:248:1893:25c8:1946", - 54321, - 0, - 0, - ), # example.com IPv6 address - ("::1", 12345, 0, 0), # localhost IPv6 address - ), - ], - ) - def setup(self, request: pytest.FixtureRequest, mocker: MockerFixture) -> Generator[None, None, None]: - """Prepare the test.""" + @pytest.fixture(autouse=True) + def setup( + self, + address: TCPAddress, + family: int, + remote_address: TCPAddress, + slip_request_handler: SlipRequestHandler, + slip_request_handler_class: type[SlipRequestHandler], + mocker: MockerFixture, + ) -> Generator[None, None, None]: + """Spy on the methods of the Request Handler.""" + self.slip_request_handler = slip_request_handler + self.slip_request_handler_class = slip_request_handler_class + self.address = address + self.family = family + self.remote_address = remote_address - self.family, self.far_address, self.near_address = request.param - self.sock_mock = mocker.Mock( - spec=socket.socket(family=self.family), - family=self.family, - type=socket.SOCK_STREAM, - proto=0, + self.tcp_socket = mocker.create_autospec( + spec=socket.socket, instance=True, type=socket.SOCK_STREAM, family=family ) - self.slipsocket = SlipSocket(self.sock_mock) + socketserver_socket = mocker.patch("socketserver.socket", autospec=True) + socketserver_socket.socket.return_value = self.tcp_socket yield - self.sock_mock.close() - del self.slipsocket + self.tcp_socket.close() def test_slipserver_instantiation(self) -> None: - server = SlipServer(self.near_address, SlipRequestHandler) - assert isinstance(server.socket, SlipSocket) + server = SlipServer(self.address, self.slip_request_handler_class) assert server.address_family == self.family + assert isinstance(server.socket, SlipSocket) + assert server.socket.socket is self.tcp_socket - def test_no_automatic_bind(self) -> None: - server = SlipServer(self.near_address, SlipRequestHandler, bind_and_activate=False) - assert not isinstance(server.socket, SlipSocket) - slip_socket = SlipSocket(server.socket) - server.socket = cast(socket.socket, slip_socket) - server.server_bind() - assert server.socket is cast(socket.socket, slip_socket) + def test_slipserver_process_request(self, mocker: MockerFixture) -> None: + server = SlipServer(self.address, self.slip_request_handler_class) + handler_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, + family=self.family, + type=socket.SOCK_STREAM, + ) + handler_slip_socket = SlipSocket(handler_socket) + server.process_request(cast(socket.socket, handler_slip_socket), self.remote_address) + self.slip_request_handler_class.assert_called_once_with(handler_slip_socket, self.remote_address, server) # type: ignore[attr-defined] + handler_socket.close() diff --git a/tests/unit/test_slipsocket.py b/tests/unit/test_slipsocket.py index 006fffd..19c0aee 100644 --- a/tests/unit/test_slipsocket.py +++ b/tests/unit/test_slipsocket.py @@ -1,16 +1,22 @@ -# Copyright (c) 2020 Ruud de Jong +# Copyright (c) 2024 Ruud de Jong # This file is part of the SlipLib project which is released under the MIT license. # See https://github.com/rhjdjong/SlipLib for details. """Tests for SlipSocket""" +from __future__ import annotations + import socket import warnings -from typing import Generator +from typing import TYPE_CHECKING, Any, Generator import pytest -from pytest_mock import MockerFixture + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + from sliplib.slipsocket import TCPAddress import sliplib from sliplib import END, ESC, ProtocolError, SlipSocket @@ -29,6 +35,7 @@ "getpeername", "getsockname", "getsockopt", + "gettimeout", "listen", "setsockopt", "shutdown", @@ -49,35 +56,27 @@ ) +@pytest.mark.parametrize( + ("address", "family", "remote_address"), + ( + (("127.0.0.1", 12345), socket.AF_INET, ("93.184.216.34", 54321)), + (("::1", 12345, 0, 0), socket.AF_INET6, ("2606:2800:220:1:248:1893:25c8:1946", 54321, 0, 0)), + ), +) class TestSlipSocket: """Tests for SlipSocket""" - @pytest.fixture( - autouse=True, - params=[ - ( - socket.AF_INET, - ("93.184.216.34", 54321), # example.com IPv4 address - ("127.0.0.1", 12345), # localhost IPv4 address - ), - ( - socket.AF_INET6, - ( - "2606:2800:220:1:248:1893:25c8:1946", - 54321, - 0, - 0, - ), # example.com IPv6 address - ("::1", 12345, 0, 0), # localhost IPv6 address - ), - ], - ) - def setup(self, request: pytest.FixtureRequest, mocker: MockerFixture) -> Generator[None, None, None]: + @pytest.fixture(autouse=True) + def setup( + self, address: TCPAddress, family: int, remote_address: TCPAddress, mocker: MockerFixture + ) -> Generator[None, None, None]: """Prepare the test.""" - - self.family, self.far_address, self.near_address = request.param - self.sock_mock = mocker.Mock( - spec=socket.socket(family=self.family), + self.near_address = address + self.family = family + self.far_address = remote_address + self.sock_mock = mocker.create_autospec( + spec=socket.socket, + instance=True, family=self.family, type=socket.SOCK_STREAM, proto=0, @@ -90,7 +89,7 @@ def setup(self, request: pytest.FixtureRequest, mocker: MockerFixture) -> Genera def test_slipsocket_instantiation(self) -> None: """Test that the slipsocket has been created properly.""" assert self.slipsocket.family == self.family - assert self.slipsocket.type == socket.SOCK_STREAM # pylint: disable=no-member + assert self.slipsocket.type == socket.SOCK_STREAM assert self.slipsocket.proto == 0 assert self.slipsocket.socket is self.sock_mock @@ -118,17 +117,16 @@ def socket_data_generator() -> Generator[bytes, None, None]: yield END + END yield b"" - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" - # noinspection PyProtectedMember chunk_size = sliplib.SlipSocket._chunk_size # noqa: SLF001 - expected_calls = [mocker.call.recv(chunk_size)] * 2 + expected_calls = [mocker.call(chunk_size)] * 2 self.sock_mock.recv.assert_has_calls(expected_calls) assert self.slipsocket.recv_msg() == b"bye" - expected_calls = [mocker.call.recv(chunk_size)] * 4 + expected_calls = [mocker.call(chunk_size)] * 4 self.sock_mock.recv.assert_has_calls(expected_calls) assert self.slipsocket.recv_msg() == b"" - expected_calls = [mocker.call.recv(chunk_size)] * 5 + expected_calls = [mocker.call(chunk_size)] * 5 self.sock_mock.recv.assert_has_calls(expected_calls) def test_end_of_data_handling(self, mocker: MockerFixture) -> None: @@ -141,15 +139,15 @@ def socket_data_generator() -> Generator[bytes, None, None]: yield b"" yield b"" # no cov. Extra byte to ensure that the previous empty byte is enough to signal end of data. - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" assert self.slipsocket.recv_msg() == b"bye" assert self.slipsocket.recv_msg() == b"" chunk_size = sliplib.SlipSocket._chunk_size # noqa: SLF001 - expected_calls = [mocker.call.recv(chunk_size)] * 4 + expected_calls = [mocker.call(chunk_size)] * 4 self.sock_mock.recv.assert_has_calls(expected_calls) - def test_exception_on_protocol_error_in_first_packet(self, mocker: MockerFixture) -> None: + def test_exception_on_protocol_error_in_first_packet(self) -> None: """Test that an invalid packet causes a ProtocolError. Packets after the invalid packet are handled correctly.""" @@ -157,14 +155,14 @@ def test_exception_on_protocol_error_in_first_packet(self, mocker: MockerFixture def socket_data_generator() -> Generator[bytes, None, None]: yield END + ESC + b"error" + END + b"hallo" + END + b"bye" + END - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() with pytest.raises(ProtocolError) as exc: self.slipsocket.recv_msg() assert exc.value.args == (ESC + b"error",) assert self.slipsocket.recv_msg() == b"hallo" assert self.slipsocket.recv_msg() == b"bye" - def test_exception_on_protocol_error_in_subsequent_packet(self, mocker: MockerFixture) -> None: + def test_exception_on_protocol_error_in_subsequent_packet(self) -> None: """Test that an invalid packet causes a ProtocolError Packets before the invalid packet are decoded correctly.""" @@ -172,19 +170,21 @@ def test_exception_on_protocol_error_in_subsequent_packet(self, mocker: MockerFi def socket_data_generator() -> Generator[bytes, None, None]: yield END + b"hallo" + END + ESC + b"error" + END - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" with pytest.raises(ProtocolError) as exc: self.slipsocket.recv_msg() assert exc.value.args == (ESC + b"error",) - def test_exceptions_on_consecutive_invalid_packets(self, mocker: MockerFixture) -> None: + def test_exceptions_on_consecutive_invalid_packets( + self, + ) -> None: """Test that multiple invalid packets result in a ProtocolError for each invalid packet.""" def socket_data_generator() -> Generator[bytes, None, None]: yield END + b"hallo" + END + ESC + b"error" + END + b"another" + ESC + END + b"bye" + END - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() assert self.slipsocket.recv_msg() == b"hallo" with pytest.raises(ProtocolError) as exc: self.slipsocket.recv_msg() @@ -196,87 +196,82 @@ def socket_data_generator() -> Generator[bytes, None, None]: def test_accept_method(self, mocker: MockerFixture) -> None: """Test that the accept method is delegated to the socket, and that the result is a SlipSocket.""" - new_socket = mocker.Mock( - spec=socket.socket(family=self.family), + new_socket = mocker.create_autospec( + spec=socket.socket, + instance=True, family=self.family, type=socket.SOCK_STREAM, proto=0, - ) # pylint: disable=no-member - self.sock_mock.accept = mocker.Mock(return_value=(new_socket, self.far_address)) + ) + self.sock_mock.accept.return_value = (new_socket, self.far_address) new_slip_socket, address = self.slipsocket.accept() self.sock_mock.accept.assert_called_once_with() assert isinstance(new_slip_socket, SlipSocket) assert new_slip_socket.socket is new_socket assert address == self.far_address + new_socket.close() - def test_bind_method(self, mocker: MockerFixture) -> None: + def test_bind_method(self) -> None: """Test that the bind method is delegated to the socket.""" - self.sock_mock.bind = mocker.Mock() self.slipsocket.bind(self.near_address) self.sock_mock.bind.assert_called_once_with(self.near_address) - def test_close_method(self, mocker: MockerFixture) -> None: + def test_close_method(self) -> None: """Test that the close method is delegated to the socket.""" - self.sock_mock.close = mocker.Mock() self.slipsocket.close() self.sock_mock.close.assert_called_once_with() - def test_connect_method(self, mocker: MockerFixture) -> None: + def test_connect_method(self) -> None: """Test that the connect method is delegated to the socket.""" - self.sock_mock.connect = mocker.Mock() self.slipsocket.connect(self.far_address) self.sock_mock.connect.assert_called_once_with(self.far_address) - def test_connect_ex_method(self, mocker: MockerFixture) -> None: + def test_connect_ex_method(self) -> None: """Test that the connect_ex method is delegated to the socket.""" - self.sock_mock.connect_ex = mocker.Mock() self.slipsocket.connect_ex(self.far_address) self.sock_mock.connect_ex.assert_called_once_with(self.far_address) - def test_fileno_method(self, mocker: MockerFixture) -> None: + def test_fileno_method(self) -> None: """Test that the fileno method is delegated to the socket.""" - self.sock_mock.fileno = mocker.Mock(return_value=3) + self.sock_mock.fileno.return_value = 3 fileno = self.slipsocket.fileno() self.sock_mock.fileno.assert_called_once_with() assert fileno == 3 - def test_getpeername_method(self, mocker: MockerFixture) -> None: + def test_getpeername_method(self) -> None: """Test that the getpeername method is delegated to the socket.""" - self.sock_mock.getpeername = mocker.Mock(return_value=self.far_address) + self.sock_mock.getpeername.return_value = self.far_address peername = self.slipsocket.getpeername() self.sock_mock.getpeername.assert_called_once_with() assert peername == self.far_address - def test_getsockname_method(self, mocker: MockerFixture) -> None: + def test_getsockname_method(self) -> None: """Test that the getsockname method is delegated to the socket.""" - self.sock_mock.getsockname = mocker.Mock(return_value=self.near_address) + self.sock_mock.getsockname.return_value = self.near_address sockname = self.slipsocket.getsockname() self.sock_mock.getsockname.assert_called_once_with() assert sockname == self.near_address - def test_getsockopt_method(self, mocker: MockerFixture) -> None: + def test_getsockopt_method(self) -> None: """Test that the getsockopt method is delegated to the socket.""" - self.sock_mock.getsockopt = mocker.Mock(return_value=5) + self.sock_mock.getsockopt.return_value = 5 option = self.slipsocket.getsockopt(27, 5) self.sock_mock.getsockopt.assert_called_once_with(27, 5) assert option == 5 def test_listen_method(self, mocker: MockerFixture) -> None: """Test that the listen method (with or without arguments) is delegated to the socket.""" - self.sock_mock.listen = mocker.Mock() self.slipsocket.listen() self.slipsocket.listen(5) assert self.sock_mock.listen.mock_calls == [mocker.call(), mocker.call(5)] - def test_setsockopt_method(self, mocker: MockerFixture) -> None: + def test_setsockopt_method(self) -> None: """Test that the getsockopt method is delegated to the socket.""" - self.sock_mock.setsockopt = mocker.Mock() self.slipsocket.setsockopt(27, 5) self.sock_mock.setsockopt.assert_called_once_with(27, 5) - def test_shutdown_method(self, mocker: MockerFixture) -> None: + def test_shutdown_method(self) -> None: """Test that the shutdown method is delegated to the socket.""" - self.sock_mock.shutdown = mocker.Mock() self.slipsocket.shutdown(0) self.sock_mock.shutdown.assert_called_once_with(0) @@ -289,17 +284,19 @@ def test_exception_for_not_supported_operations(self, method: str) -> None: # Testing delegated methods. # This will be removed due to deprecation of delegating methods to the wrapped socket. @pytest.mark.parametrize("method", DELEGATED_METHODS) - def test_delegated_methods(self, method: str, mocker: MockerFixture) -> None: + def test_delegated_methods(self, method: str) -> None: """Test that other delegated methods are delegated to the socket, but also issue a deprecation warning.""" - mock_method = mocker.Mock() - setattr(self.sock_mock, method, mock_method) with warnings.catch_warnings(record=True) as warning: socket_method = getattr(self.slipsocket, method) assert len(warning) == 1 assert issubclass(warning[0].category, DeprecationWarning) assert "will be removed in version 1.0" in str(warning[0].message) - socket_method() - mock_method.assert_called_once_with() + if method == "set_inheritable": + args: tuple[()] | tuple[Any] = (True,) + else: + args = () + socket_method(*args) + getattr(self.sock_mock, method).assert_called_once_with(*args) @pytest.mark.parametrize("attr", ["family", "type", "proto"]) def test_read_only_attribute(self, attr: str) -> None: @@ -310,19 +307,22 @@ def test_read_only_attribute(self, attr: str) -> None: def test_create_connection(self, mocker: MockerFixture) -> None: """Test that create_connection gives a SlipSocket.""" - new_sock_mock = mocker.Mock( - spec=socket.socket(self.family), + new_sock_mock = mocker.create_autospec( + spec=socket.socket, + instance=True, family=self.family, type=socket.SOCK_STREAM, proto=0, - ) # pylint: disable=no-member + ) create_connection_mock = mocker.patch("sliplib.slipsocket.socket.create_connection", return_value=new_sock_mock) sock = SlipSocket.create_connection(self.far_address) assert isinstance(sock, SlipSocket) assert sock.socket is new_sock_mock create_connection_mock.assert_called_once_with(self.far_address[0:2], None, None) + sock.close() + new_sock_mock.close() - def test_slip_socket_iteration(self, mocker: MockerFixture) -> None: + def test_slip_socket_iteration(self) -> None: """Test that a SlipSocket can be iterated over.""" def socket_data_generator() -> Generator[bytes, None, None]: @@ -332,11 +332,7 @@ def socket_data_generator() -> Generator[bytes, None, None]: yield END + END yield b"" - self.sock_mock.recv = mocker.Mock(side_effect=socket_data_generator()) + self.sock_mock.recv.side_effect = socket_data_generator() expected = (b"hallo", b"bye") actual = tuple(msg for msg in self.slipsocket) assert expected == actual - - -if __name__ == "__main__": - pytest.main() diff --git a/tests/unit/test_slipstream.py b/tests/unit/test_slipstream.py index 605b104..d20b2c0 100644 --- a/tests/unit/test_slipstream.py +++ b/tests/unit/test_slipstream.py @@ -42,7 +42,7 @@ def setup(self, mocker: MockerFixture) -> None: self.slipstream = SlipStream(self.stream_mock) def test_slipstream_creation(self) -> None: - """Verify the creation of the SlipStream instance.""" + """Verify the creation of the self.slipstream instance.""" assert self.slipstream.stream is self.stream_mock @pytest.mark.parametrize(("rbl", "wbl"), [(True, True), (True, False), (False, True), (False, False)]) @@ -69,7 +69,7 @@ def test_slipstream_reading_single_bytes(self, mocker: MockerFixture) -> None: """Verify that receiving messages works when reading the packets byte for byte.""" msg_list = [b"hallo", b"bye"] self.stream_mock.read.side_effect = [END, *msg_list[0], END, END, *msg_list[1], END, b""] - self.slipstream = SlipStream(self.stream_mock, 1) + self.slipstream.chunk_size = 1 assert self.slipstream.recv_msg() == msg_list[0] assert self.slipstream.recv_msg() == msg_list[1] # No more messages @@ -130,7 +130,7 @@ def test_recovery_from_protocol_error_with_unbuffered_reads(self) -> None: """Verify error recovery for unbuffered reads.""" msg_list = [b"hallo", b"bye"] self.stream_mock.read.side_effect = [END, *msg_list[0], END, ESC, END, *msg_list[1], END, b""] - self.slipstream = SlipStream(self.stream_mock, 1) + self.slipstream.chunk_size = 1 self.verify_error_recovery(msg_list) def verify_error_recovery_during_iteration(self, msg_list: list[bytes]) -> None: @@ -163,7 +163,7 @@ def test_recovery_from_protocol_error_during_iteration_with_unbuffered_reads( """Verify that error recovery works during iteration with unbuffered reads.""" msg_list = [b"hallo", b"bye"] self.stream_mock.read.side_effect = [END, *msg_list[0], END, ESC, END, *msg_list[1], END, b""] - self.slipstream = SlipStream(self.stream_mock, 1) + self.slipstream.chunk_size = 1 self.verify_error_recovery_during_iteration(msg_list)