diff --git a/CHANGES/8540.bugfix.rst b/CHANGES/8540.bugfix.rst new file mode 100644 index 00000000000..ab7c4767635 --- /dev/null +++ b/CHANGES/8540.bugfix.rst @@ -0,0 +1,7 @@ +Fixed WebSocket server heartbeat timeout logic to terminate `receive` and return :py:class:`~aiohttp.ServerTimeoutError` -- by :user:`arcivanov`. + +When a WebSocket pong message was not received, the + :py:meth:`~aiohttp.ClientWebSocketResponse.receive` operation did not terminate. + This change causes `_pong_not_received` to feed the `reader` an error message, causing + pending `receive` to terminate and return the error message. The error message contains + the exception :py:class:`~aiohttp.ServerTimeoutError`. diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 608c659e543..c1a2c4641ba 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -4,7 +4,7 @@ import sys from typing import Any, Optional, cast -from .client_exceptions import ClientError +from .client_exceptions import ClientError, ServerTimeoutError from .client_reqrep import ClientResponse from .helpers import call_later, set_result from .http import ( @@ -122,8 +122,12 @@ def _pong_not_received(self) -> None: if not self._closed: self._closed = True self._close_code = WSCloseCode.ABNORMAL_CLOSURE - self._exception = asyncio.TimeoutError() + self._exception = ServerTimeoutError() self._response.close() + if self._waiting and not self._closing: + self._reader.feed_data( + WSMessage(WSMsgType.ERROR, self._exception, None) + ) @property def closed(self) -> bool: diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 907a362fc7e..dc474f96c39 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -5,7 +5,7 @@ import pytest import aiohttp -from aiohttp import hdrs, web +from aiohttp import ServerTimeoutError, WSMsgType, hdrs, web from aiohttp.http import WSCloseCode from aiohttp.pytest_plugin import AiohttpClient @@ -624,7 +624,35 @@ async def handler(request): assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE -async def test_send_recv_compress(aiohttp_client) -> None: +async def test_heartbeat_no_pong_concurrent_receive(aiohttp_client: Any) -> None: + ping_received = False + + async def handler(request): + nonlocal ping_received + ws = web.WebSocketResponse(autoping=False) + await ws.prepare(request) + msg = await ws.receive() + ping_received = msg.type is aiohttp.WSMsgType.PING + ws._reader.feed_eof = lambda: None + await asyncio.sleep(10.0) + + app = web.Application() + app.router.add_route("GET", "/", handler) + + client = await aiohttp_client(app) + resp = await client.ws_connect("/", heartbeat=0.1) + resp._reader.feed_eof = lambda: None + + # Connection should be closed roughly after 1.5x heartbeat. + msg = await resp.receive(5.0) + assert ping_received + assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE + assert msg + assert msg.type is WSMsgType.ERROR + assert isinstance(msg.data, ServerTimeoutError) + + +async def test_send_recv_compress(aiohttp_client: Any) -> None: async def handler(request): ws = web.WebSocketResponse() await ws.prepare(request)