Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log popen stdout/err when subprocess times out #6567

Merged
merged 5 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
inc,
map_varying,
nodebug,
popen,
pristine_loop,
randominc,
save_sys_modules,
Expand Down Expand Up @@ -7507,7 +7508,7 @@ async def test_wait_for_workers_updates_info(c, s):
client_script = """
from dask.distributed import Client
if __name__ == "__main__":
client = Client(processes=%s, n_workers=1)
client = Client(processes=%s, n_workers=1, scheduler_port=0, dashboard_address=":0")
"""


Expand All @@ -7516,13 +7517,8 @@ def test_quiet_close_process(processes, tmp_path):
with open(tmp_path / "script.py", mode="w") as f:
f.write(client_script % processes)

proc = subprocess.Popen(
[sys.executable, tmp_path / "script.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

out, err = proc.communicate(timeout=10)
with popen([sys.executable, tmp_path / "script.py"], capture_output=True) as proc:
out, err = proc.communicate(timeout=10)

assert not out
assert not err
53 changes: 53 additions & 0 deletions distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pathlib
import signal
import socket
import subprocess
import sys
import textwrap
import threading
Expand Down Expand Up @@ -825,6 +826,58 @@ def cb(signum, frame):
# `subprocess.TimeoutExpired` if this test breaks.


def test_popen_timeout(capsys: pytest.CaptureFixture):
    """``popen`` raises ``TimeoutExpired`` for a child that will not stop,
    and still prints the output it captured from that child.

    The child prints ``ready``, then loops forever printing ``slept`` and
    answers every interrupt by printing ``interrupted`` instead of exiting.
    """
    # Script run in the child process. It swallows KeyboardInterrupt, so the
    # interrupt sent on context-manager exit can never make it shut down.
    stubborn_script = textwrap.dedent(
        """
        import signal
        import sys
        import time

        if sys.platform == "win32":
            signal.signal(signal.SIGBREAK, signal.default_int_handler)
            # ^ Cause `CTRL_BREAK_EVENT` on Windows to raise `KeyboardInterrupt`

        print('ready', flush=True)
        while True:
            try:
                time.sleep(0.1)
                print("slept", flush=True)
            except KeyboardInterrupt:
                print("interrupted", flush=True)
        """
    )

    with pytest.raises(subprocess.TimeoutExpired):
        with popen(
            [sys.executable, "-c", stubborn_script],
            capture_output=True,
            terminate_timeout=1,
        ) as proc:
            assert proc.stdout
            # Wait until the child has installed its handler and is looping,
            # so the interrupt is guaranteed to hit the `try` block.
            assert proc.stdout.readline().strip() == b"ready"
            # Exiting the contextmanager sends SIGINT and waits 1s for the
            # child to shut down. The child ignores SIGINT, so after 1s the
            # contextmanager sends SIGKILL. `TimeoutExpired` is raised once
            # the child has been killed, because it missed the 1s deadline.

    printed = capsys.readouterr().out
    assert "stdout: returncode" in printed
    assert "interrupted" in printed
    assert "slept" in printed


def test_popen_always_prints_output(capsys: pytest.CaptureFixture):
    """Captured child output is printed even when the child exits cleanly.

    The output is printed unconditionally so that it is still available if
    some later assertion in the calling test fails.
    """
    command = [sys.executable, "-c", "print('foo')"]
    with popen(command, capture_output=True) as proc:
        proc.communicate(timeout=5)

    printed = capsys.readouterr().out
    assert "stdout: returncode 0" in printed
    assert "foo" in printed


@gen_test()
async def test_freeze_batched_send():
async with EchoServer() as e:
Expand Down
38 changes: 32 additions & 6 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,14 +1283,14 @@ def raises(func, exc=Exception):
return True


def _terminate_process(proc: subprocess.Popen) -> None:
def _terminate_process(proc: subprocess.Popen, terminate_timeout: float) -> None:
if proc.poll() is None:
if sys.platform.startswith("win"):
proc.send_signal(signal.CTRL_BREAK_EVENT)
else:
proc.send_signal(signal.SIGINT)
try:
proc.communicate(timeout=30)
proc.communicate(timeout=terminate_timeout)
finally:
# Make sure we don't leave the process lingering around
with suppress(OSError):
Expand All @@ -1299,7 +1299,11 @@ def _terminate_process(proc: subprocess.Popen) -> None:

@contextmanager
def popen(
args: list[str], capture_output: bool = False, **kwargs
args: list[str],
capture_output: bool = False,
terminate_timeout: float = 30,
kill_timeout: float = 10,
**kwargs,
) -> Iterator[subprocess.Popen[bytes]]:
"""Start a shell command in a subprocess.
Yields a subprocess.Popen object.
Expand All @@ -1325,6 +1329,21 @@ def popen(
Note that ``proc.communicate`` is called automatically when the
contextmanager exits. Calling code must not call ``proc.communicate``
in a separate thread, since it's not thread-safe.

When captured, the stdout/stderr of the process is always printed
when the process exits for easier test debugging.
terminate_timeout: optional, default 30
When the contextmanager exits, SIGINT is sent to the subprocess.
``terminate_timeout`` sets how many seconds to wait for the subprocess
to terminate after that. If the timeout expires, SIGKILL is sent to
the subprocess (which cannot be blocked); see ``kill_timeout``.
If this timeout expires, `subprocess.TimeoutExpired` is raised.
kill_timeout: optional, default 10
When the contextmanager exits, if the subprocess does not shut down
after ``terminate_timeout`` seconds in response to SIGINT, SIGKILL
is sent to the subprocess (which cannot be blocked). ``kill_timeout``
controls how long to wait after SIGKILL to join the process.
If this timeout expires, `subprocess.TimeoutExpired` is raised.
kwargs: optional
optional arguments to subprocess.Popen
"""
Expand All @@ -1346,9 +1365,16 @@ def popen(
try:
yield proc
finally:
_terminate_process(proc)
out, err = proc.communicate()
assert not err
try:
_terminate_process(proc, terminate_timeout)
finally:
out, err = proc.communicate(timeout=kill_timeout)
if out:
print(f"------ stdout: returncode {proc.returncode}, {args} ------")
print(out.decode() if isinstance(out, bytes) else out)
if err:
print(f"------ stderr: returncode {proc.returncode}, {args} ------")
print(err.decode() if isinstance(err, bytes) else err)


def wait_for(predicate, timeout, fail_func=None, period=0.05):
Expand Down