Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deadlocks in tests that use popen #6483

Merged
merged 9 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_no_dashboard(loop):
def test_dashboard(loop):
pytest.importorskip("bokeh")

with popen(["dask-scheduler"], flush_output=False) as proc:
with popen(["dask-scheduler"], capture_output=True) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())

Expand Down Expand Up @@ -218,7 +218,7 @@ def test_dashboard_port_zero(loop):
pytest.importorskip("bokeh")
with popen(
["dask-scheduler", "--dashboard-address", ":0"],
flush_output=False,
capture_output=True,
) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())
Expand Down
4 changes: 2 additions & 2 deletions distributed/cli/tests/test_dask_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ def test_errors():
"--spec-file",
"foo.yaml",
],
flush_output=False,
capture_output=True,
) as proc:
line = proc.stdout.readline().decode()
assert "exactly one" in line
assert "--spec" in line and "--spec-file" in line

with popen(
[sys.executable, "-m", "distributed.cli.dask_spec"],
flush_output=False,
capture_output=True,
) as proc:
line = proc.stdout.readline().decode()
assert "exactly one" in line
Expand Down
4 changes: 2 additions & 2 deletions distributed/cli/tests/test_dask_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_version_option():
def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
with popen(
["dask-ssh", "--nprocs=2", "--nohost", "localhost"],
flush_output=False,
capture_output=True,
) as proc:
with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c:
c.wait_for_workers(2, timeout="15 seconds")
Expand All @@ -36,6 +36,6 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
def test_ssh_cli_nworkers_with_nprocs_is_an_error():
with popen(
["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"],
flush_output=False,
capture_output=True,
) as proc:
wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15)
14 changes: 7 additions & 7 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s):
"9686:9687",
"--no-dashboard",
],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100)

Expand Down Expand Up @@ -278,14 +278,14 @@ async def test_no_nanny(c, s):
async def test_reconnect_deprecated(c, s):
with popen(
["dask-worker", s.address, "--reconnect"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout)
assert worker.wait() == 1

with popen(
["dask-worker", s.address, "--no-reconnect"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout)
await c.wait_for_workers(1)
Expand Down Expand Up @@ -361,7 +361,7 @@ def test_scheduler_address_env(loop, monkeypatch):
async def test_nworkers_requires_nanny(s):
with popen(
["dask-worker", s.address, "--nworkers=2", "--no-nanny"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15)

Expand Down Expand Up @@ -400,7 +400,7 @@ async def test_nworkers_expands_name(c, s):
async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
with popen(
["dask-worker", s.address, "--nprocs=2"],
flush_output=False,
capture_output=True,
) as worker:
await c.wait_for_workers(2)
wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15)
Expand All @@ -410,7 +410,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
async def test_worker_cli_nworkers_with_nprocs_is_an_error(s):
with popen(
["dask-worker", s.address, "--nprocs=2", "--nworkers=2"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15)

Expand Down Expand Up @@ -708,7 +708,7 @@ def test_error_during_startup(monkeypatch, nanny):
"--port",
scheduler_port,
],
flush_output=False,
capture_output=True,
) as scheduler:
start = time()
# Wait for the scheduler to be up
Expand Down
31 changes: 31 additions & 0 deletions distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
gen_test,
inc,
new_config,
popen,
raises_with_cause,
tls_only_security,
)
Expand Down Expand Up @@ -772,3 +773,33 @@ async def test(s):
with pytest.raises(CustomError):
test()
assert test_done


def test_popen_write_during_terminate_deadlock():
# Fabricate a command which, when terminated, tries to write more than the pipe buffer can hold (65536 bytes).
gjoseph92 marked this conversation as resolved.
Show resolved Hide resolved
# This would deadlock if `proc.wait()` was called, since the process will be trying to write to stdout, but
# stdout isn't being cleared because our process is blocked in `proc.wait()`.
# `proc.communicate()` is necessary: https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait
with popen(
[
sys.executable,
"-c",
"; ".join(
[
"import signal",
"import time",
"import threading",
"e = threading.Event()",
"signal.signal(signal.SIGINT, lambda *args: [print('x' * 131072), e.set()])",
# ^ 131072 is 2x the size of the default Linux pipe buffer
"print('ready', flush=True)",
"e.wait()",
]
),
gjoseph92 marked this conversation as resolved.
Show resolved Hide resolved
],
capture_output=True,
) as proc:
assert proc.stdout.readline().strip() == b"ready"

