Skip to content

Commit

Permalink
pythongh-111964: Implement stop-the-world pauses
Browse files Browse the repository at this point in the history
The `--disable-gil` builds occasionally need to pause all but one thread. Some
examples include:

* Cyclic garbage collection, where this is often called a "stop the world event"
* Before calling `fork()`, to ensure a consistent state for internal data structures
* During interpreter shutdown, to ensure that daemon threads aren't accessing Python objects

This adds the following functions to implement global and per-interpreter pauses:

* `_PyRuntimeState_StopTheWorld` and `_PyRuntimeState_StartTheWorld`
* `_PyInterpreterState_StopTheWorld` and `_PyInterpreterState_StartTheWorld`

These functions are no-ops outside of the `--disable-gil` build.

This also adds `_PyRWMutex`, a "readers-writer" lock, which is used to
serialize global stop-the-world pauses with per-interpreter pauses.
  • Loading branch information
colesbury committed Nov 27, 2023
1 parent ffe1b2d commit db1dd03
Show file tree
Hide file tree
Showing 12 changed files with 520 additions and 18 deletions.
1 change: 1 addition & 0 deletions Include/internal/pycore_ceval.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ void _PyEval_FrameClearAndPop(PyThreadState *tstate, _PyInterpreterFrame *frame)
#define _PY_CALLS_TO_DO_BIT 2
#define _PY_ASYNC_EXCEPTION_BIT 3
#define _PY_GC_SCHEDULED_BIT 4
#define _PY_EVAL_PLEASE_STOP_BIT 5

/* Reserve a few bits for future use */
#define _PY_EVAL_EVENTS_BITS 8
Expand Down
17 changes: 17 additions & 0 deletions Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ struct _Py_long_state {
int max_str_digits;
};

// Support for stop-the-world events. This exists in both the PyRuntime struct
// for global pauses and in each PyInterpreterState for per-interpreter pauses.
struct _stoptheworld_state {
PyMutex mutex; // Serializes stop-the-world attempts.

// NOTE: The below fields are protected by HEAD_LOCK(runtime), not by the
// above mutex.
bool requested; // Set when a pause is requested.
bool world_stopped; // Set when the world is stopped.
bool is_global; // Set when contained in PyRuntime struct.

PyEvent stop_event; // Set when thread_countdown reaches zero.
Py_ssize_t thread_countdown; // Number of threads that must pause.

PyThreadState *requester; // Thread that requested the pause (may be NULL).
};

/* cross-interpreter data registry */

Expand Down Expand Up @@ -164,6 +180,7 @@ struct _is {

struct _warnings_runtime_state warnings;
struct atexit_state atexit;
struct _stoptheworld_state stoptheworld;

struct _obmalloc_state obmalloc;

Expand Down
24 changes: 24 additions & 0 deletions Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,30 @@ _PyOnceFlag_CallOnce(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg)
return _PyOnceFlag_CallOnceSlow(flag, fn, arg);
}

// A readers-writer (RW) lock. The lock supports multiple concurrent readers or
// a single writer. The lock is write-preferring: if a writer is waiting, then
// new readers will be blocked. This avoids starvation of writers.
//
// The low two bits store whether the lock is write-locked (_Py_LOCKED) and
// whether there are parked threads (_Py_HAS_PARKED). The remaining bits are
// used to store the number of readers.
//
// The design is optimized for simplicity of the implementation. The lock is
// not fair: if fairness is desired, use an additional PyMutex to serialize
// writers. The lock is also not reentrant.
typedef struct {
uintptr_t bits;
} _PyRWMutex;

// Read lock
PyAPI_FUNC(void) _PyRWMutex_RLock(_PyRWMutex *rwmutex);
PyAPI_FUNC(void) _PyRWMutex_RUnlock(_PyRWMutex *rwmutex);

// Write lock
PyAPI_FUNC(void) _PyRWMutex_Lock(_PyRWMutex *rwmutex);
PyAPI_FUNC(void) _PyRWMutex_Unlock(_PyRWMutex *rwmutex);


#ifdef __cplusplus
}
#endif
Expand Down
28 changes: 26 additions & 2 deletions Include/internal/pycore_pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,37 @@ _PyThreadState_GET(void)
//
// High-level code should generally call PyEval_RestoreThread() instead, which
// calls this function.
void _PyThreadState_Attach(PyThreadState *tstate);
extern void _PyThreadState_Attach(PyThreadState *tstate);

// Detaches the current thread from the interpreter.
//
// High-level code should generally call PyEval_SaveThread() instead, which
// calls this function.
void _PyThreadState_Detach(PyThreadState *tstate);
extern void _PyThreadState_Detach(PyThreadState *tstate);

// Temporarily pauses the thread in the GC state.
//
// This is used to implement stop-the-world pauses. The thread must be in the
// "attached" state. It will switch to the "GC" state and pause until the
// stop-the-world event completes, after which it will switch back to the
// "attached" state.
extern void _PyThreadState_Park(PyThreadState *tstate);

