Skip to content

Commit

Permalink
Merge pull request #2 from davidhewitt/new-nativetypes
Browse files Browse the repository at this point in the history
Thread-safe release pools
  • Loading branch information
kngwyu authored May 2, 2020
2 parents 39bfb5f + dfbe22b commit f2b347a
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 102 deletions.
209 changes: 117 additions & 92 deletions src/gil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! Interaction with python's global interpreter lock

use crate::{ffi, internal_tricks::Unsendable, Python};
use std::cell::{Cell, UnsafeCell};
use parking_lot::Mutex;
use std::cell::{Cell, RefCell, UnsafeCell};
use std::{any, mem::ManuallyDrop, ptr::NonNull, sync};

static START: sync::Once = sync::Once::new();
Expand All @@ -16,6 +17,13 @@ thread_local! {
///
/// As a result, if this thread has the GIL, GIL_COUNT is greater than zero.
static GIL_COUNT: Cell<u32> = Cell::new(0);

/// These are objects owned by the current thread, to be released when the GILPool drops.
static OWNED_OBJECTS: RefCell<Vec<NonNull<ffi::PyObject>>> = RefCell::new(Vec::with_capacity(256));

/// These are non-python objects such as (String) owned by the current thread, to be released
/// when the GILPool drops.
static OWNED_ANYS: RefCell<Vec<Box<dyn any::Any>>> = RefCell::new(Vec::with_capacity(256));
}

