From 8069f8280cc080b81e0cdf5a360d16225fc33280 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 6 Feb 2024 12:33:58 +0100 Subject: [PATCH 01/12] Connection pool work --- httpcore/_async/connection_pool.py | 304 +++++++++++++---------------- httpcore/_sync/connection_pool.py | 304 +++++++++++++---------------- 2 files changed, 282 insertions(+), 326 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 0320c6d8..2efd7960 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -1,14 +1,13 @@ import ssl import sys -import time from types import TracebackType from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend -from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol +from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation +from .._synchronization import AsyncEvent, AsyncLock from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface @@ -38,6 +37,31 @@ async def wait_for_connection( return self.connection +class AsyncPoolRequest(Request): + def __init__(self, request: Request) -> None: + self.request = request + self.connection: Optional[AsyncConnectionInterface] = None + self._connection_acquired = AsyncEvent() + + def assign_to_connection( + self, connection: Optional[AsyncConnectionInterface] + ) -> None: + self.connection = connection + self._connection_acquired.set() + + def clear_connection(self) -> None: + self.connection = None + self._connection_acquired = AsyncEvent() + + async def wait_for_connection( + self, timeout: Optional[float] = None + ) -> AsyncConnectionInterface: + if self.connection is None: + await self._connection_acquired.wait(timeout=timeout) + assert self.connection is not None + return self.connection + + class AsyncConnectionPool(AsyncRequestInterface): """ A connection pool for making HTTP requests. @@ -107,8 +131,8 @@ def __init__( self._local_address = local_address self._uds = uds - self._pool: List[AsyncConnectionInterface] = [] - self._requests: List[RequestStatus] = [] + self._connections: List[AsyncConnectionInterface] = [] + self._requests: List[AsyncPoolRequest] = [] self._pool_lock = AsyncLock() self._network_backend = ( AutoBackend() if network_backend is None else network_backend @@ -145,64 +169,7 @@ def connections(self) -> List[AsyncConnectionInterface]: ] ``` """ - return list(self._pool) - - async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: - """ - Attempt to provide a connection that can handle the given origin. - """ - origin = status.request.url.origin - - # If there are queued requests in front of us, then don't acquire a - # connection. We handle requests strictly in order. - waiting = [s for s in self._requests if s.connection is None] - if waiting and waiting[0] is not status: - return False - - # Reuse an existing connection if one is currently available. - for idx, connection in enumerate(self._pool): - if connection.can_handle_request(origin) and connection.is_available(): - self._pool.pop(idx) - self._pool.insert(0, connection) - status.set_connection(connection) - return True - - # If the pool is currently full, attempt to close one idle connection. 
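For readers following the diff: the `AsyncPoolRequest` class added above pairs each queued request with an event that fires once pool housekeeping assigns it a connection. Below is a minimal sketch of that handshake, using `asyncio.Event` in place of httpcore's backend-agnostic `AsyncEvent`; the names and string stand-ins are illustrative, not part of the patch.

```python
import asyncio
from typing import Optional


class SketchPoolRequest:
    """Couples a request with an event that fires once a connection is assigned."""

    def __init__(self, request: str) -> None:
        self.request = request
        self.connection: Optional[str] = None
        self._connection_acquired = asyncio.Event()

    def assign_to_connection(self, connection: str) -> None:
        self.connection = connection
        self._connection_acquired.set()

    async def wait_for_connection(self) -> str:
        if self.connection is None:
            await self._connection_acquired.wait()
        assert self.connection is not None
        return self.connection


async def main() -> None:
    pool_request = SketchPoolRequest("GET https://example.com/")

    async def housekeeping() -> None:
        # Runs later, e.g. once another response completes and frees a connection.
        await asyncio.sleep(0.01)
        pool_request.assign_to_connection("connection-1")

    task = asyncio.create_task(housekeeping())
    connection = await pool_request.wait_for_connection()  # blocks until assigned
    print(connection)  # connection-1
    await task


asyncio.run(main())
```

In the patch itself `wait_for_connection()` also accepts the pool timeout, so a request that is never assigned a connection can fail rather than wait indefinitely.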
- if len(self._pool) >= self._max_connections: - for idx, connection in reversed(list(enumerate(self._pool))): - if connection.is_idle(): - await connection.aclose() - self._pool.pop(idx) - break - - # If the pool is still full, then we cannot acquire a connection. - if len(self._pool) >= self._max_connections: - return False - - # Otherwise create a new connection. - connection = self.create_connection(origin) - self._pool.insert(0, connection) - status.set_connection(connection) - return True - - async def _close_expired_connections(self) -> None: - """ - Clean up the connection pool by closing off any connections that have expired. - """ - # Close any connections that have expired their keep-alive time. - for idx, connection in reversed(list(enumerate(self._pool))): - if connection.has_expired(): - await connection.aclose() - self._pool.pop(idx) - - # If the pool size exceeds the maximum number of allowed keep-alive connections, - # then close off idle connections as required. - pool_size = len(self._pool) - for idx, connection in reversed(list(enumerate(self._pool))): - if connection.is_idle() and pool_size > self._max_keepalive_connections: - await connection.aclose() - self._pool.pop(idx) - pool_size -= 1 + return list(self._connections) async def handle_async_request(self, request: Request) -> Response: """ @@ -220,110 +187,122 @@ async def handle_async_request(self, request: Request) -> Response: f"Request URL has an unsupported protocol '{scheme}://'." ) - status = RequestStatus(request) timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("pool", None) - if timeout is not None: - deadline = time.monotonic() + timeout - else: - deadline = float("inf") - - async with self._pool_lock: - self._requests.append(status) - await self._close_expired_connections() - await self._attempt_to_acquire_connection(status) - - while True: - try: - connection = await status.wait_for_connection(timeout=timeout) - except BaseException as exc: - # If we timeout here, or if the task is cancelled, then make - # sure to remove the request from the queue before bubbling - # up the exception. - async with self._pool_lock: - # Ensure only remove when task exists. - if status in self._requests: - self._requests.remove(status) - raise exc - - try: - response = await connection.handle_async_request(request) - except ConnectionNotAvailable: - # The ConnectionNotAvailable exception is a special case, that - # indicates we need to retry the request on a new connection. - # - # The most common case where this can occur is when multiple - # requests are queued waiting for a single connection, which - # might end up as an HTTP/2 connection, but which actually ends - # up as HTTP/1.1. + pool_request = AsyncPoolRequest(request) + try: + while True: async with self._pool_lock: - # Maintain our position in the request queue, but reset the - # status so that the request becomes queued again. 
- status.unset_connection() - await self._attempt_to_acquire_connection(status) - except BaseException as exc: - with AsyncShieldCancellation(): - await self.response_closed(status) - raise exc - else: - break + self._requests.append(pool_request) + closing = self._assign_requests_to_connections() + await self._close_connections(closing) + connection = await pool_request.wait_for_connection(timeout=timeout) + + try: + response = await connection.handle_async_request( + pool_request.request + ) + except ConnectionNotAvailable: + pool_request.clear_connection() + else: + break - timeout = deadline - time.monotonic() - if timeout < 0: - raise PoolTimeout # pragma: nocover + except BaseException: + async with self._pool_lock: + self._requests.remove(pool_request) + closing = self._assign_requests_to_connections() + await self._close_connections(closing) - # When we return the response, we wrap the stream in a special class - # that handles notifying the connection pool once the response - # has been released. assert isinstance(response.stream, AsyncIterable) return Response( status=response.status, headers=response.headers, - content=ConnectionPoolByteStream(response.stream, self, status), + content=PoolByteStream( + stream=response.stream, pool_request=pool_request, pool=self + ), extensions=response.extensions, ) - async def response_closed(self, status: RequestStatus) -> None: + async def _request_closed(self, request: AsyncPoolRequest) -> None: """ - This method acts as a callback once the request/response cycle is complete. + Once a request completes we remove it from the pool, + and determine if we can now assign any queued requests + to a connection. + """ + async with self._pool_lock: + self._requests.remove(request) + closing = self._assign_requests_to_connections() + await self._close_connections(closing) - It is called into from the `ConnectionPoolByteStream.aclose()` method. + def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: """ - assert status.connection is not None - connection = status.connection + Manage the state of the connection pool, assigning incoming + requests to connections as available. - async with self._pool_lock: - # Update the state of the connection pool. - if status in self._requests: - self._requests.remove(status) - - if connection.is_closed() and connection in self._pool: - self._pool.remove(connection) - - # Since we've had a response closed, it's possible we'll now be able - # to service one or more requests that are currently pending. - for status in self._requests: - if status.connection is None: - acquired = await self._attempt_to_acquire_connection(status) - # If we could not acquire a connection for a queued request - # then we don't need to check anymore requests that are - # queued later behind it. - if not acquired: - break - - # Housekeeping. - await self._close_expired_connections() + Called whenever a new request is added or removed from the pool. - async def aclose(self) -> None: + Any closing connections are returned, allowing the I/O for closing + those connections to be handled seperately. """ - Close any connections in the pool. + closing_connections = [] + + # Close any expired connections. + for connection in list(self._connections): + if connection.has_expired(): + # log: "closing expired connection" + self._connections.remove(connection) + closing_connections.append(connection) + + # Assign queued requests to connections. 
+ queued_requests = [ + request for request in self._requests if request.connection is None + ] + for request in queued_requests: + origin = request.url.origin + avilable_connections = [ + connection + for connection in self._connections + if connection.can_handle_request(origin) and connection.is_available() + ] + if avilable_connections: + # log: "reusing existing connection" + connection = avilable_connections[0] + request.assign_to_connection(connection) + elif len(self._connections) < self._max_connections: + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + request.assign_to_connection(connection) + else: + idle_connections = [ + connection + for connection in self._connections + if connection.is_idle() + ] + # log: "closing idle connection" + connection = idle_connections[0] + self._connections.remove(connection) + closing_connections.append(connection) + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + request.assign_to_connection(connection) + + return closing_connections + + async def _close_connections(self, closing: List[AsyncConnectionInterface]) -> None: """ - async with self._pool_lock: - for connection in self._pool: - await connection.aclose() - self._pool = [] - self._requests = [] + Close connections which have been removed from the pool. + """ + for connection in closing: + await connection.aclose() + + async def aclose(self) -> None: + closing = list(self._connections) + self._requests = [] + self._connections = [] + await self._close_connections(closing) async def __aenter__(self) -> "AsyncConnectionPool": # Acquiring the pool lock here ensures that we have the @@ -341,30 +320,29 @@ async def __aexit__( await self.aclose() -class ConnectionPoolByteStream: - """ - A wrapper around the response byte stream, that additionally handles - notifying the connection pool when the response has been closed. 
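One detail worth noting in the new pool-management code above: `_assign_requests_to_connections()` performs no I/O. It only mutates pool state and returns the connections that should be closed, and `_close_connections()` then does the actual teardown; later patches in this series move that teardown outside the locked section entirely. The following is a self-contained sketch of that split, using toy names rather than httpcore's API.

```python
import asyncio
from typing import List


class ToyConnection:
    def __init__(self, name: str, expired: bool) -> None:
        self.name = name
        self.expired = expired

    async def aclose(self) -> None:
        await asyncio.sleep(0)  # stands in for real network teardown
        print(f"closed {self.name}")


async def main() -> None:
    lock = asyncio.Lock()
    connections = [ToyConnection("a", expired=True), ToyConnection("b", expired=False)]

    # Step 1: fast, pure state manipulation while holding the lock.
    async with lock:
        closing: List[ToyConnection] = [c for c in connections if c.expired]
        connections = [c for c in connections if not c.expired]

    # Step 2: the potentially slow close I/O happens once the lock is released.
    for connection in closing:
        await connection.aclose()


asyncio.run(main())
```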
- """ - +class PoolByteStream: def __init__( self, stream: AsyncIterable[bytes], + pool_request: AsyncPoolRequest, pool: AsyncConnectionPool, - status: RequestStatus, ) -> None: self._stream = stream + self._pool_request = pool_request self._pool = pool - self._status = status async def __aiter__(self) -> AsyncIterator[bytes]: - async for part in self._stream: - yield part + try: + async for part in self._stream: + yield part + except BaseException: + async with self._pool._pool_lock: + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + await self._pool._close_connections(closing) async def aclose(self) -> None: - try: - if hasattr(self._stream, "aclose"): - await self._stream.aclose() - finally: - with AsyncShieldCancellation(): - await self._pool.response_closed(self._status) + async with self._pool._pool_lock: + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + await self._pool._close_connections(closing) diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index ccfb8d22..acd0b2a6 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -1,14 +1,13 @@ import ssl import sys -import time from types import TracebackType from typing import Iterable, Iterator, Iterable, List, Optional, Type from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend -from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol +from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import Event, Lock, ShieldCancellation +from .._synchronization import Event, Lock from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface @@ -38,6 +37,31 @@ def wait_for_connection( return self.connection +class PoolRequest(Request): + def __init__(self, request: Request) -> None: + self.request = request + self.connection: Optional[ConnectionInterface] = None + self._connection_acquired = Event() + + def assign_to_connection( + self, connection: Optional[ConnectionInterface] + ) -> None: + self.connection = connection + self._connection_acquired.set() + + def clear_connection(self) -> None: + self.connection = None + self._connection_acquired = Event() + + def wait_for_connection( + self, timeout: Optional[float] = None + ) -> ConnectionInterface: + if self.connection is None: + self._connection_acquired.wait(timeout=timeout) + assert self.connection is not None + return self.connection + + class ConnectionPool(RequestInterface): """ A connection pool for making HTTP requests. @@ -107,8 +131,8 @@ def __init__( self._local_address = local_address self._uds = uds - self._pool: List[ConnectionInterface] = [] - self._requests: List[RequestStatus] = [] + self._connections: List[ConnectionInterface] = [] + self._requests: List[PoolRequest] = [] self._pool_lock = Lock() self._network_backend = ( SyncBackend() if network_backend is None else network_backend @@ -145,64 +169,7 @@ def connections(self) -> List[ConnectionInterface]: ] ``` """ - return list(self._pool) - - def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: - """ - Attempt to provide a connection that can handle the given origin. - """ - origin = status.request.url.origin - - # If there are queued requests in front of us, then don't acquire a - # connection. 
We handle requests strictly in order. - waiting = [s for s in self._requests if s.connection is None] - if waiting and waiting[0] is not status: - return False - - # Reuse an existing connection if one is currently available. - for idx, connection in enumerate(self._pool): - if connection.can_handle_request(origin) and connection.is_available(): - self._pool.pop(idx) - self._pool.insert(0, connection) - status.set_connection(connection) - return True - - # If the pool is currently full, attempt to close one idle connection. - if len(self._pool) >= self._max_connections: - for idx, connection in reversed(list(enumerate(self._pool))): - if connection.is_idle(): - connection.close() - self._pool.pop(idx) - break - - # If the pool is still full, then we cannot acquire a connection. - if len(self._pool) >= self._max_connections: - return False - - # Otherwise create a new connection. - connection = self.create_connection(origin) - self._pool.insert(0, connection) - status.set_connection(connection) - return True - - def _close_expired_connections(self) -> None: - """ - Clean up the connection pool by closing off any connections that have expired. - """ - # Close any connections that have expired their keep-alive time. - for idx, connection in reversed(list(enumerate(self._pool))): - if connection.has_expired(): - connection.close() - self._pool.pop(idx) - - # If the pool size exceeds the maximum number of allowed keep-alive connections, - # then close off idle connections as required. - pool_size = len(self._pool) - for idx, connection in reversed(list(enumerate(self._pool))): - if connection.is_idle() and pool_size > self._max_keepalive_connections: - connection.close() - self._pool.pop(idx) - pool_size -= 1 + return list(self._connections) def handle_request(self, request: Request) -> Response: """ @@ -220,110 +187,122 @@ def handle_request(self, request: Request) -> Response: f"Request URL has an unsupported protocol '{scheme}://'." ) - status = RequestStatus(request) timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("pool", None) - if timeout is not None: - deadline = time.monotonic() + timeout - else: - deadline = float("inf") - - with self._pool_lock: - self._requests.append(status) - self._close_expired_connections() - self._attempt_to_acquire_connection(status) - - while True: - try: - connection = status.wait_for_connection(timeout=timeout) - except BaseException as exc: - # If we timeout here, or if the task is cancelled, then make - # sure to remove the request from the queue before bubbling - # up the exception. - with self._pool_lock: - # Ensure only remove when task exists. - if status in self._requests: - self._requests.remove(status) - raise exc - - try: - response = connection.handle_request(request) - except ConnectionNotAvailable: - # The ConnectionNotAvailable exception is a special case, that - # indicates we need to retry the request on a new connection. - # - # The most common case where this can occur is when multiple - # requests are queued waiting for a single connection, which - # might end up as an HTTP/2 connection, but which actually ends - # up as HTTP/1.1. + pool_request = PoolRequest(request) + try: + while True: with self._pool_lock: - # Maintain our position in the request queue, but reset the - # status so that the request becomes queued again. 
- status.unset_connection() - self._attempt_to_acquire_connection(status) - except BaseException as exc: - with ShieldCancellation(): - self.response_closed(status) - raise exc - else: - break + self._requests.append(pool_request) + closing = self._assign_requests_to_connections() + self._close_connections(closing) + connection = pool_request.wait_for_connection(timeout=timeout) + + try: + response = connection.handle_request( + pool_request.request + ) + except ConnectionNotAvailable: + pool_request.clear_connection() + else: + break - timeout = deadline - time.monotonic() - if timeout < 0: - raise PoolTimeout # pragma: nocover + except BaseException: + with self._pool_lock: + self._requests.remove(pool_request) + closing = self._assign_requests_to_connections() + self._close_connections(closing) - # When we return the response, we wrap the stream in a special class - # that handles notifying the connection pool once the response - # has been released. assert isinstance(response.stream, Iterable) return Response( status=response.status, headers=response.headers, - content=ConnectionPoolByteStream(response.stream, self, status), + content=PoolByteStream( + stream=response.stream, pool_request=pool_request, pool=self + ), extensions=response.extensions, ) - def response_closed(self, status: RequestStatus) -> None: + def _request_closed(self, request: PoolRequest) -> None: """ - This method acts as a callback once the request/response cycle is complete. + Once a request completes we remove it from the pool, + and determine if we can now assign any queued requests + to a connection. + """ + with self._pool_lock: + self._requests.remove(request) + closing = self._assign_requests_to_connections() + self._close_connections(closing) - It is called into from the `ConnectionPoolByteStream.close()` method. + def _assign_requests_to_connections(self) -> List[ConnectionInterface]: """ - assert status.connection is not None - connection = status.connection + Manage the state of the connection pool, assigning incoming + requests to connections as available. - with self._pool_lock: - # Update the state of the connection pool. - if status in self._requests: - self._requests.remove(status) - - if connection.is_closed() and connection in self._pool: - self._pool.remove(connection) - - # Since we've had a response closed, it's possible we'll now be able - # to service one or more requests that are currently pending. - for status in self._requests: - if status.connection is None: - acquired = self._attempt_to_acquire_connection(status) - # If we could not acquire a connection for a queued request - # then we don't need to check anymore requests that are - # queued later behind it. - if not acquired: - break - - # Housekeeping. - self._close_expired_connections() + Called whenever a new request is added or removed from the pool. - def close(self) -> None: + Any closing connections are returned, allowing the I/O for closing + those connections to be handled seperately. """ - Close any connections in the pool. + closing_connections = [] + + # Close any expired connections. + for connection in list(self._connections): + if connection.has_expired(): + # log: "closing expired connection" + self._connections.remove(connection) + closing_connections.append(connection) + + # Assign queued requests to connections. 
+ queued_requests = [ + request for request in self._requests if request.connection is None + ] + for request in queued_requests: + origin = request.url.origin + avilable_connections = [ + connection + for connection in self._connections + if connection.can_handle_request(origin) and connection.is_available() + ] + if avilable_connections: + # log: "reusing existing connection" + connection = avilable_connections[0] + request.assign_to_connection(connection) + elif len(self._connections) < self._max_connections: + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + request.assign_to_connection(connection) + else: + idle_connections = [ + connection + for connection in self._connections + if connection.is_idle() + ] + # log: "closing idle connection" + connection = idle_connections[0] + self._connections.remove(connection) + closing_connections.append(connection) + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + request.assign_to_connection(connection) + + return closing_connections + + def _close_connections(self, closing: List[ConnectionInterface]) -> None: """ - with self._pool_lock: - for connection in self._pool: - connection.close() - self._pool = [] - self._requests = [] + Close connections which have been removed from the pool. + """ + for connection in closing: + connection.close() + + def close(self) -> None: + closing = list(self._connections) + self._requests = [] + self._connections = [] + self._close_connections(closing) def __enter__(self) -> "ConnectionPool": # Acquiring the pool lock here ensures that we have the @@ -341,30 +320,29 @@ def __exit__( self.close() -class ConnectionPoolByteStream: - """ - A wrapper around the response byte stream, that additionally handles - notifying the connection pool when the response has been closed. 
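The assignment loop above (this is the sync copy of the same logic) reduces to three outcomes for each queued request: reuse an available connection to the same origin, create a new connection if the pool has room, or close an idle connection to make room before creating one. The sketch below also shows the "leave it queued" fallback that a later patch in this series adds for when the pool is full and nothing is idle. `choose_action` is a hypothetical helper for illustration, not the real pool code.

```python
from typing import Dict, List, Optional, Tuple


def choose_action(
    origin: str, pool: List[Dict[str, object]], max_connections: int
) -> Tuple[str, Optional[Dict[str, object]]]:
    """Return (action, connection-to-evict) for a single queued request."""
    available = [c for c in pool if c["origin"] == origin and c["available"]]
    idle = [c for c in pool if c["idle"]]

    if available:
        return "reuse existing connection", None
    if len(pool) < max_connections:
        return "create new connection", None
    if idle:
        return "close idle connection, then create new connection", idle[0]
    return "leave request queued", None  # refinement added later in the series


pool = [
    {"origin": "https://a.example", "available": True, "idle": True},
    {"origin": "https://b.example", "available": False, "idle": False},
]
print(choose_action("https://a.example", pool, max_connections=3))  # case 1: reuse
print(choose_action("https://c.example", pool, max_connections=3))  # case 2: create
print(choose_action("https://c.example", pool, max_connections=2))  # case 3: evict idle
```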
- """ - +class PoolByteStream: def __init__( self, stream: Iterable[bytes], + pool_request: PoolRequest, pool: ConnectionPool, - status: RequestStatus, ) -> None: self._stream = stream + self._pool_request = pool_request self._pool = pool - self._status = status def __iter__(self) -> Iterator[bytes]: - for part in self._stream: - yield part + try: + for part in self._stream: + yield part + except BaseException: + with self._pool._pool_lock: + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + self._pool._close_connections(closing) def close(self) -> None: - try: - if hasattr(self._stream, "close"): - self._stream.close() - finally: - with ShieldCancellation(): - self._pool.response_closed(self._status) + with self._pool._pool_lock: + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + self._pool._close_connections(closing) From 6e3453ea9ebb8e618c0e31cad4bdec2532b14432 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 6 Feb 2024 13:10:37 +0100 Subject: [PATCH 02/12] Connection pool work --- httpcore/_async/connection_pool.py | 13 +++++++------ httpcore/_sync/connection_pool.py | 13 +++++++------ requirements.txt | 2 +- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 2efd7960..3f67dffb 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -208,11 +208,12 @@ async def handle_async_request(self, request: Request) -> Response: else: break - except BaseException: + except BaseException as exc: async with self._pool_lock: self._requests.remove(pool_request) closing = self._assign_requests_to_connections() await self._close_connections(closing) + raise exc from None assert isinstance(response.stream, AsyncIterable) return Response( @@ -258,8 +259,8 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: queued_requests = [ request for request in self._requests if request.connection is None ] - for request in queued_requests: - origin = request.url.origin + for pool_request in queued_requests: + origin = pool_request.request.url.origin avilable_connections = [ connection for connection in self._connections @@ -268,12 +269,12 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: if avilable_connections: # log: "reusing existing connection" connection = avilable_connections[0] - request.assign_to_connection(connection) + pool_request.assign_to_connection(connection) elif len(self._connections) < self._max_connections: # log: "creating new connection" connection = self.create_connection(origin) self._connections.append(connection) - request.assign_to_connection(connection) + pool_request.assign_to_connection(connection) else: idle_connections = [ connection @@ -287,7 +288,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: # log: "creating new connection" connection = self.create_connection(origin) self._connections.append(connection) - request.assign_to_connection(connection) + pool_request.assign_to_connection(connection) return closing_connections diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index acd0b2a6..050ffdba 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -208,11 +208,12 @@ def handle_request(self, request: Request) -> Response: else: break - except BaseException: + except BaseException as 
exc: with self._pool_lock: self._requests.remove(pool_request) closing = self._assign_requests_to_connections() self._close_connections(closing) + raise exc from None assert isinstance(response.stream, Iterable) return Response( @@ -258,8 +259,8 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: queued_requests = [ request for request in self._requests if request.connection is None ] - for request in queued_requests: - origin = request.url.origin + for pool_request in queued_requests: + origin = pool_request.request.url.origin avilable_connections = [ connection for connection in self._connections @@ -268,12 +269,12 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: if avilable_connections: # log: "reusing existing connection" connection = avilable_connections[0] - request.assign_to_connection(connection) + pool_request.assign_to_connection(connection) elif len(self._connections) < self._max_connections: # log: "creating new connection" connection = self.create_connection(origin) self._connections.append(connection) - request.assign_to_connection(connection) + pool_request.assign_to_connection(connection) else: idle_connections = [ connection @@ -287,7 +288,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: # log: "creating new connection" connection = self.create_connection(origin) self._connections.append(connection) - request.assign_to_connection(connection) + pool_request.assign_to_connection(connection) return closing_connections diff --git a/requirements.txt b/requirements.txt index c040ed78..be4626af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,5 +20,5 @@ trio-typing==0.10.0 types-certifi==2021.10.8.3 pytest==8.0.0 pytest-httpbin==2.0.0 -pytest-trio==0.7.0 +pytest-trio==0.8.0 werkzeug<2.1 # See: https://github.com/postmanlabs/httpbin/issues/673 From 3791d5e169a3385c41bbe85f45197d36721810e3 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 6 Feb 2024 16:59:44 +0100 Subject: [PATCH 03/12] Connection pool work --- httpcore/_async/connection_pool.py | 74 ++++++++++++++-------------- httpcore/_sync/connection_pool.py | 74 ++++++++++++++-------------- tests/_async/test_connection_pool.py | 13 +++-- tests/_sync/test_connection_pool.py | 13 +++-- 4 files changed, 92 insertions(+), 82 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 3f67dffb..81aa7be1 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -7,7 +7,7 @@ from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import AsyncEvent, AsyncLock +from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface @@ -193,9 +193,10 @@ async def handle_async_request(self, request: Request) -> Response: pool_request = AsyncPoolRequest(request) try: while True: - async with self._pool_lock: - self._requests.append(pool_request) - closing = self._assign_requests_to_connections() + with AsyncShieldCancellation(): + async with self._pool_lock: + self._requests.append(pool_request) + closing = self._assign_requests_to_connections() await self._close_connections(closing) connection = await pool_request.wait_for_connection(timeout=timeout) @@ -209,9 +210,10 @@ async def 
handle_async_request(self, request: Request) -> Response: break except BaseException as exc: - async with self._pool_lock: - self._requests.remove(pool_request) - closing = self._assign_requests_to_connections() + with AsyncShieldCancellation(): + async with self._pool_lock: + self._requests.remove(pool_request) + closing = self._assign_requests_to_connections() await self._close_connections(closing) raise exc from None @@ -225,17 +227,6 @@ async def handle_async_request(self, request: Request) -> Response: extensions=response.extensions, ) - async def _request_closed(self, request: AsyncPoolRequest) -> None: - """ - Once a request completes we remove it from the pool, - and determine if we can now assign any queued requests - to a connection. - """ - async with self._pool_lock: - self._requests.remove(request) - closing = self._assign_requests_to_connections() - await self._close_connections(closing) - def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: """ Manage the state of the connection pool, assigning incoming @@ -248,12 +239,20 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: """ closing_connections = [] - # Close any expired connections. for connection in list(self._connections): - if connection.has_expired(): + if connection.is_closed(): + # log: "removing closed connection" + self._connections.remove(connection) + elif connection.has_expired(): # log: "closing expired connection" self._connections.remove(connection) closing_connections.append(connection) + elif ( + connection.is_idle() and len(self._connections) > self._max_connections + ): + # log: "closing idle connection" + self._connections.remove(connection) + closing_connections.append(connection) # Assign queued requests to connections. 
queued_requests = [ @@ -266,6 +265,9 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: for connection in self._connections if connection.can_handle_request(origin) and connection.is_available() ] + idle_connections = [ + connection for connection in self._connections if connection.is_idle() + ] if avilable_connections: # log: "reusing existing connection" connection = avilable_connections[0] @@ -275,12 +277,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: connection = self.create_connection(origin) self._connections.append(connection) pool_request.assign_to_connection(connection) - else: - idle_connections = [ - connection - for connection in self._connections - if connection.is_idle() - ] + elif idle_connections: # log: "closing idle connection" connection = idle_connections[0] self._connections.remove(connection) @@ -300,10 +297,9 @@ async def _close_connections(self, closing: List[AsyncConnectionInterface]) -> N await connection.aclose() async def aclose(self) -> None: - closing = list(self._connections) - self._requests = [] + closing_connections = list(self._connections) self._connections = [] - await self._close_connections(closing) + await self._close_connections(closing_connections) async def __aenter__(self) -> "AsyncConnectionPool": # Acquiring the pool lock here ensures that we have the @@ -331,19 +327,23 @@ def __init__( self._stream = stream self._pool_request = pool_request self._pool = pool + self._closed = False + assert self._pool_request in self._pool._requests async def __aiter__(self) -> AsyncIterator[bytes]: try: async for part in self._stream: yield part except BaseException: - async with self._pool._pool_lock: - self._pool._requests.remove(self._pool_request) - closing = self._pool._assign_requests_to_connections() - await self._pool._close_connections(closing) + await self.aclose() async def aclose(self) -> None: - async with self._pool._pool_lock: - self._pool._requests.remove(self._pool_request) - closing = self._pool._assign_requests_to_connections() - await self._pool._close_connections(closing) + if not self._closed: + self._closed = True + with AsyncShieldCancellation(): + if hasattr(self._stream, "aclose"): + await self._stream.aclose() + async with self._pool._pool_lock: + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + await self._pool._close_connections(closing) diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 050ffdba..ce2fca42 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -7,7 +7,7 @@ from .._backends.base import SOCKET_OPTION, NetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import Event, Lock +from .._synchronization import Event, Lock, ShieldCancellation from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface @@ -193,9 +193,10 @@ def handle_request(self, request: Request) -> Response: pool_request = PoolRequest(request) try: while True: - with self._pool_lock: - self._requests.append(pool_request) - closing = self._assign_requests_to_connections() + with ShieldCancellation(): + with self._pool_lock: + self._requests.append(pool_request) + closing = self._assign_requests_to_connections() self._close_connections(closing) connection = pool_request.wait_for_connection(timeout=timeout) @@ -209,9 +210,10 
@@ def handle_request(self, request: Request) -> Response: break except BaseException as exc: - with self._pool_lock: - self._requests.remove(pool_request) - closing = self._assign_requests_to_connections() + with ShieldCancellation(): + with self._pool_lock: + self._requests.remove(pool_request) + closing = self._assign_requests_to_connections() self._close_connections(closing) raise exc from None @@ -225,17 +227,6 @@ def handle_request(self, request: Request) -> Response: extensions=response.extensions, ) - def _request_closed(self, request: PoolRequest) -> None: - """ - Once a request completes we remove it from the pool, - and determine if we can now assign any queued requests - to a connection. - """ - with self._pool_lock: - self._requests.remove(request) - closing = self._assign_requests_to_connections() - self._close_connections(closing) - def _assign_requests_to_connections(self) -> List[ConnectionInterface]: """ Manage the state of the connection pool, assigning incoming @@ -248,12 +239,20 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: """ closing_connections = [] - # Close any expired connections. for connection in list(self._connections): - if connection.has_expired(): + if connection.is_closed(): + # log: "removing closed connection" + self._connections.remove(connection) + elif connection.has_expired(): # log: "closing expired connection" self._connections.remove(connection) closing_connections.append(connection) + elif ( + connection.is_idle() and len(self._connections) > self._max_connections + ): + # log: "closing idle connection" + self._connections.remove(connection) + closing_connections.append(connection) # Assign queued requests to connections. queued_requests = [ @@ -266,6 +265,9 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: for connection in self._connections if connection.can_handle_request(origin) and connection.is_available() ] + idle_connections = [ + connection for connection in self._connections if connection.is_idle() + ] if avilable_connections: # log: "reusing existing connection" connection = avilable_connections[0] @@ -275,12 +277,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: connection = self.create_connection(origin) self._connections.append(connection) pool_request.assign_to_connection(connection) - else: - idle_connections = [ - connection - for connection in self._connections - if connection.is_idle() - ] + elif idle_connections: # log: "closing idle connection" connection = idle_connections[0] self._connections.remove(connection) @@ -300,10 +297,9 @@ def _close_connections(self, closing: List[ConnectionInterface]) -> None: connection.close() def close(self) -> None: - closing = list(self._connections) - self._requests = [] + closing_connections = list(self._connections) self._connections = [] - self._close_connections(closing) + self._close_connections(closing_connections) def __enter__(self) -> "ConnectionPool": # Acquiring the pool lock here ensures that we have the @@ -331,19 +327,23 @@ def __init__( self._stream = stream self._pool_request = pool_request self._pool = pool + self._closed = False + assert self._pool_request in self._pool._requests def __iter__(self) -> Iterator[bytes]: try: for part in self._stream: yield part except BaseException: - with self._pool._pool_lock: - self._pool._requests.remove(self._pool_request) - closing = self._pool._assign_requests_to_connections() - self._pool._close_connections(closing) + self.close() def close(self) -> 
None: - with self._pool._pool_lock: - self._pool._requests.remove(self._pool_request) - closing = self._pool._assign_requests_to_connections() - self._pool._close_connections(closing) + if not self._closed: + self._closed = True + with ShieldCancellation(): + if hasattr(self._stream, "close"): + self._stream.close() + with self._pool._pool_lock: + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + self._pool._close_connections(closing) diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 61ee1e54..4ee6ca33 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -66,8 +66,8 @@ async def test_connection_pool_with_keepalive(): async with pool.stream("GET", "http://example.com/") as response: info = [repr(c) for c in pool.connections] assert info == [ - "", "", + "", ] await response.aread() @@ -75,8 +75,8 @@ async def test_connection_pool_with_keepalive(): assert response.content == b"Hello, world!" info = [repr(c) for c in pool.connections] assert info == [ - "", "", + "", ] @@ -205,11 +205,16 @@ async def test_connection_pool_with_http2_goaway(): http2=True, ) + def debug(*args, **kwargs): + print(*args, **kwargs) + async with httpcore.AsyncConnectionPool( network_backend=network_backend, ) as pool: # Sending an intial request, which once complete will return to the pool, IDLE. - response = await pool.request("GET", "https://example.com/") + response = await pool.request( + "GET", "https://example.com/", exensions={"trace": debug} + ) assert response.status == 200 assert response.content == b"Hello, world!" @@ -225,8 +230,8 @@ async def test_connection_pool_with_http2_goaway(): info = [repr(c) for c in pool.connections] assert info == [ - "", "", + "", ] diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index c9621c7b..9ee63456 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -66,8 +66,8 @@ def test_connection_pool_with_keepalive(): with pool.stream("GET", "http://example.com/") as response: info = [repr(c) for c in pool.connections] assert info == [ - "", "", + "", ] response.read() @@ -75,8 +75,8 @@ def test_connection_pool_with_keepalive(): assert response.content == b"Hello, world!" info = [repr(c) for c in pool.connections] assert info == [ - "", "", + "", ] @@ -205,11 +205,16 @@ def test_connection_pool_with_http2_goaway(): http2=True, ) + def debug(*args, **kwargs): + print(*args, **kwargs) + with httpcore.ConnectionPool( network_backend=network_backend, ) as pool: # Sending an intial request, which once complete will return to the pool, IDLE. - response = pool.request("GET", "https://example.com/") + response = pool.request( + "GET", "https://example.com/", exensions={"trace": debug} + ) assert response.status == 200 assert response.content == b"Hello, world!" 
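For context when reading these test updates, this is the public pool API they exercise. The snippet below is ordinary httpcore usage against a placeholder URL, not something introduced by this patch.

```python
import httpcore

with httpcore.ConnectionPool(max_keepalive_connections=5) as pool:
    # While streaming, the connection servicing the request stays busy; the
    # PoolByteStream close hook is what hands it back to the pool afterwards.
    with pool.stream("GET", "http://example.com/") as response:
        body = response.read()

    print(response.status, len(body))
    # Once the response is closed the connection is idle again and visible here.
    print([repr(c) for c in pool.connections])
```

The swapped entries in the `info` assertions line up with the reworked pool no longer moving a reused connection to the front of the connection list.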
@@ -225,8 +230,8 @@ def test_connection_pool_with_http2_goaway(): info = [repr(c) for c in pool.connections] assert info == [ - "", "", + "", ] From 8d9371bfb43ad00b281814a6c219ea8842369871 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 6 Feb 2024 17:29:17 +0100 Subject: [PATCH 04/12] Connection pool work --- httpcore/_async/connection_pool.py | 6 ++++-- httpcore/_sync/connection_pool.py | 6 ++++-- tests/_async/test_connection_pool.py | 9 ++------- tests/_sync/test_connection_pool.py | 9 ++------- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 81aa7be1..4544c3a6 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -191,11 +191,11 @@ async def handle_async_request(self, request: Request) -> Response: timeout = timeouts.get("pool", None) pool_request = AsyncPoolRequest(request) + self._requests.append(pool_request) try: while True: with AsyncShieldCancellation(): async with self._pool_lock: - self._requests.append(pool_request) closing = self._assign_requests_to_connections() await self._close_connections(closing) connection = await pool_request.wait_for_connection(timeout=timeout) @@ -248,7 +248,9 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: self._connections.remove(connection) closing_connections.append(connection) elif ( - connection.is_idle() and len(self._connections) > self._max_connections + connection.is_idle() + and len([connection.is_idle() for connection in self._connections]) + > self._max_keepalive_connections ): # log: "closing idle connection" self._connections.remove(connection) diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index ce2fca42..d4b051b8 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -191,11 +191,11 @@ def handle_request(self, request: Request) -> Response: timeout = timeouts.get("pool", None) pool_request = PoolRequest(request) + self._requests.append(pool_request) try: while True: with ShieldCancellation(): with self._pool_lock: - self._requests.append(pool_request) closing = self._assign_requests_to_connections() self._close_connections(closing) connection = pool_request.wait_for_connection(timeout=timeout) @@ -248,7 +248,9 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: self._connections.remove(connection) closing_connections.append(connection) elif ( - connection.is_idle() and len(self._connections) > self._max_connections + connection.is_idle() + and len([connection.is_idle() for connection in self._connections]) + > self._max_keepalive_connections ): # log: "closing idle connection" self._connections.remove(connection) diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 4ee6ca33..442b441d 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -205,16 +205,11 @@ async def test_connection_pool_with_http2_goaway(): http2=True, ) - def debug(*args, **kwargs): - print(*args, **kwargs) - async with httpcore.AsyncConnectionPool( network_backend=network_backend, ) as pool: # Sending an intial request, which once complete will return to the pool, IDLE. - response = await pool.request( - "GET", "https://example.com/", exensions={"trace": debug} - ) + response = await pool.request("GET", "https://example.com/") assert response.status == 200 assert response.content == b"Hello, world!" 
@@ -224,13 +219,13 @@ def debug(*args, **kwargs): ] # Sending a second request to the same origin will require a new connection. + # The original connection has now been closed. response = await pool.request("GET", "https://example.com/") assert response.status == 200 assert response.content == b"Hello, world!" info = [repr(c) for c in pool.connections] assert info == [ - "", "", ] diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index 9ee63456..0897f1ff 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -205,16 +205,11 @@ def test_connection_pool_with_http2_goaway(): http2=True, ) - def debug(*args, **kwargs): - print(*args, **kwargs) - with httpcore.ConnectionPool( network_backend=network_backend, ) as pool: # Sending an intial request, which once complete will return to the pool, IDLE. - response = pool.request( - "GET", "https://example.com/", exensions={"trace": debug} - ) + response = pool.request("GET", "https://example.com/") assert response.status == 200 assert response.content == b"Hello, world!" @@ -224,13 +219,13 @@ def debug(*args, **kwargs): ] # Sending a second request to the same origin will require a new connection. + # The original connection has now been closed. response = pool.request("GET", "https://example.com/") assert response.status == 200 assert response.content == b"Hello, world!" info = [repr(c) for c in pool.connections] assert info == [ - "", "", ] From fdd5e786b22782f4fe6e808e52c8e68b7985e604 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 7 Feb 2024 09:17:01 +0100 Subject: [PATCH 05/12] Comments --- httpcore/_async/connection_pool.py | 9 +++++++++ httpcore/_sync/connection_pool.py | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 4544c3a6..2fd837f0 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -239,6 +239,8 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: """ closing_connections = [] + # First we handle cleaning up any connections that are closed, + # have expired their keep-alive, or surplus idle connections. for connection in list(self._connections): if connection.is_closed(): # log: "removing closed connection" @@ -270,6 +272,13 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: idle_connections = [ connection for connection in self._connections if connection.is_idle() ] + + # There are three cases for how we may be able to handle the request: + # + # 1. There is an existing connection that can handle the request. + # 2. We can create a new connection to handle the request. + # 3. We can close an idle connection and then create a new connection + # to handle the request. if avilable_connections: # log: "reusing existing connection" connection = avilable_connections[0] diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index d4b051b8..b740958a 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -239,6 +239,8 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: """ closing_connections = [] + # First we handle cleaning up any connections that are closed, + # have expired their keep-alive, or surplus idle connections. 
for connection in list(self._connections): if connection.is_closed(): # log: "removing closed connection" @@ -270,6 +272,13 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: idle_connections = [ connection for connection in self._connections if connection.is_idle() ] + + # There are three cases for how we may be able to handle the request: + # + # 1. There is an existing connection that can handle the request. + # 2. We can create a new connection to handle the request. + # 3. We can close an idle connection and then create a new connection + # to handle the request. if avilable_connections: # log: "reusing existing connection" connection = avilable_connections[0] From fcb0ca2c70687787cdb54daba6e28d6d287805be Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 7 Feb 2024 09:18:37 +0100 Subject: [PATCH 06/12] Comments --- httpcore/_async/connection_pool.py | 2 +- httpcore/_sync/connection_pool.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 2fd837f0..b8bea97a 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -240,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: closing_connections = [] # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. + # have expired their keep-alive, or surplus idle connections. for connection in list(self._connections): if connection.is_closed(): # log: "removing closed connection" diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index b740958a..1bb2d028 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -240,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: closing_connections = [] # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. + # have expired their keep-alive, or surplus idle connections. 
for connection in list(self._connections): if connection.is_closed(): # log: "removing closed connection" From cda040d4f6f01b73c97fbf8cd767cff21e820b83 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 7 Feb 2024 10:37:05 +0100 Subject: [PATCH 07/12] Connection pool work --- httpcore/_async/connection_pool.py | 19 +++++++++---------- httpcore/_sync/connection_pool.py | 19 +++++++++---------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index b8bea97a..0c382779 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -195,8 +195,7 @@ async def handle_async_request(self, request: Request) -> Response: try: while True: with AsyncShieldCancellation(): - async with self._pool_lock: - closing = self._assign_requests_to_connections() + closing = self._assign_requests_to_connections() await self._close_connections(closing) connection = await pool_request.wait_for_connection(timeout=timeout) @@ -211,9 +210,8 @@ async def handle_async_request(self, request: Request) -> Response: except BaseException as exc: with AsyncShieldCancellation(): - async with self._pool_lock: - self._requests.remove(pool_request) - closing = self._assign_requests_to_connections() + self._requests.remove(pool_request) + closing = self._assign_requests_to_connections() await self._close_connections(closing) raise exc from None @@ -351,10 +349,11 @@ async def __aiter__(self) -> AsyncIterator[bytes]: async def aclose(self) -> None: if not self._closed: self._closed = True + if hasattr(self._stream, "aclose"): + await self._stream.aclose() + with AsyncShieldCancellation(): - if hasattr(self._stream, "aclose"): - await self._stream.aclose() - async with self._pool._pool_lock: - self._pool._requests.remove(self._pool_request) - closing = self._pool._assign_requests_to_connections() + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + await self._pool._close_connections(closing) diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 1bb2d028..a958678a 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -195,8 +195,7 @@ def handle_request(self, request: Request) -> Response: try: while True: with ShieldCancellation(): - with self._pool_lock: - closing = self._assign_requests_to_connections() + closing = self._assign_requests_to_connections() self._close_connections(closing) connection = pool_request.wait_for_connection(timeout=timeout) @@ -211,9 +210,8 @@ def handle_request(self, request: Request) -> Response: except BaseException as exc: with ShieldCancellation(): - with self._pool_lock: - self._requests.remove(pool_request) - closing = self._assign_requests_to_connections() + self._requests.remove(pool_request) + closing = self._assign_requests_to_connections() self._close_connections(closing) raise exc from None @@ -351,10 +349,11 @@ def __iter__(self) -> Iterator[bytes]: def close(self) -> None: if not self._closed: self._closed = True + if hasattr(self._stream, "close"): + self._stream.close() + with ShieldCancellation(): - if hasattr(self._stream, "close"): - self._stream.close() - with self._pool._pool_lock: - self._pool._requests.remove(self._pool_request) - closing = self._pool._assign_requests_to_connections() + self._pool._requests.remove(self._pool_request) + closing = self._pool._assign_requests_to_connections() + self._pool._close_connections(closing) From 
d246c91e111b36776408f4cc5df956d39b3d79c0 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 7 Feb 2024 12:52:41 +0100 Subject: [PATCH 08/12] Reraise --- httpcore/_async/connection_pool.py | 35 +++++------------------------- httpcore/_sync/connection_pool.py | 35 +++++------------------------- 2 files changed, 10 insertions(+), 60 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 0c382779..4cd63369 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -12,31 +12,6 @@ from .interfaces import AsyncConnectionInterface, AsyncRequestInterface -class RequestStatus: - def __init__(self, request: Request): - self.request = request - self.connection: Optional[AsyncConnectionInterface] = None - self._connection_acquired = AsyncEvent() - - def set_connection(self, connection: AsyncConnectionInterface) -> None: - assert self.connection is None - self.connection = connection - self._connection_acquired.set() - - def unset_connection(self) -> None: - assert self.connection is not None - self.connection = None - self._connection_acquired = AsyncEvent() - - async def wait_for_connection( - self, timeout: Optional[float] = None - ) -> AsyncConnectionInterface: - if self.connection is None: - await self._connection_acquired.wait(timeout=timeout) - assert self.connection is not None - return self.connection - - class AsyncPoolRequest(Request): def __init__(self, request: Request) -> None: self.request = request @@ -337,22 +312,22 @@ def __init__( self._pool_request = pool_request self._pool = pool self._closed = False - assert self._pool_request in self._pool._requests async def __aiter__(self) -> AsyncIterator[bytes]: try: async for part in self._stream: yield part - except BaseException: + except BaseException as exc: await self.aclose() + raise exc from None async def aclose(self) -> None: if not self._closed: self._closed = True - if hasattr(self._stream, "aclose"): - await self._stream.aclose() - with AsyncShieldCancellation(): + if hasattr(self._stream, "aclose"): + await self._stream.aclose() + self._pool._requests.remove(self._pool_request) closing = self._pool._assign_requests_to_connections() diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index a958678a..0b6adf32 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -12,31 +12,6 @@ from .interfaces import ConnectionInterface, RequestInterface -class RequestStatus: - def __init__(self, request: Request): - self.request = request - self.connection: Optional[ConnectionInterface] = None - self._connection_acquired = Event() - - def set_connection(self, connection: ConnectionInterface) -> None: - assert self.connection is None - self.connection = connection - self._connection_acquired.set() - - def unset_connection(self) -> None: - assert self.connection is not None - self.connection = None - self._connection_acquired = Event() - - def wait_for_connection( - self, timeout: Optional[float] = None - ) -> ConnectionInterface: - if self.connection is None: - self._connection_acquired.wait(timeout=timeout) - assert self.connection is not None - return self.connection - - class PoolRequest(Request): def __init__(self, request: Request) -> None: self.request = request @@ -337,22 +312,22 @@ def __init__( self._pool_request = pool_request self._pool = pool self._closed = False - assert self._pool_request in self._pool._requests def __iter__(self) -> Iterator[bytes]: try: for part in self._stream: 
yield part - except BaseException: + except BaseException as exc: self.close() + raise exc from None def close(self) -> None: if not self._closed: self._closed = True - if hasattr(self._stream, "close"): - self._stream.close() - with ShieldCancellation(): + if hasattr(self._stream, "close"): + self._stream.close() + self._pool._requests.remove(self._pool_request) closing = self._pool._assign_requests_to_connections() From 20f088d67ae93b4cc24132319ed94bcfa56d289c Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 7 Feb 2024 15:38:51 +0100 Subject: [PATCH 09/12] Lookin sharp --- httpcore/_async/connection_pool.py | 61 ++++++++++++++++++++---------- httpcore/_sync/connection_pool.py | 61 ++++++++++++++++++++---------- httpcore/_synchronization.py | 58 ++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 38 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 4cd63369..9dea3e06 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -7,7 +7,7 @@ from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation +from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface @@ -106,14 +106,21 @@ def __init__( self._local_address = local_address self._uds = uds - self._connections: List[AsyncConnectionInterface] = [] - self._requests: List[AsyncPoolRequest] = [] - self._pool_lock = AsyncLock() self._network_backend = ( AutoBackend() if network_backend is None else network_backend ) self._socket_options = socket_options + # The mutable state on a connection pool is the queue of incoming requests, + # and the set of connections that are servicing those requests. + self._connections: List[AsyncConnectionInterface] = [] + self._requests: List[AsyncPoolRequest] = [] + + # We only mutate the state of the connection pool within an 'optional_thread_lock' + # context. This holds a threading lock unless we're running in async mode, + # in which case it is a no-op. + self._optional_thread_lock = AsyncThreadLock() + def create_connection(self, origin: Origin) -> AsyncConnectionInterface: return AsyncHTTPConnection( origin=origin, @@ -165,31 +172,48 @@ async def handle_async_request(self, request: Request) -> Response: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("pool", None) - pool_request = AsyncPoolRequest(request) - self._requests.append(pool_request) + with self._optional_thread_lock: + # Add the incoming request to our request queue. + pool_request = AsyncPoolRequest(request) + self._requests.append(pool_request) + try: while True: - with AsyncShieldCancellation(): + with self._optional_thread_lock: + # Assign incoming requests to available connections, + # closing or creating new connections as required. closing = self._assign_requests_to_connections() await self._close_connections(closing) + + # Wait until this request has an assigned connection. connection = await pool_request.wait_for_connection(timeout=timeout) try: + # Send the request on the assigned connection. 
response = await connection.handle_async_request( pool_request.request ) except ConnectionNotAvailable: + # In some cases a connection may initially be available to + # handle a request, but then become unavailable. + # + # In this case we clear the connection and try again. pool_request.clear_connection() else: break except BaseException as exc: - with AsyncShieldCancellation(): + with self._optional_thread_lock: + # For any exception or cancellation we remove the request from + # the queue, and then re-assign requests to connections. self._requests.remove(pool_request) closing = self._assign_requests_to_connections() + await self._close_connections(closing) raise exc from None + # Return the response. Note that in this case we still have to manage + # the point at which the response is closed. assert isinstance(response.stream, AsyncIterable) return Response( status=response.status, @@ -274,22 +298,20 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: return closing_connections async def _close_connections(self, closing: List[AsyncConnectionInterface]) -> None: - """ - Close connections which have been removed from the pool. - """ - for connection in closing: - await connection.aclose() + # Close connections which have been removed from the pool. + with AsyncShieldCancellation(): + for connection in closing: + await connection.aclose() async def aclose(self) -> None: - closing_connections = list(self._connections) - self._connections = [] + # Explicitly close the connection pool. + # Clears all existing requests and connections. + with self._optional_thread_lock: + closing_connections = list(self._connections) + self._connections = [] await self._close_connections(closing_connections) async def __aenter__(self) -> "AsyncConnectionPool": - # Acquiring the pool lock here ensures that we have the - # correct dependencies installed as early as possible. - async with self._pool_lock: - pass return self async def __aexit__( @@ -328,6 +350,7 @@ async def aclose(self) -> None: if hasattr(self._stream, "aclose"): await self._stream.aclose() + with self._pool._optional_thread_lock: self._pool._requests.remove(self._pool_request) closing = self._pool._assign_requests_to_connections() diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 0b6adf32..1727374f 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -7,7 +7,7 @@ from .._backends.base import SOCKET_OPTION, NetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import Event, Lock, ShieldCancellation +from .._synchronization import Event, ShieldCancellation, ThreadLock from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface @@ -106,14 +106,21 @@ def __init__( self._local_address = local_address self._uds = uds - self._connections: List[ConnectionInterface] = [] - self._requests: List[PoolRequest] = [] - self._pool_lock = Lock() self._network_backend = ( SyncBackend() if network_backend is None else network_backend ) self._socket_options = socket_options + # The mutable state on a connection pool is the queue of incoming requests, + # and the set of connections that are servicing those requests. + self._connections: List[ConnectionInterface] = [] + self._requests: List[PoolRequest] = [] + + # We only mutate the state of the connection pool within an 'optional_thread_lock' + # context. 
This holds a threading lock unless we're running in async mode, + # in which case it is a no-op. + self._optional_thread_lock = ThreadLock() + def create_connection(self, origin: Origin) -> ConnectionInterface: return HTTPConnection( origin=origin, @@ -165,31 +172,48 @@ def handle_request(self, request: Request) -> Response: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("pool", None) - pool_request = PoolRequest(request) - self._requests.append(pool_request) + with self._optional_thread_lock: + # Add the incoming request to our request queue. + pool_request = PoolRequest(request) + self._requests.append(pool_request) + try: while True: - with ShieldCancellation(): + with self._optional_thread_lock: + # Assign incoming requests to available connections, + # closing or creating new connections as required. closing = self._assign_requests_to_connections() self._close_connections(closing) + + # Wait until this request has an assigned connection. connection = pool_request.wait_for_connection(timeout=timeout) try: + # Send the request on the assigned connection. response = connection.handle_request( pool_request.request ) except ConnectionNotAvailable: + # In some cases a connection may initially be available to + # handle a request, but then become unavailable. + # + # In this case we clear the connection and try again. pool_request.clear_connection() else: break except BaseException as exc: - with ShieldCancellation(): + with self._optional_thread_lock: + # For any exception or cancellation we remove the request from + # the queue, and then re-assign requests to connections. self._requests.remove(pool_request) closing = self._assign_requests_to_connections() + self._close_connections(closing) raise exc from None + # Return the response. Note that in this case we still have to manage + # the point at which the response is closed. assert isinstance(response.stream, Iterable) return Response( status=response.status, @@ -274,22 +298,20 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: return closing_connections def _close_connections(self, closing: List[ConnectionInterface]) -> None: - """ - Close connections which have been removed from the pool. - """ - for connection in closing: - connection.close() + # Close connections which have been removed from the pool. + with ShieldCancellation(): + for connection in closing: + connection.close() def close(self) -> None: - closing_connections = list(self._connections) - self._connections = [] + # Explicitly close the connection pool. + # Clears all existing requests and connections. + with self._optional_thread_lock: + closing_connections = list(self._connections) + self._connections = [] self._close_connections(closing_connections) def __enter__(self) -> "ConnectionPool": - # Acquiring the pool lock here ensures that we have the - # correct dependencies installed as early as possible. - with self._pool_lock: - pass return self def __exit__( @@ -328,6 +350,7 @@ def close(self) -> None: if hasattr(self._stream, "close"): self._stream.close() + with self._pool._optional_thread_lock: self._pool._requests.remove(self._pool_request) closing = self._pool._assign_requests_to_connections() diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index 119d89fc..9619a398 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -45,6 +45,13 @@ def current_async_library() -> str: class AsyncLock: + """ + This is a standard lock. 
+ + In the sync case `Lock` provides thread locking. + In the async case `AsyncLock` provides async locking. + """ + def __init__(self) -> None: self._backend = "" @@ -82,6 +89,26 @@ async def __aexit__( self._anyio_lock.release() +class AsyncThreadLock: + """ + This is a threading-only lock for no-I/O contexts. + + In the sync case `ThreadLock` provides thread locking. + In the async case `AsyncThreadLock` is a no-op. + """ + + def __enter__(self) -> "AsyncThreadLock": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + pass + + class AsyncEvent: def __init__(self) -> None: self._backend = "" @@ -202,6 +229,13 @@ def __exit__( class Lock: + """ + This is a standard lock. + + In the sync case `Lock` provides thread locking. + In the async case `AsyncLock` provides async locking. + """ + def __init__(self) -> None: self._lock = threading.Lock() @@ -218,6 +252,30 @@ def __exit__( self._lock.release() +class ThreadLock: + """ + This is a threading-only lock for no-I/O contexts. + + In the sync case `ThreadLock` provides thread locking. + In the async case `AsyncThreadLock` is a no-op. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + + def __enter__(self) -> "ThreadLock": + self._lock.acquire() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]] = None, + exc_value: Optional[BaseException] = None, + traceback: Optional[TracebackType] = None, + ) -> None: + self._lock.release() + + class Event: def __init__(self) -> None: self._event = threading.Event() From f91789652485d5ec7651458bc20a9164ff56dbd6 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 7 Feb 2024 15:50:51 +0100 Subject: [PATCH 10/12] nocover directive --- httpcore/_async/connection_pool.py | 2 +- httpcore/_sync/connection_pool.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 9dea3e06..9bcb9f76 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -200,7 +200,7 @@ async def handle_async_request(self, request: Request) -> Response: # In this case we clear the connection and try again. pool_request.clear_connection() else: - break + break # pragma: nocover except BaseException as exc: with self._optional_thread_lock: diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 1727374f..b5cafcb6 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -200,7 +200,7 @@ def handle_request(self, request: Request) -> Response: # In this case we clear the connection and try again. 
pool_request.clear_connection() else: - break + break # pragma: nocover except BaseException as exc: with self._optional_thread_lock: From 837f1e0b9c43c41e425d638868d7a35c2d5cd364 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Fri, 9 Feb 2024 18:24:44 +0100 Subject: [PATCH 11/12] Safe cancellations --- httpcore/_async/connection.py | 16 +++++++------- httpcore/_async/connection_pool.py | 31 ++++++++++++++++++++++++---- httpcore/_sync/connection.py | 16 +++++++------- httpcore/_sync/connection_pool.py | 31 ++++++++++++++++++++++++---- tests/_async/test_connection_pool.py | 29 ++++++++++++++++++++++++++ tests/_sync/test_connection_pool.py | 29 ++++++++++++++++++++++++++ 6 files changed, 126 insertions(+), 26 deletions(-) diff --git a/httpcore/_async/connection.py b/httpcore/_async/connection.py index 3aeb8ed9..2f439cf0 100644 --- a/httpcore/_async/connection.py +++ b/httpcore/_async/connection.py @@ -6,7 +6,7 @@ from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream -from .._exceptions import ConnectError, ConnectionNotAvailable, ConnectTimeout +from .._exceptions import ConnectError, ConnectTimeout from .._models import Origin, Request, Response from .._ssl import default_ssl_context from .._synchronization import AsyncLock @@ -70,9 +70,9 @@ async def handle_async_request(self, request: Request) -> Response: f"Attempted to send request to {request.url.origin} on connection to {self._origin}" ) - async with self._request_lock: - if self._connection is None: - try: + try: + async with self._request_lock: + if self._connection is None: stream = await self._connect(request) ssl_object = stream.get_extra_info("ssl_object") @@ -94,11 +94,9 @@ async def handle_async_request(self, request: Request) -> Response: stream=stream, keepalive_expiry=self._keepalive_expiry, ) - except Exception as exc: - self._connect_failed = True - raise exc - elif not self._connection.is_available(): - raise ConnectionNotAvailable() + except BaseException as exc: + self._connect_failed = True + raise exc return await self._connection.handle_async_request(request) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 9bcb9f76..018b0ba2 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -12,7 +12,7 @@ from .interfaces import AsyncConnectionInterface, AsyncRequestInterface -class AsyncPoolRequest(Request): +class AsyncPoolRequest: def __init__(self, request: Request) -> None: self.request = request self.connection: Optional[AsyncConnectionInterface] = None @@ -36,6 +36,9 @@ async def wait_for_connection( assert self.connection is not None return self.connection + def is_queued(self) -> bool: + return self.connection is None + class AsyncConnectionPool(AsyncRequestInterface): """ @@ -256,9 +259,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: closing_connections.append(connection) # Assign queued requests to connections. 
- queued_requests = [ - request for request in self._requests if request.connection is None - ] + queued_requests = [request for request in self._requests if request.is_queued()] for pool_request in queued_requests: origin = pool_request.request.url.origin avilable_connections = [ @@ -322,6 +323,28 @@ async def __aexit__( ) -> None: await self.aclose() + def __repr__(self) -> str: + class_name = self.__class__.__name__ + with self._optional_thread_lock: + request_is_queued = [request.is_queued() for request in self._requests] + connection_is_idle = [ + connection.is_idle() for connection in self._connections + ] + + num_active_requests = request_is_queued.count(False) + num_queued_requests = request_is_queued.count(True) + num_active_connections = connection_is_idle.count(False) + num_idle_connections = connection_is_idle.count(True) + + requests_info = ( + f"Requests: {num_active_requests} active, {num_queued_requests} queued" + ) + connection_info = ( + f"Connections: {num_active_connections} active, {num_idle_connections} idle" + ) + + return f"<{class_name} [{requests_info} | {connection_info}]>" + class PoolByteStream: def __init__( diff --git a/httpcore/_sync/connection.py b/httpcore/_sync/connection.py index f6b99f1b..c3890f34 100644 --- a/httpcore/_sync/connection.py +++ b/httpcore/_sync/connection.py @@ -6,7 +6,7 @@ from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend, NetworkStream -from .._exceptions import ConnectError, ConnectionNotAvailable, ConnectTimeout +from .._exceptions import ConnectError, ConnectTimeout from .._models import Origin, Request, Response from .._ssl import default_ssl_context from .._synchronization import Lock @@ -70,9 +70,9 @@ def handle_request(self, request: Request) -> Response: f"Attempted to send request to {request.url.origin} on connection to {self._origin}" ) - with self._request_lock: - if self._connection is None: - try: + try: + with self._request_lock: + if self._connection is None: stream = self._connect(request) ssl_object = stream.get_extra_info("ssl_object") @@ -94,11 +94,9 @@ def handle_request(self, request: Request) -> Response: stream=stream, keepalive_expiry=self._keepalive_expiry, ) - except Exception as exc: - self._connect_failed = True - raise exc - elif not self._connection.is_available(): - raise ConnectionNotAvailable() + except BaseException as exc: + self._connect_failed = True + raise exc return self._connection.handle_request(request) diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index b5cafcb6..8dcf348c 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -12,7 +12,7 @@ from .interfaces import ConnectionInterface, RequestInterface -class PoolRequest(Request): +class PoolRequest: def __init__(self, request: Request) -> None: self.request = request self.connection: Optional[ConnectionInterface] = None @@ -36,6 +36,9 @@ def wait_for_connection( assert self.connection is not None return self.connection + def is_queued(self) -> bool: + return self.connection is None + class ConnectionPool(RequestInterface): """ @@ -256,9 +259,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: closing_connections.append(connection) # Assign queued requests to connections. 
- queued_requests = [ - request for request in self._requests if request.connection is None - ] + queued_requests = [request for request in self._requests if request.is_queued()] for pool_request in queued_requests: origin = pool_request.request.url.origin avilable_connections = [ @@ -322,6 +323,28 @@ def __exit__( ) -> None: self.close() + def __repr__(self) -> str: + class_name = self.__class__.__name__ + with self._optional_thread_lock: + request_is_queued = [request.is_queued() for request in self._requests] + connection_is_idle = [ + connection.is_idle() for connection in self._connections + ] + + num_active_requests = request_is_queued.count(False) + num_queued_requests = request_is_queued.count(True) + num_active_connections = connection_is_idle.count(False) + num_idle_connections = connection_is_idle.count(True) + + requests_info = ( + f"Requests: {num_active_requests} active, {num_queued_requests} queued" + ) + connection_info = ( + f"Connections: {num_active_connections} active, {num_idle_connections} idle" + ) + + return f"<{class_name} [{requests_info} | {connection_info}]>" + class PoolByteStream: def __init__( diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 442b441d..2fc27204 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -38,6 +38,10 @@ async def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) await response.aread() assert response.status == 200 @@ -46,6 +50,10 @@ async def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) # Sending a second request to the same origin will reuse the existing IDLE connection. async with pool.stream("GET", "https://example.com/") as response: @@ -53,6 +61,10 @@ async def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) await response.aread() assert response.status == 200 @@ -61,6 +73,10 @@ async def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) # Sending a request to a different origin will not reuse the existing IDLE connection. async with pool.stream("GET", "http://example.com/") as response: @@ -69,6 +85,10 @@ async def test_connection_pool_with_keepalive(): "", "", ] + assert ( + repr(pool) + == "" + ) await response.aread() assert response.status == 200 @@ -78,6 +98,10 @@ async def test_connection_pool_with_keepalive(): "", "", ] + assert ( + repr(pool) + == "" + ) @pytest.mark.anyio @@ -620,6 +644,11 @@ async def fetch(pool, domain, info_list): "", ] + assert ( + repr(pool) + == "" + ) + @pytest.mark.anyio async def test_unsupported_protocol(): diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index 0897f1ff..ee303e5c 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -38,6 +38,10 @@ def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) response.read() assert response.status == 200 @@ -46,6 +50,10 @@ def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) # Sending a second request to the same origin will reuse the existing IDLE connection. 
with pool.stream("GET", "https://example.com/") as response: @@ -53,6 +61,10 @@ def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) response.read() assert response.status == 200 @@ -61,6 +73,10 @@ def test_connection_pool_with_keepalive(): assert info == [ "" ] + assert ( + repr(pool) + == "" + ) # Sending a request to a different origin will not reuse the existing IDLE connection. with pool.stream("GET", "http://example.com/") as response: @@ -69,6 +85,10 @@ def test_connection_pool_with_keepalive(): "", "", ] + assert ( + repr(pool) + == "" + ) response.read() assert response.status == 200 @@ -78,6 +98,10 @@ def test_connection_pool_with_keepalive(): "", "", ] + assert ( + repr(pool) + == "" + ) @@ -620,6 +644,11 @@ def fetch(pool, domain, info_list): "", ] + assert ( + repr(pool) + == "" + ) + def test_unsupported_protocol(): From a42a30d8c250848feac02f56340f2f5b71444c07 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Mon, 12 Feb 2024 10:10:28 +0000 Subject: [PATCH 12/12] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 061358f4..3d537c8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## Unreleased +- Fix support for async cancellations. (#880) - Fix trace extension when used with socks proxy. (#849) - Fix SSL context for connections using the "wss" scheme (#869)
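The first of those entries, async cancellation support, is what most of the series above is working toward. The ordering change from patches 07 and 08 is the core of it: the response stream is closed first, outside any cancellation shield, and only the pool bookkeeping that must always complete runs shielded. A minimal sketch of that ordering, using anyio directly as a stand-in for httpcore's AsyncShieldCancellation helper (the function and its arguments here are illustrative, not library API):

```python
import anyio


async def close_stream_then_update_pool(stream, update_pool) -> None:
    # Close the response stream first. This runs outside any shield, so a
    # pending cancellation can still surface from the I/O involved in closing.
    if hasattr(stream, "aclose"):
        await stream.aclose()

    # The in-memory pool bookkeeping (dropping the request, re-assigning
    # queued requests, closing surplus connections) must always complete,
    # so it runs inside a shielded cancel scope.
    with anyio.CancelScope(shield=True):
        await update_pool()
```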
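Patch 08 also changes the streaming iterator so that any failure mid-iteration closes the stream and then re-raises without implicit exception chaining. A simplified, self-contained sketch of that shape (the real PoolByteStream additionally updates the pool when it closes):

```python
from typing import AsyncIterable, AsyncIterator


class ClosingByteStream:
    """Wrap a byte iterator and guarantee it is closed if iteration fails."""

    def __init__(self, stream: AsyncIterable[bytes]) -> None:
        self._stream = stream
        self._closed = False

    async def __aiter__(self) -> AsyncIterator[bytes]:
        try:
            async for part in self._stream:
                yield part
        except BaseException as exc:
            # Close on any failure, including cancellation, then re-raise
            # with `from None` so no implicit exception context is attached.
            await self.aclose()
            raise exc from None

    async def aclose(self) -> None:
        if not self._closed:
            self._closed = True
            if hasattr(self._stream, "aclose"):
                await self._stream.aclose()
```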
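The "optional thread lock" introduced in patch 09 is the other key simplification: the sync pool guards its shared request and connection lists with a real threading lock, while the async pool uses a no-op stand-in, since its state is only ever touched from a single event loop. The class definitions are in the patch itself; the usage pattern they enable looks roughly like this (TinyPool and its fields are illustrative names, not httpcore API):

```python
import threading
from typing import List


class NoOpLock:
    """Async-side stand-in: entering it blocks nothing and costs nothing."""

    def __enter__(self) -> "NoOpLock":
        return self

    def __exit__(self, *args: object) -> None:
        pass


class TinyPool:
    """Illustrative only: shared state is mutated under the optional lock."""

    def __init__(self, threadsafe: bool) -> None:
        self._requests: List[str] = []
        # A real threading.Lock for the sync pool, a no-op for the async pool.
        self._optional_thread_lock = threading.Lock() if threadsafe else NoOpLock()

    def add_request(self, request: str) -> None:
        # State mutation happens under the lock; network I/O stays outside it.
        with self._optional_thread_lock:
            self._requests.append(request)
```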
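Patch 11 likewise reworks the per-connection request handling so that the whole connection-setup phase sits inside one try block and the handler is widened to BaseException, meaning a cancellation raised mid-connect still marks the connection as failed. A reduced sketch of that control flow, with stand-in names rather than the real helpers:

```python
class FlaggedConnection:
    """Illustrative only: remember that connecting failed, whatever the reason."""

    def __init__(self) -> None:
        self._connection = None
        self._connect_failed = False

    async def _connect(self):
        raise NotImplementedError  # stand-in for the real TCP/TLS setup

    async def ensure_connected(self) -> None:
        try:
            if self._connection is None:
                self._connection = await self._connect()
        except BaseException:
            # BaseException rather than Exception, so that a cancellation
            # raised mid-connect also marks this connection as unusable.
            self._connect_failed = True
            raise
```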
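The __repr__ added in patch 11 summarizes pool state as active/queued request counts and active/idle connection counts, which is what the new assertions in the keep-alive tests exercise. Reconstructed from the f-strings in the __repr__ implementation rather than quoted from the test file, a freshly created pool should render along these lines:

```python
import httpcore

pool = httpcore.ConnectionPool()
print(repr(pool))
# Expected shape, following the f-strings in __repr__:
# <ConnectionPool [Requests: 0 active, 0 queued | Connections: 0 active, 0 idle]>
```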
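Put together, the user-visible behaviour promised by the CHANGELOG entry is that a cancelled request leaves the pool in a clean state. A hedged end-to-end illustration using only public API and anyio timeouts; the URL and the 1 ms timeout are arbitrary choices, and the script needs network access to attempt the request:

```python
import anyio
import httpcore


async def main() -> None:
    async with httpcore.AsyncConnectionPool() as pool:
        with anyio.move_on_after(0.001):
            # Almost certainly cancelled before a response arrives.
            await pool.request("GET", "https://example.com/")

        # After the cancellation the pool has dropped the request from its
        # queue, so the repr should report no queued requests.
        print(repr(pool))


anyio.run(main)
```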