Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timer handle churn in websocket heartbeat #8608

Merged
merged 35 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c48ad39
Fix timer handle churn in websocket heartbeat
bdraco Aug 5, 2024
ab09f18
preen
bdraco Aug 5, 2024
f59f732
preen
bdraco Aug 5, 2024
4ea43f9
fix merge
bdraco Aug 5, 2024
b1ed054
fix merge
bdraco Aug 5, 2024
807e334
fix merge
bdraco Aug 5, 2024
c3ca179
fix merge
bdraco Aug 5, 2024
8d0756b
fix merge
bdraco Aug 5, 2024
3321b93
fix merge
bdraco Aug 5, 2024
9a92850
fix merge
bdraco Aug 5, 2024
abc1d80
fix merge
bdraco Aug 5, 2024
5cb41f3
fix merge
bdraco Aug 5, 2024
179b0d5
add coverage for pong after many messages
bdraco Aug 6, 2024
fb71f93
changelog
bdraco Aug 6, 2024
0c84a8b
Update CHANGES/8608.misc.rst
bdraco Aug 6, 2024
02bd5c6
Update CHANGES/8608.misc.rst
bdraco Aug 6, 2024
53f1b46
Update tests/test_client_ws_functional.py
bdraco Aug 6, 2024
1767cec
more coverage
bdraco Aug 6, 2024
84ae4b3
ensure heartbeat timer is always canceled on close
bdraco Aug 6, 2024
e4747e5
Merge branch 'master' into timer_handle_churn
bdraco Aug 6, 2024
40945e7
cleanup assertions
bdraco Aug 6, 2024
e622cde
remove unreachable
bdraco Aug 6, 2024
c29f5c6
restore
bdraco Aug 6, 2024
d9f6978
add coverage for connection lost
bdraco Aug 6, 2024
77eee1a
combine with
bdraco Aug 6, 2024
be4441a
cleanup tests
bdraco Aug 6, 2024
286a29b
cleanup tests
bdraco Aug 6, 2024
6e7fa77
Update aiohttp/helpers.py
bdraco Aug 6, 2024
7a5f09e
Merge branch 'master' into timer_handle_churn
bdraco Aug 7, 2024
f533e86
lint
bdraco Aug 7, 2024
5eec564
fix type
bdraco Aug 7, 2024
5e7230b
fix type
bdraco Aug 7, 2024
59df3db
test the sending many in handler case as well
bdraco Aug 7, 2024
7807ce9
Merge branch 'master' into timer_handle_churn
bdraco Aug 7, 2024
f9c6027
adjust test to make sure we reschedule once
bdraco Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/8608.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Improved websocket performance when messages are sent or received frequently -- by :user:`bdraco`.

