Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timer handle churn in websocket heartbeat #8608

Merged
merged 35 commits into master from timer_handle_churn
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c48ad39
Fix timer handle churn in websocket heartbeat
bdraco Aug 5, 2024
ab09f18
preen
bdraco Aug 5, 2024
f59f732
preen
bdraco Aug 5, 2024
4ea43f9
fix merge
bdraco Aug 5, 2024
b1ed054
fix merge
bdraco Aug 5, 2024
807e334
fix merge
bdraco Aug 5, 2024
c3ca179
fix merge
bdraco Aug 5, 2024
8d0756b
fix merge
bdraco Aug 5, 2024
3321b93
fix merge
bdraco Aug 5, 2024
9a92850
fix merge
bdraco Aug 5, 2024
abc1d80
fix merge
bdraco Aug 5, 2024
5cb41f3
fix merge
bdraco Aug 5, 2024
179b0d5
add coverage for pong after many messages
bdraco Aug 6, 2024
fb71f93
changelog
bdraco Aug 6, 2024
0c84a8b
Update CHANGES/8608.misc.rst
bdraco Aug 6, 2024
02bd5c6
Update CHANGES/8608.misc.rst
bdraco Aug 6, 2024
53f1b46
Update tests/test_client_ws_functional.py
bdraco Aug 6, 2024
1767cec
more coverage
bdraco Aug 6, 2024
84ae4b3
ensure heartbeat timer is always canceled on close
bdraco Aug 6, 2024
e4747e5
Merge branch 'master' into timer_handle_churn
bdraco Aug 6, 2024
40945e7
cleanup assertions
bdraco Aug 6, 2024
e622cde
remove unreachable
bdraco Aug 6, 2024
c29f5c6
restore
bdraco Aug 6, 2024
d9f6978
add coverage for connection lost
bdraco Aug 6, 2024
77eee1a
combine with
bdraco Aug 6, 2024
be4441a
cleanup tests
bdraco Aug 6, 2024
286a29b
cleanup tests
bdraco Aug 6, 2024
6e7fa77
Update aiohttp/helpers.py
bdraco Aug 6, 2024
7a5f09e
Merge branch 'master' into timer_handle_churn
bdraco Aug 7, 2024
f533e86
lint
bdraco Aug 7, 2024
5eec564
fix type
bdraco Aug 7, 2024
5e7230b
fix type
bdraco Aug 7, 2024
59df3db
test the sending many in handler case as well
bdraco Aug 7, 2024
7807ce9
Merge branch 'master' into timer_handle_churn
bdraco Aug 7, 2024
f9c6027
adjust test to make sure we reschedule once
bdraco Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 50 additions & 35 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .client_exceptions import ClientError, ServerTimeoutError
from .client_reqrep import ClientResponse
from .helpers import call_later, set_result
from .helpers import calculate_timeout_when, set_result
from .http import (
WS_CLOSED_MESSAGE,
WS_CLOSING_MESSAGE,
Expand Down Expand Up @@ -72,6 +72,7 @@
self._autoping = autoping
self._heartbeat = heartbeat
self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
self._heartbeat_when: float = 0.0
if heartbeat is not None:
self._pong_heartbeat = heartbeat / 2.0
self._pong_response_cb: Optional[asyncio.TimerHandle] = None
Expand All @@ -85,48 +86,62 @@
self._reset_heartbeat()

def _cancel_heartbeat(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

self._cancel_pong_response_cb()
if self._heartbeat_cb is not None:
self._heartbeat_cb.cancel()
self._heartbeat_cb = None

def _cancel_pong_response_cb(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

def _reset_heartbeat(self) -> None:
self._cancel_heartbeat()

if self._heartbeat is not None:
self._heartbeat_cb = call_later(
self._send_heartbeat,
self._heartbeat,
self._loop,
timeout_ceil_threshold=(
self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5
),
)
if self._heartbeat is None:
return
self._cancel_pong_response_cb()
assert self._loop is not None
loop = self._loop
conn = self._conn
timeout_ceil_threshold = (
conn._connector._timeout_ceil_threshold if conn is not None else 5
)
now = loop.time()
when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
self._heartbeat_when = when
if self._heartbeat_cb is None:
# We do not cancel the previous heartbeat_cb here because
# it generates a significant amount of TimerHandle churn
# which causes asyncio to rebuild the heap frequently.
# Instead _send_heartbeat() will reschedule the next
# heartbeat if it fires too early.
self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)

def _send_heartbeat(self) -> None:
if self._heartbeat is not None and not self._closed:
# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = call_later(
self._pong_not_received,
self._pong_heartbeat,
self._loop,
timeout_ceil_threshold=(
self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5
),
self._heartbeat_cb = None
if self._heartbeat is None or self._closed:
return

Check warning on line 123 in aiohttp/client_ws.py

View check run for this annotation

Codecov / codecov/patch

aiohttp/client_ws.py#L123

Added line #L123 was not covered by tests
bdraco marked this conversation as resolved.
Show resolved Hide resolved
loop = self._loop
now = loop.time()
if now < self._heartbeat_when:
bdraco marked this conversation as resolved.
Show resolved Hide resolved
# Heartbeat fired too early, reschedule
self._heartbeat_cb = loop.call_at(

Check warning on line 128 in aiohttp/client_ws.py

View check run for this annotation

Codecov / codecov/patch

aiohttp/client_ws.py#L128

Added line #L128 was not covered by tests
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._heartbeat_when, self._send_heartbeat
)
return

Check warning on line 131 in aiohttp/client_ws.py

View check run for this annotation

Codecov / codecov/patch

aiohttp/client_ws.py#L131

Added line #L131 was not covered by tests

# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

conn = self._conn
timeout_ceil_threshold = (
conn._connector._timeout_ceil_threshold if conn is not None else 5
)
when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
self._cancel_pong_response_cb()
self._pong_response_cb = loop.call_at(when, self._pong_not_received)

def _pong_not_received(self) -> None:
if not self._closed:
Expand Down
23 changes: 17 additions & 6 deletions aiohttp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,23 @@ def call_later(
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout > timeout_ceil_threshold:
when = ceil(when)
return loop.call_at(when, cb)
return None
if timeout is None or timeout <= 0:
return None
now = loop.time()
when = calculate_timeout_when(now, timeout, timeout_ceil_threshold)
return loop.call_at(when, cb)


def calculate_timeout_when(
    loop_time: float,
    timeout: float,
    timeout_ceiling_threshold: float,
) -> float:
    """Calculate when to execute a timeout.

    Args:
        loop_time: The current event loop time that the timeout is
            measured from.
        timeout: The delay to add to ``loop_time``.
        timeout_ceiling_threshold: Timeouts strictly greater than this
            value have their deadline rounded up to a whole number.

    Returns:
        ``loop_time + timeout``, rounded up with ``ceil`` when ``timeout``
        exceeds ``timeout_ceiling_threshold``; otherwise the exact sum.
    """
    when = loop_time + timeout
    if timeout > timeout_ceiling_threshold:
        # Only timeouts above the threshold are rounded; a timeout equal
        # to the threshold keeps its exact deadline.
        when = ceil(when)
    return when


class TimeoutHandle:
Expand Down
89 changes: 52 additions & 37 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import call_later, set_exception, set_result
from .helpers import calculate_timeout_when, set_exception, set_result
from .http import (
WS_CLOSED_MESSAGE,
WS_CLOSING_MESSAGE,
Expand Down Expand Up @@ -74,6 +74,7 @@
"_autoclose",
"_autoping",
"_heartbeat",
"_heartbeat_when",
"_heartbeat_cb",
"_pong_heartbeat",
"_pong_response_cb",
Expand Down Expand Up @@ -112,6 +113,7 @@
self._autoclose = autoclose
self._autoping = autoping
self._heartbeat = heartbeat
self._heartbeat_when = 0.0
self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
if heartbeat is not None:
self._pong_heartbeat = heartbeat / 2.0
Expand All @@ -120,50 +122,63 @@
self._max_msg_size = max_msg_size

def _cancel_heartbeat(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

self._cancel_pong_response_cb()
if self._heartbeat_cb is not None:
self._heartbeat_cb.cancel()
self._heartbeat_cb = None

def _reset_heartbeat(self) -> None:
self._cancel_heartbeat()
def _cancel_pong_response_cb(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = None

if self._heartbeat is not None:
assert self._loop is not None
self._heartbeat_cb = call_later(
self._send_heartbeat,
self._heartbeat,
self._loop,
timeout_ceil_threshold=(
self._req._protocol._timeout_ceil_threshold
if self._req is not None
else 5
),
)
def _reset_heartbeat(self) -> None:
if self._heartbeat is None:
return
self._cancel_pong_response_cb()
req = self._req
timeout_ceil_threshold = (
req._protocol._timeout_ceil_threshold if req is not None else 5
)
assert self._loop is not None
loop = self._loop
now = loop.time()
when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
self._heartbeat_when = when
if self._heartbeat_cb is None:
# We do not cancel the previous heartbeat_cb here because
# it generates a significant amount of TimerHandle churn
# which causes asyncio to rebuild the heap frequently.
# Instead _send_heartbeat() will reschedule the next
# heartbeat if it fires too early.
self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)

def _send_heartbeat(self) -> None:
if self._heartbeat is not None and not self._closed:
assert self._loop is not None and self._writer is not None
# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = call_later(
self._pong_not_received,
self._pong_heartbeat,
self._loop,
timeout_ceil_threshold=(
self._req._protocol._timeout_ceil_threshold
if self._req is not None
else 5
),
self._heartbeat_cb = None
if self._heartbeat is None or self._closed:
bdraco marked this conversation as resolved.
Show resolved Hide resolved
return

Check warning on line 159 in aiohttp/web_ws.py

View check run for this annotation

Codecov / codecov/patch

aiohttp/web_ws.py#L159

Added line #L159 was not covered by tests
assert self._loop is not None and self._writer is not None
loop = self._loop
now = loop.time()
if now < self._heartbeat_when:
# Heartbeat fired too early, reschedule
self._heartbeat_cb = loop.call_at(

Check warning on line 165 in aiohttp/web_ws.py

View check run for this annotation

Codecov / codecov/patch

aiohttp/web_ws.py#L165

Added line #L165 was not covered by tests
self._heartbeat_when, self._send_heartbeat
)
return

Check warning on line 168 in aiohttp/web_ws.py

View check run for this annotation

Codecov / codecov/patch

aiohttp/web_ws.py#L168

Added line #L168 was not covered by tests

# fire-and-forget a task is not perfect but maybe ok for
# sending ping. Otherwise we need a long-living heartbeat
# task in the class.
loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]

req = self._req
timeout_ceil_threshold = (
req._protocol._timeout_ceil_threshold if req is not None else 5
)
when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
self._cancel_pong_response_cb()
self._pong_response_cb = loop.call_at(when, self._pong_not_received)

def _pong_not_received(self) -> None:
if self._req is not None and self._req.transport is not None:
Expand Down
Loading