Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-117293: Fix race condition in run_workers.py #117298

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,13 @@ class MultiprocessResult:
err_msg: str | None = None


# Indicates that a worker thread has exited
class WorkerExited:
colesbury marked this conversation as resolved.
Show resolved Hide resolved
pass
colesbury marked this conversation as resolved.
Show resolved Hide resolved

ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
QueueContent = QueueOutput | WorkerExited


class ExitThread(Exception):
Expand Down Expand Up @@ -376,8 +381,8 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
while not self._stopped:
try:
try:
while not self._stopped:
try:
test_name = next(self.pending)
except StopIteration:
Expand All @@ -396,11 +401,12 @@ def run(self) -> None:

if mp_result.result.must_stop(fail_fast, fail_env_changed):
break
except ExitThread:
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
except ExitThread:
pass
except BaseException:
self.output.put((True, traceback.format_exc()))
finally:
self.output.put(WorkerExited())

def _wait_completed(self) -> None:
popen = self._popen
Expand Down Expand Up @@ -458,8 +464,9 @@ def __init__(self, num_workers: int, runtests: RunTests,
self.log = logger.log
self.display_progress = logger.display_progress
self.results: TestResults = results
self.live_worker_count = 0

self.output: queue.Queue[QueueOutput] = queue.Queue()
self.output: queue.Queue[QueueContent] = queue.Queue()
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
self.timeout = runtests.timeout
Expand Down Expand Up @@ -497,6 +504,7 @@ def start_workers(self) -> None:
self.log(msg)
for worker in self.workers:
worker.start()
self.live_worker_count += 1

def stop_workers(self) -> None:
start_time = time.monotonic()
Expand All @@ -511,14 +519,18 @@ def _get_result(self) -> QueueOutput | None:

# bpo-46205: check the status of workers every iteration to avoid
# waiting forever on an empty queue.
while any(worker.is_alive() for worker in self.workers):
while self.live_worker_count > 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker.is_alive() is alive is used in other places, I would prefer to use the same logic to list alive workers in all places. I would prefer that you remove this live_worker_count attribute.

If any(worker.is_alive() for worker in self.workers) is inefficient, we can design something else, like a list of alive workers and trim this list when a worker exits. But there number of workers should be less than 1000, so it should be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what design you'd like. We can't continue to rely on the is_alive() method for the termination condition because that is the source of the race condition: it may happen before or after the processing of the results queue.

There are only two other uses of is_alive(), but it makes sense in those places, but doesn't make sense here.

In __repr__, where use it for displaying the worker status.

if self.is_alive():
info.append("running")

In wait_stopped() it's paired with join(). Only is_alive() makes sense here because of the use of join().

self.join(1.0)
if not self.is_alive():
break

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can ignore my comment, your change is fine.

I'm thinking at get_running() function, but I was wrong: this function doesn't call the is_alive() method but only relies on worker.test_name attribute to decide if a thread is "running" or not. There is already a nested finally: self.test_name = None which makes sure that the attribute is cleared in all cases.

if use_faulthandler:
faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
exit=True)

# wait for a thread
try:
return self.output.get(timeout=PROGRESS_UPDATE)
result = self.output.get(timeout=PROGRESS_UPDATE)
if isinstance(result, WorkerExited):
self.live_worker_count -= 1
continue
return result
except queue.Empty:
pass

Expand All @@ -528,12 +540,6 @@ def _get_result(self) -> QueueOutput | None:
if running:
self.log(running)

# all worker threads are done: consume pending results
try:
return self.output.get(timeout=0)
except queue.Empty:
return None

def display_result(self, mp_result: MultiprocessResult) -> None:
result = mp_result.result
pgo = self.runtests.pgo
Expand Down
Loading