Skip to content

Commit

Permalink
queue: make SimpleQueue thread-safe
Browse files Browse the repository at this point in the history
This uses _PyMutex and the PyParkingLot functions to implement a
thread-safe unbouneded MPMC queue.
  • Loading branch information
colesbury committed Apr 23, 2023
1 parent 4450445 commit 7e60a01
Showing 1 changed file with 177 additions and 108 deletions.
285 changes: 177 additions & 108 deletions Modules/_queuemodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "Python.h"
#include "pycore_moduleobject.h" // _PyModule_GetState()
#include "structmember.h" // PyMemberDef
#include "parking_lot.h"
#include <stddef.h> // offsetof()

typedef struct {
Expand All @@ -25,10 +26,19 @@ static struct PyModuleDef queuemodule;

typedef struct {
PyObject_HEAD
PyThread_type_lock lock;
int locked;
PyObject *lst;
Py_ssize_t lst_pos;
/* protects all operations on queue */
_PyMutex mutex;
/* number of items in queue */
Py_ssize_t count;
/* offset of where to put next item */
Py_ssize_t put_index;
/* offset of where to take next item */
Py_ssize_t get_index;
/* size of data buffer */
Py_ssize_t buffer_size;
/* array of items with length buffer_size */
PyObject **data;
uintptr_t waiting;
PyObject *weakreflist;
} simplequeueobject;

Expand All @@ -41,7 +51,27 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty
static int
simplequeue_clear(simplequeueobject *self)
{
Py_CLEAR(self->lst);
if (self->data) {
PyObject **data = self->data;
Py_ssize_t n = self->count;
Py_ssize_t idx = self->get_index;
Py_ssize_t buffer_size = self->buffer_size;

self->data = NULL;
self->count = 0;
self->put_index = 0;
self->get_index = 0;
self->buffer_size = 0;

for (; n != 0 ; n--) {
Py_DECREF(data[idx]);
idx++;
if (idx == buffer_size) {
idx = 0;
}
}
PyMem_Free(self->data);
}
return 0;
}

Expand All @@ -51,11 +81,8 @@ simplequeue_dealloc(simplequeueobject *self)
PyTypeObject *tp = Py_TYPE(self);

PyObject_GC_UnTrack(self);
if (self->lock != NULL) {
/* Unlock the lock so it's safe to free it */
if (self->locked > 0)
PyThread_release_lock(self->lock);
PyThread_free_lock(self->lock);
if (_PyMutex_is_locked(&self->mutex)) {
Py_FatalError("SimpleQueue: dealloc with locked queue");
}
(void)simplequeue_clear(self);
if (self->weakreflist != NULL)
Expand All @@ -67,7 +94,16 @@ simplequeue_dealloc(simplequeueobject *self)
static int
simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
{
Py_VISIT(self->lst);
PyObject **data = self->data;
Py_ssize_t n = self->count;
Py_ssize_t idx = self->get_index;
for (; n != 0 ; n--) {
Py_VISIT(data[idx]);
idx++;
if (idx == self->buffer_size) {
idx = 0;
}
}
Py_VISIT(Py_TYPE(self));
return 0;
}
Expand All @@ -86,25 +122,51 @@ simplequeue_new_impl(PyTypeObject *type)
simplequeueobject *self;

self = (simplequeueobject *) type->tp_alloc(type, 0);
if (self != NULL) {
self->weakreflist = NULL;
self->lst = PyList_New(0);
self->lock = PyThread_allocate_lock();
self->lst_pos = 0;
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
return NULL;
}
if (self->lst == NULL) {
Py_DECREF(self);
return NULL;
}
if (!self) {
return NULL;
}

self->weakreflist = NULL;
self->buffer_size = 8;
self->data = PyMem_Malloc(self->buffer_size * sizeof(PyObject*));
if (self->data == NULL) {
Py_DECREF(self);
return NULL;
}
memset(self->data, 0, self->buffer_size * sizeof(PyObject*));
return (PyObject *) self;
}

static int
simplequeue_grow(simplequeueobject *self)
{
Py_ssize_t new_buffer_size = Py_MAX(8, self->buffer_size * 2);
PyObject **new_data = PyMem_Malloc(new_buffer_size * sizeof(PyObject*));
if (!new_data) {
return -1;
}

/* Copy the contiguous "tail" of the old buffer to the beginning
* of the new buffer. */
Py_ssize_t tail_size = self->buffer_size - self->get_index;
if (tail_size > 0) {
memcpy(new_data, self->data + self->get_index, tail_size * sizeof(PyObject*));
}

/* Next copy any elements that wrapped around the old buffer */
Py_ssize_t remaining = self->count - tail_size;
if (remaining > 0) {
memcpy(new_data + tail_size, self->data, remaining * sizeof(PyObject*));
}

PyMem_Free(self->data);
self->data = new_data;
self->buffer_size = new_buffer_size;
self->get_index = 0;
self->put_index = self->count;
return 0;
}

/*[clinic input]
_queue.SimpleQueue.put
item: object
Expand All @@ -123,15 +185,41 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
int block, PyObject *timeout)
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
{
/* BEGIN GIL-protected critical section */
if (PyList_Append(self->lst, item) < 0)
return NULL;
if (self->locked) {
/* A get() may be waiting, wake it up */
self->locked = 0;
PyThread_release_lock(self->lock);
_PyMutex_lock(&self->mutex);

int handoff = 0;
if (self->waiting) {
int more_waiters;
struct wait_entry *waiter;
PyObject **objptr;

/* If there is a waiter, handoff the item directly */
objptr = _PyParkingLot_BeginUnpark(&self->waiting, &waiter, &more_waiters);
if (objptr) {
Py_INCREF(item);
*objptr = item;
handoff = 1;
}
self->waiting = more_waiters;
_PyParkingLot_FinishUnpark(&self->waiting, waiter);
}
/* END GIL-protected critical section */

if (!handoff) {
/* If we didn't handoff the item, add it to the queue */
if (self->count == self->buffer_size && simplequeue_grow(self) < 0) {
_PyMutex_unlock(&self->mutex);
return NULL;
}
Py_INCREF(item);
self->data[self->put_index] = item;
self->put_index++;
self->count++;
if (self->put_index == self->buffer_size) {
self->put_index = 0;
}
}

_PyMutex_unlock(&self->mutex);
Py_RETURN_NONE;
}

Expand All @@ -154,30 +242,12 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
}

