Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Httpcore interface #804

Merged
merged 11 commits into from
Apr 8, 2020
4 changes: 0 additions & 4 deletions httpx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from ._dispatch.asgi import ASGIDispatch
from ._dispatch.wsgi import WSGIDispatch
from ._exceptions import (
ConnectionClosed,
ConnectTimeout,
CookieConflict,
DecodingError,
Expand All @@ -23,7 +22,6 @@
ResponseClosed,
ResponseNotRead,
StreamConsumed,
TimeoutException,
TooManyRedirects,
WriteTimeout,
)
Expand Down Expand Up @@ -56,7 +54,6 @@
"Timeout",
"ConnectTimeout",
"CookieConflict",
"ConnectionClosed",
"DecodingError",
"HTTPError",
"InvalidURL",
Expand All @@ -79,7 +76,6 @@
"Headers",
"QueryParams",
"Request",
"TimeoutException",
"Response",
"DigestAuth",
"WSGIDispatch",
Expand Down
2 changes: 1 addition & 1 deletion httpx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__title__ = "httpx"
__description__ = "A next generation HTTP client, for Python 3."
__version__ = "0.12.1"
__version__ = "0.13.dev0"
157 changes: 106 additions & 51 deletions httpx/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from types import TracebackType

import hstspreload
import httpcore

