Skip to content

Commit

Permalink
Add credentials property to Request objects (sanic-org#2357)
Browse files Browse the repository at this point in the history
  • Loading branch information
SerGeRybakov authored and ChihweiLHBird committed Jun 1, 2022
1 parent 8f18389 commit f1a695f
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 87 deletions.
16 changes: 15 additions & 1 deletion sanic/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import re

from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import unquote

from sanic.exceptions import InvalidHeader
Expand Down Expand Up @@ -394,3 +394,17 @@ def parse_accept(accept: str) -> AcceptContainer:
return AcceptContainer(
sorted(accept_list, key=_sort_accept_value, reverse=True)
)


def parse_credentials(
header: Optional[str],
prefixes: Union[List, Tuple, Set] = None,
) -> Tuple[Optional[str], Optional[str]]:
"""Parses any header with the aim to retrieve any credentials from it."""
if not prefixes or not isinstance(prefixes, (list, tuple, set)):
prefixes = ("Basic", "Bearer", "Token")
if header is not None:
for prefix in prefixes:
if prefix in header:
return prefix, header.partition(prefix)[-1].strip()
return None, header
35 changes: 35 additions & 0 deletions sanic/models/http_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from base64 import b64decode
from dataclasses import dataclass, field
from typing import Optional


@dataclass()
class Credentials:
auth_type: Optional[str]
token: Optional[str]
_username: Optional[str] = field(default=None)
_password: Optional[str] = field(default=None)

def __post_init__(self):
if self._auth_is_basic:
self._username, self._password = (
b64decode(self.token.encode("utf-8")).decode().split(":")
)

@property
def username(self):
if not self._auth_is_basic:
raise AttributeError("Username is available for Basic Auth only")
return self._username

@property
def password(self):
if not self._auth_is_basic:
raise AttributeError("Password is available for Basic Auth only")
return self._password

@property
def _auth_is_basic(self) -> bool:
return self.auth_type == "Basic"
49 changes: 39 additions & 10 deletions sanic/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from sanic_routing.route import Route # type: ignore

from sanic.models.http_types import Credentials


if TYPE_CHECKING: # no cov
from sanic.server import ConnInfo
Expand All @@ -37,6 +39,7 @@
Options,
parse_accept,
parse_content_header,
parse_credentials,
parse_forwarded,
parse_host,
parse_xforwarded,
Expand Down Expand Up @@ -98,11 +101,13 @@ class Request:
"method",
"parsed_accept",
"parsed_args",
"parsed_not_grouped_args",
"parsed_credentials",
"parsed_files",
"parsed_form",
"parsed_json",
"parsed_forwarded",
"parsed_json",
"parsed_not_grouped_args",
"parsed_token",
"raw_url",
"responded",
"request_middleware_started",
Expand All @@ -122,6 +127,7 @@ def __init__(
app: Sanic,
head: bytes = b"",
):

self.raw_url = url_bytes
# TODO: Content-Encoding detection
self._parsed_url = parse_url(url_bytes)
Expand All @@ -141,9 +147,11 @@ def __init__(
self.ctx = SimpleNamespace()
self.parsed_forwarded: Optional[Options] = None
self.parsed_accept: Optional[AcceptContainer] = None
self.parsed_credentials: Optional[Credentials] = None
self.parsed_json = None
self.parsed_form = None
self.parsed_files = None
self.parsed_token: Optional[str] = None
self.parsed_args: DefaultDict[
Tuple[bool, bool, str, str], RequestParameters
] = defaultdict(RequestParameters)
Expand Down Expand Up @@ -332,20 +340,41 @@ def accept(self) -> AcceptContainer:
return self.parsed_accept

@property
def token(self):
def token(self) -> Optional[str]:
"""Attempt to return the auth header token.
:return: token related to request
"""
prefixes = ("Bearer", "Token")
auth_header = self.headers.getone("authorization", None)
if self.parsed_token is None:
prefixes = ("Bearer", "Token")
_, token = parse_credentials(
self.headers.getone("authorization", None), prefixes
)
self.parsed_token = token
return self.parsed_token

if auth_header is not None:
for prefix in prefixes:
if prefix in auth_header:
return auth_header.partition(prefix)[-1].strip()
@property
def credentials(self) -> Optional[Credentials]:
"""Attempt to return the auth header value.
Covers NoAuth, Basic Auth, Bearer Token, Api Token authentication
schemas.
return auth_header
:return: A named tuple with token or username and password related
to request
"""
if self.parsed_credentials is None:
try:
prefix, credentials = parse_credentials(
self.headers.getone("authorization", None)
)
if credentials:
self.parsed_credentials = Credentials(
auth_type=prefix, token=credentials
)
except ValueError:
pass
return self.parsed_credentials

@property
def form(self):
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import base64
import logging
import random
import re
Expand Down Expand Up @@ -204,3 +205,7 @@ def sanic_ext(ext_instance): # noqa
yield sanic_ext
with suppress(KeyError):
del sys.modules["sanic_ext"]


def encode_basic_auth_credentials(username, password):
return base64.b64encode(f"{username}:{password}".encode()).decode("ascii")
153 changes: 77 additions & 76 deletions tests/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
)

from sanic import Blueprint, Sanic
from sanic.exceptions import SanicException, ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, Request, RequestParameters
from sanic.exceptions import ServerError
from sanic.request import DEFAULT_HTTP_CONTENT_TYPE, RequestParameters
from sanic.response import html, json, text
from tests.conftest import encode_basic_auth_credentials


# ------------------------------------------------------------ #
Expand Down Expand Up @@ -362,93 +363,95 @@ async def handler(request, id, name):
assert request.uri_template == "/foo/<id:int>/bar/<name:[A-z]+>"


def test_token(app):
@pytest.mark.parametrize(
("auth_type", "token"),
[
# uuid4 generated token set in "Authorization" header
(None, "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"),
# uuid4 generated token with API Token authorization
("Token", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"),
# uuid4 generated token with Bearer Token authorization
("Bearer", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"),
# no Authorization header
(None, None),
],
)
def test_token(app, auth_type, token):
@app.route("/")
async def handler(request):
return text("OK")

# uuid4 generated token.
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": f"{token}",
}

request, response = app.test_client.get("/", headers=headers)

assert request.token == token

token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": f"Token {token}",
}

request, response = app.test_client.get("/", headers=headers)

assert request.token == token

token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": f"Bearer {token}",
}
if token:
headers = {
"content-type": "application/json",
"Authorization": f"{auth_type} {token}"
if auth_type
else f"{token}",
}
else:
headers = {"content-type": "application/json"}

