From ea7eb5c7c79a3d64ff915c6032681dfdc6ed96c6 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Mon, 31 Jul 2023 18:45:54 +0200 Subject: [PATCH] drop CBOR index support & migration --- .cargo/audit.toml | 3 - Cargo.lock | 37 +--- Cargo.toml | 2 - src/bin/cratesfyi.rs | 11 +- src/lib.rs | 1 - src/storage/archive_index.rs | 380 ++++------------------------------- src/storage/mod.rs | 80 +------- 7 files changed, 52 insertions(+), 462 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 63ced22ef..dfe918ab9 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -3,9 +3,6 @@ ignore = [ "RUSTSEC-2020-0036", # failure is officially deprecated/unmaintained # https://github.com/rust-lang/docs.rs/issues/1014 - "RUSTSEC-2021-0127", # serde_cbor is unmaintained - # https://github.com/rust-lang/docs.rs/issues/1568 - "RUSTSEC-2023-0018", # rustwide -> remove_dir_all,TOCTOU / Race Condition # https://github.com/rust-lang/docs.rs/issues/2074 ] diff --git a/Cargo.lock b/Cargo.lock index 86227d8cb..84a44bf82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1392,7 +1392,6 @@ dependencies = [ "kuchikiki", "log", "lol_html", - "memmap2 0.5.10", "mime", "mime_guess", "mockito", @@ -1423,7 +1422,6 @@ dependencies = [ "sentry-tower", "sentry-tracing", "serde", - "serde_cbor", "serde_json", "slug", "string_cache", @@ -2049,7 +2047,7 @@ dependencies = [ "gix-chunk", "gix-features 0.31.1", "gix-hash", - "memmap2 0.7.1", + "memmap2", "thiserror", ] @@ -2063,7 +2061,7 @@ dependencies = [ "gix-chunk", "gix-features 0.32.1", "gix-hash", - "memmap2 0.7.1", + "memmap2", "thiserror", ] @@ -2391,7 +2389,7 @@ dependencies = [ "gix-object 0.32.0", "gix-traverse 0.29.0", "itoa 1.0.9", - "memmap2 0.7.1", + "memmap2", "smallvec", "thiserror", ] @@ -2414,7 +2412,7 @@ dependencies = [ "gix-object 0.33.1", "gix-traverse 0.30.1", "itoa 1.0.9", - "memmap2 0.7.1", + "memmap2", "smallvec", "thiserror", ] @@ -2580,7 +2578,7 @@ dependencies = [ "gix-path", "gix-tempfile", "gix-traverse 0.29.0", - "memmap2 0.7.1", + "memmap2", "parking_lot", "smallvec", "thiserror", @@ -2603,7 +2601,7 @@ dependencies = [ "gix-path", "gix-tempfile", "gix-traverse 0.30.1", - "memmap2 0.7.1", + "memmap2", "parking_lot", "smallvec", "thiserror", @@ -2721,7 +2719,7 @@ dependencies = [ "gix-path", "gix-tempfile", "gix-validate", - "memmap2 0.7.1", + "memmap2", "nom", "thiserror", ] @@ -2742,7 +2740,7 @@ dependencies = [ "gix-path", "gix-tempfile", "gix-validate", - "memmap2 0.7.1", + "memmap2", "nom", "thiserror", ] @@ -3766,15 +3764,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "memmap2" -version = "0.5.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327" -dependencies = [ - "libc", -] - [[package]] name = "memmap2" version = "0.7.1" @@ -5248,16 +5237,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde_cbor" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" -dependencies = [ - "half", - "serde", -] - [[package]] name = "serde_derive" version = "1.0.178" diff --git a/Cargo.toml b/Cargo.toml index 8c91cc180..cc8d85bed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,6 @@ string_cache = "0.8.0" postgres-types = { version = "0.2", features = ["derive"] } zip = {version = "0.6.2", 
default-features = false, features = ["bzip2"]}
 bzip2 = "0.4.4"
-serde_cbor = "0.11.1"
 getrandom = "0.2.1"
 itertools = { version = "0.11.0", optional = true}
 rusqlite = { version = "0.29.0", features = ["bundled"] }
@@ -87,7 +86,6 @@ uuid = "1.1.2"
 # Data serialization and deserialization
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
-memmap2 = "0.5.0"
 
 # axum dependencies
 axum = { version = "0.6.1", features = ["headers"]}
diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs
index beae1c0ff..8bead0151 100644
--- a/src/bin/cratesfyi.rs
+++ b/src/bin/cratesfyi.rs
@@ -15,8 +15,8 @@ use docs_rs::utils::{
     remove_crate_priority, set_crate_priority, ConfigName,
 };
 use docs_rs::{
-    migrate_old_archive_indexes, start_background_metrics_webserver, start_web_server, BuildQueue,
-    Config, Context, Index, InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
+    start_background_metrics_webserver, start_web_server, BuildQueue, Config, Context, Index,
+    InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
 };
 use humantime::Duration;
 use once_cell::sync::OnceCell;
@@ -482,9 +482,6 @@ enum DatabaseSubcommand {
     /// Backfill GitHub/Gitlab stats for crates.
     BackfillRepositoryStats,
 
-    /// migrate the old CBOR archive index files to SQLIte
-    MigrateArchiveIndex,
-
     /// Updates info for a crate from the registry's API
     UpdateCrateRegistryFields {
         #[arg(name = "CRATE")]
@@ -536,10 +533,6 @@ impl DatabaseSubcommand {
                 ctx.repository_stats_updater()?.update_all_crates()?;
             }
 
-            Self::MigrateArchiveIndex => {
-                migrate_old_archive_indexes(&*ctx.storage()?, &mut *ctx.conn()?)?;
-            }
-
             Self::BackfillRepositoryStats => {
                 ctx.repository_stats_updater()?.backfill_repositories()?;
             }
diff --git a/src/lib.rs b/src/lib.rs
index b49e9aff1..d53fd32b3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,7 +9,6 @@ pub use self::docbuilder::PackageKind;
 pub use self::docbuilder::RustwideBuilder;
 pub use self::index::Index;
 pub use self::metrics::{InstanceMetrics, ServiceMetrics};
-pub use self::storage::migrate_old_archive_indexes;
 pub use self::storage::Storage;
 pub use self::web::{start_background_metrics_webserver, start_web_server};
 
diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index 815ebfab1..9cd2ee96f 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -1,20 +1,12 @@
 use crate::error::Result;
 use crate::storage::{compression::CompressionAlgorithm, FileRange};
 use anyhow::{bail, Context as _};
-use memmap2::MmapOptions;
 use rusqlite::{Connection, OptionalExtension};
-use serde::de::DeserializeSeed;
-use serde::de::{IgnoredAny, MapAccess, Visitor};
-use serde::{Deserialize, Deserializer, Serialize};
-use std::io::BufReader;
-use std::{collections::HashMap, fmt, fs, fs::File, io, io::Read, path::Path};
-use tempfile::TempPath;
+use std::{fs, io, path::Path};
 
 use super::sqlite_pool::SqliteConnectionPool;
 
-static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0";
-
-#[derive(Deserialize, Serialize, PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug)]
 pub(crate) struct FileInfo {
     range: FileRange,
     compression: CompressionAlgorithm,
@@ -29,23 +21,23 @@ impl FileInfo {
     }
 }
 
-#[derive(Deserialize, Serialize)]
-struct Index {
-    files: HashMap<String, FileInfo>,
-}
-
-impl Index {
-    pub(crate) fn write_sqlite<P: AsRef<Path>>(&self, destination: P) -> Result<()> {
-        let destination = destination.as_ref();
-        if destination.exists() {
-            fs::remove_file(destination)?;
-        }
+/// create an archive index based on a zipfile.
+///
+/// Will delete the destination file if it already exists.
+pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
+    zipfile: &mut R,
+    destination: P,
+) -> Result<()> {
+    let destination = destination.as_ref();
+    if destination.exists() {
+        fs::remove_file(destination)?;
+    }
 
-        let conn = rusqlite::Connection::open(destination)?;
-        conn.execute("PRAGMA synchronous = FULL", ())?;
-        conn.execute("BEGIN", ())?;
-        conn.execute(
-            "
+    let conn = rusqlite::Connection::open(destination)?;
+    conn.execute("PRAGMA synchronous = FULL", ())?;
+    conn.execute("BEGIN", ())?;
+    conn.execute(
+        "
         CREATE TABLE files (
             id INTEGER PRIMARY KEY,
             path TEXT UNIQUE,
             start INTEGER,
             end INTEGER,
             compression INTEGER
         );
         ",
@@ -54,167 +46,34 @@ impl Index {
-            (),
-        )?;
-
-        for (name, info) in self.files.iter() {
-            conn.execute(
-                "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
-                (
-                    name,
-                    info.range.start(),
-                    info.range.end(),
-                    info.compression as i32,
-                ),
-            )?;
-        }
-
-        conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
-        conn.execute("END", ())?;
-        conn.execute("VACUUM", ())?;
-        Ok(())
-    }
-
-    pub(crate) fn from_zip<R: io::Read + io::Seek>(zipfile: &mut R) -> Result<Index> {
-        let mut archive = zip::ZipArchive::new(zipfile)?;
+        (),
+    )?;
 
-        let mut index = Index {
-            files: HashMap::with_capacity(archive.len()),
-        };
+    let mut archive = zip::ZipArchive::new(zipfile)?;
+    let compression_bzip = CompressionAlgorithm::Bzip2 as i32;
 
-        for i in 0..archive.len() {
-            let zf = archive.by_index(i)?;
+    for i in 0..archive.len() {
+        let zf = archive.by_index(i)?;
 
-            index.files.insert(
-                zf.name().to_owned(),
-                FileInfo {
-                    range: FileRange::new(
-                        zf.data_start(),
-                        zf.data_start() + zf.compressed_size() - 1,
-                    ),
-                    compression: match zf.compression() {
-                        zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
-                        c => bail!("unsupported compression algorithm {} in zip-file", c),
-                    },
+        conn.execute(
+            "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
+            (
+                zf.name(),
+                zf.data_start(),
+                zf.data_start() + zf.compressed_size() - 1,
+                match zf.compression() {
+                    zip::CompressionMethod::Bzip2 => compression_bzip,
+                    c => bail!("unsupported compression algorithm {} in zip-file", c),
                 },
-            );
-        }
-        Ok(index)
+            ),
+        )?;
     }
-}
-
-/// create an archive index based on a zipfile.
-///
-/// Will delete the destination file if it already exists.
-pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
-    zipfile: &mut R,
-    destination: P,
-) -> Result<()> {
-    Index::from_zip(zipfile)?
-        .write_sqlite(&destination)
-        .context("error writing SQLite index")?;
+
+    conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
+    conn.execute("END", ())?;
+    conn.execute("VACUUM", ())?;
     Ok(())
 }
 
-fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
-    let mut deserializer = serde_cbor::Deserializer::from_slice(bytes);
-
-    /// This visitor will just find the `files` element in the top-level map.
-    /// Then it will call the `FindFileVisitor` that should find the actual
-    /// FileInfo for the path we are searching for.
-    struct FindFileListVisitor {
-        search_for: String,
-    }
-
-    impl<'de> Visitor<'de> for FindFileListVisitor {
-        type Value = Option<FileInfo>;
-
-        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
-            write!(formatter, "a map with a 'files' key")
-        }
-
-        fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
-        where
-            V: MapAccess<'de>,
-        {
-            /// This visitor will walk the full `files` map and search for
-            /// the path we want to have.
-            /// Return value is just the `FileInfo` we want to have, or
-            /// `None`.
-            struct FindFileVisitor {
-                search_for: String,
-            }
-
-            impl<'de> DeserializeSeed<'de> for FindFileVisitor {
-                type Value = Option<FileInfo>;
-                fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
-                where
-                    D: Deserializer<'de>,
-                {
-                    deserializer.deserialize_map(self)
-                }
-            }
-
-            impl<'de> Visitor<'de> for FindFileVisitor {
-                type Value = Option<FileInfo>;
-                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
-                    write!(
-                        formatter,
-                        "a map with path => FileInfo, searching for path {:?}",
-                        self.search_for
-                    )
-                }
-                fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
-                where
-                    V: MapAccess<'de>,
-                {
-                    while let Some(key) = map.next_key::<&str>()? {
-                        if key == self.search_for {
-                            let value = map.next_value::<FileInfo>()?;
-                            // skip over the rest of the data without really parsing it.
-                            // If we don't do this the serde_cbor deserializer fails because not
-                            // the whole map is consumed.
-                            while map.next_entry::<IgnoredAny, IgnoredAny>()?.is_some() {}
-                            return Ok(Some(value));
-                        } else {
-                            // skip parsing the FileInfo structure when the key doesn't match.
-                            map.next_value::<IgnoredAny>()?;
-                        }
-                    }
-
-                    Ok(None)
-                }
-            }
-
-            while let Some(key) = map.next_key::<&str>()? {
-                if key == "files" {
-                    return map.next_value_seed(FindFileVisitor {
-                        search_for: self.search_for,
-                    });
-                }
-            }
-
-            Ok(None)
-        }
-    }
-
-    impl<'de> DeserializeSeed<'de> for FindFileListVisitor {
-        type Value = Option<FileInfo>;
-
-        fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
-        where
-            D: Deserializer<'de>,
-        {
-            deserializer.deserialize_map(self)
-        }
-    }
-
-    Ok(FindFileListVisitor {
-        search_for: search_for.to_string(),
-    }
-    .deserialize(&mut deserializer)?)
-}
-
 fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
     let mut stmt = conn.prepare(
         "
@@ -242,67 +101,14 @@ fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
 }
 
-/// checks if a file is a SQLite file (<https://www.sqlite.org/fileformat.html> -> _Magic number_)
-/// ```text
-/// > FORMAT DETAILS
-/// > OFFSET   SIZE    DESCRIPTION
-/// > 0        16      Header string: "SQLite format 3\000"
-/// > [...]
-pub(crate) fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
-    let mut f = File::open(archive_index_path)?;
-
-    let mut buffer = [0; 16];
-    match f.read_exact(&mut buffer) {
-        Ok(()) => Ok(buffer == SQLITE_FILE_HEADER),
-        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
-        Err(err) => Err(err.into()),
-    }
-}
-
 pub(crate) fn find_in_file<P: AsRef<Path>>(
     archive_index_path: P,
     search_for: &str,
     pool: &SqliteConnectionPool,
 ) -> Result<Option<FileInfo>> {
-    if is_sqlite_file(&archive_index_path)? {
-        pool.with_connection(archive_index_path, |connection| {
-            find_in_sqlite_index(connection, search_for)
-        })
-    } else {
-        let file = fs::File::open(archive_index_path).context("could not open file")?;
-        let mmap = unsafe {
-            MmapOptions::new()
-                .map(&file)
-                .context("could not create memory map")?
-        };
-
-        find_in_slice(&mmap, search_for)
-    }
-}
-
-pub(crate) fn convert_to_sqlite_index<P: AsRef<Path>>(
-    path: P,
-    tmpdir: impl AsRef<Path>,
-) -> Result<TempPath> {
-    let path = path.as_ref();
-    let index: Index = { serde_cbor::from_reader(BufReader::new(File::open(path)?))? };
-
-    // write the new index into a temporary file so reads from ongoing requests
-    // can continue on the old index until the new one is fully written.
-    let tmp_path = tempfile::NamedTempFile::new_in(tmpdir)?.into_temp_path();
-    index
-        .write_sqlite(&tmp_path)
-        .context("error writing SQLite index")?;
-
-    Ok(tmp_path)
+    pool.with_connection(archive_index_path, |connection| {
+        find_in_sqlite_index(connection, search_for)
+    })
 }
 
 #[cfg(test)]
@@ -311,14 +117,6 @@ mod tests {
     use std::io::Write;
     use zip::write::FileOptions;
 
-    /// legacy archive index creation, only for testing that reading them still works
-    fn create_cbor_index<R: io::Read + io::Seek, W: io::Write>(
-        zipfile: &mut R,
-        writer: &mut W,
-    ) -> Result<()> {
-        serde_cbor::to_writer(writer, &Index::from_zip(zipfile)?).context("serialization error")
-    }
-
     fn create_test_archive() -> fs::File {
         let mut tf = tempfile::tempfile().unwrap();
 
@@ -336,88 +134,12 @@ mod tests {
         tf
     }
 
-    #[test]
-    fn convert_to_sqlite() {
-        let mut tf = create_test_archive();
-        let mut cbor_buf = Vec::new();
-        create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
-        let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
-        io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();
-
-        assert!(!is_sqlite_file(&cbor_index_file).unwrap());
-
-        let original_fi = find_in_file(
-            cbor_index_file.path(),
-            "testfile1",
-            &SqliteConnectionPool::default(),
-        )
-        .unwrap()
-        .unwrap();
-
-        let temp_dir = tempfile::TempDir::new().unwrap();
-        let sqlite_index_file = convert_to_sqlite_index(cbor_index_file, &temp_dir).unwrap();
-        assert!(is_sqlite_file(&sqlite_index_file).unwrap());
-
-        let migrated_fi = find_in_file(
-            sqlite_index_file,
-            "testfile1",
-            &SqliteConnectionPool::default(),
-        )
-        .unwrap()
-        .unwrap();
-
-        assert_eq!(migrated_fi, original_fi);
-    }
-
-    #[test]
-    fn index_create_save_load_cbor_direct() {
-        let mut tf = create_test_archive();
-        let mut buf = Vec::new();
-        create_cbor_index(&mut tf, &mut buf).unwrap();
-
-        let fi = find_in_slice(&buf, "testfile1").unwrap().unwrap();
-        assert_eq!(fi.range, FileRange::new(39, 459));
-        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
-
-        assert!(find_in_slice(&buf, "some_other_file").unwrap().is_none());
-    }
-
-    #[test]
-    fn index_create_save_load_cbor_as_fallback() {
-        let mut tf = create_test_archive();
-        let mut cbor_buf = Vec::new();
-        create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
-        let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
-        io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();
-
-        assert!(!is_sqlite_file(&cbor_index_file).unwrap());
-
-        let fi = find_in_file(
-            cbor_index_file.path(),
-            "testfile1",
-            &SqliteConnectionPool::default(),
-        )
-        .unwrap()
-        .unwrap();
-        assert_eq!(fi.range, FileRange::new(39, 459));
-        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
-
-        assert!(find_in_file(
-            cbor_index_file.path(),
-            "some_other_file",
-            &SqliteConnectionPool::default(),
-        )
-        .unwrap()
-        .is_none());
-    }
-
     #[test]
     fn index_create_save_load_sqlite() {
         let mut tf = create_test_archive();
 
         let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
         create(&mut tf, &tempfile).unwrap();
-        assert!(is_sqlite_file(&tempfile).unwrap());
 
         let fi = find_in_file(&tempfile, "testfile1", &SqliteConnectionPool::default())
             .unwrap()
@@ -434,24 +156,4 @@ mod tests {
         .unwrap()
         .is_none());
     }
-
-    #[test]
-    fn is_sqlite_file_empty() {
-        let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
-        assert!(!is_sqlite_file(tempfile).unwrap());
-    }
-
-    #[test]
-    fn is_sqlite_file_other_content() {
-        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
-        tempfile.write_all(b"some_bytes").unwrap();
-        assert!(!is_sqlite_file(tempfile.path()).unwrap());
-    }
-
-    #[test]
-    fn is_sqlite_file_specific_headers() {
-        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
-        tempfile.write_all(SQLITE_FILE_HEADER).unwrap();
-        assert!(is_sqlite_file(tempfile.path()).unwrap());
-    }
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 7eb9bf51d..980e4562a 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -15,7 +15,6 @@ use anyhow::{anyhow, ensure};
 use chrono::{DateTime, Utc};
 use fn_error_context::context;
 use path_slash::PathExt;
-use postgres::fallible_iterator::FallibleIterator;
 use std::io::BufReader;
 use std::num::NonZeroU64;
 use std::{
@@ -23,13 +22,12 @@ use std::{
     ffi::OsStr,
     fmt, fs,
     io::{self, Write},
-    iter,
     ops::RangeInclusive,
     path::{Path, PathBuf},
     sync::Arc,
 };
 use tokio::runtime::Runtime;
-use tracing::{error, info, instrument, trace};
+use tracing::{error, instrument, trace};
 
 const MAX_CONCURRENT_UPLOADS: usize = 1000;
 
@@ -589,82 +587,6 @@ pub(crate) fn source_archive_path(name: &str, version: &str) -> String {
     format!("sources/{name}/{version}.zip")
 }
 
-#[instrument(skip(storage))]
-fn migrate_one(storage: &Storage, archive_path: &str, tmpdir: &Path) -> Result<()> {
-    // this will also download the index if it doesn't exist locally
-    let local_index_filename = storage.get_index_filename(archive_path)?;
-
-    if archive_index::is_sqlite_file(&local_index_filename)? {
-        info!("index already in SQLite format, skipping");
-        return Ok(());
-    }
-
-    info!("converting local index...");
-    let remote_index_path = format!("{}.index", &archive_path);
-    let new_index_temp_path =
-        archive_index::convert_to_sqlite_index(&local_index_filename, tmpdir)?;
-
-    // first upload to S3, ongoing requests will still use the local CBOR index
-    info!("uplading to S3...");
-    // S3 put-object will overwrite the existing index
-    storage.store_one(remote_index_path, std::fs::read(&new_index_temp_path)?)?;
-
-    // move the temporary file into place
-    // This has a race condition when a request is trying to read the index between
-    // the `remove_file` and the `rename`. In this case the handler will then just
-    // unnecessarily download the index again.
-    fs::remove_file(&local_index_filename)?;
-    fs::rename(new_index_temp_path, local_index_filename)?;
-
-    Ok(())
-}
-
-/// migrate existing archive indexes from the old CBOR format to SQLite
-pub fn migrate_old_archive_indexes(
-    storage: &Storage,
-    conn: &mut impl postgres::GenericClient,
-) -> Result<()> {
-    let tmpdir = storage.config.prefix.join("archive_cache_tmp");
-    if !tmpdir.exists() {
-        fs::create_dir(&tmpdir)?;
-    }
-
-    for row in conn
-        .query_raw(
-            "
            SELECT
                crates.name,
                releases.version
            FROM
                crates
                INNER JOIN releases ON releases.crate_id = crates.id
            WHERE
                releases.archive_storage = true AND
                releases.build_status = true

            ORDER BY
                crates.name,
                releases.id
            ",
-            iter::empty::<String>(),
-        )?
-        .iterator()
-    {
-        let row = row?;
-        let name: &str = row.get(0);
-        let version: &str = row.get(1);
-        info!("converting archive index for {} {}...", name, version);
-
-        if let Err(err) = migrate_one(storage, &rustdoc_archive_path(name, version), &tmpdir) {
-            error!("error converting rustdoc archive index: {:?}", err);
-        }
-        if let Err(err) = migrate_one(storage, &source_archive_path(name, version), &tmpdir) {
-            error!("error converting source archive index: {:?}", err);
-        }
-    }
-    Ok(())
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
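
For reference: after this patch every archive index is a plain SQLite database with a
single `files` table (path, start, end, compression), written by `archive_index::create`
and read via `find_in_file` through the shared `SqliteConnectionPool`. A minimal sketch
of what such a lookup boils down to, using `rusqlite` directly; the `lookup` helper, its
tuple return type, and the standalone `Connection::open` (instead of the pool) are
illustrative assumptions, not names from the codebase:

    use rusqlite::{Connection, OptionalExtension};

    /// Fetch the byte range and compression algorithm recorded for `path`
    /// in an index file produced by `archive_index::create`.
    fn lookup(index: &std::path::Path, path: &str) -> rusqlite::Result<Option<(u64, u64, i32)>> {
        let conn = Connection::open(index)?;
        conn.query_row(
            "SELECT start, end, compression FROM files WHERE path = ?",
            (path,),
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )
        // map the no-rows case to Ok(None) instead of an error
        .optional()
    }

The start/end pair corresponds to the `FileRange` the storage layer uses to issue a
ranged read of the compressed entry inside the remote zip archive.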