Skip to content
This repository has been archived by the owner on May 31, 2019. It is now read-only.

Commit

Permalink
Merge pull request #69 from kobinpy/refactor-router
Browse files Browse the repository at this point in the history
Refactor router
  • Loading branch information
c-bata committed Mar 20, 2017
2 parents 3e4aa2c + deb1c04 commit f399347
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 156 deletions.
6 changes: 4 additions & 2 deletions kobin/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def frozen(self):

def route(self, rule=None, method='GET', name=None, callback=None):
def decorator(callback_func):
self.router.add(method, rule, name, callback_func)
self.router.add(rule, method, name, callback_func)
return callback_func
return decorator(callback) if callback else decorator

Expand All @@ -94,7 +94,9 @@ def _handle(self, environ):
for before_request_callback in self.before_request_callbacks:
before_request_callback()

callback, kwargs = self.router.match(environ)
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO'] or '/'
callback, kwargs = self.router.match(path, method)
response = callback(**kwargs) if kwargs else callback()

for after_request_callback in self.after_request_callbacks:
Expand Down
3 changes: 2 additions & 1 deletion kobin/app.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Kobin:
_frozen: bool

def __init__(self, config: Dict[str, Any] = ...) -> None: ...
def __call__(self, environ: WSGIEnviron, start_response: StartResponse) -> WSGIResponse: ...

