Skip to content

Commit

Permalink
[ENH] Make block and sparse index caches use persistent type.
Browse files Browse the repository at this point in the history
End-to-end tests confirm that with this change foyer will initialize the
disk path (assuming it's writable) and instantiate a disk.  A memory
cache will also instantiate.

Note that the PersistentCache type means that the key and value
implement StorageKey and StorageValue, so it's totally acceptable for
every non-persistent cache to implement the trait.  This means that any
place that takes a persistent cache will also take the non-persistent
versions.  It's trait logic, not "is a".
  • Loading branch information
rescrv committed Oct 4, 2024
1 parent 8e9dbc9 commit 97a14a5
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 36 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions rust/blockstore/src/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
BlockfileReader, BlockfileWriter, Key, Value,
};
use async_trait::async_trait;
use chroma_cache::{Cache, CacheError};
use chroma_cache::{CacheError, PersistentCache};
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_storage::Storage;
Expand All @@ -34,8 +34,8 @@ impl ArrowBlockfileProvider {
pub fn new(
storage: Storage,
max_block_size_bytes: usize,
block_cache: Box<dyn Cache<Uuid, Block>>,
sparse_index_cache: Box<dyn Cache<Uuid, SparseIndex>>,
block_cache: Box<dyn PersistentCache<Uuid, Block>>,
sparse_index_cache: Box<dyn PersistentCache<Uuid, SparseIndex>>,
) -> Self {
Self {
block_manager: BlockManager::new(storage.clone(), max_block_size_bytes, block_cache),
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Configurable<(ArrowBlockfileProviderConfig, Storage)> for ArrowBlockfilePro
config: &(ArrowBlockfileProviderConfig, Storage),
) -> Result<Self, Box<dyn ChromaError>> {
let (blockfile_config, storage) = config;
let block_cache = match chroma_cache::from_config(
let block_cache = match chroma_cache::from_config_persistent(
&blockfile_config.block_manager_config.block_cache_config,
)
.await
Expand All @@ -124,7 +124,7 @@ impl Configurable<(ArrowBlockfileProviderConfig, Storage)> for ArrowBlockfilePro
return Err(e);
}
};
let sparse_index_cache = match chroma_cache::from_config(
let sparse_index_cache = match chroma_cache::from_config_persistent(
&blockfile_config
.sparse_index_manager_config
.sparse_index_cache_config,
Expand Down Expand Up @@ -187,7 +187,7 @@ impl ChromaError for ForkError {
/// is a placeholder for that.
#[derive(Clone)]
pub(super) struct BlockManager {
block_cache: Arc<dyn Cache<Uuid, Block>>,
block_cache: Arc<dyn PersistentCache<Uuid, Block>>,
storage: Storage,
max_block_size_bytes: usize,
write_mutex: Arc<tokio::sync::Mutex<()>>,
Expand All @@ -197,9 +197,9 @@ impl BlockManager {
pub(super) fn new(
storage: Storage,
max_block_size_bytes: usize,
block_cache: Box<dyn Cache<Uuid, Block>>,
block_cache: Box<dyn PersistentCache<Uuid, Block>>,
) -> Self {
let block_cache: Arc<dyn Cache<Uuid, Block>> = block_cache.into();
let block_cache: Arc<dyn PersistentCache<Uuid, Block>> = block_cache.into();
Self {
block_cache,
storage,
Expand Down Expand Up @@ -378,13 +378,13 @@ impl ChromaError for SparseIndexManagerError {

#[derive(Clone)]
pub(super) struct SparseIndexManager {
cache: Arc<dyn Cache<Uuid, SparseIndex>>,
cache: Arc<dyn PersistentCache<Uuid, SparseIndex>>,
storage: Storage,
}

impl SparseIndexManager {
pub fn new(storage: Storage, cache: Box<dyn Cache<Uuid, SparseIndex>>) -> Self {
let cache: Arc<dyn Cache<Uuid, SparseIndex>> = cache.into();
pub fn new(storage: Storage, cache: Box<dyn PersistentCache<Uuid, SparseIndex>>) -> Self {
let cache: Arc<dyn PersistentCache<Uuid, SparseIndex>> = cache.into();
Self { cache, storage }
}

Expand Down
6 changes: 3 additions & 3 deletions rust/blockstore/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::memory::storage::{Readable, Writeable};
use super::types::BlockfileWriter;
use super::{BlockfileReader, Key, Value};
use async_trait::async_trait;
use chroma_cache::Cache;
use chroma_cache::PersistentCache;
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_storage::Storage;
Expand Down Expand Up @@ -47,8 +47,8 @@ impl BlockfileProvider {
pub fn new_arrow(
storage: Storage,
max_block_size_bytes: usize,
block_cache: Box<dyn Cache<Uuid, Block>>,
sparse_index_cache: Box<dyn Cache<Uuid, SparseIndex>>,
block_cache: Box<dyn PersistentCache<Uuid, Block>>,
sparse_index_cache: Box<dyn PersistentCache<Uuid, SparseIndex>>,
) -> Self {
BlockfileProvider::ArrowBlockfileProvider(ArrowBlockfileProvider::new(
storage,
Expand Down
30 changes: 24 additions & 6 deletions rust/cache/src/foyer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl FoyerCacheConfig {
K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + StorageValue + Weighted + 'static,
{
FoyerHybridCache::hybrid(self).await
Ok(Box::new(FoyerHybridCache::hybrid(self).await?))
}

/// Build an in-memory-only cache.
Expand All @@ -194,7 +194,18 @@ impl FoyerCacheConfig {
K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + Weighted + 'static,
{
FoyerPlainCache::memory(self).await
Ok(Box::new(FoyerPlainCache::memory(self).await?))
}

/// Build an in-memory-only cache.
pub async fn build_memory_persistent<K, V>(
&self,
) -> Result<Box<dyn super::PersistentCache<K, V>>, Box<dyn ChromaError>>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static,
V: Clone + Send + Sync + Weighted + StorageValue + 'static,
{
Ok(Box::new(FoyerPlainCache::memory(self).await?))
}
}

Expand All @@ -215,7 +226,7 @@ where
/// Build a hybrid disk and memory cache.
pub async fn hybrid(
config: &FoyerCacheConfig,
) -> Result<Box<dyn super::PersistentCache<K, V>>, Box<dyn ChromaError>> {
) -> Result<FoyerHybridCache<K, V>, Box<dyn ChromaError>> {
let tracing_config = TracingConfig::default();
tracing_config
.set_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _));
Expand Down Expand Up @@ -281,7 +292,7 @@ where
e
))) as _
})?;
Ok(Box::new(FoyerHybridCache { cache }))
Ok(FoyerHybridCache { cache })
}
}

Expand Down Expand Up @@ -332,7 +343,7 @@ where
/// Build an in-memory cache.
pub async fn memory(
config: &FoyerCacheConfig,
) -> Result<Box<dyn super::Cache<K, V>>, Box<dyn ChromaError>> {
) -> Result<FoyerPlainCache<K, V>, Box<dyn ChromaError>> {
let tracing_config = TracingConfig::default();
tracing_config
.set_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _));
Expand All @@ -348,7 +359,7 @@ where
let cache = CacheBuilder::new(config.capacity)
.with_shards(config.shards)
.build();
Ok(Box::new(FoyerPlainCache { cache }))
Ok(FoyerPlainCache { cache })
}
}

Expand All @@ -375,3 +386,10 @@ where
Ok(())
}
}

impl<K, V> super::PersistentCache<K, V> for FoyerPlainCache<K, V>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static,
V: Clone + Send + Sync + Weighted + StorageValue + 'static,
{
}
32 changes: 30 additions & 2 deletions rust/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ pub trait Weighted {
fn weight(&self) -> usize;
}

/// Create a new cache from the provided config.
/// Create a new cache from the provided config. This is solely for caches that cannot implement
/// the persistent cache trait. Attempts to construct a disk-based cache will return an error.
pub async fn from_config<K, V>(
config: &CacheConfig,
) -> Result<Box<dyn Cache<K, V>>, Box<dyn ChromaError>>
Expand All @@ -102,8 +103,35 @@ where
}
}

/// Create a new cache from the provided config.
pub async fn from_config_persistent<K, V>(
config: &CacheConfig,
) -> Result<Box<dyn PersistentCache<K, V>>, Box<dyn ChromaError>>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static,
V: Clone + Send + Sync + StorageValue + Weighted + 'static,
{
match config {
CacheConfig::Unbounded(unbounded_config) => {
Ok(Box::new(UnboundedCache::new(unbounded_config)))
}
CacheConfig::Memory(c) => Ok(c.build_memory_persistent().await?),
CacheConfig::Disk(c) => Ok(c.build_hybrid().await? as _),
CacheConfig::Nop => Ok(Box::new(NopCache)),
}
}

/// Create a new cache for testing purposes.
pub fn new_cache_for_test<K, V>() -> Box<dyn PersistentCache<K, V>>
where
K: Send + Sync + Clone + Eq + PartialEq + Hash + StorageKey + 'static,
V: Send + Sync + Clone + Weighted + StorageValue + 'static,
{
Box::new(UnboundedCache::new(&UnboundedCacheConfig::default()))
}

/// Create a new cache for testing purposes.
pub fn new_cache_for_test<K, V>() -> Box<dyn Cache<K, V>>
pub fn new_non_persistent_cache_for_test<K, V>() -> Box<dyn Cache<K, V>>
where
K: Send + Sync + Clone + Eq + PartialEq + Hash + 'static,
V: Send + Sync + Clone + Weighted + 'static,
Expand Down
9 changes: 8 additions & 1 deletion rust/cache/src/nop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::hash::Hash;

use super::{CacheError, Weighted};
use super::{CacheError, StorageKey, StorageValue, Weighted};

/// A zero-configuration cache that doesn't evict.
pub struct NopCache;
Expand All @@ -23,3 +23,10 @@ where
Ok(())
}
}

