Skip to content

Commit

Permalink
Remove CrossFilter widget (#6484)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jun 2, 2022
1 parent a341432 commit 7e49d88
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 190 deletions.
158 changes: 2 additions & 156 deletions distributed/dashboard/components/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
from bokeh.core.properties import without_property_validation
from bokeh.layouts import column, row
from bokeh.models import (
BoxZoomTool,
ColumnDataSource,
DataRange1d,
HoverTool,
NumeralTickFormatter,
PanTool,
ResetTool,
Select,
WheelZoomTool,
)
from bokeh.models.widgets import DataTable, TableColumn
Expand All @@ -31,9 +29,8 @@
SystemMonitor,
)
from distributed.dashboard.utils import transpose, update
from distributed.diagnostics.progress_stream import color_of
from distributed.metrics import time
from distributed.utils import key_split, log_errors
from distributed.utils import log_errors

logger = logging.getLogger(__name__)

Expand All @@ -49,7 +46,7 @@
filename=os.path.join(os.path.dirname(__file__), "..", "theme.yaml")
)

template_variables = {"pages": ["status", "system", "profile", "crossfilter"]}
template_variables = {"pages": ["status", "system", "profile"]}


def standard_doc(title, active_page, *, template="simple.html"):
Expand Down Expand Up @@ -288,148 +285,6 @@ def update(self):
)


class CrossFilter(DashboardComponent):
@log_errors
def __init__(self, worker, **kwargs):
self.worker = worker

quantities = ["nbytes", "duration", "bandwidth", "count", "start", "stop"]
colors = ["inout-color", "type-color", "key-color"]

# self.source = ColumnDataSource({name: [] for name in names})
self.source = ColumnDataSource(
{
"nbytes": [1, 2],
"duration": [0.01, 0.02],
"bandwidth": [0.01, 0.02],
"count": [1, 2],
"type": ["int", "str"],
"inout-color": ["blue", "red"],
"type-color": ["blue", "red"],
"key": ["add", "inc"],
"start": [1, 2],
"stop": [1, 2],
}
)

self.x = Select(title="X-Axis", value="nbytes", options=quantities)
self.x.on_change("value", self.update_figure)

self.y = Select(title="Y-Axis", value="bandwidth", options=quantities)
self.y.on_change("value", self.update_figure)

self.color = Select(
title="Color", value="inout-color", options=["black"] + colors
)
self.color.on_change("value", self.update_figure)

if "sizing_mode" in kwargs:
kw = {"sizing_mode": kwargs["sizing_mode"]}
else:
kw = {}

self.control = column([self.x, self.y, self.color], width=200, **kw)

self.last_outgoing = 0
self.last_incoming = 0
self.kwargs = kwargs

self.layout = row(self.control, self.create_figure(**self.kwargs), **kw)

self.root = self.layout

@without_property_validation
@log_errors
def update(self):
outgoing = self.worker.outgoing_transfer_log
n = self.worker.outgoing_count - self.last_outgoing
n = min(n, 1000)
outgoing = [outgoing[-i].copy() for i in range(1, n)]
self.last_outgoing = self.worker.outgoing_count

incoming = self.worker.incoming_transfer_log
n = self.worker.incoming_count - self.last_incoming
n = min(n, 1000)
incoming = [incoming[-i].copy() for i in range(1, n)]
self.last_incoming = self.worker.incoming_count

out = []

for msg in incoming:
if msg["keys"]:
d = self.process_msg(msg)
d["inout-color"] = "red"
out.append(d)

for msg in outgoing:
if msg["keys"]:
d = self.process_msg(msg)
d["inout-color"] = "blue"
out.append(d)

if out:
out = transpose(out)
if (
len(self.source.data["stop"])
and min(out["start"]) > self.source.data["stop"][-1] + 10
):
update(self.source, out)
else:
self.source.stream(out, rollover=1000)

@log_errors
def create_figure(self, **kwargs):
fig = figure(title="", tools="", **kwargs)
fig.circle(
source=self.source,
x=self.x.value,
y=self.y.value,
color=self.color.value,
size=10,
alpha=0.5,
hover_alpha=1,
)
fig.xaxis.axis_label = self.x.value
fig.yaxis.axis_label = self.y.value

fig.add_tools(
# self.hover,
ResetTool(),
PanTool(),
WheelZoomTool(),
BoxZoomTool(),
)
return fig

@without_property_validation
@log_errors
def update_figure(self, attr, old, new):
fig = self.create_figure(**self.kwargs)
self.layout.children[1] = fig

def process_msg(self, msg):
try:
status_key = max(msg["keys"], key=lambda x: msg["keys"].get(x, 0))
typ = self.worker.types.get(status_key, object).__name__
keyname = key_split(status_key)
d = {
"nbytes": msg["total"],
"duration": msg["duration"],
"bandwidth": msg["bandwidth"],
"count": len(msg["keys"]),
"type": typ,
"type-color": color_of(typ),
"key": keyname,
"key-color": color_of(keyname),
"start": msg["start"],
"stop": msg["stop"],
}
return d
except Exception as e:
logger.exception(e)
raise


