diff --git a/CHANGELOG.md b/CHANGELOG.md index de62ceea..193295fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ The project versioning policy is now explicitly governed by SEMVER. See https:// - Support async cancellations, ensuring that the connection pool is left in a clean state when cancellations occur. (#726) - The networking backend interface has [been added to the public API](https://www.encode.io/httpcore/network-backends). Some classes which were previously private implementation detail are now part of the top-level public API. (#699) +- Add some connection pool events for trace extension. (#702) - Graceful handling of HTTP/2 GoAway frames, with requests being transparently retried on a new connection. (#730) - Add exceptions when a synchronous `trace callback` is passed to an asynchronous request or an asynchronous `trace callback` is passed to a synchronous request. (#717) - Drop Python 3.7 support. (#727) diff --git a/docs/extensions.md b/docs/extensions.md index 2bec844e..145a9204 100644 --- a/docs/extensions.md +++ b/docs/extensions.md @@ -119,6 +119,7 @@ The `event_name` and `info` arguments here will be one of the following: * `{event_type}.{event_name}.started`, `` * `{event_type}.{event_name}.complete`, `{"return_value": <...>}` * `{event_type}.{event_name}.failed`, `{"exception": <...>}` +* `{event_type}.{event_name}`, `` Note that when using the async variant of `httpcore` the handler function passed to `"trace"` must be an `async def ...` function. @@ -130,6 +131,16 @@ The following event types are currently exposed... * `"connection.connect_unix_socket"` * `"connection.start_tls"` +**Connection pool events** + +* `"connection_pool.add_request"` +* `"connection_pool.remove_request"` +* `"connection_pool.add_connection"` +* `"connection_pool.reuse_connection"` +* `"connection_pool.assign_request_to_connection"` +* `"connection_pool.unassign_request_from_connection"` +* `"connection_pool.timeout_waiting_for_connection"` + **HTTP/1.1 events** * `"http11.send_request_headers"` diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 0320c6d8..b24bcd7e 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -1,3 +1,4 @@ +import logging import ssl import sys import time @@ -9,9 +10,12 @@ from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol from .._models import Origin, Request, Response from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation +from .._trace import atrace from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface +logger = logging.getLogger("httpcore.connection_pool") + class RequestStatus: def __init__(self, request: Request): @@ -162,8 +166,14 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Reuse an existing connection if one is currently available. for idx, connection in enumerate(self._pool): if connection.can_handle_request(origin) and connection.is_available(): + kwargs = {"connection": connection, "request": status.request} + await atrace("reuse_connection", logger, status.request, kwargs) self._pool.pop(idx) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + await atrace( + "assign_request_to_connection", logger, status.request, kwargs + ) status.set_connection(connection) return True @@ -172,6 +182,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle(): await connection.aclose() + await atrace( + "remove_connection", + logger, + None, + kwargs={"connection": connection}, + ) self._pool.pop(idx) break @@ -181,7 +197,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Otherwise create a new connection. connection = self.create_connection(origin) + await atrace( + "add_connection", logger, status.request, kwargs={"connection": connection} + ) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + await atrace("assign_request_to_connection", logger, status.request, kwargs) status.set_connection(connection) return True @@ -193,6 +214,9 @@ async def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.has_expired(): await connection.aclose() + await atrace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) # If the pool size exceeds the maximum number of allowed keep-alive connections, @@ -201,6 +225,9 @@ async def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle() and pool_size > self._max_keepalive_connections: await connection.aclose() + await atrace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) pool_size -= 1 @@ -230,6 +257,7 @@ async def handle_async_request(self, request: Request) -> Response: deadline = float("inf") async with self._pool_lock: + await atrace("add_request", logger, request, {"request": status.request}) self._requests.append(status) await self._close_expired_connections() await self._attempt_to_acquire_connection(status) @@ -241,9 +269,22 @@ async def handle_async_request(self, request: Request) -> Response: # If we timeout here, or if the task is cancelled, then make # sure to remove the request from the queue before bubbling # up the exception. + if isinstance(exc, PoolTimeout): + await atrace( + "timeout_waiting_for_connection", + logger, + status.request, + {"request": status.request}, + ) async with self._pool_lock: # Ensure only remove when task exists. if status in self._requests: + await atrace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) raise exc @@ -260,6 +301,13 @@ async def handle_async_request(self, request: Request) -> Response: async with self._pool_lock: # Maintain our position in the request queue, but reset the # status so that the request becomes queued again. + kwargs = {"request": status.request, "connection": connection} + await atrace( + "unassign_request_from_connection", + logger, + status.request, + kwargs, + ) status.unset_connection() await self._attempt_to_acquire_connection(status) except BaseException as exc: @@ -296,9 +344,21 @@ async def response_closed(self, status: RequestStatus) -> None: async with self._pool_lock: # Update the state of the connection pool. if status in self._requests: + await atrace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) if connection.is_closed() and connection in self._pool: + await atrace( + "remove_connection", + logger, + None, + kwargs={"connection": connection}, + ) self._pool.remove(connection) # Since we've had a response closed, it's possible we'll now be able diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index ccfb8d22..5f33d930 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -1,3 +1,4 @@ +import logging import ssl import sys import time @@ -9,9 +10,12 @@ from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol from .._models import Origin, Request, Response from .._synchronization import Event, Lock, ShieldCancellation +from .._trace import trace from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface +logger = logging.getLogger("httpcore.connection_pool") + class RequestStatus: def __init__(self, request: Request): @@ -162,8 +166,14 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Reuse an existing connection if one is currently available. for idx, connection in enumerate(self._pool): if connection.can_handle_request(origin) and connection.is_available(): + kwargs = {"connection": connection, "request": status.request} + trace("reuse_connection", logger, status.request, kwargs) self._pool.pop(idx) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + trace( + "assign_request_to_connection", logger, status.request, kwargs + ) status.set_connection(connection) return True @@ -172,6 +182,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle(): connection.close() + trace( + "remove_connection", + logger, + None, + kwargs={"connection": connection}, + ) self._pool.pop(idx) break @@ -181,7 +197,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Otherwise create a new connection. connection = self.create_connection(origin) + trace( + "add_connection", logger, status.request, kwargs={"connection": connection} + ) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + trace("assign_request_to_connection", logger, status.request, kwargs) status.set_connection(connection) return True @@ -193,6 +214,9 @@ def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.has_expired(): connection.close() + trace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) # If the pool size exceeds the maximum number of allowed keep-alive connections, @@ -201,6 +225,9 @@ def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle() and pool_size > self._max_keepalive_connections: connection.close() + trace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) pool_size -= 1 @@ -230,6 +257,7 @@ def handle_request(self, request: Request) -> Response: deadline = float("inf") with self._pool_lock: + trace("add_request", logger, request, {"request": status.request}) self._requests.append(status) self._close_expired_connections() self._attempt_to_acquire_connection(status) @@ -241,9 +269,22 @@ def handle_request(self, request: Request) -> Response: # If we timeout here, or if the task is cancelled, then make # sure to remove the request from the queue before bubbling # up the exception. + if isinstance(exc, PoolTimeout): + trace( + "timeout_waiting_for_connection", + logger, + status.request, + {"request": status.request}, + ) with self._pool_lock: # Ensure only remove when task exists. if status in self._requests: + trace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) raise exc @@ -260,6 +301,13 @@ def handle_request(self, request: Request) -> Response: with self._pool_lock: # Maintain our position in the request queue, but reset the # status so that the request becomes queued again. + kwargs = {"request": status.request, "connection": connection} + trace( + "unassign_request_from_connection", + logger, + status.request, + kwargs, + ) status.unset_connection() self._attempt_to_acquire_connection(status) except BaseException as exc: @@ -296,9 +344,21 @@ def response_closed(self, status: RequestStatus) -> None: with self._pool_lock: # Update the state of the connection pool. if status in self._requests: + trace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) if connection.is_closed() and connection in self._pool: + trace( + "remove_connection", + logger, + None, + kwargs={"connection": connection}, + ) self._pool.remove(connection) # Since we've had a response closed, it's possible we'll now be able diff --git a/httpcore/_trace.py b/httpcore/_trace.py index b122a53e..4ef41121 100644 --- a/httpcore/_trace.py +++ b/httpcore/_trace.py @@ -6,6 +6,56 @@ from ._models import Request +def trace( + name: str, + logger: logging.Logger, + request: Optional[Request] = None, + kwargs: Optional[Dict[str, Any]] = None, +) -> None: + trace_extension = None if request is None else request.extensions.get("trace") + prefix = logger.name.split(".")[-1] + info = kwargs or {} + debug = logger.isEnabledFor(logging.DEBUG) + + if debug or trace_extension: + if trace_extension is not None: + prefix_and_name = f"{prefix}.{name}" + trace_extension(prefix_and_name, info) + + if debug: + if not info: # pragma: no cover + message = name + else: + args = " ".join([f"{key}={value!r}" for key, value in info.items()]) + message = f"{name} {args}" + logger.debug(message) + + +async def atrace( + name: str, + logger: logging.Logger, + request: Optional[Request] = None, + kwargs: Optional[Dict[str, Any]] = None, +) -> None: + trace_extension = None if request is None else request.extensions.get("trace") + prefix = logger.name.split(".")[-1] + info = kwargs or {} + debug = logger.isEnabledFor(logging.DEBUG) + + if debug or trace_extension: + if trace_extension is not None: + prefix_and_name = f"{prefix}.{name}" + await trace_extension(prefix_and_name, info) + + if debug: + if not info: # pragma: no cover + message = name + else: + args = " ".join([f"{key}={value!r}" for key, value in info.items()]) + message = f"{name} {args}" + logger.debug(message) + + class Trace: def __init__( self, diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 2392ca17..fcb33a92 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -255,6 +255,9 @@ async def trace(name, kwargs): await pool.request("GET", "https://example.com/", extensions={"trace": trace}) assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -269,6 +272,7 @@ async def trace(name, kwargs): "http11.receive_response_body.complete", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -294,10 +298,27 @@ async def test_debug_request(caplog): await pool.request("GET", "http://example.com/") assert caplog.record_tuples == [ + ( + "httpcore.connection_pool", + logging.DEBUG, + "add_request request=", + ), + ( + "httpcore.connection_pool", + logging.DEBUG, + "add_connection connection=", + ), + ( + "httpcore.connection_pool", + logging.DEBUG, + "assign_request_to_connection request= " + "connection=", + ), ( "httpcore.connection", logging.DEBUG, - "connect_tcp.started host='example.com' port=80 local_address=None timeout=None socket_options=None", + "connect_tcp.started host='example.com' port=80 local_address=None " + "timeout=None socket_options=None", ), ( "httpcore.connection", @@ -324,8 +345,8 @@ async def test_debug_request(caplog): ( "httpcore.http11", logging.DEBUG, - "receive_response_headers.complete return_value=" - "(b'HTTP/1.1', 200, b'OK', [(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", + "receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', " + "[(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", ), ( "httpcore.http11", @@ -335,6 +356,11 @@ async def test_debug_request(caplog): ("httpcore.http11", logging.DEBUG, "receive_response_body.complete"), ("httpcore.http11", logging.DEBUG, "response_closed.started"), ("httpcore.http11", logging.DEBUG, "response_closed.complete"), + ( + "httpcore.connection_pool", + logging.DEBUG, + "remove_request request=", + ), ("httpcore.connection", logging.DEBUG, "close.started"), ("httpcore.connection", logging.DEBUG, "close.complete"), ] @@ -364,6 +390,9 @@ async def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -376,6 +405,7 @@ async def trace(name, kwargs): "http11.receive_response_headers.failed", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -417,8 +447,12 @@ async def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.failed", + "connection_pool.remove_request", ] diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index 287c2bcc..ad2b20de 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -255,6 +255,9 @@ def trace(name, kwargs): pool.request("GET", "https://example.com/", extensions={"trace": trace}) assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -269,6 +272,7 @@ def trace(name, kwargs): "http11.receive_response_body.complete", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -294,10 +298,27 @@ def test_debug_request(caplog): pool.request("GET", "http://example.com/") assert caplog.record_tuples == [ + ( + "httpcore.connection_pool", + logging.DEBUG, + "add_request request=", + ), + ( + "httpcore.connection_pool", + logging.DEBUG, + "add_connection connection=", + ), + ( + "httpcore.connection_pool", + logging.DEBUG, + "assign_request_to_connection request= " + "connection=", + ), ( "httpcore.connection", logging.DEBUG, - "connect_tcp.started host='example.com' port=80 local_address=None timeout=None socket_options=None", + "connect_tcp.started host='example.com' port=80 local_address=None " + "timeout=None socket_options=None", ), ( "httpcore.connection", @@ -324,8 +345,8 @@ def test_debug_request(caplog): ( "httpcore.http11", logging.DEBUG, - "receive_response_headers.complete return_value=" - "(b'HTTP/1.1', 200, b'OK', [(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", + "receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', " + "[(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", ), ( "httpcore.http11", @@ -335,6 +356,11 @@ def test_debug_request(caplog): ("httpcore.http11", logging.DEBUG, "receive_response_body.complete"), ("httpcore.http11", logging.DEBUG, "response_closed.started"), ("httpcore.http11", logging.DEBUG, "response_closed.complete"), + ( + "httpcore.connection_pool", + logging.DEBUG, + "remove_request request=", + ), ("httpcore.connection", logging.DEBUG, "close.started"), ("httpcore.connection", logging.DEBUG, "close.complete"), ] @@ -364,6 +390,9 @@ def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -376,6 +405,7 @@ def trace(name, kwargs): "http11.receive_response_headers.failed", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -417,8 +447,12 @@ def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.failed", + "connection_pool.remove_request", ] diff --git a/unasync.py b/unasync.py index 5a5627d7..0dd0b9d0 100755 --- a/unasync.py +++ b/unasync.py @@ -24,6 +24,7 @@ ('@pytest.mark.anyio', ''), ('@pytest.mark.trio', ''), ('AutoBackend', 'SyncBackend'), + ('atrace', 'trace'), ] COMPILED_SUBS = [ (re.compile(r'(^|\b)' + regex + r'($|\b)'), repl)