Skip to content

Commit

Permalink
fix(index): change sqlite schema to (name, ver, blob)
Browse files Browse the repository at this point in the history
  • Loading branch information
weihanglo committed Apr 2, 2024
1 parent 4b4a993 commit aadab5f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 32 deletions.
81 changes: 63 additions & 18 deletions src/cargo/sources/registry/index/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ use crate::CargoResult;
use crate::GlobalContext;

use super::split;
use super::Summaries;
use super::MaybeIndexSummary;
use super::INDEX_V_MAX;

/// The current version of [`SummariesCache`].
Expand Down Expand Up @@ -231,18 +233,27 @@ impl<'a> SummariesCache<'a> {
/// An abstraction of the actual cache store.
trait CacheStore {
/// Gets the cache associated with the key.
fn get(&self, key: &str) -> Option<Vec<u8>>;
fn get(&self, key: &str) -> Option<MaybeSummaries>;

/// Associates the value with the key.
fn put(&self, key: &str, value: &[u8]);

/// Associates the value with the key + version tuple.
fn put_summary(&self, key: (&str, &Version), value: &[u8]);

/// Invalidates the cache associated with the key.
fn invalidate(&self, key: &str);
}

pub enum MaybeSummaries {
Unparsed(Vec<u8>),
Parsed(Summaries),
}

/// Manages the on-disk index caches.
pub struct CacheManager<'gctx> {
store: Box<dyn CacheStore + 'gctx>,
is_sqlite: bool,
}

impl<'gctx> CacheManager<'gctx> {
Expand All @@ -258,11 +269,15 @@ impl<'gctx> CacheManager<'gctx> {
} else {
Box::new(LocalFileSystem::new(cache_root, gctx))
};
CacheManager { store }
CacheManager { store, is_sqlite: use_sqlite }
}

pub fn is_sqlite(&self) -> bool {
self.is_sqlite
}

