From 2cf6a2ddb06a8b422971efa59ae572087f75e716 Mon Sep 17 00:00:00 2001
From: Sean Griffin
Date: Thu, 3 May 2018 15:14:08 -0600
Subject: [PATCH 01/16] Add a minimal background queueing framework

Note: This is intended to possibly be extracted into a library, so the docs are written as if this were its own library. Cargo-specific code (besides the use of `CargoResult`) will go in its own module for the same reason.

This adds an MVP background queueing system intended to be used in place of the "try to commit 20 times" loop in `git.rs`. This is a fairly simple queue that is intended to be "the easiest thing that fits our needs, with the least operational impact". There are a few reasons I've opted to go with our own queueing system here, rather than an existing solution like Faktory or Beanstalkd.

- We'd have to write the majority of this code ourselves no matter what.
- Client libraries for Beanstalkd don't deal with the actual job logic; only `storage.rs` would get replaced.
- The only client for Faktory that exists made some really odd API choices. Faktory also hasn't seen a lot of use in the wild yet.
- I want to limit the number of services we have to manage. We have extremely limited ops bandwidth today, and every new part of the stack we have to manage is a huge cost. Right now we only have our server and PG. I'd like to keep it that way for as long as possible.

This system takes advantage of the `SKIP LOCKED` feature in PostgreSQL 9.5 to handle all of the hard stuff for us. We use PG's row locking to treat a row as "currently being processed", which means we don't have to worry about returning it to the queue if the power goes out on one of our workers.

This queue is intended only for jobs with "at least once" semantics. That means the entire job has to be idempotent. If the entire job completes successfully, but the power goes out before we commit the transaction, we will run the whole thing again.

The code today also makes a few additional assumptions based on our current needs. We expect all jobs to complete successfully the first time, and the most likely reason a job would fail is an incident at GitHub, hence the extremely high retry timeout. I'm also assuming that all jobs will eventually complete, and that any job failing N (likely 5) times is an event that should page whoever is on call. (Paging is not part of this PR.)

Finally, it's unlikely that this queue will be appropriate for high-throughput use cases, since it requires one PG connection per worker (a real connection; adding PgBouncer wouldn't help here). Right now our only background work comes in on average every 5 minutes, but if we start moving more code to be run here we may want to revisit this in the future.
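To make the claim step concrete, here is a minimal sketch of the query shape that `SKIP LOCKED` enables. This is illustrative only: the patch below expresses the query through Diesel's DSL (including the retry-backoff filter, which is omitted here), and the `ClaimedJob` struct is invented for the example.

```rust
use diesel::prelude::*;
use diesel::sql_types::BigInt;

// Invented for the example; the patch models jobs with a richer struct.
#[derive(QueryableByName)]
struct ClaimedJob {
    #[sql_type = "BigInt"]
    id: i64,
}

fn claim_one_job(conn: &PgConnection) -> QueryResult<Option<ClaimedJob>> {
    // Concurrent workers skip over rows that are already locked instead
    // of blocking on them, so each worker claims a distinct job.
    diesel::sql_query(
        "SELECT id FROM background_jobs \
         ORDER BY id LIMIT 1 \
         FOR UPDATE SKIP LOCKED",
    )
    .get_result(conn)
    .optional()
}
```

Run inside a transaction, the returned row stays locked until commit or rollback, so a worker that loses power never strands a job; its row simply becomes claimable again.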
--- .../2018-05-03-150523_create_jobs/down.sql | 1 + .../2018-05-03-150523_create_jobs/up.sql | 8 + src/background/job.rs | 26 +++ src/background/mod.rs | 8 + src/background/registry.rs | 39 ++++ src/background/runner.rs | 200 ++++++++++++++++++ src/background/storage.rs | 71 +++++++ src/lib.rs | 5 + src/schema.rs | 48 +++++ 9 files changed, 406 insertions(+) create mode 100644 migrations/2018-05-03-150523_create_jobs/down.sql create mode 100644 migrations/2018-05-03-150523_create_jobs/up.sql create mode 100644 src/background/job.rs create mode 100644 src/background/mod.rs create mode 100644 src/background/registry.rs create mode 100644 src/background/runner.rs create mode 100644 src/background/storage.rs diff --git a/migrations/2018-05-03-150523_create_jobs/down.sql b/migrations/2018-05-03-150523_create_jobs/down.sql new file mode 100644 index 0000000000..d7ff875a67 --- /dev/null +++ b/migrations/2018-05-03-150523_create_jobs/down.sql @@ -0,0 +1 @@ +DROP TABLE background_jobs; diff --git a/migrations/2018-05-03-150523_create_jobs/up.sql b/migrations/2018-05-03-150523_create_jobs/up.sql new file mode 100644 index 0000000000..b6ac3047c6 --- /dev/null +++ b/migrations/2018-05-03-150523_create_jobs/up.sql @@ -0,0 +1,8 @@ +CREATE TABLE background_jobs ( + id BIGSERIAL PRIMARY KEY, + job_type TEXT NOT NULL, + data JSONB NOT NULL, + retries INTEGER NOT NULL DEFAULT 0, + last_retry TIMESTAMP NOT NULL DEFAULT '1970-01-01', + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); diff --git a/src/background/job.rs b/src/background/job.rs new file mode 100644 index 0000000000..5d3f23144b --- /dev/null +++ b/src/background/job.rs @@ -0,0 +1,26 @@ +use diesel::PgConnection; +use serde::{Serialize, de::DeserializeOwned}; + +use super::storage; +use util::CargoResult; + +/// A background job, meant to be run asynchronously. +pub trait Job: Serialize + DeserializeOwned { + /// The environment this job is run with. This is a struct you define, + /// which should encapsulate things like database connection pools, any + /// configuration, and any other static data or shared resources. + type Environment; + + /// The key to use for storing this job, and looking it up later. + /// + /// Typically this is the name of your struct in `snake_case` + const JOB_TYPE: &'static str; + + /// Enqueue this job to be run at some point in the future. + fn enqueue(self, conn: &PgConnection) -> CargoResult<()> { + storage::enqueue_job(conn, self) + } + + /// The logic involved in actually performing this job. + fn perform(self, env: &Self::Environment) -> CargoResult<()>; +} diff --git a/src/background/mod.rs b/src/background/mod.rs new file mode 100644 index 0000000000..70b8d6dce7 --- /dev/null +++ b/src/background/mod.rs @@ -0,0 +1,8 @@ +mod job; +mod registry; +mod runner; +mod storage; + +pub use self::job::*; +pub use self::registry::Registry; +pub use self::runner::*; diff --git a/src/background/registry.rs b/src/background/registry.rs new file mode 100644 index 0000000000..9105298d06 --- /dev/null +++ b/src/background/registry.rs @@ -0,0 +1,39 @@ +use serde_json; +use std::collections::HashMap; + +use super::Job; +use util::CargoResult; + +#[doc(hidden)] +pub type PerformFn = Box CargoResult<()>>; + +#[derive(Default)] +#[allow(missing_debug_implementations)] // Can't derive debug +/// A registry of background jobs, used to map job types to concrege perform +/// functions at runtime. 
+pub struct Registry { + job_types: HashMap<&'static str, PerformFn>, +} + +impl Registry { + /// Create a new, empty registry + pub fn new() -> Self { + Registry { + job_types: Default::default(), + } + } + + /// Get the perform function for a given job type + pub fn get(&self, job_type: &str) -> Option<&PerformFn> { + self.job_types.get(job_type) + } + + /// Register a new background job. This will override any existing + /// registries with the same `JOB_TYPE`, if one exists. + pub fn register>(&mut self) { + self.job_types.insert(T::JOB_TYPE, Box::new(|data, env| { + let data = serde_json::from_value(data)?; + T::perform(data, env) + })); + } +} diff --git a/src/background/runner.rs b/src/background/runner.rs new file mode 100644 index 0000000000..726cfc26a7 --- /dev/null +++ b/src/background/runner.rs @@ -0,0 +1,200 @@ +#![allow(dead_code)] +use diesel::prelude::*; +use std::panic::{catch_unwind, UnwindSafe}; + +use super::storage; +use util::errors::*; + +fn get_single_job(conn: &PgConnection, f: F) -> CargoResult<()> +where + F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + UnwindSafe, +{ + conn.transaction::<_, Box, _>(|| { + let job = storage::find_next_unlocked_job(conn)?; + let job_id = job.id; + + let result = catch_unwind(|| f(job)) + .map_err(|_| internal("job panicked")) + .and_then(|r| r); + + if result.is_ok() { + storage::delete_successful_job(conn, job_id)?; + } else { + storage::update_failed_job(conn, job_id); + } + Ok(()) + }) +} + +#[cfg(test)] +mod tests { + use diesel::prelude::*; + + use schema::background_jobs::dsl::*; + use std::sync::{Mutex, MutexGuard, Barrier, Arc}; + use std::panic::AssertUnwindSafe; + use std::thread; + use super::*; + + #[test] + fn jobs_are_locked_when_fetched() { + let _guard = TestGuard::lock(); + + let conn = connection(); + let first_job_id = create_dummy_job(&conn).id; + let second_job_id = create_dummy_job(&conn).id; + let fetch_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); + let fetch_barrier2 = fetch_barrier.clone(); + let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); + let return_barrier2 = return_barrier.clone(); + + let t1 = thread::spawn(move || { + let _ = get_single_job(&connection(), |job| { + fetch_barrier.0.wait(); // Tell thread 2 it can lock its job + assert_eq!(first_job_id, job.id); + return_barrier.0.wait(); // Wait for thread 2 to lock its job + Ok(()) + }); + }); + + let t2 = thread::spawn(move || { + fetch_barrier2.0.wait(); // Wait until thread 1 locks its job + get_single_job(&connection(), |job| { + assert_eq!(second_job_id, job.id); + return_barrier2.0.wait(); // Tell thread 1 it can unlock its job + Ok(()) + }) + .unwrap(); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + } + + #[test] + fn jobs_are_deleted_when_successfully_run() { + let _guard = TestGuard::lock(); + + let conn = connection(); + create_dummy_job(&conn); + + get_single_job(&conn, |_| { + Ok(()) + }).unwrap(); + + let remaining_jobs = background_jobs.count() + .get_result(&conn); + assert_eq!(Ok(0), remaining_jobs); + } + + #[test] + fn failed_jobs_do_not_release_lock_before_updating_retry_time() { + let _guard = TestGuard::lock(); + create_dummy_job(&connection()); + let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); + let barrier2 = barrier.clone(); + + let t1 = thread::spawn(move || { + let _ = get_single_job(&connection(), |_| { + barrier.0.wait(); + // error so the job goes back into the queue + Err(human("nope")) + }); + }); + + let t2 = thread::spawn(move || { + let conn = connection(); + // 
Wait for the first thread to acquire the lock + barrier2.0.wait(); + // We are intentionally not using `get_single_job` here. + // `SKIP LOCKED` is intentionally omitted here, so we block until + // the lock on the first job is released. + // If there is any point where the row is unlocked, but the retry + // count is not updated, we will get a row here. + let available_jobs = background_jobs + .select(id) + .filter(retries.eq(0)) + .for_update() + .load::(&conn) + .unwrap(); + assert_eq!(0, available_jobs.len()); + + // Sanity check to make sure the job actually is there + let total_jobs_including_failed = background_jobs + .select(id) + .for_update() + .load::(&conn) + .unwrap(); + assert_eq!(1, total_jobs_including_failed.len()); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + } + + #[test] + fn panicking_in_jobs_updates_retry_counter() { + let _guard = TestGuard::lock(); + let conn = connection(); + let job_id = create_dummy_job(&conn).id; + + let t1 = thread::spawn(move || { + let _ = get_single_job(&connection(), |_| { + panic!() + }); + }); + + let _ = t1.join(); + + let tries = background_jobs + .find(job_id) + .select(retries) + .for_update() + .first::(&conn) + .unwrap(); + assert_eq!(1, tries); + } + + + lazy_static! { + // Since these tests deal with behavior concerning multiple connections + // running concurrently, they have to run outside of a transaction. + // Therefore we can't run more than one at a time. + // + // Rather than forcing the whole suite to be run with `--test-threads 1`, + // we just lock these tests instead. + static ref TEST_MUTEX: Mutex<()> = Mutex::new(()); + } + + struct TestGuard<'a>(MutexGuard<'a, ()>); + + impl<'a> TestGuard<'a> { + fn lock() -> Self { + TestGuard(TEST_MUTEX.lock().unwrap()) + } + } + + impl<'a> Drop for TestGuard<'a> { + fn drop(&mut self) { + ::diesel::sql_query("TRUNCATE TABLE background_jobs") + .execute(&connection()) + .unwrap(); + } + } + + fn connection() -> PgConnection { + use dotenv; + + let database_url = + dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests"); + PgConnection::establish(&database_url).unwrap() + } + + fn create_dummy_job(conn: &PgConnection) -> storage::BackgroundJob { + ::diesel::insert_into(background_jobs) + .values((job_type.eq("Foo"), data.eq(json!(null)))) + .returning((id, job_type, data)) + .get_result(conn) + .unwrap() + } +} diff --git a/src/background/storage.rs b/src/background/storage.rs new file mode 100644 index 0000000000..1dbd9e47bb --- /dev/null +++ b/src/background/storage.rs @@ -0,0 +1,71 @@ +use diesel::dsl::now; +use diesel::prelude::*; +use diesel::{delete, insert_into, update}; +use diesel::sql_types::Integer; +use serde_json; + +use schema::background_jobs; +use super::Job; +use util::CargoResult; + +#[derive(Queryable, Identifiable, Debug, Clone)] +pub struct BackgroundJob { + pub id: i64, + pub job_type: String, + pub data: serde_json::Value, +} + +/// Enqueues a job to be run as soon as possible. +pub fn enqueue_job(conn: &PgConnection, job: T) -> CargoResult<()> { + use schema::background_jobs::dsl::*; + + let job_data = serde_json::to_value(job)?; + insert_into(background_jobs) + .values(( + job_type.eq(T::JOB_TYPE), + data.eq(job_data), + )) + .execute(conn)?; + Ok(()) +} + +/// Finds the next job that is unlocked, and ready to be retried. If a row is +/// found, it will be locked. 
+pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult { + use schema::background_jobs::dsl::*; + use diesel::dsl::*; + use diesel::sql_types::Interval; + + sql_function!(power, power_t, (x: Integer, y: Integer) -> Integer); + + background_jobs + .select((id, job_type, data)) + .filter(last_retry.lt(now - 1.minute().into_sql::() * power(2, retries))) + .order(id) + .for_update() + .skip_locked() + .first::(conn) +} + +/// Deletes a job that has successfully completed running +pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()> { + use schema::background_jobs::dsl::*; + + delete(background_jobs.find(job_id)).execute(conn)?; + Ok(()) +} + +/// Marks that we just tried and failed to run a job. +/// +/// Ignores any database errors that may have occurred. If the DB has gone away, +/// we assume that just trying again with a new connection will succeed. +pub fn update_failed_job(conn: &PgConnection, job_id: i64) { + use schema::background_jobs::dsl::*; + + let _ = update(background_jobs.find(job_id)) + .set(( + retries.eq(retries + 1), + last_retry.eq(now), + )) + .execute(conn); +} diff --git a/src/lib.rs b/src/lib.rs index 2fe2182a06..6927475c9c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,10 @@ extern crate serde_derive; #[macro_use] extern crate serde_json; +#[cfg(test)] +#[macro_use] +extern crate lazy_static; + pub use crate::{app::App, config::Config, uploaders::Uploader}; use std::sync::Arc; @@ -29,6 +33,7 @@ use jemallocator::Jemalloc; static ALLOC: Jemalloc = Jemalloc; mod app; +pub mod background; pub mod boot; mod config; pub mod db; diff --git a/src/schema.rs b/src/schema.rs index 31315009af..c89bd7d4eb 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -53,6 +53,53 @@ table! { } } +table! { + use diesel::sql_types::*; + use diesel_full_text_search::{TsVector as Tsvector}; + + /// Representation of the `background_jobs` table. + /// + /// (Automatically generated by Diesel.) + background_jobs (id) { + /// The `id` column of the `background_jobs` table. + /// + /// Its SQL type is `Int8`. + /// + /// (Automatically generated by Diesel.) + id -> Int8, + /// The `job_type` column of the `background_jobs` table. + /// + /// Its SQL type is `Text`. + /// + /// (Automatically generated by Diesel.) + job_type -> Text, + /// The `data` column of the `background_jobs` table. + /// + /// Its SQL type is `Jsonb`. + /// + /// (Automatically generated by Diesel.) + data -> Jsonb, + /// The `retries` column of the `background_jobs` table. + /// + /// Its SQL type is `Int4`. + /// + /// (Automatically generated by Diesel.) + retries -> Int4, + /// The `last_retry` column of the `background_jobs` table. + /// + /// Its SQL type is `Timestamp`. + /// + /// (Automatically generated by Diesel.) + last_retry -> Timestamp, + /// The `created_at` column of the `background_jobs` table. + /// + /// Its SQL type is `Timestamp`. + /// + /// (Automatically generated by Diesel.) + created_at -> Timestamp, + } +} + table! 
{ use diesel::sql_types::*; use diesel_full_text_search::{TsVector as Tsvector}; @@ -934,6 +981,7 @@ joinable!(versions_published_by -> versions (version_id)); allow_tables_to_appear_in_same_query!( api_tokens, + background_jobs, badges, categories, crate_downloads, From 1e234dd9871d24592575e3111b6c7c3708f3fb96 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 12 Oct 2018 11:39:27 -0600 Subject: [PATCH 02/16] Encapsulate some of our test behavior into a `Runner` The `Runner` is basically the main interface into the job queue, and stores the environment, thread pool, and database connection pool. We technically don't need a thread pool since our jobs are so infrequent that we don't need more than one thread. But our tests are simplified by having it, and a general purpose lib will need it. The other oddity is that we will want to panic instead of returning a result if updating or fetching a job failed for some reason. This isn't something we expect to occur unless there's an issue in the DB, so this seems fine. A thread panicking won't kill the runner. We are also only panicking if changes to the `background_jobs` table fails, not if the job itself fails. --- Cargo.lock | 10 ++ Cargo.toml | 1 + src/background/registry.rs | 2 +- src/background/runner.rs | 249 +++++++++++++++++++++++-------------- 4 files changed, 168 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1313e7de42..215eaff056 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -193,6 +193,7 @@ dependencies = [ "serde_json 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "tar 0.4.21 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2150,6 +2151,14 @@ dependencies = [ "unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "threadpool" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.42" @@ -2805,6 +2814,7 @@ dependencies = [ "checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03" "checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5" "checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14" +"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum tokio 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "fcaabb3cec70485d0df6e9454fe514393ad1c4070dee8915f11041e95630b230" "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" diff --git a/Cargo.toml b/Cargo.toml 
index 58d2d411ab..e4031889d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ tempdir = "0.3.7" parking_lot = "0.7.1" jemallocator = { version = "0.1.8", features = ['unprefixed_malloc_on_supported_platforms', 'profiling'] } jemalloc-ctl = "0.2.0" +threadpool = "1.7" lettre = {git = "https://github.com/lettre/lettre", version = "0.9"} lettre_email = {git = "https://github.com/lettre/lettre", version = "0.9"} diff --git a/src/background/registry.rs b/src/background/registry.rs index 9105298d06..c193d28d00 100644 --- a/src/background/registry.rs +++ b/src/background/registry.rs @@ -5,7 +5,7 @@ use super::Job; use util::CargoResult; #[doc(hidden)] -pub type PerformFn = Box CargoResult<()>>; +pub type PerformFn = Box CargoResult<()> + Send + Sync>; #[derive(Default)] #[allow(missing_debug_implementations)] // Can't derive debug diff --git a/src/background/runner.rs b/src/background/runner.rs index 726cfc26a7..2be7bdb40f 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -1,161 +1,216 @@ #![allow(dead_code)] use diesel::prelude::*; use std::panic::{catch_unwind, UnwindSafe}; +use threadpool::ThreadPool; -use super::storage; +use db::{DieselPool, DieselPooledConn}; +use super::{storage, Registry, Job}; use util::errors::*; -fn get_single_job(conn: &PgConnection, f: F) -> CargoResult<()> -where - F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + UnwindSafe, -{ - conn.transaction::<_, Box, _>(|| { - let job = storage::find_next_unlocked_job(conn)?; - let job_id = job.id; - - let result = catch_unwind(|| f(job)) - .map_err(|_| internal("job panicked")) - .and_then(|r| r); - - if result.is_ok() { - storage::delete_successful_job(conn, job_id)?; - } else { - storage::update_failed_job(conn, job_id); +#[allow(missing_debug_implementations)] +pub struct Builder { + connection_pool: DieselPool, + environment: Env, + registry: Registry, + thread_count: Option, +} + +impl Builder { + pub fn register>(mut self) -> Self { + self.registry.register::(); + self + } + + pub fn thread_count(mut self, thread_count: usize) -> Self { + self.thread_count = Some(thread_count); + self + } + + pub fn build(self) -> Runner { + Runner { + connection_pool: self.connection_pool, + thread_pool: ThreadPool::new(self.thread_count.unwrap_or(5)), + environment: self.environment, + registry: self.registry, } - Ok(()) - }) + } +} + +#[allow(missing_debug_implementations)] +pub struct Runner { + connection_pool: DieselPool, + thread_pool: ThreadPool, + environment: Env, + registry: Registry, +} + +impl Runner { + pub fn builder(connection_pool: DieselPool, environment: Env) -> Builder { + Builder { + connection_pool, + environment, + registry: Registry::new(), + thread_count: None, + } + } + + fn get_single_job(&self, f: F) + where + F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static, + { + let conn = self.connection().expect("Could not acquire connection"); + self.thread_pool.execute(move || { + conn.transaction::<_, Box, _>(|| { + let job = storage::find_next_unlocked_job(&conn).optional()?; + let job = match job { + Some(j) => j, + None => return Ok(()), + }; + let job_id = job.id; + + let result = catch_unwind(|| f(job)) + .map_err(|_| internal("job panicked")) + .and_then(|r| r); + + if result.is_ok() { + storage::delete_successful_job(&conn, job_id)?; + } else { + storage::update_failed_job(&conn, job_id); + } + Ok(()) + }).expect("Could not retrieve or update job") + }) + } + + fn connection(&self) -> CargoResult { + 
self.connection_pool.get().map_err(Into::into) + } + + #[cfg(test)] + fn wait_for_jobs(&self) { + self.thread_pool.join(); + assert_eq!(0, self.thread_pool.panic_count()); + } } #[cfg(test)] mod tests { use diesel::prelude::*; + use diesel::r2d2; use schema::background_jobs::dsl::*; use std::sync::{Mutex, MutexGuard, Barrier, Arc}; use std::panic::AssertUnwindSafe; - use std::thread; use super::*; #[test] fn jobs_are_locked_when_fetched() { let _guard = TestGuard::lock(); - let conn = connection(); - let first_job_id = create_dummy_job(&conn).id; - let second_job_id = create_dummy_job(&conn).id; + let runner = runner(); + let first_job_id = create_dummy_job(&runner).id; + let second_job_id = create_dummy_job(&runner).id; let fetch_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); let fetch_barrier2 = fetch_barrier.clone(); let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); let return_barrier2 = return_barrier.clone(); - let t1 = thread::spawn(move || { - let _ = get_single_job(&connection(), |job| { - fetch_barrier.0.wait(); // Tell thread 2 it can lock its job - assert_eq!(first_job_id, job.id); - return_barrier.0.wait(); // Wait for thread 2 to lock its job - Ok(()) - }); + runner.get_single_job(move |job| { + fetch_barrier.0.wait(); // Tell thread 2 it can lock its job + assert_eq!(first_job_id, job.id); + return_barrier.0.wait(); // Wait for thread 2 to lock its job + Ok(()) }); - let t2 = thread::spawn(move || { - fetch_barrier2.0.wait(); // Wait until thread 1 locks its job - get_single_job(&connection(), |job| { - assert_eq!(second_job_id, job.id); - return_barrier2.0.wait(); // Tell thread 1 it can unlock its job - Ok(()) - }) - .unwrap(); + fetch_barrier2.0.wait(); // Wait until thread 1 locks its job + runner.get_single_job(move |job| { + assert_eq!(second_job_id, job.id); + return_barrier2.0.wait(); // Tell thread 1 it can unlock its job + Ok(()) }); - t1.join().unwrap(); - t2.join().unwrap(); + runner.wait_for_jobs(); } #[test] fn jobs_are_deleted_when_successfully_run() { let _guard = TestGuard::lock(); - let conn = connection(); - create_dummy_job(&conn); + let runner = runner(); + create_dummy_job(&runner); - get_single_job(&conn, |_| { + runner.get_single_job(|_| { Ok(()) - }).unwrap(); + }); + runner.wait_for_jobs(); let remaining_jobs = background_jobs.count() - .get_result(&conn); + .get_result(&runner.connection().unwrap()); assert_eq!(Ok(0), remaining_jobs); } #[test] fn failed_jobs_do_not_release_lock_before_updating_retry_time() { let _guard = TestGuard::lock(); - create_dummy_job(&connection()); + + let runner = runner(); + create_dummy_job(&runner); let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); let barrier2 = barrier.clone(); - let t1 = thread::spawn(move || { - let _ = get_single_job(&connection(), |_| { - barrier.0.wait(); - // error so the job goes back into the queue - Err(human("nope")) - }); + runner.get_single_job(move |_| { + barrier.0.wait(); + // error so the job goes back into the queue + Err(human("nope")) }); - let t2 = thread::spawn(move || { - let conn = connection(); - // Wait for the first thread to acquire the lock - barrier2.0.wait(); - // We are intentionally not using `get_single_job` here. - // `SKIP LOCKED` is intentionally omitted here, so we block until - // the lock on the first job is released. - // If there is any point where the row is unlocked, but the retry - // count is not updated, we will get a row here. 
- let available_jobs = background_jobs - .select(id) - .filter(retries.eq(0)) - .for_update() - .load::(&conn) - .unwrap(); - assert_eq!(0, available_jobs.len()); + let conn = runner.connection().unwrap(); + // Wait for the first thread to acquire the lock + barrier2.0.wait(); + // We are intentionally not using `get_single_job` here. + // `SKIP LOCKED` is intentionally omitted here, so we block until + // the lock on the first job is released. + // If there is any point where the row is unlocked, but the retry + // count is not updated, we will get a row here. + let available_jobs = background_jobs + .select(id) + .filter(retries.eq(0)) + .for_update() + .load::(&conn) + .unwrap(); + assert_eq!(0, available_jobs.len()); - // Sanity check to make sure the job actually is there - let total_jobs_including_failed = background_jobs - .select(id) - .for_update() - .load::(&conn) - .unwrap(); - assert_eq!(1, total_jobs_including_failed.len()); - }); + // Sanity check to make sure the job actually is there + let total_jobs_including_failed = background_jobs + .select(id) + .for_update() + .load::(&conn) + .unwrap(); + assert_eq!(1, total_jobs_including_failed.len()); - t1.join().unwrap(); - t2.join().unwrap(); + runner.wait_for_jobs(); } #[test] fn panicking_in_jobs_updates_retry_counter() { let _guard = TestGuard::lock(); - let conn = connection(); - let job_id = create_dummy_job(&conn).id; + let runner = runner(); + let job_id = create_dummy_job(&runner).id; - let t1 = thread::spawn(move || { - let _ = get_single_job(&connection(), |_| { - panic!() - }); + runner.get_single_job(|_| { + panic!() }); - - let _ = t1.join(); + runner.wait_for_jobs(); let tries = background_jobs .find(job_id) .select(retries) .for_update() - .first::(&conn) + .first::(&runner.connection().unwrap()) .unwrap(); assert_eq!(1, tries); } - lazy_static! { // Since these tests deal with behavior concerning multiple connections // running concurrently, they have to run outside of a transaction. @@ -177,24 +232,32 @@ mod tests { impl<'a> Drop for TestGuard<'a> { fn drop(&mut self) { ::diesel::sql_query("TRUNCATE TABLE background_jobs") - .execute(&connection()) + .execute(&runner().connection().unwrap()) .unwrap(); } } - fn connection() -> PgConnection { + fn runner() -> Runner<()> { use dotenv; let database_url = dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests"); - PgConnection::establish(&database_url).unwrap() + let manager = r2d2::ConnectionManager::new(database_url); + let pool = r2d2::Pool::builder() + .max_size(2) + .build(manager) + .unwrap(); + + Runner::builder(pool, ()) + .thread_count(2) + .build() } - fn create_dummy_job(conn: &PgConnection) -> storage::BackgroundJob { + fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob { ::diesel::insert_into(background_jobs) .values((job_type.eq("Foo"), data.eq(json!(null)))) .returning((id, job_type, data)) - .get_result(conn) + .get_result(&runner.connection().unwrap()) .unwrap() } } From 6991e903c3a04ee59acb33b72299fd5fe43e17ad Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 12 Oct 2018 15:15:32 -0600 Subject: [PATCH 03/16] Move our index updates to be run in background jobs This fundamentally changes the workflow for all operations we perform involving git, so that they are not performed on the web server and do not block the response. 
This will improve the response times of `cargo publish`, and make the publish process more resilient, reducing the likelihood of an inconsistency such as the index being updated but not our database.

Previously, our workflow looked something like this:

- When the server boots, do a full clone of the index into a known location
- Some request comes in that needs to update the index
- Database transaction is opened
- Local checkout is modified, we attempt to commit & push (note: this involves a mutex to avoid contention with another request to update the index on the same server)
- If the push fails, we fetch, `reset --hard`, and try again up to 20 times
- Database transaction is committed
- We send a successful response

The reason for the retry logic is that we have more than one web server, meaning no server can be sure that its local checkout is actually up to date. There's also a major opportunity for an inconsistent state to be reached here. If the power goes out, the server is restarted, something crashes, etc., in between the index being updated and the database transaction being committed, we will never retry it.

The new workflow looks like this:

- Some request comes in that needs to update the index
- A job is queued in the database to update the index at some point in the future
- We send a successful response
- A separate process pulls the job out of the database
- A full clone of the index is performed into a temporary directory
- The new checkout is modified, committed, and pushed
- If the push succeeds, the job is removed from the database
- If the push fails, the job is marked as failed and will be retried at some point in the future

While a background worker can be spread across multiple machines and/or threads, we will be able to avoid the race conditions that were previously possible by ensuring that we only have one worker with one thread that handles index updates. Right now that's easy, since index updates are the only background job we have, but as we add more we will need to add support for multiple queues to account for this.

I've opted to do a fresh checkout in every job, rather than relying on some state that was set up when the machine booted. This is mostly for simplicity's sake. It also means that if we need to scale to multiple threads/processes for other jobs, we can punt the multi-queue enhancement for a while if we wish. However, it does mean the job will take a bit longer to run. If this turns out to be a problem, it's easy to address.

This should eliminate the opportunity for the index to enter a state inconsistent with our database -- or at least the two should become eventually consistent. If the power goes out before the job is committed as done, it is assumed the job failed and it will be retried. The job itself is idempotent, so even if the power goes out after the index is updated, the retry should succeed.

One other side effect of this change is that when `cargo publish` returns with an exit status of 0, that does not mean that your crate/new version is immediately available for use -- if you try to point to it in Cargo.toml seconds after publishing, you may get an error that it could not find that version. This was technically already true, since neither S3 nor GitHub guarantees that uploads/pushes are immediately visible. However, this does increase the timescale beyond the delay we would have seen there. In most cases it should be under 10 seconds, and at most a minute.
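As a sketch of why the new workflow closes the consistency gap, consider the enqueue side. The names here are hypothetical -- `NewVersion`, `versions`, and a public `AddCrate` constructor are stand-ins, not the exact code in this PR -- but the key property holds: the job row commits or rolls back together with the rest of the publish.

```rust
// Sketch under assumed names; see the caveats in the paragraph above.
fn publish_new_version(
    conn: &PgConnection,
    version: NewVersion,
    krate: Crate,
) -> CargoResult<()> {
    conn.transaction(|| {
        // Persist the new version...
        diesel::insert_into(versions::table)
            .values(&version)
            .execute(conn)?;
        // ...and queue the index update atomically with it. A separate
        // worker clones the index, commits, and pushes at a later point.
        AddCrate { krate }.enqueue(conn)
    })
}
```

If the transaction fails, no job exists and no index work happens; if it commits, the job is durable and will be retried until the index catches up.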
One enhancement that will come as a followup, but is not included in this PR is a UI to see the status of your upload. This is definitely nice to have, but is not something I think is necessary for this feature to land. The time it would take to navigate to that UI is going to be longer than the time it takes the background job to run in most cases. That enhancement is something I think can go hand in hand with #1503 (which incidentally becomes much easier to implement with this PR, since a "staging" publish just skips queuing the background job, and the only thing the button to full publish needs to do is queue the job). This setup does assume that all background jobs *must* eventually succeed. If any job fails, the index is in an inconsistent state with our database, and we are having an outage of some kind. Due to the nature of our background jobs, this likely means that GitHub is down, or there is a bug in our code. Either way, we page whoever is on-call, since it means publishing is broken. Since publishing crates is such an infrequent event, I've set the thresholds to be extremely low. --- src/app.rs | 9 +- src/background/job.rs | 2 +- src/background/registry.rs | 5 +- src/background/runner.rs | 76 ++++- src/background/storage.rs | 50 ++- src/background_jobs.rs | 23 ++ src/bin/server.rs | 35 +-- src/config.rs | 3 + src/controllers/krate/publish.rs | 2 +- src/controllers/version/yank.rs | 2 +- src/git.rs | 292 +++++++++--------- src/lib.rs | 1 + src/middleware/mod.rs | 6 + src/middleware/run_pending_background_jobs.rs | 28 ++ src/tests/all.rs | 2 + 15 files changed, 312 insertions(+), 224 deletions(-) create mode 100644 src/background_jobs.rs create mode 100644 src/middleware/run_pending_background_jobs.rs diff --git a/src/app.rs b/src/app.rs index 41591ae9f5..18dc025602 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,7 +4,7 @@ use crate::{db, util::CargoResult, Config, Env}; use std::{ env, path::PathBuf, - sync::{Arc, Mutex}, + sync::Arc, time::Duration, }; @@ -25,10 +25,8 @@ pub struct App { /// A unique key used with conduit_cookie to generate cookies pub session_key: String, - /// The crate index git repository - pub git_repo: Mutex, - /// The location on disk of the checkout of the crate index git repository + /// Only used in the development environment. pub git_repo_checkout: PathBuf, /// The server configuration @@ -86,13 +84,10 @@ impl App { .connection_customizer(Box::new(db::SetStatementTimeout(db_connection_timeout))) .thread_pool(thread_pool); - let repo = git2::Repository::open(&config.git_repo_checkout).unwrap(); - App { diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config), github, session_key: config.session_key.clone(), - git_repo: Mutex::new(repo), git_repo_checkout: config.git_repo_checkout.clone(), config: config.clone(), } diff --git a/src/background/job.rs b/src/background/job.rs index 5d3f23144b..9ffa0006a3 100644 --- a/src/background/job.rs +++ b/src/background/job.rs @@ -2,7 +2,7 @@ use diesel::PgConnection; use serde::{Serialize, de::DeserializeOwned}; use super::storage; -use util::CargoResult; +use crate::util::CargoResult; /// A background job, meant to be run asynchronously. 
pub trait Job: Serialize + DeserializeOwned { diff --git a/src/background/registry.rs b/src/background/registry.rs index c193d28d00..5867c8abc9 100644 --- a/src/background/registry.rs +++ b/src/background/registry.rs @@ -1,11 +1,12 @@ use serde_json; use std::collections::HashMap; +use std::panic::RefUnwindSafe; use super::Job; -use util::CargoResult; +use crate::util::CargoResult; #[doc(hidden)] -pub type PerformFn = Box CargoResult<()> + Send + Sync>; +pub type PerformFn = Box CargoResult<()> + RefUnwindSafe + Send + Sync>; #[derive(Default)] #[allow(missing_debug_implementations)] // Can't derive debug diff --git a/src/background/runner.rs b/src/background/runner.rs index 2be7bdb40f..6dd8c5839b 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -1,11 +1,13 @@ #![allow(dead_code)] use diesel::prelude::*; -use std::panic::{catch_unwind, UnwindSafe}; +use std::any::Any; +use std::panic::{catch_unwind, UnwindSafe, RefUnwindSafe, PanicInfo}; +use std::sync::Arc; use threadpool::ThreadPool; -use db::{DieselPool, DieselPooledConn}; +use crate::db::{DieselPool, DieselPooledConn}; use super::{storage, Registry, Job}; -use util::errors::*; +use crate::util::errors::*; #[allow(missing_debug_implementations)] pub struct Builder { @@ -30,8 +32,8 @@ impl Builder { Runner { connection_pool: self.connection_pool, thread_pool: ThreadPool::new(self.thread_count.unwrap_or(5)), - environment: self.environment, - registry: self.registry, + environment: Arc::new(self.environment), + registry: Arc::new(self.registry), } } } @@ -40,11 +42,11 @@ impl Builder { pub struct Runner { connection_pool: DieselPool, thread_pool: ThreadPool, - environment: Env, - registry: Registry, + environment: Arc, + registry: Arc>, } -impl Runner { +impl Runner { pub fn builder(connection_pool: DieselPool, environment: Env) -> Builder { Builder { connection_pool, @@ -54,6 +56,24 @@ impl Runner { } } + pub fn run_all_pending_jobs(&self) -> CargoResult<()> { + let available_job_count = storage::available_job_count(&*self.connection()?)?; + for _ in 0..available_job_count { + self.run_single_job() + } + Ok(()) + } + + fn run_single_job(&self) { + let environment = Arc::clone(&self.environment); + let registry = Arc::clone(&self.registry); + self.get_single_job(move |job| { + let perform_fn = registry.get(&job.job_type) + .ok_or_else(|| internal(&format_args!("Unknown job type {}", job.job_type)))?; + perform_fn(job.data, &environment) + }) + } + fn get_single_job(&self, f: F) where F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static, @@ -69,13 +89,15 @@ impl Runner { let job_id = job.id; let result = catch_unwind(|| f(job)) - .map_err(|_| internal("job panicked")) + .map_err(try_to_extract_panic_info) .and_then(|r| r); - if result.is_ok() { - storage::delete_successful_job(&conn, job_id)?; - } else { - storage::update_failed_job(&conn, job_id); + match result { + Ok(_) => storage::delete_successful_job(&conn, job_id)?, + Err(e) => { + eprintln!("Job {} failed to run: {}", job_id, e); + storage::update_failed_job(&conn, job_id); + } } Ok(()) }).expect("Could not retrieve or update job") @@ -86,19 +108,43 @@ impl Runner { self.connection_pool.get().map_err(Into::into) } - #[cfg(test)] + pub fn assert_no_failed_jobs(&self) -> CargoResult<()> { + self.wait_for_jobs(); + let failed_jobs = storage::failed_job_count(&*self.connection()?)?; + assert_eq!(0, failed_jobs); + Ok(()) + } + fn wait_for_jobs(&self) { self.thread_pool.join(); assert_eq!(0, self.thread_pool.panic_count()); } } +/// 
Try to figure out what's in the box, and print it if we can. +/// +/// The actual error type we will get from `panic::catch_unwind` is really poorly documented. +/// However, the `panic::set_hook` functions deal with a `PanicInfo` type, and its payload is +/// documented as "commonly but not always `&'static str` or `String`". So we can try all of those, +/// and give up if we didn't get one of those three types. +fn try_to_extract_panic_info(info: Box) -> Box { + if let Some(x) = info.downcast_ref::() { + internal(&format_args!("job panicked: {}", x)) + } else if let Some(x) = info.downcast_ref::<&'static str>() { + internal(&format_args!("job panicked: {}", x)) + } else if let Some(x) = info.downcast_ref::() { + internal(&format_args!("job panicked: {}", x)) + } else { + internal("job panicked") + } +} + #[cfg(test)] mod tests { use diesel::prelude::*; use diesel::r2d2; - use schema::background_jobs::dsl::*; + use crate::schema::background_jobs::dsl::*; use std::sync::{Mutex, MutexGuard, Barrier, Arc}; use std::panic::AssertUnwindSafe; use super::*; diff --git a/src/background/storage.rs b/src/background/storage.rs index 1dbd9e47bb..2873c896da 100644 --- a/src/background/storage.rs +++ b/src/background/storage.rs @@ -1,12 +1,13 @@ use diesel::dsl::now; +use diesel::pg::Pg; use diesel::prelude::*; use diesel::{delete, insert_into, update}; -use diesel::sql_types::Integer; +use diesel::sql_types::{Bool, Integer, Interval}; use serde_json; -use schema::background_jobs; +use crate::schema::background_jobs; use super::Job; -use util::CargoResult; +use crate::util::CargoResult; #[derive(Queryable, Identifiable, Debug, Clone)] pub struct BackgroundJob { @@ -17,7 +18,7 @@ pub struct BackgroundJob { /// Enqueues a job to be run as soon as possible. pub fn enqueue_job(conn: &PgConnection, job: T) -> CargoResult<()> { - use schema::background_jobs::dsl::*; + use crate::schema::background_jobs::dsl::*; let job_data = serde_json::to_value(job)?; insert_into(background_jobs) @@ -29,27 +30,52 @@ pub fn enqueue_job(conn: &PgConnection, job: T) -> CargoResult<()> { Ok(()) } -/// Finds the next job that is unlocked, and ready to be retried. If a row is -/// found, it will be locked. -pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult { - use schema::background_jobs::dsl::*; +fn retriable() -> Box> { + use crate::schema::background_jobs::dsl::*; use diesel::dsl::*; - use diesel::sql_types::Interval; sql_function!(power, power_t, (x: Integer, y: Integer) -> Integer); + Box::new(last_retry.lt(now - 1.minute().into_sql::() * power(2, retries))) +} + +/// Finds the next job that is unlocked, and ready to be retried. If a row is +/// found, it will be locked. 
+pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult { + use crate::schema::background_jobs::dsl::*; + background_jobs .select((id, job_type, data)) - .filter(last_retry.lt(now - 1.minute().into_sql::() * power(2, retries))) + .filter(retriable()) .order(id) .for_update() .skip_locked() .first::(conn) } +/// The number of jobs available to be run +pub fn failed_job_count(conn: &PgConnection) -> QueryResult { + use crate::schema::background_jobs::dsl::*; + + background_jobs + .count() + .filter(retries.gt(0)) + .get_result(conn) +} + +/// The number of jobs that have failed at least once +pub fn available_job_count(conn: &PgConnection) -> QueryResult { + use crate::schema::background_jobs::dsl::*; + + background_jobs + .count() + .filter(retriable()) + .get_result(conn) +} + /// Deletes a job that has successfully completed running pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()> { - use schema::background_jobs::dsl::*; + use crate::schema::background_jobs::dsl::*; delete(background_jobs.find(job_id)).execute(conn)?; Ok(()) @@ -60,7 +86,7 @@ pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<() /// Ignores any database errors that may have occurred. If the DB has gone away, /// we assume that just trying again with a new connection will succeed. pub fn update_failed_job(conn: &PgConnection, job_id: i64) { - use schema::background_jobs::dsl::*; + use crate::schema::background_jobs::dsl::*; let _ = update(background_jobs.find(job_id)) .set(( diff --git a/src/background_jobs.rs b/src/background_jobs.rs new file mode 100644 index 0000000000..a9cad294c4 --- /dev/null +++ b/src/background_jobs.rs @@ -0,0 +1,23 @@ +use url::Url; + +use crate::background::{Runner, Builder}; +use crate::git::{AddCrate, Yank}; + +pub fn job_runner(config: Builder) -> Runner { + config + .register::() + .register::() + .build() +} + +#[allow(missing_debug_implementations)] +pub struct Environment { + pub index_location: Url, + pub credentials: Option<(String, String)>, +} + +impl Environment { + pub fn credentials(&self) -> Option<(&str, &str)> { + self.credentials.as_ref().map(|(u, p)| (u.as_str(), p.as_str())) + } +} diff --git a/src/bin/server.rs b/src/bin/server.rs index cf2caaa5c1..43f08586d9 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,10 +1,10 @@ #![deny(warnings)] -use cargo_registry::{boot, build_handler, env, git, App, Config, Env}; +use cargo_registry::{boot, App, Env}; use jemalloc_ctl; use std::{ env, - fs::{self, File}, + fs::File, sync::{mpsc::channel, Arc}, }; @@ -23,37 +23,10 @@ fn main() { // Initialize logging env_logger::init(); - let config = Config::default(); - - // If there isn't a git checkout containing the crate index repo at the path specified - // by `GIT_REPO_CHECKOUT`, delete that directory and clone the repo specified by `GIT_REPO_URL` - // into that directory instead. Uses the credentials specified in `GIT_HTTP_USER` and - // `GIT_HTTP_PWD` via the `cargo_registry::git::credentials` function. - let url = env("GIT_REPO_URL"); - let repo = match git2::Repository::open(&config.git_repo_checkout) { - Ok(r) => r, - Err(..) 
=> { - let _ = fs::remove_dir_all(&config.git_repo_checkout); - fs::create_dir_all(&config.git_repo_checkout).unwrap(); - let mut cb = git2::RemoteCallbacks::new(); - cb.credentials(git::credentials); - let mut opts = git2::FetchOptions::new(); - opts.remote_callbacks(cb); - git2::build::RepoBuilder::new() - .fetch_options(opts) - .clone(&url, &config.git_repo_checkout) - .unwrap() - } - }; - - // All commits to the index registry made through crates.io will be made by bors, the Rust - // community's friendly GitHub bot. - let mut cfg = repo.config().unwrap(); - cfg.set_str("user.name", "bors").unwrap(); - cfg.set_str("user.email", "bors@rust-lang.org").unwrap(); + let config = cargo_registry::Config::default(); let app = App::new(&config); - let app = build_handler(Arc::new(app)); + let app = cargo_registry::build_handler(Arc::new(app)); // On every server restart, ensure the categories available in the database match // the information in *src/categories.toml*. diff --git a/src/config.rs b/src/config.rs index c86f7affcc..bf2b436e41 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,13 @@ use crate::{env, uploaders::Uploader, Env, Replica}; use std::{env, path::PathBuf}; +use url::Url; #[derive(Clone, Debug)] pub struct Config { pub uploader: Uploader, pub session_key: String, pub git_repo_checkout: PathBuf, + pub index_location: Url, pub gh_client_id: String, pub gh_client_secret: String, pub db_url: String, @@ -124,6 +126,7 @@ impl Default for Config { uploader, session_key: env("SESSION_KEY"), git_repo_checkout: checkout, + index_location: Url::parse(&env("GIT_REPO_URL")).unwrap(), gh_client_id: env("GH_CLIENT_ID"), gh_client_secret: env("GH_CLIENT_SECRET"), db_url: env("DATABASE_URL"), diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 55312df7b0..c88cddcbbb 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -196,7 +196,7 @@ pub fn publish(req: &mut dyn Request) -> CargoResult { yanked: Some(false), links, }; - git::add_crate(&**req.app(), &git_crate).chain_error(|| { + git::add_crate(&conn, git_crate).chain_error(|| { internal(&format_args!( "could not add crate `{}` to the git repo", name diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs index a413ad68e1..20a5a84c0b 100644 --- a/src/controllers/version/yank.rs +++ b/src/controllers/version/yank.rs @@ -43,7 +43,7 @@ fn modify_yank(req: &mut dyn Request, yanked: bool) -> CargoResult { diesel::update(&version) .set(versions::yanked.eq(yanked)) .execute(&*conn)?; - git::yank(&**req.app(), &krate.name, &version.num, yanked)?; + git::yank(&conn, krate.name, version.num, yanked)?; Ok(()) })?; } diff --git a/src/git.rs b/src/git.rs index 6d7dfa0bf5..0caa58c93a 100644 --- a/src/git.rs +++ b/src/git.rs @@ -1,12 +1,16 @@ +#![allow(missing_debug_implementations)] + +use diesel::prelude::*; use std::collections::HashMap; -use std::env; -use std::fs::{self, File}; +use std::fs::{self, OpenOptions}; use std::io::prelude::*; use std::path::{Path, PathBuf}; +use tempdir::TempDir; +use url::Url; -use crate::app::App; +use crate::background::Job; +use crate::background_jobs::Environment; use crate::util::{internal, CargoResult}; - use crate::models::DependencyKind; #[derive(Serialize, Deserialize, Debug)] @@ -34,180 +38,160 @@ pub struct Dependency { pub package: Option, } -fn index_file(base: &Path, name: &str) -> PathBuf { - let name = name - .chars() - .flat_map(|c| c.to_lowercase()) - .collect::(); - match name.len() { - 1 => 
base.join("1").join(&name), - 2 => base.join("2").join(&name), - 3 => base.join("3").join(&name[..1]).join(&name), - _ => base.join(&name[0..2]).join(&name[2..4]).join(&name), - } +struct Repository { + checkout_path: TempDir, + repository: git2::Repository, } -pub fn add_crate(app: &App, krate: &Crate) -> CargoResult<()> { - let repo = app.git_repo.lock().unwrap(); - let repo = &*repo; - let repo_path = repo.workdir().unwrap(); - let dst = index_file(repo_path, &krate.name); - - commit_and_push(repo, || { - // Add the crate to its relevant file - fs::create_dir_all(dst.parent().unwrap())?; - let mut prev = String::new(); - if fs::metadata(&dst).is_ok() { - File::open(&dst).and_then(|mut f| f.read_to_string(&mut prev))?; - } - let s = serde_json::to_string(krate).unwrap(); - let new = prev + &s; - let mut f = File::create(&dst)?; - f.write_all(new.as_bytes())?; - f.write_all(b"\n")?; - - Ok(( - format!("Updating crate `{}#{}`", krate.name, krate.vers), - dst.clone(), - )) - }) -} +impl Repository { + fn open(url: &Url) -> CargoResult { + let checkout_path = TempDir::new("git")?; + let repository = git2::Repository::clone(url.as_str(), checkout_path.path())?; + + // All commits to the index registry made through crates.io will be made by bors, the Rust + // community's friendly GitHub bot. + let mut cfg = repository.config()?; + cfg.set_str("user.name", "bors")?; + cfg.set_str("user.email", "bors@rust-lang.org")?; + + Ok(Self { + checkout_path, + repository, + }) + } -/// Yanks or unyanks a crate version. This requires finding the index -/// file, deserlialise the crate from JSON, change the yank boolean to -/// `true` or `false`, write all the lines back out, and commit and -/// push the changes. -pub fn yank(app: &App, krate: &str, version: &semver::Version, yanked: bool) -> CargoResult<()> { - let repo = app.git_repo.lock().unwrap(); - let repo_path = repo.workdir().unwrap(); - let dst = index_file(repo_path, krate); - - commit_and_push(&repo, || { - let mut prev = String::new(); - File::open(&dst).and_then(|mut f| f.read_to_string(&mut prev))?; - let new = prev - .lines() - .map(|line| { - let mut git_crate = serde_json::from_str::(line) - .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?; - if git_crate.name != krate || git_crate.vers != version.to_string() { - return Ok(line.to_string()); - } - git_crate.yanked = Some(yanked); - Ok(serde_json::to_string(&git_crate).unwrap()) - }) - .collect::>>(); - let new = new?.join("\n"); - let mut f = File::create(&dst)?; - f.write_all(new.as_bytes())?; - f.write_all(b"\n")?; - - Ok(( - format!( - "{} crate `{}#{}`", - if yanked { "Yanking" } else { "Unyanking" }, - krate, - version - ), - dst.clone(), - )) - }) -} + fn index_file(&self, name: &str) -> PathBuf { + self.checkout_path.path() + .join(self.relative_index_file(name)) + } -/// Commits and pushes to the crates.io index. -/// -/// There are currently 2 instances of the crates.io backend running -/// on Heroku, and they race against each other e.g. if 2 pushes occur, -/// then one will succeed while the other will need to be rebased before -/// being pushed. -/// -/// A maximum of 20 attempts to commit and push to the index currently -/// accounts for the amount of traffic publishing crates, though this may -/// have to be changed in the future. -/// -/// Notes: -/// Currently, this function is called on the HTTP thread and is blocking. 
-/// Spawning a separate thread for this function means that the request -/// can return without waiting for completion, and other methods of -/// notifying upon completion or error can be used. -fn commit_and_push(repo: &git2::Repository, mut f: F) -> CargoResult<()> -where - F: FnMut() -> CargoResult<(String, PathBuf)>, -{ - let repo_path = repo.workdir().unwrap(); - - // Attempt to commit in a loop. It's possible that we're going to need to - // rebase our repository, and after that it's possible that we're going to - // race to commit the changes. For now we just cap out the maximum number of - // retries at a fixed number. - for _ in 0..20 { - let (msg, dst) = f()?; + fn relative_index_file(&self, name: &str) -> PathBuf { + let name = name.to_lowercase(); + match name.len() { + 1 => Path::new("1").join(&name), + 2 => Path::new("2").join(&name), + 3 => Path::new("3").join(&name[..1]).join(&name), + _ => Path::new(&name[0..2]).join(&name[2..4]).join(&name), + } + } + fn commit_and_push(&self, msg: &str, modified_file: &Path, credentials: Option<(&str, &str)>) -> CargoResult<()> { // git add $file - let mut index = repo.index()?; - let mut repo_path = repo_path.iter(); - let dst = dst - .iter() - .skip_while(|s| Some(*s) == repo_path.next()) - .collect::(); - index.add_path(&dst)?; + let mut index = self.repository.index()?; + index.add_path(modified_file)?; index.write()?; let tree_id = index.write_tree()?; - let tree = repo.find_tree(tree_id)?; + let tree = self.repository.find_tree(tree_id)?; // git commit -m "..." - let head = repo.head()?; - let parent = repo.find_commit(head.target().unwrap())?; - let sig = repo.signature()?; - repo.commit(Some("HEAD"), &sig, &sig, &msg, &tree, &[&parent])?; + let head = self.repository.head()?; + let parent = self.repository.find_commit(head.target().unwrap())?; + let sig = self.repository.signature()?; + self.repository.commit(Some("HEAD"), &sig, &sig, &msg, &tree, &[&parent])?; // git push - let mut ref_status = None; - let mut origin = repo.find_remote("origin")?; - let res = { + let mut ref_status = Ok(()); + { + let mut origin = self.repository.find_remote("origin")?; let mut callbacks = git2::RemoteCallbacks::new(); - callbacks.credentials(credentials); + callbacks.credentials(|_, _, _| credentials.ok_or_else(|| git2::Error::from_str("no authentication set")).and_then(|(u, p)| git2::Cred::userpass_plaintext(u, p))); callbacks.push_update_reference(|refname, status| { assert_eq!(refname, "refs/heads/master"); - ref_status = status.map(|s| s.to_string()); + if let Some(s) = status { + ref_status = Err(internal(&format_args!("failed to push a ref: {}", s))) + } Ok(()) }); let mut opts = git2::PushOptions::new(); opts.remote_callbacks(callbacks); - origin.push(&["refs/heads/master"], Some(&mut opts)) - }; - match res { - Ok(()) if ref_status.is_none() => return Ok(()), - Ok(()) => info!("failed to push a ref: {:?}", ref_status), - Err(e) => info!("failure to push: {}", e), + origin.push(&["refs/heads/master"], Some(&mut opts))?; } + ref_status + } +} + +#[derive(Deserialize, Serialize)] +pub struct AddCrate { + krate: Crate, +} + +impl Job for AddCrate { + type Environment = Environment; + const JOB_TYPE: &'static str = "add_crate"; - let mut callbacks = git2::RemoteCallbacks::new(); - callbacks.credentials(credentials); - origin.update_tips( - Some(&mut callbacks), - true, - git2::AutotagOption::Unspecified, - None, - )?; - - // Ok, we need to update, so fetch and reset --hard - origin.fetch(&["refs/heads/*:refs/heads/*"], None, None)?; - let 
head = repo.head()?.target().unwrap(); - let obj = repo.find_object(head, None)?; - repo.reset(&obj, git2::ResetType::Hard, None)?; + fn perform(self, env: &Self::Environment) -> CargoResult<()> { + let repo = Repository::open(&env.index_location)?; + let dst = repo.index_file(&self.krate.name); + + // Add the crate to its relevant file + fs::create_dir_all(dst.parent().unwrap())?; + let mut file = OpenOptions::new().append(true).create(true).open(&dst)?; + serde_json::to_writer(&mut file, &self.krate)?; + file.write_all(b"\n")?; + + repo.commit_and_push( + &format!("Updating crate `{}#{}`", self.krate.name, self.krate.vers), + &repo.relative_index_file(&self.krate.name), + env.credentials(), + ) } +} + +pub fn add_crate(conn: &PgConnection, krate: Crate) -> CargoResult<()> { + AddCrate { krate } + .enqueue(conn) + .map_err(Into::into) +} - Err(internal("Too many rebase failures")) +#[derive(Serialize, Deserialize)] +pub struct Yank { + krate: String, + version: String, + yanked: bool, } -pub fn credentials( - _user: &str, - _user_from_url: Option<&str>, - _cred: git2::CredentialType, -) -> Result { - match (env::var("GIT_HTTP_USER"), env::var("GIT_HTTP_PWD")) { - (Ok(u), Ok(p)) => git2::Cred::userpass_plaintext(&u, &p), - _ => Err(git2::Error::from_str("no authentication set")), +impl Job for Yank { + type Environment = Environment; + const JOB_TYPE: &'static str = "yank"; + + fn perform(self, env: &Self::Environment) -> CargoResult<()> { + let repo = Repository::open(&env.index_location)?; + let dst = repo.index_file(&self.krate); + + let prev = fs::read_to_string(&dst)?; + let new = prev + .lines() + .map(|line| { + let mut git_crate = serde_json::from_str::(line) + .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?; + if git_crate.name != self.krate || git_crate.vers != self.version { + return Ok(line.to_string()); + } + git_crate.yanked = Some(self.yanked); + Ok(serde_json::to_string(&git_crate)?) + }).collect::>>(); + let new = new?.join("\n") + "\n"; + fs::write(&dst, new.as_bytes())?; + + repo.commit_and_push( + &format!( + "{} crate `{}#{}`", + if self.yanked { "Yanking" } else { "Unyanking" }, + self.krate, + self.version + ), + &repo.relative_index_file(&self.krate), + env.credentials(), + ) } } + +/// Yanks or unyanks a crate version. This requires finding the index +/// file, deserlialise the crate from JSON, change the yank boolean to +/// `true` or `false`, write all the lines back out, and commit and +/// push the changes. 
+pub fn yank(conn: &PgConnection, krate: String, version: semver::Version, yanked: bool) -> CargoResult<()> { + Yank { krate, version: version.to_string(), yanked } + .enqueue(conn) + .map_err(Into::into) +} diff --git a/src/lib.rs b/src/lib.rs index 6927475c9c..c597c25fe1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ static ALLOC: Jemalloc = Jemalloc; mod app; pub mod background; +pub mod background_jobs; pub mod boot; mod config; pub mod db; diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index efa5e9f251..e34cb38dc0 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -10,6 +10,7 @@ pub use self::debug::*; pub use self::ember_index_rewrite::EmberIndexRewrite; pub use self::head::Head; use self::log_connection_pool_status::LogConnectionPoolStatus; +use self::run_pending_background_jobs::RunPendingBackgroundJobs; pub use self::security_headers::SecurityHeaders; pub use self::static_or_continue::StaticOrContinue; @@ -23,6 +24,7 @@ mod head; mod log_connection_pool_status; mod log_request; mod require_user_agent; +mod run_pending_background_jobs; mod security_headers; mod static_or_continue; @@ -100,5 +102,9 @@ pub fn build_middleware(app: Arc, endpoints: R404) -> MiddlewareBuilder { m.around(log_request::LogRequests::default()); } + if env == Env::Test { + m.add(RunPendingBackgroundJobs); + } + m } diff --git a/src/middleware/run_pending_background_jobs.rs b/src/middleware/run_pending_background_jobs.rs new file mode 100644 index 0000000000..99238fc43f --- /dev/null +++ b/src/middleware/run_pending_background_jobs.rs @@ -0,0 +1,28 @@ +use crate::background::Runner; +use crate::background_jobs::*; +use super::app::RequestApp; +use super::prelude::*; + +pub struct RunPendingBackgroundJobs; + +impl Middleware for RunPendingBackgroundJobs { + fn after( + &self, + req: &mut dyn Request, + res: Result>, + ) -> Result> { + let app = req.app(); + let connection_pool = app.diesel_database.clone(); + let environment = Environment { + index_location: app.config.index_location.clone(), + credentials: None, + }; + + let config = Runner::builder(connection_pool, environment); + let runner = job_runner(config); + + runner.run_all_pending_jobs().expect("Could not run jobs"); + runner.assert_no_failed_jobs().expect("Could not determine if jobs failed"); + res + } +} diff --git a/src/tests/all.rs b/src/tests/all.rs index d6c5d14c8f..9929ca7ee7 100644 --- a/src/tests/all.rs +++ b/src/tests/all.rs @@ -30,6 +30,7 @@ use std::{ use conduit::Request; use conduit_test::MockRequest; use diesel::prelude::*; +use url::Url; macro_rules! 
t { ($e:expr) => { @@ -138,6 +139,7 @@ fn simple_app(uploader: Uploader) -> (Arc, conduit_middleware::MiddlewareBu uploader, session_key: "test this has to be over 32 bytes long".to_string(), git_repo_checkout: git::checkout(), + index_location: Url::from_file_path(&git::bare()).unwrap(), gh_client_id: env::var("GH_CLIENT_ID").unwrap_or_default(), gh_client_secret: env::var("GH_CLIENT_SECRET").unwrap_or_default(), db_url: env("TEST_DATABASE_URL"), From a5bc70c83f25f64d6e83ff55335c39463696035d Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Thu, 3 Jan 2019 12:25:59 -0700 Subject: [PATCH 04/16] rustfmt --- src/app.rs | 7 +-- src/background/job.rs | 2 +- src/background/registry.rs | 14 +++--- src/background/runner.rs | 34 ++++++-------- src/background/storage.rs | 19 +++----- src/background_jobs.rs | 11 +++-- src/git.rs | 45 +++++++++++++------ src/middleware/run_pending_background_jobs.rs | 8 ++-- 8 files changed, 72 insertions(+), 68 deletions(-) diff --git a/src/app.rs b/src/app.rs index 18dc025602..8dcb9fef7f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,12 +1,7 @@ //! Application-wide components in a struct accessible from each request use crate::{db, util::CargoResult, Config, Env}; -use std::{ - env, - path::PathBuf, - sync::Arc, - time::Duration, -}; +use std::{env, path::PathBuf, sync::Arc, time::Duration}; use diesel::r2d2; use scheduled_thread_pool::ScheduledThreadPool; diff --git a/src/background/job.rs b/src/background/job.rs index 9ffa0006a3..b6ea016f71 100644 --- a/src/background/job.rs +++ b/src/background/job.rs @@ -1,5 +1,5 @@ use diesel::PgConnection; -use serde::{Serialize, de::DeserializeOwned}; +use serde::{de::DeserializeOwned, Serialize}; use super::storage; use crate::util::CargoResult; diff --git a/src/background/registry.rs b/src/background/registry.rs index 5867c8abc9..becd46b303 100644 --- a/src/background/registry.rs +++ b/src/background/registry.rs @@ -6,7 +6,8 @@ use super::Job; use crate::util::CargoResult; #[doc(hidden)] -pub type PerformFn = Box CargoResult<()> + RefUnwindSafe + Send + Sync>; +pub type PerformFn = + Box CargoResult<()> + RefUnwindSafe + Send + Sync>; #[derive(Default)] #[allow(missing_debug_implementations)] // Can't derive debug @@ -32,9 +33,12 @@ impl Registry { /// Register a new background job. This will override any existing /// registries with the same `JOB_TYPE`, if one exists. 
pub fn register>(&mut self) { - self.job_types.insert(T::JOB_TYPE, Box::new(|data, env| { - let data = serde_json::from_value(data)?; - T::perform(data, env) - })); + self.job_types.insert( + T::JOB_TYPE, + Box::new(|data, env| { + let data = serde_json::from_value(data)?; + T::perform(data, env) + }), + ); } } diff --git a/src/background/runner.rs b/src/background/runner.rs index 6dd8c5839b..bc8870b71e 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -1,12 +1,12 @@ #![allow(dead_code)] use diesel::prelude::*; use std::any::Any; -use std::panic::{catch_unwind, UnwindSafe, RefUnwindSafe, PanicInfo}; +use std::panic::{catch_unwind, PanicInfo, RefUnwindSafe, UnwindSafe}; use std::sync::Arc; use threadpool::ThreadPool; +use super::{storage, Job, Registry}; use crate::db::{DieselPool, DieselPooledConn}; -use super::{storage, Registry, Job}; use crate::util::errors::*; #[allow(missing_debug_implementations)] @@ -68,7 +68,8 @@ impl Runner { let environment = Arc::clone(&self.environment); let registry = Arc::clone(&self.registry); self.get_single_job(move |job| { - let perform_fn = registry.get(&job.job_type) + let perform_fn = registry + .get(&job.job_type) .ok_or_else(|| internal(&format_args!("Unknown job type {}", job.job_type)))?; perform_fn(job.data, &environment) }) @@ -100,7 +101,8 @@ impl Runner { } } Ok(()) - }).expect("Could not retrieve or update job") + }) + .expect("Could not retrieve or update job") }) } @@ -144,10 +146,10 @@ mod tests { use diesel::prelude::*; use diesel::r2d2; + use super::*; use crate::schema::background_jobs::dsl::*; - use std::sync::{Mutex, MutexGuard, Barrier, Arc}; use std::panic::AssertUnwindSafe; - use super::*; + use std::sync::{Arc, Barrier, Mutex, MutexGuard}; #[test] fn jobs_are_locked_when_fetched() { @@ -185,12 +187,11 @@ mod tests { let runner = runner(); create_dummy_job(&runner); - runner.get_single_job(|_| { - Ok(()) - }); + runner.get_single_job(|_| Ok(())); runner.wait_for_jobs(); - let remaining_jobs = background_jobs.count() + let remaining_jobs = background_jobs + .count() .get_result(&runner.connection().unwrap()); assert_eq!(Ok(0), remaining_jobs); } @@ -243,9 +244,7 @@ mod tests { let runner = runner(); let job_id = create_dummy_job(&runner).id; - runner.get_single_job(|_| { - panic!() - }); + runner.get_single_job(|_| panic!()); runner.wait_for_jobs(); let tries = background_jobs @@ -289,14 +288,9 @@ mod tests { let database_url = dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests"); let manager = r2d2::ConnectionManager::new(database_url); - let pool = r2d2::Pool::builder() - .max_size(2) - .build(manager) - .unwrap(); + let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); - Runner::builder(pool, ()) - .thread_count(2) - .build() + Runner::builder(pool, ()).thread_count(2).build() } fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob { diff --git a/src/background/storage.rs b/src/background/storage.rs index 2873c896da..401d735741 100644 --- a/src/background/storage.rs +++ b/src/background/storage.rs @@ -1,12 +1,12 @@ use diesel::dsl::now; use diesel::pg::Pg; use diesel::prelude::*; -use diesel::{delete, insert_into, update}; use diesel::sql_types::{Bool, Integer, Interval}; +use diesel::{delete, insert_into, update}; use serde_json; -use crate::schema::background_jobs; use super::Job; +use crate::schema::background_jobs; use crate::util::CargoResult; #[derive(Queryable, Identifiable, Debug, Clone)] @@ -22,10 +22,7 @@ pub fn enqueue_job(conn: 
&PgConnection, job: T) -> CargoResult<()> { let job_data = serde_json::to_value(job)?; insert_into(background_jobs) - .values(( - job_type.eq(T::JOB_TYPE), - data.eq(job_data), - )) + .values((job_type.eq(T::JOB_TYPE), data.eq(job_data))) .execute(conn)?; Ok(()) } @@ -67,10 +64,7 @@ pub fn failed_job_count(conn: &PgConnection) -> QueryResult { pub fn available_job_count(conn: &PgConnection) -> QueryResult { use crate::schema::background_jobs::dsl::*; - background_jobs - .count() - .filter(retriable()) - .get_result(conn) + background_jobs.count().filter(retriable()).get_result(conn) } /// Deletes a job that has successfully completed running @@ -89,9 +83,6 @@ pub fn update_failed_job(conn: &PgConnection, job_id: i64) { use crate::schema::background_jobs::dsl::*; let _ = update(background_jobs.find(job_id)) - .set(( - retries.eq(retries + 1), - last_retry.eq(now), - )) + .set((retries.eq(retries + 1), last_retry.eq(now))) .execute(conn); } diff --git a/src/background_jobs.rs b/src/background_jobs.rs index a9cad294c4..0f0fa0c320 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -1,13 +1,10 @@ use url::Url; -use crate::background::{Runner, Builder}; +use crate::background::{Builder, Runner}; use crate::git::{AddCrate, Yank}; pub fn job_runner(config: Builder) -> Runner { - config - .register::() - .register::() - .build() + config.register::().register::().build() } #[allow(missing_debug_implementations)] @@ -18,6 +15,8 @@ pub struct Environment { impl Environment { pub fn credentials(&self) -> Option<(&str, &str)> { - self.credentials.as_ref().map(|(u, p)| (u.as_str(), p.as_str())) + self.credentials + .as_ref() + .map(|(u, p)| (u.as_str(), p.as_str())) } } diff --git a/src/git.rs b/src/git.rs index 0caa58c93a..cc523b0183 100644 --- a/src/git.rs +++ b/src/git.rs @@ -10,8 +10,8 @@ use url::Url; use crate::background::Job; use crate::background_jobs::Environment; -use crate::util::{internal, CargoResult}; use crate::models::DependencyKind; +use crate::util::{internal, CargoResult}; #[derive(Serialize, Deserialize, Debug)] pub struct Crate { @@ -61,7 +61,8 @@ impl Repository { } fn index_file(&self, name: &str) -> PathBuf { - self.checkout_path.path() + self.checkout_path + .path() .join(self.relative_index_file(name)) } @@ -75,7 +76,12 @@ impl Repository { } } - fn commit_and_push(&self, msg: &str, modified_file: &Path, credentials: Option<(&str, &str)>) -> CargoResult<()> { + fn commit_and_push( + &self, + msg: &str, + modified_file: &Path, + credentials: Option<(&str, &str)>, + ) -> CargoResult<()> { // git add $file let mut index = self.repository.index()?; index.add_path(modified_file)?; @@ -87,14 +93,19 @@ impl Repository { let head = self.repository.head()?; let parent = self.repository.find_commit(head.target().unwrap())?; let sig = self.repository.signature()?; - self.repository.commit(Some("HEAD"), &sig, &sig, &msg, &tree, &[&parent])?; + self.repository + .commit(Some("HEAD"), &sig, &sig, &msg, &tree, &[&parent])?; // git push let mut ref_status = Ok(()); { let mut origin = self.repository.find_remote("origin")?; let mut callbacks = git2::RemoteCallbacks::new(); - callbacks.credentials(|_, _, _| credentials.ok_or_else(|| git2::Error::from_str("no authentication set")).and_then(|(u, p)| git2::Cred::userpass_plaintext(u, p))); + callbacks.credentials(|_, _, _| { + credentials + .ok_or_else(|| git2::Error::from_str("no authentication set")) + .and_then(|(u, p)| git2::Cred::userpass_plaintext(u, p)) + }); callbacks.push_update_reference(|refname, status| { 
assert_eq!(refname, "refs/heads/master"); if let Some(s) = status { @@ -138,9 +149,7 @@ impl Job for AddCrate { } pub fn add_crate(conn: &PgConnection, krate: Crate) -> CargoResult<()> { - AddCrate { krate } - .enqueue(conn) - .map_err(Into::into) + AddCrate { krate }.enqueue(conn).map_err(Into::into) } #[derive(Serialize, Deserialize)] @@ -169,7 +178,8 @@ impl Job for Yank { } git_crate.yanked = Some(self.yanked); Ok(serde_json::to_string(&git_crate)?) - }).collect::>>(); + }) + .collect::>>(); let new = new?.join("\n") + "\n"; fs::write(&dst, new.as_bytes())?; @@ -190,8 +200,17 @@ impl Job for Yank { /// file, deserlialise the crate from JSON, change the yank boolean to /// `true` or `false`, write all the lines back out, and commit and /// push the changes. -pub fn yank(conn: &PgConnection, krate: String, version: semver::Version, yanked: bool) -> CargoResult<()> { - Yank { krate, version: version.to_string(), yanked } - .enqueue(conn) - .map_err(Into::into) +pub fn yank( + conn: &PgConnection, + krate: String, + version: semver::Version, + yanked: bool, +) -> CargoResult<()> { + Yank { + krate, + version: version.to_string(), + yanked, + } + .enqueue(conn) + .map_err(Into::into) } diff --git a/src/middleware/run_pending_background_jobs.rs b/src/middleware/run_pending_background_jobs.rs index 99238fc43f..d94ef5eea3 100644 --- a/src/middleware/run_pending_background_jobs.rs +++ b/src/middleware/run_pending_background_jobs.rs @@ -1,7 +1,7 @@ -use crate::background::Runner; -use crate::background_jobs::*; use super::app::RequestApp; use super::prelude::*; +use crate::background::Runner; +use crate::background_jobs::*; pub struct RunPendingBackgroundJobs; @@ -22,7 +22,9 @@ impl Middleware for RunPendingBackgroundJobs { let runner = job_runner(config); runner.run_all_pending_jobs().expect("Could not run jobs"); - runner.assert_no_failed_jobs().expect("Could not determine if jobs failed"); + runner + .assert_no_failed_jobs() + .expect("Could not determine if jobs failed"); res } } From 9d2c02db7bb9c1bbfb379768f4d6500c30f1064c Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Thu, 3 Jan 2019 13:50:53 -0700 Subject: [PATCH 05/16] Add a binary for running background jobs and monitoring their status These binaries are pretty simple (and a bit spaghetti) for the time being, since we have so little actually using the background job framework. The runner is meant to be run on a worker dyno continuously, while the monitor should be run by a cron-like tool roughly every 5 minutes. We should move `update-downloads` to be scheduled as well, since it spends so little time actually working. --- Procfile | 1 + src/bin/background-worker.rs | 47 +++++++++++++++++++++++ src/bin/monitor.rs | 73 ++++++++++++++++++++++++++++++++++++ src/bin/on_call/mod.rs | 1 + 4 files changed, 122 insertions(+) create mode 100644 src/bin/background-worker.rs create mode 100644 src/bin/monitor.rs diff --git a/Procfile b/Procfile index 0dbfa4cc6b..4636a18fee 100644 --- a/Procfile +++ b/Procfile @@ -1,2 +1,3 @@ web: bin/diesel migration run && bin/start-nginx ./target/release/server worker: ./target/release/update-downloads daemon 300 +background_worker: ./target/release/background-worker diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs new file mode 100644 index 0000000000..1f1254dda3 --- /dev/null +++ b/src/bin/background-worker.rs @@ -0,0 +1,47 @@ +// Runs enqueued background jobs +// +// This binary will loop until interrupted. Every second, it will attempt to +// run any jobs in the background queue. 
Panics if attempting to count +// available jobs fails. +// +// Usage: +// cargo run --bin background-worker + +#![deny(warnings)] + +use cargo_registry::{background, background_jobs::*, db}; +use diesel::r2d2; +use std::env; +use std::thread::sleep; +use std::time::Duration; + +fn main() { + let config = cargo_registry::Config::default(); + + let username = env::var("GIT_HTTP_USER"); + let password = env::var("GIT_HTTP_PWD"); + let credentials = match (username, password) { + (Ok(u), Ok(p)) => Some((u, p)), + _ => None, + }; + let environment = Environment { + index_location: config.index_location, + credentials, + }; + + // We're only using 1 thread, so we only need 1 connection + let db_config = r2d2::Pool::builder().max_size(1); + let db_pool = db::diesel_pool(&config.db_url, db_config); + + let builder = background::Runner::builder(db_pool, environment).thread_count(1); + let runner = job_runner(builder); + + println!("Runner booted, running jobs"); + + loop { + runner + .run_all_pending_jobs() + .expect("Could not begin running jobs"); + sleep(Duration::from_secs(1)); + } +} diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs new file mode 100644 index 0000000000..9f1c25be20 --- /dev/null +++ b/src/bin/monitor.rs @@ -0,0 +1,73 @@ +//! Checks for any invariants we expect to be true, and pages whoever is on call +//! if they are not. +//! +//! Usage: +//! cargo run --bin monitor + +#![deny(warnings)] + +#[macro_use] +extern crate serde_derive; + +mod on_call; + +use cargo_registry::{db, util::CargoResult}; +use diesel::prelude::*; +use std::env; + +fn main() -> CargoResult<()> { + let conn = db::connect_now()?; + + check_stalled_background_jobs(&conn)?; + Ok(()) +} + +fn check_stalled_background_jobs(conn: &PgConnection) -> CargoResult<()> { + use cargo_registry::schema::background_jobs::dsl::*; + use diesel::dsl::*; + + const BACKGROUND_JOB_KEY: &str = "background_jobs"; + + println!("Checking for stalled background jobs"); + + let max_job_time = env::var("MAX_JOB_TIME") + .map(|s| s.parse::().unwrap()) + .unwrap_or(15); + + let stalled_job_count = background_jobs + .filter(created_at.lt(now - max_job_time.minutes())) + .count() + .get_result::(conn)?; + + let event = if stalled_job_count > 0 { + on_call::Event::Trigger { + incident_key: Some(BACKGROUND_JOB_KEY.into()), + description: format!( + "{} jobs have been in the queue for more than {} minutes", + stalled_job_count, max_job_time + ), + } + } else { + on_call::Event::Resolve { + incident_key: BACKGROUND_JOB_KEY.into(), + description: Some("No stalled background jobs".into()), + } + }; + + log_and_trigger_event(event)?; + Ok(()) +} + +fn log_and_trigger_event(event: on_call::Event) -> CargoResult<()> { + match event { + on_call::Event::Trigger { + ref description, .. + } => println!("Paging on-call: {}", description), + on_call::Event::Resolve { + description: Some(ref description), + .. 
+ } => println!("{}", description), + _ => {} // noop + } + event.send() +} diff --git a/src/bin/on_call/mod.rs b/src/bin/on_call/mod.rs index 39bedfecf0..1228f6e73b 100644 --- a/src/bin/on_call/mod.rs +++ b/src/bin/on_call/mod.rs @@ -10,6 +10,7 @@ pub enum Event { incident_key: Option<String>, description: String, }, + #[allow(dead_code)] // Not all binaries create Acknowledge events Acknowledge { incident_key: String, description: Option<String>, From 62e380cd52f6c69b65d15efe118a2883da695671 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 4 Jan 2019 10:29:51 -0700 Subject: [PATCH 06/16] Make clippy happy --- src/background/registry.rs | 2 ++ src/background/runner.rs | 4 ++-- src/controllers/version/yank.rs | 2 +- src/git.rs | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/background/registry.rs b/src/background/registry.rs index becd46b303..8dc2ee992e 100644 --- a/src/background/registry.rs +++ b/src/background/registry.rs @@ -1,3 +1,5 @@ +#![allow(clippy::new_without_default_derive)] // https://github.com/rust-lang/rust-clippy/issues/3632 + use serde_json; use std::collections::HashMap; use std::panic::RefUnwindSafe; diff --git a/src/background/runner.rs b/src/background/runner.rs index bc8870b71e..841b5c3443 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -90,7 +90,7 @@ impl Runner { let job_id = job.id; let result = catch_unwind(|| f(job)) - .map_err(try_to_extract_panic_info) + .map_err(|e| try_to_extract_panic_info(&e)) .and_then(|r| r); match result { @@ -129,7 +129,7 @@ impl Runner { /// However, the `panic::set_hook` functions deal with a `PanicInfo` type, and its payload is /// documented as "commonly but not always `&'static str` or `String`". So we can try all of those, /// and give up if we didn't get one of those three types. -fn try_to_extract_panic_info(info: Box<dyn Any + Send + 'static>) -> Box<dyn CargoError> { +fn try_to_extract_panic_info(info: &(dyn Any + Send + 'static)) -> Box<dyn CargoError> { if let Some(x) = info.downcast_ref::<String>() { internal(&format_args!("job panicked: {}", x)) } else if let Some(x) = info.downcast_ref::<&'static str>() { diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs index 20a5a84c0b..0e50c096ec 100644 --- a/src/controllers/version/yank.rs +++ b/src/controllers/version/yank.rs @@ -43,7 +43,7 @@ fn modify_yank(req: &mut dyn Request, yanked: bool) -> CargoResult<Response> { diesel::update(&version) .set(versions::yanked.eq(yanked)) .execute(&*conn)?; - git::yank(&conn, krate.name, version.num, yanked)?; + git::yank(&conn, krate.name, &version.num, yanked)?; Ok(()) })?; } diff --git a/src/git.rs b/src/git.rs index cc523b0183..ec60fb1f69 100644 --- a/src/git.rs +++ b/src/git.rs @@ -203,7 +203,7 @@ impl Job for Yank { pub fn yank( conn: &PgConnection, krate: String, - version: semver::Version, + version: &semver::Version, yanked: bool, ) -> CargoResult<()> { Yank { From 519a3485bac18a7d224cfb236e24a18d05aac1bd Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Tue, 12 Feb 2019 15:03:29 +0000 Subject: [PATCH 07/16] Fix rebase failures Now that we're wrapping the connection in our own smart pointer that doesn't implement `Connection` we need some explicit derefs (adding the manual `Connection` impl isn't worth avoiding these `*`s). We need to be able to clone the connection pool (only in tests, but this also only requires modifying the test variant), so we need the `Arc`. Similarly, since in tests the connection is a re-entrant mutex, we can't grab the connection before spawning the worker thread. The lock isn't `Send`, and that's for a very good reason.
So we instead need to clone a handle to the pool and grab the connection on the thread where we intend to use it. --- src/background/runner.rs | 18 ++++++++++-------- src/bin/background-worker.rs | 2 +- src/db.rs | 8 ++++++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/background/runner.rs b/src/background/runner.rs index 841b5c3443..4bd31ee6cd 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -79,8 +79,10 @@ impl Runner { where F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static, { - let conn = self.connection().expect("Could not acquire connection"); + // The connection may not be `Send` so we need to clone the pool instead + let pool = self.connection_pool.clone(); self.thread_pool.execute(move || { + let conn = pool.get().expect("Could not acquire connection"); conn.transaction::<_, Box<dyn CargoError>, _>(|| { let job = storage::find_next_unlocked_job(&conn).optional()?; let job = match job { @@ -192,7 +194,7 @@ mod tests { let remaining_jobs = background_jobs .count() - .get_result(&runner.connection().unwrap()); + .get_result(&*runner.connection().unwrap()); assert_eq!(Ok(0), remaining_jobs); } @@ -223,7 +225,7 @@ mod tests { .select(id) .filter(retries.eq(0)) .for_update() - .load::<i64>(&conn) + .load::<i64>(&*conn) .unwrap(); assert_eq!(0, available_jobs.len()); @@ -231,7 +233,7 @@ mod tests { let total_jobs_including_failed = background_jobs .select(id) .for_update() - .load::<i64>(&conn) + .load::<i64>(&*conn) .unwrap(); assert_eq!(1, total_jobs_including_failed.len()); @@ -251,7 +253,7 @@ mod tests { .find(job_id) .select(retries) .for_update() - .first::<i32>(&runner.connection().unwrap()) + .first::<i32>(&*runner.connection().unwrap()) .unwrap(); assert_eq!(1, tries); } @@ -277,7 +279,7 @@ mod tests { impl<'a> Drop for TestGuard<'a> { fn drop(&mut self) { ::diesel::sql_query("TRUNCATE TABLE background_jobs") - .execute(&runner().connection().unwrap()) + .execute(&*runner().connection().unwrap()) .unwrap(); } } @@ -290,14 +292,14 @@ mod tests { let manager = r2d2::ConnectionManager::new(database_url); let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); - Runner::builder(pool, ()).thread_count(2).build() + Runner::builder(DieselPool::Pool(pool), ()).thread_count(2).build() } fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob { ::diesel::insert_into(background_jobs) .values((job_type.eq("Foo"), data.eq(json!(null)))) .returning((id, job_type, data)) - .get_result(&runner.connection().unwrap()) + .get_result(&*runner.connection().unwrap()) .unwrap() } } diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 1f1254dda3..79550914b3 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -31,7 +31,7 @@ fn main() { // We're only using 1 thread, so we only need 1 connection let db_config = r2d2::Pool::builder().max_size(1); - let db_pool = db::diesel_pool(&config.db_url, db_config); + let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); let builder = background::Runner::builder(db_pool, environment).thread_count(1); let runner = job_runner(builder); diff --git a/src/db.rs b/src/db.rs index c70466b164..8448638886 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,6 +4,7 @@ use conduit::Request; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager, CustomizeConnection}; use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; +use std::sync::Arc; use std::ops::Deref; use url::Url; use crate::middleware::app::RequestApp; use crate::util::CargoResult; use crate::Env;
#[allow(missing_debug_implementations)] +#[derive(Clone)] pub enum DieselPool { Pool(r2d2::Pool>), - Test(ReentrantMutex), + Test(Arc>), } impl DieselPool { @@ -33,7 +35,7 @@ impl DieselPool { } fn test_conn(conn: PgConnection) -> Self { - DieselPool::Test(ReentrantMutex::new(conn)) + DieselPool::Test(Arc::new(ReentrantMutex::new(conn))) } } @@ -43,6 +45,8 @@ pub enum DieselPooledConn<'a> { Test(ReentrantMutexGuard<'a, PgConnection>), } +unsafe impl<'a> Send for DieselPooledConn<'a> {} + impl Deref for DieselPooledConn<'_> { type Target = PgConnection; From 0d36398e4a94345acbd8eb66fbc19a9a1aded154 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Wed, 13 Feb 2019 10:44:49 +0000 Subject: [PATCH 08/16] Yanking only updates the database after the index is updated Previously, we'd immediately update the database and then queue the index update to be run later. Jobs aren't guaranteed to run in the order they were queued (though in practice they likely will). This means that if someone yanked and unyanked a crate in rapid succession, the database may end up permanently out of sync with the index. With this change, the final result may still be out of order, but the database will be in sync with the index whatever the outcome is. Since rapidly yanking and unyanking isn't something we expect users to do, and even if they did jobs should be running in order under normal circumstances, I don't think we need to do anything to ensure more consistency than this. --- src/background_jobs.rs | 21 ++++++++++++++++++ src/bin/background-worker.rs | 15 +++++++------ src/controllers/version/yank.rs | 10 +-------- src/git.rs | 22 +++++++++++++------ src/middleware/run_pending_background_jobs.rs | 9 ++++---- src/models/version.rs | 2 +- 6 files changed, 51 insertions(+), 28 deletions(-) diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 0f0fa0c320..1e3e51b0ea 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -1,7 +1,10 @@ use url::Url; +use std::panic::AssertUnwindSafe; use crate::background::{Builder, Runner}; +use crate::db::{DieselPool, DieselPooledConn}; use crate::git::{AddCrate, Yank}; +use crate::util::CargoResult; pub fn job_runner(config: Builder) -> Runner { config.register::().register::().build() @@ -11,12 +14,30 @@ pub fn job_runner(config: Builder) -> Runner { pub struct Environment { pub index_location: Url, pub credentials: Option<(String, String)>, + // FIXME: https://github.com/sfackler/r2d2/pull/70 + pub connection_pool: AssertUnwindSafe, } impl Environment { + pub fn new( + index_location: Url, + credentials: Option<(String, String)>, + connection_pool: DieselPool, + ) -> Self { + Self { + index_location, + credentials, + connection_pool: AssertUnwindSafe(connection_pool), + } + } + pub fn credentials(&self) -> Option<(&str, &str)> { self.credentials .as_ref() .map(|(u, p)| (u.as_str(), p.as_str())) } + + pub fn connection(&self) -> CargoResult { + self.connection_pool.0.get().map_err(Into::into) + } } diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 79550914b3..9d0954a6b5 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -18,20 +18,21 @@ use std::time::Duration; fn main() { let config = cargo_registry::Config::default(); + // We're only using 1 thread, so we only need 2 connections + let db_config = r2d2::Pool::builder().max_size(1); + let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); + let username = env::var("GIT_HTTP_USER"); let password = env::var("GIT_HTTP_PWD"); let 
credentials = match (username, password) { (Ok(u), Ok(p)) => Some((u, p)), _ => None, }; - let environment = Environment { - index_location: config.index_location, + let environment = Environment::new( + config.index_location, credentials, - }; - - // We're only using 1 thread, so we only need 1 connection - let db_config = r2d2::Pool::builder().max_size(1); - let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); + db_pool.clone(), + ); let builder = background::Runner::builder(db_pool, environment).thread_count(1); let runner = job_runner(builder); diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs index 0e50c096ec..481538b500 100644 --- a/src/controllers/version/yank.rs +++ b/src/controllers/version/yank.rs @@ -3,10 +3,8 @@ use crate::controllers::prelude::*; use crate::git; -use crate::util::errors::CargoError; use crate::models::Rights; -use crate::schema::*; use super::version_and_crate; @@ -39,13 +37,7 @@ fn modify_yank(req: &mut dyn Request, yanked: bool) -> CargoResult { } if version.yanked != yanked { - conn.transaction::<_, Box, _>(|| { - diesel::update(&version) - .set(versions::yanked.eq(yanked)) - .execute(&*conn)?; - git::yank(&conn, krate.name, &version.num, yanked)?; - Ok(()) - })?; + git::yank(&conn, krate.name, version, yanked)?; } #[derive(Serialize)] diff --git a/src/git.rs b/src/git.rs index ec60fb1f69..f8ad82d619 100644 --- a/src/git.rs +++ b/src/git.rs @@ -10,7 +10,8 @@ use url::Url; use crate::background::Job; use crate::background_jobs::Environment; -use crate::models::DependencyKind; +use crate::models::{DependencyKind, Version}; +use crate::schema::versions; use crate::util::{internal, CargoResult}; #[derive(Serialize, Deserialize, Debug)] @@ -155,7 +156,7 @@ pub fn add_crate(conn: &PgConnection, krate: Crate) -> CargoResult<()> { #[derive(Serialize, Deserialize)] pub struct Yank { krate: String, - version: String, + version: Version, yanked: bool, } @@ -168,12 +169,13 @@ impl Job for Yank { let dst = repo.index_file(&self.krate); let prev = fs::read_to_string(&dst)?; + let version = self.version.num.to_string(); let new = prev .lines() .map(|line| { let mut git_crate = serde_json::from_str::(line) .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?; - if git_crate.name != self.krate || git_crate.vers != self.version { + if git_crate.name != self.krate || git_crate.vers != version { return Ok(line.to_string()); } git_crate.yanked = Some(self.yanked); @@ -188,11 +190,17 @@ impl Job for Yank { "{} crate `{}#{}`", if self.yanked { "Yanking" } else { "Unyanking" }, self.krate, - self.version + self.version.num ), &repo.relative_index_file(&self.krate), env.credentials(), - ) + )?; + + diesel::update(&self.version) + .set(versions::yanked.eq(self.yanked)) + .execute(&*env.connection()?)?; + + Ok(()) } } @@ -203,12 +211,12 @@ impl Job for Yank { pub fn yank( conn: &PgConnection, krate: String, - version: &semver::Version, + version: Version, yanked: bool, ) -> CargoResult<()> { Yank { krate, - version: version.to_string(), + version, yanked, } .enqueue(conn) diff --git a/src/middleware/run_pending_background_jobs.rs b/src/middleware/run_pending_background_jobs.rs index d94ef5eea3..edec678c68 100644 --- a/src/middleware/run_pending_background_jobs.rs +++ b/src/middleware/run_pending_background_jobs.rs @@ -13,10 +13,11 @@ impl Middleware for RunPendingBackgroundJobs { ) -> Result> { let app = req.app(); let connection_pool = app.diesel_database.clone(); - let environment = Environment { - index_location: 
app.config.index_location.clone(), - credentials: None, - }; + let environment = Environment::new( + app.config.index_location.clone(), + None, + app.diesel_database.clone(), + ); let config = Runner::builder(connection_pool, environment); let runner = job_runner(config); diff --git a/src/models/version.rs b/src/models/version.rs index 202bbcec62..7efea6b12d 100644 --- a/src/models/version.rs +++ b/src/models/version.rs @@ -10,7 +10,7 @@ use crate::schema::*; use crate::views::{EncodableVersion, EncodableVersionLinks}; // Queryable has a custom implementation below -#[derive(Clone, Identifiable, Associations, Debug, Queryable)] +#[derive(Clone, Identifiable, Associations, Debug, Queryable, Deserialize, Serialize)] #[belongs_to(Crate)] pub struct Version { pub id: i32, From a9d5e5bc298b0f33fbf7fb518d8b79819543f642 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Wed, 13 Feb 2019 11:00:21 +0000 Subject: [PATCH 09/16] fix some minor nits --- script/ci/prune-cache.sh | 2 +- src/background/storage.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/script/ci/prune-cache.sh b/script/ci/prune-cache.sh index 852f9a0105..e2285c6f80 100755 --- a/script/ci/prune-cache.sh +++ b/script/ci/prune-cache.sh @@ -7,7 +7,7 @@ du -hs target/debug crate_name="cargo-registry" test_name="all" -bin_names="delete-crate delete-version populate render-readmes server test-pagerduty transfer-crates update-downloads" +bin_names="delete-crate delete-version populate render-readmes server test-pagerduty transfer-crates update-downloads background-worker monitor" normalized_crate_name=${crate_name//-/_} rm -v target/debug/$normalized_crate_name-* diff --git a/src/background/storage.rs b/src/background/storage.rs index 401d735741..4e289b0fc4 100644 --- a/src/background/storage.rs +++ b/src/background/storage.rs @@ -50,7 +50,7 @@ pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult .first::(conn) } -/// The number of jobs available to be run +/// The number of jobs that have failed at least once pub fn failed_job_count(conn: &PgConnection) -> QueryResult { use crate::schema::background_jobs::dsl::*; @@ -60,7 +60,7 @@ pub fn failed_job_count(conn: &PgConnection) -> QueryResult { .get_result(conn) } -/// The number of jobs that have failed at least once +/// The number of jobs available to be run pub fn available_job_count(conn: &PgConnection) -> QueryResult { use crate::schema::background_jobs::dsl::*; From c4fb027c1a197f83858aecdd68e64a6a30244999 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Wed, 13 Feb 2019 13:27:38 +0000 Subject: [PATCH 10/16] rustfmt --- src/background/runner.rs | 4 +++- src/background_jobs.rs | 2 +- src/bin/background-worker.rs | 6 +----- src/db.rs | 2 +- src/git.rs | 7 +------ 5 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/background/runner.rs b/src/background/runner.rs index 4bd31ee6cd..1211d4f43d 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -292,7 +292,9 @@ mod tests { let manager = r2d2::ConnectionManager::new(database_url); let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); - Runner::builder(DieselPool::Pool(pool), ()).thread_count(2).build() + Runner::builder(DieselPool::Pool(pool), ()) + .thread_count(2) + .build() } fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob { diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 1e3e51b0ea..5f3da13b99 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -1,5 +1,5 @@ -use url::Url; use 
std::panic::AssertUnwindSafe; +use url::Url; use crate::background::{Builder, Runner}; use crate::db::{DieselPool, DieselPooledConn}; diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 9d0954a6b5..17459bea05 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -28,11 +28,7 @@ fn main() { (Ok(u), Ok(p)) => Some((u, p)), _ => None, }; - let environment = Environment::new( - config.index_location, - credentials, - db_pool.clone(), - ); + let environment = Environment::new(config.index_location, credentials, db_pool.clone()); let builder = background::Runner::builder(db_pool, environment).thread_count(1); let runner = job_runner(builder); diff --git a/src/db.rs b/src/db.rs index 8448638886..249d2aa9f4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,8 +4,8 @@ use conduit::Request; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager, CustomizeConnection}; use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; -use std::sync::Arc; use std::ops::Deref; +use std::sync::Arc; use url::Url; use crate::middleware::app::RequestApp; use crate::util::CargoResult; use crate::Env; diff --git a/src/git.rs b/src/git.rs index f8ad82d619..0b6f003edd 100644 --- a/src/git.rs +++ b/src/git.rs @@ -208,12 +208,7 @@ impl Job for Yank { /// file, deserialising the crate from JSON, changing the yank boolean to /// `true` or `false`, writing all the lines back out, and committing and /// pushing the changes. -pub fn yank( - conn: &PgConnection, - krate: String, - version: Version, - yanked: bool, -) -> CargoResult<()> { +pub fn yank(conn: &PgConnection, krate: String, version: Version, yanked: bool) -> CargoResult<()> { Yank { krate, version, yanked, } .enqueue(conn) .map_err(Into::into) } From 8f9dccb6bda20843060e0088eb46872a25cb383a Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 8 Mar 2019 12:55:30 -0700 Subject: [PATCH 11/16] Fix pool size, don't crash if we can't get a connection When testing in staging, the runner started crashing on boot. This was because we only set the connection pool size to 1 when we meant to set it to 2 (one for the runner, one for the worker thread). So in each loop, if all jobs took longer than 1 second to run, the runner would crash. This fixes the pool size, and also no longer returns an error if no database connection could be retrieved from the pool.
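Roughly, the new shape of the loop (a sketch under stated assumptions rather than the code in the diff below: `run_tick` is a hypothetical stand-in for `run_all_pending_jobs`, and the non-blocking behaviour comes from r2d2's `try_get`, which returns `None` instead of blocking when every connection is checked out):

    use diesel::r2d2::{ConnectionManager, Pool};
    use diesel::PgConnection;

    // Sketch: a saturated pool yields `None` from `try_get`, so this tick
    // simply does nothing and the next iteration of the loop tries again,
    // rather than the whole runner crashing.
    fn run_tick(pool: &Pool<ConnectionManager<PgConnection>>) {
        if let Some(_conn) = pool.try_get() {
            // count the available jobs with this connection, then dispatch
            // each one to the thread pool
        }
        // `None` here means the worker threads still hold every connection.
    }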
--- src/background/runner.rs | 12 +++++++++--- src/bin/background-worker.rs | 2 +- src/db.rs | 7 +++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/background/runner.rs b/src/background/runner.rs index 1211d4f43d..22df210484 100644 --- a/src/background/runner.rs +++ b/src/background/runner.rs @@ -57,9 +57,11 @@ impl Runner { } pub fn run_all_pending_jobs(&self) -> CargoResult<()> { - let available_job_count = storage::available_job_count(&*self.connection()?)?; - for _ in 0..available_job_count { - self.run_single_job() + if let Some(conn) = self.try_connection() { + let available_job_count = storage::available_job_count(&conn)?; + for _ in 0..available_job_count { + self.run_single_job() + } } Ok(()) } @@ -112,6 +114,10 @@ impl Runner { self.connection_pool.get().map_err(Into::into) } + fn try_connection(&self) -> Option<DieselPooledConn<'_>> { + self.connection_pool.try_get() + } + pub fn assert_no_failed_jobs(&self) -> CargoResult<()> { self.wait_for_jobs(); let failed_jobs = storage::failed_job_count(&*self.connection()?)?; diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 17459bea05..c3b13854ac 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -19,7 +19,7 @@ fn main() { let config = cargo_registry::Config::default(); // We're only using 1 thread, so we only need 2 connections - let db_config = r2d2::Pool::builder().max_size(1); + let db_config = r2d2::Pool::builder().max_size(2); let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); diff --git a/src/db.rs b/src/db.rs index 249d2aa9f4..b090be41a4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -27,6 +27,13 @@ impl DieselPool { } } + pub fn try_get(&self) -> Option<DieselPooledConn<'_>> { + match self { + DieselPool::Pool(pool) => pool.try_get().map(DieselPooledConn::Pool), + DieselPool::Test(conn) => conn.try_lock().map(DieselPooledConn::Test), + } + } + pub fn state(&self) -> r2d2::State { match self { DieselPool::Pool(pool) => pool.state(), From ee1685e177e5441b090b6d2225b70482e7d97da5 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 8 Mar 2019 13:13:42 -0700 Subject: [PATCH 12/16] Check if a crate is already yanked/unyanked in the job Right now we check if a crate is already yanked/unyanked before enqueuing the job at all. This means that if you try to undo a yank before the yank finishes running, we won't enqueue the unyank at all. This isn't an expected scenario, and we still might not do the unyank if the jobs are run out of order, but this means that we should always end up in the expected state under normal operation.
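The heart of the change, condensed from the diff below (`version_id` and `requested` are stand-in names for data the job already carries):

    // Lock the version's row for the duration of the transaction, so that
    // concurrent yank/unyank jobs for the same version serialize against
    // each other, then bail out early when the database already matches.
    conn.transaction(|| {
        let yanked_in_db = versions::table
            .find(version_id)
            .select(versions::yanked)
            .for_update()
            .first::<bool>(&*conn)?;
        if yanked_in_db == requested {
            return Ok(()); // nothing to do, a previous job already applied it
        }
        // ...rewrite the index file, commit and push, then update
        // `versions.yanked` inside this same transaction...
        Ok(())
    })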
--- src/controllers/version/yank.rs | 4 +- src/git.rs | 81 +++++++++++++++++++-------------- 2 files changed, 49 insertions(+), 36 deletions(-) diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs index 481538b500..43136e2eb2 100644 --- a/src/controllers/version/yank.rs +++ b/src/controllers/version/yank.rs @@ -36,9 +36,7 @@ fn modify_yank(req: &mut dyn Request, yanked: bool) -> CargoResult<Response> { return Err(human("must already be an owner to yank or unyank")); } - if version.yanked != yanked { - git::yank(&conn, krate.name, version, yanked)?; - } + git::yank(&conn, krate.name, version, yanked)?; #[derive(Serialize)] struct R { diff --git a/src/git.rs b/src/git.rs index 0b6f003edd..2abcc0d9a4 100644 --- a/src/git.rs +++ b/src/git.rs @@ -168,39 +168,54 @@ impl Job for Yank { let repo = Repository::open(&env.index_location)?; let dst = repo.index_file(&self.krate); - let prev = fs::read_to_string(&dst)?; - let version = self.version.num.to_string(); - let new = prev - .lines() - .map(|line| { - let mut git_crate = serde_json::from_str::<Crate>(line) - .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?; - if git_crate.name != self.krate || git_crate.vers != version { - return Ok(line.to_string()); - } - git_crate.yanked = Some(self.yanked); - Ok(serde_json::to_string(&git_crate)?) - }) - .collect::<CargoResult<Vec<String>>>(); - let new = new?.join("\n") + "\n"; - fs::write(&dst, new.as_bytes())?; - - repo.commit_and_push( - &format!( - "{} crate `{}#{}`", - if self.yanked { "Yanking" } else { "Unyanking" }, - self.krate, - self.version.num - ), - &repo.relative_index_file(&self.krate), - env.credentials(), - )?; - - diesel::update(&self.version) - .set(versions::yanked.eq(self.yanked)) - .execute(&*env.connection()?)?; - - Ok(()) + let conn = env.connection()?; + + conn.transaction(|| { + let yanked_in_db = versions::table + .find(self.version.id) + .select(versions::yanked) + .for_update() + .first::<bool>(&*conn)?; + + if yanked_in_db == self.yanked { + // The crate is already in the requested state, nothing to do + return Ok(()); + } + + let prev = fs::read_to_string(&dst)?; + let version = self.version.num.to_string(); + let new = prev + .lines() + .map(|line| { + let mut git_crate = serde_json::from_str::<Crate>(line) + .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?; + if git_crate.name != self.krate || git_crate.vers != version { + return Ok(line.to_string()); + } + git_crate.yanked = Some(self.yanked); + Ok(serde_json::to_string(&git_crate)?)
+ }) + .collect::<CargoResult<Vec<String>>>(); + let new = new?.join("\n") + "\n"; + fs::write(&dst, new.as_bytes())?; + + repo.commit_and_push( + &format!( + "{} crate `{}#{}`", + if self.yanked { "Yanking" } else { "Unyanking" }, + self.krate, + self.version.num + ), + &repo.relative_index_file(&self.krate), + env.credentials(), + )?; + + diesel::update(&self.version) + .set(versions::yanked.eq(self.yanked)) + .execute(&*conn)?; + + Ok(()) + }) } } From 9d38d19c95a8ea6b619416d0c3d06cfc177031b1 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 8 Mar 2019 13:34:26 -0700 Subject: [PATCH 13/16] fix rebase issues --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 215eaff056..4f615c8ec9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2156,7 +2156,7 @@ name = "threadpool" version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] From fc6c31e263ee79a314a975366ae4aca894d2c4b9 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 8 Mar 2019 16:27:11 -0700 Subject: [PATCH 14/16] Use renamed clippy lint --- src/background/registry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/background/registry.rs b/src/background/registry.rs index 8dc2ee992e..2e672ec0de 100644 --- a/src/background/registry.rs +++ b/src/background/registry.rs @@ -1,4 +1,4 @@ -#![allow(clippy::new_without_default_derive)] // https://github.com/rust-lang/rust-clippy/issues/3632 +#![allow(clippy::new_without_default)] // https://github.com/rust-lang/rust-clippy/issues/3632 use serde_json; use std::collections::HashMap; From 6777978a234da8cc5f413af6d6b8d5c430cb3f48 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 8 Mar 2019 16:45:32 -0700 Subject: [PATCH 15/16] Maintain a persistent checkout of the git index The time a full clone takes caused too long a delay when publishing a crate. This instead performs a full clone when the runner boots, and then locks and does `git fetch && git reset --hard origin/master` at the top of every job. The reset means we can (somewhat) scale to multiple runners, and retains unwind safety in the environment. Note that there can still be a delay measured in minutes if a publish is performed right after the server boots, as the job runner starting up does not block the web server from starting.
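Condensed from the diff below (with the generic return type written out in full), the locking pattern every job now goes through:

    // Take the process-wide lock on the shared checkout, then throw away
    // whatever state the previous job left behind. A poisoned mutex only
    // means an earlier job panicked partway through; since `reset_head`
    // makes the checkout pristine again, the poison can safely be ignored.
    pub fn lock_index(&self) -> CargoResult<MutexGuard<'_, Repository>> {
        let repo = self.index.lock().unwrap_or_else(|e| e.into_inner());
        repo.reset_head()?; // `git fetch && git reset --hard origin/master`
        Ok(repo)
    }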
--- src/background_jobs.rs | 17 ++++++++++++----- src/bin/background-worker.rs | 11 ++++++++++- src/git.rs | 17 +++++++++++++---- src/middleware/run_pending_background_jobs.rs | 5 ++++- 4 files changed, 39 insertions(+), 11 deletions(-) diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 5f3da13b99..20e4a9cd90 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -1,9 +1,9 @@ use std::panic::AssertUnwindSafe; -use url::Url; +use std::sync::{Mutex, MutexGuard}; use crate::background::{Builder, Runner}; use crate::db::{DieselPool, DieselPooledConn}; -use crate::git::{AddCrate, Yank}; +use crate::git::{AddCrate, Repository, Yank}; use crate::util::CargoResult; pub fn job_runner(config: Builder) -> Runner { @@ -12,7 +12,7 @@ pub fn job_runner(config: Builder) -> Runner { #[allow(missing_debug_implementations)] pub struct Environment { - pub index_location: Url, + index: Mutex, pub credentials: Option<(String, String)>, // FIXME: https://github.com/sfackler/r2d2/pull/70 pub connection_pool: AssertUnwindSafe, @@ -20,12 +20,12 @@ pub struct Environment { impl Environment { pub fn new( - index_location: Url, + index: Repository, credentials: Option<(String, String)>, connection_pool: DieselPool, ) -> Self { Self { - index_location, + index: Mutex::new(index), credentials, connection_pool: AssertUnwindSafe(connection_pool), } @@ -40,4 +40,11 @@ impl Environment { pub fn connection(&self) -> CargoResult { self.connection_pool.0.get().map_err(Into::into) } + + pub fn lock_index(&self) -> CargoResult> { + let repo = self.index.lock() + .unwrap_or_else(|e| e.into_inner()); + repo.reset_head()?; + Ok(repo) + } } diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index c3b13854ac..447845504c 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -10,12 +10,15 @@ #![deny(warnings)] use cargo_registry::{background, background_jobs::*, db}; +use cargo_registry::git::Repository; use diesel::r2d2; use std::env; use std::thread::sleep; use std::time::Duration; fn main() { + println!("Booting runner"); + let config = cargo_registry::Config::default(); // We're only using 1 thread, so we only need 2 connections @@ -28,7 +31,13 @@ fn main() { (Ok(u), Ok(p)) => Some((u, p)), _ => None, }; - let environment = Environment::new(config.index_location, credentials, db_pool.clone()); + + println!("Cloning index"); + + let repository = Repository::open(&config.index_location) + .expect("Failed to clone index"); + + let environment = Environment::new(repository, credentials, db_pool.clone()); let builder = background::Runner::builder(db_pool, environment).thread_count(1); let runner = job_runner(builder); diff --git a/src/git.rs b/src/git.rs index 2abcc0d9a4..c0db6cd612 100644 --- a/src/git.rs +++ b/src/git.rs @@ -39,13 +39,13 @@ pub struct Dependency { pub package: Option, } -struct Repository { +pub struct Repository { checkout_path: TempDir, repository: git2::Repository, } impl Repository { - fn open(url: &Url) -> CargoResult { + pub fn open(url: &Url) -> CargoResult { let checkout_path = TempDir::new("git")?; let repository = git2::Repository::clone(url.as_str(), checkout_path.path())?; @@ -120,6 +120,15 @@ impl Repository { } ref_status } + + pub fn reset_head(&self) -> CargoResult<()> { + let mut origin = self.repository.find_remote("origin")?; + origin.fetch(&["refs/heads/*:refs/heads/*"], None, None)?; + let head = self.repository.head()?.target().unwrap(); + let obj = self.repository.find_object(head, None)?; + self.repository.reset(&obj, 
git2::ResetType::Hard, None)?; + Ok(()) + } } #[derive(Deserialize, Serialize)] @@ -132,7 +141,7 @@ impl Job for AddCrate { const JOB_TYPE: &'static str = "add_crate"; fn perform(self, env: &Self::Environment) -> CargoResult<()> { - let repo = Repository::open(&env.index_location)?; + let repo = env.lock_index()?; let dst = repo.index_file(&self.krate.name); // Add the crate to its relevant file @@ -165,7 +174,7 @@ impl Job for Yank { const JOB_TYPE: &'static str = "yank"; fn perform(self, env: &Self::Environment) -> CargoResult<()> { - let repo = Repository::open(&env.index_location)?; + let repo = env.lock_index()?; let dst = repo.index_file(&self.krate); let conn = env.connection()?; diff --git a/src/middleware/run_pending_background_jobs.rs b/src/middleware/run_pending_background_jobs.rs index edec678c68..f840bd6552 100644 --- a/src/middleware/run_pending_background_jobs.rs +++ b/src/middleware/run_pending_background_jobs.rs @@ -2,6 +2,7 @@ use super::app::RequestApp; use super::prelude::*; use crate::background::Runner; use crate::background_jobs::*; +use crate::git::Repository; pub struct RunPendingBackgroundJobs; @@ -13,8 +14,10 @@ impl Middleware for RunPendingBackgroundJobs { ) -> Result> { let app = req.app(); let connection_pool = app.diesel_database.clone(); + let repo = Repository::open(&app.config.index_location) + .expect("Could not clone index"); let environment = Environment::new( - app.config.index_location.clone(), + repo, None, app.diesel_database.clone(), ); From 1ab2c08254f5583faef0a2eee08d1875cb5b356b Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Fri, 8 Mar 2019 17:16:29 -0700 Subject: [PATCH 16/16] rustfmt --- src/background_jobs.rs | 3 +-- src/bin/background-worker.rs | 5 ++--- src/middleware/run_pending_background_jobs.rs | 9 ++------- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 20e4a9cd90..cd71f1c8a9 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -42,8 +42,7 @@ impl Environment { } pub fn lock_index(&self) -> CargoResult> { - let repo = self.index.lock() - .unwrap_or_else(|e| e.into_inner()); + let repo = self.index.lock().unwrap_or_else(|e| e.into_inner()); repo.reset_head()?; Ok(repo) } diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 447845504c..07f4f1d841 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -9,8 +9,8 @@ #![deny(warnings)] -use cargo_registry::{background, background_jobs::*, db}; use cargo_registry::git::Repository; +use cargo_registry::{background, background_jobs::*, db}; use diesel::r2d2; use std::env; use std::thread::sleep; @@ -34,8 +34,7 @@ fn main() { println!("Cloning index"); - let repository = Repository::open(&config.index_location) - .expect("Failed to clone index"); + let repository = Repository::open(&config.index_location).expect("Failed to clone index"); let environment = Environment::new(repository, credentials, db_pool.clone()); diff --git a/src/middleware/run_pending_background_jobs.rs b/src/middleware/run_pending_background_jobs.rs index f840bd6552..2b831c1f94 100644 --- a/src/middleware/run_pending_background_jobs.rs +++ b/src/middleware/run_pending_background_jobs.rs @@ -14,13 +14,8 @@ impl Middleware for RunPendingBackgroundJobs { ) -> Result> { let app = req.app(); let connection_pool = app.diesel_database.clone(); - let repo = Repository::open(&app.config.index_location) - .expect("Could not clone index"); - let environment = Environment::new( - repo, - None, - 
app.diesel_database.clone(), - ); + let repo = Repository::open(&app.config.index_location).expect("Could not clone index"); + let environment = Environment::new(repo, None, app.diesel_database.clone()); let config = Runner::builder(connection_pool, environment); let runner = job_runner(config);