more pool fixes (#1211)
* a task that was marked as woken but was cancelled before it actually woke now wakes the next task in the queue instead (sketched below)

* a task that wakes but doesn't get a connection now puts itself back in the queue instead of sleeping until it times out with no way to be woken

* the idle reaper no longer runs while tasks are waiting for a connection, and it now uses the proper `SharedPool::release()` to return validated connections to the pool so that waiting tasks get woken

closes #622, #1210

(hopefully for good this time)

Signed-off-by: Austin Bonander <austin@launchbadge.com>
abonander authored May 18, 2021
1 parent 78a9424 commit 8f1d8c7
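
Editor's note: the first two bullets describe a hand-off protocol for the wake "token". A waiter that is woken but never uses the wake (because its task was cancelled, or because it lost the race for the freed connection) must either forward the wake to the next waiter or get back in line; otherwise the wake is lost and some task sleeps until its deadline with no way to be woken. Below is a minimal sketch of that pattern, not the sqlx code itself: `WakeQueue`, `WaiterGuard`, and `reset_and_requeue` are made-up names, and a mutexed `VecDeque` stands in for the lock-free crossbeam `SegQueue` the pool actually uses.

// Illustrative sketch only, not the actual sqlx pool internals.
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::task::Waker;

// The pool uses crossbeam's lock-free `SegQueue`; a mutexed `VecDeque`
// keeps this sketch dependency-free.
type WakeQueue = Mutex<VecDeque<Weak<WaiterInner>>>;

struct WaiterInner {
    woken: AtomicBool,
    waker: Waker,
}

impl WaiterInner {
    // Wake this waiter if it has not been woken yet; returns `true`
    // only if this call is the one that newly woke it.
    fn wake(&self) -> bool {
        if self
            .woken
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.waker.wake_by_ref();
            return true;
        }
        false
    }
}

// Pop waiters until one actually accepts the wake; entries whose task has
// already gone away fail to upgrade and are simply discarded.
fn wake_one(queue: &WakeQueue) {
    let mut waiters = queue.lock().unwrap();
    while let Some(weak) = waiters.pop_front() {
        if let Some(waiter) = weak.upgrade() {
            if waiter.wake() {
                return;
            }
        }
    }
}

// Owning handle a task holds while it waits for a connection.
struct WaiterGuard<'a> {
    inner: Arc<WaiterInner>,
    queue: &'a WakeQueue,
    // set once the task consumes the wake (its poll saw `woken`)
    actually_woke: bool,
}

impl WaiterGuard<'_> {
    // Bullet two: woken but lost the race for a connection, so clear the
    // flag and get back in line instead of sleeping unwakeably.
    fn reset_and_requeue(&mut self) {
        self.inner.woken.store(false, Ordering::Release);
        self.actually_woke = false;
        self.queue.lock().unwrap().push_back(Arc::downgrade(&self.inner));
    }
}

impl Drop for WaiterGuard<'_> {
    fn drop(&mut self) {
        // Bullet one: woken but dropped (cancelled) before the wake was
        // consumed, so forward the wake or the next waiter sleeps forever.
        if self.inner.woken.load(Ordering::Acquire) && !self.actually_woke {
            wake_one(self.queue);
        }
    }
}

The real `Waiter` in `inner.rs` below implements the same two paths: its `Drop` impl forwards an unconsumed wake, and the acquire loop re-pushes the waiter onto the queue and `reset()`s it between sleeps.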
Showing 2 changed files with 100 additions and 60 deletions.
141 changes: 88 additions & 53 deletions sqlx-core/src/pool/inner.rs
@@ -15,14 +15,12 @@ use std::sync::{Arc, Weak};
 use std::task::Context;
 use std::time::{Duration, Instant};
 
-/// Waiters should wake at least every this often to check if a connection has not come available
-/// since they went to sleep.
-const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500);
+type Waiters = SegQueue<Weak<WaiterInner>>;
 
 pub(crate) struct SharedPool<DB: Database> {
     pub(super) connect_options: <DB::Connection as Connection>::Options,
     pub(super) idle_conns: ArrayQueue<Idle<DB>>,
-    waiters: SegQueue<Weak<Waiter>>,
+    waiters: Waiters,
     pub(super) size: AtomicU32,
     is_closed: AtomicBool,
     pub(super) options: PoolOptions<DB>,
@@ -152,7 +150,7 @@ impl<DB: Database> SharedPool<DB> {
         // the strong ref of the `Weak<Waiter>` that we push to the queue
         // initialized during the `timeout()` call below
         // as long as we own this, we keep our place in line
-        let mut waiter = None;
+        let mut waiter: Option<Waiter<'_>> = None;
 
         // Unless the pool has been closed ...
         while !self.is_closed() {
@@ -173,32 +171,33 @@
                 }
             }
 
-            // Wait for a connection to become available (or we are allowed to open a new one)
-            let timeout_duration = cmp::min(
-                // Returns an error if `deadline` passes
-                deadline_as_timeout::<DB>(deadline)?,
-                MIN_WAKE_PERIOD,
-            );
+            if let Some(ref waiter) = waiter {
+                // return the waiter to the queue, note that this does put it to the back
+                // of the queue when it should ideally stay at the front
+                self.waiters.push(Arc::downgrade(&waiter.inner));
+            }
 
             sqlx_rt::timeout(
-                timeout_duration,
+                // Returns an error if `deadline` passes
+                deadline_as_timeout::<DB>(deadline)?,
                 // `poll_fn` gets us easy access to a `Waker` that we can push to our queue
                 future::poll_fn(|cx| -> Poll<()> {
-                    let waiter = waiter.get_or_insert_with(|| {
-                        let waiter = Waiter::new(cx);
-                        self.waiters.push(Arc::downgrade(&waiter));
-                        waiter
-                    });
+                    let waiter = waiter.get_or_insert_with(|| Waiter::push_new(cx, &self.waiters));
 
                     if waiter.is_woken() {
+                        waiter.actually_woke = true;
                         Poll::Ready(())
                     } else {
                         Poll::Pending
                     }
                 }),
             )
             .await
-            .ok(); // timeout is no longer fatal here; we check if the deadline expired above
+            .map_err(|_| Error::PoolTimedOut)?;
+
+            if let Some(ref mut waiter) = waiter {
+                waiter.reset();
+            }
 
             waited = true;
         }
@@ -329,37 +328,40 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
     let pool = Arc::clone(&pool);
 
     sqlx_rt::spawn(async move {
-        while !pool.is_closed.load(Ordering::Acquire) {
-            // reap at most the current size minus the minimum idle
-            let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
-
-            // collect connections to reap
-            let (reap, keep) = (0..max_reaped)
-                // only connections waiting in the queue
-                .filter_map(|_| pool.pop_idle())
-                .partition::<Vec<_>, _>(|conn| {
-                    is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options)
-                });
-
-            for conn in keep {
-                // return these connections to the pool first
-                let is_ok = pool.idle_conns.push(conn.into_leakable()).is_ok();
-
-                if !is_ok {
-                    panic!("BUG: connection queue overflow in spawn_reaper");
-                }
-            }
-
-            for conn in reap {
-                let _ = conn.close().await;
+        while !pool.is_closed() {
+            // only reap idle connections when no tasks are waiting
+            if pool.waiters.is_empty() {
+                do_reap(&pool).await;
             }
 
             sqlx_rt::sleep(period).await;
         }
     });
 }
 
-fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
+async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
+    // reap at most the current size minus the minimum idle
+    let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
+
+    // collect connections to reap
+    let (reap, keep) = (0..max_reaped)
+        // only connections waiting in the queue
+        .filter_map(|_| pool.pop_idle())
+        .partition::<Vec<_>, _>(|conn| {
+            is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options)
+        });
+
+    for conn in keep {
+        // return valid connections to the pool first
+        pool.release(conn.into_live());
+    }
+
+    for conn in reap {
+        let _ = conn.close().await;
+    }
+}
+
+fn wake_one(waiters: &Waiters) {
     while let Some(weak) = waiters.pop() {
         if let Some(waiter) = weak.upgrade() {
             if waiter.wake() {
@@ -375,7 +377,7 @@ fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
 /// (where the pool thinks it has more connections than it does).
 pub(in crate::pool) struct DecrementSizeGuard<'a> {
     size: &'a AtomicU32,
-    waiters: &'a SegQueue<Weak<Waiter>>,
+    waiters: &'a Waiters,
     dropped: bool,
 }
 
@@ -407,19 +409,12 @@ impl Drop for DecrementSizeGuard<'_> {
     }
 }
 
-struct Waiter {
+struct WaiterInner {
     woken: AtomicBool,
     waker: Waker,
 }
 
-impl Waiter {
-    fn new(cx: &mut Context<'_>) -> Arc<Self> {
-        Arc::new(Self {
-            woken: AtomicBool::new(false),
-            waker: cx.waker().clone(),
-        })
-    }
-
+impl WaiterInner {
     /// Wake this waiter if it has not previously been woken.
     ///
     /// Return `true` if this waiter was newly woken, or `false` if it was already woken.
@@ -435,8 +430,48 @@ impl Waiter {
 
         false
     }
+}
+
+struct Waiter<'a> {
+    inner: Arc<WaiterInner>,
+    queue: &'a Waiters,
+    actually_woke: bool,
+}
+
+impl<'a> Waiter<'a> {
+    fn push_new(cx: &mut Context<'_>, queue: &'a Waiters) -> Self {
+        let inner = Arc::new(WaiterInner {
+            woken: AtomicBool::new(false),
+            waker: cx.waker().clone(),
+        });
+
+        queue.push(Arc::downgrade(&inner));
+
+        Self {
+            inner,
+            queue,
+            actually_woke: false,
+        }
+    }
 
     fn is_woken(&self) -> bool {
-        self.woken.load(Ordering::Acquire)
+        self.inner.woken.load(Ordering::Acquire)
     }
+
+    fn reset(&mut self) {
+        self.inner
+            .woken
+            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
+            .ok();
+        self.actually_woke = false;
+    }
 }
+
+impl Drop for Waiter<'_> {
+    fn drop(&mut self) {
+        // if we didn't actually wake to get a connection, wake the next task instead
+        if self.is_woken() && !self.actually_woke {
+            wake_one(self.queue);
+        }
+    }
+}
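
Editor's note: as a quick sanity check of the hand-off semantics, here is a hypothetical test for the sketch above (assuming it lives in the same file as that sketch): two waiters queue up, the first is woken but drops its guard without consuming the wake, and the wake should land on the second waiter instead. `FlagWake` and `make_waiter` are test helpers built on `std::task::Wake` (Rust 1.51+), not sqlx API.

// Appends to the sketch above (same imports, plus `std::task::Wake`).
use std::task::Wake;

// Test-only waker that records whether it was woken.
struct FlagWake(AtomicBool);

impl Wake for FlagWake {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::Release);
    }
}

// Hypothetical helper: register a new waiter, hand back its flag and guard.
fn make_waiter(queue: &WakeQueue) -> (Arc<FlagWake>, WaiterGuard<'_>) {
    let flag = Arc::new(FlagWake(AtomicBool::new(false)));
    let inner = Arc::new(WaiterInner {
        woken: AtomicBool::new(false),
        waker: Waker::from(Arc::clone(&flag)),
    });
    queue.lock().unwrap().push_back(Arc::downgrade(&inner));
    let guard = WaiterGuard { inner, queue, actually_woke: false };
    (flag, guard)
}

fn main() {
    let queue: WakeQueue = Mutex::new(VecDeque::new());

    let (flag_a, guard_a) = make_waiter(&queue);
    let (flag_b, _guard_b) = make_waiter(&queue);

    wake_one(&queue); // the first wake goes to waiter A
    assert!(flag_a.0.load(Ordering::Acquire));
    assert!(!flag_b.0.load(Ordering::Acquire));

    // A is "cancelled" before it consumes the wake ...
    drop(guard_a);
    // ... so its Drop impl forwards the wake to B.
    assert!(flag_b.0.load(Ordering::Acquire));
}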
19 changes: 12 additions & 7 deletions tests/postgres/postgres.rs
@@ -509,26 +509,31 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
     eprintln!("starting pool");
 
     let pool = PgPoolOptions::new()
-        .connect_timeout(Duration::from_secs(30))
-        .min_connections(5)
-        .max_connections(10)
+        .connect_timeout(Duration::from_secs(5))
+        .min_connections(1)
+        .max_connections(1)
         .connect(&dotenv::var("DATABASE_URL")?)
         .await?;
 
     // spin up more tasks than connections available, and ensure we don't deadlock
-    for i in 0..20 {
+    for i in 0..200 {
         let pool = pool.clone();
         sqlx_rt::spawn(async move {
             loop {
                 if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await {
-                    eprintln!("pool task {} dying due to {}", i, e);
-                    break;
+                    // normal error at termination of the test
+                    if !matches!(e, sqlx::Error::PoolClosed) {
+                        eprintln!("pool task {} dying due to {}", i, e);
+                        break;
+                    }
                 }
             }
         });
     }
 
-    for _ in 0..5 {
+    // spawn a bunch of tasks that attempt to acquire but give up to ensure correct handling
+    // of cancellations
+    for _ in 0..50 {
         let pool = pool.clone();
         sqlx_rt::spawn(async move {
             while !pool.is_closed() {
