From 43449f890d550bee35a6c400e6f9b4f048f0a1f9 Mon Sep 17 00:00:00 2001 From: Aleksandr Mezin Date: Thu, 9 Mar 2023 14:31:13 +0200 Subject: [PATCH] Fix hang caused by `steal` command (`worksteal` scheduler) Fixes #884 --- changelog/884.bugfix | 1 + src/xdist/remote.py | 15 +++++++++++++-- testing/test_remote.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 changelog/884.bugfix diff --git a/changelog/884.bugfix b/changelog/884.bugfix new file mode 100644 index 00000000..f49c131f --- /dev/null +++ b/changelog/884.bugfix @@ -0,0 +1 @@ +Fix hang caused by `steal` command (`worksteal` scheduler). \ No newline at end of file diff --git a/src/xdist/remote.py b/src/xdist/remote.py index 2e83a8dc..d80400ec 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -58,6 +58,7 @@ def worker_title(title): class WorkerInteractor: SHUTDOWN_MARK = object() + RETRY_MARK = object() def __init__(self, config, channel): self.config = config @@ -72,6 +73,12 @@ def __init__(self, config, channel): def _make_queue(self): return self.channel.gateway.execmodel.queue.Queue() + def _get_next_item_index(self): + result = self.RETRY_MARK + while result is self.RETRY_MARK: + result = self.torun.get() + return result + def sendevent(self, name, **kwargs): self.log("sending", name, kwargs) self.channel.send((name, kwargs)) @@ -136,19 +143,23 @@ def old_queue_get_nowait_noraise(): self.torun.put(i) self.sendevent("unscheduled", indices=stolen) + old_queue.put(self.RETRY_MARK) @pytest.hookimpl def pytest_runtestloop(self, session): self.log("entering main loop") self.channel.setcallback(self.handle_command, endmarker=self.SHUTDOWN_MARK) - self.nextitem_index = self.torun.get() + self.nextitem_index = self._get_next_item_index() while self.nextitem_index is not self.SHUTDOWN_MARK: self.run_one_test() return True def run_one_test(self): items = self.session.items - self.item_index, self.nextitem_index = self.nextitem_index, self.torun.get() + self.item_index, self.nextitem_index = ( + self.nextitem_index, + self._get_next_item_index(), + ) item = items[self.item_index] if self.nextitem_index is self.SHUTDOWN_MARK: nextitem = None diff --git a/testing/test_remote.py b/testing/test_remote.py index cb8f6b7f..72db78ce 100644 --- a/testing/test_remote.py +++ b/testing/test_remote.py @@ -271,6 +271,40 @@ def test_func4(): pass ev = worker.popevent("workerfinished") assert "workeroutput" in ev.kwargs + def test_steal_failed(self, worker: WorkerSetup, unserialize_report) -> None: + worker.pytester.makepyfile( + """ + def test_func(): pass + def test_func2(): pass + """ + ) + worker.setup() + ev = worker.popevent("collectionfinish") + ids = ev.kwargs["ids"] + assert len(ids) == 2 + worker.sendcommand("runtests_all") + + for when in ["setup", "call", "teardown"]: + ev = worker.popevent("testreport") + rep = unserialize_report(ev.kwargs["data"]) + assert rep.nodeid.endswith("::test_func") + assert rep.when == when + + worker.sendcommand("steal", indices=[0, 1]) + ev = worker.popevent("unscheduled") + assert ev.kwargs["indices"] == [] + + worker.sendcommand("shutdown") + + for when in ["setup", "call", "teardown"]: + ev = worker.popevent("testreport") + rep = unserialize_report(ev.kwargs["data"]) + assert rep.nodeid.endswith("::test_func2") + assert rep.when == when + + ev = worker.popevent("workerfinished") + assert "workeroutput" in ev.kwargs + def test_remote_env_vars(pytester: pytest.Pytester) -> None: pytester.makepyfile(