diff --git a/src/background/runner.rs b/src/background/runner.rs index 841b5c34435..4bd31ee6cd1 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -79,8 +79,10 @@ impl Runner { where F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static, { - let conn = self.connection().expect("Could not acquire connection"); + // The connection may not be `Send` so we need to clone the pool instead + let pool = self.connection_pool.clone(); self.thread_pool.execute(move || { + let conn = pool.get().expect("Could not acquire connection"); conn.transaction::<_, Box, _>(|| { let job = storage::find_next_unlocked_job(&conn).optional()?; let job = match job { @@ -192,7 +194,7 @@ mod tests { let remaining_jobs = background_jobs .count() - .get_result(&runner.connection().unwrap()); + .get_result(&*runner.connection().unwrap()); assert_eq!(Ok(0), remaining_jobs); } @@ -223,7 +225,7 @@ mod tests { .select(id) .filter(retries.eq(0)) .for_update() - .load::(&conn) + .load::(&*conn) .unwrap(); assert_eq!(0, available_jobs.len()); @@ -231,7 +233,7 @@ mod tests { let total_jobs_including_failed = background_jobs .select(id) .for_update() - .load::(&conn) + .load::(&*conn) .unwrap(); assert_eq!(1, total_jobs_including_failed.len()); @@ -251,7 +253,7 @@ mod tests { .find(job_id) .select(retries) .for_update() - .first::(&runner.connection().unwrap()) + .first::(&*runner.connection().unwrap()) .unwrap(); assert_eq!(1, tries); } @@ -277,7 +279,7 @@ mod tests { impl<'a> Drop for TestGuard<'a> { fn drop(&mut self) { ::diesel::sql_query("TRUNCATE TABLE background_jobs") - .execute(&runner().connection().unwrap()) + .execute(&*runner().connection().unwrap()) .unwrap(); } } @@ -290,14 +292,14 @@ mod tests { let manager = r2d2::ConnectionManager::new(database_url); let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); - Runner::builder(pool, ()).thread_count(2).build() + Runner::builder(DieselPool::Pool(pool), ()).thread_count(2).build() } fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob { ::diesel::insert_into(background_jobs) .values((job_type.eq("Foo"), data.eq(json!(null)))) .returning((id, job_type, data)) - .get_result(&runner.connection().unwrap()) + .get_result(&*runner.connection().unwrap()) .unwrap() } } diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 1f1254dda39..79550914b31 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -31,7 +31,7 @@ fn main() { // We're only using 1 thread, so we only need 1 connection let db_config = r2d2::Pool::builder().max_size(1); - let db_pool = db::diesel_pool(&config.db_url, db_config); + let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); let builder = background::Runner::builder(db_pool, environment).thread_count(1); let runner = job_runner(builder); diff --git a/src/db.rs b/src/db.rs index c70466b1643..8448638886f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,6 +4,7 @@ use conduit::Request; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager, CustomizeConnection}; use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; +use std::sync::Arc; use std::ops::Deref; use url::Url; @@ -12,9 +13,10 @@ use crate::util::CargoResult; use crate::Env; #[allow(missing_debug_implementations)] +#[derive(Clone)] pub enum DieselPool { Pool(r2d2::Pool>), - Test(ReentrantMutex), + Test(Arc>), } impl DieselPool { @@ -33,7 +35,7 @@ impl DieselPool { } fn test_conn(conn: PgConnection) -> Self { - DieselPool::Test(ReentrantMutex::new(conn)) + DieselPool::Test(Arc::new(ReentrantMutex::new(conn))) } } @@ -43,6 +45,8 @@ pub enum DieselPooledConn<'a> { Test(ReentrantMutexGuard<'a, PgConnection>), } +unsafe impl<'a> Send for DieselPooledConn<'a> {} + impl Deref for DieselPooledConn<'_> { type Target = PgConnection;