Skip to content

Commit

Permalink
Get the elements from queue in block mode (#34333)
Browse files Browse the repository at this point in the history
* Get the elements from queue in block mode

Previously there are cases a worker get no more tests to run
and quit early where there are still groups in the queue. This
is because poll from Queue in unblock mode is not safe.

Now poll in block mode and add a sentinal for each worker at
the end of queue.
  • Loading branch information
WeizhongX committed Jun 24, 2022
1 parent 808d59e commit e857c01
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion tools/wptrunner/wptrunner/testloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ def __init__(self, test_queue):
self.test_queue = test_queue
self.current_group = None
self.current_metadata = None
self.logger = structured.get_default_logger()
if self.logger is None:
self.logger = structured.structuredlog.StructuredLogger("TestSource")

@abstractmethod
#@classmethod (doesn't compose with @abstractmethod in < 3.3)
Expand All @@ -383,11 +386,18 @@ def group_metadata(cls, state):
def group(self):
if not self.current_group or len(self.current_group) == 0:
try:
self.current_group, self.current_metadata = self.test_queue.get(block=False)
self.current_group, self.current_metadata = self.test_queue.get(block=True, timeout=5)
except Empty:
self.logger.warning("Timed out getting test group from queue")
return None, None
return self.current_group, self.current_metadata

@classmethod
def add_sentinal(cls, test_queue, num_of_workers):
# add one sentinal for each worker
for _ in range(num_of_workers):
test_queue.put((None, None))


class GroupedSource(TestSource):
@classmethod
Expand All @@ -413,6 +423,7 @@ def make_queue(cls, tests, **kwargs):

for item in groups:
test_queue.put(item)
cls.add_sentinal(test_queue, kwargs["processes"])
return test_queue

@classmethod
Expand Down Expand Up @@ -444,6 +455,7 @@ def make_queue(cls, tests, **kwargs):

for item in zip(queues, metadatas):
test_queue.put(item)
cls.add_sentinal(test_queue, kwargs["processes"])

return test_queue

Expand Down Expand Up @@ -489,6 +501,8 @@ def make_queue(cls, tests, **kwargs):

test_queue.put((group, group_metadata))

cls.add_sentinal(test_queue, kwargs["processes"])

return test_queue

@classmethod
Expand Down

0 comments on commit e857c01

Please sign in to comment.