Skip to content

Commit

Permalink
gh-117293: Fix race condition in runtest_mp
Browse files (browse the repository at this point in the history)
  • Loading branch information
colesbury committed Mar 27, 2024
1 parent ce00de4 commit 20b21e5
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class MultiprocessResult:
worker_stdout: str | None = None
err_msg: str | None = None

# Sentinel value indicating that a worker thread has exited
worker_exited = object()

ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
Expand Down Expand Up @@ -376,8 +378,8 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
while not self._stopped:
try:
try:
while not self._stopped:
try:
test_name = next(self.pending)
except StopIteration:
Expand All @@ -396,11 +398,12 @@ def run(self) -> None:

if mp_result.result.must_stop(fail_fast, fail_env_changed):
break
except ExitThread:
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
except ExitThread:
pass
except BaseException:
self.output.put((True, traceback.format_exc()))
finally:
self.output.put(worker_exited)

def _wait_completed(self) -> None:
popen = self._popen
Expand Down Expand Up @@ -458,6 +461,7 @@ def __init__(self, num_workers: int, runtests: RunTests,
self.log = logger.log
self.display_progress = logger.display_progress
self.results: TestResults = results
self.live_worker_count = 0

self.output: queue.Queue[QueueOutput] = queue.Queue()
tests_iter = runtests.iter_tests()
Expand Down Expand Up @@ -497,6 +501,7 @@ def start_workers(self) -> None:
self.log(msg)
for worker in self.workers:
worker.start()
self.live_worker_count += 1

def stop_workers(self) -> None:
start_time = time.monotonic()
Expand All @@ -511,14 +516,18 @@ def _get_result(self) -> QueueOutput | None:

# bpo-46205: check the status of workers every iteration to avoid
# waiting forever on an empty queue.
while any(worker.is_alive() for worker in self.workers):
while self.live_worker_count > 0:
if use_faulthandler:
faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
exit=True)

# wait for a thread
try:
return self.output.get(timeout=PROGRESS_UPDATE)
result = self.output.get(timeout=PROGRESS_UPDATE)
if result is worker_exited:
self.live_worker_count -= 1
continue
return result
except queue.Empty:
pass

Expand All @@ -528,12 +537,6 @@ def _get_result(self) -> QueueOutput | None:
if running:
self.log(running)

# all worker threads are done: consume pending results
try:
return self.output.get(timeout=0)
except queue.Empty:
return None

def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
pgo = self.runtests.pgo
Expand Down

0 comments on commit 20b21e5

Please sign in to comment.