From 0847a04b7afabc47b9a8b40aa0365cd275e24dd9 Mon Sep 17 00:00:00 2001
From: Austin Bonander
Date: Wed, 5 May 2021 17:13:27 -0700
Subject: [PATCH] more pool fixes

* a task that is marked woken but didn't actually wake before being
  cancelled will instead wake the next task in the queue

* a task that wakes but doesn't get a connection will put itself back
  in the queue instead of waiting until it times out with no way to be
  woken

* the idle reaper now won't run if there are tasks waiting for a
  connection, and also uses the proper `SharedPool::release()` to
  return validated connections to the pool so waiting tasks get woken

closes #622, #1210 (hopefully for good this time)

Signed-off-by: Austin Bonander
---
 sqlx-core/src/pool/inner.rs | 141 ++++++++++++++++++++++--------------
 tests/postgres/postgres.rs  |  19 +++--
 2 files changed, 100 insertions(+), 60 deletions(-)

diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs
index d1c4e82ff7..f9e5df43b3 100644
--- a/sqlx-core/src/pool/inner.rs
+++ b/sqlx-core/src/pool/inner.rs
@@ -15,14 +15,12 @@ use std::sync::{Arc, Weak};
 use std::task::Context;
 use std::time::{Duration, Instant};
 
-/// Waiters should wake at least every this often to check if a connection has not come available
-/// since they went to sleep.
-const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500);
+type Waiters = SegQueue<Weak<WaiterInner>>;
 
 pub(crate) struct SharedPool<DB: Database> {
     pub(super) connect_options: <DB::Connection as Connection>::Options,
     pub(super) idle_conns: ArrayQueue<Idle<DB>>,
-    waiters: SegQueue<Weak<Waiter>>,
+    waiters: Waiters,
     pub(super) size: AtomicU32,
     is_closed: AtomicBool,
     pub(super) options: PoolOptions<DB>,
@@ -152,7 +150,7 @@ impl<DB: Database> SharedPool<DB> {
         // the strong ref of the `Weak` that we push to the queue
         // initialized during the `timeout()` call below
        // as long as we own this, we keep our place in line
-        let mut waiter = None;
+        let mut waiter: Option<Waiter<'_>> = None;
 
         // Unless the pool has been closed ...
         while !self.is_closed() {
@@ -173,24 +171,21 @@
                 }
             }
 
-            // Wait for a connection to become available (or we are allowed to open a new one)
-            let timeout_duration = cmp::min(
-                // Returns an error if `deadline` passes
-                deadline_as_timeout::<DB>(deadline)?,
-                MIN_WAKE_PERIOD,
-            );
+            if let Some(ref waiter) = waiter {
+                // return the waiter to the queue, note that this does put it to the back
+                // of the queue when it should ideally stay at the front
+                self.waiters.push(Arc::downgrade(&waiter.inner));
+            }
 
             sqlx_rt::timeout(
-                timeout_duration,
+                // Returns an error if `deadline` passes
+                deadline_as_timeout::<DB>(deadline)?,
                 // `poll_fn` gets us easy access to a `Waker` that we can push to our queue
                 future::poll_fn(|cx| -> Poll<()> {
-                    let waiter = waiter.get_or_insert_with(|| {
-                        let waiter = Waiter::new(cx);
-                        self.waiters.push(Arc::downgrade(&waiter));
-                        waiter
-                    });
+                    let waiter = waiter.get_or_insert_with(|| Waiter::push_new(cx, &self.waiters));
 
                     if waiter.is_woken() {
+                        waiter.actually_woke = true;
                         Poll::Ready(())
                     } else {
                         Poll::Pending
@@ -198,7 +193,11 @@
                 }),
             )
             .await
-            .ok(); // timeout is no longer fatal here; we check if the deadline expired above
+            .map_err(|_| Error::PoolTimedOut)?;
+
+            if let Some(ref mut waiter) = waiter {
+                waiter.reset();
+            }
 
             waited = true;
         }
@@ -329,29 +328,10 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
     let pool = Arc::clone(&pool);
 
     sqlx_rt::spawn(async move {
-        while !pool.is_closed.load(Ordering::Acquire) {
-            // reap at most the current size minus the minimum idle
-            let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
-
-            // collect connections to reap
-            let (reap, keep) = (0..max_reaped)
-                // only connections waiting in the queue
-                .filter_map(|_| pool.pop_idle())
-                .partition::<Vec<_>, _>(|conn| {
-                    is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options)
-                });
-
-            for conn in keep {
-                // return these connections to the pool first
-                let is_ok = pool.idle_conns.push(conn.into_leakable()).is_ok();
-
-                if !is_ok {
-                    panic!("BUG: connection queue overflow in spawn_reaper");
-                }
-            }
-
-            for conn in reap {
-                let _ = conn.close().await;
+        while !pool.is_closed() {
+            // only reap idle connections when no tasks are waiting
+            if pool.waiters.is_empty() {
+                do_reap(&pool).await;
             }
 
             sqlx_rt::sleep(period).await;
@@ -359,7 +339,29 @@
     });
 }
 
-fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
+async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
+    // reap at most the current size minus the minimum idle
+    let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
+
+    // collect connections to reap
+    let (reap, keep) = (0..max_reaped)
+        // only connections waiting in the queue
+        .filter_map(|_| pool.pop_idle())
+        .partition::<Vec<_>, _>(|conn| {
+            is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options)
+        });
+
+    for conn in keep {
+        // return valid connections to the pool first
+        pool.release(conn.into_live());
+    }
+
+    for conn in reap {
+        let _ = conn.close().await;
+    }
+}
+
+fn wake_one(waiters: &Waiters) {
     while let Some(weak) = waiters.pop() {
         if let Some(waiter) = weak.upgrade() {
             if waiter.wake() {
@@ -375,7 +377,7 @@
 /// (where the pool thinks it has more connections than it does).
 pub(in crate::pool) struct DecrementSizeGuard<'a> {
     size: &'a AtomicU32,
-    waiters: &'a SegQueue<Weak<Waiter>>,
+    waiters: &'a Waiters,
     dropped: bool,
 }
 
@@ -407,19 +409,12 @@ impl Drop for DecrementSizeGuard<'_> {
     }
 }
 
-struct Waiter {
+struct WaiterInner {
     woken: AtomicBool,
     waker: Waker,
 }
 
-impl Waiter {
-    fn new(cx: &mut Context<'_>) -> Arc<Self> {
-        Arc::new(Self {
-            woken: AtomicBool::new(false),
-            waker: cx.waker().clone(),
-        })
-    }
-
+impl WaiterInner {
     /// Wake this waiter if it has not previously been woken.
     ///
     /// Return `true` if this waiter was newly woken, or `false` if it was already woken.
@@ -435,8 +430,48 @@
         false
     }
+}
+
+struct Waiter<'a> {
+    inner: Arc<WaiterInner>,
+    queue: &'a Waiters,
+    actually_woke: bool,
+}
+
+impl<'a> Waiter<'a> {
+    fn push_new(cx: &mut Context<'_>, queue: &'a Waiters) -> Self {
+        let inner = Arc::new(WaiterInner {
+            woken: AtomicBool::new(false),
+            waker: cx.waker().clone(),
+        });
+
+        queue.push(Arc::downgrade(&inner));
+
+        Self {
+            inner,
+            queue,
+            actually_woke: false,
+        }
+    }
 
     fn is_woken(&self) -> bool {
-        self.woken.load(Ordering::Acquire)
+        self.inner.woken.load(Ordering::Acquire)
+    }
+
+    fn reset(&mut self) {
+        self.inner
+            .woken
+            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
+            .ok();
+        self.actually_woke = false;
+    }
+}
+
+impl Drop for Waiter<'_> {
+    fn drop(&mut self) {
+        // if we didn't actually wake to get a connection, wake the next task instead
+        if self.is_woken() && !self.actually_woke {
+            wake_one(self.queue);
+        }
    }
 }
diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs
index a5d9bf6cd9..590f06b5c5 100644
--- a/tests/postgres/postgres.rs
+++ b/tests/postgres/postgres.rs
@@ -509,26 +509,31 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
     eprintln!("starting pool");
 
     let pool = PgPoolOptions::new()
-        .connect_timeout(Duration::from_secs(30))
-        .min_connections(5)
-        .max_connections(10)
+        .connect_timeout(Duration::from_secs(5))
+        .min_connections(1)
+        .max_connections(1)
         .connect(&dotenv::var("DATABASE_URL")?)
         .await?;
 
     // spin up more tasks than connections available, and ensure we don't deadlock
-    for i in 0..20 {
+    for i in 0..200 {
         let pool = pool.clone();
         sqlx_rt::spawn(async move {
             loop {
                 if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await {
-                    eprintln!("pool task {} dying due to {}", i, e);
-                    break;
+                    // normal error at termination of the test
+                    if !matches!(e, sqlx::Error::PoolClosed) {
+                        eprintln!("pool task {} dying due to {}", i, e);
+                        break;
+                    }
                 }
             }
         });
     }
 
-    for _ in 0..5 {
+    // spawn a bunch of tasks that attempt to acquire but give up to ensure correct handling
+    // of cancellations
+    for _ in 0..50 {
         let pool = pool.clone();
         sqlx_rt::spawn(async move {
             while !pool.is_closed() {
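
Reviewer note (commentary, not part of the patch to apply): the cancellation fix
hinges on `Waiter`'s `Drop` impl handing an unconsumed wakeup to the next task
in line. Below is a minimal sketch of that pattern in isolation, under two
assumptions: crossbeam's `SegQueue` is replaced with a std-only
`Mutex<VecDeque<..>>`, and the pool and future machinery are omitted, so only
the atomic `woken` flag and the handoff-on-drop are shown. Names mirror the
patch, but this is illustrative code, not the sqlx implementation.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::task::Waker;

// Stand-in for `SegQueue<Weak<WaiterInner>>` in the patch.
type Waiters = Mutex<VecDeque<Weak<WaiterInner>>>;

struct WaiterInner {
    woken: AtomicBool,
    waker: Waker,
}

impl WaiterInner {
    // Wake this waiter only if it has not been woken already; `true` means
    // the wakeup was delivered to (and is now owned by) this waiter.
    fn wake(&self) -> bool {
        if self
            .woken
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.waker.wake_by_ref();
            return true;
        }
        false
    }
}

// Pop entries until one is both still alive and newly woken; dead or
// already-woken entries are simply discarded.
fn wake_one(waiters: &Waiters) {
    let mut queue = waiters.lock().unwrap();
    while let Some(weak) = queue.pop_front() {
        if let Some(waiter) = weak.upgrade() {
            if waiter.wake() {
                return;
            }
        }
    }
}

struct Waiter<'a> {
    inner: Arc<WaiterInner>,
    queue: &'a Waiters,
    // flipped to `true` once the owning task observes the wakeup; stays
    // `false` if the acquire future is dropped (cancelled) first
    actually_woke: bool,
}

impl Drop for Waiter<'_> {
    fn drop(&mut self) {
        // A wakeup was delivered but never consumed: forward it so it is
        // not lost with this cancelled task.
        if self.inner.woken.load(Ordering::Acquire) && !self.actually_woke {
            wake_one(self.queue);
        }
    }
}

The invariant this preserves: once `wake()` returns `true`, exactly one of two
things happens to that wakeup; either the owning task consumes it (setting
`actually_woke`), or `Drop` forwards it via `wake_one()`, so cancellation can
no longer swallow a wakeup and strand the remaining waiters.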