Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add and expose auth types: MultiAuth ApiToken* #137

Merged
merged 1 commit into from
Feb 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ Contributors
- Nils Philippsen (`@nphilipp <https://github.com/nphilipp>`_)
- Alexander Duryagin (`@daa <https://github.com/daa>`_)
- Sakorn Waungwiwatsin (`@SakornW <https://github.com/SakornW>`_)
- Jacob Floyd (`@cognifloyd <https://github.com/cognifloyd>`_)
166 changes: 163 additions & 3 deletions tests/unit/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@

# Local imports
from uplink import auth
from uplink import utils


class TestGetAuth(object):
def test_none(self):
authentication = auth.get_auth(None)
assert authentication == utils.no_op

def test_tuple(self):
authentication = auth.get_auth(("username", "password"))
assert isinstance(authentication, auth.BasicAuth)
Expand All @@ -22,13 +27,73 @@ def test_unsupported(self):
auth.get_auth(object())


def test_api_token_param(request_builder):
# Setup
token_param_auth = auth.ApiTokenParam(param="token-param", token="token-value")

# Verify
token_param_auth(request_builder)
assert request_builder.info["params"]["token-param"] == "token-value"


def test_api_token_header_without_prefix(request_builder):
# Setup
token_header_auth = auth.ApiTokenHeader("Token-Header", "token-value")

# Verify
token_header_auth(request_builder)
assert request_builder.info["headers"]["Token-Header"] == "token-value"


def test_api_token_header_with_prefix(request_builder):
# Setup
token_header_auth = auth.ApiTokenHeader("Token-Header", "token-value", prefix="Prefix")

# Verify
token_header_auth(request_builder)
assert request_builder.info["headers"]["Token-Header"] == "Prefix token-value"


def test_api_token_header_subclass_without_prefix(request_builder):
# Setup

class ApiTokenHeaderSubclass(auth.ApiTokenHeader):
_header = "Token-Header"

def __init__(self, token):
self._token = token

token_header_auth = ApiTokenHeaderSubclass("token-value")

# Verify
token_header_auth(request_builder)
assert request_builder.info["headers"]["Token-Header"] == "token-value"


def test_api_token_header_subclass_with_prefix(request_builder):
# Setup

class ApiTokenHeaderSubclass(auth.ApiTokenHeader):
_header = "Token-Header"
_prefix = "Prefix"

def __init__(self, token):
self._token = token

token_header_auth = ApiTokenHeaderSubclass("token-value")

# Verify
token_header_auth(request_builder)
assert request_builder.info["headers"]["Token-Header"] == "Prefix token-value"


def test_basic_auth(request_builder):
# Setup
basic_auth = auth.BasicAuth("username", "password")

# Verify
basic_auth(request_builder)
auth_str = basic_auth._auth_str
auth_str = basic_auth._header_value
assert request_builder.info["headers"]["Authorization"] == auth_str


Expand All @@ -38,7 +103,7 @@ def test_proxy_auth(request_builder):

# Verify
proxy_auth(request_builder)
auth_str = proxy_auth._auth_str
auth_str = proxy_auth._header_value
assert request_builder.info["headers"]["Proxy-Authorization"] == auth_str


Expand All @@ -48,5 +113,100 @@ def test_bearer_token(request_builder):

# Verify
bearer_token(request_builder)
auth_str = bearer_token._auth_str
auth_str = bearer_token._header_value
assert request_builder.info["headers"]["Authorization"] == auth_str


class TestMultiAuth(object):
def setup_basic_auth(self):
return auth.BasicAuth("apiuser", "apipass")

def verify_basic_auth(self, basic_auth, request_builder):
basic_auth_str = basic_auth._header_value
assert request_builder.info["headers"]["Authorization"] == basic_auth_str

def setup_proxy_auth(self):
return auth.ProxyAuth("proxyuser", "proxypass")

def verify_proxy_auth(self, proxy_auth, request_builder):
proxy_auth_str = proxy_auth._header_value
assert request_builder.info["headers"]["Proxy-Authorization"] == proxy_auth_str

def setup_param_auth(self):
return auth.ApiTokenParam(param="token-param", token="token-value")

def verify_param_auth(self, request_builder):
assert request_builder.info["params"]["token-param"] == "token-value"

def test_len(self):
multi_auth = auth.MultiAuth()
assert len(multi_auth) == 0

def test_none(self):
multi_auth = auth.MultiAuth(None)
assert len(multi_auth) == 1
assert multi_auth[0] == utils.no_op

def test_one_method(self, request_builder):
# Setup
basic_auth = self.setup_basic_auth()
multi_auth = auth.MultiAuth(basic_auth)

# Verify
assert len(multi_auth) == 1
assert multi_auth[0] == basic_auth

multi_auth(request_builder)
self.verify_basic_auth(basic_auth, request_builder)

def test_four_methods(self, request_builder):
# Setup
param_auth = self.setup_param_auth()
basic_auth = self.setup_basic_auth()
proxy_auth = self.setup_proxy_auth()
multi_auth = auth.MultiAuth(None, param_auth, basic_auth, proxy_auth)

