Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move _CBScheduler code to Rust #457

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 8 additions & 41 deletions granian/_futures.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from asyncio.tasks import _enter_task, _leave_task
import contextvars
from asyncio.tasks import _enter_task as _aio_taskenter, _leave_task as _aio_taskleave
from functools import partial

from ._granian import CallbackScheduler as _BaseCBScheduler

Expand All @@ -18,50 +20,15 @@ async def future_watcher(watcher):
class _CBScheduler(_BaseCBScheduler):
__slots__ = []

def __init__(self, loop, ctx, cb):
def __init__(self, loop, ctx, cb, aio_tenter, aio_texit):
super().__init__()
self._schedule_fn = _cbsched_schedule(loop, ctx, self._run, cb)

def _waker(self, coro):
def _wake(fut):
self._resume(coro, fut)

return _wake

def _resume(self, coro, fut):
try:
fut.result()
except BaseException as exc:
self._throw(coro, exc)
else:
self._run(coro)

def _run(self, coro):
_enter_task(self._loop, self)
try:
try:
result = coro.send(None)
except (KeyboardInterrupt, SystemExit):
raise
except BaseException:
pass
else:
if getattr(result, '_asyncio_future_blocking', None):
result._asyncio_future_blocking = False
result.add_done_callback(self._waker(coro), context=self._ctx)
elif result is None:
self._loop.call_soon(self._run, coro, context=self._ctx)
finally:
_leave_task(self._loop, self)

def _throw(self, coro, exc):
_enter_task(self._loop, self)
try:
coro.throw(exc)
except BaseException:
pass
finally:
_leave_task(self._loop, self)
def _new_cbscheduler(loop, cb):
return _CBScheduler(
loop, contextvars.copy_context(), cb, partial(_aio_taskenter, loop), partial(_aio_taskleave, loop)
)


def _cbsched_schedule(loop, ctx, run, cb):
Expand Down
2 changes: 2 additions & 0 deletions granian/_granian.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,5 @@ class ListenerHolder:
class CallbackScheduler:
_loop: Any
_ctx: Any

def _run(self, coro: Any): ...
13 changes: 5 additions & 8 deletions granian/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import contextvars
import errno
import multiprocessing
import os
Expand All @@ -13,7 +12,7 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type

from ._futures import _CBScheduler, _future_watcher_wrapper
from ._futures import _future_watcher_wrapper, _new_cbscheduler
from ._granian import ASGIWorker, RSGIWorker, WSGIWorker
from ._imports import setproctitle, watchfiles
from ._internal import load_target
Expand Down Expand Up @@ -220,7 +219,7 @@ def _spawn_asgi_worker(
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
scheduler = _CBScheduler(loop, contextvars.copy_context(), _future_watcher_wrapper(wcallback))
scheduler = _new_cbscheduler(loop, _future_watcher_wrapper(wcallback))
serve(scheduler, loop, shutdown_event)

@staticmethod
Expand Down Expand Up @@ -278,7 +277,7 @@ def _spawn_asgi_lifespan_worker(
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
scheduler = _CBScheduler(loop, contextvars.copy_context(), _future_watcher_wrapper(wcallback))
scheduler = _new_cbscheduler(loop, _future_watcher_wrapper(wcallback))
serve(scheduler, loop, shutdown_event)
loop.run_until_complete(lifespan_handler.shutdown())

Expand Down Expand Up @@ -338,7 +337,7 @@ def _spawn_rsgi_worker(
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
scheduler = _CBScheduler(loop, contextvars.copy_context(), _future_watcher_wrapper(callback))
scheduler = _new_cbscheduler(loop, _future_watcher_wrapper(callback))
serve(scheduler, loop, shutdown_event)
callback_del(loop)

Expand Down Expand Up @@ -380,9 +379,7 @@ def _spawn_wsgi_worker(
worker_id, sfd, threads, blocking_threads, backpressure, http_mode, http1_settings, http2_settings, *ssl_ctx
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
scheduler = _CBScheduler(
loop, contextvars.copy_context(), _wsgi_call_wrap(callback, scope_opts, log_access_fmt)
)
scheduler = _new_cbscheduler(loop, _wsgi_call_wrap(callback, scope_opts, log_access_fmt))
serve(scheduler, loop, shutdown_event)
shutdown_event.qs.wait()

Expand Down
6 changes: 4 additions & 2 deletions src/asgi/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl ASGIHTTPProtocol {
let body_ref = self.request_body.clone();
let flow_ref = self.flow_rx_exhausted.clone();
let flow_hld = self.flow_tx_waiter.clone();
future_into_py_iter(self.rt.clone(), py, async move {
future_into_py_futlike(self.rt.clone(), py, async move {
let mut bodym = body_ref.lock().await;
let body = &mut *bodym;
let mut more_body = false;
Expand Down Expand Up @@ -327,7 +327,7 @@ impl ASGIWebsocketProtocol {
let tx = self.ws_tx.clone();
let pynone = py.None();

future_into_py_iter(self.rt.clone(), py, async move {
future_into_py_futlike(self.rt.clone(), py, async move {
if let Some(mut upgrade) = upgrade {
let upgrade_headers = match subproto {
Some(v) => vec![(WS_SUBPROTO_HNAME.to_string(), v)],
Expand All @@ -347,6 +347,7 @@ impl ASGIWebsocketProtocol {
}
}
}
Python::with_gil(|_| drop(pynone));
error_flow!()
})
}
Expand All @@ -369,6 +370,7 @@ impl ASGIWebsocketProtocol {
}
};
};
Python::with_gil(|_| drop(pynone));
error_flow!()
})
}
Expand Down
Loading
Loading