from ._auth import Auth, AuthTypes, BasicAuth, FunctionAuth
from ._config import (
Expand All @@ -14,17 +15,14 @@
PoolLimits,
ProxiesTypes,
Proxy,
SSLConfig,
Timeout,
TimeoutTypes,
UnsetType,
VerifyTypes,
)
from ._content_streams import ContentStream
from ._dispatch.asgi import ASGIDispatch
from ._dispatch.base import AsyncDispatcher, SyncDispatcher
from ._dispatch.connection_pool import ConnectionPool
from ._dispatch.proxy_http import HTTPProxy
from ._dispatch.urllib3 import URLLib3Dispatcher
from ._dispatch.wsgi import WSGIDispatch
from ._exceptions import HTTPError, InvalidURL, RequestBodyUnavailable, TooManyRedirects
from ._models import (
Expand Down Expand Up @@ -96,7 +94,7 @@ def get_proxy_map(
elif isinstance(proxies, (str, URL, Proxy)):
proxy = Proxy(url=proxies) if isinstance(proxies, (str, URL)) else proxies
return {"all": proxy}
elif isinstance(proxies, AsyncDispatcher): # pragma: nocover
elif isinstance(proxies, httpcore.AsyncHTTPTransport): # pragma: nocover
raise RuntimeError(
"Passing a dispatcher instance to 'proxies=' is no longer "
"supported. Use `httpx.Proxy() instead.`"
Expand All @@ -107,7 +105,7 @@ def get_proxy_map(
if isinstance(value, (str, URL, Proxy)):
proxy = Proxy(url=value) if isinstance(value, (str, URL)) else value
new_proxies[str(key)] = proxy
elif isinstance(value, AsyncDispatcher): # pragma: nocover
elif isinstance(value, httpcore.AsyncHTTPTransport): # pragma: nocover
raise RuntimeError(
"Passing a dispatcher instance to 'proxies=' is "
"no longer supported. Use `httpx.Proxy() instead.`"
Expand Down Expand Up @@ -446,7 +444,7 @@ def __init__(
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
base_url: URLTypes = None,
dispatch: SyncDispatcher = None,
dispatch: httpcore.SyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
):
Expand All @@ -471,7 +469,7 @@ def __init__(
app=app,
trust_env=trust_env,
)
self.proxies: typing.Dict[str, SyncDispatcher] = {
self.proxies: typing.Dict[str, httpcore.SyncHTTPTransport] = {
key: self.init_proxy_dispatch(
proxy,
verify=verify,
Expand All @@ -487,18 +485,26 @@ def init_dispatch(
verify: VerifyTypes = True,
cert: CertTypes = None,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
dispatch: SyncDispatcher = None,
dispatch: httpcore.SyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
) -> SyncDispatcher:
) -> httpcore.SyncHTTPTransport:
if dispatch is not None:
return dispatch

if app is not None:
return WSGIDispatch(app=app)

return URLLib3Dispatcher(
verify=verify, cert=cert, pool_limits=pool_limits, trust_env=trust_env,
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.SyncConnectionPool(
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
)

def init_proxy_dispatch(
Expand All @@ -508,18 +514,25 @@ def init_proxy_dispatch(
cert: CertTypes = None,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
trust_env: bool = True,
) -> SyncDispatcher:
return URLLib3Dispatcher(
proxy=proxy,
verify=verify,
cert=cert,
pool_limits=pool_limits,
trust_env=trust_env,
) -> httpcore.SyncHTTPTransport:
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.SyncHTTPProxy(
proxy_origin=proxy.url.raw[:3],
proxy_headers=proxy.headers.raw,
proxy_mode=proxy.mode,
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
)

def dispatcher_for_url(self, url: URL) -> SyncDispatcher:
def dispatcher_for_url(self, url: URL) -> httpcore.SyncHTTPTransport:
"""
Returns the SyncDispatcher instance that should be used for a given URL.
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
if self.proxies and not should_not_be_proxied(url):
Expand Down Expand Up @@ -667,22 +680,41 @@ def send_handling_auth(
request = next_request
history.append(response)

def send_single_request(self, request: Request, timeout: Timeout,) -> Response:
def send_single_request(self, request: Request, timeout: Timeout) -> Response:
"""
Sends a single request, without handling any redirections.
"""

dispatcher = self.dispatcher_for_url(request.url)

try:
response = dispatcher.send(request, timeout=timeout)
(
http_version,
status_code,
reason_phrase,
headers,
stream,
) = dispatcher.request(
request.method.encode(),
request.url.raw,
headers=request.headers.raw,
stream=request.stream,
timeout=timeout.as_dict(),
)
except HTTPError as exc:
# Add the original request to any HTTPError unless
# there'a already a request attached in the case of
# a ProxyError.
if exc._request is None:
exc._request = request
raise
response = Response(
status_code,
http_version=http_version.decode("ascii"),
headers=headers,
stream=stream, # type: ignore
request=request,
)

self.cookies.extract_cookies(response)

Expand Down Expand Up @@ -928,7 +960,6 @@ class AsyncClient(BaseClient):
rather than sending actual network requests.
* **trust_env** - *(optional)* Enables or disables usage of environment
variables for configuration.
* **uds** - *(optional)* A path to a Unix domain socket to connect through.
"""

def __init__(
Expand All @@ -946,10 +977,9 @@ def __init__(
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
base_url: URLTypes = None,
dispatch: AsyncDispatcher = None,
dispatch: httpcore.AsyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
uds: str = None,
):
super().__init__(
auth=auth,
Expand All @@ -972,9 +1002,8 @@ def __init__(
dispatch=dispatch,
app=app,
trust_env=trust_env,
uds=uds,
)
self.proxies: typing.Dict[str, AsyncDispatcher] = {
self.proxies: typing.Dict[str, httpcore.AsyncHTTPTransport] = {
key: self.init_proxy_dispatch(
proxy,
verify=verify,
Expand All @@ -992,24 +1021,27 @@ def init_dispatch(
cert: CertTypes = None,
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
dispatch: AsyncDispatcher = None,
dispatch: httpcore.AsyncHTTPTransport = None,
app: typing.Callable = None,
trust_env: bool = True,
uds: str = None,
) -> AsyncDispatcher:
) -> httpcore.AsyncHTTPTransport:
if dispatch is not None:
return dispatch

if app is not None:
return ASGIDispatch(app=app)

return ConnectionPool(
verify=verify,
cert=cert,
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.AsyncConnectionPool(
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
http2=http2,
pool_limits=pool_limits,
trust_env=trust_env,
uds=uds,
)

def init_proxy_dispatch(
Expand All @@ -1020,21 +1052,25 @@ def init_proxy_dispatch(
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
trust_env: bool = True,
) -> AsyncDispatcher:
return HTTPProxy(
proxy_url=proxy.url,
proxy_headers=proxy.headers,
) -> httpcore.AsyncHTTPTransport:
ssl_context = SSLConfig(
verify=verify, cert=cert, trust_env=trust_env
).ssl_context
max_keepalive = pool_limits.soft_limit
max_connections = pool_limits.hard_limit

return httpcore.AsyncHTTPProxy(
proxy_origin=proxy.url.raw[:3],
proxy_headers=proxy.headers.raw,
proxy_mode=proxy.mode,
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
trust_env=trust_env,
ssl_context=ssl_context,
max_keepalive=max_keepalive,
max_connections=max_connections,
)

def dispatcher_for_url(self, url: URL) -> AsyncDispatcher:
def dispatcher_for_url(self, url: URL) -> httpcore.AsyncHTTPTransport:
"""
Returns the AsyncDispatcher instance that should be used for a given URL.
Returns the transport instance that should be used for a given URL.
This will either be the standard connection pool, or a proxy.
"""
if self.proxies and not should_not_be_proxied(url):
Expand Down Expand Up @@ -1193,14 +1229,33 @@ async def send_single_request(
dispatcher = self.dispatcher_for_url(request.url)

try:
response = await dispatcher.send(request, timeout=timeout)
(
http_version,
status_code,
reason_phrase,
headers,
stream,
) = await dispatcher.request(
request.method.encode(),
request.url.raw,
headers=request.headers.raw,
stream=request.stream,
timeout=timeout.as_dict(),
)
except HTTPError as exc:
# Add the original request to any HTTPError unless
# there'a already a request attached in the case of
# a ProxyError.
if exc._request is None:
exc._request = request
raise
response = Response(
status_code,
http_version=http_version.decode("ascii"),
headers=headers,
stream=stream, # type: ignore
request=request,
)

self.cookies.extract_cookies(response)

Expand Down Expand Up @@ -1383,9 +1438,9 @@ async def delete(
)

async def aclose(self) -> None:
await self.dispatch.close()
await self.dispatch.aclose()
for proxy in self.proxies.values():
await proxy.close()
await proxy.aclose()

async def __aenter__(self) -> "AsyncClient":
return self
Expand Down
8 changes: 8 additions & 0 deletions httpx/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ def __init__(
timeout if isinstance(pool_timeout, UnsetType) else pool_timeout
)

def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
return {
"connect": self.connect_timeout,
"read": self.read_timeout,
"write": self.write_timeout,
"pool": self.pool_timeout,
}

def __eq__(self, other: typing.Any) -> bool:
return (
isinstance(other, self.__class__)
Expand Down
4 changes: 3 additions & 1 deletion httpx/_content_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pathlib import Path
from urllib.parse import urlencode

import httpcore

from ._exceptions import StreamConsumed
from ._types import StrOrBytes
from ._utils import format_form_param
Expand Down Expand Up @@ -35,7 +37,7 @@
]


class ContentStream:
class ContentStream(httpcore.AsyncByteStream, httpcore.SyncByteStream):
def get_headers(self) -> typing.Dict[str, str]:
"""
Return a dictionary of headers that are implied by the encoding.
Expand Down
Loading