diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index c03c859baa7..a13392b8080 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -60,7 +60,9 @@ def parent_process(): def _cleanup(): # check for processes which have finished - for p in list(_children): + # FIXME(sgross): set.copy() is effectively atomic, but list(set) is + # not. Revert to list(_children) when that that's properly atomic. + for p in _children.copy(): if p._popen.poll() is not None: _children.discard(p) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index ea369507297..156820fad44 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -20,6 +20,7 @@ import sys import threading import warnings +import _thread from . import spawn from . import util @@ -54,7 +55,7 @@ class ResourceTracker(object): def __init__(self): - self._lock = threading.Lock() + self._lock = _thread.CriticalLock() self._fd = None self._pid = None diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 6ee0d33e88a..d2b92d9e3db 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -175,6 +175,8 @@ def register_after_fork(obj, func): # Finalization using weakrefs # +import _thread +_finalizer_registry_lock = _thread.CriticalLock() _finalizer_registry = {} _finalizer_counter = itertools.count() @@ -200,7 +202,8 @@ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): self._key = (exitpriority, next(_finalizer_counter)) self._pid = os.getpid() - _finalizer_registry[self._key] = self + with _finalizer_registry_lock: + _finalizer_registry[self._key] = self def __call__(self, wr=None, # Need to bind these locally because the globals can have @@ -211,7 +214,8 @@ def __call__(self, wr=None, Run the callback unless it has already been called or cancelled ''' try: - del _finalizer_registry[self._key] + with _finalizer_registry_lock: + del _finalizer_registry[self._key] except KeyError: sub_debug('finalizer no longer registered') else: @@ -231,7 +235,8 @@ def cancel(self): Cancel finalization of the object ''' try: - del _finalizer_registry[self._key] + with _finalizer_registry_lock: + del _finalizer_registry[self._key] except KeyError: pass else: @@ -242,7 +247,8 @@ def still_active(self): ''' Return whether this finalizer is still waiting to invoke callback ''' - return self._key in _finalizer_registry + with _finalizer_registry_lock: + return self._key in _finalizer_registry def __repr__(self): try: @@ -284,15 +290,18 @@ def _run_finalizers(minpriority=None): f = lambda p : p[0] is not None and p[0] >= minpriority # Careful: _finalizer_registry may be mutated while this function + with _finalizer_registry_lock: + registry = list(_finalizer_registry) # is running (either by a GC run or by another thread). # list(_finalizer_registry) should be atomic, while # list(_finalizer_registry.items()) is not. - keys = [key for key in list(_finalizer_registry) if f(key)] + keys = [key for key in registry if f(key)] keys.sort(reverse=True) for key in keys: - finalizer = _finalizer_registry.get(key) + with _finalizer_registry_lock: + finalizer = _finalizer_registry.get(key) # key may have been removed from the registry if finalizer is not None: sub_debug('calling %s', finalizer) @@ -303,7 +312,8 @@ def _run_finalizers(minpriority=None): traceback.print_exc() if minpriority is None: - _finalizer_registry.clear() + with _finalizer_registry_lock: + _finalizer_registry.clear() # # Clean up on exit