From b941db32b91e85a65398c4022d51a44656fd3cf4 Mon Sep 17 00:00:00 2001 From: Samantha Hughes Date: Sat, 25 Sep 2021 12:29:34 -0700 Subject: [PATCH 1/3] typing for sync/async decorators --- asgiref/sync.py | 62 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/asgiref/sync.py b/asgiref/sync.py index 3710a7f1..1a33c60f 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -7,7 +7,9 @@ import warnings import weakref from concurrent.futures import Future, ThreadPoolExecutor -from typing import Any, Callable, Dict, Optional, overload +from typing import Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar, overload + +from typing_extensions import ParamSpec from .compatibility import current_task, get_running_loop from .current_thread_executor import CurrentThreadExecutor @@ -98,7 +100,11 @@ async def __aexit__(self, exc, value, tb): pass -class AsyncToSync: +a_cls_params = ParamSpec("a_cls_params") +a_cls_return = TypeVar("a_cls_return") + + +class AsyncToSync(Generic[a_cls_params, a_cls_return]): """ Utility class which turns an awaitable that only works on the thread with the event loop into a synchronous callable that works in a subthread. @@ -118,7 +124,11 @@ class AsyncToSync: # Local, not a threadlocal, so that tasks can work out what their parent used. executors = Local() - def __init__(self, awaitable, force_new_loop=False): + def __init__( + self, + awaitable: Callable[a_cls_params, Awaitable[a_cls_return]], + force_new_loop=False, + ): if not callable(awaitable) or not _iscoroutinefunction_or_partial(awaitable): # Python does not have very reliable detection of async functions # (lots of false negatives) so this is just a warning. @@ -151,7 +161,9 @@ def __init__(self, awaitable, force_new_loop=False): else: self.main_event_loop = None - def __call__(self, *args, **kwargs): + def __call__( + self, *args: a_cls_params.args, **kwargs: a_cls_params.kwargs + ) -> a_cls_return: # You can't call AsyncToSync from a thread with a running event loop try: event_loop = get_running_loop() @@ -172,7 +184,7 @@ def __call__(self, *args, **kwargs): context = None # Make a future for the return information - call_result = Future() + call_result: Future[a_cls_return] = Future() # Get the source thread source_thread = threading.current_thread() # Make a CurrentThreadExecutor we'll use to idle in this thread - we @@ -271,7 +283,13 @@ def __get__(self, parent, objtype): return functools.update_wrapper(func, self.awaitable) async def main_wrap( - self, args, kwargs, call_result, source_thread, exc_info, context + self, + args, + kwargs, + call_result: a_cls_return, + source_thread, + exc_info, + context, ): """ Wraps the awaitable with something that puts the result into the @@ -303,7 +321,11 @@ async def main_wrap( context[0] = contextvars.copy_context() -class SyncToAsync: +s_cls_params = ParamSpec("s_cls_params") +s_cls_return = TypeVar("s_cls_return") + + +class SyncToAsync(Generic[s_cls_params, s_cls_return]): """ Utility class which turns a synchronous callable into an awaitable that runs in a threadpool. It also sets a threadlocal inside the thread so @@ -369,7 +391,7 @@ class SyncToAsync: def __init__( self, - func: Callable[..., Any], + func: Callable[s_cls_params, s_cls_return], thread_sensitive: bool = True, executor: Optional["ThreadPoolExecutor"] = None, ) -> None: @@ -387,7 +409,9 @@ def __init__( except AttributeError: pass - async def __call__(self, *args, **kwargs): + async def __call__( + self, *args: s_cls_params.args, **kwargs: s_cls_params.kwargs + ) -> s_cls_return: loop = get_running_loop() # Work out what thread to run the code in @@ -461,7 +485,15 @@ def __get__(self, parent, objtype): """ return functools.partial(self.__call__, parent) - def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs): + def thread_handler( + self, + loop, + source_task, + exc_info, + func: Callable[s_cls_params, s_cls_return], + *args: s_cls_params.args, + **kwargs: s_cls_params.kwargs + ): """ Wraps the sync application with exception handling. """ @@ -511,21 +543,25 @@ def get_current_task(): async_to_sync = AsyncToSync +s_params = ParamSpec("s_params") +s_return = TypeVar("s_return") + + @overload def sync_to_async( func: None = None, thread_sensitive: bool = True, executor: Optional["ThreadPoolExecutor"] = None, -) -> Callable[[Callable[..., Any]], SyncToAsync]: +) -> Callable[[Callable[s_params, s_return]], SyncToAsync[s_params, s_return]]: ... @overload def sync_to_async( - func: Callable[..., Any], + func: Callable[s_params, s_return], thread_sensitive: bool = True, executor: Optional["ThreadPoolExecutor"] = None, -) -> SyncToAsync: +) -> SyncToAsync[s_params, s_return]: ... From c9b210af918f2fedf4847cae123b4362e276cff0 Mon Sep 17 00:00:00 2001 From: Samantha Hughes Date: Thu, 25 Nov 2021 14:13:03 -0800 Subject: [PATCH 2/3] workaround --- asgiref/sync.py | 101 ++++++++++++++++++++++++------------------------ setup.cfg | 2 +- 2 files changed, 52 insertions(+), 51 deletions(-) diff --git a/asgiref/sync.py b/asgiref/sync.py index 1a33c60f..435aa814 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -7,7 +7,7 @@ import warnings import weakref from concurrent.futures import Future, ThreadPoolExecutor -from typing import Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar, overload +from typing import Any, Awaitable, Callable, Dict, Optional, TypeVar, Union, overload from typing_extensions import ParamSpec @@ -20,6 +20,9 @@ else: contextvars = None +P = ParamSpec("P") +R = TypeVar("R") + def _restore_context(context): # Check for changes in contextvars, and set them to the current @@ -100,11 +103,7 @@ async def __aexit__(self, exc, value, tb): pass -a_cls_params = ParamSpec("a_cls_params") -a_cls_return = TypeVar("a_cls_return") - - -class AsyncToSync(Generic[a_cls_params, a_cls_return]): +class AsyncToSync: """ Utility class which turns an awaitable that only works on the thread with the event loop into a synchronous callable that works in a subthread. @@ -125,9 +124,7 @@ class AsyncToSync(Generic[a_cls_params, a_cls_return]): executors = Local() def __init__( - self, - awaitable: Callable[a_cls_params, Awaitable[a_cls_return]], - force_new_loop=False, + self, awaitable: Callable[..., Awaitable[Any]], force_new_loop: bool = False ): if not callable(awaitable) or not _iscoroutinefunction_or_partial(awaitable): # Python does not have very reliable detection of async functions @@ -137,7 +134,7 @@ def __init__( ) self.awaitable = awaitable try: - self.__self__ = self.awaitable.__self__ + self.__self__ = self.awaitable.__self__ # type: ignore except AttributeError: pass if force_new_loop: @@ -161,9 +158,7 @@ def __init__( else: self.main_event_loop = None - def __call__( - self, *args: a_cls_params.args, **kwargs: a_cls_params.kwargs - ) -> a_cls_return: + def __call__(self, *args, **kwargs): # You can't call AsyncToSync from a thread with a running event loop try: event_loop = get_running_loop() @@ -184,7 +179,7 @@ def __call__( context = None # Make a future for the return information - call_result: Future[a_cls_return] = Future() + call_result = Future() # Get the source thread source_thread = threading.current_thread() # Make a CurrentThreadExecutor we'll use to idle in this thread - we @@ -283,13 +278,7 @@ def __get__(self, parent, objtype): return functools.update_wrapper(func, self.awaitable) async def main_wrap( - self, - args, - kwargs, - call_result: a_cls_return, - source_thread, - exc_info, - context, + self, args, kwargs, call_result, source_thread, exc_info, context ): """ Wraps the awaitable with something that puts the result into the @@ -321,11 +310,7 @@ async def main_wrap( context[0] = contextvars.copy_context() -s_cls_params = ParamSpec("s_cls_params") -s_cls_return = TypeVar("s_cls_return") - - -class SyncToAsync(Generic[s_cls_params, s_cls_return]): +class SyncToAsync: """ Utility class which turns a synchronous callable into an awaitable that runs in a threadpool. It also sets a threadlocal inside the thread so @@ -391,7 +376,7 @@ class SyncToAsync(Generic[s_cls_params, s_cls_return]): def __init__( self, - func: Callable[s_cls_params, s_cls_return], + func: Callable[..., Any], thread_sensitive: bool = True, executor: Optional["ThreadPoolExecutor"] = None, ) -> None: @@ -409,9 +394,7 @@ def __init__( except AttributeError: pass - async def __call__( - self, *args: s_cls_params.args, **kwargs: s_cls_params.kwargs - ) -> s_cls_return: + async def __call__(self, *args, **kwargs): loop = get_running_loop() # Work out what thread to run the code in @@ -485,15 +468,7 @@ def __get__(self, parent, objtype): """ return functools.partial(self.__call__, parent) - def thread_handler( - self, - loop, - source_task, - exc_info, - func: Callable[s_cls_params, s_cls_return], - *args: s_cls_params.args, - **kwargs: s_cls_params.kwargs - ): + def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs): """ Wraps the sync application with exception handling. """ @@ -539,12 +514,36 @@ def get_current_task(): return None -# Lowercase aliases (and decorator friendliness) -async_to_sync = AsyncToSync +# Lowercase aliases (and decorator/typing friendliness) +@overload +def async_to_sync( + func: None = None, + force_new_loop: bool = False, +) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, R]]: + ... -s_params = ParamSpec("s_params") -s_return = TypeVar("s_return") +@overload +def async_to_sync( + func: Callable[P, Awaitable[R]], + force_new_loop: bool = False, +) -> Callable[P, R]: + ... + + +def async_to_sync( + func: Optional[Callable[P, Awaitable[R]]] = None, + force_new_loop: bool = False, +) -> Union[Callable[P, R], Callable[[Callable[P, Awaitable[R]]], Callable[P, R]]]: + if func is None: + return lambda f: AsyncToSync( + f, + force_new_loop=force_new_loop, + ) + return AsyncToSync( + func, + force_new_loop=force_new_loop, + ) @overload @@ -552,24 +551,26 @@ def sync_to_async( func: None = None, thread_sensitive: bool = True, executor: Optional["ThreadPoolExecutor"] = None, -) -> Callable[[Callable[s_params, s_return]], SyncToAsync[s_params, s_return]]: +) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]: ... @overload def sync_to_async( - func: Callable[s_params, s_return], + func: Callable[P, R], thread_sensitive: bool = True, executor: Optional["ThreadPoolExecutor"] = None, -) -> SyncToAsync[s_params, s_return]: +) -> Callable[P, Awaitable[R]]: ... def sync_to_async( - func=None, - thread_sensitive=True, - executor=None, -): + func: Optional[Callable[P, R]] = None, + thread_sensitive: bool = True, + executor: Optional["ThreadPoolExecutor"] = None, +) -> Union[ + Callable[P, Awaitable[R]], Callable[[Callable[P, R]], Callable[P, Awaitable[R]]] +]: if func is None: return lambda f: SyncToAsync( f, diff --git a/setup.cfg b/setup.cfg index 01d0b50b..41dfaf38 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,7 +38,7 @@ zip_safe = false tests = pytest pytest-asyncio - mypy>=0.800 + mypy @ git+https://github.com/python/mypy.git@master [tool:pytest] testpaths = tests From ff26ff44b5cfd69c09639fe1832aa2dbf5355b17 Mon Sep 17 00:00:00 2001 From: Samantha Hughes Date: Thu, 16 Dec 2021 20:16:27 -0800 Subject: [PATCH 3/3] Update setup.cfg Co-authored-by: Thomas Grainger --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 41dfaf38..62bd85c7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,7 +38,7 @@ zip_safe = false tests = pytest pytest-asyncio - mypy @ git+https://github.com/python/mypy.git@master + mypy>=0.920 [tool:pytest] testpaths = tests