# Exiting the context manager (terminating the subprocess) will raise `subprocess.TimeoutExpired`
# if this test breaks.
66 changes: 26 additions & 40 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1285,14 +1285,14 @@ def raises(func, exc=Exception):
return True


def _terminate_process(proc):
def _terminate_process(proc: subprocess.Popen):
gjoseph92 marked this conversation as resolved.
Show resolved Hide resolved
if proc.poll() is None:
if sys.platform.startswith("win"):
proc.send_signal(signal.CTRL_BREAK_EVENT)
else:
proc.send_signal(signal.SIGINT)
try:
proc.wait(30)
proc.communicate(timeout=30)
finally:
# Make sure we don't leave the process lingering around
with suppress(OSError):
Expand All @@ -1301,32 +1301,41 @@ def _terminate_process(proc):

@contextmanager
def popen(
args: list[str], flush_output: bool = True, **kwargs
args: list[str], capture_output: bool = False, **kwargs
) -> Iterator[subprocess.Popen[bytes]]:
"""Start a shell command in a subprocess.
Yields a subprocess.Popen object.

stderr is redirected to stdout.
stdout is redirected to a pipe.
On exit, the subprocess is terminated.

Parameters
----------
args: list[str]
Command line arguments
flush_output: bool, optional
If True (the default), the stdout/stderr pipe is emptied while it is being
filled. Set to False if you wish to read the output yourself. Note that setting
this to False and then failing to periodically read from the pipe may result in
a deadlock due to the pipe getting full.
capture_output: bool, default False
Set to True if you need to read output from the subprocess.
Stdout and stderr will both be piped to ``proc.stdout``.

If False, the subprocess will write to stdout/stderr normally.

When True, the test could deadlock if the stdout pipe's buffer gets full
(Linux default is 65536 bytes; macOS and Windows may be smaller).
Therefore, you may need to periodically read from ``proc.stdout``, or
use ``proc.communicate``. All the deadlock warnings apply from
https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stderr.

Note that ``proc.communicate`` is called automatically when the
contextmanager exits. Calling code must not call ``proc.communicate``
in a separate thread, since it's not thread-safe.
kwargs: optional
optional arguments to subprocess.Popen
"""
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.STDOUT
if capture_output:
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.STDOUT
if sys.platform.startswith("win"):
# Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT
kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
dump_stdout = False

args = list(args)
if sys.platform.startswith("win"):
Expand All @@ -1335,37 +1344,14 @@ def popen(
args[0] = os.path.join(
os.environ.get("DESTDIR", "") + sys.prefix, "bin", args[0]
)
proc = subprocess.Popen(args, **kwargs)

if flush_output:
ex = concurrent.futures.ThreadPoolExecutor(1)
flush_future = ex.submit(proc.communicate)

try:
yield proc

# asyncio.CancelledError is raised by @gen_test/@gen_cluster timeout
except (Exception, asyncio.CancelledError):
dump_stdout = True
raise

finally:
with subprocess.Popen(args, **kwargs) as proc:
try:
_terminate_process(proc)
yield proc
finally:
# XXX Also dump stdout if return code != 0 ?
if flush_output:
out, err = flush_future.result()
ex.shutdown()
else:
out, err = proc.communicate()
_terminate_process(proc)
out, err = proc.communicate()
assert not err

if dump_stdout:
print("\n" + "-" * 27 + " Subprocess stdout/stderr" + "-" * 27)
print(out.decode().rstrip())
print("-" * 80)


def wait_for(predicate, timeout, fail_func=None, period=0.05):
deadline = time() + timeout
Expand Down