request, response = app.test_client.get("/", headers=headers)

assert request.token == token

# no Authorization headers
headers = {"content-type": "application/json"}

request, response = app.test_client.get("/", headers=headers)

assert request.token is None


@pytest.mark.asyncio
async def test_token_asgi(app):
@pytest.mark.parametrize(
("auth_type", "token", "username", "password"),
[
# uuid4 generated token set in "Authorization" header
(None, "a1d895e0-553a-421a-8e22-5ff8ecb48cbf", None, None),
# uuid4 generated token with API Token authorization
("Token", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf", None, None),
# uuid4 generated token with Bearer Token authorization
("Bearer", "a1d895e0-553a-421a-8e22-5ff8ecb48cbf", None, None),
# username and password with Basic Auth authorization
(
"Basic",
encode_basic_auth_credentials("some_username", "some_pass"),
"some_username",
"some_pass",
),
# no Authorization header
(None, None, None, None),
],
)
def test_credentials(app, capfd, auth_type, token, username, password):
@app.route("/")
async def handler(request):
return text("OK")

# uuid4 generated token.
token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": f"{token}",
}

request, response = await app.asgi_client.get("/", headers=headers)

assert request.token == token

token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": f"Token {token}",
}

request, response = await app.asgi_client.get("/", headers=headers)

assert request.token == token

token = "a1d895e0-553a-421a-8e22-5ff8ecb48cbf"
headers = {
"content-type": "application/json",
"Authorization": f"Bearer {token}",
}

request, response = await app.asgi_client.get("/", headers=headers)

assert request.token == token

# no Authorization headers
headers = {"content-type": "application/json"}
if token:
headers = {
"content-type": "application/json",
"Authorization": f"{auth_type} {token}"
if auth_type
else f"{token}",
}
else:
headers = {"content-type": "application/json"}

request, response = await app.asgi_client.get("/", headers=headers)
request, response = app.test_client.get("/", headers=headers)

assert request.token is None
if auth_type == "Basic":
assert request.credentials.username == username
assert request.credentials.password == password
else:
_, err = capfd.readouterr()
with pytest.raises(AttributeError):
request.credentials.password
assert "Password is available for Basic Auth only" in err
request.credentials.username
assert "Username is available for Basic Auth only" in err

if token:
assert request.credentials.token == token
assert request.credentials.auth_type == auth_type
else:
assert request.credentials is None
assert not hasattr(request.credentials, "token")
assert not hasattr(request.credentials, "auth_type")
assert not hasattr(request.credentials, "_username")
assert not hasattr(request.credentials, "_password")


def test_content_type(app):
Expand Down Expand Up @@ -1714,7 +1717,6 @@ def handler(request):


def test_request_cookies(app):

cookies = {"test": "OK"}

@app.get("/")
Expand All @@ -1729,7 +1731,6 @@ def handler(request):

@pytest.mark.asyncio
async def test_request_cookies_asgi(app):

cookies = {"test": "OK"}

@app.get("/")
Expand Down

0 comments on commit f1a695f

Please sign in to comment.