From e857c0147b4a316865d1c770ac31ae6dcc207e33 Mon Sep 17 00:00:00 2001 From: WeizhongX <77710146+WeizhongX@users.noreply.github.com> Date: Fri, 24 Jun 2022 08:46:49 -0700 Subject: [PATCH] Get the elements from queue in block mode (#34333) * Get the elements from queue in block mode Previously there are cases a worker get no more tests to run and quit early where there are still groups in the queue. This is because poll from Queue in unblock mode is not safe. Now poll in block mode and add a sentinal for each worker at the end of queue. --- tools/wptrunner/wptrunner/testloader.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tools/wptrunner/wptrunner/testloader.py b/tools/wptrunner/wptrunner/testloader.py index 9daa9a2136d6c4..ed290ae96426e3 100644 --- a/tools/wptrunner/wptrunner/testloader.py +++ b/tools/wptrunner/wptrunner/testloader.py @@ -366,6 +366,9 @@ def __init__(self, test_queue): self.test_queue = test_queue self.current_group = None self.current_metadata = None + self.logger = structured.get_default_logger() + if self.logger is None: + self.logger = structured.structuredlog.StructuredLogger("TestSource") @abstractmethod #@classmethod (doesn't compose with @abstractmethod in < 3.3) @@ -383,11 +386,18 @@ def group_metadata(cls, state): def group(self): if not self.current_group or len(self.current_group) == 0: try: - self.current_group, self.current_metadata = self.test_queue.get(block=False) + self.current_group, self.current_metadata = self.test_queue.get(block=True, timeout=5) except Empty: + self.logger.warning("Timed out getting test group from queue") return None, None return self.current_group, self.current_metadata + @classmethod + def add_sentinal(cls, test_queue, num_of_workers): + # add one sentinal for each worker + for _ in range(num_of_workers): + test_queue.put((None, None)) + class GroupedSource(TestSource): @classmethod @@ -413,6 +423,7 @@ def make_queue(cls, tests, **kwargs): for item in groups: test_queue.put(item) + cls.add_sentinal(test_queue, kwargs["processes"]) return test_queue @classmethod @@ -444,6 +455,7 @@ def make_queue(cls, tests, **kwargs): for item in zip(queues, metadatas): test_queue.put(item) + cls.add_sentinal(test_queue, kwargs["processes"]) return test_queue @@ -489,6 +501,8 @@ def make_queue(cls, tests, **kwargs): test_queue.put((group, group_metadata)) + cls.add_sentinal(test_queue, kwargs["processes"]) + return test_queue @classmethod