Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for connection pool monitoring #702

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## unreleased

- The networking backend interface has [been added to the public API](https://www.encode.io/httpcore/network-backends). Some classes which were previously private implementation detail are now part of the top-level public API. (#699)
- Add some connection pool events for trace extension. (#702)

## 0.17.2 (May 23th, 2023)

Expand Down
12 changes: 12 additions & 0 deletions docs/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ The `event_name` and `info` arguments here will be one of the following:
* `{event_type}.{event_name}.started`, `<dictionary of keyword arguments>`
* `{event_type}.{event_name}.complete`, `{"return_value": <...>}`
* `{event_type}.{event_name}.failed`, `{"exception": <...>}`
* `{event_type}.{event_name}`, `<dictionary of keyword arguments>`

karpetrosyan marked this conversation as resolved.
Show resolved Hide resolved

Note that when using the async variant of `httpcore` the handler function passed to `"trace"` must be an `async def ...` function.

Expand All @@ -130,6 +132,15 @@ The following event types are currently exposed...
* `"connection.connect_unix_socket"`
* `"connection.start_tls"`

**Connection pool events**
* `"connection_pool.add_request"`
karpetrosyan marked this conversation as resolved.
Show resolved Hide resolved
* `"connection_pool.remove_request"`
* `"connection_pool.add_connection"`
* `"connection_pool.reuse_connection"`
* `"connection_pool.assign_request_to_connection"`
* `"connection_pool.unassign_request_from_connection"`
* `"connection_pool.timeout_waiting_for_connection"`

**HTTP/1.1 events**

* `"http11.send_request_headers"`
Expand All @@ -147,6 +158,7 @@ The following event types are currently exposed...
* `"http2.receive_response_body"`
* `"http2.response_closed"`


karpetrosyan marked this conversation as resolved.
Show resolved Hide resolved
### `"sni_hostname"`

The server's hostname, which is used to confirm the hostname supplied by the SSL certificate.
Expand Down
62 changes: 62 additions & 0 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import logging
import ssl
import sys
from types import TracebackType
from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type

import httpcore

karpetrosyan marked this conversation as resolved.
Show resolved Hide resolved
from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock
from .._trace import atrace
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

logger = logging.getLogger("httpcore.connection_pool")


class RequestStatus:
def __init__(self, request: Request):
Expand Down Expand Up @@ -161,8 +167,14 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
# Reuse an existing connection if one is currently available.
for idx, connection in enumerate(self._pool):
if connection.can_handle_request(origin) and connection.is_available():
kwargs = {"connection": connection, "request": status.request}
await atrace("reuse_connection", logger, status.request, kwargs)
self._pool.pop(idx)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
await atrace(
"assign_request_to_connection", logger, status.request, kwargs
)
status.set_connection(connection)
return True

Expand All @@ -171,6 +183,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle():
await connection.aclose()
await atrace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.pop(idx)
break

Expand All @@ -180,7 +198,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:

# Otherwise create a new connection.
connection = self.create_connection(origin)
await atrace(
"add_connection", logger, status.request, kwargs={"connection": connection}
)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
await atrace("assign_request_to_connection", logger, status.request, kwargs)
status.set_connection(connection)
return True

Expand All @@ -192,6 +215,9 @@ async def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.has_expired():
await connection.aclose()
await atrace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)

# If the pool size exceeds the maximum number of allowed keep-alive connections,
Expand All @@ -200,6 +226,9 @@ async def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle() and pool_size > self._max_keepalive_connections:
await connection.aclose()
await atrace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)
pool_size -= 1

Expand All @@ -222,6 +251,7 @@ async def handle_async_request(self, request: Request) -> Response:
status = RequestStatus(request)

async with self._pool_lock:
await atrace("add_request", logger, request, {"request": status.request})
self._requests.append(status)
await self._close_expired_connections()
await self._attempt_to_acquire_connection(status)
Expand All @@ -235,9 +265,22 @@ async def handle_async_request(self, request: Request) -> Response:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
if isinstance(exc, httpcore.PoolTimeout):
await atrace(
"timeout_waiting_for_connection",
logger,
status.request,
{"request": status.request},
)
async with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
await atrace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)
raise exc

Expand All @@ -254,6 +297,13 @@ async def handle_async_request(self, request: Request) -> Response:
async with self._pool_lock:
# Maintain our position in the request queue, but reset the
# status so that the request becomes queued again.
kwargs = {"request": status.request, "connection": connection}
await atrace(
"unassign_request_from_connection",
logger,
status.request,
kwargs,
)
status.unset_connection()
await self._attempt_to_acquire_connection(status)
except BaseException as exc:
Expand Down Expand Up @@ -285,9 +335,21 @@ async def response_closed(self, status: RequestStatus) -> None:
async with self._pool_lock:
# Update the state of the connection pool.
if status in self._requests:
await atrace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)