# Verify
assert len(multi_auth) == 4
assert multi_auth[0] == utils.no_op
assert multi_auth[1] == param_auth
assert multi_auth[2] == basic_auth
assert multi_auth[3] == proxy_auth

multi_auth(request_builder)
self.verify_param_auth(request_builder)
self.verify_basic_auth(basic_auth, request_builder)
self.verify_proxy_auth(proxy_auth, request_builder)

def test_append(self, request_builder):
# Setup
basic_auth = self.setup_basic_auth()
proxy_auth = self.setup_proxy_auth()
multi_auth = auth.MultiAuth()
multi_auth.append(basic_auth)
multi_auth.append(proxy_auth)

# Verify
assert len(multi_auth) == 2
assert multi_auth[0] == basic_auth
assert multi_auth[1] == proxy_auth

multi_auth(request_builder)
self.verify_basic_auth(basic_auth, request_builder)
self.verify_proxy_auth(proxy_auth, request_builder)

def test_extend(self, request_builder):
# Setup
basic_auth = self.setup_basic_auth()
proxy_auth = self.setup_proxy_auth()
multi_auth = auth.MultiAuth()
multi_auth.extend([basic_auth, proxy_auth])

# Verify
assert len(multi_auth) == 2
assert multi_auth[0] == basic_auth
assert multi_auth[1] == proxy_auth

multi_auth(request_builder)
self.verify_basic_auth(basic_auth, request_builder)
self.verify_proxy_auth(proxy_auth, request_builder)
110 changes: 99 additions & 11 deletions uplink/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
# Local imports
from uplink import utils

__all__ = []
__all__ = [
"ApiTokenParam",
"ApiTokenHeader",
"BasicAuth",
"ProxyAuth",
"BearerToken",
"MultiAuth",
]


def get_auth(auth_object=None):
Expand All @@ -23,29 +30,110 @@ def get_auth(auth_object=None):
raise ValueError("Invalid authentication strategy: %s" % auth_object)


class BasicAuth(object):
class ApiTokenParam(object):
"""
Authorizes requests using a token or key in a query parameter.
Users should subclass this class to define which parameter is the token parameter.
"""
def __init__(self, param, token):
self._param = param
self._param_value = token

def __call__(self, request_builder):
request_builder.info["params"][self._param] = self._param_value


# class ExampleApiTokenParam(ApiTokenParam):
# _param = "api-token"
# def __init__(self, token):
# self._param_value = token


class ApiTokenHeader(object):
"""
Authorizes requests using a token or key in a header.
Users should subclass this class to define which header is the token header.
The subclass may also, optionally, define a token prefix (such as in BearerToken)

_header and/or _prefix may be defined as class attributes on subclasses,
but should also override __init__() when they do so.
"""
prkumar marked this conversation as resolved.
Show resolved Hide resolved
_header = None
_prefix = None

def __init__(self, header, token, prefix=None):
self._header = header
self._prefix = prefix
self._token = token

@property
def _header_value(self):
if self._prefix:
return "%s %s" % (self._prefix, self._token)
else:
return self._token

def __call__(self, request_builder):
request_builder.info["headers"][self._header] = self._header_value


class BasicAuth(ApiTokenHeader):
"""Authorizes requests using HTTP Basic Authentication."""

_header = "Authorization"

def __init__(self, username, password):
self._username = username
self._password = password

@property
def _auth_str(self):
def _header_value(self):
return auth._basic_auth_str(self._username, self._password)

def __call__(self, request_builder):
request_builder.info["headers"]["Authorization"] = self._auth_str


class ProxyAuth(BasicAuth):
def __call__(self, request_builder):
request_builder.info["headers"]["Proxy-Authorization"] = self._auth_str
"""Authorizes requests with an intermediate HTTP proxy."""
_header = "Proxy-Authorization"


class BearerToken(object):
class BearerToken(ApiTokenHeader):

_header = "Authorization"
_prefix = "Bearer"

def __init__(self, token):
self._auth_str = "Bearer %s" % token
self._token = token


class MultiAuth(object):
"""
Authorizes requests using multiple auth methods at the same time.
api_auth = MultiAuth(
BasicAuth(username, password),
ProxyAuth(proxy_user, proxy_pass)
)
api_consumer = SomeApiConsumerClass(
"https://my.base_url.com/",
auth=api_auth
)

Mostly, this is useful for API users to supply intermediary credentials (such as for a proxy).
"""
def __init__(self, *auth_methods):
self._auth_methods = [get_auth(auth_method) for auth_method in auth_methods]

def __call__(self, request_builder):
request_builder.info["headers"]["Authorization"] = self._auth_str
for auth_method in self._auth_methods:
auth_method(request_builder)

def __getitem__(self, index):
return self._auth_methods[index]

def __len__(self):
return len(self._auth_methods)

def append(self, auth_method):
self._auth_methods.append(get_auth(auth_method))

def extend(self, auth_methods):
self._auth_methods.extend([get_auth(auth_method) for auth_method in auth_methods])