/// Check whether the GIL is acquired.
Expand Down Expand Up @@ -136,85 +144,75 @@ impl GILGuard {
impl Drop for GILGuard {
fn drop(&mut self) {
unsafe {
// Must drop the objects in the pool before releasing the GILGuard
ManuallyDrop::drop(&mut self.pool);
ffi::PyGILState_Release(self.gstate);
}
}
}

/// Implementation of release pool
struct ReleasePoolImpl {
owned: Vec<NonNull<ffi::PyObject>>,
pointers: *mut Vec<NonNull<ffi::PyObject>>,
obj: Vec<Box<dyn any::Any>>,
p: parking_lot::Mutex<*mut Vec<NonNull<ffi::PyObject>>>,
/// Thread-safe storage for objects which were dropped while the GIL was not held.
struct ReleasePool {
pointers_to_drop: Mutex<*mut Vec<NonNull<ffi::PyObject>>>,
pointers_being_dropped: UnsafeCell<*mut Vec<NonNull<ffi::PyObject>>>,
}

impl ReleasePoolImpl {
fn new() -> Self {
impl ReleasePool {
const fn new() -> Self {
Self {
owned: Vec::with_capacity(256),
pointers: Box::into_raw(Box::new(Vec::with_capacity(256))),
obj: Vec::with_capacity(8),
p: parking_lot::Mutex::new(Box::into_raw(Box::new(Vec::with_capacity(256)))),
pointers_to_drop: parking_lot::const_mutex(std::ptr::null_mut()),
pointers_being_dropped: UnsafeCell::new(std::ptr::null_mut()),
}
}

unsafe fn release_pointers(&mut self) {
let mut v = self.p.lock();
let vec = &mut **v;
if vec.is_empty() {
return;
fn register_pointer(&self, obj: NonNull<ffi::PyObject>) {
let mut storage = self.pointers_to_drop.lock();
if storage.is_null() {
*storage = Box::into_raw(Box::new(Vec::with_capacity(256)))
}

// switch vectors
std::mem::swap(&mut self.pointers, &mut *v);
drop(v);

// release PyObjects
for ptr in vec.iter_mut() {
ffi::Py_DECREF(ptr.as_ptr());
unsafe {
(**storage).push(obj);
}
vec.set_len(0);
}

pub unsafe fn drain(&mut self, _py: Python, owned: usize) {
// Release owned objects(call decref)
for i in owned..self.owned.len() {
ffi::Py_DECREF(self.owned[i].as_ptr());
fn release_pointers(&self, _py: Python) {
let mut v = self.pointers_to_drop.lock();

if v.is_null() {
// No pointers have been registered
return;
}
self.owned.truncate(owned);
self.release_pointers();
self.obj.clear();
}
}

/// Sync wrapper of ReleasePoolImpl
struct ReleasePool {
value: UnsafeCell<Option<ReleasePoolImpl>>,
}
unsafe {
// Function is safe to call because GIL is held, so only one thread can be inside this
// block at a time

impl ReleasePool {
const fn new() -> Self {
Self {
value: UnsafeCell::new(None),
let vec = &mut **v;
if vec.is_empty() {
return;
}

// switch vectors
std::mem::swap(&mut *self.pointers_being_dropped.get(), &mut *v);
drop(v);

// release PyObjects
for ptr in vec.iter_mut() {
ffi::Py_DECREF(ptr.as_ptr());
}
vec.set_len(0);
}
}
/// # Safety
/// This function is not thread safe. Thus, the caller has to have GIL.
#[allow(clippy::mut_from_ref)]
unsafe fn get_or_init(&self) -> &mut ReleasePoolImpl {
(*self.value.get()).get_or_insert_with(ReleasePoolImpl::new)
}
}

static POOL: ReleasePool = ReleasePool::new();

unsafe impl Sync for ReleasePool {}

static POOL: ReleasePool = ReleasePool::new();

#[doc(hidden)]
pub struct GILPool {
owned: usize,
owned_objects_start: usize,
owned_anys_start: usize,
// Stable solution for impl !Send
no_send: Unsendable,
}
Expand All @@ -226,10 +224,10 @@ impl GILPool {
pub unsafe fn new() -> GILPool {
increment_gil_count();
// Release objects that were dropped since last GIL acquisition
let pool = POOL.get_or_init();
pool.release_pointers();
POOL.release_pointers(Python::assume_gil_acquired());
GILPool {
owned: pool.owned.len(),
owned_objects_start: OWNED_OBJECTS.with(|o| o.borrow().len()),
owned_anys_start: OWNED_ANYS.with(|o| o.borrow().len()),
no_send: Unsendable::default(),
}
}
Expand All @@ -241,37 +239,68 @@ impl GILPool {
impl Drop for GILPool {
fn drop(&mut self) {
unsafe {
let pool = POOL.get_or_init();
pool.drain(self.python(), self.owned);
OWNED_OBJECTS.with(|owned_objects| {
// Note: inside this closure we must be careful to not hold a borrow too long, because
// while calling Py_DECREF we may cause other callbacks to run which will need to
// register objects into the GILPool.
let len = owned_objects.borrow().len();
for i in self.owned_objects_start..len {
let ptr = owned_objects.borrow().get_unchecked(i).as_ptr();
ffi::Py_DECREF(ptr);
}
// If this assertion fails, something weird is going on where another GILPool that was
// created after this one has not yet been dropped.
debug_assert!(owned_objects.borrow().len() == len);
owned_objects
.borrow_mut()
.truncate(self.owned_objects_start);
});

OWNED_ANYS.with(|owned_anys| owned_anys.borrow_mut().truncate(self.owned_anys_start));
}
decrement_gil_count();
}
}

pub unsafe fn register_any<'p, T: 'static>(obj: T) -> &'p T {
let pool = POOL.get_or_init();

pool.obj.push(Box::new(obj));
pool.obj
.last()
.unwrap()
.as_ref()
.downcast_ref::<T>()
.unwrap()
}

/// Register a Python object pointer inside the release pool, to have reference count decreased
/// next time the GIL is acquired in pyo3.
///
/// # Safety
/// The object must be an owned Python reference.
pub unsafe fn register_pointer(obj: NonNull<ffi::PyObject>) {
let pool = POOL.get_or_init();
if gil_is_acquired() {
ffi::Py_DECREF(obj.as_ptr())
} else {
(**pool.p.lock()).push(obj);
POOL.register_pointer(obj);
}
}

/// Register an owned object inside the GILPool.
///
/// # Safety
/// The object must be an owned Python reference.
pub unsafe fn register_owned(_py: Python, obj: NonNull<ffi::PyObject>) {
let pool = POOL.get_or_init();
pool.owned.push(obj);
debug_assert!(gil_is_acquired());
OWNED_OBJECTS.with(|objs| objs.borrow_mut().push(obj));
}

/// Register any value inside the GILPool.
///
/// # Safety
/// It is the caller's responsibility to ensure that the inferred lifetime 'p is not inferred by
/// the Rust compiler to outlast the current GILPool.
pub unsafe fn register_any<'p, T: 'static>(obj: T) -> &'p T {
debug_assert!(gil_is_acquired());
OWNED_ANYS.with(|owned_anys| {
let boxed = Box::new(obj);
let value_ref: &T = &*boxed;

// Sneaky - extend the lifetime of the reference so that the box can be moved
let value_ref_extended_lifetime = std::mem::transmute(value_ref);

owned_anys.borrow_mut().push(boxed);
value_ref_extended_lifetime
})
}

/// Increment pyo3's internal GIL count - to be called whenever GILPool or GILGuard is created.
Expand All @@ -295,7 +324,7 @@ fn decrement_gil_count() {

#[cfg(test)]
mod test {
use super::{GILPool, GIL_COUNT, POOL};
use super::{GILPool, GIL_COUNT, OWNED_OBJECTS};
use crate::{ffi, gil, AsPyPointer, IntoPyPointer, PyObject, Python, ToPyObject};
use std::ptr::NonNull;

Expand All @@ -309,6 +338,10 @@ mod test {
obj.to_object(py)
}

fn owned_object_count() -> usize {
OWNED_OBJECTS.with(|objs| objs.borrow().len())
}

#[test]
fn test_owned() {
let gil = Python::acquire_gil();
Expand All @@ -319,18 +352,16 @@ mod test {
let _ref = obj.clone_ref(py);

unsafe {
let p = POOL.get_or_init();

{
let gil = Python::acquire_gil();
gil::register_owned(gil.python(), NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
assert_eq!(p.owned.len(), 1);
assert_eq!(owned_object_count(), 1);
}
{
let _gil = Python::acquire_gil();
assert_eq!(p.owned.len(), 0);
assert_eq!(owned_object_count(), 0);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 1);
}
}
Expand All @@ -346,26 +377,24 @@ mod test {
let obj_ptr = obj.as_ptr();

unsafe {
let p = POOL.get_or_init();

{
let _pool = GILPool::new();
assert_eq!(p.owned.len(), 0);
assert_eq!(owned_object_count(), 0);

gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(p.owned.len(), 1);
assert_eq!(owned_object_count(), 1);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
{
let _pool = GILPool::new();
let obj = get_object();
gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
assert_eq!(p.owned.len(), 2);
assert_eq!(owned_object_count(), 2);
}
assert_eq!(p.owned.len(), 1);
assert_eq!(owned_object_count(), 1);
}
{
assert_eq!(p.owned.len(), 0);
assert_eq!(owned_object_count(), 0);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 1);
}
}
Expand All @@ -381,10 +410,8 @@ mod test {
let obj_ptr = obj.as_ptr();

unsafe {
let p = POOL.get_or_init();

{
assert_eq!(p.owned.len(), 0);
assert_eq!(owned_object_count(), 0);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
}

Expand All @@ -404,10 +431,8 @@ mod test {
let obj_ptr = obj.as_ptr();

unsafe {
let p = POOL.get_or_init();

{
assert_eq!(p.owned.len(), 0);
assert_eq!(owned_object_count(), 0);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
}

Expand Down
10 changes: 0 additions & 10 deletions tests/test_datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,8 @@ macro_rules! assert_check_only {
};
}

// Because of the relase pool unsoundness reported in https://github.com/PyO3/pyo3/issues/756,
// we need to stop other threads before calling `py.import()`.
// TODO(kngwyu): Remove this variable
static MUTEX: parking_lot::Mutex<()> = parking_lot::const_mutex(());

#[test]
fn test_date_check() {
let _lock = MUTEX.lock();
let gil = Python::acquire_gil();
let py = gil.python();
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "date", "2018, 1, 1").unwrap();
Expand All @@ -73,7 +67,6 @@ fn test_date_check() {

#[test]
fn test_time_check() {
let _lock = MUTEX.lock();
let gil = Python::acquire_gil();
let py = gil.python();
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "time", "12, 30, 15").unwrap();
Expand All @@ -85,7 +78,6 @@ fn test_time_check() {

#[test]
fn test_datetime_check() {
let _lock = MUTEX.lock();
let gil = Python::acquire_gil();
let py = gil.python();
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "datetime", "2018, 1, 1, 13, 30, 15")
Expand All @@ -100,7 +92,6 @@ fn test_datetime_check() {

#[test]
fn test_delta_check() {
let _lock = MUTEX.lock();
let gil = Python::acquire_gil();
let py = gil.python();
let (obj, sub_obj, sub_sub_obj) = _get_subclasses(&py, "timedelta", "1, -3").unwrap();
Expand All @@ -115,7 +106,6 @@ fn test_datetime_utc() {
use assert_approx_eq::assert_approx_eq;
use pyo3::types::PyDateTime;

let _lock = MUTEX.lock();
let gil = Python::acquire_gil();
let py = gil.python();
let datetime = py.import("datetime").map_err(|e| e.print(py)).unwrap();
Expand Down

0 comments on commit f2b347a

Please sign in to comment.