diff --git a/.cargo/audit.toml b/.cargo/audit.toml
index dfe918ab9..63ced22ef 100644
--- a/.cargo/audit.toml
+++ b/.cargo/audit.toml
@@ -3,6 +3,9 @@ ignore = [
     "RUSTSEC-2020-0036", # failure is officially deprecated/unmaintained
     # https://github.com/rust-lang/docs.rs/issues/1014
 
+    "RUSTSEC-2021-0127", # serde_cbor is unmaintained
+    # https://github.com/rust-lang/docs.rs/issues/1568
+
     "RUSTSEC-2023-0018", # rustwide -> remove_dir_all, TOCTOU / Race Condition
     # https://github.com/rust-lang/docs.rs/issues/2074
 ]
diff --git a/Cargo.lock b/Cargo.lock
index 84a44bf82..86227d8cb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1392,6 +1392,7 @@ dependencies = [
  "kuchikiki",
  "log",
  "lol_html",
+ "memmap2 0.5.10",
  "mime",
  "mime_guess",
  "mockito",
@@ -1422,6 +1423,7 @@ dependencies = [
  "sentry-tower",
  "sentry-tracing",
  "serde",
+ "serde_cbor",
  "serde_json",
  "slug",
  "string_cache",
@@ -2047,7 +2049,7 @@ dependencies = [
  "gix-chunk",
  "gix-features 0.31.1",
  "gix-hash",
- "memmap2",
+ "memmap2 0.7.1",
  "thiserror",
 ]
 
@@ -2061,7 +2063,7 @@ dependencies = [
  "gix-chunk",
  "gix-features 0.32.1",
  "gix-hash",
- "memmap2",
+ "memmap2 0.7.1",
  "thiserror",
 ]
 
@@ -2389,7 +2391,7 @@ dependencies = [
  "gix-object 0.32.0",
  "gix-traverse 0.29.0",
  "itoa 1.0.9",
- "memmap2",
+ "memmap2 0.7.1",
  "smallvec",
  "thiserror",
 ]
@@ -2412,7 +2414,7 @@ dependencies = [
  "gix-object 0.33.1",
  "gix-traverse 0.30.1",
  "itoa 1.0.9",
- "memmap2",
+ "memmap2 0.7.1",
  "smallvec",
  "thiserror",
 ]
@@ -2578,7 +2580,7 @@ dependencies = [
  "gix-path",
  "gix-tempfile",
  "gix-traverse 0.29.0",
- "memmap2",
+ "memmap2 0.7.1",
  "parking_lot",
  "smallvec",
  "thiserror",
@@ -2601,7 +2603,7 @@ dependencies = [
  "gix-path",
  "gix-tempfile",
  "gix-traverse 0.30.1",
- "memmap2",
+ "memmap2 0.7.1",
  "parking_lot",
  "smallvec",
  "thiserror",
@@ -2719,7 +2721,7 @@ dependencies = [
  "gix-path",
  "gix-tempfile",
  "gix-validate",
- "memmap2",
+ "memmap2 0.7.1",
  "nom",
  "thiserror",
 ]
@@ -2740,7 +2742,7 @@ dependencies = [
  "gix-path",
  "gix-tempfile",
  "gix-validate",
- "memmap2",
+ "memmap2 0.7.1",
  "nom",
  "thiserror",
 ]
@@ -3764,6 +3766,15 @@ version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
+[[package]]
+name = "memmap2"
+version = "0.5.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "memmap2"
 version = "0.7.1"
@@ -5237,6 +5248,16 @@ dependencies = [
  "serde_derive",
 ]
 
+[[package]]
+name = "serde_cbor"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
+dependencies = [
+ "half",
+ "serde",
+]
+
 [[package]]
 name = "serde_derive"
 version = "1.0.178"
diff --git a/Cargo.toml b/Cargo.toml
index cc8d85bed..8c91cc180 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,6 +68,7 @@ string_cache = "0.8.0"
 postgres-types = { version = "0.2", features = ["derive"] }
 zip = {version = "0.6.2", default-features = false, features = ["bzip2"]}
 bzip2 = "0.4.4"
+serde_cbor = "0.11.1"
 getrandom = "0.2.1"
 itertools = { version = "0.11.0", optional = true}
 rusqlite = { version = "0.29.0", features = ["bundled"] }
@@ -86,6 +87,7 @@ uuid = "1.1.2"
 # Data serialization and deserialization
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+memmap2 = "0.5.0"
 
 # axum dependencies
 axum = { version = "0.6.1", features = ["headers"]}
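The two new runtime dependencies exist only to keep the legacy index format readable during the transition: `serde_cbor` decodes the old CBOR archive indexes, and `memmap2` 0.5 lets the read path search them through a memory map instead of loading the whole file. As a rough sketch of the CBOR round-trip involved — the `FileEntry` type, its fields, and the integer compression code below are simplified stand-ins for illustration, not the crate's actual definitions:

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Illustrative stand-in for the real per-file metadata: a byte range inside
// the .zip archive plus a compression-algorithm code.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct FileEntry {
    range: (u64, u64),
    compression: i32,
}

fn main() -> Result<(), serde_cbor::Error> {
    let mut files = HashMap::new();
    files.insert(
        "index.html".to_string(),
        FileEntry { range: (39, 459), compression: 1 },
    );

    // Serialize the path -> FileEntry map to CBOR bytes, roughly the shape the
    // old on-disk index held ...
    let bytes = serde_cbor::to_vec(&files)?;
    // ... and read it back.
    let decoded: HashMap<String, FileEntry> = serde_cbor::from_slice(&bytes)?;
    assert_eq!(decoded, files);
    Ok(())
}
```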
diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs
index 8bead0151..beae1c0ff 100644
--- a/src/bin/cratesfyi.rs
+++ b/src/bin/cratesfyi.rs
@@ -15,8 +15,8 @@ use docs_rs::utils::{
     remove_crate_priority, set_crate_priority, ConfigName,
 };
 use docs_rs::{
-    start_background_metrics_webserver, start_web_server, BuildQueue, Config, Context, Index,
-    InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
+    migrate_old_archive_indexes, start_background_metrics_webserver, start_web_server, BuildQueue,
+    Config, Context, Index, InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
 };
 use humantime::Duration;
 use once_cell::sync::OnceCell;
@@ -482,6 +482,9 @@ enum DatabaseSubcommand {
     /// Backfill GitHub/Gitlab stats for crates.
     BackfillRepositoryStats,
 
+    /// migrate the old CBOR archive index files to SQLite
+    MigrateArchiveIndex,
+
     /// Updates info for a crate from the registry's API
     UpdateCrateRegistryFields {
         #[arg(name = "CRATE")]
@@ -533,6 +536,10 @@ impl DatabaseSubcommand {
                 ctx.repository_stats_updater()?.update_all_crates()?;
             }
 
+            Self::MigrateArchiveIndex => {
+                migrate_old_archive_indexes(&*ctx.storage()?, &mut *ctx.conn()?)?;
+            }
+
             Self::BackfillRepositoryStats => {
                 ctx.repository_stats_updater()?.backfill_repositories()?;
             }
diff --git a/src/lib.rs b/src/lib.rs
index d53fd32b3..b49e9aff1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,6 +9,7 @@ pub use self::docbuilder::PackageKind;
 pub use self::docbuilder::RustwideBuilder;
 pub use self::index::Index;
 pub use self::metrics::{InstanceMetrics, ServiceMetrics};
+pub use self::storage::migrate_old_archive_indexes;
 pub use self::storage::Storage;
 pub use self::web::{start_background_metrics_webserver, start_web_server};
 
diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index 9cd2ee96f..815ebfab1 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -1,12 +1,20 @@
 use crate::error::Result;
 use crate::storage::{compression::CompressionAlgorithm, FileRange};
 use anyhow::{bail, Context as _};
+use memmap2::MmapOptions;
 use rusqlite::{Connection, OptionalExtension};
-use std::{fs, io, path::Path};
+use serde::de::DeserializeSeed;
+use serde::de::{IgnoredAny, MapAccess, Visitor};
+use serde::{Deserialize, Deserializer, Serialize};
+use std::io::BufReader;
+use std::{collections::HashMap, fmt, fs, fs::File, io, io::Read, path::Path};
+use tempfile::TempPath;
 
 use super::sqlite_pool::SqliteConnectionPool;
 
-#[derive(PartialEq, Eq, Debug)]
+static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0";
+
+#[derive(Deserialize, Serialize, PartialEq, Eq, Debug)]
 pub(crate) struct FileInfo {
     range: FileRange,
     compression: CompressionAlgorithm,
@@ -21,23 +29,23 @@ impl FileInfo {
     }
 }
 
-/// create an archive index based on a zipfile.
-///
-/// Will delete the destination file if it already exists.
-pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
-    zipfile: &mut R,
-    destination: P,
-) -> Result<()> {
-    let destination = destination.as_ref();
-    if destination.exists() {
-        fs::remove_file(destination)?;
-    }
+#[derive(Deserialize, Serialize)]
+struct Index {
+    files: HashMap<String, FileInfo>,
+}
 
-    let conn = rusqlite::Connection::open(destination)?;
-    conn.execute("PRAGMA synchronous = FULL", ())?;
-    conn.execute("BEGIN", ())?;
-    conn.execute(
-        "
+impl Index {
+    pub(crate) fn write_sqlite<P: AsRef<Path>>(&self, destination: P) -> Result<()> {
+        let destination = destination.as_ref();
+        if destination.exists() {
+            fs::remove_file(destination)?;
+        }
+
+        let conn = rusqlite::Connection::open(destination)?;
+        conn.execute("PRAGMA synchronous = FULL", ())?;
+        conn.execute("BEGIN", ())?;
+        conn.execute(
+            "
         CREATE TABLE files (
             id INTEGER PRIMARY KEY,
             path TEXT UNIQUE,
@@ -46,34 +54,167 @@ pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
             compression INTEGER
         );
         ",
-        (),
-    )?;
+            (),
+        )?;
 
-    let mut archive = zip::ZipArchive::new(zipfile)?;
-    let compression_bzip = CompressionAlgorithm::Bzip2 as i32;
+        for (name, info) in self.files.iter() {
+            conn.execute(
+                "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
+                (
+                    name,
+                    info.range.start(),
+                    info.range.end(),
+                    info.compression as i32,
+                ),
+            )?;
+        }
 
-    for i in 0..archive.len() {
-        let zf = archive.by_index(i)?;
+        conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
+        conn.execute("END", ())?;
+        conn.execute("VACUUM", ())?;
+        Ok(())
+    }
 
-        conn.execute(
-            "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
-            (
-                zf.name(),
-                zf.data_start(),
-                zf.data_start() + zf.compressed_size() - 1,
-                match zf.compression() {
-                    zip::CompressionMethod::Bzip2 => compression_bzip,
-                    c => bail!("unsupported compression algorithm {} in zip-file", c),
+    pub(crate) fn from_zip<R: io::Read + io::Seek>(zipfile: &mut R) -> Result<Index> {
+        let mut archive = zip::ZipArchive::new(zipfile)?;
+
+        let mut index = Index {
+            files: HashMap::with_capacity(archive.len()),
+        };
+
+        for i in 0..archive.len() {
+            let zf = archive.by_index(i)?;
+
+            index.files.insert(
+                zf.name().to_owned(),
+                FileInfo {
+                    range: FileRange::new(
+                        zf.data_start(),
+                        zf.data_start() + zf.compressed_size() - 1,
+                    ),
+                    compression: match zf.compression() {
+                        zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
+                        c => bail!("unsupported compression algorithm {} in zip-file", c),
+                    },
                 },
-            ),
-        )?;
+            );
+        }
+        Ok(index)
     }
-
-    conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
-    conn.execute("END", ())?;
-    conn.execute("VACUUM", ())?;
+}
+
+/// create an archive index based on a zipfile.
+///
+/// Will delete the destination file if it already exists.
+pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
+    zipfile: &mut R,
+    destination: P,
+) -> Result<()> {
+    Index::from_zip(zipfile)?
+        .write_sqlite(&destination)
+        .context("error writing SQLite index")?;
     Ok(())
 }
 
+fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
+    let mut deserializer = serde_cbor::Deserializer::from_slice(bytes);
+
+    /// This visitor will just find the `files` element in the top-level map.
+    /// Then it will call the `FindFileVisitor` that should find the actual
+    /// FileInfo for the path we are searching for.
+    struct FindFileListVisitor {
+        search_for: String,
+    }
+
+    impl<'de> Visitor<'de> for FindFileListVisitor {
+        type Value = Option<FileInfo>;
+
+        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+            write!(formatter, "a map with a 'files' key")
+        }
+
+        fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
+        where
+            V: MapAccess<'de>,
+        {
+            /// This visitor will walk the full `files` map and search for
+            /// the path we want to have.
+            /// Return value is just the `FileInfo` we want to have, or
+            /// `None`.
+            struct FindFileVisitor {
+                search_for: String,
+            }
+
+            impl<'de> DeserializeSeed<'de> for FindFileVisitor {
+                type Value = Option<FileInfo>;
+                fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
+                where
+                    D: Deserializer<'de>,
+                {
+                    deserializer.deserialize_map(self)
+                }
+            }
+
+            impl<'de> Visitor<'de> for FindFileVisitor {
+                type Value = Option<FileInfo>;
+                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                    write!(
+                        formatter,
+                        "a map with path => FileInfo, searching for path {:?}",
+                        self.search_for
+                    )
+                }
+                fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
+                where
+                    V: MapAccess<'de>,
+                {
+                    while let Some(key) = map.next_key::<&str>()? {
+                        if key == self.search_for {
+                            let value = map.next_value::<FileInfo>()?;
+                            // skip over the rest of the data without really parsing it.
+                            // If we don't do this the serde_cbor deserializer fails because not
+                            // the whole map is consumed.
+                            while map.next_entry::<IgnoredAny, IgnoredAny>()?.is_some() {}
+                            return Ok(Some(value));
+                        } else {
+                            // skip parsing the FileInfo structure when the key doesn't match.
+                            map.next_value::<IgnoredAny>()?;
+                        }
+                    }
+
+                    Ok(None)
+                }
+            }
+
+            while let Some(key) = map.next_key::<&str>()? {
+                if key == "files" {
+                    return map.next_value_seed(FindFileVisitor {
+                        search_for: self.search_for,
+                    });
+                }
+            }
+
+            Ok(None)
+        }
+    }
+
+    impl<'de> DeserializeSeed<'de> for FindFileListVisitor {
+        type Value = Option<FileInfo>;
+
+        fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
+        where
+            D: Deserializer<'de>,
+        {
+            deserializer.deserialize_map(self)
+        }
+    }
+
+    Ok(FindFileListVisitor {
+        search_for: search_for.to_string(),
+    }
+    .deserialize(&mut deserializer)?)
+}
+
 fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
     let mut stmt = conn.prepare(
         "
@@ -101,14 +242,67 @@ fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
+/// quick check if a file is a SQLite file
+/// (see https://www.sqlite.org/fileformat.html -> _Magic number_)
+/// ```text
+/// > FORMAT DETAILS
+/// > OFFSET   SIZE    DESCRIPTION
+/// > 0        16      Header string: "SQLite format 3\000"
+/// > [...]
+/// ```
+pub(crate) fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
+    let mut f = File::open(archive_index_path)?;
+
+    let mut buffer = [0; 16];
+    match f.read_exact(&mut buffer) {
+        Ok(()) => Ok(buffer == SQLITE_FILE_HEADER),
+        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
+        Err(err) => Err(err.into()),
+    }
+}
+
 pub(crate) fn find_in_file<P: AsRef<Path>>(
     archive_index_path: P,
     search_for: &str,
     pool: &SqliteConnectionPool,
 ) -> Result<Option<FileInfo>> {
-    pool.with_connection(archive_index_path, |connection| {
-        find_in_sqlite_index(connection, search_for)
-    })
+    if is_sqlite_file(&archive_index_path)? {
+        pool.with_connection(archive_index_path, |connection| {
+            find_in_sqlite_index(connection, search_for)
+        })
+    } else {
+        let file = fs::File::open(archive_index_path).context("could not open file")?;
+        let mmap = unsafe {
+            MmapOptions::new()
+                .map(&file)
+                .context("could not create memory map")?
+        };
+
+        find_in_slice(&mmap, search_for)
+    }
+}
+
+pub(crate) fn convert_to_sqlite_index<P: AsRef<Path>>(
+    path: P,
+    tmpdir: impl AsRef<Path>,
+) -> Result<TempPath> {
+    let path = path.as_ref();
+    let index: Index = { serde_cbor::from_reader(BufReader::new(File::open(path)?))? };
+
+    // write the new index into a temporary file so reads from ongoing requests
+    // can continue on the old index until the new one is fully written.
+    let tmp_path = tempfile::NamedTempFile::new_in(tmpdir)?.into_temp_path();
+    index
+        .write_sqlite(&tmp_path)
+        .context("error writing SQLite index")?;
+
+    Ok(tmp_path)
 }
 
 #[cfg(test)]
@@ -117,6 +311,14 @@ mod tests {
     use std::io::Write;
     use zip::write::FileOptions;
 
+    /// legacy archive index creation, only for testing that reading them still works
+    fn create_cbor_index<R: io::Read + io::Seek, W: io::Write>(
+        zipfile: &mut R,
+        writer: &mut W,
+    ) -> Result<()> {
+        serde_cbor::to_writer(writer, &Index::from_zip(zipfile)?).context("serialization error")
+    }
+
     fn create_test_archive() -> fs::File {
         let mut tf = tempfile::tempfile().unwrap();
 
@@ -134,12 +336,88 @@ mod tests {
         tf
     }
 
+    #[test]
+    fn convert_to_sqlite() {
+        let mut tf = create_test_archive();
+        let mut cbor_buf = Vec::new();
+        create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
+        let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
+        io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();
+
+        assert!(!is_sqlite_file(&cbor_index_file).unwrap());
+
+        let original_fi = find_in_file(
+            cbor_index_file.path(),
+            "testfile1",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .unwrap();
+
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let sqlite_index_file = convert_to_sqlite_index(cbor_index_file, &temp_dir).unwrap();
+        assert!(is_sqlite_file(&sqlite_index_file).unwrap());
+
+        let migrated_fi = find_in_file(
+            sqlite_index_file,
+            "testfile1",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .unwrap();
+
+        assert_eq!(migrated_fi, original_fi);
+    }
+
+    #[test]
+    fn index_create_save_load_cbor_direct() {
+        let mut tf = create_test_archive();
+        let mut buf = Vec::new();
+        create_cbor_index(&mut tf, &mut buf).unwrap();
+
+        let fi = find_in_slice(&buf, "testfile1").unwrap().unwrap();
+        assert_eq!(fi.range, FileRange::new(39, 459));
+        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
+
+        assert!(find_in_slice(&buf, "some_other_file").unwrap().is_none());
+    }
+
+    #[test]
+    fn index_create_save_load_cbor_as_fallback() {
+        let mut tf = create_test_archive();
+        let mut cbor_buf = Vec::new();
+        create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
+        let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
+        io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();
+
+        assert!(!is_sqlite_file(&cbor_index_file).unwrap());
+
+        let fi = find_in_file(
+            cbor_index_file.path(),
+            "testfile1",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(fi.range, FileRange::new(39, 459));
+        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
+
+        assert!(find_in_file(
+            cbor_index_file.path(),
+            "some_other_file",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .is_none());
+    }
+
     #[test]
     fn index_create_save_load_sqlite() {
         let mut tf = create_test_archive();
         let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
         create(&mut tf, &tempfile).unwrap();
+        assert!(is_sqlite_file(&tempfile).unwrap());
 
         let fi = find_in_file(&tempfile, "testfile1", &SqliteConnectionPool::default())
             .unwrap()
             .unwrap();
@@ -156,4 +434,24 @@ mod tests {
         .unwrap()
         .is_none());
     }
+
+    #[test]
+    fn is_sqlite_file_empty() {
+        let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
+        assert!(!is_sqlite_file(tempfile).unwrap());
+    }
+
+    #[test]
+    fn is_sqlite_file_other_content() {
+        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
+        tempfile.write_all(b"some_bytes").unwrap();
+        assert!(!is_sqlite_file(tempfile.path()).unwrap());
+    }
+
+    #[test]
+    fn is_sqlite_file_specific_headers() {
+        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
+        tempfile.write_all(SQLITE_FILE_HEADER).unwrap();
+        assert!(is_sqlite_file(tempfile.path()).unwrap());
+    }
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 980e4562a..79c13f8ff 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -15,6 +15,7 @@ use anyhow::{anyhow, ensure};
 use chrono::{DateTime, Utc};
 use fn_error_context::context;
 use path_slash::PathExt;
+use postgres::fallible_iterator::FallibleIterator;
 use std::io::BufReader;
 use std::num::NonZeroU64;
 use std::{
@@ -22,12 +23,13 @@ use std::{
     ffi::OsStr,
     fmt, fs,
     io::{self, Write},
+    iter,
     ops::RangeInclusive,
     path::{Path, PathBuf},
     sync::Arc,
 };
 use tokio::runtime::Runtime;
-use tracing::{error, instrument, trace};
+use tracing::{error, info, instrument, trace};
 
 const MAX_CONCURRENT_UPLOADS: usize = 1000;
 
@@ -587,6 +589,91 @@ pub(crate) fn source_archive_path(name: &str, version: &str) -> String {
     format!("sources/{name}/{version}.zip")
 }
 
+#[instrument(skip(storage))]
+fn migrate_one(storage: &Storage, archive_path: &str, tmpdir: &Path) -> Result<()> {
+    // this will also download the index if it doesn't exist locally
+    let local_index_filename = match storage.get_index_filename(archive_path) {
+        Ok(filename) => filename,
+        Err(err) => {
+            if err.is::<PathNotFoundError>() {
+                info!("index does not exist, skipping");
+                return Ok(());
+            } else {
+                return Err(err);
+            }
+        }
+    };
+
+    if archive_index::is_sqlite_file(&local_index_filename)? {
+        info!("index already in SQLite format, skipping");
+        return Ok(());
+    }
+
+    info!("converting local index...");
+    let remote_index_path = format!("{}.index", &archive_path);
+    let new_index_temp_path =
+        archive_index::convert_to_sqlite_index(&local_index_filename, tmpdir)?;
+
+    // first upload to S3, ongoing requests will still use the local CBOR index
+    info!("uploading to S3...");
+    // S3 put-object will overwrite the existing index
+    storage.store_one(remote_index_path, std::fs::read(&new_index_temp_path)?)?;
+
+    // move the temporary file into place
+    // This has a race condition when a request is trying to read the index between
+    // the `remove_file` and the `rename`. In this case the handler will then just
+    // unnecessarily download the index again.
+    fs::remove_file(&local_index_filename)?;
+    fs::rename(new_index_temp_path, local_index_filename)?;
+
+    Ok(())
+}
+
+/// migrate existing archive indexes from the old CBOR format to SQLite
+pub fn migrate_old_archive_indexes(
+    storage: &Storage,
+    conn: &mut impl postgres::GenericClient,
+) -> Result<()> {
+    let tmpdir = storage.config.prefix.join("archive_cache_tmp");
+    if !tmpdir.exists() {
+        fs::create_dir(&tmpdir)?;
+    }
+
+    for row in conn
+        .query_raw(
+            "
+            SELECT
+                crates.name,
+                releases.version
+            FROM
+                crates
+                INNER JOIN releases ON releases.crate_id = crates.id
+            WHERE
+                releases.archive_storage = true
+
+            ORDER BY
+                crates.name,
+                releases.id
+            ",
+            iter::empty::<String>(),
+        )?
+        .iterator()
+    {
+        let row = row?;
+        let name: &str = row.get(0);
+        let version: &str = row.get(1);
+        info!("converting archive index for {} {}...", name, version);
+
+        if let Err(err) = migrate_one(storage, &rustdoc_archive_path(name, version), &tmpdir) {
+            error!("error converting rustdoc archive index: {:?}", err);
+        }
+        if let Err(err) = migrate_one(storage, &source_archive_path(name, version), &tmpdir) {
+            error!("error converting source archive index: {:?}", err);
+        }
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
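The `find_in_slice` visitor machinery above is the densest part of the change: it answers a single path lookup against a CBOR index without deserializing the whole `files` map. Below is a stripped-down sketch of the same `DeserializeSeed` technique, assuming a flat CBOR map of `String -> u64` instead of the real nested structure; the `FindKey` name and the toy data are illustrative only.

```rust
use std::fmt;

use serde::de::{DeserializeSeed, Deserializer, IgnoredAny, MapAccess, Visitor};

// Seed that searches a CBOR map for one key and returns its value, if present.
struct FindKey<'a>(&'a str);

impl<'de, 'a> DeserializeSeed<'de> for FindKey<'a> {
    type Value = Option<u64>;

    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: Deserializer<'de>,
    {
        deserializer.deserialize_map(self)
    }
}

impl<'de, 'a> Visitor<'de> for FindKey<'a> {
    type Value = Option<u64>;

    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "a map containing the key {:?}", self.0)
    }

    fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
    where
        M: MapAccess<'de>,
    {
        let mut found = None;
        while let Some(key) = map.next_key::<String>()? {
            if key == self.0 && found.is_none() {
                found = Some(map.next_value::<u64>()?);
            } else {
                // Skip values we don't care about without fully parsing them.
                map.next_value::<IgnoredAny>()?;
            }
        }
        Ok(found)
    }
}

fn main() -> Result<(), serde_cbor::Error> {
    let mut m = std::collections::HashMap::new();
    m.insert("a".to_string(), 1u64);
    m.insert("b".to_string(), 2u64);
    let bytes = serde_cbor::to_vec(&m)?;

    let mut de = serde_cbor::Deserializer::from_slice(&bytes);
    let hit = FindKey("b").deserialize(&mut de)?;
    assert_eq!(hit, Some(2));
    Ok(())
}
```

The detail worth noting, mirrored from the comment in the PR itself, is that the visitor keeps consuming the remaining entries (here by letting the loop run to the end, in the PR via `IgnoredAny`), because serde_cbor reports an error if a map is only partially read.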
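On the new read path, a lookup is just an indexed SQLite query against the `files` table created by `write_sqlite`. Here is a minimal sketch of that query shape using `rusqlite`; the in-memory database, the hand-inserted row, and the `lookup` helper are assumptions for illustration only, while the production code goes through `SqliteConnectionPool` and `find_in_sqlite_index`.

```rust
use rusqlite::{params, Connection, OptionalExtension};

// Look up a path and return (start, end, compression) if the file is indexed.
fn lookup(conn: &Connection, path: &str) -> rusqlite::Result<Option<(i64, i64, i32)>> {
    conn.query_row(
        "SELECT start, end, compression FROM files WHERE path = ?",
        params![path],
        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
    )
    .optional()
}

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    // Same schema as the index written by `write_sqlite` above.
    conn.execute_batch(
        "CREATE TABLE files (
             id INTEGER PRIMARY KEY,
             path TEXT UNIQUE,
             start INTEGER,
             end INTEGER,
             compression INTEGER
         );
         CREATE INDEX idx_files_path ON files (path);
         INSERT INTO files (path, start, end, compression)
             VALUES ('testfile1', 39, 459, 1);",
    )?;

    assert_eq!(lookup(&conn, "testfile1")?, Some((39, 459, 1)));
    assert_eq!(lookup(&conn, "missing")?, None);
    Ok(())
}
```

The design point is that each request does a single B-tree lookup on `idx_files_path` instead of scanning a serialized CBOR map, which is what makes the SQLite format the preferred target of this migration.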