class Counters(DashboardComponent):
def __init__(self, server, sizing_mode="stretch_both", **kwargs):
self.server = server
Expand Down Expand Up @@ -585,15 +440,6 @@ def status_doc(worker, extra, doc):
)


@standard_doc("Dask Worker Cross-filter", active_page="crossfilter")
def crossfilter_doc(worker, extra, doc):
statetable = StateTable(worker)
crossfilter = CrossFilter(worker)
add_periodic_callback(doc, statetable, 500)
add_periodic_callback(doc, crossfilter, 500)
doc.add_root(column(statetable.root, crossfilter.root))


@standard_doc("Dask Worker Monitor", active_page="system")
def systemmonitor_doc(worker, extra, doc):
sysmon = SystemMonitor(worker, sizing_mode="scale_width")
Expand Down
4 changes: 2 additions & 2 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,11 +845,11 @@ async def test_proxy_to_workers(c, s, a, b):

assert response_proxy.code == 200
if proxy_exists:
assert b"Crossfilter" in response_proxy.body
assert b"System" in response_proxy.body
else:
assert b"python -m pip install jupyter-server-proxy" in response_proxy.body
assert response_direct.code == 200
assert b"Crossfilter" in response_direct.body
assert b"System" in response_direct.body


@gen_cluster(
Expand Down
48 changes: 22 additions & 26 deletions distributed/dashboard/tests/test_worker_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
CommunicatingStream,
CommunicatingTimeSeries,
Counters,
CrossFilter,
ExecutingTimeSeries,
StateTable,
SystemMonitor,
Expand Down Expand Up @@ -55,47 +54,44 @@ async def test_simple(c, s, a, b):
await asyncio.sleep(0.1)

http_client = AsyncHTTPClient()
for suffix in ["crossfilter", "system"]:
response = await http_client.fetch(
"http://localhost:%d/%s" % (a.http_server.port, suffix)
)
assert "bokeh" in response.body.decode().lower()
response = await http_client.fetch(f"http://localhost:{a.http_server.port}/system")
assert "bokeh" in response.body.decode().lower()


@gen_cluster(client=True, worker_kwargs={"dashboard": True})
async def test_services_kwargs(c, s, a, b):
assert s.workers[a.address].services == {"dashboard": a.http_server.port}


@gen_cluster(client=True)
async def test_basic(c, s, a, b):
for component in [
@pytest.mark.slow
@pytest.mark.parametrize(
"cls",
(
StateTable,
ExecutingTimeSeries,
CommunicatingTimeSeries,
CrossFilter,
SystemMonitor,
]:

aa = component(a)
bb = component(b)
),
)
@gen_cluster(client=True)
async def test_basic(c, s, a, b, cls):
aa = cls(a)
bb = cls(b)

xs = c.map(inc, range(10), workers=a.address)
ys = c.map(dec, range(10), workers=b.address)
xs = c.map(inc, range(10), workers=a.address)
ys = c.map(dec, range(10), workers=b.address)

def slowall(*args):
sleep(1)
def slowall(*args):
sleep(1)

x = c.submit(slowall, xs, ys, 1, workers=a.address)
y = c.submit(slowall, xs, ys, 2, workers=b.address)
await asyncio.sleep(0.1)
x = c.submit(slowall, xs, ys, 1, workers=a.address)
y = c.submit(slowall, xs, ys, 2, workers=b.address)
await asyncio.sleep(0.1)

aa.update()
bb.update()
aa.update()
bb.update()

assert len(first(aa.source.data.values())) and len(
first(bb.source.data.values())
)
assert len(first(aa.source.data.values())) and len(first(bb.source.data.values()))


@gen_cluster(client=True)
Expand Down
6 changes: 1 addition & 5 deletions distributed/dashboard/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@

from distributed.dashboard.components.worker import (
counters_doc,
crossfilter_doc,
profile_doc,
profile_server_doc,
status_doc,
systemmonitor_doc,
)
from distributed.dashboard.core import BokehApplication

template_variables = {
"pages": ["status", "system", "profile", "crossfilter", "profile-server"]
}
template_variables = {"pages": ["status", "system", "profile", "profile-server"]}


def connect(application, http_server, worker, prefix=""):
Expand All @@ -26,7 +23,6 @@ def connect(application, http_server, worker, prefix=""):
applications = {
"/status": status_doc,
"/counters": counters_doc,
"/crossfilter": crossfilter_doc,
"/system": systemmonitor_doc,
"/profile": profile_doc,
"/profile-server": profile_server_doc,
Expand Down
1 change: 0 additions & 1 deletion docs/source/http_services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ Worker HTTP

- ``/status``:
- ``/counters``:
- ``/crossfilter``:
- ``/sitemap.json``: list of available endpoints
- ``/system``:
- ``/health``: check server is alive
Expand Down

0 comments on commit 7e49d88

Please sign in to comment.