if connection.is_closed() and connection in self._pool:
await atrace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.remove(connection)

# Since we've had a response closed, it's possible we'll now be able
Expand Down
62 changes: 62 additions & 0 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import logging
import ssl
import sys
from types import TracebackType
from typing import Iterable, Iterator, Iterable, List, Optional, Type

import httpcore

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock
from .._trace import trace
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

logger = logging.getLogger("httpcore.connection_pool")


class RequestStatus:
def __init__(self, request: Request):
Expand Down Expand Up @@ -161,8 +167,14 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
# Reuse an existing connection if one is currently available.
for idx, connection in enumerate(self._pool):
if connection.can_handle_request(origin) and connection.is_available():
kwargs = {"connection": connection, "request": status.request}
trace("reuse_connection", logger, status.request, kwargs)
self._pool.pop(idx)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
trace(
"assign_request_to_connection", logger, status.request, kwargs
)
status.set_connection(connection)
return True

Expand All @@ -171,6 +183,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle():
connection.close()
trace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.pop(idx)
break

Expand All @@ -180,7 +198,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:

# Otherwise create a new connection.
connection = self.create_connection(origin)
trace(
"add_connection", logger, status.request, kwargs={"connection": connection}
)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
trace("assign_request_to_connection", logger, status.request, kwargs)
status.set_connection(connection)
return True

Expand All @@ -192,6 +215,9 @@ def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.has_expired():
connection.close()
trace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)

# If the pool size exceeds the maximum number of allowed keep-alive connections,
Expand All @@ -200,6 +226,9 @@ def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle() and pool_size > self._max_keepalive_connections:
connection.close()
trace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)
pool_size -= 1

Expand All @@ -222,6 +251,7 @@ def handle_request(self, request: Request) -> Response:
status = RequestStatus(request)

with self._pool_lock:
trace("add_request", logger, request, {"request": status.request})
self._requests.append(status)
self._close_expired_connections()
self._attempt_to_acquire_connection(status)
Expand All @@ -235,9 +265,22 @@ def handle_request(self, request: Request) -> Response:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
if isinstance(exc, httpcore.PoolTimeout):
trace(
"timeout_waiting_for_connection",
logger,
status.request,
{"request": status.request},
)
with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
trace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)
raise exc

Expand All @@ -254,6 +297,13 @@ def handle_request(self, request: Request) -> Response:
with self._pool_lock:
# Maintain our position in the request queue, but reset the
# status so that the request becomes queued again.
kwargs = {"request": status.request, "connection": connection}
trace(
"unassign_request_from_connection",
logger,
status.request,
kwargs,
)
status.unset_connection()
self._attempt_to_acquire_connection(status)
except BaseException as exc:
Expand Down Expand Up @@ -285,9 +335,21 @@ def response_closed(self, status: RequestStatus) -> None:
with self._pool_lock:
# Update the state of the connection pool.
if status in self._requests:
trace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)

if connection.is_closed() and connection in self._pool:
trace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.remove(connection)

# Since we've had a response closed, it's possible we'll now be able
Expand Down
50 changes: 50 additions & 0 deletions httpcore/_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,56 @@
from ._models import Request


def trace(
name: str,
logger: logging.Logger,
request: Optional[Request] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> None:
trace_extension = None if request is None else request.extensions.get("trace")
prefix = logger.name.split(".")[-1]
info = kwargs or {}
debug = logger.isEnabledFor(logging.DEBUG)

if debug or trace_extension:
if trace_extension is not None:
prefix_and_name = f"{prefix}.{name}"
trace_extension(prefix_and_name, info)

if debug:
if not info: # pragma: no cover
message = name
else:
args = " ".join([f"{key}={value!r}" for key, value in info.items()])
message = f"{name} {args}"
logger.debug(message)


async def atrace(
name: str,
logger: logging.Logger,
request: Optional[Request] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> None:
trace_extension = None if request is None else request.extensions.get("trace")
prefix = logger.name.split(".")[-1]
info = kwargs or {}
debug = logger.isEnabledFor(logging.DEBUG)

if debug or trace_extension:
if trace_extension is not None:
prefix_and_name = f"{prefix}.{name}"
await trace_extension(prefix_and_name, info)

if debug:
if not info: # pragma: no cover
message = name
else:
args = " ".join([f"{key}={value!r}" for key, value in info.items()])
message = f"{name} {args}"
logger.debug(message)


class Trace:
def __init__(
self,
Expand Down
Loading