From 485b26b6825b09bc581a599f350c58116bbb0772 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Wed, 23 Mar 2022 09:07:34 +0000 Subject: [PATCH 01/13] bpo-43884: Fix asyncio subprocess kill process cleanly when process is blocked --- Lib/asyncio/base_subprocess.py | 23 ++++++++++++++++------- Lib/test/test_asyncio/test_subprocess.py | 14 ++++++++++++++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 14d50519228814..4e29574632d218 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -214,14 +214,22 @@ def _process_exited(self, returncode): # asyncio uses a child watcher: copy the status into the Popen # object. On Python 3.6, it is required to avoid a ResourceWarning. self._proc.returncode = returncode - self._call(self._protocol.process_exited) - self._try_finish() - # wake up futures waiting for wait() - for waiter in self._exit_waiters: - if not waiter.cancelled(): - waiter.set_result(returncode) - self._exit_waiters = None + def __process_exited(): + self._protocol.process_exited() + # Since process exited, clear all pipes + for proto in self._pipes.values(): + if proto is None: + continue + proto.pipe.close() + # Call _wait waiters + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(self._returncode) + self._exit_waiters = None + + self._call(__process_exited) + self._try_finish() async def _wait(self): """Wait until the process exit and return the process return code. @@ -252,6 +260,7 @@ def _call_connection_lost(self, exc): self._protocol = None + class WriteSubprocessPipeProto(protocols.BaseProtocol): def __init__(self, proc, fd): diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 14fa6dd76f9ca8..cc9dd17cd0924f 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -181,6 +181,20 @@ def test_kill(self): else: self.assertEqual(-signal.SIGKILL, returncode) + def test_kill_issue43884(self): + blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(10000)"' + proc = self.loop.run_until_complete( + asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE) + ) + self.loop.run_until_complete(asyncio.sleep(1)) + proc.kill() + returncode = self.loop.run_until_complete(proc.wait()) + if sys.platform == 'win32': + self.assertIsInstance(returncode, int) + # expect 1 but sometimes get 0 + else: + self.assertEqual(-signal.SIGKILL, returncode) + def test_terminate(self): args = PROGRAM_BLOCKED proc = self.loop.run_until_complete( From 0b87f8d2d9a84bf89bbf174a71e638615b1db43c Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Thu, 24 Mar 2022 08:29:08 +0000 Subject: [PATCH 02/13] rework --- Lib/asyncio/base_subprocess.py | 23 +++++++---------------- Lib/asyncio/subprocess.py | 4 +++- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 4e29574632d218..14d50519228814 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -214,23 +214,15 @@ def _process_exited(self, returncode): # asyncio uses a child watcher: copy the status into the Popen # object. On Python 3.6, it is required to avoid a ResourceWarning. self._proc.returncode = returncode - - def __process_exited(): - self._protocol.process_exited() - # Since process exited, clear all pipes - for proto in self._pipes.values(): - if proto is None: - continue - proto.pipe.close() - # Call _wait waiters - for waiter in self._exit_waiters: - if not waiter.cancelled(): - waiter.set_result(self._returncode) - self._exit_waiters = None - - self._call(__process_exited) + self._call(self._protocol.process_exited) self._try_finish() + # wake up futures waiting for wait() + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(returncode) + self._exit_waiters = None + async def _wait(self): """Wait until the process exit and return the process return code. @@ -260,7 +252,6 @@ def _call_connection_lost(self, exc): self._protocol = None - class WriteSubprocessPipeProto(protocols.BaseProtocol): def __init__(self, proc, fd): diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index cd10231f710f11..ab548d53b71d5f 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -103,7 +103,9 @@ def process_exited(self): self._maybe_close_transport() def _maybe_close_transport(self): - if len(self._pipe_fds) == 0 and self._process_exited: + # Since process already exited + # clear all the pipes and close transport + if self._process_exited and self._transport: self._transport.close() self._transport = None From 2bd08cf005a37afcbbee3cb042d02f66ce4528f2 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Mar 2022 10:19:07 +0000 Subject: [PATCH 03/13] start from scratch --- Lib/ast.py | 9 +- Lib/asyncio/exceptions.py | 7 +- Lib/asyncio/locks.py | 157 +++++- Lib/asyncio/runners.py | 124 ++++- Lib/asyncio/subprocess.py | 4 +- Lib/logging/__init__.py | 67 +-- Lib/opcode.py | 1 + Lib/test/test_ast.py | 25 +- Lib/test/test_asyncio/test_locks.py | 578 ++++++++++++++++++++++- Lib/test/test_asyncio/test_runners.py | 133 ++++++ Lib/test/test_asyncio/test_subprocess.py | 14 - Lib/test/test_future.py | 41 +- Lib/test/test_logging.py | 10 +- Lib/test/test_pep646_syntax.py | 326 +++++++++++++ Lib/test/test_syntax.py | 143 ++++++ Lib/test/test_unparse.py | 26 +- Lib/typing.py | 11 +- Lib/unittest/async_case.py | 70 +-- kill_subprocess.py | 16 + 19 files changed, 1626 insertions(+), 136 deletions(-) create mode 100644 Lib/test/test_pep646_syntax.py create mode 100644 kill_subprocess.py diff --git a/Lib/ast.py b/Lib/ast.py index 625738ad681af4..e81e28044bc6e4 100644 --- a/Lib/ast.py +++ b/Lib/ast.py @@ -1476,20 +1476,17 @@ def visit_Call(self, node): self.traverse(e) def visit_Subscript(self, node): - def is_simple_tuple(slice_value): - # when unparsing a non-empty tuple, the parentheses can be safely - # omitted if there aren't any elements that explicitly requires - # parentheses (such as starred expressions). + def is_non_empty_tuple(slice_value): return ( isinstance(slice_value, Tuple) and slice_value.elts - and not any(isinstance(elt, Starred) for elt in slice_value.elts) ) self.set_precedence(_Precedence.ATOM, node.value) self.traverse(node.value) with self.delimit("[", "]"): - if is_simple_tuple(node.slice): + if is_non_empty_tuple(node.slice): + # parentheses can be omitted if the tuple isn't empty self.items_view(self.traverse, node.slice.elts) else: self.traverse(node.slice) diff --git a/Lib/asyncio/exceptions.py b/Lib/asyncio/exceptions.py index c764c9ffcfc199..5ece595aad6475 100644 --- a/Lib/asyncio/exceptions.py +++ b/Lib/asyncio/exceptions.py @@ -1,7 +1,8 @@ """asyncio exceptions.""" -__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError', +__all__ = ('BrokenBarrierError', + 'CancelledError', 'InvalidStateError', 'TimeoutError', 'IncompleteReadError', 'LimitOverrunError', 'SendfileNotAvailableError') @@ -55,3 +56,7 @@ def __init__(self, message, consumed): def __reduce__(self): return type(self), (self.args[0], self.consumed) + + +class BrokenBarrierError(RuntimeError): + """Barrier is broken by barrier.abort() call.""" diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py index 9b4612197de1d9..e71130274dd6f3 100644 --- a/Lib/asyncio/locks.py +++ b/Lib/asyncio/locks.py @@ -1,14 +1,15 @@ """Synchronization primitives.""" -__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore') +__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', + 'BoundedSemaphore', 'Barrier') import collections +import enum from . import exceptions from . import mixins from . import tasks - class _ContextManagerMixin: async def __aenter__(self): await self.acquire() @@ -416,3 +417,155 @@ def release(self): if self._value >= self._bound_value: raise ValueError('BoundedSemaphore released too many times') super().release() + + + +class _BarrierState(enum.Enum): + FILLING = 'filling' + DRAINING = 'draining' + RESETTING = 'resetting' + BROKEN = 'broken' + + +class Barrier(mixins._LoopBoundMixin): + """Asyncio equivalent to threading.Barrier + + Implements a Barrier primitive. + Useful for synchronizing a fixed number of tasks at known synchronization + points. Tasks block on 'wait()' and are simultaneously awoken once they + have all made their call. + """ + + def __init__(self, parties): + """Create a barrier, initialised to 'parties' tasks.""" + if parties < 1: + raise ValueError('parties must be > 0') + + self._cond = Condition() # notify all tasks when state changes + + self._parties = parties + self._state = _BarrierState.FILLING + self._count = 0 # count tasks in Barrier + + def __repr__(self): + res = super().__repr__() + extra = f'{self._state.value}' + if not self.broken: + extra += f', waiters:{self.n_waiting}/{self.parties}' + return f'<{res[1:-1]} [{extra}]>' + + async def __aenter__(self): + # wait for the barrier reaches the parties number + # when start draining release and return index of waited task + return await self.wait() + + async def __aexit__(self, *args): + pass + + async def wait(self): + """Wait for the barrier. + + When the specified number of tasks have started waiting, they are all + simultaneously awoken. + Returns an unique and individual index number from 0 to 'parties-1'. + """ + async with self._cond: + await self._block() # Block while the barrier drains or resets. + try: + index = self._count + self._count += 1 + if index + 1 == self._parties: + # We release the barrier + await self._release() + else: + await self._wait() + return index + finally: + self._count -= 1 + # Wake up any tasks waiting for barrier to drain. + self._exit() + + async def _block(self): + # Block until the barrier is ready for us, + # or raise an exception if it is broken. + # + # It is draining or resetting, wait until done + # unless a CancelledError occurs + await self._cond.wait_for( + lambda: self._state not in ( + _BarrierState.DRAINING, _BarrierState.RESETTING + ) + ) + + # see if the barrier is in a broken state + if self._state is _BarrierState.BROKEN: + raise exceptions.BrokenBarrierError("Barrier aborted") + + async def _release(self): + # Release the tasks waiting in the barrier. + + # Enter draining state. + # Next waiting tasks will be blocked until the end of draining. + self._state = _BarrierState.DRAINING + self._cond.notify_all() + + async def _wait(self): + # Wait in the barrier until we are released. Raise an exception + # if the barrier is reset or broken. + + # wait for end of filling + # unless a CancelledError occurs + await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING) + + if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING): + raise exceptions.BrokenBarrierError("Abort or reset of barrier") + + def _exit(self): + # If we are the last tasks to exit the barrier, signal any tasks + # waiting for the barrier to drain. + if self._count == 0: + if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING): + self._state = _BarrierState.FILLING + self._cond.notify_all() + + async def reset(self): + """Reset the barrier to the initial state. + + Any tasks currently waiting will get the BrokenBarrier exception + raised. + """ + async with self._cond: + if self._count > 0: + if self._state is not _BarrierState.RESETTING: + #reset the barrier, waking up tasks + self._state = _BarrierState.RESETTING + else: + self._state = _BarrierState.FILLING + self._cond.notify_all() + + async def abort(self): + """Place the barrier into a 'broken' state. + + Useful in case of error. Any currently waiting tasks and tasks + attempting to 'wait()' will have BrokenBarrierError raised. + """ + async with self._cond: + self._state = _BarrierState.BROKEN + self._cond.notify_all() + + @property + def parties(self): + """Return the number of tasks required to trip the barrier.""" + return self._parties + + @property + def n_waiting(self): + """Return the number of tasks currently waiting at the barrier.""" + if self._state is _BarrierState.FILLING: + return self._count + return 0 + + @property + def broken(self): + """Return True if the barrier is in a broken state.""" + return self._state is _BarrierState.BROKEN diff --git a/Lib/asyncio/runners.py b/Lib/asyncio/runners.py index 9a5e9a48479ef7..768a403a85beec 100644 --- a/Lib/asyncio/runners.py +++ b/Lib/asyncio/runners.py @@ -1,10 +1,112 @@ -__all__ = 'run', +__all__ = ('Runner', 'run') +import contextvars +import enum from . import coroutines from . import events from . import tasks +class _State(enum.Enum): + CREATED = "created" + INITIALIZED = "initialized" + CLOSED = "closed" + + +class Runner: + """A context manager that controls event loop life cycle. + + The context manager always creates a new event loop, + allows to run async functions inside it, + and properly finalizes the loop at the context manager exit. + + If debug is True, the event loop will be run in debug mode. + If loop_factory is passed, it is used for new event loop creation. + + asyncio.run(main(), debug=True) + + is a shortcut for + + with asyncio.Runner(debug=True) as runner: + runner.run(main()) + + The run() method can be called multiple times within the runner's context. + + This can be useful for interactive console (e.g. IPython), + unittest runners, console tools, -- everywhere when async code + is called from existing sync framework and where the preferred single + asyncio.run() call doesn't work. + + """ + + # Note: the class is final, it is not intended for inheritance. + + def __init__(self, *, debug=None, loop_factory=None): + self._state = _State.CREATED + self._debug = debug + self._loop_factory = loop_factory + self._loop = None + self._context = None + + def __enter__(self): + self._lazy_init() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + """Shutdown and close event loop.""" + if self._state is not _State.INITIALIZED: + return + try: + loop = self._loop + _cancel_all_tasks(loop) + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.run_until_complete(loop.shutdown_default_executor()) + finally: + loop.close() + self._loop = None + self._state = _State.CLOSED + + def get_loop(self): + """Return embedded event loop.""" + self._lazy_init() + return self._loop + + def run(self, coro, *, context=None): + """Run a coroutine inside the embedded event loop.""" + if not coroutines.iscoroutine(coro): + raise ValueError("a coroutine was expected, got {!r}".format(coro)) + + if events._get_running_loop() is not None: + # fail fast with short traceback + raise RuntimeError( + "Runner.run() cannot be called from a running event loop") + + self._lazy_init() + + if context is None: + context = self._context + task = self._loop.create_task(coro, context=context) + return self._loop.run_until_complete(task) + + def _lazy_init(self): + if self._state is _State.CLOSED: + raise RuntimeError("Runner is closed") + if self._state is _State.INITIALIZED: + return + if self._loop_factory is None: + self._loop = events.new_event_loop() + else: + self._loop = self._loop_factory() + if self._debug is not None: + self._loop.set_debug(self._debug) + self._context = contextvars.copy_context() + self._state = _State.INITIALIZED + + + def run(main, *, debug=None): """Execute the coroutine and return the result. @@ -30,26 +132,12 @@ async def main(): asyncio.run(main()) """ if events._get_running_loop() is not None: + # fail fast with short traceback raise RuntimeError( "asyncio.run() cannot be called from a running event loop") - if not coroutines.iscoroutine(main): - raise ValueError("a coroutine was expected, got {!r}".format(main)) - - loop = events.new_event_loop() - try: - events.set_event_loop(loop) - if debug is not None: - loop.set_debug(debug) - return loop.run_until_complete(main) - finally: - try: - _cancel_all_tasks(loop) - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.run_until_complete(loop.shutdown_default_executor()) - finally: - events.set_event_loop(None) - loop.close() + with Runner(debug=debug) as runner: + return runner.run(main) def _cancel_all_tasks(loop): diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index ab548d53b71d5f..cd10231f710f11 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -103,9 +103,7 @@ def process_exited(self): self._maybe_close_transport() def _maybe_close_transport(self): - # Since process already exited - # clear all the pipes and close transport - if self._process_exited and self._transport: + if len(self._pipe_fds) == 0 and self._process_exited: self._transport.close() self._transport = None diff --git a/Lib/logging/__init__.py b/Lib/logging/__init__.py index 160b1afcee0f9e..d6315b047334ec 100644 --- a/Lib/logging/__init__.py +++ b/Lib/logging/__init__.py @@ -159,8 +159,8 @@ def addLevelName(level, levelName): finally: _releaseLock() -if hasattr(sys, '_getframe'): - currentframe = lambda: sys._getframe(3) +if hasattr(sys, "_getframe"): + currentframe = lambda: sys._getframe(1) else: #pragma: no cover def currentframe(): """Return the frame object for the caller's stack frame.""" @@ -184,13 +184,18 @@ def currentframe(): _srcfile = os.path.normcase(addLevelName.__code__.co_filename) # _srcfile is only used in conjunction with sys._getframe(). -# To provide compatibility with older versions of Python, set _srcfile -# to None if _getframe() is not available; this value will prevent -# findCaller() from being called. You can also do this if you want to avoid -# the overhead of fetching caller information, even when _getframe() is -# available. -#if not hasattr(sys, '_getframe'): -# _srcfile = None +# Setting _srcfile to None will prevent findCaller() from being called. This +# way, you can avoid the overhead of fetching caller information. + +# The following is based on warnings._is_internal_frame. It makes sure that +# frames of the import mechanism are skipped when logging at module level and +# using a stacklevel value greater than one. +def _is_internal_frame(frame): + """Signal whether the frame is a CPython or logging module internal.""" + filename = os.path.normcase(frame.f_code.co_filename) + return filename == _srcfile or ( + "importlib" in filename and "_bootstrap" in filename + ) def _checkLevel(level): @@ -1558,33 +1563,31 @@ def findCaller(self, stack_info=False, stacklevel=1): f = currentframe() #On some versions of IronPython, currentframe() returns None if #IronPython isn't run with -X:Frames. - if f is not None: - f = f.f_back - orig_f = f - while f and stacklevel > 1: - f = f.f_back - stacklevel -= 1 - if not f: - f = orig_f - rv = "(unknown file)", 0, "(unknown function)", None - while hasattr(f, "f_code"): - co = f.f_code - filename = os.path.normcase(co.co_filename) - if filename == _srcfile: - f = f.f_back - continue - sinfo = None - if stack_info: - sio = io.StringIO() - sio.write('Stack (most recent call last):\n') + if f is None: + return "(unknown file)", 0, "(unknown function)", None + while stacklevel > 0: + next_f = f.f_back + if next_f is None: + ## We've got options here. + ## If we want to use the last (deepest) frame: + break + ## If we want to mimic the warnings module: + #return ("sys", 1, "(unknown function)", None) + ## If we want to be pedantic: + #raise ValueError("call stack is not deep enough") + f = next_f + if not _is_internal_frame(f): + stacklevel -= 1 + co = f.f_code + sinfo = None + if stack_info: + with io.StringIO() as sio: + sio.write("Stack (most recent call last):\n") traceback.print_stack(f, file=sio) sinfo = sio.getvalue() if sinfo[-1] == '\n': sinfo = sinfo[:-1] - sio.close() - rv = (co.co_filename, f.f_lineno, co.co_name, sinfo) - break - return rv + return co.co_filename, f.f_lineno, co.co_name, sinfo def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None): diff --git a/Lib/opcode.py b/Lib/opcode.py index 7a52c13579af7c..0b463d3f183aad 100644 --- a/Lib/opcode.py +++ b/Lib/opcode.py @@ -294,6 +294,7 @@ def jabs_op(name, op, entries=0): "PRECALL_BOUND_METHOD", "PRECALL_BUILTIN_CLASS", "PRECALL_BUILTIN_FAST_WITH_KEYWORDS", + "PRECALL_METHOD_DESCRIPTOR_FAST_WITH_KEYWORDS", "PRECALL_NO_KW_BUILTIN_FAST", "PRECALL_NO_KW_BUILTIN_O", "PRECALL_NO_KW_ISINSTANCE", diff --git a/Lib/test/test_ast.py b/Lib/test/test_ast.py index 039d1c1010b6d1..03d9b310f161b9 100644 --- a/Lib/test/test_ast.py +++ b/Lib/test/test_ast.py @@ -13,7 +13,7 @@ from test import support def to_tuple(t): - if t is None or isinstance(t, (str, int, complex)): + if t is None or isinstance(t, (str, int, complex)) or t is Ellipsis: return t elif isinstance(t, list): return [to_tuple(e) for e in t] @@ -46,10 +46,20 @@ def to_tuple(t): "def f(a=0): pass", # FunctionDef with varargs "def f(*args): pass", + # FunctionDef with varargs as TypeVarTuple + "def f(*args: *Ts): pass", + # FunctionDef with varargs as unpacked Tuple + "def f(*args: *tuple[int, ...]): pass", + # FunctionDef with varargs as unpacked Tuple *and* TypeVarTuple + "def f(*args: *tuple[int, *Ts]): pass", # FunctionDef with kwargs "def f(**kwargs): pass", # FunctionDef with all kind of args and docstring "def f(a, b=1, c=None, d=[], e={}, *args, f=42, **kwargs): 'doc for f()'", + # FunctionDef with type annotation on return involving unpacking + "def f() -> tuple[*Ts]: pass", + "def f() -> tuple[int, *Ts]: pass", + "def f() -> tuple[int, *tuple[int, ...]]: pass", # ClassDef "class C:pass", # ClassDef with docstring @@ -65,6 +75,10 @@ def to_tuple(t): "a,b = c", "(a,b) = c", "[a,b] = c", + # AnnAssign with unpacked types + "x: tuple[*Ts]", + "x: tuple[int, *Ts]", + "x: tuple[int, *tuple[str, ...]]", # AugAssign "v += 1", # For @@ -2315,8 +2329,14 @@ def main(): ('Module', [('FunctionDef', (1, 0, 1, 14), 'f', ('arguments', [], [('arg', (1, 6, 1, 7), 'a', None, None)], None, [], [], None, []), [('Pass', (1, 10, 1, 14))], [], None, None)], []), ('Module', [('FunctionDef', (1, 0, 1, 16), 'f', ('arguments', [], [('arg', (1, 6, 1, 7), 'a', None, None)], None, [], [], None, [('Constant', (1, 8, 1, 9), 0, None)]), [('Pass', (1, 12, 1, 16))], [], None, None)], []), ('Module', [('FunctionDef', (1, 0, 1, 18), 'f', ('arguments', [], [], ('arg', (1, 7, 1, 11), 'args', None, None), [], [], None, []), [('Pass', (1, 14, 1, 18))], [], None, None)], []), +('Module', [('FunctionDef', (1, 0, 1, 23), 'f', ('arguments', [], [], ('arg', (1, 7, 1, 16), 'args', ('Starred', (1, 13, 1, 16), ('Name', (1, 14, 1, 16), 'Ts', ('Load',)), ('Load',)), None), [], [], None, []), [('Pass', (1, 19, 1, 23))], [], None, None)], []), +('Module', [('FunctionDef', (1, 0, 1, 36), 'f', ('arguments', [], [], ('arg', (1, 7, 1, 29), 'args', ('Starred', (1, 13, 1, 29), ('Subscript', (1, 14, 1, 29), ('Name', (1, 14, 1, 19), 'tuple', ('Load',)), ('Tuple', (1, 20, 1, 28), [('Name', (1, 20, 1, 23), 'int', ('Load',)), ('Constant', (1, 25, 1, 28), Ellipsis, None)], ('Load',)), ('Load',)), ('Load',)), None), [], [], None, []), [('Pass', (1, 32, 1, 36))], [], None, None)], []), +('Module', [('FunctionDef', (1, 0, 1, 36), 'f', ('arguments', [], [], ('arg', (1, 7, 1, 29), 'args', ('Starred', (1, 13, 1, 29), ('Subscript', (1, 14, 1, 29), ('Name', (1, 14, 1, 19), 'tuple', ('Load',)), ('Tuple', (1, 20, 1, 28), [('Name', (1, 20, 1, 23), 'int', ('Load',)), ('Starred', (1, 25, 1, 28), ('Name', (1, 26, 1, 28), 'Ts', ('Load',)), ('Load',))], ('Load',)), ('Load',)), ('Load',)), None), [], [], None, []), [('Pass', (1, 32, 1, 36))], [], None, None)], []), ('Module', [('FunctionDef', (1, 0, 1, 21), 'f', ('arguments', [], [], None, [], [], ('arg', (1, 8, 1, 14), 'kwargs', None, None), []), [('Pass', (1, 17, 1, 21))], [], None, None)], []), ('Module', [('FunctionDef', (1, 0, 1, 71), 'f', ('arguments', [], [('arg', (1, 6, 1, 7), 'a', None, None), ('arg', (1, 9, 1, 10), 'b', None, None), ('arg', (1, 14, 1, 15), 'c', None, None), ('arg', (1, 22, 1, 23), 'd', None, None), ('arg', (1, 28, 1, 29), 'e', None, None)], ('arg', (1, 35, 1, 39), 'args', None, None), [('arg', (1, 41, 1, 42), 'f', None, None)], [('Constant', (1, 43, 1, 45), 42, None)], ('arg', (1, 49, 1, 55), 'kwargs', None, None), [('Constant', (1, 11, 1, 12), 1, None), ('Constant', (1, 16, 1, 20), None, None), ('List', (1, 24, 1, 26), [], ('Load',)), ('Dict', (1, 30, 1, 32), [], [])]), [('Expr', (1, 58, 1, 71), ('Constant', (1, 58, 1, 71), 'doc for f()', None))], [], None, None)], []), +('Module', [('FunctionDef', (1, 0, 1, 27), 'f', ('arguments', [], [], None, [], [], None, []), [('Pass', (1, 23, 1, 27))], [], ('Subscript', (1, 11, 1, 21), ('Name', (1, 11, 1, 16), 'tuple', ('Load',)), ('Tuple', (1, 17, 1, 20), [('Starred', (1, 17, 1, 20), ('Name', (1, 18, 1, 20), 'Ts', ('Load',)), ('Load',))], ('Load',)), ('Load',)), None)], []), +('Module', [('FunctionDef', (1, 0, 1, 32), 'f', ('arguments', [], [], None, [], [], None, []), [('Pass', (1, 28, 1, 32))], [], ('Subscript', (1, 11, 1, 26), ('Name', (1, 11, 1, 16), 'tuple', ('Load',)), ('Tuple', (1, 17, 1, 25), [('Name', (1, 17, 1, 20), 'int', ('Load',)), ('Starred', (1, 22, 1, 25), ('Name', (1, 23, 1, 25), 'Ts', ('Load',)), ('Load',))], ('Load',)), ('Load',)), None)], []), +('Module', [('FunctionDef', (1, 0, 1, 45), 'f', ('arguments', [], [], None, [], [], None, []), [('Pass', (1, 41, 1, 45))], [], ('Subscript', (1, 11, 1, 39), ('Name', (1, 11, 1, 16), 'tuple', ('Load',)), ('Tuple', (1, 17, 1, 38), [('Name', (1, 17, 1, 20), 'int', ('Load',)), ('Starred', (1, 22, 1, 38), ('Subscript', (1, 23, 1, 38), ('Name', (1, 23, 1, 28), 'tuple', ('Load',)), ('Tuple', (1, 29, 1, 37), [('Name', (1, 29, 1, 32), 'int', ('Load',)), ('Constant', (1, 34, 1, 37), Ellipsis, None)], ('Load',)), ('Load',)), ('Load',))], ('Load',)), ('Load',)), None)], []), ('Module', [('ClassDef', (1, 0, 1, 12), 'C', [], [], [('Pass', (1, 8, 1, 12))], [])], []), ('Module', [('ClassDef', (1, 0, 1, 32), 'C', [], [], [('Expr', (1, 9, 1, 32), ('Constant', (1, 9, 1, 32), 'docstring for class C', None))], [])], []), ('Module', [('ClassDef', (1, 0, 1, 21), 'C', [('Name', (1, 8, 1, 14), 'object', ('Load',))], [], [('Pass', (1, 17, 1, 21))], [])], []), @@ -2326,6 +2346,9 @@ def main(): ('Module', [('Assign', (1, 0, 1, 7), [('Tuple', (1, 0, 1, 3), [('Name', (1, 0, 1, 1), 'a', ('Store',)), ('Name', (1, 2, 1, 3), 'b', ('Store',))], ('Store',))], ('Name', (1, 6, 1, 7), 'c', ('Load',)), None)], []), ('Module', [('Assign', (1, 0, 1, 9), [('Tuple', (1, 0, 1, 5), [('Name', (1, 1, 1, 2), 'a', ('Store',)), ('Name', (1, 3, 1, 4), 'b', ('Store',))], ('Store',))], ('Name', (1, 8, 1, 9), 'c', ('Load',)), None)], []), ('Module', [('Assign', (1, 0, 1, 9), [('List', (1, 0, 1, 5), [('Name', (1, 1, 1, 2), 'a', ('Store',)), ('Name', (1, 3, 1, 4), 'b', ('Store',))], ('Store',))], ('Name', (1, 8, 1, 9), 'c', ('Load',)), None)], []), +('Module', [('AnnAssign', (1, 0, 1, 13), ('Name', (1, 0, 1, 1), 'x', ('Store',)), ('Subscript', (1, 3, 1, 13), ('Name', (1, 3, 1, 8), 'tuple', ('Load',)), ('Tuple', (1, 9, 1, 12), [('Starred', (1, 9, 1, 12), ('Name', (1, 10, 1, 12), 'Ts', ('Load',)), ('Load',))], ('Load',)), ('Load',)), None, 1)], []), +('Module', [('AnnAssign', (1, 0, 1, 18), ('Name', (1, 0, 1, 1), 'x', ('Store',)), ('Subscript', (1, 3, 1, 18), ('Name', (1, 3, 1, 8), 'tuple', ('Load',)), ('Tuple', (1, 9, 1, 17), [('Name', (1, 9, 1, 12), 'int', ('Load',)), ('Starred', (1, 14, 1, 17), ('Name', (1, 15, 1, 17), 'Ts', ('Load',)), ('Load',))], ('Load',)), ('Load',)), None, 1)], []), +('Module', [('AnnAssign', (1, 0, 1, 31), ('Name', (1, 0, 1, 1), 'x', ('Store',)), ('Subscript', (1, 3, 1, 31), ('Name', (1, 3, 1, 8), 'tuple', ('Load',)), ('Tuple', (1, 9, 1, 30), [('Name', (1, 9, 1, 12), 'int', ('Load',)), ('Starred', (1, 14, 1, 30), ('Subscript', (1, 15, 1, 30), ('Name', (1, 15, 1, 20), 'tuple', ('Load',)), ('Tuple', (1, 21, 1, 29), [('Name', (1, 21, 1, 24), 'str', ('Load',)), ('Constant', (1, 26, 1, 29), Ellipsis, None)], ('Load',)), ('Load',)), ('Load',))], ('Load',)), ('Load',)), None, 1)], []), ('Module', [('AugAssign', (1, 0, 1, 6), ('Name', (1, 0, 1, 1), 'v', ('Store',)), ('Add',), ('Constant', (1, 5, 1, 6), 1, None))], []), ('Module', [('For', (1, 0, 1, 15), ('Name', (1, 4, 1, 5), 'v', ('Store',)), ('Name', (1, 9, 1, 10), 'v', ('Load',)), [('Pass', (1, 11, 1, 15))], [], None)], []), ('Module', [('While', (1, 0, 1, 12), ('Name', (1, 6, 1, 7), 'v', ('Load',)), [('Pass', (1, 8, 1, 12))], [])], []), diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py index 920b3b5717a2cc..415cbe570fde98 100644 --- a/Lib/test/test_asyncio/test_locks.py +++ b/Lib/test/test_asyncio/test_locks.py @@ -1,4 +1,4 @@ -"""Tests for lock.py""" +"""Tests for locks.py""" import unittest from unittest import mock @@ -9,7 +9,10 @@ STR_RGX_REPR = ( r'^<(?P.*?) object at (?P
.*?)' r'\[(?P' - r'(set|unset|locked|unlocked)(, value:\d)?(, waiters:\d+)?' + r'(set|unset|locked|unlocked|filling|draining|resetting|broken)' + r'(, value:\d)?' + r'(, waiters:\d+)?' + r'(, waiters:\d+\/\d+)?' # barrier r')\]>\Z' ) RGX_REPR = re.compile(STR_RGX_REPR) @@ -943,5 +946,576 @@ async def coro(tag): ) +class BarrierTests(unittest.IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + await super().asyncSetUp() + self.N = 5 + + def make_tasks(self, n, coro): + tasks = [asyncio.create_task(coro()) for _ in range(n)] + return tasks + + async def gather_tasks(self, n, coro): + tasks = self.make_tasks(n, coro) + res = await asyncio.gather(*tasks) + return res, tasks + + async def test_barrier(self): + barrier = asyncio.Barrier(self.N) + self.assertIn("filling", repr(barrier)) + with self.assertRaisesRegex( + TypeError, + "object Barrier can't be used in 'await' expression", + ): + await barrier + + self.assertIn("filling", repr(barrier)) + + async def test_repr(self): + barrier = asyncio.Barrier(self.N) + + self.assertTrue(RGX_REPR.match(repr(barrier))) + self.assertIn("filling", repr(barrier)) + + waiters = [] + async def wait(barrier): + await barrier.wait() + + incr = 2 + for i in range(incr): + waiters.append(asyncio.create_task(wait(barrier))) + await asyncio.sleep(0) + + self.assertTrue(RGX_REPR.match(repr(barrier))) + self.assertTrue(f"waiters:{incr}/{self.N}" in repr(barrier)) + self.assertIn("filling", repr(barrier)) + + # create missing waiters + for i in range(barrier.parties - barrier.n_waiting): + waiters.append(asyncio.create_task(wait(barrier))) + await asyncio.sleep(0) + + self.assertTrue(RGX_REPR.match(repr(barrier))) + self.assertIn("draining", repr(barrier)) + + # add a part of waiters + for i in range(incr): + waiters.append(asyncio.create_task(wait(barrier))) + await asyncio.sleep(0) + # and reset + await barrier.reset() + + self.assertTrue(RGX_REPR.match(repr(barrier))) + self.assertIn("resetting", repr(barrier)) + + # add a part of waiters again + for i in range(incr): + waiters.append(asyncio.create_task(wait(barrier))) + await asyncio.sleep(0) + # and abort + await barrier.abort() + + self.assertTrue(RGX_REPR.match(repr(barrier))) + self.assertIn("broken", repr(barrier)) + self.assertTrue(barrier.broken) + + # suppress unhandled exceptions + await asyncio.gather(*waiters, return_exceptions=True) + + async def test_barrier_parties(self): + self.assertRaises(ValueError, lambda: asyncio.Barrier(0)) + self.assertRaises(ValueError, lambda: asyncio.Barrier(-4)) + + self.assertIsInstance(asyncio.Barrier(self.N), asyncio.Barrier) + + async def test_context_manager(self): + self.N = 3 + barrier = asyncio.Barrier(self.N) + results = [] + + async def coro(): + async with barrier as i: + results.append(i) + + await self.gather_tasks(self.N, coro) + + self.assertListEqual(sorted(results), list(range(self.N))) + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_filling_one_task(self): + barrier = asyncio.Barrier(1) + + async def f(): + async with barrier as i: + return True + + ret = await f() + + self.assertTrue(ret) + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_filling_one_task_twice(self): + barrier = asyncio.Barrier(1) + + t1 = asyncio.create_task(barrier.wait()) + await asyncio.sleep(0) + self.assertEqual(barrier.n_waiting, 0) + + t2 = asyncio.create_task(barrier.wait()) + await asyncio.sleep(0) + + self.assertEqual(t1.result(), t2.result()) + self.assertEqual(t1.done(), t2.done()) + + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_filling_task_by_task(self): + self.N = 3 + barrier = asyncio.Barrier(self.N) + + t1 = asyncio.create_task(barrier.wait()) + await asyncio.sleep(0) + self.assertEqual(barrier.n_waiting, 1) + self.assertIn("filling", repr(barrier)) + + t2 = asyncio.create_task(barrier.wait()) + await asyncio.sleep(0) + self.assertEqual(barrier.n_waiting, 2) + self.assertIn("filling", repr(barrier)) + + t3 = asyncio.create_task(barrier.wait()) + await asyncio.sleep(0) + + await asyncio.wait([t1, t2, t3]) + + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_filling_tasks_wait_twice(self): + barrier = asyncio.Barrier(self.N) + results = [] + + async def coro(): + async with barrier: + results.append(True) + + async with barrier: + results.append(False) + + await self.gather_tasks(self.N, coro) + + self.assertEqual(len(results), self.N*2) + self.assertEqual(results.count(True), self.N) + self.assertEqual(results.count(False), self.N) + + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_filling_tasks_check_return_value(self): + barrier = asyncio.Barrier(self.N) + results1 = [] + results2 = [] + + async def coro(): + async with barrier: + results1.append(True) + + async with barrier as i: + results2.append(True) + return i + + res, _ = await self.gather_tasks(self.N, coro) + + self.assertEqual(len(results1), self.N) + self.assertTrue(all(results1)) + self.assertEqual(len(results2), self.N) + self.assertTrue(all(results2)) + self.assertListEqual(sorted(res), list(range(self.N))) + + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_draining_state(self): + barrier = asyncio.Barrier(self.N) + results = [] + + async def coro(): + async with barrier: + # barrier state change to filling for the last task release + results.append("draining" in repr(barrier)) + + await self.gather_tasks(self.N, coro) + + self.assertEqual(len(results), self.N) + self.assertEqual(results[-1], False) + self.assertTrue(all(results[:self.N-1])) + + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_blocking_tasks_while_draining(self): + rewait = 2 + barrier = asyncio.Barrier(self.N) + barrier_nowaiting = asyncio.Barrier(self.N - rewait) + results = [] + rewait_n = rewait + counter = 0 + + async def coro(): + nonlocal rewait_n + + # first time waiting + await barrier.wait() + + # after wainting once for all tasks + if rewait_n > 0: + rewait_n -= 1 + # wait again only for rewait tasks + await barrier.wait() + else: + # wait for end of draining state` + await barrier_nowaiting.wait() + # wait for other waiting tasks + await barrier.wait() + + # a success means that barrier_nowaiting + # was waited for exactly N-rewait=3 times + await self.gather_tasks(self.N, coro) + + async def test_filling_tasks_cancel_one(self): + self.N = 3 + barrier = asyncio.Barrier(self.N) + results = [] + + async def coro(): + await barrier.wait() + results.append(True) + + t1 = asyncio.create_task(coro()) + await asyncio.sleep(0) + self.assertEqual(barrier.n_waiting, 1) + + t2 = asyncio.create_task(coro()) + await asyncio.sleep(0) + self.assertEqual(barrier.n_waiting, 2) + + t1.cancel() + await asyncio.sleep(0) + self.assertEqual(barrier.n_waiting, 1) + with self.assertRaises(asyncio.CancelledError): + await t1 + self.assertTrue(t1.cancelled()) + + t3 = asyncio.create_task(coro()) + await asyncio.sleep(0) + self.assertEqual(barrier.n_waiting, 2) + + t4 = asyncio.create_task(coro()) + await asyncio.gather(t2, t3, t4) + + self.assertEqual(len(results), self.N) + self.assertTrue(all(results)) + + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_reset_barrier(self): + barrier = asyncio.Barrier(1) + + asyncio.create_task(barrier.reset()) + await asyncio.sleep(0) + + self.assertEqual(barrier.n_waiting, 0) + self.assertFalse(barrier.broken) + + async def test_reset_barrier_while_tasks_waiting(self): + barrier = asyncio.Barrier(self.N) + results = [] + + async def coro(): + try: + await barrier.wait() + except asyncio.BrokenBarrierError: + results.append(True) + + async def coro_reset(): + await barrier.reset() + + # N-1 tasks waiting on barrier with N parties + tasks = self.make_tasks(self.N-1, coro) + await asyncio.sleep(0) + + # reset the barrier + asyncio.create_task(coro_reset()) + await asyncio.gather(*tasks) + + self.assertEqual(len(results), self.N-1) + self.assertTrue(all(results)) + self.assertEqual(barrier.n_waiting, 0) + self.assertNotIn("resetting", repr(barrier)) + self.assertFalse(barrier.broken) + + async def test_reset_barrier_when_tasks_half_draining(self): + barrier = asyncio.Barrier(self.N) + results1 = [] + rest_of_tasks = self.N//2 + + async def coro(): + try: + await barrier.wait() + except asyncio.BrokenBarrierError: + # catch here waiting tasks + results1.append(True) + else: + # here drained task ouside the barrier + if rest_of_tasks == barrier._count: + # tasks outside the barrier + await barrier.reset() + + await self.gather_tasks(self.N, coro) + + self.assertEqual(results1, [True]*rest_of_tasks) + self.assertEqual(barrier.n_waiting, 0) + self.assertNotIn("resetting", repr(barrier)) + self.assertFalse(barrier.broken) + + async def test_reset_barrier_when_tasks_half_draining_half_blocking(self): + barrier = asyncio.Barrier(self.N) + results1 = [] + results2 = [] + blocking_tasks = self.N//2 + count = 0 + + async def coro(): + nonlocal count + try: + await barrier.wait() + except asyncio.BrokenBarrierError: + # here catch still waiting tasks + results1.append(True) + + # so now waiting again to reach nb_parties + await barrier.wait() + else: + count += 1 + if count > blocking_tasks: + # reset now: raise asyncio.BrokenBarrierError for waiting tasks + await barrier.reset() + + # so now waiting again to reach nb_parties + await barrier.wait() + else: + try: + await barrier.wait() + except asyncio.BrokenBarrierError: + # here no catch - blocked tasks go to wait + results2.append(True) + + await self.gather_tasks(self.N, coro) + + self.assertEqual(results1, [True]*blocking_tasks) + self.assertEqual(results2, []) + self.assertEqual(barrier.n_waiting, 0) + self.assertNotIn("resetting", repr(barrier)) + self.assertFalse(barrier.broken) + + async def test_reset_barrier_while_tasks_waiting_and_waiting_again(self): + barrier = asyncio.Barrier(self.N) + results1 = [] + results2 = [] + + async def coro1(): + try: + await barrier.wait() + except asyncio.BrokenBarrierError: + results1.append(True) + finally: + await barrier.wait() + results2.append(True) + + async def coro2(): + async with barrier: + results2.append(True) + + tasks = self.make_tasks(self.N-1, coro1) + + # reset barrier, N-1 waiting tasks raise an BrokenBarrierError + asyncio.create_task(barrier.reset()) + await asyncio.sleep(0) + + # complete waiting tasks in the `finally` + asyncio.create_task(coro2()) + + await asyncio.gather(*tasks) + + self.assertFalse(barrier.broken) + self.assertEqual(len(results1), self.N-1) + self.assertTrue(all(results1)) + self.assertEqual(len(results2), self.N) + self.assertTrue(all(results2)) + + self.assertEqual(barrier.n_waiting, 0) + + + async def test_reset_barrier_while_tasks_draining(self): + barrier = asyncio.Barrier(self.N) + results1 = [] + results2 = [] + results3 = [] + count = 0 + + async def coro(): + nonlocal count + + i = await barrier.wait() + count += 1 + if count == self.N: + # last task exited from barrier + await barrier.reset() + + # wit here to reach the `parties` + await barrier.wait() + else: + try: + # second waiting + await barrier.wait() + + # N-1 tasks here + results1.append(True) + except Exception as e: + # never goes here + results2.append(True) + + # Now, pass the barrier again + # last wait, must be completed + k = await barrier.wait() + results3.append(True) + + await self.gather_tasks(self.N, coro) + + self.assertFalse(barrier.broken) + self.assertTrue(all(results1)) + self.assertEqual(len(results1), self.N-1) + self.assertEqual(len(results2), 0) + self.assertEqual(len(results3), self.N) + self.assertTrue(all(results3)) + + self.assertEqual(barrier.n_waiting, 0) + + async def test_abort_barrier(self): + barrier = asyncio.Barrier(1) + + asyncio.create_task(barrier.abort()) + await asyncio.sleep(0) + + self.assertEqual(barrier.n_waiting, 0) + self.assertTrue(barrier.broken) + + async def test_abort_barrier_when_tasks_half_draining_half_blocking(self): + barrier = asyncio.Barrier(self.N) + results1 = [] + results2 = [] + blocking_tasks = self.N//2 + count = 0 + + async def coro(): + nonlocal count + try: + await barrier.wait() + except asyncio.BrokenBarrierError: + # here catch tasks waiting to drain + results1.append(True) + else: + count += 1 + if count > blocking_tasks: + # abort now: raise asyncio.BrokenBarrierError for all tasks + await barrier.abort() + else: + try: + await barrier.wait() + except asyncio.BrokenBarrierError: + # here catch blocked tasks (already drained) + results2.append(True) + + await self.gather_tasks(self.N, coro) + + self.assertTrue(barrier.broken) + self.assertEqual(results1, [True]*blocking_tasks) + self.assertEqual(results2, [True]*(self.N-blocking_tasks-1)) + self.assertEqual(barrier.n_waiting, 0) + self.assertNotIn("resetting", repr(barrier)) + + async def test_abort_barrier_when_exception(self): + # test from threading.Barrier: see `lock_tests.test_reset` + barrier = asyncio.Barrier(self.N) + results1 = [] + results2 = [] + + async def coro(): + try: + async with barrier as i : + if i == self.N//2: + raise RuntimeError + async with barrier: + results1.append(True) + except asyncio.BrokenBarrierError: + results2.append(True) + except RuntimeError: + await barrier.abort() + + await self.gather_tasks(self.N, coro) + + self.assertTrue(barrier.broken) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertTrue(all(results2)) + self.assertEqual(barrier.n_waiting, 0) + + async def test_abort_barrier_when_exception_then_resetting(self): + # test from threading.Barrier: see `lock_tests.test_abort_and_reset`` + barrier1 = asyncio.Barrier(self.N) + barrier2 = asyncio.Barrier(self.N) + results1 = [] + results2 = [] + results3 = [] + + async def coro(): + try: + i = await barrier1.wait() + if i == self.N//2: + raise RuntimeError + await barrier1.wait() + results1.append(True) + except asyncio.BrokenBarrierError: + results2.append(True) + except RuntimeError: + await barrier1.abort() + + # Synchronize and reset the barrier. Must synchronize first so + # that everyone has left it when we reset, and after so that no + # one enters it before the reset. + i = await barrier2.wait() + if i == self.N//2: + await barrier1.reset() + await barrier2.wait() + await barrier1.wait() + results3.append(True) + + await self.gather_tasks(self.N, coro) + + self.assertFalse(barrier1.broken) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertTrue(all(results2)) + self.assertEqual(len(results3), self.N) + self.assertTrue(all(results3)) + + self.assertEqual(barrier1.n_waiting, 0) + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_runners.py b/Lib/test/test_asyncio/test_runners.py index 112273662b20b5..94f26797b33090 100644 --- a/Lib/test/test_asyncio/test_runners.py +++ b/Lib/test/test_asyncio/test_runners.py @@ -1,4 +1,7 @@ import asyncio +import contextvars +import gc +import re import unittest from unittest import mock @@ -186,5 +189,135 @@ async def main(): self.assertFalse(spinner.ag_running) +class RunnerTests(BaseTest): + + def test_non_debug(self): + with asyncio.Runner(debug=False) as runner: + self.assertFalse(runner.get_loop().get_debug()) + + def test_debug(self): + with asyncio.Runner(debug=True) as runner: + self.assertTrue(runner.get_loop().get_debug()) + + def test_custom_factory(self): + loop = mock.Mock() + with asyncio.Runner(loop_factory=lambda: loop) as runner: + self.assertIs(runner.get_loop(), loop) + + def test_run(self): + async def f(): + await asyncio.sleep(0) + return 'done' + + with asyncio.Runner() as runner: + self.assertEqual('done', runner.run(f())) + loop = runner.get_loop() + + with self.assertRaisesRegex( + RuntimeError, + "Runner is closed" + ): + runner.get_loop() + + self.assertTrue(loop.is_closed()) + + def test_run_non_coro(self): + with asyncio.Runner() as runner: + with self.assertRaisesRegex( + ValueError, + "a coroutine was expected" + ): + runner.run(123) + + def test_run_future(self): + with asyncio.Runner() as runner: + with self.assertRaisesRegex( + ValueError, + "a coroutine was expected" + ): + fut = runner.get_loop().create_future() + runner.run(fut) + + def test_explicit_close(self): + runner = asyncio.Runner() + loop = runner.get_loop() + runner.close() + with self.assertRaisesRegex( + RuntimeError, + "Runner is closed" + ): + runner.get_loop() + + self.assertTrue(loop.is_closed()) + + def test_double_close(self): + runner = asyncio.Runner() + loop = runner.get_loop() + + runner.close() + self.assertTrue(loop.is_closed()) + + # the second call is no-op + runner.close() + self.assertTrue(loop.is_closed()) + + def test_second_with_block_raises(self): + ret = [] + + async def f(arg): + ret.append(arg) + + runner = asyncio.Runner() + with runner: + runner.run(f(1)) + + with self.assertRaisesRegex( + RuntimeError, + "Runner is closed" + ): + with runner: + runner.run(f(2)) + + self.assertEqual([1], ret) + + def test_run_keeps_context(self): + cvar = contextvars.ContextVar("cvar", default=-1) + + async def f(val): + old = cvar.get() + await asyncio.sleep(0) + cvar.set(val) + return old + + async def get_context(): + return contextvars.copy_context() + + with asyncio.Runner() as runner: + self.assertEqual(-1, runner.run(f(1))) + self.assertEqual(1, runner.run(f(2))) + + self.assertEqual(2, runner.run(get_context()).get(cvar)) + + def test_recursine_run(self): + async def g(): + pass + + async def f(): + runner.run(g()) + + with asyncio.Runner() as runner: + with self.assertWarnsRegex( + RuntimeWarning, + "coroutine .+ was never awaited", + ): + with self.assertRaisesRegex( + RuntimeError, + re.escape( + "Runner.run() cannot be called from a running event loop" + ), + ): + runner.run(f()) + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index cc9dd17cd0924f..14fa6dd76f9ca8 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -181,20 +181,6 @@ def test_kill(self): else: self.assertEqual(-signal.SIGKILL, returncode) - def test_kill_issue43884(self): - blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(10000)"' - proc = self.loop.run_until_complete( - asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE) - ) - self.loop.run_until_complete(asyncio.sleep(1)) - proc.kill() - returncode = self.loop.run_until_complete(proc.wait()) - if sys.platform == 'win32': - self.assertIsInstance(returncode, int) - # expect 1 but sometimes get 0 - else: - self.assertEqual(-signal.SIGKILL, returncode) - def test_terminate(self): args = PROGRAM_BLOCKED proc = self.loop.run_until_complete( diff --git a/Lib/test/test_future.py b/Lib/test/test_future.py index 5a3944e69e640e..c3e8420d6672c5 100644 --- a/Lib/test/test_future.py +++ b/Lib/test/test_future.py @@ -175,7 +175,7 @@ def _exec_future(self, code): scope = {} exec( "from __future__ import annotations\n" - + code, {}, scope + + code, scope ) return scope @@ -287,10 +287,11 @@ def test_annotations(self): eq("list[str]") eq("dict[str, int]") eq("set[str,]") + eq("tuple[()]") eq("tuple[str, ...]") - eq("tuple[(str, *types)]") + eq("tuple[str, *types]") eq("tuple[str, int, (str, int)]") - eq("tuple[(*int, str, str, (str, int))]") + eq("tuple[*int, str, str, (str, int)]") eq("tuple[str, int, float, dict[str, int]]") eq("slice[0]") eq("slice[0:1]") @@ -305,6 +306,21 @@ def test_annotations(self): eq("slice[1:2, 1]") eq("slice[1:2, 2, 3]") eq("slice[()]") + # Note that `slice[*Ts]`, `slice[*Ts,]`, and `slice[(*Ts,)]` all have + # the same AST, but only `slice[*Ts,]` passes this test, because that's + # what the unparser produces. + eq("slice[*Ts,]") + eq("slice[1, *Ts]") + eq("slice[*Ts, 2]") + eq("slice[1, *Ts, 2]") + eq("slice[*Ts, *Ts]") + eq("slice[1, *Ts, *Ts]") + eq("slice[*Ts, 1, *Ts]") + eq("slice[*Ts, *Ts, 1]") + eq("slice[1, *Ts, *Ts, 2]") + eq("slice[1:2, *Ts]") + eq("slice[*Ts, 1:2]") + eq("slice[1:2, *Ts, 3:4]") eq("slice[a, b:c, d:e:f]") eq("slice[(x for x in a)]") eq('str or None if sys.version_info[0] > (3,) else str or bytes or None') @@ -403,6 +419,25 @@ def foo(): def bar(arg: (yield)): pass """)) + def test_get_type_hints_on_func_with_variadic_arg(self): + # `typing.get_type_hints` might break on a function with a variadic + # annotation (e.g. `f(*args: *Ts)`) if `from __future__ import + # annotations`, because it could try to evaluate `*Ts` as an expression, + # which on its own isn't value syntax. + namespace = self._exec_future(dedent("""\ + class StarredC: pass + class C: + def __iter__(self): + yield StarredC() + c = C() + def f(*args: *c): pass + import typing + hints = typing.get_type_hints(f) + """)) + + hints = namespace.pop('hints') + self.assertIsInstance(hints['args'], namespace['StarredC']) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 5f72a6dd5871ae..f6f5977df2a1ee 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -5050,9 +5050,10 @@ def test_find_caller_with_stack_info(self): def test_find_caller_with_stacklevel(self): the_level = 1 + trigger = self.logger.warning def innermost(): - self.logger.warning('test', stacklevel=the_level) + trigger('test', stacklevel=the_level) def inner(): innermost() @@ -5074,6 +5075,13 @@ def outer(): self.assertEqual(records[-1].funcName, 'outer') self.assertGreater(records[-1].lineno, lineno) lineno = records[-1].lineno + root_logger = logging.getLogger() + root_logger.addHandler(self.recording) + trigger = logging.warning + outer() + self.assertEqual(records[-1].funcName, 'outer') + root_logger.removeHandler(self.recording) + trigger = self.logger.warning the_level += 1 outer() self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel') diff --git a/Lib/test/test_pep646_syntax.py b/Lib/test/test_pep646_syntax.py new file mode 100644 index 00000000000000..3ffa82dc55fa23 --- /dev/null +++ b/Lib/test/test_pep646_syntax.py @@ -0,0 +1,326 @@ +doctests = """ + +Setup + + >>> class AClass: + ... def __init__(self): + ... self._setitem_name = None + ... self._setitem_val = None + ... self._delitem_name = None + ... def __setitem__(self, name, val): + ... self._delitem_name = None + ... self._setitem_name = name + ... self._setitem_val = val + ... def __repr__(self): + ... if self._setitem_name is not None: + ... return f"A[{self._setitem_name}]={self._setitem_val}" + ... elif self._delitem_name is not None: + ... return f"delA[{self._delitem_name}]" + ... def __getitem__(self, name): + ... return ParameterisedA(name) + ... def __delitem__(self, name): + ... self._setitem_name = None + ... self._delitem_name = name + ... + >>> class ParameterisedA: + ... def __init__(self, name): + ... self._name = name + ... def __repr__(self): + ... return f"A[{self._name}]" + ... def __iter__(self): + ... for p in self._name: + ... yield p + >>> class B: + ... def __iter__(self): + ... yield StarredB() + ... def __repr__(self): + ... return "B" + >>> class StarredB: + ... def __repr__(self): + ... return "StarredB" + >>> A = AClass() + >>> b = B() + +Slices that are supposed to work, starring our custom B class + + >>> A[*b] + A[(StarredB,)] + >>> A[*b] = 1; A + A[(StarredB,)]=1 + >>> del A[*b]; A + delA[(StarredB,)] + + >>> A[*b, *b] + A[(StarredB, StarredB)] + >>> A[*b, *b] = 1; A + A[(StarredB, StarredB)]=1 + >>> del A[*b, *b]; A + delA[(StarredB, StarredB)] + + >>> A[b, *b] + A[(B, StarredB)] + >>> A[b, *b] = 1; A + A[(B, StarredB)]=1 + >>> del A[b, *b]; A + delA[(B, StarredB)] + + >>> A[*b, b] + A[(StarredB, B)] + >>> A[*b, b] = 1; A + A[(StarredB, B)]=1 + >>> del A[*b, b]; A + delA[(StarredB, B)] + + >>> A[b, b, *b] + A[(B, B, StarredB)] + >>> A[b, b, *b] = 1; A + A[(B, B, StarredB)]=1 + >>> del A[b, b, *b]; A + delA[(B, B, StarredB)] + + >>> A[*b, b, b] + A[(StarredB, B, B)] + >>> A[*b, b, b] = 1; A + A[(StarredB, B, B)]=1 + >>> del A[*b, b, b]; A + delA[(StarredB, B, B)] + + >>> A[b, *b, b] + A[(B, StarredB, B)] + >>> A[b, *b, b] = 1; A + A[(B, StarredB, B)]=1 + >>> del A[b, *b, b]; A + delA[(B, StarredB, B)] + + >>> A[b, b, *b, b] + A[(B, B, StarredB, B)] + >>> A[b, b, *b, b] = 1; A + A[(B, B, StarredB, B)]=1 + >>> del A[b, b, *b, b]; A + delA[(B, B, StarredB, B)] + + >>> A[b, *b, b, b] + A[(B, StarredB, B, B)] + >>> A[b, *b, b, b] = 1; A + A[(B, StarredB, B, B)]=1 + >>> del A[b, *b, b, b]; A + delA[(B, StarredB, B, B)] + + >>> A[A[b, *b, b]] + A[A[(B, StarredB, B)]] + >>> A[A[b, *b, b]] = 1; A + A[A[(B, StarredB, B)]]=1 + >>> del A[A[b, *b, b]]; A + delA[A[(B, StarredB, B)]] + + >>> A[*A[b, *b, b]] + A[(B, StarredB, B)] + >>> A[*A[b, *b, b]] = 1; A + A[(B, StarredB, B)]=1 + >>> del A[*A[b, *b, b]]; A + delA[(B, StarredB, B)] + + >>> A[b, ...] + A[(B, Ellipsis)] + >>> A[b, ...] = 1; A + A[(B, Ellipsis)]=1 + >>> del A[b, ...]; A + delA[(B, Ellipsis)] + + >>> A[*A[b, ...]] + A[(B, Ellipsis)] + >>> A[*A[b, ...]] = 1; A + A[(B, Ellipsis)]=1 + >>> del A[*A[b, ...]]; A + delA[(B, Ellipsis)] + +Slices that are supposed to work, starring a list + + >>> l = [1, 2, 3] + + >>> A[*l] + A[(1, 2, 3)] + >>> A[*l] = 1; A + A[(1, 2, 3)]=1 + >>> del A[*l]; A + delA[(1, 2, 3)] + + >>> A[*l, 4] + A[(1, 2, 3, 4)] + >>> A[*l, 4] = 1; A + A[(1, 2, 3, 4)]=1 + >>> del A[*l, 4]; A + delA[(1, 2, 3, 4)] + + >>> A[0, *l] + A[(0, 1, 2, 3)] + >>> A[0, *l] = 1; A + A[(0, 1, 2, 3)]=1 + >>> del A[0, *l]; A + delA[(0, 1, 2, 3)] + + >>> A[1:2, *l] + A[(slice(1, 2, None), 1, 2, 3)] + >>> A[1:2, *l] = 1; A + A[(slice(1, 2, None), 1, 2, 3)]=1 + >>> del A[1:2, *l]; A + delA[(slice(1, 2, None), 1, 2, 3)] + + >>> repr(A[1:2, *l]) == repr(A[1:2, 1, 2, 3]) + True + +Slices that are supposed to work, starring a tuple + + >>> t = (1, 2, 3) + + >>> A[*t] + A[(1, 2, 3)] + >>> A[*t] = 1; A + A[(1, 2, 3)]=1 + >>> del A[*t]; A + delA[(1, 2, 3)] + + >>> A[*t, 4] + A[(1, 2, 3, 4)] + >>> A[*t, 4] = 1; A + A[(1, 2, 3, 4)]=1 + >>> del A[*t, 4]; A + delA[(1, 2, 3, 4)] + + >>> A[0, *t] + A[(0, 1, 2, 3)] + >>> A[0, *t] = 1; A + A[(0, 1, 2, 3)]=1 + >>> del A[0, *t]; A + delA[(0, 1, 2, 3)] + + >>> A[1:2, *t] + A[(slice(1, 2, None), 1, 2, 3)] + >>> A[1:2, *t] = 1; A + A[(slice(1, 2, None), 1, 2, 3)]=1 + >>> del A[1:2, *t]; A + delA[(slice(1, 2, None), 1, 2, 3)] + + >>> repr(A[1:2, *t]) == repr(A[1:2, 1, 2, 3]) + True + +Starring an expression (rather than a name) in a slice + + >>> def returns_list(): + ... return [1, 2, 3] + + >>> A[returns_list()] + A[[1, 2, 3]] + >>> A[returns_list()] = 1; A + A[[1, 2, 3]]=1 + >>> del A[returns_list()]; A + delA[[1, 2, 3]] + + >>> A[returns_list(), 4] + A[([1, 2, 3], 4)] + >>> A[returns_list(), 4] = 1; A + A[([1, 2, 3], 4)]=1 + >>> del A[returns_list(), 4]; A + delA[([1, 2, 3], 4)] + + >>> A[*returns_list()] + A[(1, 2, 3)] + >>> A[*returns_list()] = 1; A + A[(1, 2, 3)]=1 + >>> del A[*returns_list()]; A + delA[(1, 2, 3)] + + >>> A[*returns_list(), 4] + A[(1, 2, 3, 4)] + >>> A[*returns_list(), 4] = 1; A + A[(1, 2, 3, 4)]=1 + >>> del A[*returns_list(), 4]; A + delA[(1, 2, 3, 4)] + + >>> A[0, *returns_list()] + A[(0, 1, 2, 3)] + >>> A[0, *returns_list()] = 1; A + A[(0, 1, 2, 3)]=1 + >>> del A[0, *returns_list()]; A + delA[(0, 1, 2, 3)] + + >>> A[*returns_list(), *returns_list()] + A[(1, 2, 3, 1, 2, 3)] + >>> A[*returns_list(), *returns_list()] = 1; A + A[(1, 2, 3, 1, 2, 3)]=1 + >>> del A[*returns_list(), *returns_list()]; A + delA[(1, 2, 3, 1, 2, 3)] + +Using both a starred object and a start:stop in a slice +(See also tests in test_syntax confirming that starring *inside* a start:stop +is *not* valid syntax.) + + >>> A[1:2, *b] + A[(slice(1, 2, None), StarredB)] + >>> A[*b, 1:2] + A[(StarredB, slice(1, 2, None))] + >>> A[1:2, *b, 1:2] + A[(slice(1, 2, None), StarredB, slice(1, 2, None))] + >>> A[*b, 1:2, *b] + A[(StarredB, slice(1, 2, None), StarredB)] + + >>> A[1:, *b] + A[(slice(1, None, None), StarredB)] + >>> A[*b, 1:] + A[(StarredB, slice(1, None, None))] + >>> A[1:, *b, 1:] + A[(slice(1, None, None), StarredB, slice(1, None, None))] + >>> A[*b, 1:, *b] + A[(StarredB, slice(1, None, None), StarredB)] + + >>> A[:1, *b] + A[(slice(None, 1, None), StarredB)] + >>> A[*b, :1] + A[(StarredB, slice(None, 1, None))] + >>> A[:1, *b, :1] + A[(slice(None, 1, None), StarredB, slice(None, 1, None))] + >>> A[*b, :1, *b] + A[(StarredB, slice(None, 1, None), StarredB)] + + >>> A[:, *b] + A[(slice(None, None, None), StarredB)] + >>> A[*b, :] + A[(StarredB, slice(None, None, None))] + >>> A[:, *b, :] + A[(slice(None, None, None), StarredB, slice(None, None, None))] + >>> A[*b, :, *b] + A[(StarredB, slice(None, None, None), StarredB)] + +*args annotated as starred expression + + >>> def f1(*args: *b): pass + >>> f1.__annotations__ + {'args': StarredB} + + >>> def f2(*args: *b, arg1): pass + >>> f2.__annotations__ + {'args': StarredB} + + >>> def f3(*args: *b, arg1: int): pass + >>> f3.__annotations__ + {'args': StarredB, 'arg1': } + + >>> def f4(*args: *b, arg1: int = 2): pass + >>> f4.__annotations__ + {'args': StarredB, 'arg1': } + + >>> def f5(*args: *b = (1,)): pass + Traceback (most recent call last): + ... + SyntaxError: invalid syntax +""" + +__test__ = {'doctests' : doctests} + +def test_main(verbose=False): + from test import support + from test import test_pep646_syntax + support.run_doctest(test_pep646_syntax, verbose) + +if __name__ == "__main__": + test_main(verbose=True) diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py index 5f16a159f13d35..3e79ebfc683514 100644 --- a/Lib/test/test_syntax.py +++ b/Lib/test/test_syntax.py @@ -1622,6 +1622,149 @@ ... ... Traceback (most recent call last): SyntaxError: positional patterns follow keyword patterns + +Uses of the star operator which should fail: + +A[:*b] + + >>> A[:*b] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> A[:(*b)] + Traceback (most recent call last): + ... + SyntaxError: cannot use starred expression here + >>> A[:*b] = 1 + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> del A[:*b] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +A[*b:] + + >>> A[*b:] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> A[(*b):] + Traceback (most recent call last): + ... + SyntaxError: cannot use starred expression here + >>> A[*b:] = 1 + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> del A[*b:] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +A[*b:*b] + + >>> A[*b:*b] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> A[(*b:*b)] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> A[*b:*b] = 1 + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> del A[*b:*b] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +A[*(1:2)] + + >>> A[*(1:2)] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> A[*(1:2)] = 1 + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> del A[*(1:2)] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +A[*:] and A[:*] + + >>> A[*:] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> A[:*] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +A[*] + + >>> A[*] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +A[**] + + >>> A[**] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +A[**b] + + >>> A[**b] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> A[**b] = 1 + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> del A[**b] + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +def f(x: *b) + + >>> def f6(x: *b): pass + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> def f7(x: *b = 1): pass + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +**kwargs: *a + + >>> def f8(**kwargs: *a): pass + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + +x: *b + + >>> x: *b + Traceback (most recent call last): + ... + SyntaxError: invalid syntax + >>> x: *b = 1 + Traceback (most recent call last): + ... + SyntaxError: invalid syntax """ import re diff --git a/Lib/test/test_unparse.py b/Lib/test/test_unparse.py index f5be13aa94a64c..e38b33574ccccf 100644 --- a/Lib/test/test_unparse.py +++ b/Lib/test/test_unparse.py @@ -344,7 +344,17 @@ def test_slices(self): self.check_ast_roundtrip("a[i]") self.check_ast_roundtrip("a[i,]") self.check_ast_roundtrip("a[i, j]") + # The AST for these next two both look like `a[(*a,)]` self.check_ast_roundtrip("a[(*a,)]") + self.check_ast_roundtrip("a[*a]") + self.check_ast_roundtrip("a[b, *a]") + self.check_ast_roundtrip("a[*a, c]") + self.check_ast_roundtrip("a[b, *a, c]") + self.check_ast_roundtrip("a[*a, *a]") + self.check_ast_roundtrip("a[b, *a, *a]") + self.check_ast_roundtrip("a[*a, b, *a]") + self.check_ast_roundtrip("a[*a, *a, b]") + self.check_ast_roundtrip("a[b, *a, *a, c]") self.check_ast_roundtrip("a[(a:=b)]") self.check_ast_roundtrip("a[(a:=b,c)]") self.check_ast_roundtrip("a[()]") @@ -543,9 +553,23 @@ def test_unary_op_factor(self): self.check_src_roundtrip(f"{prefix} 1") def test_slices(self): + self.check_src_roundtrip("a[()]") self.check_src_roundtrip("a[1]") self.check_src_roundtrip("a[1, 2]") - self.check_src_roundtrip("a[(1, *a)]") + # Note that `a[*a]`, `a[*a,]`, and `a[(*a,)]` all evaluate to the same + # thing at runtime and have the same AST, but only `a[*a,]` passes + # this test, because that's what `ast.unparse` produces. + self.check_src_roundtrip("a[*a,]") + self.check_src_roundtrip("a[1, *a]") + self.check_src_roundtrip("a[*a, 2]") + self.check_src_roundtrip("a[1, *a, 2]") + self.check_src_roundtrip("a[*a, *a]") + self.check_src_roundtrip("a[1, *a, *a]") + self.check_src_roundtrip("a[*a, 1, *a]") + self.check_src_roundtrip("a[*a, *a, 1]") + self.check_src_roundtrip("a[1, *a, *a, 2]") + self.check_src_roundtrip("a[1:2, *a]") + self.check_src_roundtrip("a[*a, 1:2]") def test_lambda_parameters(self): self.check_src_roundtrip("lambda: something") diff --git a/Lib/typing.py b/Lib/typing.py index 64b348e0b9d5ca..36f9eceb38c7cf 100644 --- a/Lib/typing.py +++ b/Lib/typing.py @@ -734,10 +734,19 @@ class ForwardRef(_Final, _root=True): def __init__(self, arg, is_argument=True, module=None, *, is_class=False): if not isinstance(arg, str): raise TypeError(f"Forward reference must be a string -- got {arg!r}") + + # If we do `def f(*args: *Ts)`, then we'll have `arg = '*Ts'`. + # Unfortunately, this isn't a valid expression on its own, so we + # do the unpacking manually. + if arg[0] == '*': + arg_to_compile = f'({arg},)[0]' # E.g. (*Ts,)[0] + else: + arg_to_compile = arg try: - code = compile(arg, '', 'eval') + code = compile(arg_to_compile, '', 'eval') except SyntaxError: raise SyntaxError(f"Forward reference must be an expression -- got {arg!r}") + self.__forward_arg__ = arg self.__forward_code__ = code self.__forward_evaluated__ = False diff --git a/Lib/unittest/async_case.py b/Lib/unittest/async_case.py index 25adc3deff63d1..85b938fb293af1 100644 --- a/Lib/unittest/async_case.py +++ b/Lib/unittest/async_case.py @@ -34,7 +34,7 @@ class IsolatedAsyncioTestCase(TestCase): def __init__(self, methodName='runTest'): super().__init__(methodName) - self._asyncioTestLoop = None + self._asyncioRunner = None self._asyncioTestContext = contextvars.copy_context() async def asyncSetUp(self): @@ -75,76 +75,44 @@ def _callCleanup(self, function, *args, **kwargs): self._callMaybeAsync(function, *args, **kwargs) def _callAsync(self, func, /, *args, **kwargs): - assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' + assert self._asyncioRunner is not None, 'asyncio runner is not initialized' assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function' - task = self._asyncioTestLoop.create_task( + return self._asyncioRunner.run( func(*args, **kwargs), - context=self._asyncioTestContext, + context=self._asyncioTestContext ) - return self._asyncioTestLoop.run_until_complete(task) def _callMaybeAsync(self, func, /, *args, **kwargs): - assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' + assert self._asyncioRunner is not None, 'asyncio runner is not initialized' if inspect.iscoroutinefunction(func): - task = self._asyncioTestLoop.create_task( + return self._asyncioRunner.run( func(*args, **kwargs), context=self._asyncioTestContext, ) - return self._asyncioTestLoop.run_until_complete(task) else: return self._asyncioTestContext.run(func, *args, **kwargs) - def _setupAsyncioLoop(self): - assert self._asyncioTestLoop is None, 'asyncio test loop already initialized' - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.set_debug(True) - self._asyncioTestLoop = loop + def _setupAsyncioRunner(self): + assert self._asyncioRunner is None, 'asyncio runner is already initialized' + runner = asyncio.Runner(debug=True) + self._asyncioRunner = runner - def _tearDownAsyncioLoop(self): - assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' - loop = self._asyncioTestLoop - self._asyncioTestLoop = None - - try: - # cancel all tasks - to_cancel = asyncio.all_tasks(loop) - if not to_cancel: - return - - for task in to_cancel: - task.cancel() - - loop.run_until_complete( - asyncio.gather(*to_cancel, return_exceptions=True)) - - for task in to_cancel: - if task.cancelled(): - continue - if task.exception() is not None: - loop.call_exception_handler({ - 'message': 'unhandled exception during test shutdown', - 'exception': task.exception(), - 'task': task, - }) - # shutdown asyncgens - loop.run_until_complete(loop.shutdown_asyncgens()) - finally: - asyncio.set_event_loop(None) - loop.close() + def _tearDownAsyncioRunner(self): + runner = self._asyncioRunner + runner.close() def run(self, result=None): - self._setupAsyncioLoop() + self._setupAsyncioRunner() try: return super().run(result) finally: - self._tearDownAsyncioLoop() + self._tearDownAsyncioRunner() def debug(self): - self._setupAsyncioLoop() + self._setupAsyncioRunner() super().debug() - self._tearDownAsyncioLoop() + self._tearDownAsyncioRunner() def __del__(self): - if self._asyncioTestLoop is not None: - self._tearDownAsyncioLoop() + if self._asyncioRunner is not None: + self._tearDownAsyncioRunner() diff --git a/kill_subprocess.py b/kill_subprocess.py new file mode 100644 index 00000000000000..88277f7faee6d5 --- /dev/null +++ b/kill_subprocess.py @@ -0,0 +1,16 @@ +import asyncio +from time import monotonic + +t = monotonic() +async def test(): + process = await asyncio.create_subprocess_shell( + "sleep 20 && echo done", + stdout=asyncio.subprocess.PIPE, + ) + await asyncio.sleep(1) + process.kill() + await process.wait() + # process._transport.close() + +asyncio.run(test()) +print(monotonic()-t) \ No newline at end of file From a2a3a1dcac03421e05e5dbb1c10d6dc446f038ee Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Mar 2022 10:20:21 +0000 Subject: [PATCH 04/13] rework --- Lib/asyncio/base_subprocess.py | 17 ++++++++++------- Lib/test/test_asyncio/test_subprocess.py | 14 ++++++++++++++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 14d50519228814..185ece3f316b35 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -215,13 +215,8 @@ def _process_exited(self, returncode): # object. On Python 3.6, it is required to avoid a ResourceWarning. self._proc.returncode = returncode self._call(self._protocol.process_exited) - self._try_finish() + self._try_finish(force=True) - # wake up futures waiting for wait() - for waiter in self._exit_waiters: - if not waiter.cancelled(): - waiter.set_result(returncode) - self._exit_waiters = None async def _wait(self): """Wait until the process exit and return the process return code. @@ -234,10 +229,13 @@ async def _wait(self): self._exit_waiters.append(waiter) return await waiter - def _try_finish(self): + def _try_finish(self, force=False): assert not self._finished if self._returncode is None: return + if force: + for p in self._pipes.values(): + p.pipe.close() if all(p is not None and p.disconnected for p in self._pipes.values()): self._finished = True @@ -250,6 +248,11 @@ def _call_connection_lost(self, exc): self._loop = None self._proc = None self._protocol = None + # wake up futures waiting for wait() + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(self._returncode) + self._exit_waiters = None class WriteSubprocessPipeProto(protocols.BaseProtocol): diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 14fa6dd76f9ca8..cc9dd17cd0924f 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -181,6 +181,20 @@ def test_kill(self): else: self.assertEqual(-signal.SIGKILL, returncode) + def test_kill_issue43884(self): + blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(10000)"' + proc = self.loop.run_until_complete( + asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE) + ) + self.loop.run_until_complete(asyncio.sleep(1)) + proc.kill() + returncode = self.loop.run_until_complete(proc.wait()) + if sys.platform == 'win32': + self.assertIsInstance(returncode, int) + # expect 1 but sometimes get 0 + else: + self.assertEqual(-signal.SIGKILL, returncode) + def test_terminate(self): args = PROGRAM_BLOCKED proc = self.loop.run_until_complete( From e325b6ccaf97aab834c61411abbd889d48312543 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Mon, 28 Mar 2022 10:20:48 +0000 Subject: [PATCH 05/13] remove script --- kill_subprocess.py | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 kill_subprocess.py diff --git a/kill_subprocess.py b/kill_subprocess.py deleted file mode 100644 index 88277f7faee6d5..00000000000000 --- a/kill_subprocess.py +++ /dev/null @@ -1,16 +0,0 @@ -import asyncio -from time import monotonic - -t = monotonic() -async def test(): - process = await asyncio.create_subprocess_shell( - "sleep 20 && echo done", - stdout=asyncio.subprocess.PIPE, - ) - await asyncio.sleep(1) - process.kill() - await process.wait() - # process._transport.close() - -asyncio.run(test()) -print(monotonic()-t) \ No newline at end of file From 79462673ac15759837790c49c5b007449288c858 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Fri, 27 May 2022 14:49:01 +0000 Subject: [PATCH 06/13] another try --- Lib/asyncio/base_subprocess.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 185ece3f316b35..c2ca4a2792f666 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -215,8 +215,9 @@ def _process_exited(self, returncode): # object. On Python 3.6, it is required to avoid a ResourceWarning. self._proc.returncode = returncode self._call(self._protocol.process_exited) - self._try_finish(force=True) - + for p in self._pipes.values(): + p.pipe.close() + self._try_finish() async def _wait(self): """Wait until the process exit and return the process return code. @@ -229,13 +230,10 @@ async def _wait(self): self._exit_waiters.append(waiter) return await waiter - def _try_finish(self, force=False): + def _try_finish(self): assert not self._finished if self._returncode is None: return - if force: - for p in self._pipes.values(): - p.pipe.close() if all(p is not None and p.disconnected for p in self._pipes.values()): self._finished = True @@ -245,14 +243,14 @@ def _call_connection_lost(self, exc): try: self._protocol.connection_lost(exc) finally: - self._loop = None - self._proc = None - self._protocol = None # wake up futures waiting for wait() for waiter in self._exit_waiters: if not waiter.cancelled(): waiter.set_result(self._returncode) self._exit_waiters = None + self._loop = None + self._proc = None + self._protocol = None class WriteSubprocessPipeProto(protocols.BaseProtocol): From d896e0bb2e7d4d3b9fedff1828667caa375de2ce Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Fri, 8 Jul 2022 14:06:35 +0530 Subject: [PATCH 07/13] skip unless sleep --- Lib/test/test_asyncio/test_subprocess.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 7a304ef0bfb715..89061335ef761a 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -1,4 +1,5 @@ import os +import shutil import signal import sys import unittest @@ -184,8 +185,9 @@ def test_kill(self): else: self.assertEqual(-signal.SIGKILL, returncode) + @unittest.skipUnless(shutil.which('sleep'), 'sleep is not available') def test_kill_issue43884(self): - blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(10000)"' + blocking_shell_command = 'sleep 10000' proc = self.loop.run_until_complete( asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE) ) From b5de3749c2d35e5857a4dfb1da6c8cea7bc18ca8 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Fri, 8 Jul 2022 08:39:36 +0000 Subject: [PATCH 08/13] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst diff --git a/Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst b/Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst new file mode 100644 index 00000000000000..7d7e960a5a012b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst @@ -0,0 +1 @@ +Fix :mod:`asyncio` subprocess to kill process cleanly when process is blocked and avoid ``RuntimeError`` when loop is closed. Patch by Kumar Aditya. From 8b2cee6adae75c0bcb1768b211a9801d2184663d Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Sat, 1 Oct 2022 23:33:28 +0530 Subject: [PATCH 09/13] fix test on windows hopefully this works --- Lib/test/test_asyncio/test_subprocess.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index cae5a656ae14cb..9300e755abb9f1 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -183,13 +183,21 @@ def test_kill(self): else: self.assertEqual(-signal.SIGKILL, returncode) - @unittest.skipUnless(shutil.which('sleep'), 'sleep is not available') def test_kill_issue43884(self): - blocking_shell_command = 'sleep 10000' + import subprocess + blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(100000000)"' + creationflags = 0 + if sys.platform == 'win32': + # On windows create a new process group so that killing process + # kills the whole process group. + creationflags = subprocess.CREATE_NEW_PROCESS_GROUP proc = self.loop.run_until_complete( - asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE) + asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE, + creationflags=creationflags) ) self.loop.run_until_complete(asyncio.sleep(1)) + if sys.platform == 'win32': + proc.send_signal(signal.CTRL_BREAK_EVENT) proc.kill() returncode = self.loop.run_until_complete(proc.wait()) if sys.platform == 'win32': From 53fbc9a8d2bfc147db364f712f145d1ef63f4559 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Sat, 1 Oct 2022 23:42:41 +0530 Subject: [PATCH 10/13] fix comment --- Lib/test/test_asyncio/test_subprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 9300e755abb9f1..91afa034dd36f4 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -189,7 +189,7 @@ def test_kill_issue43884(self): creationflags = 0 if sys.platform == 'win32': # On windows create a new process group so that killing process - # kills the whole process group. + # kills the process and all its children. creationflags = subprocess.CREATE_NEW_PROCESS_GROUP proc = self.loop.run_until_complete( asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE, From e223141489cd91e2edd83ec38d2e7c7e17fe5738 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Wed, 5 Oct 2022 12:12:05 +0530 Subject: [PATCH 11/13] Update Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst Co-authored-by: Guido van Rossum --- .../next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst b/Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst index 7d7e960a5a012b..43c0765d940d3a 100644 --- a/Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst +++ b/Misc/NEWS.d/next/Library/2022-07-08-08-39-35.gh-issue-88050.0aOC_m.rst @@ -1 +1 @@ -Fix :mod:`asyncio` subprocess to kill process cleanly when process is blocked and avoid ``RuntimeError`` when loop is closed. Patch by Kumar Aditya. +Fix :mod:`asyncio` subprocess transport to kill process cleanly when process is blocked and avoid ``RuntimeError`` when loop is closed. Patch by Kumar Aditya. From 69a9fda6eb1efdfd1916458cb3e5462c777508c2 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Wed, 5 Oct 2022 06:53:19 +0000 Subject: [PATCH 12/13] fixup test as Guido suggested --- Lib/test/test_asyncio/test_subprocess.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 91afa034dd36f4..ae042e1af14705 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -184,13 +184,13 @@ def test_kill(self): self.assertEqual(-signal.SIGKILL, returncode) def test_kill_issue43884(self): - import subprocess blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(100000000)"' creationflags = 0 if sys.platform == 'win32': + from subprocess import CREATE_NEW_PROCESS_GROUP # On windows create a new process group so that killing process # kills the process and all its children. - creationflags = subprocess.CREATE_NEW_PROCESS_GROUP + creationflags = CREATE_NEW_PROCESS_GROUP proc = self.loop.run_until_complete( asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE, creationflags=creationflags) @@ -198,7 +198,8 @@ def test_kill_issue43884(self): self.loop.run_until_complete(asyncio.sleep(1)) if sys.platform == 'win32': proc.send_signal(signal.CTRL_BREAK_EVENT) - proc.kill() + else: + proc.kill() returncode = self.loop.run_until_complete(proc.wait()) if sys.platform == 'win32': self.assertIsInstance(returncode, int) From ba0382d827ff8d1ac0d7748f7d641f4868b9d299 Mon Sep 17 00:00:00 2001 From: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Date: Wed, 5 Oct 2022 13:06:01 +0530 Subject: [PATCH 13/13] fix killing on windows --- Lib/test/test_asyncio/test_subprocess.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ae042e1af14705..9bc60b9dc2ae2e 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -198,8 +198,8 @@ def test_kill_issue43884(self): self.loop.run_until_complete(asyncio.sleep(1)) if sys.platform == 'win32': proc.send_signal(signal.CTRL_BREAK_EVENT) - else: - proc.kill() + # On windows it is an alias of terminate which sets the return code + proc.kill() returncode = self.loop.run_until_complete(proc.wait()) if sys.platform == 'win32': self.assertIsInstance(returncode, int)