Skip to content

Commit

Permalink
Merge branch 'main' into cluster_dump_utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
sjperkins committed Mar 23, 2022
2 parents 4b65b81 + 4866615 commit 9f81cfc
Show file tree
Hide file tree
Showing 47 changed files with 2,566 additions and 2,018 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test-report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ on:
jobs:
test-report:
name: Test Report
# Do not run the report job on forks
if: github.repository == 'dask/distributed' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
env:
GITHUB_TOKEN: ${{ github.token }}
Expand Down
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ repos:
- types-docutils
- types-requests
- types-paramiko
- types-pkg_resources
- types-PyYAML
- types-setuptools
- types-psutil
Expand Down
1 change: 0 additions & 1 deletion continuous_integration/recipes/distributed/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ outputs:
- toolz >=0.8.2
- tornado >=6.0.3
- zict >=0.1.3
- setuptools <60.0.0
run_constrained:
- distributed-impl >={{ version }} *{{ build_ext }}
- openssl !=1.1.1e
Expand Down
6 changes: 6 additions & 0 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
"""Implementation of the Active Memory Manager. This is a scheduler extension which
sends drop/replicate suggestions to the worker.
See also :mod:`distributed.worker_memory` and :mod:`distributed.spill`, which implement
spill/pause/terminate mechanics on the Worker side.
"""
from __future__ import annotations

import logging
Expand Down
15 changes: 10 additions & 5 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ def test_no_dashboard(loop):
def test_dashboard(loop):
pytest.importorskip("bokeh")

with popen(["dask-scheduler"]) as proc:
for line in proc.stderr:
with popen(["dask-scheduler"], flush_output=False) as proc:
for line in proc.stdout:
if b"dashboard at" in line:
dashboard_port = int(line.decode().split(":")[-1].strip())
break
else:
raise Exception("dashboard not found")
assert False # pragma: nocover

with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
pass
Expand Down Expand Up @@ -217,12 +217,17 @@ def test_scheduler_port_zero(loop):

def test_dashboard_port_zero(loop):
pytest.importorskip("bokeh")
with popen(["dask-scheduler", "--dashboard-address", ":0"]) as proc:
for line in proc.stderr:
with popen(
["dask-scheduler", "--dashboard-address", ":0"],
flush_output=False,
) as proc:
for line in proc.stdout:
if b"dashboard at" in line:
dashboard_port = int(line.decode().split(":")[-1].strip())
assert dashboard_port != 0
break
else:
assert False # pragma: nocover


PRELOAD_TEXT = """
Expand Down
8 changes: 6 additions & 2 deletions distributed/cli/tests/test_dask_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,17 @@ def test_errors():
'{"foo": "bar"}',
"--spec-file",
"foo.yaml",
]
],
flush_output=False,
) as proc:
line = proc.stdout.readline().decode()
assert "exactly one" in line
assert "--spec" in line and "--spec-file" in line

with popen([sys.executable, "-m", "distributed.cli.dask_spec"]) as proc:
with popen(
[sys.executable, "-m", "distributed.cli.dask_spec"],
flush_output=False,
) as proc:
line = proc.stdout.readline().decode()
assert "exactly one" in line
assert "--spec" in line and "--spec-file" in line
24 changes: 14 additions & 10 deletions distributed/cli/tests/test_dask_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@ def test_version_option():
assert result.exit_code == 0


@pytest.mark.slow
def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
n_workers = 2
with popen(
["dask-ssh", f"--nprocs={n_workers}", "--nohost", "localhost"]
) as cluster:
["dask-ssh", "--nprocs=2", "--nohost", "localhost"],
flush_output=False,
) as proc:
with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c:
c.wait_for_workers(n_workers, timeout="15 seconds")
c.wait_for_workers(2, timeout="15 seconds")
# This interrupt is necessary for the cluster to place output into the stdout
# and stderr pipes
cluster.send_signal(2)
_, stderr = cluster.communicate()

assert any(b"renamed to --nworkers" in l for l in stderr.splitlines())
proc.send_signal(2)
assert any(
b"renamed to --nworkers" in proc.stdout.readline() for _ in range(15)
)


def test_ssh_cli_nworkers_with_nprocs_is_an_error():
with popen(["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"]) as c:
with popen(
["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"],
flush_output=False,
) as proc:
assert any(
b"Both --nprocs and --nworkers" in c.stderr.readline() for i in range(15)
b"Both --nprocs and --nworkers" in proc.stdout.readline() for _ in range(15)
)
Loading

0 comments on commit 9f81cfc

Please sign in to comment.