Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimenting with Trio-based Sanic server #1662

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5f1838f
Fix watchdog reload worker repeatedly if there are multiple changed f…
lexhung Apr 18, 2019
c928c9f
Partially working Trio server.
Tronic Aug 26, 2019
2176c81
A very minimal Trio-based HTTP server.
Tronic Aug 27, 2019
387287d
Add quick termination of idle connections on shutdown.
Tronic Aug 27, 2019
94cc4c2
Just cancel 'em straight away.
Tronic Aug 27, 2019
26e4375
Allow auto_reload with multiple workers and use serve workers= argume…
Tronic Aug 27, 2019
2760a90
server.py switching (for testing)
Tronic Aug 28, 2019
183413a
HTTP/2 support etc.
Tronic Aug 29, 2019
fac8c36
Protocol and SSL autodetection, cleanup.
Tronic Aug 29, 2019
9bc620d
Get SSL SNI as protocol.servername.
Tronic Aug 29, 2019
bf32a19
Streamlined request handler logic. Sketching new streaming framework.
Tronic Sep 2, 2019
ec6707c
Streaming responses now working with HTTP/1.
Tronic Sep 2, 2019
7031547
Streaming fixes, now request and response may be streamed.
Tronic Sep 3, 2019
c9a8232
Bugfixes
Tronic Sep 3, 2019
c1fd59b
Non-streaming handlers working on top of streaming requests.
Tronic Sep 3, 2019
2c5f016
Response middleware for non-streaming responses.
Tronic Sep 4, 2019
b2ea924
Minor cleanup to push_back()
Tronic Sep 4, 2019
4537544
HTTP1 header formatting moved to headers.format_headers and rewritten.
Tronic Sep 4, 2019
d248dbb
Linter
Tronic Sep 4, 2019
7dc6839
format_http1_response
Tronic Sep 5, 2019
ec1b0a6
Merge branch 'master' into trio
Tronic Sep 5, 2019
8ed885b
Hacks to make it run on Windows.
Tronic Sep 6, 2019
8e97f28
Fix that hack.
Tronic Sep 6, 2019
1b0b5d6
Merge branch 'headerformat' into trio
Tronic Sep 6, 2019
84539fc
NewStreamingResponse only sends headers on write.
Tronic Sep 6, 2019
491ce25
Major cleanup; rewrote H1StreamRequest and NewStreamingHTTPResponse
Tronic Sep 8, 2019
af84694
Multiprocessing/server rewrite fully in async context with better sig…
Tronic Sep 9, 2019
e0ce6d2
Cleanup and error handling.
Tronic Sep 9, 2019
d8581ce
Merge remote-tracking branch 'lexhung/master' into reloader
Tronic Sep 9, 2019
1c7d404
Merge current master and lexhung reloader fixes into trio
Tronic Sep 9, 2019
c4a4fc6
Autoreloader should now work with any number of workers, on any OS.
Tronic Sep 9, 2019
87bdb90
Rework reloader.
Tronic Sep 9, 2019
53f088e
Faster and more correct reload and exit.
Tronic Sep 9, 2019
870b601
Optimized to run faster.
Tronic Sep 10, 2019
dda0296
Fix receive_request return value handling (should probably use except…
Tronic Sep 10, 2019
6bebfdd
Merge remote-tracking branch 'upstream/master' into trio (probably b0…
Tronic Dec 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 54 additions & 38 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
from sanic.exceptions import SanicException, ServerError, URLBuildError
from sanic.handlers import ErrorHandler
from sanic.log import LOGGING_CONFIG_DEFAULTS, error_logger, logger
from sanic.response import HTTPResponse, StreamingHTTPResponse
from sanic.router import Router
from sanic.server import (
AsyncioServer,
HttpProtocol,
Signal,
serve,
serve_multiple,
from sanic.response import (
BaseHTTPResponse,
HTTPResponse,
StreamingHTTPResponse,
)
from sanic.router import Router
from sanic.server import HttpProtocol, Signal, serve
from sanic.static import register as static_register
from sanic.testing import SanicASGITestClient, SanicTestClient
from sanic.views import CompositionView
Expand Down Expand Up @@ -1019,7 +1017,41 @@ async def handle_request(self, request, write_callback, stream_callback):
# - Add exception handling
pass
else:
write_callback(response)
await write_callback(response)

async def handle_request_trio(self, request):
# Request middleware
response = await self._run_request_middleware(request)
if response:
return await request.respond(response)
# Fetch handler from router
handler, args, kwargs, uri = self.router.get(request)
if handler is None:
raise ServerError(
"None was returned while requesting a handler from the router"
)
bp = getattr(handler, "__blueprintname__", None)
bp = (bp,) if bp else ()
request.endpoint = self._build_endpoint_name(*bp, handler.__name__)
request.uri_template = uri
# Load header body before starting handler?
if request.stream.length and not hasattr(handler, "is_stream"):
await request.receive_body()
# Run main handler
response = handler(request, *args, **kwargs)
if isawaitable(response):
response = await response
# Returned (non-streaming) response
if isinstance(response, BaseHTTPResponse):
await request.respond(
status=response.status,
headers=response.headers,
content_type=response.content_type,
).send(data_bytes=response.body, end_stream=True)
elif response is not None:
raise ServerError(
f"Handling {request.path}: HTTPResponse expected but got {type(response).__name__} {response!r:.200}"
)

# -------------------------------------------------------------------- #
# Testing
Expand Down Expand Up @@ -1050,7 +1082,7 @@ def run(
stop_event: Any = None,
register_sys_signals: bool = True,
access_log: Optional[bool] = None,
**kwargs: Any
**kwargs: Any,
) -> None:
"""Run the HTTP Server and listen until keyboard interrupt or term
signal. On termination, drain connections before closing.
Expand Down Expand Up @@ -1089,14 +1121,12 @@ def run(
"https://sanic.readthedocs.io/en/latest/sanic/deploying.html"
"#asynchronous-support"
)
self.is_first_process = (
os.environ.get("SANIC_SERVER_RUNNING") != "true"
)

# Default auto_reload to false
auto_reload = False
# If debug is set, default it to true (unless on windows)
if debug and os.name == "posix":
auto_reload = True
# Allow for overriding either of the defaults
auto_reload = kwargs.get("auto_reload", auto_reload)
# Allow for overriding the default of following debug mode setting
auto_reload = kwargs.get("auto_reload", debug)

if sock is None:
host, port = host or "127.0.0.1", port or 8000
Expand Down Expand Up @@ -1131,29 +1161,17 @@ def run(

try:
self.is_running = True
if workers == 1:
if auto_reload and os.name != "posix":
# This condition must be removed after implementing
# auto reloader for other operating systems.
raise NotImplementedError

if (
auto_reload
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
):
reloader_helpers.watchdog(2)
else:
serve(**server_settings)
if auto_reload and self.is_first_process:
reloader_helpers.watchdog(2)
else:
serve_multiple(server_settings, workers)
serve(**server_settings, workers=workers)
except BaseException:
error_logger.exception(
"Experienced exception while trying to serve"
)
raise
finally:
self.is_running = False
logger.info("Server Stopped")

def stop(self):
"""This kills the Sanic"""
Expand Down Expand Up @@ -1312,6 +1330,7 @@ def _helper(
raise ValueError("SSLContext or certificate and key required.")
context = create_default_context(purpose=Purpose.CLIENT_AUTH)
context.load_cert_chain(cert, keyfile=key)
context.set_alpn_protocols(["h2", "http/1.1"])
ssl = context
if stop_event is not None:
if debug:
Expand Down Expand Up @@ -1342,7 +1361,7 @@ def _helper(
"app": self,
"signal": Signal(),
"debug": debug,
"request_handler": self.handle_request,
"request_handler": self.handle_request_trio,
"error_handler": self.error_handler,
"request_timeout": self.config.REQUEST_TIMEOUT,
"response_timeout": self.config.RESPONSE_TIMEOUT,
Expand Down Expand Up @@ -1381,10 +1400,7 @@ def _helper(
if self.configure_logging and debug:
logger.setLevel(logging.DEBUG)

if (
self.config.LOGO
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
):
if self.config.LOGO and self.is_first_process:
logger.debug(
self.config.LOGO
if isinstance(self.config.LOGO, str)
Expand All @@ -1395,7 +1411,7 @@ def _helper(
server_settings["run_async"] = True

# Serve
if host and port and os.environ.get("SANIC_SERVER_RUNNING") != "true":
if host and port and self.is_first_process:
proto = "http"
if ssl is not None:
proto = "https"
Expand Down
6 changes: 5 additions & 1 deletion sanic/cookies.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def encode(self, encoding):
:return: Cookie encoded in a codec of choosing.
:except: UnicodeEncodeError
"""
return str(self).encode(encoding)

def __str__(self):
"""Format as a Set-Cookie header value."""
output = ["%s=%s" % (self.key, _quote(self.value))]
for key, value in self.items():
if key == "max-age":
Expand All @@ -147,4 +151,4 @@ def encode(self, encoding):
else:
output.append("%s=%s" % (self._keys[key], value))

return "; ".join(output).encode(encoding)
return "; ".join(output)
5 changes: 5 additions & 0 deletions sanic/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ class ServiceUnavailable(SanicException):
pass


@add_status_code(505)
class VersionNotSupported(SanicException):
pass


class URLBuildError(ServerError):
pass

Expand Down
34 changes: 33 additions & 1 deletion sanic/headers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import re

from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import unquote

from sanic.helpers import STATUS_CODES


HeaderIterable = Iterable[Tuple[str, Any]] # Values convertible to str
Options = Dict[str, Union[int, str]] # key=value fields in various headers
OptionsIterable = Iterable[Tuple[str, str]] # May contain duplicate keys

Expand Down Expand Up @@ -170,3 +173,32 @@ def parse_host(host: str) -> Tuple[Optional[str], Optional[int]]:
return None, None
host, port = m.groups()
return host.lower(), int(port) if port is not None else None


def format_http1(headers: HeaderIterable) -> bytes:
"""Convert a headers iterable into HTTP/1 header format.

- Outputs UTF-8 bytes where each header line ends with \\r\\n.
- Values are converted into strings if necessary.
"""
return "".join(f"{name}: {val}\r\n" for name, val in headers).encode()


def format_http1_response(
status: int, headers: HeaderIterable, body: Optional[bytes] = None
) -> bytes:
"""Format a full HTTP/1.1 response.

- If `body` is included, content-length must be specified in headers.
"""
if body is None:
body = b""
headers = format_http1(headers)
if status == 200:
return b"HTTP/1.1 200 OK\r\n%b\r\n%b" % (headers, body)
return b"HTTP/1.1 %d %b\r\n%b\r\n%b" % (
status,
STATUS_CODES.get(status, b"UNKNOWN"),
headers,
body,
)
148 changes: 148 additions & 0 deletions sanic/protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from sanic.headers import format_http1, format_http1_response
from sanic.helpers import has_message_body, remove_entity_headers


# FIXME: Put somewhere before response:
if False:
# Middleware has a chance to replace or modify the response
response = await self.app._run_response_middleware(
request, response
)

class H1Stream:
__slots__ = ("stream", "length", "pos", "set_timeout", "response_state", "status", "headers", "bytes_left")

def __init__(self, headers, stream, need_continue):
self.length = int(headers.get("content-length", "0"))
assert self.length >= 0
self.pos = None if need_continue else 0
self.stream = stream
self.status = self.bytes_left = None
self.response_state = 0

async def aclose(self):
# Finish sending a response (if no error)
if self.response_state < 2:
await self.send(end_stream=True)
# Response fully sent, request fully read?
if self.pos != self.length or self.response_state != 2:
await self.stream.aclose() # If not, must disconnect :(

# Request methods

def dont_continue(self):
"""Prevent a pending 100 Continue response being sent, and avoid
receiving the request body. Does not by itself send a 417 response."""
if self.pos is None:
self.pos = self.length = 0

async def trigger_continue(self):
if self.pos is None:
self.pos = 0
await self.stream.send_all(b"HTTP/1.1 100 Continue\r\n\r\n")

async def __aiter__(self):
while True:
data = await self.read()
if not data:
return
yield data

async def read(self):
await self.trigger_continue()
if self.pos == self.length:
return None
buf = await self.stream.receive_some()
if len(buf) > self.length:
self.stream.push_back(buf[self.length :])
buf = buf[: self.length]
self.pos += len(buf)
return buf

# Response methods

def respond(self, status, headers):
if self.response_state > 0:
self.response_state = 3 # FAIL mode
raise RuntimeError("Response already started")
self.status = status
self.headers = headers
return self

async def send(self, data=None, data_bytes=None, end_stream=False):
"""Send any pending response headers and the given data as body.
:param data: str-convertible data to be written
:param data_bytes: bytes-ish data to be written (used if data is None)
:end_stream: whether to close the stream after this block
"""
data = self.data_to_send(data, data_bytes, end_stream)
if data is None:
return
# Check if the request expects a 100-continue first
if self.pos is None:
if self.status == 417:
self.dont_continue()
else:
await self.trigger_continue()
# Send response
await self.stream.send_all(data)

def data_to_send(self, data, data_bytes, end_stream):
"""Format output data bytes for given body data.
Headers are prepended to the first output block and then cleared.
:param data: str-convertible data to be written
:param data_bytes: bytes-ish data to be written (used if data is None)
:return: bytes to send, or None if there is nothing to send
"""
data = data_bytes if data is None else f"{data}".encode()
size = len(data) if data is not None else 0

# Headers not yet sent?
if self.response_state == 0:
status, headers = self.status, self.headers
if status in (304, 412):
headers = remove_entity_headers(headers)
if not has_message_body(status):
# Header-only response status
assert (
size == 0 and end_stream
), f"A {status} response may only have headers, no body."
assert "content-length" not in self.headers
assert "transfer-encoding" not in self.headers
elif end_stream:
# Non-streaming response (all in one block)
headers["content-length"] = size
elif "content-length" in headers:
# Streaming response with size known in advance
self.bytes_left = int(headers["content-length"]) - size
assert self.bytes_left >= 0
else:
# Length not known, use chunked encoding
headers["transfer-encoding"] = "chunked"
data = b"%x\r\n%b\r\n" % (size, data) if size else None
self.bytes_left = ...
self.status = self.headers = None
self.response_state = 2 if end_stream else 1
return format_http1_response(status, headers.items(), data)

if self.response_state == 2:
if size:
raise RuntimeError("Cannot send data to a closed stream")
return

self.response_state = 2 if end_stream else 1

# Chunked encoding
if self.bytes_left is ...:
if end_stream:
self.bytes_left = None
if size:
return b"%x\r\n%b\r\n0\r\n\r\n" % (size, data)
return b"0\r\n\r\n"
return b"%x\r\n%b\r\n" % (size, data) if size else None

# Normal encoding
if isinstance(self.bytes_left, int):
self.bytes_left -= size
assert self.bytes_left >= 0
return data if size else None
Loading