From 41597adffa9d34171a63f7511fc0f702558dd08c Mon Sep 17 00:00:00 2001 From: RafaelWO <38643099+RafaelWO@users.noreply.github.com> Date: Fri, 1 Nov 2024 20:20:18 +0100 Subject: [PATCH] Move remaining utility functions from _utils.py to _models.py (#3387) --- httpx/_models.py | 84 +++++++++++++++++++++++++++++++----- httpx/_utils.py | 70 ------------------------------ scripts/lint | 2 +- tests/models/test_headers.py | 43 ++++++++++++++++++ tests/test_utils.py | 43 ------------------ 5 files changed, 118 insertions(+), 124 deletions(-) diff --git a/httpx/_models.py b/httpx/_models.py index 16c4d1cf56..e7c992f77e 100644 --- a/httpx/_models.py +++ b/httpx/_models.py @@ -1,8 +1,10 @@ from __future__ import annotations +import codecs import datetime import email.message import json as jsonlib +import re import typing import urllib.request from collections.abc import Mapping @@ -44,15 +46,23 @@ SyncByteStream, ) from ._urls import URL -from ._utils import ( - is_known_encoding, - obfuscate_sensitive_headers, - parse_content_type_charset, - parse_header_links, -) +from ._utils import to_bytes_or_str, to_str __all__ = ["Cookies", "Headers", "Request", "Response"] +SENSITIVE_HEADERS = {"authorization", "proxy-authorization"} + + +def _is_known_encoding(encoding: str) -> bool: + """ + Return `True` if `encoding` is a known codec. + """ + try: + codecs.lookup(encoding) + except LookupError: + return False + return True + def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes: """ @@ -72,6 +82,60 @@ def _normalize_header_value(value: str | bytes, encoding: str | None = None) -> return value.encode(encoding or "ascii") +def _parse_content_type_charset(content_type: str) -> str | None: + # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery. + # See: https://peps.python.org/pep-0594/#cgi + msg = email.message.Message() + msg["content-type"] = content_type + return msg.get_content_charset(failobj=None) + + +def _parse_header_links(value: str) -> list[dict[str, str]]: + """ + Returns a list of parsed link headers, for more info see: + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link + The generic syntax of those is: + Link: < uri-reference >; param1=value1; param2="value2" + So for instance: + Link; '; type="image/jpeg",;' + would return + [ + {"url": "http:/.../front.jpeg", "type": "image/jpeg"}, + {"url": "http://.../back.jpeg"}, + ] + :param value: HTTP Link entity-header field + :return: list of parsed link headers + """ + links: list[dict[str, str]] = [] + replace_chars = " '\"" + value = value.strip(replace_chars) + if not value: + return links + for val in re.split(", *<", value): + try: + url, params = val.split(";", 1) + except ValueError: + url, params = val, "" + link = {"url": url.strip("<> '\"")} + for param in params.split(";"): + try: + key, value = param.split("=") + except ValueError: + break + link[key.strip(replace_chars)] = value.strip(replace_chars) + links.append(link) + return links + + +def _obfuscate_sensitive_headers( + items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]], +) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]: + for k, v in items: + if to_str(k.lower()) in SENSITIVE_HEADERS: + v = to_bytes_or_str("[secure]", match_type_of=v) + yield k, v + + class Headers(typing.MutableMapping[str, str]): """ HTTP headers, as a case-insensitive multi-dict. @@ -306,7 +370,7 @@ def __repr__(self) -> str: if self.encoding != "ascii": encoding_str = f", encoding={self.encoding!r}" - as_list = list(obfuscate_sensitive_headers(self.multi_items())) + as_list = list(_obfuscate_sensitive_headers(self.multi_items())) as_dict = dict(as_list) no_duplicate_keys = len(as_dict) == len(as_list) @@ -599,7 +663,7 @@ def encoding(self) -> str | None: """ if not hasattr(self, "_encoding"): encoding = self.charset_encoding - if encoding is None or not is_known_encoding(encoding): + if encoding is None or not _is_known_encoding(encoding): if isinstance(self.default_encoding, str): encoding = self.default_encoding elif hasattr(self, "_content"): @@ -630,7 +694,7 @@ def charset_encoding(self) -> str | None: if content_type is None: return None - return parse_content_type_charset(content_type) + return _parse_content_type_charset(content_type) def _get_content_decoder(self) -> ContentDecoder: """ @@ -785,7 +849,7 @@ def links(self) -> dict[str | None, dict[str, str]]: return { (link.get("rel") or link.get("url")): link - for link in parse_header_links(header) + for link in _parse_header_links(header) } @property diff --git a/httpx/_utils.py b/httpx/_utils.py index c873bdb2f0..9a1ed54749 100644 --- a/httpx/_utils.py +++ b/httpx/_utils.py @@ -1,7 +1,5 @@ from __future__ import annotations -import codecs -import email.message import ipaddress import os import re @@ -29,74 +27,6 @@ def primitive_value_to_str(value: PrimitiveData) -> str: return str(value) -def is_known_encoding(encoding: str) -> bool: - """ - Return `True` if `encoding` is a known codec. - """ - try: - codecs.lookup(encoding) - except LookupError: - return False - return True - - -def parse_header_links(value: str) -> list[dict[str, str]]: - """ - Returns a list of parsed link headers, for more info see: - https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link - The generic syntax of those is: - Link: < uri-reference >; param1=value1; param2="value2" - So for instance: - Link; '; type="image/jpeg",;' - would return - [ - {"url": "http:/.../front.jpeg", "type": "image/jpeg"}, - {"url": "http://.../back.jpeg"}, - ] - :param value: HTTP Link entity-header field - :return: list of parsed link headers - """ - links: list[dict[str, str]] = [] - replace_chars = " '\"" - value = value.strip(replace_chars) - if not value: - return links - for val in re.split(", *<", value): - try: - url, params = val.split(";", 1) - except ValueError: - url, params = val, "" - link = {"url": url.strip("<> '\"")} - for param in params.split(";"): - try: - key, value = param.split("=") - except ValueError: - break - link[key.strip(replace_chars)] = value.strip(replace_chars) - links.append(link) - return links - - -def parse_content_type_charset(content_type: str) -> str | None: - # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery. - # See: https://peps.python.org/pep-0594/#cgi - msg = email.message.Message() - msg["content-type"] = content_type - return msg.get_content_charset(failobj=None) - - -SENSITIVE_HEADERS = {"authorization", "proxy-authorization"} - - -def obfuscate_sensitive_headers( - items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]], -) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]: - for k, v in items: - if to_str(k.lower()) in SENSITIVE_HEADERS: - v = to_bytes_or_str("[secure]", match_type_of=v) - yield k, v - - def port_or_default(url: URL) -> int | None: if url.port is not None: return url.port diff --git a/scripts/lint b/scripts/lint index 3d8685a065..6d096d760b 100755 --- a/scripts/lint +++ b/scripts/lint @@ -8,5 +8,5 @@ export SOURCE_FILES="httpx tests" set -x -${PREFIX}ruff --fix $SOURCE_FILES +${PREFIX}ruff check --fix $SOURCE_FILES ${PREFIX}ruff format $SOURCE_FILES diff --git a/tests/models/test_headers.py b/tests/models/test_headers.py index d671dc4186..a87a446784 100644 --- a/tests/models/test_headers.py +++ b/tests/models/test_headers.py @@ -174,3 +174,46 @@ def test_sensitive_headers(header): value = "s3kr3t" h = httpx.Headers({header: value}) assert repr(h) == "Headers({'%s': '[secure]'})" % header + + +@pytest.mark.parametrize( + "headers, output", + [ + ([("content-type", "text/html")], [("content-type", "text/html")]), + ([("authorization", "s3kr3t")], [("authorization", "[secure]")]), + ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]), + ], +) +def test_obfuscate_sensitive_headers(headers, output): + as_dict = {k: v for k, v in output} + headers_class = httpx.Headers({k: v for k, v in headers}) + assert repr(headers_class) == f"Headers({as_dict!r})" + + +@pytest.mark.parametrize( + "value, expected", + ( + ( + '; rel=front; type="image/jpeg"', + [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}], + ), + ("", [{"url": "http:/.../front.jpeg"}]), + (";", [{"url": "http:/.../front.jpeg"}]), + ( + '; type="image/jpeg",;', + [ + {"url": "http:/.../front.jpeg", "type": "image/jpeg"}, + {"url": "http://.../back.jpeg"}, + ], + ), + ("", []), + ), +) +def test_parse_header_links(value, expected): + all_links = httpx.Response(200, headers={"link": value}).links.values() + assert all(link in all_links for link in expected) + + +def test_parse_header_links_no_link(): + all_links = httpx.Response(200).links + assert all_links == {} diff --git a/tests/test_utils.py b/tests/test_utils.py index f7e6c1642a..3e2abdef28 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -53,35 +53,6 @@ def test_guess_by_bom(encoding, expected): assert response.json() == {"abc": 123} -@pytest.mark.parametrize( - "value, expected", - ( - ( - '; rel=front; type="image/jpeg"', - [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}], - ), - ("", [{"url": "http:/.../front.jpeg"}]), - (";", [{"url": "http:/.../front.jpeg"}]), - ( - '; type="image/jpeg",;', - [ - {"url": "http:/.../front.jpeg", "type": "image/jpeg"}, - {"url": "http://.../back.jpeg"}, - ], - ), - ("", []), - ), -) -def test_parse_header_links(value, expected): - all_links = httpx.Response(200, headers={"link": value}).links.values() - assert all(link in all_links for link in expected) - - -def test_parse_header_links_no_link(): - all_links = httpx.Response(200).links - assert all_links == {} - - def test_logging_request(server, caplog): caplog.set_level(logging.INFO) with httpx.Client() as client: @@ -144,20 +115,6 @@ def test_get_environment_proxies(environment, proxies): assert get_environment_proxies() == proxies -@pytest.mark.parametrize( - "headers, output", - [ - ([("content-type", "text/html")], [("content-type", "text/html")]), - ([("authorization", "s3kr3t")], [("authorization", "[secure]")]), - ([("proxy-authorization", "s3kr3t")], [("proxy-authorization", "[secure]")]), - ], -) -def test_obfuscate_sensitive_headers(headers, output): - as_dict = {k: v for k, v in output} - headers_class = httpx.Headers({k: v for k, v in headers}) - assert repr(headers_class) == f"Headers({as_dict!r})" - - def test_same_origin(): origin = httpx.URL("https://example.com") request = httpx.Request("GET", "HTTPS://EXAMPLE.COM:443")