Skip to content

Commit

Permalink
multiprocessing: use CriticalLock in a few places
Browse files Browse the repository at this point in the history
This uses _thread.CriticalLock to prevent garbage collection while
modifying the finalizer registry and in the resource tracker.

There is a potential deadlock when checking if the resource tracker is
running. This exists in upstream CPython, but occurs much more frequently
with biased reference counting. The call to _check_alive can trigger a
garbage collection that collects a multiprocessing lock. The finalizer
for that lock calls back into resource tracker leading to a deadlock.

This is rare in upstream CPython because most function calls do not
trigger a garbage collection.
  • Loading branch information
colesbury committed Apr 23, 2023
1 parent 464b96f commit cfb6ed1
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
4 changes: 3 additions & 1 deletion Lib/multiprocessing/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def parent_process():

def _cleanup():
# check for processes which have finished
for p in list(_children):
# FIXME(sgross): set.copy() is effectively atomic, but list(set) is
# not. Revert to list(_children) when that that's properly atomic.
for p in _children.copy():
if p._popen.poll() is not None:
_children.discard(p)

Expand Down
3 changes: 2 additions & 1 deletion Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
import threading
import warnings
import _thread

from . import spawn
from . import util
Expand Down Expand Up @@ -54,7 +55,7 @@
class ResourceTracker(object):

def __init__(self):
self._lock = threading.Lock()
self._lock = _thread.CriticalLock()
self._fd = None
self._pid = None

Expand Down
24 changes: 17 additions & 7 deletions Lib/multiprocessing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def register_after_fork(obj, func):
# Finalization using weakrefs
#

import _thread
_finalizer_registry_lock = _thread.CriticalLock()
_finalizer_registry = {}
_finalizer_counter = itertools.count()

Expand All @@ -200,7 +202,8 @@ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
self._key = (exitpriority, next(_finalizer_counter))
self._pid = os.getpid()

_finalizer_registry[self._key] = self
with _finalizer_registry_lock:
_finalizer_registry[self._key] = self

def __call__(self, wr=None,
# Need to bind these locally because the globals can have
Expand All @@ -211,7 +214,8 @@ def __call__(self, wr=None,
Run the callback unless it has already been called or cancelled
'''
try:
del _finalizer_registry[self._key]
with _finalizer_registry_lock:
del _finalizer_registry[self._key]
except KeyError:
sub_debug('finalizer no longer registered')
else:
Expand All @@ -231,7 +235,8 @@ def cancel(self):
Cancel finalization of the object
'''
try:
del _finalizer_registry[self._key]
with _finalizer_registry_lock:
del _finalizer_registry[self._key]
except KeyError:
pass
else:
Expand All @@ -242,7 +247,8 @@ def still_active(self):
'''
Return whether this finalizer is still waiting to invoke callback
'''
return self._key in _finalizer_registry
with _finalizer_registry_lock:
return self._key in _finalizer_registry

def __repr__(self):
try:
Expand Down Expand Up @@ -284,15 +290,18 @@ def _run_finalizers(minpriority=None):
f = lambda p : p[0] is not None and p[0] >= minpriority

# Careful: _finalizer_registry may be mutated while this function
with _finalizer_registry_lock:
registry = list(_finalizer_registry)
# is running (either by a GC run or by another thread).

# list(_finalizer_registry) should be atomic, while
# list(_finalizer_registry.items()) is not.
keys = [key for key in list(_finalizer_registry) if f(key)]
keys = [key for key in registry if f(key)]
keys.sort(reverse=True)

for key in keys:
finalizer = _finalizer_registry.get(key)
with _finalizer_registry_lock:
finalizer = _finalizer_registry.get(key)
# key may have been removed from the registry
if finalizer is not None:
sub_debug('calling %s', finalizer)
Expand All @@ -303,7 +312,8 @@ def _run_finalizers(minpriority=None):
traceback.print_exc()

if minpriority is None:
_finalizer_registry.clear()
with _finalizer_registry_lock:
_finalizer_registry.clear()

#
# Clean up on exit
Expand Down

0 comments on commit cfb6ed1

Please sign in to comment.