static PyObject *
simplequeue_pop_item(simplequeueobject *self)
empty_error(PyTypeObject *cls)
{
Py_ssize_t count, n;
PyObject *item;

n = PyList_GET_SIZE(self->lst);
assert(self->lst_pos < n);

item = PyList_GET_ITEM(self->lst, self->lst_pos);
Py_INCREF(Py_None);
PyList_SET_ITEM(self->lst, self->lst_pos, Py_None);
self->lst_pos += 1;
count = n - self->lst_pos;
if (self->lst_pos > count) {
/* The list is more than 50% empty, reclaim space at the beginning */
if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) {
/* Undo pop */
self->lst_pos -= 1;
PyList_SET_ITEM(self->lst, self->lst_pos, item);
return NULL;
}
self->lst_pos = 0;
}
return item;
PyObject *module = PyType_GetModule(cls);
simplequeue_state *state = simplequeue_get_state(module);
PyErr_SetNone(state->EmptyError);
return NULL;
}

/*[clinic input]
Expand Down Expand Up @@ -206,16 +276,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
{
_PyTime_t endtime = 0;
_PyTime_t timeout;
PyObject *item;
PyLockStatus r;
PY_TIMEOUT_T microseconds;

if (block == 0) {
/* Non-blocking */
microseconds = 0;
}
else if (timeout_obj != Py_None) {
if (block != 0 && timeout_obj != Py_None) {
_PyTime_t timeout;
/* With timeout */
if (_PyTime_FromSecondsObject(&timeout,
timeout_obj, _PyTime_ROUND_CEILING) < 0) {
Expand All @@ -226,7 +288,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
"'timeout' must be a non-negative number");
return NULL;
}
microseconds = _PyTime_AsMicroseconds(timeout,
PY_TIMEOUT_T microseconds = _PyTime_AsMicroseconds(timeout,
_PyTime_ROUND_CEILING);
if (microseconds > PY_TIMEOUT_MAX) {
PyErr_SetString(PyExc_OverflowError,
Expand All @@ -235,54 +297,55 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
}
endtime = _PyDeadline_Init(timeout);
}
else {
/* Infinitely blocking */
microseconds = -1;
}

/* put() signals the queue to be non-empty by releasing the lock.
* So we simply try to acquire the lock in a loop, until the condition
* (queue non-empty) becomes true.
*/
while (self->lst_pos == PyList_GET_SIZE(self->lst)) {
/* First a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
Py_END_ALLOW_THREADS
for (;;) {
PyObject *item = NULL;

_PyMutex_lock(&self->mutex);
if (self->count > 0) {
item = self->data[self->get_index];
self->data[self->get_index] = NULL;

self->count--;
self->get_index++;
if (self->get_index == self->buffer_size) {
self->get_index = 0;
}
}
else {
_Py_atomic_store_uintptr_relaxed(&self->waiting, 1);
}
_PyMutex_unlock(&self->mutex);

if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
return NULL;
if (item) {
return item;
}
if (r == PY_LOCK_FAILURE) {
PyObject *module = PyType_GetModule(cls);
simplequeue_state *state = simplequeue_get_state(module);
/* Timed out */
PyErr_SetNone(state->EmptyError);
return NULL;

if (!block) {
return empty_error(cls);
}
self->locked = 1;

/* Adjust timeout for next iteration (if any) */
if (microseconds > 0) {
timeout = _PyDeadline_Get(endtime);
microseconds = _PyTime_AsMicroseconds(timeout,
_PyTime_ROUND_CEILING);
int64_t timeout_ns = -1;
if (endtime != 0) {
timeout_ns = _PyDeadline_Get(endtime);
if (timeout_ns < 0) {
return empty_error(cls);
}
}
}

/* BEGIN GIL-protected critical section */
assert(self->lst_pos < PyList_GET_SIZE(self->lst));
item = simplequeue_pop_item(self);
if (self->locked) {
PyThread_release_lock(self->lock);
self->locked = 0;
int ret = _PyParkingLot_Park(&self->waiting, 1, &item, timeout_ns);
if (ret == PY_PARK_OK) {
assert(item);
return item;
}
else if (ret == PY_PARK_INTR && Py_MakePendingCalls() < 0) {
/* interrupted */
return NULL;
}
else if (ret == PY_PARK_TIMEOUT) {
return empty_error(cls);
}
}
/* END GIL-protected critical section */

return item;
}

/*[clinic input]
Expand Down Expand Up @@ -315,7 +378,10 @@ static int
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
{
return self->lst_pos == PyList_GET_SIZE(self->lst);
_PyMutex_lock(&self->mutex);
int empty = self->count == 0;
_PyMutex_unlock(&self->mutex);
return empty;
}

/*[clinic input]
Expand All @@ -328,7 +394,10 @@ static Py_ssize_t
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
{
return PyList_GET_SIZE(self->lst) - self->lst_pos;
_PyMutex_lock(&self->mutex);
Py_ssize_t qsize = self->count;
_PyMutex_unlock(&self->mutex);
return qsize;
}

static int
Expand Down

0 comments on commit 7e60a01

Please sign in to comment.