/// Gets the cache associated with the key.
pub fn get(&self, key: &str) -> Option<Vec<u8>> {
pub fn get(&self, key: &str) -> Option<MaybeSummaries> {
self.store.get(key)
}

Expand All @@ -271,6 +286,11 @@ impl<'gctx> CacheManager<'gctx> {
self.store.put(key, value)
}

/// Associates the value with the key + version tuple.
pub fn put_summary(&self, key: (&str, &Version), value: &[u8]) {
self.store.put_summary(key, value)
}

/// Invalidates the cache associated with the key.
pub fn invalidate(&self, key: &str) {
self.store.invalidate(key)
Expand Down Expand Up @@ -301,10 +321,10 @@ impl LocalFileSystem<'_> {
}

impl CacheStore for LocalFileSystem<'_> {
fn get(&self, key: &str) -> Option<Vec<u8>> {
fn get(&self, key: &str) -> Option<MaybeSummaries> {
let cache_path = &self.cache_path(key);
match fs::read(cache_path) {
Ok(contents) => Some(contents),
Ok(contents) => Some(MaybeSummaries::Unparsed(contents)),
Err(e) => {
tracing::debug!(?cache_path, "cache missing: {e}");
None
Expand All @@ -324,6 +344,10 @@ impl CacheStore for LocalFileSystem<'_> {
}
}

fn put_summary(&self, _key: (&str, &Version), _value: &[u8]) {
panic!("unsupported");
}

fn invalidate(&self, key: &str) {
let cache_path = &self.cache_path(key);
if let Err(e) = fs::remove_file(cache_path) {
Expand All @@ -341,7 +365,7 @@ struct LocalDatabase<'gctx> {
/// Connection to the SQLite database.
conn: OnceCell<Option<RefCell<Connection>>>,
/// [`GlobalContext`] reference for convenience.
deferred_writes: RefCell<BTreeMap<String, Vec<u8>>>,
deferred_writes: RefCell<BTreeMap<String, Vec<(String, Vec<u8>)>>>,
gctx: &'gctx GlobalContext,
}

Expand All @@ -351,7 +375,7 @@ impl LocalDatabase<'_> {
LocalDatabase {
cache_root,
conn: OnceCell::new(),
deferred_writes: RefCell::new(BTreeMap::new()),
deferred_writes: Default::default(),
gctx,
}
}
Expand Down Expand Up @@ -386,9 +410,11 @@ impl LocalDatabase<'_> {
let mut conn = conn.borrow_mut();
let tx = conn.transaction()?;
let mut stmt =
tx.prepare_cached("INSERT OR REPLACE INTO summaries (name, value) VALUES (?, ?)")?;
for (key, value) in self.deferred_writes.borrow().iter() {
stmt.execute(params!(key, value))?;
tx.prepare_cached("INSERT OR REPLACE INTO summaries (name, version, value) VALUES (?, ?, ?)")?;
for (name, summaries) in self.deferred_writes.borrow().iter() {
for (version, value) in summaries {
stmt.execute(params!(name, version, value))?;
}
}
drop(stmt);
tx.commit()?;
Expand All @@ -406,19 +432,36 @@ impl Drop for LocalDatabase<'_> {
}

impl CacheStore for LocalDatabase<'_> {
fn get(&self, key: &str) -> Option<Vec<u8>> {
fn get(&self, key: &str) -> Option<MaybeSummaries> {
self.conn()?
.borrow()
.prepare_cached("SELECT value FROM summaries WHERE name = ? LIMIT 1")
.and_then(|mut stmt| stmt.query_row([key], |row| row.get(0)))
.map_err(|e| tracing::debug!(key, "cache missing: {e}"))
.prepare_cached("SELECT version, value FROM summaries WHERE name = ?")
.and_then(|mut stmt| {
let rows = stmt.query_map([key], |row| Ok((row.get(0)?, row.get(1)?)))?;
let mut summaries = Summaries::default();
for row in rows {
let (version, raw_data): (String, Vec<u8>) = row?;
let version = Version::parse(&version).expect("semver");
summaries.versions.insert(version, MaybeIndexSummary::UnparsedData(raw_data));
}
Ok(MaybeSummaries::Parsed(summaries))
})
.map_err(|e| {
tracing::debug!(key, "cache missing: {e}");
})
.ok()
}

fn put(&self, key: &str, value: &[u8]) {
fn put(&self, _key: &str, _value: &[u8]) {
panic!("unsupported");
}

fn put_summary(&self, (name, version): (&str, &Version), value: &[u8]) {
self.deferred_writes
.borrow_mut()
.insert(key.into(), value.into());
.entry(name.into())
.or_insert(Default::default())
.push((version.to_string(), value.to_vec()));
}

fn invalidate(&self, key: &str) {
Expand All @@ -440,8 +483,10 @@ impl CacheStore for LocalDatabase<'_> {
fn migrations() -> Vec<Migration> {
vec![basic_migration(
"CREATE TABLE IF NOT EXISTS summaries (
name TEXT PRIMARY KEY NOT NULL,
value BLOB NOT NULL
name TEXT NOT NULL,
version TEXT NOT NULL,
value BLOB NOT NULL,
PRIMARY KEY (name, version)
)",
)]
}
46 changes: 32 additions & 14 deletions src/cargo/sources/registry/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::task::{ready, Poll};
use tracing::{debug, info};

mod cache;
use self::cache::CacheManager;
use self::cache::{CacheManager, MaybeSummaries};
use self::cache::SummariesCache;

/// The maximum schema version of the `v` field in the index this version of
Expand Down Expand Up @@ -115,7 +115,8 @@ struct Summaries {
enum MaybeIndexSummary {
/// A summary which has not been parsed, The `start` and `end` are pointers
/// into [`Summaries::raw_data`] which this is an entry of.
Unparsed { start: usize, end: usize },
Unparsed(std::ops::Range<usize>),
UnparsedData(Vec<u8>),

/// An actually parsed summary.
Parsed(IndexSummary),
Expand Down Expand Up @@ -551,14 +552,20 @@ impl Summaries {

let mut cached_summaries = None;
let mut index_version = None;
if let Some(contents) = cache_manager.get(name) {
match Summaries::parse_cache(contents) {
Ok((s, v)) => {
cached_summaries = Some(s);
index_version = Some(v);
if let Some(maybe_summaries) = cache_manager.get(name) {
match maybe_summaries {
MaybeSummaries::Unparsed(contents) => match Summaries::parse_cache(contents) {
Ok((s, v)) => {
cached_summaries = Some(s);
index_version = Some(v);
}
Err(e) => {
tracing::debug!("failed to parse {name:?} cache: {e}");
}
}
Err(e) => {
tracing::debug!("failed to parse {name:?} cache: {e}");
MaybeSummaries::Parsed(summaries) => {
cached_summaries = Some(summaries);
index_version = Some("2".into());
}
}
}
Expand Down Expand Up @@ -611,9 +618,18 @@ impl Summaries {
}
};
let version = summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
if cache_manager.is_sqlite() {
cache_manager.put_summary((&name, &version), line);
} else {
cache.versions.push((version.clone(), line));
}
ret.versions.insert(version, summary.into());
}

if cache_manager.is_sqlite() {
return Poll::Ready(Ok(Some(ret)));
}

if let Some(index_version) = index_version {
tracing::trace!("caching index_version {}", index_version);
let cache_bytes = cache.serialize(index_version.as_str());
Expand Down Expand Up @@ -649,7 +665,7 @@ impl Summaries {
for (version, summary) in cache.versions {
let (start, end) = subslice_bounds(&contents, summary);
ret.versions
.insert(version, MaybeIndexSummary::Unparsed { start, end });
.insert(version, MaybeIndexSummary::Unparsed(start..end));
}
ret.raw_data = contents;
return Ok((ret, index_version));
Expand Down Expand Up @@ -680,14 +696,16 @@ impl MaybeIndexSummary {
source_id: SourceId,
bindeps: bool,
) -> CargoResult<&IndexSummary> {
let (start, end) = match self {
MaybeIndexSummary::Unparsed { start, end } => (*start, *end),
let data = match self {
MaybeIndexSummary::Unparsed(range) => &raw_data[range.clone()],
MaybeIndexSummary::UnparsedData(data) => data,
MaybeIndexSummary::Parsed(summary) => return Ok(summary),
};
let summary = IndexSummary::parse(&raw_data[start..end], source_id, bindeps)?;
let summary = IndexSummary::parse(data, source_id, bindeps)?;
*self = MaybeIndexSummary::Parsed(summary);
match self {
MaybeIndexSummary::Unparsed { .. } => unreachable!(),
MaybeIndexSummary::UnparsedData { .. } => unreachable!(),
MaybeIndexSummary::Parsed(summary) => Ok(summary),
}
}
Expand Down

0 comments on commit aadab5f

Please sign in to comment.