The websocket keep-alive scheduling algorithm has been improved to reduce the ``asyncio`` scheduling overhead by decreasing the number of ``asyncio.TimerHandle`` objects being created and cancelled.
bdraco marked this conversation as resolved.
Show resolved Hide resolved
85 changes: 50 additions & 35 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .client_exceptions import ClientError, ServerTimeoutError
from .client_reqrep import ClientResponse
from .helpers import call_later, set_result
from .helpers import calculate_timeout_when, set_result
from .http import (
WS_CLOSED_MESSAGE,
WS_CLOSING_MESSAGE,
Expand Down Expand Up @@ -72,6 +72,7 @@ def __init__(
self._autoping = autoping
self._heartbeat = heartbeat
self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
self._heartbeat_when: float = 0.0
if heartbeat is not None:
self._pong_heartbeat = heartbeat / 2.0
self._pong_response_cb: Optional[asyncio.TimerHandle] = None
Expand All @@ -85,48 +86,62 @@ def __init__(
self._reset_heartbeat()

def _cancel_heartbeat(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

self._cancel_pong_response_cb()
if self._heartbeat_cb is not None:
self._heartbeat_cb.cancel()
self._heartbeat_cb = None

def _cancel_pong_response_cb(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

def _reset_heartbeat(self) -> None:
self._cancel_heartbeat()

if self._heartbeat is not None:
self._heartbeat_cb = call_later(
self._send_heartbeat,
self._heartbeat,
self._loop,
timeout_ceil_threshold=(
self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5
),
)
if self._heartbeat is None:
return
self._cancel_pong_response_cb()
assert self._loop is not None
loop = self._loop
conn = self._conn
timeout_ceil_threshold = (
conn._connector._timeout_ceil_threshold if conn is not None else 5
)
now = loop.time()
when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
self._heartbeat_when = when
if self._heartbeat_cb is None:
# We do not cancel the previous heartbeat_cb here because
# it generates a significant amount of TimerHandle churn
# which causes asyncio to rebuild the heap frequently.
# Instead _send_heartbeat() will reschedule the next
# heartbeat if it fires too early.
self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)

def _send_heartbeat(self) -> None:
if self._heartbeat is not None and not self._closed:
# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = call_later(
self._pong_not_received,
self._pong_heartbeat,
self._loop,
timeout_ceil_threshold=(
self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5
),
self._heartbeat_cb = None
if self._heartbeat is None or self._closed:
return
bdraco marked this conversation as resolved.
Show resolved Hide resolved
loop = self._loop
now = loop.time()
if now < self._heartbeat_when:
bdraco marked this conversation as resolved.
Show resolved Hide resolved
# Heartbeat fired too early, reschedule
self._heartbeat_cb = loop.call_at(
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._heartbeat_when, self._send_heartbeat
)
return

# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

conn = self._conn
timeout_ceil_threshold = (
conn._connector._timeout_ceil_threshold if conn is not None else 5
)
when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
self._cancel_pong_response_cb()
self._pong_response_cb = loop.call_at(when, self._pong_not_received)

def _pong_not_received(self) -> None:
if not self._closed:
Expand Down
23 changes: 17 additions & 6 deletions aiohttp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,23 @@ def call_later(
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout > timeout_ceil_threshold:
when = ceil(when)
return loop.call_at(when, cb)
return None
if timeout is None or timeout <= 0:
return None
now = loop.time()
when = calculate_timeout_when(now, timeout, timeout_ceil_threshold)
return loop.call_at(when, cb)


def calculate_timeout_when(
loop_time: float,
timeout: float,
timeout_ceiling_threshold: float,
) -> float:
"""Calculate when to execute a timeout."""
when = loop_time + timeout
if timeout > timeout_ceiling_threshold:
when = ceil(when)
bdraco marked this conversation as resolved.
Show resolved Hide resolved
return when


class TimeoutHandle:
Expand Down
89 changes: 52 additions & 37 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import call_later, set_exception, set_result
from .helpers import calculate_timeout_when, set_exception, set_result
from .http import (
WS_CLOSED_MESSAGE,
WS_CLOSING_MESSAGE,
Expand Down Expand Up @@ -74,6 +74,7 @@ class WebSocketResponse(StreamResponse):
"_autoclose",
"_autoping",
"_heartbeat",
"_heartbeat_when",
"_heartbeat_cb",
"_pong_heartbeat",
"_pong_response_cb",
Expand Down Expand Up @@ -112,6 +113,7 @@ def __init__(
self._autoclose = autoclose
self._autoping = autoping
self._heartbeat = heartbeat
self._heartbeat_when = 0.0
self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
if heartbeat is not None:
self._pong_heartbeat = heartbeat / 2.0
Expand All @@ -120,50 +122,63 @@ def __init__(
self._max_msg_size = max_msg_size

def _cancel_heartbeat(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

self._cancel_pong_response_cb()
if self._heartbeat_cb is not None:
self._heartbeat_cb.cancel()
self._heartbeat_cb = None

def _reset_heartbeat(self) -> None:
self._cancel_heartbeat()
def _cancel_pong_response_cb(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

if self._heartbeat is not None:
assert self._loop is not None
self._heartbeat_cb = call_later(
self._send_heartbeat,
self._heartbeat,
self._loop,
timeout_ceil_threshold=(
self._req._protocol._timeout_ceil_threshold
if self._req is not None
else 5
),
)
def _reset_heartbeat(self) -> None:
if self._heartbeat is None:
return
self._cancel_pong_response_cb()
req = self._req
timeout_ceil_threshold = (
req._protocol._timeout_ceil_threshold if req is not None else 5
)
assert self._loop is not None
loop = self._loop
now = loop.time()
when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
self._heartbeat_when = when
if self._heartbeat_cb is None:
# We do not cancel the previous heartbeat_cb here because
# it generates a significant amount of TimerHandle churn
# which causes asyncio to rebuild the heap frequently.
# Instead _send_heartbeat() will reschedule the next
# heartbeat if it fires too early.
self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)

def _send_heartbeat(self) -> None:
if self._heartbeat is not None and not self._closed:
assert self._loop is not None and self._writer is not None
# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = call_later(
self._pong_not_received,
self._pong_heartbeat,
self._loop,
timeout_ceil_threshold=(
self._req._protocol._timeout_ceil_threshold
if self._req is not None
else 5
),
self._heartbeat_cb = None
if self._heartbeat is None or self._closed:
bdraco marked this conversation as resolved.
Show resolved Hide resolved
return
assert self._loop is not None and self._writer is not None
loop = self._loop
now = loop.time()
if now < self._heartbeat_when:
# Heartbeat fired too early, reschedule
self._heartbeat_cb = loop.call_at(
self._heartbeat_when, self._send_heartbeat
)
return

# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

req = self._req
timeout_ceil_threshold = (
req._protocol._timeout_ceil_threshold if req is not None else 5
)
when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
self._cancel_pong_response_cb()
self._pong_response_cb = loop.call_at(when, self._pong_not_received)

def _pong_not_received(self) -> None:
if self._req is not None and self._req.transport is not None:
Expand Down
32 changes: 32 additions & 0 deletions tests/test_client_ws_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ async def handler(request):


async def test_heartbeat_no_pong(aiohttp_client: Any) -> None:
"""Test that the connection is closed if no pong is received without sending messages."""
ping_received = False

async def handler(request):
Expand All @@ -685,6 +686,37 @@ async def handler(request):
assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE


async def test_heartbeat_no_pong_after_many_messages(aiohttp_client: Any) -> None:
"""Test that the connection is closed if no pong is received after many messages."""
ping_received = False

async def handler(request):
nonlocal ping_received
ws = web.WebSocketResponse(autoping=False)
await ws.prepare(request)
for _ in range(10):
await ws.send_str("test")
msg = await ws.receive()
ping_received = msg.type is aiohttp.WSMsgType.PING
await ws.receive()
return ws
bdraco marked this conversation as resolved.
Show resolved Hide resolved

app = web.Application()
app.router.add_route("GET", "/", handler)

client = await aiohttp_client(app)
resp = await client.ws_connect("/", heartbeat=0.1)

for _ in range(10):
test_msg = await resp.receive()
assert test_msg.data == "test"
# Connection should be closed roughly after 1.5x heartbeat.

await asyncio.sleep(0.2)
assert ping_received
assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE


async def test_heartbeat_no_pong_concurrent_receive(aiohttp_client: Any) -> None:
ping_received = False

Expand Down
27 changes: 27 additions & 0 deletions tests/test_web_websocket_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,33 @@ async def handler(request):
await ws.close()


async def test_heartbeat_no_pong_many_messages(loop: Any, aiohttp_client: Any) -> None:
"""Test no pong after sending many messages."""

async def handler(request):
ws = web.WebSocketResponse(heartbeat=0.05)
await ws.prepare(request)
for _ in range(10):
await ws.send_str("test")

await ws.receive()
return ws

app = web.Application()
app.router.add_get("/", handler)

client = await aiohttp_client(app)
ws = await client.ws_connect("/", autoping=False)
for _ in range(10):
msg = await ws.receive()
assert msg.type is aiohttp.WSMsgType.TEXT
assert msg.data == "test"

msg = await ws.receive()
assert msg.type is aiohttp.WSMsgType.PING
await ws.close()


async def test_server_ws_async_for(loop: Any, aiohttp_server: Any) -> None:
closed = loop.create_future()

Expand Down
Loading