impl<K, V> super::PersistentCache<K, V> for NopCache
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static,
V: Clone + Send + Sync + Weighted + StorageValue + 'static,
{
}
13 changes: 10 additions & 3 deletions rust/cache/src/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ use std::sync::Arc;

use parking_lot::RwLock;

use super::{CacheError, Weighted};
use super::{CacheError, StorageKey, StorageValue, Weighted};

/// A zero-configuration cache that doesn't evict.
/// Mostly useful for testing.
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct UnboundedCacheConfig {}

impl UnboundedCacheConfig {
pub fn build<K, V>(&self) -> Box<dyn super::Cache<K, V>>
pub fn build<K, V>(&self) -> UnboundedCache<K, V>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + Clone + Weighted + 'static,
{
Box::new(UnboundedCache::new(self))
UnboundedCache::new(self)
}
}

Expand Down Expand Up @@ -67,3 +67,10 @@ where
Ok(())
}
}

impl<K, V> super::PersistentCache<K, V> for UnboundedCache<K, V>
where
K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static,
V: Clone + Send + Sync + Weighted + StorageValue + 'static,
{
}
4 changes: 2 additions & 2 deletions rust/index/benches/full_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ const BLOCK_SIZE: usize = 8 * 1024 * 1024; // 8MB

