Skip to content

Commit

Permalink
Run cluster widget periodic callbacks on the correct event loop (#6444)
Browse files Browse the repository at this point in the history
the PeriodicCallbacks in self.periodic_callbacks need to be started and stopped on the same loop as self.loop because they bind to IOLoop.current() when PeriodicCallback.start is called

When running a synchronous Cluster the loop will be running in a different thread to where cluster._ipython_widget_ is called from
  • Loading branch information
graingert authored May 26, 2022
1 parent 046ab17 commit 5e9e97f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
9 changes: 6 additions & 3 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,13 @@ def update():
cluster_repr_interval = parse_timedelta(
dask.config.get("distributed.deploy.cluster-repr-interval", default="ms")
)
pc = PeriodicCallback(update, cluster_repr_interval * 1000)
self.periodic_callbacks["cluster-repr"] = pc
pc.start()

def install():
pc = PeriodicCallback(update, cluster_repr_interval * 1000)
self.periodic_callbacks["cluster-repr"] = pc
pc.start()

self.loop.add_callback(install)
return tab

def _repr_html_(self, cluster_status=None):
Expand Down
28 changes: 28 additions & 0 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,34 @@ def test_ipywidgets(loop):
assert isinstance(box, ipywidgets.Widget)


def test_ipywidgets_loop(loop):
"""
Previously cluster._ipython_display_ attached the PeriodicCallback to the
currently running loop, See https://github.com/dask/distributed/pull/6444
"""
ipywidgets = pytest.importorskip("ipywidgets")

async def get_ioloop(cluster):
return cluster.periodic_callbacks["cluster-repr"].io_loop

async def amain():
# running synchronous code in an async context to setup a
# IOLoop.current() that's different from cluster.loop
with LocalCluster(
n_workers=0,
silence_logs=False,
loop=loop,
dashboard_address=":0",
processes=False,
) as cluster:
cluster._ipython_display_()
assert cluster.sync(get_ioloop, cluster) is loop
box = cluster._cached_widget
assert isinstance(box, ipywidgets.Widget)

asyncio.run(amain())


def test_no_ipywidgets(loop, monkeypatch):
from unittest.mock import MagicMock

Expand Down

0 comments on commit 5e9e97f

Please sign in to comment.