@property
def frozen(self) -> bool: ...
def route(self, rule: str = ..., method: str = ..., name: str = ...,
Expand All @@ -33,7 +35,6 @@ class Kobin:
Callable[[BaseResponse], BaseResponse]: ...
def _handle(self, environ: WSGIEnviron) -> BaseResponse: ...
def wsgi(self, environ: WSGIEnviron, start_response: StartResponse) -> WSGIResponse: ...
def __call__(self, environ: WSGIEnviron, start_response: StartResponse) -> WSGIResponse: ...

def _get_exception_message(e: BaseException, debug: bool) -> str: ...

Expand Down
206 changes: 136 additions & 70 deletions kobin/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,93 +73,159 @@ def user_detail() -> Response:
"""
from typing import get_type_hints
from .requests import request
from .responses import HTTPError

DEFAULT_ARG_TYPE = str


def split_by_slash(path):
stripped_path = path.lstrip('/').rstrip('/')
return stripped_path.split('/')


class Route:
def __init__(self, rule, method, name, callback):
self.rule = rule
self.method = method.upper()
self.name = name
self.callback = callback

@property
def callback_types(self):
return get_type_hints(self.callback)
def match_url_vars_type(url_vars, type_hints):
""" Match types of url vars.
def get_typed_url_vars(self, url_vars):
typed_url_vars = {}
>>> match_url_vars_type({'user_id': '1'}, {'user_id': int})
(True, {'user_id': 1})
>>> match_url_vars_type({'user_id': 'foo'}, {'user_id': int})
(False, {})
"""
typed_url_vars = {}
try:
for k, v in url_vars.items():
arg_type = self.callback_types.get(k, DEFAULT_ARG_TYPE)
typed_url_vars[k] = arg_type(v)
return typed_url_vars

def _match_method(self, method):
return self.method == method.upper()

def _match_path(self, path):
split_rule = split_by_slash(self.rule)
split_path = split_by_slash(path)
url_vars = {}

if len(split_rule) != len(split_path):
return

for r, p in zip(split_rule, split_path):
if r.startswith('{') and r.endswith('}'):
url_vars[r[1:-1]] = p
continue
if r != p:
return
try:
typed_url_vars = self.get_typed_url_vars(url_vars)
except ValueError:
return
return typed_url_vars

def match(self, method, path):
url_vars = self._match_path(path)
if url_vars is None:
return None # path: not matched
elif not self._match_method(method):
return False # path: matched, method: not matched
else:
return url_vars # path: matched, method: matched
arg_type = type_hints.get(k)
if arg_type and arg_type != str:
typed_url_vars[k] = arg_type(v)
else:
typed_url_vars[k] = v
except ValueError:
return False, {}
return True, typed_url_vars


def match_path(rule, path):
""" Match path.
>>> match_path('/foo', '/foo')
(True, {})
>>> match_path('/foo', '/bar')
(False, {})
>>> match_path('/users/{user_id}', '/users/1')
(True, {'user_id': '1'})
>>> match_path('/users/{user_id}', '/users/not-integer')
(True, {'user_id': 'not-integer'})
"""
split_rule = split_by_slash(rule)
split_path = split_by_slash(path)
url_vars = {}

if len(split_rule) != len(split_path):
return False, {}

for r, p in zip(split_rule, split_path):
if r.startswith('{') and r.endswith('}'):
url_vars[r[1:-1]] = p
continue
if r != p:
return False, {}
return True, url_vars


class Router:
def __init__(self) -> None:
self.routes = []

def match(self, environ):
method = environ['REQUEST_METHOD'].upper()
path = environ['PATH_INFO'] or '/'
self.endpoints = []

def match(self, path, method):
""" Get callback and url_vars.
>>> from kobin import Response
>>> r = Router()
>>> def view(user_id: int) -> Response:
... return Response(f'You are {user_id}')
...
>>> r.add('/users/{user_id}', 'GET', 'user-detail', view)
>>> callback, url_vars = r.match('/users/1', 'GET')
>>> url_vars
{'user_id': 1}
>>> response = callback(**url_vars)
>>> response.body
[b'You are 1']
>>> callback, url_vars = r.match('/notfound', 'GET')
Traceback (most recent call last):
...
kobin.responses.HTTPError
"""
if path != '/':
path = path.rstrip('/')
method = method.upper()

status = 404
for route in self.routes:
url_vars = route.match(method, path)
if url_vars is None: # path: not matched
for p, n, m in self.endpoints:
matched, url_vars = match_path(p, path)
if not matched: # path: not matched
continue
elif url_vars is False: # path: matched, method: not matched
status = 405
else: # path: matched, method: matched
return route.callback, url_vars
raise HTTPError(status=status, body='Not found: {}'.format(request.path))

def add(self, method, rule, name, callback):
""" Add a new rule or replace the target for an existing rule. """
route = Route(method=method.upper(), rule=rule, name=name, callback=callback)
self.routes.append(route)
if method not in m: # path: matched, method: not matched
status = 405
raise HTTPError(status=status, body=f'Method not found: {path} {method}') # it has security issue??

callback, type_hints = m[method]
type_matched, typed_url_vars = match_url_vars_type(url_vars, type_hints)
if not type_matched:
continue # path: not matched (types are different)
return callback, typed_url_vars
raise HTTPError(status=status, body=f'Not found: {path}')

def add(self, rule, method, name, callback):
""" Add a new rule or replace the target for an existing rule.
>>> from kobin import Response
>>> r = Router()
>>> def view(user_id: int) -> Response:
... return Response(f'You are {user_id}')
...
>>> r.add('/users/{user_id}', 'GET', 'user-detail', view)
>>> path, name, methods = r.endpoints[0]
>>> path
'/users/{user_id}'
>>> name
'user-detail'
>>> callback, type_hints = methods['GET']
>>> view == callback
True
>>> type_hints['user_id'] == int
True
"""
if rule != '/':
rule = rule.rstrip('/')
method = method.upper()

for i, e in enumerate(self.endpoints):
r, n, callbacks = e
if r == rule:
assert name == n and n is not None, (
"A same path should set a same name for reverse routing."
)
callbacks[method] = (callback, get_type_hints(callback))
self.endpoints[i] = (r, name, callbacks)
break
else:
e = (rule, name, {method: (callback, get_type_hints(callback))})
self.endpoints.append(e)

def reverse(self, name, **kwargs):
for route in self.routes:
if name == route.name:
return route.rule.format(**kwargs)
""" Reverse routing.
>>> from kobin import Response
>>> r = Router()
>>> def view(user_id: int) -> Response:
... return Response(f'You are {user_id}')
...
>>> r.add('/users/{user_id}', 'GET', 'user-detail', view)
>>> r.reverse('user-detail', user_id=1)
'/users/1'
"""
for p, n, _ in self.endpoints:
if name == n:
return p.format(**kwargs)
24 changes: 6 additions & 18 deletions kobin/routes.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,15 @@ from typing import Callable, Dict, List, Tuple, Union, Any
from .responses import BaseResponse

ViewFunction = Callable[..., BaseResponse]
DEFAULT_ARG_TYPE = ... # type: type

def redirect(url: str) -> BaseResponse: ...
def split_by_slash(path: str) -> List[str]: ...

class Route:
rule: str
method: str
name: str
callback: ViewFunction
def __init__(self, rule: str, method: str, name: str, callback: ViewFunction) -> None: ...
@property
def callback_types(self) -> Dict[str, Any]: ...
def get_typed_url_vars(self, url_vars: Dict[str, str]) -> Dict[str, Any]: ...
def _match_method(self, method: str) -> bool: ...
def _match_path(self, path: str) -> Union[None, Dict[str, Any]]: ...
def match(self, method: str, path: str) -> Dict[str, Any]: ...
def match_url_vars_type(url_vars: Dict[str, str],
type_hints: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: ...
def match_path(rule: str, path: str) -> Tuple[bool, Dict[str, str]]: ...

class Router:
routes: List[Route]
endpoints: Tuple[str, Dict[str, ViewFunction], Dict[str, Any]]
def __init__(self) -> None: ...
def match(self, environ: Dict[str, str]) -> Tuple[ViewFunction, Dict[str, Any]]: ...
def add(self, method: str, rule: str, name: str, callback: ViewFunction) -> None: ...
def match(self, path: str, method: str) -> Tuple[ViewFunction, Dict[str, Any]]: ...
def add(self, rule: str, method: str, name: str, callback: ViewFunction) -> None: ...
def reverse(self, name: str, **kwargs: Any) -> str: ...
Loading

0 comments on commit f399347

Please sign in to comment.