Skip to content

Commit

Permalink
Merge branch 'idle-db-connections'
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Jun 18, 2024
2 parents c5cd78d + 1d9d159 commit 5afe166
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 201 deletions.
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ scoped_task = { path = "../scoped_task" }
serde = { workspace = true }
serde_bytes = { workspace = true }
slab = "0.4.6"
sqlx = { version = "0.7.2", default-features = false, features = ["runtime-tokio", "sqlite"] }
sqlx = { version = "0.7.4", default-features = false, features = ["runtime-tokio", "sqlite"] }
ssdp-client = "1.0"
state_monitor = { path = "../state_monitor" }
subtle = { version = "2.5.0", default-features = false, features = ["core_hint_black_box"] }
Expand Down
152 changes: 82 additions & 70 deletions lib/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@ mod macros;
mod connection;
mod id;
mod migrations;
mod mutex;
mod transaction;

pub use id::DatabaseId;
pub use migrations::SCHEMA_VERSION;

use tracing::Span;

use self::{
mutex::{CommittedMutexTransaction, ConnectionMutex},
transaction::TransactionWrapper,
};
use deadlock::ExpectShortLifetime;
use ref_cast::RefCast;
use sqlx::{
sqlite::{
Sqlite, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous,
SqliteTransactionManager,
},
Row, SqlitePool,
Row, SqlitePool, TransactionManager,
};
use std::{
fmt,
Expand All @@ -39,6 +34,7 @@ use thiserror::Error;
use tokio::{fs, task};

const WARN_AFTER_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3);
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);

pub(crate) use self::connection::Connection;

Expand All @@ -47,26 +43,32 @@ pub(crate) use self::connection::Connection;
pub(crate) struct Pool {
// Pool with multiple read-only connections
reads: SqlitePool,
// Single writable connection.
write: ConnectionMutex,
// Pool with a single writable connection.
write: SqlitePool,
}