fn create_blockfile_provider(storage_dir: &str) -> BlockfileProvider {
let storage = Storage::Local(LocalStorage::new(storage_dir));
let block_cache = UnboundedCacheConfig {}.build();
let sparse_index_cache = UnboundedCacheConfig {}.build();
let block_cache = Box::new(UnboundedCacheConfig {}.build()) as _;
let sparse_index_cache = Box::new(UnboundedCacheConfig {}.build()) as _;
let arrow_blockfile_provider =
ArrowBlockfileProvider::new(storage.clone(), BLOCK_SIZE, block_cache, sparse_index_cache);
BlockfileProvider::ArrowBlockfileProvider(arrow_blockfile_provider)
Expand Down
2 changes: 1 addition & 1 deletion rust/index/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.flag("-std=c++11")
.flag("-Ofast")
.flag("-DHAVE_CXX0X")
.flag("-fpic")
.flag("-fPIC")
.flag("-ftree-vectorize")
.flag("-w")
.compile("bindings");
Expand Down
4 changes: 2 additions & 2 deletions rust/index/src/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ pub enum HnswIndexProviderFileError {
#[cfg(test)]
mod tests {
use super::*;
use chroma_cache::new_cache_for_test;
use chroma_cache::new_non_persistent_cache_for_test;
use chroma_storage::local::LocalStorage;
use chroma_types::SegmentType;
use std::collections::HashMap;
Expand All @@ -582,7 +582,7 @@ mod tests {
tokio::fs::create_dir_all(&hnsw_tmp_path).await.unwrap();

let storage = Storage::Local(LocalStorage::new(storage_dir.to_str().unwrap()));
let cache = new_cache_for_test();
let cache = new_non_persistent_cache_for_test();
let provider = HnswIndexProvider::new(storage, hnsw_tmp_path, cache);
let segment = Segment {
id: Uuid::new_v4(),
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ mod tests {
use crate::log::log::InternalLogRecord;
use crate::sysdb::test_sysdb::TestSysDb;
use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES;
use chroma_cache::new_cache_for_test;
use chroma_cache::{new_cache_for_test, new_non_persistent_cache_for_test};
use chroma_storage::local::LocalStorage;
use chroma_types::{Collection, LogRecord, Operation, OperationRecord, Segment};
use std::collections::HashMap;
Expand Down Expand Up @@ -527,7 +527,7 @@ mod tests {

let block_cache = new_cache_for_test();
let sparse_index_cache = new_cache_for_test();
let hnsw_cache = new_cache_for_test();
let hnsw_cache = new_non_persistent_cache_for_test();
let mut manager = CompactionManager::new(
scheduler,
log,
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ mod tests {
use crate::system;
use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES;
#[cfg(test)]
use chroma_cache::new_cache_for_test;
use chroma_cache::{new_cache_for_test, new_non_persistent_cache_for_test};
use chroma_proto::debug_client::DebugClient;
use chroma_storage::{local::LocalStorage, Storage};
use tempfile::tempdir;
Expand All @@ -662,7 +662,7 @@ mod tests {
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let block_cache = new_cache_for_test();
let sparse_index_cache = new_cache_for_test();
let hnsw_index_cache = new_cache_for_test();
let hnsw_index_cache = new_non_persistent_cache_for_test();
let port = random_port::PortPicker::new().pick().unwrap();
let mut server = WorkerServer {
dispatcher: None,
Expand Down

0 comments on commit 97a14a5

Please sign in to comment.