diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 5d39e459..db3dcb62 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -10,6 +10,18 @@ This library adheres to `Semantic Versioning 2.0 `_. - Fixed passing ``total_tokens`` to ``anyio.CapacityLimiter()`` as a keyword argument not working on the ``trio`` backend (`#515 `_) +- Fixed ``Process.aclose()`` not performing the minimum level of necessary cleanup when + cancelled. Previously: + + - Cancellation of ``Process.aclose()`` could leak an orphan process + - Cancellation of ``run_process()`` could very briefly leak an orphan process. + - Cancellation of ``Process.aclose()`` or ``run_process()`` on Trio could leave + standard streams unclosed + + (PR by Ganden Schaffner) +- Fixed ``Process.stdin.aclose()``, ``Process.stdout.aclose()``, and + ``Process.stderr.aclose()`` not including a checkpoint on asyncio (PR by Ganden + Schaffner) **4.2.0** diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index e884f564..2699bf81 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -918,6 +918,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes: async def aclose(self) -> None: self._stream.feed_eof() + await AsyncIOBackend.checkpoint() @dataclass(eq=False) @@ -930,6 +931,7 @@ async def send(self, item: bytes) -> None: async def aclose(self) -> None: self._stream.close() + await AsyncIOBackend.checkpoint() @dataclass(eq=False) @@ -940,14 +942,22 @@ class Process(abc.Process): _stderr: StreamReaderWrapper | None async def aclose(self) -> None: - if self._stdin: - await self._stdin.aclose() - if self._stdout: - await self._stdout.aclose() - if self._stderr: - await self._stderr.aclose() - - await self.wait() + with CancelScope(shield=True): + if self._stdin: + await self._stdin.aclose() + if self._stdout: + await self._stdout.aclose() + if self._stderr: + await self._stderr.aclose() + + try: + await self.wait() + except BaseException: + self.kill() + with CancelScope(shield=True): + await self.wait() + + raise async def wait(self) -> int: return await self._process.wait() diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index 13b960f2..1a47192e 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -283,14 +283,21 @@ class Process(abc.Process): _stderr: abc.ByteReceiveStream | None async def aclose(self) -> None: - if self._stdin: - await self._stdin.aclose() - if self._stdout: - await self._stdout.aclose() - if self._stderr: - await self._stderr.aclose() - - await self.wait() + with CancelScope(shield=True): + if self._stdin: + await self._stdin.aclose() + if self._stdout: + await self._stdout.aclose() + if self._stderr: + await self._stderr.aclose() + + try: + await self.wait() + except BaseException: + self.kill() + with CancelScope(shield=True): + await self.wait() + raise async def wait(self) -> int: return await self._process.wait() diff --git a/src/anyio/_core/_subprocesses.py b/src/anyio/_core/_subprocesses.py index c1610296..5d5d7b76 100644 --- a/src/anyio/_core/_subprocesses.py +++ b/src/anyio/_core/_subprocesses.py @@ -65,20 +65,18 @@ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None: start_new_session=start_new_session, ) as process: stream_contents: list[bytes | None] = [None, None] - try: - async with create_task_group() as tg: - if process.stdout: - tg.start_soon(drain_stream, process.stdout, 0) - if process.stderr: - tg.start_soon(drain_stream, process.stderr, 1) - if process.stdin and input: - await process.stdin.send(input) - await process.stdin.aclose() - - await process.wait() - except BaseException: - process.kill() - raise + async with create_task_group() as tg: + if process.stdout: + tg.start_soon(drain_stream, process.stdout, 0) + + if process.stderr: + tg.start_soon(drain_stream, process.stderr, 1) + + if process.stdin and input: + await process.stdin.send(input) + await process.stdin.aclose() + + await process.wait() output, errors = stream_contents if check and process.returncode != 0: diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py index 640384c0..cc54fc5e 100644 --- a/tests/test_subprocesses.py +++ b/tests/test_subprocesses.py @@ -8,8 +8,9 @@ from textwrap import dedent import pytest +from _pytest.fixtures import FixtureRequest -from anyio import open_process, run_process +from anyio import CancelScope, ClosedResourceError, open_process, run_process from anyio.streams.buffered import BufferedByteReceiveStream pytestmark = pytest.mark.anyio @@ -176,3 +177,61 @@ async def test_run_process_inherit_stdout(capfd: pytest.CaptureFixture[str]) -> out, err = capfd.readouterr() assert out == "stdout-text" + os.linesep assert err == "stderr-text" + os.linesep + + +async def test_process_aexit_cancellation_doesnt_orphan_process() -> None: + """ + Regression test for #669. + + Ensures that open_process.__aexit__() doesn't leave behind an orphan process when + cancelled. + + """ + with CancelScope() as scope: + async with await open_process( + [sys.executable, "-c", "import time; time.sleep(1)"] + ) as process: + scope.cancel() + + assert process.returncode is not None + assert process.returncode != 0 + + +async def test_process_aexit_cancellation_closes_standard_streams( + request: FixtureRequest, + anyio_backend_name: str, +) -> None: + """ + Regression test for #669. + + Ensures that open_process.__aexit__() closes standard streams when cancelled. Also + ensures that process.std{in.send,{out,err}.receive}() raise ClosedResourceError on a + closed stream. + + """ + if anyio_backend_name == "asyncio": + # Avoid pytest.xfail here due to https://github.com/pytest-dev/pytest/issues/9027 + request.node.add_marker( + pytest.mark.xfail(reason="#671 needs to be resolved first") + ) + + with CancelScope() as scope: + async with await open_process( + [sys.executable, "-c", "import time; time.sleep(1)"] + ) as process: + scope.cancel() + + assert process.stdin is not None + + with pytest.raises(ClosedResourceError): + await process.stdin.send(b"foo") + + assert process.stdout is not None + + with pytest.raises(ClosedResourceError): + await process.stdout.receive(1) + + assert process.stderr is not None + + with pytest.raises(ClosedResourceError): + await process.stderr.receive(1) diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index f4c87b39..bc4a289d 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -185,6 +185,9 @@ async def taskfunc(*, task_status: TaskStatus) -> None: assert not finished +@pytest.mark.xfail( + sys.version_info < (3, 9), reason="Requires a way to detect cancellation source" +) @pytest.mark.parametrize("anyio_backend", ["asyncio"]) async def test_start_native_host_cancelled() -> None: started = finished = False @@ -199,9 +202,6 @@ async def start_another() -> None: async with create_task_group() as tg: await tg.start(taskfunc) - if sys.version_info < (3, 9): - pytest.xfail("Requires a way to detect cancellation source") - task = asyncio.get_running_loop().create_task(start_another()) await wait_all_tasks_blocked() task.cancel()