Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix run_process() and open_process().__aexit__ leaking an orphan process when cancelled #672

Merged
merged 9 commits into from
Jan 25, 2024
12 changes: 12 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Fixed passing ``total_tokens`` to ``anyio.CapacityLimiter()`` as a keyword argument
not working on the ``trio`` backend
(`#515 <https://github.com/agronholm/anyio/issues/515>`_)
- Fixed ``Process.aclose()`` not performing the minimum level of necessary cleanup when
cancelled. Previously:

- Cancellation of ``Process.aclose()`` could leak an orphan process
- Cancellation of ``run_process()`` could very briefly leak an orphan process.
- Cancellation of ``Process.aclose()`` or ``run_process()`` on Trio could leave
standard streams unclosed

(PR by Ganden Schaffner)
- Fixed ``Process.stdin.aclose()``, ``Process.stdout.aclose()``, and
``Process.stderr.aclose()`` not including a checkpoint on asyncio (PR by Ganden
Schaffner)

**4.2.0**

Expand Down
26 changes: 18 additions & 8 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes:

async def aclose(self) -> None:
self._stream.feed_eof()
await AsyncIOBackend.checkpoint()


@dataclass(eq=False)
Expand All @@ -930,6 +931,7 @@ async def send(self, item: bytes) -> None:

async def aclose(self) -> None:
self._stream.close()
await AsyncIOBackend.checkpoint()


@dataclass(eq=False)
Expand All @@ -940,14 +942,22 @@ class Process(abc.Process):
_stderr: StreamReaderWrapper | None

async def aclose(self) -> None:
if self._stdin:
await self._stdin.aclose()
if self._stdout:
await self._stdout.aclose()
if self._stderr:
await self._stderr.aclose()

await self.wait()
with CancelScope(shield=True):
if self._stdin:
await self._stdin.aclose()
if self._stdout:
await self._stdout.aclose()
if self._stderr:
await self._stderr.aclose()

try:
await self.wait()
except BaseException:
self.kill()
with CancelScope(shield=True):
await self.wait()

raise
agronholm marked this conversation as resolved.
Show resolved Hide resolved

async def wait(self) -> int:
return await self._process.wait()
Expand Down
23 changes: 15 additions & 8 deletions src/anyio/_backends/_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,21 @@ class Process(abc.Process):
_stderr: abc.ByteReceiveStream | None

async def aclose(self) -> None:
if self._stdin:
await self._stdin.aclose()
if self._stdout:
await self._stdout.aclose()
if self._stderr:
await self._stderr.aclose()

await self.wait()
with CancelScope(shield=True):
if self._stdin:
await self._stdin.aclose()
if self._stdout:
await self._stdout.aclose()
if self._stderr:
await self._stderr.aclose()

try:
await self.wait()
except BaseException:
self.kill()
with CancelScope(shield=True):
await self.wait()
raise

async def wait(self) -> int:
return await self._process.wait()
Expand Down
26 changes: 12 additions & 14 deletions src/anyio/_core/_subprocesses.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,18 @@ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
start_new_session=start_new_session,
) as process:
stream_contents: list[bytes | None] = [None, None]
try:
async with create_task_group() as tg:
if process.stdout:
tg.start_soon(drain_stream, process.stdout, 0)
if process.stderr:
tg.start_soon(drain_stream, process.stderr, 1)
if process.stdin and input:
await process.stdin.send(input)
await process.stdin.aclose()

await process.wait()
except BaseException:
process.kill()
raise
async with create_task_group() as tg:
if process.stdout:
tg.start_soon(drain_stream, process.stdout, 0)

if process.stderr:
tg.start_soon(drain_stream, process.stderr, 1)

if process.stdin and input:
await process.stdin.send(input)
await process.stdin.aclose()

await process.wait()

output, errors = stream_contents
if check and process.returncode != 0:
Expand Down
52 changes: 51 additions & 1 deletion tests/test_subprocesses.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import pytest

from anyio import open_process, run_process
from anyio import CancelScope, ClosedResourceError, open_process, run_process
from anyio.streams.buffered import BufferedByteReceiveStream

pytestmark = pytest.mark.anyio
Expand Down Expand Up @@ -176,3 +176,53 @@ async def test_run_process_inherit_stdout(capfd: pytest.CaptureFixture[str]) ->
out, err = capfd.readouterr()
assert out == "stdout-text" + os.linesep
assert err == "stderr-text" + os.linesep


async def test_process_aexit_cancellation_doesnt_orphan_process() -> None:
"""
Regression test for #669.

Ensures that open_process.__aexit__() doesn't leave behind an orphan process when
cancelled.

"""
with CancelScope() as scope:
async with await open_process(
[sys.executable, "-c", "import time; time.sleep(1)"]
) as process:
scope.cancel()

assert process.returncode is not None
assert process.returncode != 0


@pytest.mark.xfail("#671 needs to be resolved first")
agronholm marked this conversation as resolved.
Show resolved Hide resolved
async def test_process_aexit_cancellation_closes_standard_streams() -> None:
"""
Regression test for #669.

Ensures that open_process.__aexit__() closes standard streams when cancelled. Also
ensures that process.std{in.send,{out,err}.receive}() raise ClosedResourceError on a
closed stream.

"""
with CancelScope() as scope:
async with await open_process(
[sys.executable, "-c", "import time; time.sleep(1)"]
) as process:
scope.cancel()

assert process.stdin is not None

with pytest.raises(ClosedResourceError):
await process.stdin.send(b"foo")

assert process.stdout is not None

with pytest.raises(ClosedResourceError):
await process.stdout.receive(1)

assert process.stderr is not None

with pytest.raises(ClosedResourceError):
await process.stderr.receive(1)
Loading