impl Pool {
async fn create(connect_options: SqliteConnectOptions) -> Result<Self, sqlx::Error> {
let common_options = connect_options
async fn create(conn_options: SqliteConnectOptions) -> Result<Self, sqlx::Error> {
let conn_options = conn_options
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.pragma("recursive_triggers", "ON")
.optimize_on_close(true, Some(1000));
.pragma("recursive_triggers", "ON");

let pool_options = SqlitePoolOptions::new()
// Disable the test as it breaks cancel-safety (also it's unnecessary in our case)
.test_before_acquire(false)
// Expire idle connections to conserve resources (threads, file descriptors)
.idle_timeout(IDLE_TIMEOUT);

let write_options = common_options.clone();
let write = ConnectionMutex::connect(write_options).await?;
let write = pool_options
.clone()
.max_connections(1)
.connect_with(conn_options.clone().optimize_on_close(true, Some(1000)))
.await?;

let read_options = common_options.read_only(true);
let reads = SqlitePoolOptions::new()
let reads = pool_options
.max_connections(8)
.test_before_acquire(false)
.connect_with(read_options)
.connect_with(conn_options.read_only(true))
.await?;

Ok(Self { reads, write })
Expand All @@ -75,37 +77,13 @@ impl Pool {
/// Acquire a read-only database connection.
#[track_caller]
pub fn acquire(&self) -> impl Future<Output = Result<PoolConnection, sqlx::Error>> + '_ {
let location = Location::caller();

async move {
let conn = self.reads.acquire().await?;

let track_lifetime =
ExpectShortLifetime::new_in(WARN_AFTER_TRANSACTION_LIFETIME, location);

Ok(PoolConnection {
inner: conn,
_track_lifetime: track_lifetime,
})
}
PoolConnection::acquire(&self.reads, Location::caller())
}

/// Begin a read-only transaction. See [`ReadTransaction`] for more details.
#[track_caller]
pub fn begin_read(&self) -> impl Future<Output = Result<ReadTransaction, sqlx::Error>> + '_ {
let location = Location::caller();

async move {
let tx = self.reads.begin().await?;

let track_lifetime =
ExpectShortLifetime::new_in(WARN_AFTER_TRANSACTION_LIFETIME, location);

Ok(ReadTransaction {
inner: TransactionWrapper::Pool(tx),
_track_lifetime: Some(track_lifetime),
})
}
ReadTransaction::begin(&self.reads, Location::caller())
}

/// Begin a write transaction. See [`WriteTransaction`] for more details.
Expand All @@ -114,16 +92,8 @@ impl Pool {
let location = Location::caller();

async move {
let tx = self.write.begin().await?;

let track_lifetime =
ExpectShortLifetime::new_in(WARN_AFTER_TRANSACTION_LIFETIME, location);

Ok(WriteTransaction {
inner: ReadTransaction {
inner: TransactionWrapper::Mutex(tx),
_track_lifetime: Some(track_lifetime),
},
inner: ReadTransaction::begin(&self.write, location).await?,
})
}
}
Expand All @@ -145,6 +115,22 @@ pub(crate) struct PoolConnection {
_track_lifetime: ExpectShortLifetime,
}

impl PoolConnection {
// Internal
async fn acquire(
pool: &SqlitePool,
location: &'static Location<'static>,
) -> Result<Self, sqlx::Error> {
let inner = pool.acquire().await?;
let track_lifetime = ExpectShortLifetime::new_in(WARN_AFTER_TRANSACTION_LIFETIME, location);

Ok(Self {
inner,
_track_lifetime: track_lifetime,
})
}
}

impl Deref for PoolConnection {
type Target = Connection;

Expand All @@ -167,21 +153,44 @@ impl DerefMut for PoolConnection {
/// created. A read transaction doesn't need to be committed or rolled back - it's implicitly ended
/// when the `ReadTransaction` instance drops.
pub(crate) struct ReadTransaction {
inner: TransactionWrapper,
_track_lifetime: Option<ExpectShortLifetime>,
inner: PoolConnection,
closed: bool,
}

impl ReadTransaction {
// Internal
async fn begin(
pool: &SqlitePool,
location: &'static Location<'static>,
) -> Result<Self, sqlx::Error> {
let mut inner = PoolConnection::acquire(pool, location).await?;
SqliteTransactionManager::begin(&mut inner.inner).await?;

Ok(Self {
inner,
closed: false,
})
}

// Internal
async fn commit(mut self) -> Result<Committed, sqlx::Error> {
SqliteTransactionManager::commit(&mut self.inner.inner).await?;
self.closed = true;
Ok(Committed(self))
}
}

impl Deref for ReadTransaction {
type Target = Connection;

fn deref(&self) -> &Self::Target {
Connection::ref_cast(self.inner.deref())
self.inner.deref()
}
}

impl DerefMut for ReadTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
Connection::ref_cast_mut(self.inner.deref_mut())
self.inner.deref_mut()
}
}

Expand All @@ -193,6 +202,18 @@ impl fmt::Debug for ReadTransaction {

impl_executor_by_deref!(ReadTransaction);

impl Drop for ReadTransaction {
fn drop(&mut self) {
if !self.closed {
SqliteTransactionManager::start_rollback(&mut self.inner.inner);
}
}
}

// Wrapper for a transaction that's been committed. This allows to delay releasing the underlying
// connection to the pool while disallowing using the connection to execute any db operations.
struct Committed(#[allow(dead_code)] ReadTransaction);

/// Transaction that allows both reading and writing.
///
/// At most one task can hold a write transaction at any time. Any other tasks are blocked on
Expand All @@ -213,7 +234,7 @@ impl WriteTransaction {
/// is guaranteed to be either committed or rolled back but there is no way to tell in advance
/// which of the two operations happens.
pub async fn commit(self) -> Result<(), sqlx::Error> {
self.commit_inner().await?;
self.inner.commit().await?;
Ok(())
}

Expand Down Expand Up @@ -263,23 +284,14 @@ impl WriteTransaction {
let span = Span::current();

task::spawn(async move {
// Make sure `_committed_tx` is alive until `f` completes,
let _committed_tx = self.commit_inner().await?;
// IMPORTANT: `_committed` must live until `f` completes.
let _committed = self.inner.commit().await?;
let result = span.in_scope(f);
Ok(result)
})
.await
.unwrap()
}

async fn commit_inner(self) -> Result<CommittedMutexTransaction, sqlx::Error> {
let tx = match self.inner.inner {
TransactionWrapper::Mutex(tx) => tx.commit().await?,
TransactionWrapper::Pool(_) => unreachable!(),
};

Ok(tx)
}
}

impl Deref for WriteTransaction {
Expand Down
100 changes: 0 additions & 100 deletions lib/src/db/mutex.rs

This file was deleted.

Loading

0 comments on commit 5afe166

Please sign in to comment.