Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for connection pool monitoring #702

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The project versioning policy is now explicitly governed by SEMVER. See https://

- Support async cancellations, ensuring that the connection pool is left in a clean state when cancellations occur. (#726)
- The networking backend interface has [been added to the public API](https://www.encode.io/httpcore/network-backends). Some classes which were previously private implementation detail are now part of the top-level public API. (#699)
- Add some connection pool events for trace extension. (#702)
- Graceful handling of HTTP/2 GoAway frames, with requests being transparently retried on a new connection. (#730)
- Add exceptions when a synchronous `trace callback` is passed to an asynchronous request or an asynchronous `trace callback` is passed to a synchronous request. (#717)
- Drop Python 3.7 support. (#727)
Expand Down
11 changes: 11 additions & 0 deletions docs/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ The `event_name` and `info` arguments here will be one of the following:
* `{event_type}.{event_name}.started`, `<dictionary of keyword arguments>`
* `{event_type}.{event_name}.complete`, `{"return_value": <...>}`
* `{event_type}.{event_name}.failed`, `{"exception": <...>}`
* `{event_type}.{event_name}`, `<dictionary of keyword arguments>`

karpetrosyan marked this conversation as resolved.
Show resolved Hide resolved
Note that when using the async variant of `httpcore` the handler function passed to `"trace"` must be an `async def ...` function.

Expand All @@ -130,6 +131,16 @@ The following event types are currently exposed...
* `"connection.connect_unix_socket"`
* `"connection.start_tls"`

**Connection pool events**

* `"connection_pool.add_request"`
* `"connection_pool.remove_request"`
* `"connection_pool.add_connection"`
* `"connection_pool.reuse_connection"`
* `"connection_pool.assign_request_to_connection"`
* `"connection_pool.unassign_request_from_connection"`
* `"connection_pool.timeout_waiting_for_connection"`

**HTTP/1.1 events**

* `"http11.send_request_headers"`
Expand Down
60 changes: 60 additions & 0 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import ssl
import sys
import time
Expand All @@ -9,9 +10,12 @@
from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation
from .._trace import atrace
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

logger = logging.getLogger("httpcore.connection_pool")


class RequestStatus:
def __init__(self, request: Request):
Expand Down Expand Up @@ -162,8 +166,14 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
# Reuse an existing connection if one is currently available.
for idx, connection in enumerate(self._pool):
if connection.can_handle_request(origin) and connection.is_available():
kwargs = {"connection": connection, "request": status.request}
await atrace("reuse_connection", logger, status.request, kwargs)
self._pool.pop(idx)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
await atrace(
"assign_request_to_connection", logger, status.request, kwargs
)
status.set_connection(connection)
return True

Expand All @@ -172,6 +182,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle():
await connection.aclose()
await atrace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.pop(idx)
break

Expand All @@ -181,7 +197,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:

# Otherwise create a new connection.
connection = self.create_connection(origin)
await atrace(
"add_connection", logger, status.request, kwargs={"connection": connection}
)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
await atrace("assign_request_to_connection", logger, status.request, kwargs)
status.set_connection(connection)
return True

Expand All @@ -193,6 +214,9 @@ async def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.has_expired():
await connection.aclose()
await atrace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)

# If the pool size exceeds the maximum number of allowed keep-alive connections,
Expand All @@ -201,6 +225,9 @@ async def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle() and pool_size > self._max_keepalive_connections:
await connection.aclose()
await atrace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)
pool_size -= 1

Expand Down Expand Up @@ -230,6 +257,7 @@ async def handle_async_request(self, request: Request) -> Response:
deadline = float("inf")

async with self._pool_lock:
await atrace("add_request", logger, request, {"request": status.request})
self._requests.append(status)
await self._close_expired_connections()
await self._attempt_to_acquire_connection(status)
Expand All @@ -241,9 +269,22 @@ async def handle_async_request(self, request: Request) -> Response:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
if isinstance(exc, PoolTimeout):
await atrace(
"timeout_waiting_for_connection",
logger,
status.request,
{"request": status.request},
)
async with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
await atrace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)
raise exc

Expand All @@ -260,6 +301,13 @@ async def handle_async_request(self, request: Request) -> Response:
async with self._pool_lock:
# Maintain our position in the request queue, but reset the
# status so that the request becomes queued again.
kwargs = {"request": status.request, "connection": connection}
await atrace(
"unassign_request_from_connection",
logger,
status.request,
kwargs,
)
status.unset_connection()
await self._attempt_to_acquire_connection(status)
except BaseException as exc:
Expand Down Expand Up @@ -296,9 +344,21 @@ async def response_closed(self, status: RequestStatus) -> None:
async with self._pool_lock:
# Update the state of the connection pool.
if status in self._requests:
await atrace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)

if connection.is_closed() and connection in self._pool:
await atrace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.remove(connection)

# Since we've had a response closed, it's possible we'll now be able
Expand Down
60 changes: 60 additions & 0 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import ssl
import sys
import time
Expand All @@ -9,9 +10,12 @@
from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock, ShieldCancellation
from .._trace import trace
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

logger = logging.getLogger("httpcore.connection_pool")


class RequestStatus:
def __init__(self, request: Request):
Expand Down Expand Up @@ -162,8 +166,14 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
# Reuse an existing connection if one is currently available.
for idx, connection in enumerate(self._pool):
if connection.can_handle_request(origin) and connection.is_available():
kwargs = {"connection": connection, "request": status.request}
trace("reuse_connection", logger, status.request, kwargs)
self._pool.pop(idx)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
trace(
"assign_request_to_connection", logger, status.request, kwargs
)
status.set_connection(connection)
return True

Expand All @@ -172,6 +182,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle():
connection.close()
trace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.pop(idx)
break

Expand All @@ -181,7 +197,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:

# Otherwise create a new connection.
connection = self.create_connection(origin)
trace(
"add_connection", logger, status.request, kwargs={"connection": connection}
)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
trace("assign_request_to_connection", logger, status.request, kwargs)
status.set_connection(connection)
return True

Expand All @@ -193,6 +214,9 @@ def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.has_expired():
connection.close()
trace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)

# If the pool size exceeds the maximum number of allowed keep-alive connections,
Expand All @@ -201,6 +225,9 @@ def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle() and pool_size > self._max_keepalive_connections:
connection.close()
trace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)
pool_size -= 1