// Perform a stop-the-world pause for all threads in the all interpreters.
//
// Threads in the "attached" state are paused and transitioned to the "GC"
// state. Threads in the "detached" state switch to the "GC" state, preventing
// them from reattaching until the stop-the-world pause is complete.
//
// NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
extern void _PyRuntimeState_StopTheWorld(_PyRuntimeState *runtime);
extern void _PyRuntimeState_StartTheWorld(_PyRuntimeState *runtime);

// Perform a stop-the-world pause for threads in the specified interpreter.
//
// NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
extern void _PyInterpreterState_StopTheWorld(PyInterpreterState *interp);
extern void _PyInterpreterState_StartTheWorld(PyInterpreterState *interp);


static inline void
Expand Down
3 changes: 3 additions & 0 deletions Include/internal/pycore_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ typedef struct pyruntimestate {
struct _faulthandler_runtime_state faulthandler;
struct _tracemalloc_runtime_state tracemalloc;

_PyRWMutex stoptheworld_mutex;
struct _stoptheworld_state stoptheworld;

PyPreConfig preconfig;

// Audit values must be preserved when Py_Initialize()/Py_Finalize()
Expand Down
3 changes: 3 additions & 0 deletions Include/internal/pycore_runtime_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ extern PyTypeObject _PyExc_MemoryError;
}, \
.faulthandler = _faulthandler_runtime_state_INIT, \
.tracemalloc = _tracemalloc_runtime_state_INIT, \
.stoptheworld = { \
.is_global = 1, \
}, \
.float_state = { \
.float_format = _py_float_format_unknown, \
.double_format = _py_float_format_unknown, \
Expand Down
3 changes: 3 additions & 0 deletions Include/pymacro.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@
Py_FatalError("Unreachable C code path reached")
#endif

#define _Py_CONTAINER_OF(ptr, type, member) \
(type*)((char*)ptr - offsetof(type, member))

// Prevent using an expression as a l-value.
// For example, "int x; _Py_RVALUE(x) = 1;" fails with a compiler error.
#define _Py_RVALUE(EXPR) ((void)0, (EXPR))
Expand Down
93 changes: 93 additions & 0 deletions Modules/_testinternalcapi/test_lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,98 @@ test_lock_once(PyObject *self, PyObject *obj)
Py_RETURN_NONE;
}

struct test_rwlock_data {
Py_ssize_t nthreads;
_PyRWMutex rw;
PyEvent step1;
PyEvent step2;
PyEvent step3;
PyEvent done;
};

static void
rdlock_thread(void *arg)
{
struct test_rwlock_data *test_data = arg;

// Acquire the lock in read mode
_PyRWMutex_RLock(&test_data->rw);
PyEvent_Wait(&test_data->step1);
_PyRWMutex_RUnlock(&test_data->rw);

_PyRWMutex_RLock(&test_data->rw);
PyEvent_Wait(&test_data->step3);
_PyRWMutex_RUnlock(&test_data->rw);

if (_Py_atomic_add_ssize(&test_data->nthreads, -1) == 1) {
_PyEvent_Notify(&test_data->done);
}
}
static void
wrlock_thread(void *arg)
{
struct test_rwlock_data *test_data = arg;

// First acquire the lock in write mode
_PyRWMutex_Lock(&test_data->rw);
PyEvent_Wait(&test_data->step2);
_PyRWMutex_Unlock(&test_data->rw);

if (_Py_atomic_add_ssize(&test_data->nthreads, -1) == 1) {
_PyEvent_Notify(&test_data->done);
}
}

static void
wait_until(uintptr_t *ptr, uintptr_t value)
{
// wait up to two seconds for *ptr == value
int iters = 0;
uintptr_t bits;
do {
pysleep(10);
bits = _Py_atomic_load_uintptr(ptr);
iters++;
} while (bits != value && iters < 200);
}

static PyObject *
test_lock_rwlock(PyObject *self, PyObject *obj)
{
struct test_rwlock_data test_data = {.nthreads = 3};

// Start two readers
PyThread_start_new_thread(rdlock_thread, &test_data);
PyThread_start_new_thread(rdlock_thread, &test_data);

// wait up to two seconds for the threads to attempt to read-lock "rw"
wait_until(&test_data.rw.bits, 8);
assert(test_data.rw.bits == 8);

// start writer (while readers hold lock)
PyThread_start_new_thread(wrlock_thread, &test_data);
wait_until(&test_data.rw.bits, 10);
assert(test_data.rw.bits == 10);

// readers release lock, writer should acquire it
_PyEvent_Notify(&test_data.step1);
wait_until(&test_data.rw.bits, 3);
assert(test_data.rw.bits == 3);

// writer releases lock, readers acquire it
_PyEvent_Notify(&test_data.step2);
wait_until(&test_data.rw.bits, 8);
assert(test_data.rw.bits == 8);

// readers release lock again
_PyEvent_Notify(&test_data.step3);
wait_until(&test_data.rw.bits, 0);
assert(test_data.rw.bits == 0);

PyEvent_Wait(&test_data.done);
Py_RETURN_NONE;
}

