Skip to content

Commit

Permalink
Always pick an open port when running tests (#6591)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Jun 23, 2022
1 parent bc04d0e commit 7a0649a
Show file tree
Hide file tree
Showing 12 changed files with 291 additions and 138 deletions.
Empty file.
227 changes: 145 additions & 82 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import re

import psutil
import pytest

Expand All @@ -21,16 +23,21 @@
from distributed import Client, Scheduler
from distributed.compatibility import LINUX, WINDOWS
from distributed.metrics import time
from distributed.utils import get_ip, get_ip_interface
from distributed.utils import get_ip, get_ip_interface, open_port
from distributed.utils_test import (
assert_can_connect_from_everywhere_4_6,
assert_can_connect_locally_4,
popen,
wait_for_log_line,
)


def test_defaults(loop):
def _get_dashboard_port(client: Client) -> int:
match = re.search(r":(\d+)\/status", client.dashboard_link)
assert match
return int(match.group(1))


def test_defaults(loop, requires_default_ports):
with popen(["dask-scheduler"]):

async def f():
Expand All @@ -39,24 +46,23 @@ async def f():

with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c:
c.sync(f)

response = requests.get("http://127.0.0.1:8787/status/")
response.raise_for_status()
assert _get_dashboard_port(c) == 8787


def test_hostport(loop):
with popen(["dask-scheduler", "--no-dashboard", "--host", "127.0.0.1:8978"]):
port = open_port()
with popen(["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{port}"]):

async def f():
# The scheduler's main port can't be contacted from the outside
await assert_can_connect_locally_4(8978, timeout=5.0)
await assert_can_connect_locally_4(int(port), timeout=5.0)

with Client("127.0.0.1:8978", loop=loop) as c:
with Client(f"127.0.0.1:{port}", loop=loop) as c:
assert len(c.nthreads()) == 0
c.sync(f)


def test_no_dashboard(loop):
def test_no_dashboard(loop, requires_default_ports):
with popen(["dask-scheduler", "--no-dashboard"]):
with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
response = requests.get("http://127.0.0.1:8787/status/")
Expand All @@ -65,13 +71,14 @@ def test_no_dashboard(loop):

def test_dashboard(loop):
pytest.importorskip("bokeh")
port = open_port()

with popen(["dask-scheduler"], capture_output=True) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())
with popen(
["dask-scheduler", "--host", f"127.0.0.1:{port}"],
):

with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
pass
with Client(f"127.0.0.1:{port}", loop=loop) as c:
dashboard_port = _get_dashboard_port(c)

names = ["localhost", "127.0.0.1", get_ip()]
start = time()
Expand All @@ -97,24 +104,29 @@ def test_dashboard(loop):

def test_dashboard_non_standard_ports(loop):
pytest.importorskip("bokeh")

port1 = open_port()
port2 = open_port()
with popen(
["dask-scheduler", "--port", "3448", "--dashboard-address", ":4832"]
[
"dask-scheduler",
f"--port={port1}",
f"--dashboard-address=:{port2}",
]
) as proc:
with Client("127.0.0.1:3448", loop=loop) as c:
with Client(f"127.0.0.1:{port1}", loop=loop) as c:
pass

start = time()
while True:
try:
response = requests.get("http://localhost:4832/status/")
response = requests.get(f"http://localhost:{port2}/status/")
assert response.ok
break
except Exception:
sleep(0.1)
assert time() < start + 20
with pytest.raises(Exception):
requests.get("http://localhost:4832/status/")
requests.get(f"http://localhost:{port2}/status/")


@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
Expand All @@ -123,8 +135,14 @@ def test_dashboard_allowlist(loop):
with pytest.raises(Exception):
requests.get("http://localhost:8787/status/").ok

with popen(["dask-scheduler"]) as proc:
with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c:
port = open_port()
with popen(
[
"dask-scheduler",
f"--port={port}",
]
) as proc:
with Client(f"127.0.0.1:{port}", loop=loop) as c:
pass

start = time()
Expand Down Expand Up @@ -156,11 +174,26 @@ def test_interface(loop):
"Available interfaces are: %s." % (if_names,)
)

with popen(["dask-scheduler", "--no-dashboard", "--interface", if_name]) as s:
port = open_port()
with popen(
[
"dask-scheduler",
f"--port={port}",
"--no-dashboard",
"--interface",
if_name,
]
) as s:
with popen(
["dask-worker", "127.0.0.1:8786", "--no-dashboard", "--interface", if_name]
[
"dask-worker",
f"127.0.0.1:{port}",
"--no-dashboard",
"--interface",
if_name,
]
) as a:
with Client("tcp://127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c:
with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c:
start = time()
while not len(c.nthreads()):
sleep(0.1)
Expand All @@ -172,6 +205,8 @@ def test_interface(loop):

@pytest.mark.flaky(reruns=10, reruns_delay=5)
def test_pid_file(loop):
port = open_port()

def check_pidfile(proc, pidfile):
start = time()
while not os.path.exists(pidfile):
Expand Down Expand Up @@ -199,7 +234,13 @@ def check_pidfile(proc, pidfile):

with tmpfile() as w:
with popen(
["dask-worker", "127.0.0.1:8786", "--pid-file", w, "--no-dashboard"]
[
"dask-worker",
f"127.0.0.1:{port}",
"--pid-file",
w,
"--no-dashboard",
]
) as worker:
check_pidfile(worker, w)

Expand All @@ -216,13 +257,19 @@ def test_scheduler_port_zero(loop):

def test_dashboard_port_zero(loop):
pytest.importorskip("bokeh")
port = open_port()
with popen(
["dask-scheduler", "--dashboard-address", ":0"],
capture_output=True,
) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())
assert dashboard_port != 0
[
"dask-scheduler",
"--host",
f"127.0.0.1:{port}",
"--dashboard-address",
":0",
],
):
with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c:
port = _get_dashboard_port(c)
assert port > 0


