Skip to content

Commit

Permalink
proxy: fix overloaded db connection closure (#7364)
Browse files Browse the repository at this point in the history
## Problem

possible for the database connections to not close in time.

## Summary of changes

force the closing of connections if the client has hung up
  • Loading branch information
conradludgate committed Apr 11, 2024
1 parent 3fa17e9 commit 95a184e
Showing 1 changed file with 32 additions and 4 deletions.
36 changes: 32 additions & 4 deletions proxy/src/serverless/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;

use crate::console::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{ENDPOINT_POOLS, GC_LATENCY, NUM_OPEN_CLIENTS_IN_HTTP_POOL};
Expand Down Expand Up @@ -469,15 +470,32 @@ pub fn poll_client<C: ClientInnerExt>(

let db_user = conn_info.db_and_user();
let idle = global_pool.get_idle_timeout();
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();

tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
let mut cancelled = pin!(cancelled);

poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
if cancelled.as_mut().poll(cx).is_ready() {
info!("connection dropped");
return Poll::Ready(())
}

match rx.has_changed() {
Ok(true) => {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
Err(_) => {
info!("connection dropped");
return Poll::Ready(())
}
_ => {}
}

// 5 minute idle connection timeout
Expand Down Expand Up @@ -532,6 +550,7 @@ pub fn poll_client<C: ClientInnerExt>(
let inner = ClientInner {
inner: client,
session: tx,
cancel,
aux,
conn_id,
};
Expand All @@ -541,10 +560,18 @@ pub fn poll_client<C: ClientInnerExt>(
struct ClientInner<C: ClientInnerExt> {
inner: C,
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
}

impl<C: ClientInnerExt> Drop for ClientInner<C> {
fn drop(&mut self) {
// on client drop, tell the conn to shut down
self.cancel.cancel();
}
}

pub trait ClientInnerExt: Sync + Send + 'static {
fn is_closed(&self) -> bool;
fn get_process_id(&self) -> i32;
Expand Down Expand Up @@ -697,6 +724,7 @@ mod tests {
ClientInner {
inner: client,
session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
cancel: CancellationToken::new(),
aux: MetricsAuxInfo {
endpoint_id: (&EndpointId::from("endpoint")).into(),
project_id: (&ProjectId::from("project")).into(),
Expand Down

0 comments on commit 95a184e

Please sign in to comment.