From c48ad3931914b75271c4f30f5201982cd78bdeb1 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:25:42 -0500 Subject: [PATCH 01/32] Fix timer handle churn in websocket heartbeat Each message would cancel and reset the timer handler which meant asyncio would have to rebuild the whole schedule heap frequently because so many timer handlers were being created and than cancelled. When asyncio timer handle cancels reach the threadhold the whole heap has to be rebuilt https://github.com/python/cpython/blob/1422500d020bd199b26357fc387f8b79b82226cd/Lib/asyncio/base_events.py#L1968 which is extremely inefficent. To solve this, when a timer handle is already running, instead of cancelling it, we let it fire and reschedule if heartbeat would be sent to early. --- aiohttp/client_ws.py | 89 +++++++++++++++++++++++++----------------- aiohttp/helpers.py | 18 +++++++-- aiohttp/web_ws.py | 92 ++++++++++++++++++++++++++------------------ 3 files changed, 124 insertions(+), 75 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index e6c720b4264..f0ce95d26e1 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -7,7 +7,7 @@ from .client_exceptions import ClientError, ServerTimeoutError from .client_reqrep import ClientResponse -from .helpers import call_later, set_result +from .helpers import calculate_timeout_when, set_result from .http import ( WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, @@ -72,6 +72,7 @@ def __init__( self._autoping = autoping self._heartbeat = heartbeat self._heartbeat_cb: Optional[asyncio.TimerHandle] = None + self._heartbeat_when: float = 0.0 if heartbeat is not None: self._pong_heartbeat = heartbeat / 2.0 self._pong_response_cb: Optional[asyncio.TimerHandle] = None @@ -85,48 +86,66 @@ def __init__( self._reset_heartbeat() def _cancel_heartbeat(self) -> None: - if self._pong_response_cb is not None: - self._pong_response_cb.cancel() - self._pong_response_cb = None - + self._cancel_pong_response_cb() if self._heartbeat_cb is not None: self._heartbeat_cb.cancel() self._heartbeat_cb = None + def _cancel_pong_response_cb(self) -> None: + if self._pong_response_cb is not None: + self._pong_response_cb.cancel() + self._pong_response_cb = None + def _reset_heartbeat(self) -> None: - self._cancel_heartbeat() - - if self._heartbeat is not None: - self._heartbeat_cb = call_later( - self._send_heartbeat, - self._heartbeat, - self._loop, - timeout_ceil_threshold=( - self._conn._connector._timeout_ceil_threshold - if self._conn is not None - else 5 - ), - ) + if self._heartbeat is None: + return + self._cancel_pong_response_cb() + req = self._req + timeout_ceil_threshold = ( + req._protocol._timeout_ceil_threshold if req is not None else 5 + ) + assert self._loop is not None + loop = self._loop + when = calculate_timeout_when( + loop.time(), self._heartbeat, timeout_ceil_threshold + ) + self._heartbeat_when = when + if self._heartbeat_cb is None: + # We do not cancel the previous heartbeat_cb here because + # it generates a significant amount of TimerHandle churn + # which causes asyncio to rebuild the heap frequently. + # Instead _send_heartbeat() will reschedule the next + # heartbeat if it fires too early. + self._heartbeat_cb = loop.call_at(when, self._send_heartbeat) def _send_heartbeat(self) -> None: - if self._heartbeat is not None and not self._closed: - # fire-and-forget a task is not perfect but maybe ok for - # sending ping. Otherwise we need a long-living heartbeat - # task in the class. - self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] - - if self._pong_response_cb is not None: - self._pong_response_cb.cancel() - self._pong_response_cb = call_later( - self._pong_not_received, - self._pong_heartbeat, - self._loop, - timeout_ceil_threshold=( - self._conn._connector._timeout_ceil_threshold - if self._conn is not None - else 5 - ), + self._heartbeat_cb = None + if self._heartbeat is None or self._closed: + return + assert self._loop is not None and self._writer is not None + loop = self._loop + now = loop.time() + if now < self._heartbeat_when: + # Heartbeat fired too early, reschedule + self._heartbeat_cb = loop.call_at( + self._heartbeat_when, self._send_heartbeat ) + return + + # fire-and-forget a task is not perfect but maybe ok for + # sending ping. Otherwise we need a long-living heartbeat + # task in the class. + self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] + + req = self._req + timeout_ceil_threshold = ( + req._protocol._timeout_ceil_threshold if req is not None else 5 + ) + when = calculate_timeout_when( + loop.time(), self._pong_heartbeat, timeout_ceil_threshold + ) + self._cancel_pong_response_cb() + self._pong_response_cb = loop.call_at(when, self._pong_not_received) def _pong_not_received(self) -> None: if not self._closed: diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 9811ad05054..d9dadb8d012 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -598,11 +598,23 @@ def call_later( loop: asyncio.AbstractEventLoop, timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: + when = calculate_timeout_when(timeout, timeout_ceil_threshold) + if when is not None: + return loop.call_at(when, cb) + return None + + +def calculate_timeout_when( + loop_time: float, + timeout: Optional[float], + timeout_ceiling_threshold: float = 5, +) -> Optional[float]: + """Calculate when to execute a timeout.""" if timeout is not None and timeout > 0: - when = loop.time() + timeout - if timeout > timeout_ceil_threshold: + when = loop_time + timeout + if timeout > timeout_ceiling_threshold: when = ceil(when) - return loop.call_at(when, cb) + return when return None diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 522d1e0f0a9..47747d77969 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -11,7 +11,7 @@ from . import hdrs from .abc import AbstractStreamWriter -from .helpers import call_later, set_exception, set_result +from .helpers import calculate_timeout_when, set_exception, set_result from .http import ( WS_CLOSED_MESSAGE, WS_CLOSING_MESSAGE, @@ -74,6 +74,7 @@ class WebSocketResponse(StreamResponse): "_autoclose", "_autoping", "_heartbeat", + "_heartbeat_when", "_heartbeat_cb", "_pong_heartbeat", "_pong_response_cb", @@ -112,6 +113,7 @@ def __init__( self._autoclose = autoclose self._autoping = autoping self._heartbeat = heartbeat + self._heartbeat_when = 0.0 self._heartbeat_cb: Optional[asyncio.TimerHandle] = None if heartbeat is not None: self._pong_heartbeat = heartbeat / 2.0 @@ -120,50 +122,66 @@ def __init__( self._max_msg_size = max_msg_size def _cancel_heartbeat(self) -> None: - if self._pong_response_cb is not None: - self._pong_response_cb.cancel() - self._pong_response_cb = None - + self._cancel_pong_response_cb() if self._heartbeat_cb is not None: self._heartbeat_cb.cancel() self._heartbeat_cb = None - def _reset_heartbeat(self) -> None: - self._cancel_heartbeat() + def _cancel_pong_response_cb(self) -> None: + if self._pong_response_cb is not None: + self._pong_response_cb.cancel() + self._pong_response_cb = None - if self._heartbeat is not None: - assert self._loop is not None - self._heartbeat_cb = call_later( - self._send_heartbeat, - self._heartbeat, - self._loop, - timeout_ceil_threshold=( - self._req._protocol._timeout_ceil_threshold - if self._req is not None - else 5 - ), - ) + def _reset_heartbeat(self) -> None: + if self._heartbeat is None: + return + self._cancel_pong_response_cb() + req = self._req + timeout_ceil_threshold = ( + req._protocol._timeout_ceil_threshold if req is not None else 5 + ) + assert self._loop is not None + loop = self._loop + when = calculate_timeout_when( + loop.time(), self._heartbeat, timeout_ceil_threshold + ) + self._heartbeat_when = when + if self._heartbeat_cb is None: + # We do not cancel the previous heartbeat_cb here because + # it generates a significant amount of TimerHandle churn + # which causes asyncio to rebuild the heap frequently. + # Instead _send_heartbeat() will reschedule the next + # heartbeat if it fires too early. + self._heartbeat_cb = loop.call_at(when, self._send_heartbeat) def _send_heartbeat(self) -> None: - if self._heartbeat is not None and not self._closed: - assert self._loop is not None and self._writer is not None - # fire-and-forget a task is not perfect but maybe ok for - # sending ping. Otherwise we need a long-living heartbeat - # task in the class. - self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] - - if self._pong_response_cb is not None: - self._pong_response_cb.cancel() - self._pong_response_cb = call_later( - self._pong_not_received, - self._pong_heartbeat, - self._loop, - timeout_ceil_threshold=( - self._req._protocol._timeout_ceil_threshold - if self._req is not None - else 5 - ), + self._heartbeat_cb = None + if self._heartbeat is None or self._closed: + return + assert self._loop is not None and self._writer is not None + loop = self._loop + now = loop.time() + if now < self._heartbeat_when: + # Heartbeat fired too early, reschedule + self._heartbeat_cb = loop.call_at( + self._heartbeat_when, self._send_heartbeat ) + return + + # fire-and-forget a task is not perfect but maybe ok for + # sending ping. Otherwise we need a long-living heartbeat + # task in the class. + self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] + + req = self._req + timeout_ceil_threshold = ( + req._protocol._timeout_ceil_threshold if req is not None else 5 + ) + when = calculate_timeout_when( + loop.time(), self._pong_heartbeat, timeout_ceil_threshold + ) + self._cancel_pong_response_cb() + self._pong_response_cb = loop.call_at(when, self._pong_not_received) def _pong_not_received(self) -> None: if self._req is not None and self._req.transport is not None: From ab09f184fd8c1d096de23a83960905713bd9bc91 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:33:09 -0500 Subject: [PATCH 02/32] preen --- aiohttp/client_ws.py | 4 +--- aiohttp/web_ws.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index f0ce95d26e1..0434aae7ca9 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -141,9 +141,7 @@ def _send_heartbeat(self) -> None: timeout_ceil_threshold = ( req._protocol._timeout_ceil_threshold if req is not None else 5 ) - when = calculate_timeout_when( - loop.time(), self._pong_heartbeat, timeout_ceil_threshold - ) + when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold) self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 47747d77969..d5d0e42314f 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -177,9 +177,7 @@ def _send_heartbeat(self) -> None: timeout_ceil_threshold = ( req._protocol._timeout_ceil_threshold if req is not None else 5 ) - when = calculate_timeout_when( - loop.time(), self._pong_heartbeat, timeout_ceil_threshold - ) + when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold) self._cancel_pong_response_cb() self._pong_response_cb = loop.call_at(when, self._pong_not_received) From f59f732f4e57fdc1638d613f7724a6f3cec78ba3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:36:30 -0500 Subject: [PATCH 03/32] preen --- aiohttp/client_ws.py | 3 +-- aiohttp/web_ws.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 0434aae7ca9..44214361a63 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -122,7 +122,6 @@ def _send_heartbeat(self) -> None: self._heartbeat_cb = None if self._heartbeat is None or self._closed: return - assert self._loop is not None and self._writer is not None loop = self._loop now = loop.time() if now < self._heartbeat_when: @@ -135,7 +134,7 @@ def _send_heartbeat(self) -> None: # fire-and-forget a task is not perfect but maybe ok for # sending ping. Otherwise we need a long-living heartbeat # task in the class. - self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] + loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] req = self._req timeout_ceil_threshold = ( diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index d5d0e42314f..8635afe5f63 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -171,7 +171,7 @@ def _send_heartbeat(self) -> None: # fire-and-forget a task is not perfect but maybe ok for # sending ping. Otherwise we need a long-living heartbeat # task in the class. - self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] + loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] req = self._req timeout_ceil_threshold = ( From 4ea43f936c9b2c47d1e34877540f17b0069a67b6 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:39:58 -0500 Subject: [PATCH 04/32] fix merge --- aiohttp/client_ws.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 44214361a63..cbd2a13fd2f 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -100,9 +100,9 @@ def _reset_heartbeat(self) -> None: if self._heartbeat is None: return self._cancel_pong_response_cb() - req = self._req + conn = self._conn timeout_ceil_threshold = ( - req._protocol._timeout_ceil_threshold if req is not None else 5 + conn._connector._timeout_ceil_threshold if conn is not None else 5 ) assert self._loop is not None loop = self._loop @@ -136,9 +136,9 @@ def _send_heartbeat(self) -> None: # task in the class. loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable] - req = self._req + conn = self._conn timeout_ceil_threshold = ( - req._protocol._timeout_ceil_threshold if req is not None else 5 + conn._connector._timeout_ceil_threshold if conn is not None else 5 ) when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold) self._cancel_pong_response_cb() From b1ed054a0e9de9825c47282f43c82329b2f874c5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:40:41 -0500 Subject: [PATCH 05/32] fix merge --- aiohttp/client_ws.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index cbd2a13fd2f..2fca80a9bcc 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -100,12 +100,12 @@ def _reset_heartbeat(self) -> None: if self._heartbeat is None: return self._cancel_pong_response_cb() + assert self._loop is not None + loop = self._loop conn = self._conn timeout_ceil_threshold = ( conn._connector._timeout_ceil_threshold if conn is not None else 5 ) - assert self._loop is not None - loop = self._loop when = calculate_timeout_when( loop.time(), self._heartbeat, timeout_ceil_threshold ) From 807e3345f20d1d9d1ecd5d18d4d24f0c69a67754 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:41:10 -0500 Subject: [PATCH 06/32] fix merge --- aiohttp/client_ws.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 2fca80a9bcc..8a8f3d131fd 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -106,9 +106,8 @@ def _reset_heartbeat(self) -> None: timeout_ceil_threshold = ( conn._connector._timeout_ceil_threshold if conn is not None else 5 ) - when = calculate_timeout_when( - loop.time(), self._heartbeat, timeout_ceil_threshold - ) + now = loop.time() + when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold) self._heartbeat_when = when if self._heartbeat_cb is None: # We do not cancel the previous heartbeat_cb here because From c3ca179e99ae1e0d5f9ae106797e54e64da78904 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:41:34 -0500 Subject: [PATCH 07/32] fix merge --- aiohttp/web_ws.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 8635afe5f63..5fb69e32ad0 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -142,9 +142,8 @@ def _reset_heartbeat(self) -> None: ) assert self._loop is not None loop = self._loop - when = calculate_timeout_when( - loop.time(), self._heartbeat, timeout_ceil_threshold - ) + now = loop.time() + when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold) self._heartbeat_when = when if self._heartbeat_cb is None: # We do not cancel the previous heartbeat_cb here because From 8d0756b0e8517e600515bca688a7e0922f32fa54 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:43:52 -0500 Subject: [PATCH 08/32] fix merge --- aiohttp/helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index d9dadb8d012..9a5a056928e 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -598,7 +598,8 @@ def call_later( loop: asyncio.AbstractEventLoop, timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: - when = calculate_timeout_when(timeout, timeout_ceil_threshold) + now = loop.time() + when = calculate_timeout_when(now, timeout, timeout_ceil_threshold) if when is not None: return loop.call_at(when, cb) return None From 3321b9360552cd53258dce74f3e4f52a91423c92 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:45:11 -0500 Subject: [PATCH 09/32] fix merge --- aiohttp/helpers.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 9a5a056928e..420b1e82a6e 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -598,18 +598,18 @@ def call_later( loop: asyncio.AbstractEventLoop, timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: + if timeout is None: + return None now = loop.time() when = calculate_timeout_when(now, timeout, timeout_ceil_threshold) - if when is not None: - return loop.call_at(when, cb) - return None + return loop.call_at(when, cb) def calculate_timeout_when( loop_time: float, - timeout: Optional[float], - timeout_ceiling_threshold: float = 5, -) -> Optional[float]: + timeout: float, + timeout_ceiling_threshold: float, +) -> float: """Calculate when to execute a timeout.""" if timeout is not None and timeout > 0: when = loop_time + timeout From 9a92850f996674647580039b136264f4cd648f5f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:46:57 -0500 Subject: [PATCH 10/32] fix merge --- aiohttp/helpers.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 420b1e82a6e..80b956ca714 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -601,17 +601,18 @@ def call_later( if timeout is None: return None now = loop.time() - when = calculate_timeout_when(now, timeout, timeout_ceil_threshold) - return loop.call_at(when, cb) + if when := calculate_timeout_when(now, timeout, timeout_ceil_threshold): + return loop.call_at(when, cb) + return None def calculate_timeout_when( loop_time: float, timeout: float, timeout_ceiling_threshold: float, -) -> float: +) -> float | None: """Calculate when to execute a timeout.""" - if timeout is not None and timeout > 0: + if timeout > 0: when = loop_time + timeout if timeout > timeout_ceiling_threshold: when = ceil(when) From abc1d80e1f7dd5466b3fdb1226820639e00eda26 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:48:00 -0500 Subject: [PATCH 11/32] fix merge --- aiohttp/helpers.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 80b956ca714..c3f1f3d5fe7 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -598,12 +598,11 @@ def call_later( loop: asyncio.AbstractEventLoop, timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: - if timeout is None: + if timeout is None or timeout <= 0: return None now = loop.time() - if when := calculate_timeout_when(now, timeout, timeout_ceil_threshold): - return loop.call_at(when, cb) - return None + when = calculate_timeout_when(now, timeout, timeout_ceil_threshold) + return loop.call_at(when, cb) def calculate_timeout_when( @@ -612,12 +611,10 @@ def calculate_timeout_when( timeout_ceiling_threshold: float, ) -> float | None: """Calculate when to execute a timeout.""" - if timeout > 0: - when = loop_time + timeout - if timeout > timeout_ceiling_threshold: - when = ceil(when) - return when - return None + when = loop_time + timeout + if timeout > timeout_ceiling_threshold: + when = ceil(when) + return when class TimeoutHandle: From 5cb41f3745a1255b589962b2b06f2c6e0ec55487 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 07:50:14 -0500 Subject: [PATCH 12/32] fix merge --- aiohttp/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index c3f1f3d5fe7..4ffb203d13b 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -609,7 +609,7 @@ def calculate_timeout_when( loop_time: float, timeout: float, timeout_ceiling_threshold: float, -) -> float | None: +) -> float: """Calculate when to execute a timeout.""" when = loop_time + timeout if timeout > timeout_ceiling_threshold: From 179b0d5b30cac5ee1713324ff17129673679a676 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 19:19:37 -0500 Subject: [PATCH 13/32] add coverage for pong after many messages --- tests/test_client_ws_functional.py | 32 ++++++++++++++++++++++++++ tests/test_web_websocket_functional.py | 27 ++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 6830f9131b4..bab006fda5b 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -661,6 +661,7 @@ async def handler(request): async def test_heartbeat_no_pong(aiohttp_client: Any) -> None: + """Test that the connection is closed if no pong is received without sending messages.""" ping_received = False async def handler(request): @@ -685,6 +686,37 @@ async def handler(request): assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE +async def test_heartbeat_no_pong_after_many_messages(aiohttp_client: Any) -> None: + """Test that the connection is closed if no pong is received after many messages.""" + ping_received = False + + async def handler(request): + nonlocal ping_received + ws = web.WebSocketResponse(autoping=False) + await ws.prepare(request) + for _ in range(10): + await ws.send_str("test") + msg = await ws.receive() + ping_received = msg.type is aiohttp.WSMsgType.PING + await ws.receive() + return ws + + app = web.Application() + app.router.add_route("GET", "/", handler) + + client = await aiohttp_client(app) + resp = await client.ws_connect("/", heartbeat=0.1) + + for _ in range(10): + test_msg = await resp.receive() + assert test_msg.data == "test" + # Connection should be closed roughly after 1.5x heartbeat. + + await asyncio.sleep(0.2) + assert ping_received + assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE + + async def test_heartbeat_no_pong_concurrent_receive(aiohttp_client: Any) -> None: ping_received = False diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index 7d990294840..b4ec0591891 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -715,6 +715,33 @@ async def handler(request): await ws.close() +async def test_heartbeat_no_pong_many_messages(loop: Any, aiohttp_client: Any) -> None: + """Test no pong after sending many messages.""" + + async def handler(request): + ws = web.WebSocketResponse(heartbeat=0.05) + await ws.prepare(request) + for _ in range(10): + await ws.send_str("test") + + await ws.receive() + return ws + + app = web.Application() + app.router.add_get("/", handler) + + client = await aiohttp_client(app) + ws = await client.ws_connect("/", autoping=False) + for _ in range(10): + msg = await ws.receive() + assert msg.type is aiohttp.WSMsgType.TEXT + assert msg.data == "test" + + msg = await ws.receive() + assert msg.type is aiohttp.WSMsgType.PING + await ws.close() + + async def test_server_ws_async_for(loop: Any, aiohttp_server: Any) -> None: closed = loop.create_future() From fb71f9373d007c07b920170450a30ead548e9136 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 19:23:55 -0500 Subject: [PATCH 14/32] changelog --- CHANGES/8608.misc.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 CHANGES/8608.misc.rst diff --git a/CHANGES/8608.misc.rst b/CHANGES/8608.misc.rst new file mode 100644 index 00000000000..ebbe1f9612e --- /dev/null +++ b/CHANGES/8608.misc.rst @@ -0,0 +1,3 @@ +Improved websocket performance when messages are sent or received frequently -- by :user:`bdraco`. + +The websocket keep-alive scheduling algorithm has been improved to reduce the ``asyncio`` scheduling overhead by reducing the number of ``asyncio.TimerHandle``s being created and cancelled. From 0c84a8bccf7dcc7cbe2da6309ea802ffe82569da Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 19:25:04 -0500 Subject: [PATCH 15/32] Update CHANGES/8608.misc.rst --- CHANGES/8608.misc.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES/8608.misc.rst b/CHANGES/8608.misc.rst index ebbe1f9612e..4defba27bdb 100644 --- a/CHANGES/8608.misc.rst +++ b/CHANGES/8608.misc.rst @@ -1,3 +1,3 @@ Improved websocket performance when messages are sent or received frequently -- by :user:`bdraco`. -The websocket keep-alive scheduling algorithm has been improved to reduce the ``asyncio`` scheduling overhead by reducing the number of ``asyncio.TimerHandle``s being created and cancelled. +The WebSocket heartbeat scheduling algorithm has been improved to reduce the ``asyncio`` scheduling overhead by reducing the number of ``asyncio.TimerHandle``s being created and cancelled. From 02bd5c65a749659620b6c35610976d2a3aa37244 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 19:27:09 -0500 Subject: [PATCH 16/32] Update CHANGES/8608.misc.rst --- CHANGES/8608.misc.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES/8608.misc.rst b/CHANGES/8608.misc.rst index 4defba27bdb..76e845bf997 100644 --- a/CHANGES/8608.misc.rst +++ b/CHANGES/8608.misc.rst @@ -1,3 +1,3 @@ Improved websocket performance when messages are sent or received frequently -- by :user:`bdraco`. -The WebSocket heartbeat scheduling algorithm has been improved to reduce the ``asyncio`` scheduling overhead by reducing the number of ``asyncio.TimerHandle``s being created and cancelled. +The WebSocket heartbeat scheduling algorithm was improved to reduce the ``asyncio`` scheduling overhead by decreasing the number of ``asyncio.TimerHandle`` creations and cancellations. From 53f1b469e4a8ddaf1b925c08379ef576b01d31ff Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 20:40:21 -0500 Subject: [PATCH 17/32] Update tests/test_client_ws_functional.py --- tests/test_client_ws_functional.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index bab006fda5b..9cd928391c8 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -699,7 +699,6 @@ async def handler(request): msg = await ws.receive() ping_received = msg.type is aiohttp.WSMsgType.PING await ws.receive() - return ws app = web.Application() app.router.add_route("GET", "/", handler) From 1767cecd44a6d8ff5330a37791a6742c48097bb5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 5 Aug 2024 20:54:35 -0500 Subject: [PATCH 18/32] more coverage --- tests/test_web_websocket_functional.py | 32 +++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index b4ec0591891..af7addf29b9 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -715,7 +715,9 @@ async def handler(request): await ws.close() -async def test_heartbeat_no_pong_many_messages(loop: Any, aiohttp_client: Any) -> None: +async def test_heartbeat_no_pong_send_many_messages( + loop: Any, aiohttp_client: Any +) -> None: """Test no pong after sending many messages.""" async def handler(request): @@ -742,6 +744,34 @@ async def handler(request): await ws.close() +async def test_heartbeat_no_pong_receive_many_messages( + loop: Any, aiohttp_client: Any +) -> None: + """Test no pong after receiving many messages.""" + + async def handler(request): + ws = web.WebSocketResponse(heartbeat=0.05) + await ws.prepare(request) + for _ in range(10): + server_msg = await ws.receive() + assert server_msg.type is aiohttp.WSMsgType.TEXT + + await ws.receive() + return ws + + app = web.Application() + app.router.add_get("/", handler) + + client = await aiohttp_client(app) + ws = await client.ws_connect("/", autoping=False) + for _ in range(10): + await ws.send_str("test") + + msg = await ws.receive() + assert msg.type is aiohttp.WSMsgType.PING + await ws.close() + + async def test_server_ws_async_for(loop: Any, aiohttp_server: Any) -> None: closed = loop.create_future() From 84ae4b34d0608282f1946dc21102fa7d538233f4 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 08:58:17 -0500 Subject: [PATCH 19/32] ensure heartbeat timer is always canceled on close --- aiohttp/client_ws.py | 17 +++++++++++------ aiohttp/web_ws.py | 14 ++++++++++---- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 8a8f3d131fd..bbe854ca5bc 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -119,8 +119,6 @@ def _reset_heartbeat(self) -> None: def _send_heartbeat(self) -> None: self._heartbeat_cb = None - if self._heartbeat is None or self._closed: - return loop = self._loop now = loop.time() if now < self._heartbeat_when: @@ -145,7 +143,7 @@ def _send_heartbeat(self) -> None: def _pong_not_received(self) -> None: if not self._closed: - self._closed = True + self._set_closed() self._close_code = WSCloseCode.ABNORMAL_CLOSURE self._exception = ServerTimeoutError() self._response.close() @@ -154,6 +152,14 @@ def _pong_not_received(self) -> None: WSMessage(WSMsgType.ERROR, self._exception, None) ) + def _set_closed(self) -> None: + """Set the connection to closed. + + Cancel any heartbeat timers and set the closed flag. + """ + self._closed = True + self._cancel_heartbeat() + @property def closed(self) -> bool: return self._closed @@ -223,8 +229,7 @@ async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bo await self._close_wait if not self._closed: - self._cancel_heartbeat() - self._closed = True + self._set_closed() try: await self._writer.close(code, message) except asyncio.CancelledError: @@ -293,7 +298,7 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage: await self.close() return WSMessage(WSMsgType.CLOSED, None, None) except ClientError: - self._closed = True + self._set_closed() self._close_code = WSCloseCode.ABNORMAL_CLOSURE return WS_CLOSED_MESSAGE except WebSocketError as exc: diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 5fb69e32ad0..d9c5fff2587 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -155,8 +155,6 @@ def _reset_heartbeat(self) -> None: def _send_heartbeat(self) -> None: self._heartbeat_cb = None - if self._heartbeat is None or self._closed: - return assert self._loop is not None and self._writer is not None loop = self._loop now = loop.time() @@ -182,10 +180,18 @@ def _send_heartbeat(self) -> None: def _pong_not_received(self) -> None: if self._req is not None and self._req.transport is not None: - self._closed = True + self._set_closed() self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) self._exception = asyncio.TimeoutError() + def _set_closed(self) -> None: + """Set the connection to closed. + + Cancel any heartbeat timers and set the closed flag. + """ + self._closed = True + self._cancel_heartbeat() + async def prepare(self, request: BaseRequest) -> AbstractStreamWriter: # make pre-check to don't hide it by do_handshake() exceptions if self._payload_writer is not None: @@ -425,7 +431,7 @@ async def close( if self._closed: return False - self._closed = True + self._set_closed() try: await self._writer.close(code, message) writer = self._payload_writer From 40945e7c9ff2b085a74c4bb50d68563b2cdded95 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 09:03:24 -0500 Subject: [PATCH 20/32] cleanup assertions --- aiohttp/client_ws.py | 2 +- aiohttp/web_ws.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index bbe854ca5bc..1e8286b22e8 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -100,8 +100,8 @@ def _reset_heartbeat(self) -> None: if self._heartbeat is None: return self._cancel_pong_response_cb() - assert self._loop is not None loop = self._loop + assert loop is not None conn = self._conn timeout_ceil_threshold = ( conn._connector._timeout_ceil_threshold if conn is not None else 5 diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index d9c5fff2587..44b062275a4 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -140,8 +140,8 @@ def _reset_heartbeat(self) -> None: timeout_ceil_threshold = ( req._protocol._timeout_ceil_threshold if req is not None else 5 ) - assert self._loop is not None loop = self._loop + assert loop is not None now = loop.time() when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold) self._heartbeat_when = when @@ -155,8 +155,8 @@ def _reset_heartbeat(self) -> None: def _send_heartbeat(self) -> None: self._heartbeat_cb = None - assert self._loop is not None and self._writer is not None loop = self._loop + assert loop is not None and self._writer is not None now = loop.time() if now < self._heartbeat_when: # Heartbeat fired too early, reschedule From e622cde79265bb51e5c0272a059f490fddcb1b43 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 09:45:49 -0500 Subject: [PATCH 21/32] remove unreachable --- aiohttp/client_ws.py | 22 +++++++++++++--------- aiohttp/web_ws.py | 2 ++ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 1e8286b22e8..a150d8a77a7 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -5,7 +5,7 @@ import sys from typing import Any, Final, Optional, cast -from .client_exceptions import ClientError, ServerTimeoutError +from .client_exceptions import ServerTimeoutError from .client_reqrep import ClientResponse from .helpers import calculate_timeout_when, set_result from .http import ( @@ -160,6 +160,14 @@ def _set_closed(self) -> None: self._closed = True self._cancel_heartbeat() + def _set_closing(self) -> None: + """Set the connection to closing. + + Cancel any heartbeat timers and set the closing flag. + """ + self._closing = True + self._cancel_heartbeat() + @property def closed(self) -> bool: return self._closed @@ -224,7 +232,7 @@ async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bo if self._waiting and not self._closing: assert self._loop is not None self._close_wait = self._loop.create_future() - self._closing = True + self._set_closing() self._reader.feed_data(WS_CLOSING_MESSAGE) await self._close_wait @@ -297,29 +305,25 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage: self._close_code = WSCloseCode.OK await self.close() return WSMessage(WSMsgType.CLOSED, None, None) - except ClientError: - self._set_closed() - self._close_code = WSCloseCode.ABNORMAL_CLOSURE - return WS_CLOSED_MESSAGE except WebSocketError as exc: self._close_code = exc.code await self.close(code=exc.code) return WSMessage(WSMsgType.ERROR, exc, None) except Exception as exc: self._exception = exc - self._closing = True + self._set_closing() self._close_code = WSCloseCode.ABNORMAL_CLOSURE await self.close() return WSMessage(WSMsgType.ERROR, exc, None) if msg.type is WSMsgType.CLOSE: - self._closing = True + self._set_closing() self._close_code = msg.data # Could be closed elsewhere while awaiting reader if not self._closed and self._autoclose: # type: ignore[redundant-expr] await self.close() elif msg.type is WSMsgType.CLOSING: - self._closing = True + self._set_closing() elif msg.type is WSMsgType.PING and self._autoping: await self.pong(msg.data) continue diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 44b062275a4..7cc206b4ac1 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -475,6 +475,7 @@ def _set_closing(self, code: WSCloseCode) -> None: """Set the close code and mark the connection as closing.""" self._closing = True self._close_code = code + self._cancel_heartbeat() def _set_code_close_transport(self, code: WSCloseCode) -> None: """Set the close code and close the transport.""" @@ -587,5 +588,6 @@ def _cancel(self, exc: BaseException) -> None: # web_protocol calls this from connection_lost # or when the server is shutting down. self._closing = True + self._cancel_heartbeat() if self._reader is not None: set_exception(self._reader, exc) From c29f5c600253d554715fe69e6f909f36f13c2ae3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 09:48:50 -0500 Subject: [PATCH 22/32] restore --- aiohttp/client_ws.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index a150d8a77a7..5008c0bc336 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -5,7 +5,7 @@ import sys from typing import Any, Final, Optional, cast -from .client_exceptions import ServerTimeoutError +from .client_exceptions import ClientError, ServerTimeoutError from .client_reqrep import ClientResponse from .helpers import calculate_timeout_when, set_result from .http import ( @@ -305,6 +305,11 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage: self._close_code = WSCloseCode.OK await self.close() return WSMessage(WSMsgType.CLOSED, None, None) + except ClientError: + # Likely ServerDisconnectedError when connection is lost + self._set_closed() + self._close_code = WSCloseCode.ABNORMAL_CLOSURE + return WS_CLOSED_MESSAGE except WebSocketError as exc: self._close_code = exc.code await self.close(code=exc.code) From d9f69788e7f1cb985765eda0531fc8217bc391bb Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 09:52:20 -0500 Subject: [PATCH 23/32] add coverage for connection lost --- tests/test_client_ws.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/test_client_ws.py b/tests/test_client_ws.py index 81ad41c2347..e18148548c2 100644 --- a/tests/test_client_ws.py +++ b/tests/test_client_ws.py @@ -10,6 +10,7 @@ import aiohttp from aiohttp import client, hdrs +from aiohttp.client_exceptions import ServerDisconnectedError from aiohttp.client_ws import ClientWSTimeout from aiohttp.http import WS_KEY from aiohttp.streams import EofStream @@ -408,6 +409,37 @@ async def test_close_eofstream(loop: Any, ws_key: Any, key_data: Any) -> None: await session.close() +async def test_close_connection_lost(loop: Any, ws_key: Any, key_data: Any) -> None: + resp = mock.Mock() + resp.status = 101 + resp.headers = { + hdrs.UPGRADE: "websocket", + hdrs.CONNECTION: "upgrade", + hdrs.SEC_WEBSOCKET_ACCEPT: ws_key, + } + resp.connection.protocol.read_timeout = None + with mock.patch("aiohttp.client.WebSocketWriter") as WebSocketWriter: + with mock.patch("aiohttp.client.os") as m_os: + with mock.patch("aiohttp.client.ClientSession.request") as m_req: + m_os.urandom.return_value = key_data + m_req.return_value = loop.create_future() + m_req.return_value.set_result(resp) + WebSocketWriter.return_value = mock.Mock() + + session = aiohttp.ClientSession() + resp = await session.ws_connect("http://test.org") + assert not resp.closed + + exc = ServerDisconnectedError() + resp._reader.set_exception(exc) + + msg = await resp.receive() + assert msg.type is aiohttp.WSMsgType.CLOSED + assert resp.closed + + await session.close() + + async def test_close_exc(loop: Any, ws_key: Any, key_data: Any) -> None: resp = mock.Mock() resp.status = 101 From 77eee1a6abd25e2d6cf18e4d6f551c2c87710089 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 09:53:34 -0500 Subject: [PATCH 24/32] combine with --- tests/test_client_ws.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/test_client_ws.py b/tests/test_client_ws.py index e18148548c2..7699e878601 100644 --- a/tests/test_client_ws.py +++ b/tests/test_client_ws.py @@ -418,26 +418,26 @@ async def test_close_connection_lost(loop: Any, ws_key: Any, key_data: Any) -> N hdrs.SEC_WEBSOCKET_ACCEPT: ws_key, } resp.connection.protocol.read_timeout = None - with mock.patch("aiohttp.client.WebSocketWriter") as WebSocketWriter: - with mock.patch("aiohttp.client.os") as m_os: - with mock.patch("aiohttp.client.ClientSession.request") as m_req: - m_os.urandom.return_value = key_data - m_req.return_value = loop.create_future() - m_req.return_value.set_result(resp) - WebSocketWriter.return_value = mock.Mock() + with mock.patch("aiohttp.client.WebSocketWriter") as WebSocketWriter, mock.patch( + "aiohttp.client.os" + ) as m_os, mock.patch("aiohttp.client.ClientSession.request") as m_req: + m_os.urandom.return_value = key_data + m_req.return_value = loop.create_future() + m_req.return_value.set_result(resp) + WebSocketWriter.return_value = mock.Mock() - session = aiohttp.ClientSession() - resp = await session.ws_connect("http://test.org") - assert not resp.closed + session = aiohttp.ClientSession() + resp = await session.ws_connect("http://test.org") + assert not resp.closed - exc = ServerDisconnectedError() - resp._reader.set_exception(exc) + exc = ServerDisconnectedError() + resp._reader.set_exception(exc) - msg = await resp.receive() - assert msg.type is aiohttp.WSMsgType.CLOSED - assert resp.closed + msg = await resp.receive() + assert msg.type is aiohttp.WSMsgType.CLOSED + assert resp.closed - await session.close() + await session.close() async def test_close_exc(loop: Any, ws_key: Any, key_data: Any) -> None: From be4441ab54d087949669becd1905e3696ff4990e Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 09:54:43 -0500 Subject: [PATCH 25/32] cleanup tests --- tests/test_client_ws.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_client_ws.py b/tests/test_client_ws.py index 7699e878601..565a262cb9d 100644 --- a/tests/test_client_ws.py +++ b/tests/test_client_ws.py @@ -410,7 +410,7 @@ async def test_close_eofstream(loop: Any, ws_key: Any, key_data: Any) -> None: async def test_close_connection_lost(loop: Any, ws_key: Any, key_data: Any) -> None: - resp = mock.Mock() + resp = mock.Mock(spec_set=client.ClientResponse) resp.status = 101 resp.headers = { hdrs.UPGRADE: "websocket", @@ -418,13 +418,12 @@ async def test_close_connection_lost(loop: Any, ws_key: Any, key_data: Any) -> N hdrs.SEC_WEBSOCKET_ACCEPT: ws_key, } resp.connection.protocol.read_timeout = None - with mock.patch("aiohttp.client.WebSocketWriter") as WebSocketWriter, mock.patch( + with mock.patch("aiohttp.client.WebSocketWriter"), mock.patch( "aiohttp.client.os" ) as m_os, mock.patch("aiohttp.client.ClientSession.request") as m_req: m_os.urandom.return_value = key_data m_req.return_value = loop.create_future() m_req.return_value.set_result(resp) - WebSocketWriter.return_value = mock.Mock() session = aiohttp.ClientSession() resp = await session.ws_connect("http://test.org") From 286a29b81ccc40354c3147ae9db3ab63097f484e Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 09:55:51 -0500 Subject: [PATCH 26/32] cleanup tests --- tests/test_client_ws.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_client_ws.py b/tests/test_client_ws.py index 565a262cb9d..1160a5524f2 100644 --- a/tests/test_client_ws.py +++ b/tests/test_client_ws.py @@ -409,7 +409,10 @@ async def test_close_eofstream(loop: Any, ws_key: Any, key_data: Any) -> None: await session.close() -async def test_close_connection_lost(loop: Any, ws_key: Any, key_data: Any) -> None: +async def test_close_connection_lost( + loop: asyncio.AbstractEventLoop, ws_key: Any, key_data: Any +) -> None: + """Test the websocket client handles the connection being closed out from under it.""" resp = mock.Mock(spec_set=client.ClientResponse) resp.status = 101 resp.headers = { From 6e7fa77e783a41d89d98507d315607eba9fb7290 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 6 Aug 2024 11:37:30 -0500 Subject: [PATCH 27/32] Update aiohttp/helpers.py Co-authored-by: Sam Bull --- aiohttp/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 4ffb203d13b..496f3ad1d05 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -613,7 +613,7 @@ def calculate_timeout_when( """Calculate when to execute a timeout.""" when = loop_time + timeout if timeout > timeout_ceiling_threshold: - when = ceil(when) + return ceil(when) return when From f533e86112e860f3bc474c9b17d904aca56d3df8 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 11:02:27 -0500 Subject: [PATCH 28/32] lint --- tests/test_client_ws_functional.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 28bdf7154e1..c466476ca5d 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -693,7 +693,7 @@ async def test_heartbeat_no_pong_after_many_messages( """Test that the connection is closed if no pong is received after many messages.""" ping_received = False - async def handler(request): + async def handler(request: web.Request) -> NoReturn: nonlocal ping_received ws = web.WebSocketResponse(autoping=False) await ws.prepare(request) From 5eec564f1fd2efca5aa25661a42bf1d4952999d6 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 11:09:13 -0500 Subject: [PATCH 29/32] fix type --- tests/test_client_ws_functional.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index c466476ca5d..667bcf3a4cf 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -702,6 +702,7 @@ async def handler(request: web.Request) -> NoReturn: msg = await ws.receive() ping_received = msg.type is aiohttp.WSMsgType.PING await ws.receive() + assert False app = web.Application() app.router.add_route("GET", "/", handler) From 5e7230bb713f47cf3ea86bd1a37bc59faefd0d50 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 11:09:41 -0500 Subject: [PATCH 30/32] fix type --- tests/test_client_ws.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_client_ws.py b/tests/test_client_ws.py index da143744afc..7521bd24ac5 100644 --- a/tests/test_client_ws.py +++ b/tests/test_client_ws.py @@ -436,20 +436,20 @@ async def test_close_connection_lost( loop: asyncio.AbstractEventLoop, ws_key: bytes, key_data: bytes ) -> None: """Test the websocket client handles the connection being closed out from under it.""" - resp = mock.Mock(spec_set=client.ClientResponse) - resp.status = 101 - resp.headers = { + mresp = mock.Mock(spec_set=client.ClientResponse) + mresp.status = 101 + mresp.headers = { hdrs.UPGRADE: "websocket", hdrs.CONNECTION: "upgrade", hdrs.SEC_WEBSOCKET_ACCEPT: ws_key, } - resp.connection.protocol.read_timeout = None + mresp.connection.protocol.read_timeout = None with mock.patch("aiohttp.client.WebSocketWriter"), mock.patch( "aiohttp.client.os" ) as m_os, mock.patch("aiohttp.client.ClientSession.request") as m_req: m_os.urandom.return_value = key_data m_req.return_value = loop.create_future() - m_req.return_value.set_result(resp) + m_req.return_value.set_result(mresp) session = aiohttp.ClientSession() resp = await session.ws_connect("http://test.org") From 59df3dbcab3ddd4afe617c1d3f5491c0f6f04f7d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 13:04:01 -0500 Subject: [PATCH 31/32] test the sending many in handler case as well --- tests/test_client_ws_functional.py | 38 ++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 667bcf3a4cf..37537fbcfd0 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -687,10 +687,10 @@ async def handler(request: web.Request) -> NoReturn: assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE -async def test_heartbeat_no_pong_after_many_messages( +async def test_heartbeat_no_pong_after_receive_many_messages( aiohttp_client: AiohttpClient, ) -> None: - """Test that the connection is closed if no pong is received after many messages.""" + """Test that the connection is closed if no pong is received after receiving many messages.""" ping_received = False async def handler(request: web.Request) -> NoReturn: @@ -720,6 +720,40 @@ async def handler(request: web.Request) -> NoReturn: assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE +async def test_heartbeat_no_pong_after_send_many_messages( + aiohttp_client: AiohttpClient, +) -> None: + """Test that the connection is closed if no pong is received after sending many messages.""" + ping_received = False + + async def handler(request: web.Request) -> NoReturn: + nonlocal ping_received + ws = web.WebSocketResponse(autoping=False) + await ws.prepare(request) + for _ in range(10): + msg = await ws.receive() + assert msg.data == "test" + assert msg.type is aiohttp.WSMsgType.TEXT + msg = await ws.receive() + ping_received = msg.type is aiohttp.WSMsgType.PING + await ws.receive() + assert False + + app = web.Application() + app.router.add_route("GET", "/", handler) + + client = await aiohttp_client(app) + resp = await client.ws_connect("/", heartbeat=0.1) + + for _ in range(10): + await resp.send_str("test") + + # Connection should be closed roughly after 1.5x heartbeat. + await asyncio.sleep(0.2) + assert ping_received + assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE + + async def test_heartbeat_no_pong_concurrent_receive( aiohttp_client: AiohttpClient, ) -> None: From f9c602766feb6be1093655560b0a72f095c2d806 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 7 Aug 2024 16:42:40 -0500 Subject: [PATCH 32/32] adjust test to make sure we reschedule once --- tests/test_client_ws_functional.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 37537fbcfd0..81bc6e6c7c8 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -697,7 +697,10 @@ async def handler(request: web.Request) -> NoReturn: nonlocal ping_received ws = web.WebSocketResponse(autoping=False) await ws.prepare(request) - for _ in range(10): + for _ in range(5): + await ws.send_str("test") + await asyncio.sleep(0.05) + for _ in range(5): await ws.send_str("test") msg = await ws.receive() ping_received = msg.type is aiohttp.WSMsgType.PING @@ -745,9 +748,11 @@ async def handler(request: web.Request) -> NoReturn: client = await aiohttp_client(app) resp = await client.ws_connect("/", heartbeat=0.1) - for _ in range(10): + for _ in range(5): + await resp.send_str("test") + await asyncio.sleep(0.05) + for _ in range(5): await resp.send_str("test") - # Connection should be closed roughly after 1.5x heartbeat. await asyncio.sleep(0.2) assert ping_received