PRELOAD_TEXT = """
Expand All @@ -237,70 +284,75 @@ def get_scheduler_address():
"""


def test_preload_file(loop):
def test_preload_file(loop, tmp_path):
def check_scheduler():
import scheduler_info

return scheduler_info.get_scheduler_address()

tmpdir = tempfile.mkdtemp()
try:
path = os.path.join(tmpdir, "scheduler_info.py")
with open(path, "w") as f:
f.write(PRELOAD_TEXT)
with tmpfile() as fn:
with popen(["dask-scheduler", "--scheduler-file", fn, "--preload", path]):
with Client(scheduler_file=fn, loop=loop) as c:
assert c.run_on_scheduler(check_scheduler) == c.scheduler.address
finally:
shutil.rmtree(tmpdir)
path = tmp_path / "scheduler_info.py"
with open(path, "w") as f:
f.write(PRELOAD_TEXT)
with tmpfile() as fn:
with popen(
[
"dask-scheduler",
"--scheduler-file",
fn,
"--preload",
path,
f"--port={open_port()}",
]
):
with Client(scheduler_file=fn, loop=loop) as c:
assert c.run_on_scheduler(check_scheduler) == c.scheduler.address


def test_preload_module(loop):
def test_preload_module(loop, tmp_path):
def check_scheduler():
import scheduler_info

return scheduler_info.get_scheduler_address()

tmpdir = tempfile.mkdtemp()
try:
path = os.path.join(tmpdir, "scheduler_info.py")
with open(path, "w") as f:
f.write(PRELOAD_TEXT)
env = os.environ.copy()
if "PYTHONPATH" in env:
env["PYTHONPATH"] = tmpdir + ":" + env["PYTHONPATH"]
else:
env["PYTHONPATH"] = tmpdir
with tmpfile() as fn:
with popen(
[
"dask-scheduler",
"--scheduler-file",
fn,
"--preload",
"scheduler_info",
],
env=env,
):
with Client(scheduler_file=fn, loop=loop) as c:
assert c.run_on_scheduler(check_scheduler) == c.scheduler.address
finally:
shutil.rmtree(tmpdir)
path = tmp_path / "scheduler_info.py"
with open(path, "w") as f:
f.write(PRELOAD_TEXT)
env = os.environ.copy()
if "PYTHONPATH" in env:
env["PYTHONPATH"] = str(tmp_path) + ":" + env["PYTHONPATH"]
else:
env["PYTHONPATH"] = str(tmp_path)
with tmpfile() as fn:
with popen(
[
"dask-scheduler",
"--scheduler-file",
fn,
"--preload",
"scheduler_info",
f"--port={open_port()}",
],
env=env,
):
with Client(scheduler_file=fn, loop=loop) as c:
assert c.run_on_scheduler(check_scheduler) == c.scheduler.address


def test_preload_remote_module(loop, tmp_path):
with open(tmp_path / "scheduler_info.py", "w") as f:
f.write(PRELOAD_TEXT)

with popen([sys.executable, "-m", "http.server", "9382"], cwd=tmp_path):
http_server_port = open_port()
with popen(
[sys.executable, "-m", "http.server", str(http_server_port)], cwd=tmp_path
):
with popen(
[
"dask-scheduler",
"--scheduler-file",
str(tmp_path / "scheduler-file.json"),
"--preload",
"http://localhost:9382/scheduler_info.py",
f"http://localhost:{http_server_port}/scheduler_info.py",
f"--port={open_port()}",
]
) as proc:
with Client(
Expand Down Expand Up @@ -439,19 +491,22 @@ def test_multiple_workers_2(loop):
def dask_setup(worker):
worker.foo = 'setup'
"""
with popen(["dask-scheduler", "--no-dashboard"]) as s:
port = open_port()
with popen(
["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{port}"]
) as s:
with popen(
[
"dask-worker",
"localhost:8786",
f"localhost:{port}",
"--no-dashboard",
"--preload",
text,
"--preload-nanny",
text,
]
) as a:
with Client("127.0.0.1:8786", loop=loop) as c:
with Client(f"127.0.0.1:{port}", loop=loop) as c:
c.wait_for_workers(1)
[foo] = c.run(lambda dask_worker: dask_worker.foo).values()
assert foo == "setup"
Expand All @@ -460,10 +515,11 @@ def dask_setup(worker):


def test_multiple_workers(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as s:
with popen(["dask-worker", "localhost:8786", "--no-dashboard"]) as a:
with popen(["dask-worker", "localhost:8786", "--no-dashboard"]) as b:
with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c:
scheduler_address = f"127.0.0.1:{open_port()}"
with popen(["dask-scheduler", "--no-dashboard", "--host", scheduler_address]) as s:
with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as a:
with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as b:
with Client(scheduler_address, loop=loop) as c:
start = time()
while len(c.nthreads()) < 2:
sleep(0.1)
Expand All @@ -474,13 +530,20 @@ def test_multiple_workers(loop):
@pytest.mark.skipif(WINDOWS, reason="POSIX only")
@pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM])
def test_signal_handling(loop, sig):
port = open_port()
with subprocess.Popen(
["python", "-m", "distributed.cli.dask_scheduler"],
[
"python",
"-m",
"distributed.cli.dask_scheduler",
f"--port={port}",
"--dashboard-address=:0",
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as scheduler:
# Wait for scheduler to start
with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c:
with Client(f"127.0.0.1:{port}", loop=loop) as c:
pass
scheduler.send_signal(sig)
stdout, stderr = scheduler.communicate()
Expand Down
Loading

0 comments on commit 7a0649a

Please sign in to comment.