diff --git a/Cargo.lock b/Cargo.lock
index 1313e7de42..4f615c8ec9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -193,6 +193,7 @@ dependencies = [
 "serde_json 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "tar 0.4.21 (registry+https://github.com/rust-lang/crates.io-index)",
 "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
 "toml 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2150,6 +2151,14 @@ dependencies = [
 "unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "threadpool"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "time"
 version = "0.1.42"
@@ -2805,6 +2814,7 @@ dependencies = [
 "checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03"
 "checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5"
 "checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14"
+"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865"
 "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f"
 "checksum tokio 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "fcaabb3cec70485d0df6e9454fe514393ad1c4070dee8915f11041e95630b230"
 "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f"
diff --git a/Cargo.toml b/Cargo.toml
index 58d2d411ab..e4031889d1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -63,6 +63,7 @@ tempdir = "0.3.7"
 parking_lot = "0.7.1"
 jemallocator = { version = "0.1.8", features = ['unprefixed_malloc_on_supported_platforms', 'profiling'] }
 jemalloc-ctl = "0.2.0"
+threadpool = "1.7"
 lettre = {git = "https://github.com/lettre/lettre", version = "0.9"}
 lettre_email = {git = "https://github.com/lettre/lettre", version = "0.9"}
diff --git a/Procfile b/Procfile
index 0dbfa4cc6b..4636a18fee 100644
--- a/Procfile
+++ b/Procfile
@@ -1,2 +1,3 @@
 web: bin/diesel migration run && bin/start-nginx ./target/release/server
 worker: ./target/release/update-downloads daemon 300
+background_worker: ./target/release/background-worker
diff --git a/migrations/2018-05-03-150523_create_jobs/down.sql b/migrations/2018-05-03-150523_create_jobs/down.sql
new file mode 100644
index 0000000000..d7ff875a67
--- /dev/null
+++ b/migrations/2018-05-03-150523_create_jobs/down.sql
@@ -0,0 +1 @@
+DROP TABLE background_jobs;
diff --git a/migrations/2018-05-03-150523_create_jobs/up.sql b/migrations/2018-05-03-150523_create_jobs/up.sql
new file mode 100644
index 0000000000..b6ac3047c6
--- /dev/null
+++ b/migrations/2018-05-03-150523_create_jobs/up.sql
@@ -0,0 +1,8 @@
+CREATE TABLE background_jobs (
+    id BIGSERIAL PRIMARY KEY,
+    job_type TEXT NOT NULL,
+    data JSONB NOT NULL,
+    retries INTEGER NOT NULL DEFAULT 0,
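+    -- `retries` and `last_retry` drive the worker's retry backoff: roughly,
+    -- a job becomes runnable again once
+    -- last_retry < now() - interval '1 minute' * 2^retries
+    -- (see the `retriable` predicate in src/background/storage.rs below).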
+    last_retry TIMESTAMP NOT NULL DEFAULT '1970-01-01',
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
diff --git a/script/ci/prune-cache.sh b/script/ci/prune-cache.sh
index 852f9a0105..e2285c6f80 100755
--- a/script/ci/prune-cache.sh
+++ b/script/ci/prune-cache.sh
@@ -7,7 +7,7 @@ du -hs target/debug
 crate_name="cargo-registry"
 test_name="all"
-bin_names="delete-crate delete-version populate render-readmes server test-pagerduty transfer-crates update-downloads"
+bin_names="delete-crate delete-version populate render-readmes server test-pagerduty transfer-crates update-downloads background-worker monitor"
 normalized_crate_name=${crate_name//-/_}
 rm -v target/debug/$normalized_crate_name-*
diff --git a/src/app.rs b/src/app.rs
index 41591ae9f5..8dcb9fef7f 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -1,12 +1,7 @@
 //! Application-wide components in a struct accessible from each request
 use crate::{db, util::CargoResult, Config, Env};
-use std::{
-    env,
-    path::PathBuf,
-    sync::{Arc, Mutex},
-    time::Duration,
-};
+use std::{env, path::PathBuf, sync::Arc, time::Duration};
 
 use diesel::r2d2;
 use scheduled_thread_pool::ScheduledThreadPool;
@@ -25,10 +20,8 @@ pub struct App {
     /// A unique key used with conduit_cookie to generate cookies
     pub session_key: String,
 
-    /// The crate index git repository
-    pub git_repo: Mutex<git2::Repository>,
-
-    /// The location on disk of the checkout of the crate index git repository
+    /// The location on disk of the checkout of the crate index git repository.
+    /// Only used in the development environment.
     pub git_repo_checkout: PathBuf,
 
     /// The server configuration
@@ -86,13 +79,10 @@ impl App {
             .connection_customizer(Box::new(db::SetStatementTimeout(db_connection_timeout)))
             .thread_pool(thread_pool);
 
-        let repo = git2::Repository::open(&config.git_repo_checkout).unwrap();
-
         App {
             diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config),
             github,
             session_key: config.session_key.clone(),
-            git_repo: Mutex::new(repo),
            git_repo_checkout: config.git_repo_checkout.clone(),
             config: config.clone(),
         }
diff --git a/src/background/job.rs b/src/background/job.rs
new file mode 100644
index 0000000000..b6ea016f71
--- /dev/null
+++ b/src/background/job.rs
@@ -0,0 +1,26 @@
+use diesel::PgConnection;
+use serde::{de::DeserializeOwned, Serialize};
+
+use super::storage;
+use crate::util::CargoResult;
+
+/// A background job, meant to be run asynchronously.
+pub trait Job: Serialize + DeserializeOwned {
+    /// The environment this job is run with. This is a struct you define,
+    /// which should encapsulate things like database connection pools, any
+    /// configuration, and any other static data or shared resources.
+    type Environment;
+
+    /// The key to use for storing this job, and looking it up later.
+    ///
+    /// Typically this is the name of your struct in `snake_case`
+    const JOB_TYPE: &'static str;
+
+    /// Enqueue this job to be run at some point in the future.
+    fn enqueue(self, conn: &PgConnection) -> CargoResult<()> {
+        storage::enqueue_job(conn, self)
+    }
+
+    /// The logic involved in actually performing this job.
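+    ///
+    /// Returning `Err`, or panicking, counts as a failure: the runner bumps
+    /// `retries` and `last_retry` and the job is retried after an exponential
+    /// backoff; on success the row is deleted from `background_jobs`.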
+    fn perform(self, env: &Self::Environment) -> CargoResult<()>;
+}
diff --git a/src/background/mod.rs b/src/background/mod.rs
new file mode 100644
index 0000000000..70b8d6dce7
--- /dev/null
+++ b/src/background/mod.rs
@@ -0,0 +1,8 @@
+mod job;
+mod registry;
+mod runner;
+mod storage;
+
+pub use self::job::*;
+pub use self::registry::Registry;
+pub use self::runner::*;
diff --git a/src/background/registry.rs b/src/background/registry.rs
new file mode 100644
index 0000000000..2e672ec0de
--- /dev/null
+++ b/src/background/registry.rs
@@ -0,0 +1,46 @@
+#![allow(clippy::new_without_default)] // https://github.com/rust-lang/rust-clippy/issues/3632
+
+use serde_json;
+use std::collections::HashMap;
+use std::panic::RefUnwindSafe;
+
+use super::Job;
+use crate::util::CargoResult;
+
+#[doc(hidden)]
+pub type PerformFn<Env> =
+    Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()> + RefUnwindSafe + Send + Sync>;
+
+#[derive(Default)]
+#[allow(missing_debug_implementations)] // Can't derive debug
+/// A registry of background jobs, used to map job types to concrete perform
+/// functions at runtime.
+pub struct Registry<Env> {
+    job_types: HashMap<&'static str, PerformFn<Env>>,
+}
+
+impl<Env> Registry<Env> {
+    /// Create a new, empty registry
+    pub fn new() -> Self {
+        Registry {
+            job_types: Default::default(),
+        }
+    }
+
+    /// Get the perform function for a given job type
+    pub fn get(&self, job_type: &str) -> Option<&PerformFn<Env>> {
+        self.job_types.get(job_type)
+    }
+
+    /// Register a new background job. This will override any existing
+    /// registration with the same `JOB_TYPE`, if one exists.
+    pub fn register<T: Job<Environment = Env>>(&mut self) {
+        self.job_types.insert(
+            T::JOB_TYPE,
+            Box::new(|data, env| {
+                let data = serde_json::from_value(data)?;
+                T::perform(data, env)
+            }),
+        );
+    }
+}
diff --git a/src/background/runner.rs b/src/background/runner.rs
new file mode 100644
index 0000000000..22df210484
--- /dev/null
+++ b/src/background/runner.rs
@@ -0,0 +1,313 @@
+#![allow(dead_code)]
+use diesel::prelude::*;
+use std::any::Any;
+use std::panic::{catch_unwind, PanicInfo, RefUnwindSafe, UnwindSafe};
+use std::sync::Arc;
+use threadpool::ThreadPool;
+
+use super::{storage, Job, Registry};
+use crate::db::{DieselPool, DieselPooledConn};
+use crate::util::errors::*;
+
+#[allow(missing_debug_implementations)]
+pub struct Builder<Env> {
+    connection_pool: DieselPool,
+    environment: Env,
+    registry: Registry<Env>,
+    thread_count: Option<usize>,
+}
+
+impl<Env> Builder<Env> {
+    pub fn register<T: Job<Environment = Env>>(mut self) -> Self {
+        self.registry.register::<T>();
+        self
+    }
+
+    pub fn thread_count(mut self, thread_count: usize) -> Self {
+        self.thread_count = Some(thread_count);
+        self
+    }
+
+    pub fn build(self) -> Runner<Env> {
+        Runner {
+            connection_pool: self.connection_pool,
+            thread_pool: ThreadPool::new(self.thread_count.unwrap_or(5)),
+            environment: Arc::new(self.environment),
+            registry: Arc::new(self.registry),
+        }
+    }
+}
+
+#[allow(missing_debug_implementations)]
+pub struct Runner<Env> {
+    connection_pool: DieselPool,
+    thread_pool: ThreadPool,
+    environment: Arc<Env>,
+    registry: Arc<Registry<Env>>,
+}
+
+impl<Env: RefUnwindSafe + Send + Sync + 'static> Runner<Env> {
+    pub fn builder(connection_pool: DieselPool, environment: Env) -> Builder<Env> {
+        Builder {
+            connection_pool,
+            environment,
+            registry: Registry::new(),
+            thread_count: None,
+        }
+    }
+
+    pub fn run_all_pending_jobs(&self) -> CargoResult<()> {
+        if let Some(conn) = self.try_connection() {
+            let available_job_count = storage::available_job_count(&conn)?;
+            for _ in 0..available_job_count {
+                self.run_single_job()
+            }
+        }
+        Ok(())
+    }
+
+    fn run_single_job(&self) {
+        let environment = Arc::clone(&self.environment);
+        let registry = Arc::clone(&self.registry);
+        self.get_single_job(move |job| {
+            let perform_fn = registry
+                .get(&job.job_type)
+                .ok_or_else(|| internal(&format_args!("Unknown job type {}", job.job_type)))?;
+            perform_fn(job.data, &environment)
+        })
+    }
+
+    fn get_single_job<F>(&self, f: F)
+    where
+        F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static,
+    {
+        // The connection may not be `Send` so we need to clone the pool instead
+        let pool = self.connection_pool.clone();
+        self.thread_pool.execute(move || {
+            let conn = pool.get().expect("Could not acquire connection");
+            conn.transaction::<_, Box<dyn CargoError>, _>(|| {
+                let job = storage::find_next_unlocked_job(&conn).optional()?;
+                let job = match job {
+                    Some(j) => j,
+                    None => return Ok(()),
+                };
+                let job_id = job.id;
+
+                let result = catch_unwind(|| f(job))
+                    .map_err(|e| try_to_extract_panic_info(&e))
+                    .and_then(|r| r);
+
+                match result {
+                    Ok(_) => storage::delete_successful_job(&conn, job_id)?,
+                    Err(e) => {
+                        eprintln!("Job {} failed to run: {}", job_id, e);
+                        storage::update_failed_job(&conn, job_id);
+                    }
+                }
+                Ok(())
+            })
+            .expect("Could not retrieve or update job")
+        })
+    }
+
+    fn connection(&self) -> CargoResult<DieselPooledConn<'_>> {
+        self.connection_pool.get().map_err(Into::into)
+    }
+
+    fn try_connection(&self) -> Option<DieselPooledConn<'_>> {
+        self.connection_pool.try_get()
+    }
+
+    pub fn assert_no_failed_jobs(&self) -> CargoResult<()> {
+        self.wait_for_jobs();
+        let failed_jobs = storage::failed_job_count(&*self.connection()?)?;
+        assert_eq!(0, failed_jobs);
+        Ok(())
+    }
+
+    fn wait_for_jobs(&self) {
+        self.thread_pool.join();
+        assert_eq!(0, self.thread_pool.panic_count());
+    }
+}
+
+/// Try to figure out what's in the box, and print it if we can.
+///
+/// The actual error type we will get from `panic::catch_unwind` is really poorly documented.
+/// However, the `panic::set_hook` functions deal with a `PanicInfo` type, and its payload is
+/// documented as "commonly but not always `&'static str` or `String`". So we can try all of those,
+/// and give up if we didn't get one of those three types.
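+///
+/// For instance, with the standard panic machinery, `panic!("boom")` carries a
+/// `&'static str` payload and `panic!("{}", id)` carries a `String`; anything
+/// else falls through to the generic "job panicked" message below.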
+fn try_to_extract_panic_info(info: &(dyn Any + Send + 'static)) -> Box<dyn CargoError> {
+    if let Some(x) = info.downcast_ref::<PanicInfo<'_>>() {
+        internal(&format_args!("job panicked: {}", x))
+    } else if let Some(x) = info.downcast_ref::<&'static str>() {
+        internal(&format_args!("job panicked: {}", x))
+    } else if let Some(x) = info.downcast_ref::<String>() {
+        internal(&format_args!("job panicked: {}", x))
+    } else {
+        internal("job panicked")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use diesel::prelude::*;
+    use diesel::r2d2;
+
+    use super::*;
+    use crate::schema::background_jobs::dsl::*;
+    use std::panic::AssertUnwindSafe;
+    use std::sync::{Arc, Barrier, Mutex, MutexGuard};
+
+    #[test]
+    fn jobs_are_locked_when_fetched() {
+        let _guard = TestGuard::lock();
+
+        let runner = runner();
+        let first_job_id = create_dummy_job(&runner).id;
+        let second_job_id = create_dummy_job(&runner).id;
+        let fetch_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
+        let fetch_barrier2 = fetch_barrier.clone();
+        let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
+        let return_barrier2 = return_barrier.clone();
+
+        runner.get_single_job(move |job| {
+            fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
+            assert_eq!(first_job_id, job.id);
+            return_barrier.0.wait(); // Wait for thread 2 to lock its job
+            Ok(())
+        });
+
+        fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
+        runner.get_single_job(move |job| {
+            assert_eq!(second_job_id, job.id);
+            return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
+            Ok(())
+        });
+
+        runner.wait_for_jobs();
+    }
+
+    #[test]
+    fn jobs_are_deleted_when_successfully_run() {
+        let _guard = TestGuard::lock();
+
+        let runner = runner();
+        create_dummy_job(&runner);
+
+        runner.get_single_job(|_| Ok(()));
+        runner.wait_for_jobs();
+
+        let remaining_jobs = background_jobs
+            .count()
+            .get_result(&*runner.connection().unwrap());
+        assert_eq!(Ok(0), remaining_jobs);
+    }
+
+    #[test]
+    fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
+        let _guard = TestGuard::lock();
+
+        let runner = runner();
+        create_dummy_job(&runner);
+        let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
+        let barrier2 = barrier.clone();
+
+        runner.get_single_job(move |_| {
+            barrier.0.wait();
+            // error so the job goes back into the queue
+            Err(human("nope"))
+        });
+
+        let conn = runner.connection().unwrap();
+        // Wait for the first thread to acquire the lock
+        barrier2.0.wait();
+        // We are intentionally not using `get_single_job` here.
+        // `SKIP LOCKED` is intentionally omitted here, so we block until
+        // the lock on the first job is released.
+        // If there is any point where the row is unlocked, but the retry
+        // count is not updated, we will get a row here.
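+        // (The probe below is roughly `SELECT id FROM background_jobs
+        // WHERE retries = 0 FOR UPDATE`; without SKIP LOCKED, Postgres
+        // makes it wait for the worker's transaction to finish.)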
+        let available_jobs = background_jobs
+            .select(id)
+            .filter(retries.eq(0))
+            .for_update()
+            .load::<i64>(&*conn)
+            .unwrap();
+        assert_eq!(0, available_jobs.len());
+
+        // Sanity check to make sure the job actually is there
+        let total_jobs_including_failed = background_jobs
+            .select(id)
+            .for_update()
+            .load::<i64>(&*conn)
+            .unwrap();
+        assert_eq!(1, total_jobs_including_failed.len());
+
+        runner.wait_for_jobs();
+    }
+
+    #[test]
+    fn panicking_in_jobs_updates_retry_counter() {
+        let _guard = TestGuard::lock();
+        let runner = runner();
+        let job_id = create_dummy_job(&runner).id;
+
+        runner.get_single_job(|_| panic!());
+        runner.wait_for_jobs();
+
+        let tries = background_jobs
+            .find(job_id)
+            .select(retries)
+            .for_update()
+            .first::<i32>(&*runner.connection().unwrap())
+            .unwrap();
+        assert_eq!(1, tries);
+    }
+
+    lazy_static! {
+        // Since these tests deal with behavior concerning multiple connections
+        // running concurrently, they have to run outside of a transaction.
+        // Therefore we can't run more than one at a time.
+        //
+        // Rather than forcing the whole suite to be run with `--test-threads 1`,
+        // we just lock these tests instead.
+        static ref TEST_MUTEX: Mutex<()> = Mutex::new(());
+    }
+
+    struct TestGuard<'a>(MutexGuard<'a, ()>);
+
+    impl<'a> TestGuard<'a> {
+        fn lock() -> Self {
+            TestGuard(TEST_MUTEX.lock().unwrap())
+        }
+    }
+
+    impl<'a> Drop for TestGuard<'a> {
+        fn drop(&mut self) {
+            ::diesel::sql_query("TRUNCATE TABLE background_jobs")
+                .execute(&*runner().connection().unwrap())
+                .unwrap();
+        }
+    }
+
+    fn runner() -> Runner<()> {
+        use dotenv;
+
+        let database_url =
+            dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests");
+        let manager = r2d2::ConnectionManager::new(database_url);
+        let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap();
+
+        Runner::builder(DieselPool::Pool(pool), ())
+            .thread_count(2)
+            .build()
+    }
+
+    fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob {
+        ::diesel::insert_into(background_jobs)
+            .values((job_type.eq("Foo"), data.eq(json!(null))))
+            .returning((id, job_type, data))
+            .get_result(&*runner.connection().unwrap())
+            .unwrap()
+    }
+}
diff --git a/src/background/storage.rs b/src/background/storage.rs
new file mode 100644
index 0000000000..4e289b0fc4
--- /dev/null
+++ b/src/background/storage.rs
@@ -0,0 +1,88 @@
+use diesel::dsl::now;
+use diesel::pg::Pg;
+use diesel::prelude::*;
+use diesel::sql_types::{Bool, Integer, Interval};
+use diesel::{delete, insert_into, update};
+use serde_json;
+
+use super::Job;
+use crate::schema::background_jobs;
+use crate::util::CargoResult;
+
+#[derive(Queryable, Identifiable, Debug, Clone)]
+pub struct BackgroundJob {
+    pub id: i64,
+    pub job_type: String,
+    pub data: serde_json::Value,
+}
+
+/// Enqueues a job to be run as soon as possible.
+pub fn enqueue_job<T: Job>(conn: &PgConnection, job: T) -> CargoResult<()> {
+    use crate::schema::background_jobs::dsl::*;
+
+    let job_data = serde_json::to_value(job)?;
+    insert_into(background_jobs)
+        .values((job_type.eq(T::JOB_TYPE), data.eq(job_data)))
+        .execute(conn)?;
+    Ok(())
+}
+
+fn retriable() -> Box<dyn BoxableExpression<background_jobs::table, Pg, SqlType = Bool>> {
+    use crate::schema::background_jobs::dsl::*;
+    use diesel::dsl::*;
+
+    sql_function!(power, power_t, (x: Integer, y: Integer) -> Integer);
+
+    Box::new(last_retry.lt(now - 1.minute().into_sql::<Interval>() * power(2, retries)))
+}
+
+/// Finds the next job that is unlocked, and ready to be retried. If a row is
+/// found, it will be locked.
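+///
+/// Roughly the following SQL, assuming Diesel's usual lowering:
+///
+///     SELECT id, job_type, data FROM background_jobs
+///     WHERE last_retry < now() - interval '1 minute' * power(2, retries)
+///     ORDER BY id LIMIT 1
+///     FOR UPDATE SKIP LOCKED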
+pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult<BackgroundJob> {
+    use crate::schema::background_jobs::dsl::*;
+
+    background_jobs
+        .select((id, job_type, data))
+        .filter(retriable())
+        .order(id)
+        .for_update()
+        .skip_locked()
+        .first::<BackgroundJob>(conn)
+}
+
+/// The number of jobs that have failed at least once
+pub fn failed_job_count(conn: &PgConnection) -> QueryResult<i64> {
+    use crate::schema::background_jobs::dsl::*;
+
+    background_jobs
+        .count()
+        .filter(retries.gt(0))
+        .get_result(conn)
+}
+
+/// The number of jobs available to be run
+pub fn available_job_count(conn: &PgConnection) -> QueryResult<i64> {
+    use crate::schema::background_jobs::dsl::*;
+
+    background_jobs.count().filter(retriable()).get_result(conn)
+}
+
+/// Deletes a job that has successfully completed running
+pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()> {
+    use crate::schema::background_jobs::dsl::*;
+
+    delete(background_jobs.find(job_id)).execute(conn)?;
+    Ok(())
+}
+
+/// Marks that we just tried and failed to run a job.
+///
+/// Ignores any database errors that may have occurred. If the DB has gone away,
+/// we assume that just trying again with a new connection will succeed.
+pub fn update_failed_job(conn: &PgConnection, job_id: i64) {
+    use crate::schema::background_jobs::dsl::*;
+
+    let _ = update(background_jobs.find(job_id))
+        .set((retries.eq(retries + 1), last_retry.eq(now)))
+        .execute(conn);
+}
diff --git a/src/background_jobs.rs b/src/background_jobs.rs
new file mode 100644
index 0000000000..cd71f1c8a9
--- /dev/null
+++ b/src/background_jobs.rs
@@ -0,0 +1,49 @@
+use std::panic::AssertUnwindSafe;
+use std::sync::{Mutex, MutexGuard};
+
+use crate::background::{Builder, Runner};
+use crate::db::{DieselPool, DieselPooledConn};
+use crate::git::{AddCrate, Repository, Yank};
+use crate::util::CargoResult;
+
+pub fn job_runner(config: Builder<Environment>) -> Runner<Environment> {
+    config.register::<AddCrate>().register::<Yank>().build()
+}
+
+#[allow(missing_debug_implementations)]
+pub struct Environment {
+    index: Mutex<Repository>,
+    pub credentials: Option<(String, String)>,
+    // FIXME: https://github.com/sfackler/r2d2/pull/70
+    pub connection_pool: AssertUnwindSafe<DieselPool>,
+}
+
+impl Environment {
+    pub fn new(
+        index: Repository,
+        credentials: Option<(String, String)>,
+        connection_pool: DieselPool,
+    ) -> Self {
+        Self {
+            index: Mutex::new(index),
+            credentials,
+            connection_pool: AssertUnwindSafe(connection_pool),
+        }
+    }
+
+    pub fn credentials(&self) -> Option<(&str, &str)> {
+        self.credentials
+            .as_ref()
+            .map(|(u, p)| (u.as_str(), p.as_str()))
+    }
+
+    pub fn connection(&self) -> CargoResult<DieselPooledConn<'_>> {
+        self.connection_pool.0.get().map_err(Into::into)
+    }
+
+    pub fn lock_index(&self) -> CargoResult<MutexGuard<'_, Repository>> {
+        let repo = self.index.lock().unwrap_or_else(|e| e.into_inner());
+        repo.reset_head()?;
+        Ok(repo)
+    }
+}
diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs
new file mode 100644
index 0000000000..07f4f1d841
--- /dev/null
+++ b/src/bin/background-worker.rs
@@ -0,0 +1,52 @@
+// Runs enqueued background jobs
+//
+// This binary will loop until interrupted. Every second, it will attempt to
+// run any jobs in the background queue. Panics if attempting to count
+// available jobs fails.
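+// Each pass counts the jobs that are currently runnable and executes at most
+// that many, so work enqueued mid-pass simply waits for the next tick of the
+// loop below.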
+//
+// Usage:
+//     cargo run --bin background-worker
+
+#![deny(warnings)]
+
+use cargo_registry::git::Repository;
+use cargo_registry::{background, background_jobs::*, db};
+use diesel::r2d2;
+use std::env;
+use std::thread::sleep;
+use std::time::Duration;
+
+fn main() {
+    println!("Booting runner");
+
+    let config = cargo_registry::Config::default();
+
+    // We're only using 1 thread, so we only need 2 connections
+    let db_config = r2d2::Pool::builder().max_size(2);
+    let db_pool = db::diesel_pool(&config.db_url, config.env, db_config);
+
+    let username = env::var("GIT_HTTP_USER");
+    let password = env::var("GIT_HTTP_PWD");
+    let credentials = match (username, password) {
+        (Ok(u), Ok(p)) => Some((u, p)),
+        _ => None,
+    };
+
+    println!("Cloning index");
+
+    let repository = Repository::open(&config.index_location).expect("Failed to clone index");
+
+    let environment = Environment::new(repository, credentials, db_pool.clone());
+
+    let builder = background::Runner::builder(db_pool, environment).thread_count(1);
+    let runner = job_runner(builder);
+
+    println!("Runner booted, running jobs");
+
+    loop {
+        runner
+            .run_all_pending_jobs()
+            .expect("Could not begin running jobs");
+        sleep(Duration::from_secs(1));
+    }
+}
diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs
new file mode 100644
index 0000000000..9f1c25be20
--- /dev/null
+++ b/src/bin/monitor.rs
@@ -0,0 +1,73 @@
+//! Checks for any invariants we expect to be true, and pages whoever is on call
+//! if they are not.
+//!
+//! Usage:
+//!     cargo run --bin monitor
+
+#![deny(warnings)]
+
+#[macro_use]
+extern crate serde_derive;
+
+mod on_call;
+
+use cargo_registry::{db, util::CargoResult};
+use diesel::prelude::*;
+use std::env;
+
+fn main() -> CargoResult<()> {
+    let conn = db::connect_now()?;
+
+    check_stalled_background_jobs(&conn)?;
+    Ok(())
+}
+
+fn check_stalled_background_jobs(conn: &PgConnection) -> CargoResult<()> {
+    use cargo_registry::schema::background_jobs::dsl::*;
+    use diesel::dsl::*;
+
+    const BACKGROUND_JOB_KEY: &str = "background_jobs";
+
+    println!("Checking for stalled background jobs");
+
+    let max_job_time = env::var("MAX_JOB_TIME")
+        .map(|s| s.parse::<i32>().unwrap())
+        .unwrap_or(15);
+
+    let stalled_job_count = background_jobs
+        .filter(created_at.lt(now - max_job_time.minutes()))
+        .count()
+        .get_result::<i64>(conn)?;
+
+    let event = if stalled_job_count > 0 {
+        on_call::Event::Trigger {
+            incident_key: Some(BACKGROUND_JOB_KEY.into()),
+            description: format!(
+                "{} jobs have been in the queue for more than {} minutes",
+                stalled_job_count, max_job_time
+            ),
+        }
+    } else {
+        on_call::Event::Resolve {
+            incident_key: BACKGROUND_JOB_KEY.into(),
+            description: Some("No stalled background jobs".into()),
+        }
+    };
+
+    log_and_trigger_event(event)?;
+    Ok(())
+}
+
+fn log_and_trigger_event(event: on_call::Event) -> CargoResult<()> {
+    match event {
+        on_call::Event::Trigger {
+            ref description, ..
+        } => println!("Paging on-call: {}", description),
+        on_call::Event::Resolve {
+            description: Some(ref description),
+            ..
+        } => println!("{}", description),
+        _ => {} // noop
+    }
+    event.send()
+}
diff --git a/src/bin/on_call/mod.rs b/src/bin/on_call/mod.rs
index 39bedfecf0..1228f6e73b 100644
--- a/src/bin/on_call/mod.rs
+++ b/src/bin/on_call/mod.rs
@@ -10,6 +10,7 @@ pub enum Event {
         incident_key: Option<String>,
         description: String,
     },
+    #[allow(dead_code)] // Not all binaries create Acknowledge events
     Acknowledge {
         incident_key: String,
         description: Option<String>,
diff --git a/src/bin/server.rs b/src/bin/server.rs
index cf2caaa5c1..43f08586d9 100644
--- a/src/bin/server.rs
+++ b/src/bin/server.rs
@@ -1,10 +1,10 @@
 #![deny(warnings)]
 
-use cargo_registry::{boot, build_handler, env, git, App, Config, Env};
+use cargo_registry::{boot, App, Env};
 use jemalloc_ctl;
 
 use std::{
     env,
-    fs::{self, File},
+    fs::File,
     sync::{mpsc::channel, Arc},
 };
 
@@ -23,37 +23,10 @@ fn main() {
     // Initialize logging
     env_logger::init();
 
-    let config = Config::default();
-
-    // If there isn't a git checkout containing the crate index repo at the path specified
-    // by `GIT_REPO_CHECKOUT`, delete that directory and clone the repo specified by `GIT_REPO_URL`
-    // into that directory instead. Uses the credentials specified in `GIT_HTTP_USER` and
-    // `GIT_HTTP_PWD` via the `cargo_registry::git::credentials` function.
-    let url = env("GIT_REPO_URL");
-    let repo = match git2::Repository::open(&config.git_repo_checkout) {
-        Ok(r) => r,
-        Err(..) => {
-            let _ = fs::remove_dir_all(&config.git_repo_checkout);
-            fs::create_dir_all(&config.git_repo_checkout).unwrap();
-            let mut cb = git2::RemoteCallbacks::new();
-            cb.credentials(git::credentials);
-            let mut opts = git2::FetchOptions::new();
-            opts.remote_callbacks(cb);
-            git2::build::RepoBuilder::new()
-                .fetch_options(opts)
-                .clone(&url, &config.git_repo_checkout)
-                .unwrap()
-        }
-    };
-
-    // All commits to the index registry made through crates.io will be made by bors, the Rust
-    // community's friendly GitHub bot.
-    let mut cfg = repo.config().unwrap();
-    cfg.set_str("user.name", "bors").unwrap();
-    cfg.set_str("user.email", "bors@rust-lang.org").unwrap();
+    let config = cargo_registry::Config::default();
 
     let app = App::new(&config);
-    let app = build_handler(Arc::new(app));
+    let app = cargo_registry::build_handler(Arc::new(app));
 
     // On every server restart, ensure the categories available in the database match
     // the information in *src/categories.toml*.
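
Taken together, the pieces above cover the whole lifecycle: `enqueue` serializes a job into `background_jobs`, and the worker's `Runner` locks, deserializes, and performs it. Below is a minimal sketch of how a new job type would plug in — `SendWelcomeEmail` and its field are invented for illustration, and the derives assume the crate's usual `serde_derive` macros are in scope; `Job`, `Environment`, `Runner`, and `enqueue` are the ones introduced by this patch:

```rust
use cargo_registry::background::{Job, Runner};
use cargo_registry::background_jobs::Environment;
use cargo_registry::db::DieselPool;
use cargo_registry::util::CargoResult;
use diesel::PgConnection;

// Hypothetical job type, shown only to illustrate the trait.
#[derive(Serialize, Deserialize)]
struct SendWelcomeEmail {
    user_id: i32, // round-trips through the JSONB `data` column
}

impl Job for SendWelcomeEmail {
    type Environment = Environment;
    const JOB_TYPE: &'static str = "send_welcome_email";

    fn perform(self, _env: &Self::Environment) -> CargoResult<()> {
        // Real work would go here, using the shared resources in `_env`.
        println!("welcoming user {}", self.user_id);
        Ok(())
    }
}

fn example(conn: &PgConnection, pool: DieselPool, env: Environment) -> CargoResult<()> {
    // Inserts one row into `background_jobs`; it commits or rolls back with
    // the caller's transaction, so a failed request leaves no stray job.
    SendWelcomeEmail { user_id: 42 }.enqueue(conn)?;

    // The runner can only perform job types it has been told about, so every
    // type must be registered before `run_all_pending_jobs` is called.
    let runner = Runner::builder(pool, env)
        .register::<SendWelcomeEmail>()
        .build();
    runner.run_all_pending_jobs()
}
```

Note that enqueueing only needs a `PgConnection`, which is what lets `publish` and `yank` below trade direct git access on the request thread for a plain database insert.
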
diff --git a/src/config.rs b/src/config.rs
index c86f7affcc..bf2b436e41 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,11 +1,13 @@
 use crate::{env, uploaders::Uploader, Env, Replica};
 use std::{env, path::PathBuf};
+use url::Url;
 
 #[derive(Clone, Debug)]
 pub struct Config {
     pub uploader: Uploader,
     pub session_key: String,
     pub git_repo_checkout: PathBuf,
+    pub index_location: Url,
     pub gh_client_id: String,
     pub gh_client_secret: String,
     pub db_url: String,
@@ -124,6 +126,7 @@ impl Default for Config {
             uploader,
             session_key: env("SESSION_KEY"),
             git_repo_checkout: checkout,
+            index_location: Url::parse(&env("GIT_REPO_URL")).unwrap(),
             gh_client_id: env("GH_CLIENT_ID"),
             gh_client_secret: env("GH_CLIENT_SECRET"),
             db_url: env("DATABASE_URL"),
diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs
index 55312df7b0..c88cddcbbb 100644
--- a/src/controllers/krate/publish.rs
+++ b/src/controllers/krate/publish.rs
@@ -196,7 +196,7 @@ pub fn publish(req: &mut dyn Request) -> CargoResult<Response> {
             yanked: Some(false),
             links,
         };
-        git::add_crate(&**req.app(), &git_crate).chain_error(|| {
+        git::add_crate(&conn, git_crate).chain_error(|| {
             internal(&format_args!(
                 "could not add crate `{}` to the git repo",
                 name
diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs
index a413ad68e1..43136e2eb2 100644
--- a/src/controllers/version/yank.rs
+++ b/src/controllers/version/yank.rs
@@ -3,10 +3,8 @@
 use crate::controllers::prelude::*;
 use crate::git;
-use crate::util::errors::CargoError;
 use crate::models::Rights;
-use crate::schema::*;
 
 use super::version_and_crate;
@@ -38,15 +36,7 @@ fn modify_yank(req: &mut dyn Request, yanked: bool) -> CargoResult<Response> {
         return Err(human("must already be an owner to yank or unyank"));
     }
 
-    if version.yanked != yanked {
-        conn.transaction::<_, Box<dyn CargoError>, _>(|| {
-            diesel::update(&version)
-                .set(versions::yanked.eq(yanked))
-                .execute(&*conn)?;
-            git::yank(&**req.app(), &krate.name, &version.num, yanked)?;
-            Ok(())
-        })?;
-    }
+    git::yank(&conn, krate.name, version, yanked)?;
 
     #[derive(Serialize)]
     struct R {
diff --git a/src/db.rs b/src/db.rs
index c70466b164..b090be41a4 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -5,6 +5,7 @@ use diesel::prelude::*;
 use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
 use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
 use std::ops::Deref;
+use std::sync::Arc;
 use url::Url;
 
 use crate::middleware::app::RequestApp;
@@ -12,9 +13,10 @@ use crate::util::CargoResult;
 use crate::Env;
 
 #[allow(missing_debug_implementations)]
+#[derive(Clone)]
 pub enum DieselPool {
     Pool(r2d2::Pool<ConnectionManager<PgConnection>>),
-    Test(ReentrantMutex<PgConnection>),
+    Test(Arc<ReentrantMutex<PgConnection>>),
 }
 
 impl DieselPool {
@@ -25,6 +27,13 @@
         }
     }
 
+    pub fn try_get(&self) -> Option<DieselPooledConn<'_>> {
+        match self {
+            DieselPool::Pool(pool) => pool.try_get().map(DieselPooledConn::Pool),
+            DieselPool::Test(conn) => conn.try_lock().map(DieselPooledConn::Test),
+        }
+    }
+
     pub fn state(&self) -> r2d2::State {
         match self {
             DieselPool::Pool(pool) => pool.state(),
@@ -33,7 +42,7 @@
     }
 
     fn test_conn(conn: PgConnection) -> Self {
-        DieselPool::Test(ReentrantMutex::new(conn))
+        DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
     }
 }
 
@@ -43,6 +52,8 @@ pub enum DieselPooledConn<'a> {
     Test(ReentrantMutexGuard<'a, PgConnection>),
 }
 
+unsafe impl<'a> Send for DieselPooledConn<'a> {}
+
 impl Deref for DieselPooledConn<'_> {
     type Target = PgConnection;
 
diff --git a/src/git.rs b/src/git.rs
index 6d7dfa0bf5..c0db6cd612 100644
--- a/src/git.rs
+++ b/src/git.rs
@@ -1,14 +1,19 @@
+#![allow(missing_debug_implementations)]
+
+use diesel::prelude::*;
 use std::collections::HashMap;
-use std::env;
-use std::fs::{self, File};
+use std::fs::{self, OpenOptions};
 use std::io::prelude::*;
 use std::path::{Path, PathBuf};
+use tempdir::TempDir;
+use url::Url;
 
-use crate::app::App;
+use crate::background::Job;
+use crate::background_jobs::Environment;
+use crate::models::{DependencyKind, Version};
+use crate::schema::versions;
 use crate::util::{internal, CargoResult};
-use crate::models::DependencyKind;
-
 #[derive(Serialize, Deserialize, Debug)]
 pub struct Crate {
     pub name: String,
@@ -34,180 +39,205 @@ pub struct Dependency {
     pub package: Option<String>,
 }
 
-fn index_file(base: &Path, name: &str) -> PathBuf {
-    let name = name
-        .chars()
-        .flat_map(|c| c.to_lowercase())
-        .collect::<String>();
-    match name.len() {
-        1 => base.join("1").join(&name),
-        2 => base.join("2").join(&name),
-        3 => base.join("3").join(&name[..1]).join(&name),
-        _ => base.join(&name[0..2]).join(&name[2..4]).join(&name),
-    }
+pub struct Repository {
+    checkout_path: TempDir,
+    repository: git2::Repository,
 }
 
-pub fn add_crate(app: &App, krate: &Crate) -> CargoResult<()> {
-    let repo = app.git_repo.lock().unwrap();
-    let repo = &*repo;
-    let repo_path = repo.workdir().unwrap();
-    let dst = index_file(repo_path, &krate.name);
-
-    commit_and_push(repo, || {
-        // Add the crate to its relevant file
-        fs::create_dir_all(dst.parent().unwrap())?;
-        let mut prev = String::new();
-        if fs::metadata(&dst).is_ok() {
-            File::open(&dst).and_then(|mut f| f.read_to_string(&mut prev))?;
-        }
-        let s = serde_json::to_string(krate).unwrap();
-        let new = prev + &s;
-        let mut f = File::create(&dst)?;
-        f.write_all(new.as_bytes())?;
-        f.write_all(b"\n")?;
-
-        Ok((
-            format!("Updating crate `{}#{}`", krate.name, krate.vers),
-            dst.clone(),
-        ))
-    })
-}
+impl Repository {
+    pub fn open(url: &Url) -> CargoResult<Self> {
+        let checkout_path = TempDir::new("git")?;
+        let repository = git2::Repository::clone(url.as_str(), checkout_path.path())?;
+
+        // All commits to the index registry made through crates.io will be made by bors, the Rust
+        // community's friendly GitHub bot.
+        let mut cfg = repository.config()?;
+        cfg.set_str("user.name", "bors")?;
+        cfg.set_str("user.email", "bors@rust-lang.org")?;
+
+        Ok(Self {
+            checkout_path,
+            repository,
+        })
+    }
 
-/// Yanks or unyanks a crate version. This requires finding the index
-/// file, deserlialise the crate from JSON, change the yank boolean to
-/// `true` or `false`, write all the lines back out, and commit and
-/// push the changes.
-pub fn yank(app: &App, krate: &str, version: &semver::Version, yanked: bool) -> CargoResult<()> {
-    let repo = app.git_repo.lock().unwrap();
-    let repo_path = repo.workdir().unwrap();
-    let dst = index_file(repo_path, krate);
-
-    commit_and_push(&repo, || {
-        let mut prev = String::new();
-        File::open(&dst).and_then(|mut f| f.read_to_string(&mut prev))?;
-        let new = prev
-            .lines()
-            .map(|line| {
-                let mut git_crate = serde_json::from_str::<Crate>(line)
-                    .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?;
-                if git_crate.name != krate || git_crate.vers != version.to_string() {
-                    return Ok(line.to_string());
-                }
-                git_crate.yanked = Some(yanked);
-                Ok(serde_json::to_string(&git_crate).unwrap())
-            })
-            .collect::<CargoResult<Vec<String>>>();
-        let new = new?.join("\n");
-        let mut f = File::create(&dst)?;
-        f.write_all(new.as_bytes())?;
-        f.write_all(b"\n")?;
-
-        Ok((
-            format!(
-                "{} crate `{}#{}`",
-                if yanked { "Yanking" } else { "Unyanking" },
-                krate,
-                version
-            ),
-            dst.clone(),
-        ))
-    })
-}
+    fn index_file(&self, name: &str) -> PathBuf {
+        self.checkout_path
+            .path()
+            .join(self.relative_index_file(name))
+    }
 
-/// Commits and pushes to the crates.io index.
-///
-/// There are currently 2 instances of the crates.io backend running
-/// on Heroku, and they race against each other e.g. if 2 pushes occur,
-/// then one will succeed while the other will need to be rebased before
-/// being pushed.
-///
-/// A maximum of 20 attempts to commit and push to the index currently
-/// accounts for the amount of traffic publishing crates, though this may
-/// have to be changed in the future.
-///
-/// Notes:
-/// Currently, this function is called on the HTTP thread and is blocking.
-/// Spawning a separate thread for this function means that the request
-/// can return without waiting for completion, and other methods of
-/// notifying upon completion or error can be used.
-fn commit_and_push<F>(repo: &git2::Repository, mut f: F) -> CargoResult<()>
-where
-    F: FnMut() -> CargoResult<(String, PathBuf)>,
-{
-    let repo_path = repo.workdir().unwrap();
-
-    // Attempt to commit in a loop. It's possible that we're going to need to
-    // rebase our repository, and after that it's possible that we're going to
-    // race to commit the changes. For now we just cap out the maximum number of
-    // retries at a fixed number.
-    for _ in 0..20 {
-        let (msg, dst) = f()?;
+    fn relative_index_file(&self, name: &str) -> PathBuf {
+        let name = name.to_lowercase();
+        match name.len() {
+            1 => Path::new("1").join(&name),
+            2 => Path::new("2").join(&name),
+            3 => Path::new("3").join(&name[..1]).join(&name),
+            _ => Path::new(&name[0..2]).join(&name[2..4]).join(&name),
+        }
+    }
 
+    fn commit_and_push(
+        &self,
+        msg: &str,
+        modified_file: &Path,
+        credentials: Option<(&str, &str)>,
+    ) -> CargoResult<()> {
         // git add $file
-        let mut index = repo.index()?;
-        let mut repo_path = repo_path.iter();
-        let dst = dst
-            .iter()
-            .skip_while(|s| Some(*s) == repo_path.next())
-            .collect::<PathBuf>();
-        index.add_path(&dst)?;
+        let mut index = self.repository.index()?;
+        index.add_path(modified_file)?;
         index.write()?;
         let tree_id = index.write_tree()?;
-        let tree = repo.find_tree(tree_id)?;
+        let tree = self.repository.find_tree(tree_id)?;
 
         // git commit -m "..."
-        let head = repo.head()?;
-        let parent = repo.find_commit(head.target().unwrap())?;
-        let sig = repo.signature()?;
-        repo.commit(Some("HEAD"), &sig, &sig, &msg, &tree, &[&parent])?;
+        let head = self.repository.head()?;
+        let parent = self.repository.find_commit(head.target().unwrap())?;
+        let sig = self.repository.signature()?;
+        self.repository
+            .commit(Some("HEAD"), &sig, &sig, &msg, &tree, &[&parent])?;
 
         // git push
-        let mut ref_status = None;
-        let mut origin = repo.find_remote("origin")?;
-        let res = {
+        let mut ref_status = Ok(());
+        {
+            let mut origin = self.repository.find_remote("origin")?;
             let mut callbacks = git2::RemoteCallbacks::new();
-            callbacks.credentials(credentials);
+            callbacks.credentials(|_, _, _| {
+                credentials
+                    .ok_or_else(|| git2::Error::from_str("no authentication set"))
+                    .and_then(|(u, p)| git2::Cred::userpass_plaintext(u, p))
+            });
             callbacks.push_update_reference(|refname, status| {
                 assert_eq!(refname, "refs/heads/master");
-                ref_status = status.map(|s| s.to_string());
+                if let Some(s) = status {
+                    ref_status = Err(internal(&format_args!("failed to push a ref: {}", s)))
+                }
                 Ok(())
             });
             let mut opts = git2::PushOptions::new();
             opts.remote_callbacks(callbacks);
-            origin.push(&["refs/heads/master"], Some(&mut opts))
-        };
-        match res {
-            Ok(()) if ref_status.is_none() => return Ok(()),
-            Ok(()) => info!("failed to push a ref: {:?}", ref_status),
-            Err(e) => info!("failure to push: {}", e),
+            origin.push(&["refs/heads/master"], Some(&mut opts))?;
         }
+        ref_status
+    }
 
-        let mut callbacks = git2::RemoteCallbacks::new();
-        callbacks.credentials(credentials);
-        origin.update_tips(
-            Some(&mut callbacks),
-            true,
-            git2::AutotagOption::Unspecified,
-            None,
-        )?;
-
-        // Ok, we need to update, so fetch and reset --hard
+    pub fn reset_head(&self) -> CargoResult<()> {
+        let mut origin = self.repository.find_remote("origin")?;
         origin.fetch(&["refs/heads/*:refs/heads/*"], None, None)?;
-        let head = repo.head()?.target().unwrap();
-        let obj = repo.find_object(head, None)?;
-        repo.reset(&obj, git2::ResetType::Hard, None)?;
+        let head = self.repository.head()?.target().unwrap();
+        let obj = self.repository.find_object(head, None)?;
+        self.repository.reset(&obj, git2::ResetType::Hard, None)?;
+        Ok(())
+    }
+}
+
+#[derive(Deserialize, Serialize)]
+pub struct AddCrate {
+    krate: Crate,
+}
+
+impl Job for AddCrate {
+    type Environment = Environment;
+    const JOB_TYPE: &'static str = "add_crate";
+
+    fn perform(self, env: &Self::Environment) -> CargoResult<()> {
+        let repo = env.lock_index()?;
+        let dst = repo.index_file(&self.krate.name);
+
+        // Add the crate to its relevant file
+        fs::create_dir_all(dst.parent().unwrap())?;
+        let mut file = OpenOptions::new().append(true).create(true).open(&dst)?;
+        serde_json::to_writer(&mut file, &self.krate)?;
+        file.write_all(b"\n")?;
+
+        repo.commit_and_push(
+            &format!("Updating crate `{}#{}`", self.krate.name, self.krate.vers),
+            &repo.relative_index_file(&self.krate.name),
+            env.credentials(),
+        )
+    }
+}
+
+pub fn add_crate(conn: &PgConnection, krate: Crate) -> CargoResult<()> {
+    AddCrate { krate }.enqueue(conn).map_err(Into::into)
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct Yank {
+    krate: String,
+    version: Version,
+    yanked: bool,
+}
 
-    Err(internal("Too many rebase failures"))
+impl Job for Yank {
+    type Environment = Environment;
+    const JOB_TYPE: &'static str = "yank";
+
+    fn perform(self, env: &Self::Environment) -> CargoResult<()> {
+        let repo = env.lock_index()?;
+        let dst = repo.index_file(&self.krate);
+
+        let conn = env.connection()?;
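+
+        // The `FOR UPDATE` read below holds a row lock on the version for the
+        // whole transaction, so concurrent yank jobs for one version serialize
+        // and the index file is rewritten at most once per state change.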
+        conn.transaction(|| {
+            let yanked_in_db = versions::table
+                .find(self.version.id)
+                .select(versions::yanked)
+                .for_update()
+                .first::<bool>(&*conn)?;
+
+            if yanked_in_db == self.yanked {
+                // The crate is already in the state requested, nothing to do
+                return Ok(());
+            }
+
+            let prev = fs::read_to_string(&dst)?;
+            let version = self.version.num.to_string();
+            let new = prev
+                .lines()
+                .map(|line| {
+                    let mut git_crate = serde_json::from_str::<Crate>(line)
+                        .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?;
+                    if git_crate.name != self.krate || git_crate.vers != version {
+                        return Ok(line.to_string());
+                    }
+                    git_crate.yanked = Some(self.yanked);
+                    Ok(serde_json::to_string(&git_crate)?)
+                })
+                .collect::<CargoResult<Vec<String>>>();
+            let new = new?.join("\n") + "\n";
+            fs::write(&dst, new.as_bytes())?;
+
+            repo.commit_and_push(
+                &format!(
+                    "{} crate `{}#{}`",
+                    if self.yanked { "Yanking" } else { "Unyanking" },
+                    self.krate,
+                    self.version.num
+                ),
+                &repo.relative_index_file(&self.krate),
+                env.credentials(),
+            )?;
+
+            diesel::update(&self.version)
+                .set(versions::yanked.eq(self.yanked))
+                .execute(&*conn)?;
+
+            Ok(())
+        })
+    }
+}
 
-pub fn credentials(
-    _user: &str,
-    _user_from_url: Option<&str>,
-    _cred: git2::CredentialType,
-) -> Result<git2::Cred, git2::Error> {
-    match (env::var("GIT_HTTP_USER"), env::var("GIT_HTTP_PWD")) {
-        (Ok(u), Ok(p)) => git2::Cred::userpass_plaintext(&u, &p),
-        _ => Err(git2::Error::from_str("no authentication set")),
+/// Yanks or unyanks a crate version. This requires finding the index
+/// file, deserialising the crate from JSON, changing the yank boolean to
+/// `true` or `false`, writing all the lines back out, and committing and
+/// pushing the changes.
+pub fn yank(conn: &PgConnection, krate: String, version: Version, yanked: bool) -> CargoResult<()> {
+    Yank {
+        krate,
+        version,
+        yanked,
     }
+    .enqueue(conn)
+    .map_err(Into::into)
 }
diff --git a/src/lib.rs b/src/lib.rs
index 2fe2182a06..c597c25fe1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -19,6 +19,10 @@ extern crate serde_derive;
 #[macro_use]
 extern crate serde_json;
 
+#[cfg(test)]
+#[macro_use]
+extern crate lazy_static;
+
 pub use crate::{app::App, config::Config, uploaders::Uploader};
 use std::sync::Arc;
 
@@ -29,6 +33,8 @@ use jemallocator::Jemalloc;
 static ALLOC: Jemalloc = Jemalloc;
 
 mod app;
+pub mod background;
+pub mod background_jobs;
 pub mod boot;
 mod config;
 pub mod db;
diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs
index efa5e9f251..e34cb38dc0 100644
--- a/src/middleware/mod.rs
+++ b/src/middleware/mod.rs
@@ -10,6 +10,7 @@ pub use self::debug::*;
 pub use self::ember_index_rewrite::EmberIndexRewrite;
 pub use self::head::Head;
 use self::log_connection_pool_status::LogConnectionPoolStatus;
+use self::run_pending_background_jobs::RunPendingBackgroundJobs;
 pub use self::security_headers::SecurityHeaders;
 pub use self::static_or_continue::StaticOrContinue;
 
@@ -23,6 +24,7 @@ mod head;
 mod log_connection_pool_status;
 mod log_request;
 mod require_user_agent;
+mod run_pending_background_jobs;
 mod security_headers;
 mod static_or_continue;
 
@@ -100,5 +102,9 @@ pub fn build_middleware(app: Arc<App>, endpoints: R404) -> MiddlewareBuilder {
         m.around(log_request::LogRequests::default());
     }
 
+    if env == Env::Test {
+        m.add(RunPendingBackgroundJobs);
+    }
+
     m
 }
diff --git a/src/middleware/run_pending_background_jobs.rs b/src/middleware/run_pending_background_jobs.rs
new file mode 100644
index 0000000000..2b831c1f94
--- /dev/null
+++ b/src/middleware/run_pending_background_jobs.rs
@@ -0,0 +1,29 @@
+use super::app::RequestApp;
+use super::prelude::*;
+use crate::background::Runner;
+use crate::background_jobs::*;
+use crate::git::Repository;
+
+pub struct RunPendingBackgroundJobs;
+
+impl Middleware for RunPendingBackgroundJobs {
+    fn after(
+        &self,
+        req: &mut dyn Request,
+        res: Result<Response, Box<dyn Error + Send>>,
+    ) -> Result<Response, Box<dyn Error + Send>> {
+        let app = req.app();
+        let connection_pool = app.diesel_database.clone();
+        let repo = Repository::open(&app.config.index_location).expect("Could not clone index");
+        let environment = Environment::new(repo, None, app.diesel_database.clone());
+
+        let config = Runner::builder(connection_pool, environment);
+        let runner = job_runner(config);
+
+        runner.run_all_pending_jobs().expect("Could not run jobs");
+        runner
+            .assert_no_failed_jobs()
+            .expect("Could not determine if jobs failed");
+        res
+    }
+}
diff --git a/src/models/version.rs b/src/models/version.rs
index 202bbcec62..7efea6b12d 100644
--- a/src/models/version.rs
+++ b/src/models/version.rs
@@ -10,7 +10,7 @@ use crate::schema::*;
 use crate::views::{EncodableVersion, EncodableVersionLinks};
 
 // Queryable has a custom implementation below
-#[derive(Clone, Identifiable, Associations, Debug, Queryable)]
+#[derive(Clone, Identifiable, Associations, Debug, Queryable, Deserialize, Serialize)]
 #[belongs_to(Crate)]
 pub struct Version {
     pub id: i32,
diff --git a/src/schema.rs b/src/schema.rs
index 31315009af..c89bd7d4eb 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -53,6 +53,53 @@ table! {
     }
 }
 
+table! {
+    use diesel::sql_types::*;
+    use diesel_full_text_search::{TsVector as Tsvector};
+
+    /// Representation of the `background_jobs` table.
+    ///
+    /// (Automatically generated by Diesel.)
+    background_jobs (id) {
+        /// The `id` column of the `background_jobs` table.
+        ///
+        /// Its SQL type is `Int8`.
+        ///
+        /// (Automatically generated by Diesel.)
+        id -> Int8,
+        /// The `job_type` column of the `background_jobs` table.
+        ///
+        /// Its SQL type is `Text`.
+        ///
+        /// (Automatically generated by Diesel.)
+        job_type -> Text,
+        /// The `data` column of the `background_jobs` table.
+        ///
+        /// Its SQL type is `Jsonb`.
+        ///
+        /// (Automatically generated by Diesel.)
+        data -> Jsonb,
+        /// The `retries` column of the `background_jobs` table.
+        ///
+        /// Its SQL type is `Int4`.
+        ///
+        /// (Automatically generated by Diesel.)
+        retries -> Int4,
+        /// The `last_retry` column of the `background_jobs` table.
+        ///
+        /// Its SQL type is `Timestamp`.
+        ///
+        /// (Automatically generated by Diesel.)
+        last_retry -> Timestamp,
+        /// The `created_at` column of the `background_jobs` table.
+        ///
+        /// Its SQL type is `Timestamp`.
+        ///
+        /// (Automatically generated by Diesel.)
+        created_at -> Timestamp,
+    }
+}
+
 table! {
     use diesel::sql_types::*;
     use diesel_full_text_search::{TsVector as Tsvector};
@@ -934,6 +981,7 @@ joinable!(versions_published_by -> versions (version_id));
 
 allow_tables_to_appear_in_same_query!(
     api_tokens,
+    background_jobs,
     badges,
     categories,
     crate_downloads,
diff --git a/src/tests/all.rs b/src/tests/all.rs
index d6c5d14c8f..9929ca7ee7 100644
--- a/src/tests/all.rs
+++ b/src/tests/all.rs
@@ -30,6 +30,7 @@ use std::{
 use conduit::Request;
 use conduit_test::MockRequest;
 use diesel::prelude::*;
+use url::Url;
 
 macro_rules! t {
t { ($e:expr) => { @@ -138,6 +139,7 @@ fn simple_app(uploader: Uploader) -> (Arc, conduit_middleware::MiddlewareBu uploader, session_key: "test this has to be over 32 bytes long".to_string(), git_repo_checkout: git::checkout(), + index_location: Url::from_file_path(&git::bare()).unwrap(), gh_client_id: env::var("GH_CLIENT_ID").unwrap_or_default(), gh_client_secret: env::var("GH_CLIENT_SECRET").unwrap_or_default(), db_url: env("TEST_DATABASE_URL"),