From 5e9e97fe39b341061849e370cd386ca1a4447d17 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 26 May 2022 12:48:49 +0100 Subject: [PATCH] Run cluster widget periodic callbacks on the correct event loop (#6444) the PeriodicCallbacks in self.periodic_callbacks need to be started and stopped on the same loop as self.loop because they bind to IOLoop.current() when PeriodicCallback.start is called When running a synchronous Cluster the loop will be running in a different thread to where cluster._ipython_widget_ is called from --- distributed/deploy/cluster.py | 9 ++++++--- distributed/deploy/tests/test_local.py | 28 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 13e81f5f82d..d2d0da82ae1 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -427,10 +427,13 @@ def update(): cluster_repr_interval = parse_timedelta( dask.config.get("distributed.deploy.cluster-repr-interval", default="ms") ) - pc = PeriodicCallback(update, cluster_repr_interval * 1000) - self.periodic_callbacks["cluster-repr"] = pc - pc.start() + def install(): + pc = PeriodicCallback(update, cluster_repr_interval * 1000) + self.periodic_callbacks["cluster-repr"] = pc + pc.start() + + self.loop.add_callback(install) return tab def _repr_html_(self, cluster_status=None): diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 85c1f21b30e..781dc29a131 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -582,6 +582,34 @@ def test_ipywidgets(loop): assert isinstance(box, ipywidgets.Widget) +def test_ipywidgets_loop(loop): + """ + Previously cluster._ipython_display_ attached the PeriodicCallback to the + currently running loop, See https://github.com/dask/distributed/pull/6444 + """ + ipywidgets = pytest.importorskip("ipywidgets") + + async def get_ioloop(cluster): + return cluster.periodic_callbacks["cluster-repr"].io_loop + + async def amain(): + # running synchronous code in an async context to setup a + # IOLoop.current() that's different from cluster.loop + with LocalCluster( + n_workers=0, + silence_logs=False, + loop=loop, + dashboard_address=":0", + processes=False, + ) as cluster: + cluster._ipython_display_() + assert cluster.sync(get_ioloop, cluster) is loop + box = cluster._cached_widget + assert isinstance(box, ipywidgets.Widget) + + asyncio.run(amain()) + + def test_no_ipywidgets(loop, monkeypatch): from unittest.mock import MagicMock