static PyMethodDef test_methods[] = {
{"test_lock_basic", test_lock_basic, METH_NOARGS},
{"test_lock_two_threads", test_lock_two_threads, METH_NOARGS},
Expand All @@ -380,6 +472,7 @@ static PyMethodDef test_methods[] = {
_TESTINTERNALCAPI_BENCHMARK_LOCKS_METHODDEF
{"test_lock_benchmark", test_lock_benchmark, METH_NOARGS},
{"test_lock_once", test_lock_once, METH_NOARGS},
{"test_lock_rwlock", test_lock_rwlock, METH_NOARGS},
{NULL, NULL} /* sentinel */
};

Expand Down
6 changes: 6 additions & 0 deletions Python/ceval_gil.c
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,12 @@ _Py_HandlePending(PyThreadState *tstate)
{
PyInterpreterState *interp = tstate->interp;

/* Pending signals */
if (_Py_eval_breaker_bit_is_set(interp, _PY_EVAL_PLEASE_STOP_BIT)) {
_Py_set_eval_breaker_bit(interp, _PY_EVAL_PLEASE_STOP_BIT, 0);
_PyThreadState_Park(tstate);
}

/* Pending signals */
if (_Py_eval_breaker_bit_is_set(interp, _PY_SIGNALS_PENDING_BIT)) {
if (handle_signals(tstate) != 0) {
Expand Down
86 changes: 86 additions & 0 deletions Python/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,89 @@ _PyOnceFlag_CallOnceSlow(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg)
v = _Py_atomic_load_uint8(&flag->v);
}
}

#define _PyRWMutex_READER_SHIFT 2

void
_PyRWMutex_RLock(_PyRWMutex *rwmutex)
{
uintptr_t bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
for (;;) {
// If the lock is not write-locked and there is no writer waiting, then
// we can increment the reader count.
if ((bits & (_Py_LOCKED|_Py_HAS_PARKED)) == 0) {
uintptr_t newval = bits + (1 << _PyRWMutex_READER_SHIFT);
if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
&bits, newval)) {
continue;
}
return;
}

// Set _Py_HAS_PARKED if it's not already set.
if ((bits & _Py_HAS_PARKED) == 0) {
uintptr_t newval = bits | _Py_HAS_PARKED;
if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
&bits, newval)) {
continue;
}
bits = newval;
}

_PyParkingLot_Park(&rwmutex->bits, &bits, sizeof(bits), -1, NULL, 1);
bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
}
}

void
_PyRWMutex_RUnlock(_PyRWMutex *rwmutex)
{
uintptr_t bits = _Py_atomic_add_uintptr(&rwmutex->bits, -(1 << _PyRWMutex_READER_SHIFT));
bits -= (1 << _PyRWMutex_READER_SHIFT);

if ((bits >> _PyRWMutex_READER_SHIFT) == 0 && (bits & _Py_HAS_PARKED)) {
_PyParkingLot_UnparkAll(&rwmutex->bits);
return;
}
}

void
_PyRWMutex_Lock(_PyRWMutex *rwmutex)
{
uintptr_t bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
for (;;) {
// If there are no active readers and it's not already write-locked,
// then we can grab the lock.
if ((bits & ~_Py_HAS_PARKED) == 0) {
if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
&bits,
bits | _Py_LOCKED)) {
return;
}
continue;
}

if (!(bits & _Py_HAS_PARKED)) {
if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
&bits,
bits | _Py_HAS_PARKED)) {
continue;
}
bits |= _Py_HAS_PARKED;
}

_PyParkingLot_Park(&rwmutex->bits, &bits, sizeof(bits), -1, NULL, 1);
bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
}
}

void
_PyRWMutex_Unlock(_PyRWMutex *rwmutex)
{
uintptr_t old_bits = _Py_atomic_exchange_uintptr(&rwmutex->bits, 0);
assert(old_bits >> _PyRWMutex_READER_SHIFT == 0);

if ((old_bits & _Py_HAS_PARKED) != 0) {
_PyParkingLot_UnparkAll(&rwmutex->bits);
}
}
9 changes: 7 additions & 2 deletions Python/parking_lot.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ _PySemaphore_PlatformWait(_PySemaphore *sema, _PyTime_t timeout)
if (timeout >= 0) {
struct timespec ts;

# ifdef HAVE_SEM_CLOCKWAIT
_PyTime_t deadline = _PyDeadline_Init(timeout);
_PyTime_AsTimespec_clamp(deadline, &ts);
err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts);
# else
_PyTime_t deadline = _PyTime_Add(_PyTime_GetSystemClock(), timeout);
_PyTime_AsTimespec(deadline, &ts);

_PyTime_AsTimespec_clamp(deadline, &ts);
err = sem_timedwait(&sema->platform_sem, &ts);
#endif
}
else {
err = sem_wait(&sema->platform_sem);
Expand Down
Loading

0 comments on commit db1dd03

Please sign in to comment.