Expand Down Expand Up @@ -230,6 +257,7 @@ def handle_request(self, request: Request) -> Response:
deadline = float("inf")

with self._pool_lock:
trace("add_request", logger, request, {"request": status.request})
self._requests.append(status)
self._close_expired_connections()
self._attempt_to_acquire_connection(status)
Expand All @@ -241,9 +269,22 @@ def handle_request(self, request: Request) -> Response:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
if isinstance(exc, PoolTimeout):
trace(
"timeout_waiting_for_connection",
logger,
status.request,
{"request": status.request},
)
with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
trace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)
raise exc

Expand All @@ -260,6 +301,13 @@ def handle_request(self, request: Request) -> Response:
with self._pool_lock:
# Maintain our position in the request queue, but reset the
# status so that the request becomes queued again.
kwargs = {"request": status.request, "connection": connection}
trace(
"unassign_request_from_connection",
logger,
status.request,
kwargs,
)
status.unset_connection()
self._attempt_to_acquire_connection(status)
except BaseException as exc:
Expand Down Expand Up @@ -296,9 +344,21 @@ def response_closed(self, status: RequestStatus) -> None:
with self._pool_lock:
# Update the state of the connection pool.
if status in self._requests:
trace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)

if connection.is_closed() and connection in self._pool:
trace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.remove(connection)

# Since we've had a response closed, it's possible we'll now be able
Expand Down
50 changes: 50 additions & 0 deletions httpcore/_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,56 @@
from ._models import Request


def trace(
name: str,
logger: logging.Logger,
request: Optional[Request] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> None:
trace_extension = None if request is None else request.extensions.get("trace")
prefix = logger.name.split(".")[-1]
info = kwargs or {}
debug = logger.isEnabledFor(logging.DEBUG)

if debug or trace_extension:
if trace_extension is not None:
prefix_and_name = f"{prefix}.{name}"
trace_extension(prefix_and_name, info)

if debug:
if not info: # pragma: no cover
message = name
else:
args = " ".join([f"{key}={value!r}" for key, value in info.items()])
message = f"{name} {args}"
logger.debug(message)


async def atrace(
name: str,
logger: logging.Logger,
request: Optional[Request] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> None:
trace_extension = None if request is None else request.extensions.get("trace")
prefix = logger.name.split(".")[-1]
info = kwargs or {}
debug = logger.isEnabledFor(logging.DEBUG)

if debug or trace_extension:
if trace_extension is not None:
prefix_and_name = f"{prefix}.{name}"
await trace_extension(prefix_and_name, info)

if debug:
if not info: # pragma: no cover
message = name
else:
args = " ".join([f"{key}={value!r}" for key, value in info.items()])
message = f"{name} {args}"
logger.debug(message)


class Trace:
def __init__(
self,
Expand Down
Loading