From 7e49d8806d2cd8aa201d836cd9a4231827fd4396 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 2 Jun 2022 02:49:46 +0100 Subject: [PATCH] Remove CrossFilter widget (#6484) --- distributed/dashboard/components/worker.py | 158 +----------------- .../dashboard/tests/test_scheduler_bokeh.py | 4 +- .../dashboard/tests/test_worker_bokeh.py | 48 +++--- distributed/dashboard/worker.py | 6 +- docs/source/http_services.rst | 1 - 5 files changed, 27 insertions(+), 190 deletions(-) diff --git a/distributed/dashboard/components/worker.py b/distributed/dashboard/components/worker.py index ea91ca7603d..64ca86dd820 100644 --- a/distributed/dashboard/components/worker.py +++ b/distributed/dashboard/components/worker.py @@ -5,14 +5,12 @@ from bokeh.core.properties import without_property_validation from bokeh.layouts import column, row from bokeh.models import ( - BoxZoomTool, ColumnDataSource, DataRange1d, HoverTool, NumeralTickFormatter, PanTool, ResetTool, - Select, WheelZoomTool, ) from bokeh.models.widgets import DataTable, TableColumn @@ -31,9 +29,8 @@ SystemMonitor, ) from distributed.dashboard.utils import transpose, update -from distributed.diagnostics.progress_stream import color_of from distributed.metrics import time -from distributed.utils import key_split, log_errors +from distributed.utils import log_errors logger = logging.getLogger(__name__) @@ -49,7 +46,7 @@ filename=os.path.join(os.path.dirname(__file__), "..", "theme.yaml") ) -template_variables = {"pages": ["status", "system", "profile", "crossfilter"]} +template_variables = {"pages": ["status", "system", "profile"]} def standard_doc(title, active_page, *, template="simple.html"): @@ -288,148 +285,6 @@ def update(self): ) -class CrossFilter(DashboardComponent): - @log_errors - def __init__(self, worker, **kwargs): - self.worker = worker - - quantities = ["nbytes", "duration", "bandwidth", "count", "start", "stop"] - colors = ["inout-color", "type-color", "key-color"] - - # self.source = ColumnDataSource({name: [] for name in names}) - self.source = ColumnDataSource( - { - "nbytes": [1, 2], - "duration": [0.01, 0.02], - "bandwidth": [0.01, 0.02], - "count": [1, 2], - "type": ["int", "str"], - "inout-color": ["blue", "red"], - "type-color": ["blue", "red"], - "key": ["add", "inc"], - "start": [1, 2], - "stop": [1, 2], - } - ) - - self.x = Select(title="X-Axis", value="nbytes", options=quantities) - self.x.on_change("value", self.update_figure) - - self.y = Select(title="Y-Axis", value="bandwidth", options=quantities) - self.y.on_change("value", self.update_figure) - - self.color = Select( - title="Color", value="inout-color", options=["black"] + colors - ) - self.color.on_change("value", self.update_figure) - - if "sizing_mode" in kwargs: - kw = {"sizing_mode": kwargs["sizing_mode"]} - else: - kw = {} - - self.control = column([self.x, self.y, self.color], width=200, **kw) - - self.last_outgoing = 0 - self.last_incoming = 0 - self.kwargs = kwargs - - self.layout = row(self.control, self.create_figure(**self.kwargs), **kw) - - self.root = self.layout - - @without_property_validation - @log_errors - def update(self): - outgoing = self.worker.outgoing_transfer_log - n = self.worker.outgoing_count - self.last_outgoing - n = min(n, 1000) - outgoing = [outgoing[-i].copy() for i in range(1, n)] - self.last_outgoing = self.worker.outgoing_count - - incoming = self.worker.incoming_transfer_log - n = self.worker.incoming_count - self.last_incoming - n = min(n, 1000) - incoming = [incoming[-i].copy() for i in range(1, n)] - self.last_incoming = self.worker.incoming_count - - out = [] - - for msg in incoming: - if msg["keys"]: - d = self.process_msg(msg) - d["inout-color"] = "red" - out.append(d) - - for msg in outgoing: - if msg["keys"]: - d = self.process_msg(msg) - d["inout-color"] = "blue" - out.append(d) - - if out: - out = transpose(out) - if ( - len(self.source.data["stop"]) - and min(out["start"]) > self.source.data["stop"][-1] + 10 - ): - update(self.source, out) - else: - self.source.stream(out, rollover=1000) - - @log_errors - def create_figure(self, **kwargs): - fig = figure(title="", tools="", **kwargs) - fig.circle( - source=self.source, - x=self.x.value, - y=self.y.value, - color=self.color.value, - size=10, - alpha=0.5, - hover_alpha=1, - ) - fig.xaxis.axis_label = self.x.value - fig.yaxis.axis_label = self.y.value - - fig.add_tools( - # self.hover, - ResetTool(), - PanTool(), - WheelZoomTool(), - BoxZoomTool(), - ) - return fig - - @without_property_validation - @log_errors - def update_figure(self, attr, old, new): - fig = self.create_figure(**self.kwargs) - self.layout.children[1] = fig - - def process_msg(self, msg): - try: - status_key = max(msg["keys"], key=lambda x: msg["keys"].get(x, 0)) - typ = self.worker.types.get(status_key, object).__name__ - keyname = key_split(status_key) - d = { - "nbytes": msg["total"], - "duration": msg["duration"], - "bandwidth": msg["bandwidth"], - "count": len(msg["keys"]), - "type": typ, - "type-color": color_of(typ), - "key": keyname, - "key-color": color_of(keyname), - "start": msg["start"], - "stop": msg["stop"], - } - return d - except Exception as e: - logger.exception(e) - raise - - class Counters(DashboardComponent): def __init__(self, server, sizing_mode="stretch_both", **kwargs): self.server = server @@ -585,15 +440,6 @@ def status_doc(worker, extra, doc): ) -@standard_doc("Dask Worker Cross-filter", active_page="crossfilter") -def crossfilter_doc(worker, extra, doc): - statetable = StateTable(worker) - crossfilter = CrossFilter(worker) - add_periodic_callback(doc, statetable, 500) - add_periodic_callback(doc, crossfilter, 500) - doc.add_root(column(statetable.root, crossfilter.root)) - - @standard_doc("Dask Worker Monitor", active_page="system") def systemmonitor_doc(worker, extra, doc): sysmon = SystemMonitor(worker, sizing_mode="scale_width") diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 0d1649de0d5..88db7db032d 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -845,11 +845,11 @@ async def test_proxy_to_workers(c, s, a, b): assert response_proxy.code == 200 if proxy_exists: - assert b"Crossfilter" in response_proxy.body + assert b"System" in response_proxy.body else: assert b"python -m pip install jupyter-server-proxy" in response_proxy.body assert response_direct.code == 200 - assert b"Crossfilter" in response_direct.body + assert b"System" in response_direct.body @gen_cluster( diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py index b6f5476f618..84c97e15a73 100644 --- a/distributed/dashboard/tests/test_worker_bokeh.py +++ b/distributed/dashboard/tests/test_worker_bokeh.py @@ -14,7 +14,6 @@ CommunicatingStream, CommunicatingTimeSeries, Counters, - CrossFilter, ExecutingTimeSeries, StateTable, SystemMonitor, @@ -55,11 +54,8 @@ async def test_simple(c, s, a, b): await asyncio.sleep(0.1) http_client = AsyncHTTPClient() - for suffix in ["crossfilter", "system"]: - response = await http_client.fetch( - "http://localhost:%d/%s" % (a.http_server.port, suffix) - ) - assert "bokeh" in response.body.decode().lower() + response = await http_client.fetch(f"http://localhost:{a.http_server.port}/system") + assert "bokeh" in response.body.decode().lower() @gen_cluster(client=True, worker_kwargs={"dashboard": True}) @@ -67,35 +63,35 @@ async def test_services_kwargs(c, s, a, b): assert s.workers[a.address].services == {"dashboard": a.http_server.port} -@gen_cluster(client=True) -async def test_basic(c, s, a, b): - for component in [ +@pytest.mark.slow +@pytest.mark.parametrize( + "cls", + ( StateTable, ExecutingTimeSeries, CommunicatingTimeSeries, - CrossFilter, SystemMonitor, - ]: - - aa = component(a) - bb = component(b) + ), +) +@gen_cluster(client=True) +async def test_basic(c, s, a, b, cls): + aa = cls(a) + bb = cls(b) - xs = c.map(inc, range(10), workers=a.address) - ys = c.map(dec, range(10), workers=b.address) + xs = c.map(inc, range(10), workers=a.address) + ys = c.map(dec, range(10), workers=b.address) - def slowall(*args): - sleep(1) + def slowall(*args): + sleep(1) - x = c.submit(slowall, xs, ys, 1, workers=a.address) - y = c.submit(slowall, xs, ys, 2, workers=b.address) - await asyncio.sleep(0.1) + x = c.submit(slowall, xs, ys, 1, workers=a.address) + y = c.submit(slowall, xs, ys, 2, workers=b.address) + await asyncio.sleep(0.1) - aa.update() - bb.update() + aa.update() + bb.update() - assert len(first(aa.source.data.values())) and len( - first(bb.source.data.values()) - ) + assert len(first(aa.source.data.values())) and len(first(bb.source.data.values())) @gen_cluster(client=True) diff --git a/distributed/dashboard/worker.py b/distributed/dashboard/worker.py index 15e6e47ab21..39bb3d9cbe0 100644 --- a/distributed/dashboard/worker.py +++ b/distributed/dashboard/worker.py @@ -2,7 +2,6 @@ from distributed.dashboard.components.worker import ( counters_doc, - crossfilter_doc, profile_doc, profile_server_doc, status_doc, @@ -10,9 +9,7 @@ ) from distributed.dashboard.core import BokehApplication -template_variables = { - "pages": ["status", "system", "profile", "crossfilter", "profile-server"] -} +template_variables = {"pages": ["status", "system", "profile", "profile-server"]} def connect(application, http_server, worker, prefix=""): @@ -26,7 +23,6 @@ def connect(application, http_server, worker, prefix=""): applications = { "/status": status_doc, "/counters": counters_doc, - "/crossfilter": crossfilter_doc, "/system": systemmonitor_doc, "/profile": profile_doc, "/profile-server": profile_server_doc, diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index 31bb62292a7..fc13c084674 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -100,7 +100,6 @@ Worker HTTP - ``/status``: - ``/counters``: -- ``/crossfilter``: - ``/sitemap.json``: list of available endpoints - ``/system``: - ``/health``: check server is alive