From b1f4f3d9e37119813a2c513795b1943556ce6bd6 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Sun, 17 Jul 2022 10:48:40 -0400 Subject: [PATCH 1/6] Add test for preexec_fn fd double close issue --- tests/test_process.py | 65 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/tests/test_process.py b/tests/test_process.py index 357d6f0a..868293e0 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -7,6 +7,7 @@ import subprocess import sys import tempfile +import textwrap import time import unittest @@ -796,7 +797,69 @@ async def test(): class Test_UV_Process(_TestProcess, tb.UVTestCase): - pass + def test_process_preexec_fn_double_close(self): + script = textwrap.dedent(""" + import os + import sys + import threading + import queue + import concurrent.futures + + pid = os.getpid() + q = queue.Queue() + evt = threading.Event() + r, w = os.pipe() + pipe = os.pipe + close = os.close + + + def mock_pipe(): + rv = pipe() + q.put(rv[1]) + return rv + + + def mock_close(fd): + close(fd) + if os.getpid() == pid: + q.put(fd) + evt.wait() + + + os.pipe = mock_pipe + os.close = mock_close + + import asyncio + import uvloop + + uvloop.install() + + + def thread(): + fd = q.get() + while True: + fd_close = q.get() + if fd == fd_close: + os.dup2(r, fd) + evt.set() + break + while os.read(fd, 32) != b"exit": + pass + + + async def test(): + await asyncio.create_subprocess_exec( + sys.executable, "-c", "pass", preexec_fn=lambda: True + ) + os.write(w, b"exit") + + + with concurrent.futures.ThreadPoolExecutor() as executor: + fut = executor.submit(thread) + asyncio.get_event_loop().run_until_complete(test()) + fut.result() + """) + subprocess.check_call([sys.executable, '-c', script]) class Test_AIO_Process(_TestProcess, tb.AIOTestCase): From 757e39c037e5e2b4b1e74d8c734cdf3abe3592f2 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Sun, 17 Jul 2022 10:40:56 -0500 Subject: [PATCH 2/6] Update tests/test_process.py Co-authored-by: Thomas Grainger --- tests/test_process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_process.py b/tests/test_process.py index 868293e0..166093f0 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -856,7 +856,7 @@ async def test(): with concurrent.futures.ThreadPoolExecutor() as executor: fut = executor.submit(thread) - asyncio.get_event_loop().run_until_complete(test()) + asyncio.run(test()) fut.result() """) subprocess.check_call([sys.executable, '-c', script]) From 730dd1a22bfc434b9fc5bedf3f61bcefca5842e2 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 19 Jul 2022 19:26:59 +0100 Subject: [PATCH 3/6] use a stack of self._fds_to_close to prevent double closes and make tests easier to write because the close order is deterministic and in the order that opens happen in this should also be a bit faster because list.append is faster than set.add and we skip a call to os_close(-1) and catching an OSError exception --- tests/test_process.py | 91 +++++++++++++++++--------------------- uvloop/handles/process.pxd | 2 +- uvloop/handles/process.pyx | 23 ++++------ 3 files changed, 49 insertions(+), 67 deletions(-) diff --git a/tests/test_process.py b/tests/test_process.py index 166093f0..775b5f0d 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -797,69 +797,58 @@ async def test(): class Test_UV_Process(_TestProcess, tb.UVTestCase): - def test_process_preexec_fn_double_close(self): + def test_process_double_close(self): script = textwrap.dedent(""" import os import sys - import threading - import queue - import concurrent.futures - - pid = os.getpid() - q = queue.Queue() - evt = threading.Event() - r, w = os.pipe() - pipe = os.pipe - close = os.close - - - def mock_pipe(): - rv = pipe() - q.put(rv[1]) - return rv - - - def mock_close(fd): - close(fd) - if os.getpid() == pid: - q.put(fd) - evt.wait() - - - os.pipe = mock_pipe - os.close = mock_close + from unittest import mock import asyncio - import uvloop - uvloop.install() - - - def thread(): - fd = q.get() - while True: - fd_close = q.get() - if fd == fd_close: - os.dup2(r, fd) - evt.set() - break - while os.read(fd, 32) != b"exit": - pass + pipes = [] + original_os_pipe = os.pipe + def log_pipes(): + pipe = original_os_pipe() + pipes.append(pipe) + return pipe + + dups = [] + original_os_dup = os.dup + def log_dups(*args, **kwargs): + dup = original_os_dup(*args, **kwargs) + dups.append(dup) + return dup + + with mock.patch( + "os.close", wraps=os.close + ) as os_close, mock.patch( + "os.pipe", new=log_pipes + ), mock.patch( + "os.dup", new=log_dups + ): + import uvloop async def test(): - await asyncio.create_subprocess_exec( - sys.executable, "-c", "pass", preexec_fn=lambda: True + proc = await asyncio.create_subprocess_exec( + sys.executable, "-c", "pass" ) - os.write(w, b"exit") - + await proc.communicate() - with concurrent.futures.ThreadPoolExecutor() as executor: - fut = executor.submit(thread) - asyncio.run(test()) - fut.result() + uvloop.install() + asyncio.run(test()) + + stdin, stdout, stderr = dups + (r, w), = pipes + assert os_close.mock_calls == [ + mock.call(w), + mock.call(r), + mock.call(stderr), + mock.call(stdout), + mock.call(stdin), + ] """) - subprocess.check_call([sys.executable, '-c', script]) + subprocess.run([sys.executable, '-c', script], check=True) class Test_AIO_Process(_TestProcess, tb.AIOTestCase): diff --git a/uvloop/handles/process.pxd b/uvloop/handles/process.pxd index 5b69c737..c98b48f1 100644 --- a/uvloop/handles/process.pxd +++ b/uvloop/handles/process.pxd @@ -8,7 +8,7 @@ cdef class UVProcess(UVHandle): object _preexec_fn bint _restore_signals - set _fds_to_close + list _fds_to_close # Attributes used to compose uv_process_options_t: uv.uv_process_options_t options diff --git a/uvloop/handles/process.pyx b/uvloop/handles/process.pyx index 799e2957..d2ca00b9 100644 --- a/uvloop/handles/process.pyx +++ b/uvloop/handles/process.pyx @@ -7,7 +7,7 @@ cdef class UVProcess(UVHandle): self.uv_opt_args = NULL self._returncode = None self._pid = None - self._fds_to_close = set() + self._fds_to_close = list() self._preexec_fn = None self._restore_signals = True self.context = Context_CopyCurrent() @@ -69,6 +69,8 @@ cdef class UVProcess(UVHandle): 'Racing with another loop to spawn a process.') self._errpipe_read, self._errpipe_write = os_pipe() + self._fds_to_close.append(self._errpipe_read) + self._fds_to_close.append(self._errpipe_write) try: os_set_inheritable(self._errpipe_write, True) @@ -100,8 +102,8 @@ cdef class UVProcess(UVHandle): self._finish_init() - os_close(self._errpipe_write) - self._errpipe_write = -1 + # close the write pipe early + os_close(self._fds_to_close.pop()) if preexec_fn is not None: errpipe_data = bytearray() @@ -115,17 +117,8 @@ cdef class UVProcess(UVHandle): break finally: - os_close(self._errpipe_read) - try: - os_close(self._errpipe_write) - except OSError: - # Might be already closed - pass - - fds_to_close = self._fds_to_close - self._fds_to_close = None - for fd in fds_to_close: - os_close(fd) + while self._fds_to_close: + os_close(self._fds_to_close.pop()) for fd in restore_inheritable: os_set_inheritable(fd, False) @@ -202,7 +195,7 @@ cdef class UVProcess(UVHandle): if self._fds_to_close is None: raise RuntimeError( 'UVProcess._close_after_spawn called after uv_spawn') - self._fds_to_close.add(fd) + self._fds_to_close.append(fd) def __dealloc__(self): if self.uv_opt_env is not NULL: From f4f57f6e3d36b46fd5b2931d90da12e8670aed94 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 19 Jul 2022 20:17:07 +0100 Subject: [PATCH 4/6] Update process.pyx --- uvloop/handles/process.pyx | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/uvloop/handles/process.pyx b/uvloop/handles/process.pyx index d2ca00b9..0d27c2d9 100644 --- a/uvloop/handles/process.pyx +++ b/uvloop/handles/process.pyx @@ -69,8 +69,10 @@ cdef class UVProcess(UVHandle): 'Racing with another loop to spawn a process.') self._errpipe_read, self._errpipe_write = os_pipe() - self._fds_to_close.append(self._errpipe_read) - self._fds_to_close.append(self._errpipe_write) + fds_to_close = self._fds_to_close + self._fds_to_close = None + fds_to_close.append(self._errpipe_read) + fds_to_close.append(self._errpipe_write) try: os_set_inheritable(self._errpipe_write, True) @@ -103,7 +105,7 @@ cdef class UVProcess(UVHandle): self._finish_init() # close the write pipe early - os_close(self._fds_to_close.pop()) + os_close(fds_to_close.pop()) if preexec_fn is not None: errpipe_data = bytearray() @@ -117,8 +119,8 @@ cdef class UVProcess(UVHandle): break finally: - while self._fds_to_close: - os_close(self._fds_to_close.pop()) + while fds_to_close: + os_close(fds_to_close.pop()) for fd in restore_inheritable: os_set_inheritable(fd, False) From d5fe2792aaf9dfbd5ec6b991fbf640bf6ccb16b9 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 20 Jul 2022 10:18:50 +0100 Subject: [PATCH 5/6] Update uvloop/handles/process.pyx --- uvloop/handles/process.pyx | 1 + 1 file changed, 1 insertion(+) diff --git a/uvloop/handles/process.pyx b/uvloop/handles/process.pyx index 0d27c2d9..14c66b31 100644 --- a/uvloop/handles/process.pyx +++ b/uvloop/handles/process.pyx @@ -72,6 +72,7 @@ cdef class UVProcess(UVHandle): fds_to_close = self._fds_to_close self._fds_to_close = None fds_to_close.append(self._errpipe_read) + # add the write pipe last so we can close it early fds_to_close.append(self._errpipe_write) try: os_set_inheritable(self._errpipe_write, True) From 5926386f81ea49903db9d5095058787ed91cff3d Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 20 Jul 2022 10:24:21 +0100 Subject: [PATCH 6/6] DRY os_dup call --- uvloop/handles/process.pyx | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/uvloop/handles/process.pyx b/uvloop/handles/process.pyx index 14c66b31..0146f92e 100644 --- a/uvloop/handles/process.pyx +++ b/uvloop/handles/process.pyx @@ -496,10 +496,7 @@ cdef class UVProcessTransport(UVProcess): # shouldn't ever happen raise RuntimeError('cannot apply subprocess.STDOUT') - newfd = os_dup(io[1]) - os_set_inheritable(newfd, True) - self._close_after_spawn(newfd) - io[2] = newfd + io[2] = self._file_redirect_stdio(io[1]) elif _stderr == subprocess_DEVNULL: io[2] = self._file_devnull() else: