From 97a14a53607b798483285dee9555e3809303d229 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Fri, 4 Oct 2024 16:32:30 -0700 Subject: [PATCH] [ENH] Make block and sparse index caches use persistent type. End-to-end tests confirm that with this change foyer will initialize the disk path (assuming it's writable) and instantiate a disk. A memory cache will also instantiate. Note that the PersistentCache type means that the key and value implement StorageKey and StorageValue, so it's totally acceptable for every non-persistent cache to implement the trait. This means that any place that takes a persistent cache will also take the non-persistent versions. It's trait logic, not "is a". --- Cargo.lock | 1 - rust/blockstore/src/arrow/provider.rs | 22 ++++++------- rust/blockstore/src/provider.rs | 6 ++-- rust/cache/src/foyer.rs | 30 +++++++++++++---- rust/cache/src/lib.rs | 32 +++++++++++++++++-- rust/cache/src/nop.rs | 9 +++++- rust/cache/src/unbounded.rs | 13 ++++++-- rust/index/benches/full_text.rs | 4 +-- rust/index/build.rs | 2 +- rust/index/src/hnsw_provider.rs | 4 +-- .../src/compactor/compaction_manager.rs | 4 +-- rust/worker/src/server.rs | 4 +-- 12 files changed, 95 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e5efad13aba..049f867ea6ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6159,7 +6159,6 @@ dependencies = [ "chroma-error", "chroma-index", "chroma-storage", - "chroma-test", "chroma-types", "criterion", "figment", diff --git a/rust/blockstore/src/arrow/provider.rs b/rust/blockstore/src/arrow/provider.rs index 0d3cfe012418..2eadd7c9b9b3 100644 --- a/rust/blockstore/src/arrow/provider.rs +++ b/rust/blockstore/src/arrow/provider.rs @@ -12,7 +12,7 @@ use crate::{ BlockfileReader, BlockfileWriter, Key, Value, }; use async_trait::async_trait; -use chroma_cache::{Cache, CacheError}; +use chroma_cache::{CacheError, PersistentCache}; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; use chroma_storage::Storage; @@ -34,8 +34,8 @@ impl ArrowBlockfileProvider { pub fn new( storage: Storage, max_block_size_bytes: usize, - block_cache: Box>, - sparse_index_cache: Box>, + block_cache: Box>, + sparse_index_cache: Box>, ) -> Self { Self { block_manager: BlockManager::new(storage.clone(), max_block_size_bytes, block_cache), @@ -114,7 +114,7 @@ impl Configurable<(ArrowBlockfileProviderConfig, Storage)> for ArrowBlockfilePro config: &(ArrowBlockfileProviderConfig, Storage), ) -> Result> { let (blockfile_config, storage) = config; - let block_cache = match chroma_cache::from_config( + let block_cache = match chroma_cache::from_config_persistent( &blockfile_config.block_manager_config.block_cache_config, ) .await @@ -124,7 +124,7 @@ impl Configurable<(ArrowBlockfileProviderConfig, Storage)> for ArrowBlockfilePro return Err(e); } }; - let sparse_index_cache = match chroma_cache::from_config( + let sparse_index_cache = match chroma_cache::from_config_persistent( &blockfile_config .sparse_index_manager_config .sparse_index_cache_config, @@ -187,7 +187,7 @@ impl ChromaError for ForkError { /// is a placeholder for that. #[derive(Clone)] pub(super) struct BlockManager { - block_cache: Arc>, + block_cache: Arc>, storage: Storage, max_block_size_bytes: usize, write_mutex: Arc>, @@ -197,9 +197,9 @@ impl BlockManager { pub(super) fn new( storage: Storage, max_block_size_bytes: usize, - block_cache: Box>, + block_cache: Box>, ) -> Self { - let block_cache: Arc> = block_cache.into(); + let block_cache: Arc> = block_cache.into(); Self { block_cache, storage, @@ -378,13 +378,13 @@ impl ChromaError for SparseIndexManagerError { #[derive(Clone)] pub(super) struct SparseIndexManager { - cache: Arc>, + cache: Arc>, storage: Storage, } impl SparseIndexManager { - pub fn new(storage: Storage, cache: Box>) -> Self { - let cache: Arc> = cache.into(); + pub fn new(storage: Storage, cache: Box>) -> Self { + let cache: Arc> = cache.into(); Self { cache, storage } } diff --git a/rust/blockstore/src/provider.rs b/rust/blockstore/src/provider.rs index 4d2bcdfc0188..f663c1853d77 100644 --- a/rust/blockstore/src/provider.rs +++ b/rust/blockstore/src/provider.rs @@ -11,7 +11,7 @@ use super::memory::storage::{Readable, Writeable}; use super::types::BlockfileWriter; use super::{BlockfileReader, Key, Value}; use async_trait::async_trait; -use chroma_cache::Cache; +use chroma_cache::PersistentCache; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; use chroma_storage::Storage; @@ -47,8 +47,8 @@ impl BlockfileProvider { pub fn new_arrow( storage: Storage, max_block_size_bytes: usize, - block_cache: Box>, - sparse_index_cache: Box>, + block_cache: Box>, + sparse_index_cache: Box>, ) -> Self { BlockfileProvider::ArrowBlockfileProvider(ArrowBlockfileProvider::new( storage, diff --git a/rust/cache/src/foyer.rs b/rust/cache/src/foyer.rs index f610179945bd..a7cf51c5b72b 100644 --- a/rust/cache/src/foyer.rs +++ b/rust/cache/src/foyer.rs @@ -183,7 +183,7 @@ impl FoyerCacheConfig { K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static, V: Clone + Send + Sync + StorageValue + Weighted + 'static, { - FoyerHybridCache::hybrid(self).await + Ok(Box::new(FoyerHybridCache::hybrid(self).await?)) } /// Build an in-memory-only cache. @@ -194,7 +194,18 @@ impl FoyerCacheConfig { K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static, V: Clone + Send + Sync + Weighted + 'static, { - FoyerPlainCache::memory(self).await + Ok(Box::new(FoyerPlainCache::memory(self).await?)) + } + + /// Build an in-memory-only cache. + pub async fn build_memory_persistent( + &self, + ) -> Result>, Box> + where + K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static, + V: Clone + Send + Sync + Weighted + StorageValue + 'static, + { + Ok(Box::new(FoyerPlainCache::memory(self).await?)) } } @@ -215,7 +226,7 @@ where /// Build a hybrid disk and memory cache. pub async fn hybrid( config: &FoyerCacheConfig, - ) -> Result>, Box> { + ) -> Result, Box> { let tracing_config = TracingConfig::default(); tracing_config .set_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _)); @@ -281,7 +292,7 @@ where e ))) as _ })?; - Ok(Box::new(FoyerHybridCache { cache })) + Ok(FoyerHybridCache { cache }) } } @@ -332,7 +343,7 @@ where /// Build an in-memory cache. pub async fn memory( config: &FoyerCacheConfig, - ) -> Result>, Box> { + ) -> Result, Box> { let tracing_config = TracingConfig::default(); tracing_config .set_record_hybrid_insert_threshold(Duration::from_micros(config.trace_insert_us as _)); @@ -348,7 +359,7 @@ where let cache = CacheBuilder::new(config.capacity) .with_shards(config.shards) .build(); - Ok(Box::new(FoyerPlainCache { cache })) + Ok(FoyerPlainCache { cache }) } } @@ -375,3 +386,10 @@ where Ok(()) } } + +impl super::PersistentCache for FoyerPlainCache +where + K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static, + V: Clone + Send + Sync + Weighted + StorageValue + 'static, +{ +} diff --git a/rust/cache/src/lib.rs b/rust/cache/src/lib.rs index 45553141da59..39fec6caf0a2 100644 --- a/rust/cache/src/lib.rs +++ b/rust/cache/src/lib.rs @@ -82,7 +82,8 @@ pub trait Weighted { fn weight(&self) -> usize; } -/// Create a new cache from the provided config. +/// Create a new cache from the provided config. This is solely for caches that cannot implement +/// the persistent cache trait. Attempts to construct a disk-based cache will return an error. pub async fn from_config( config: &CacheConfig, ) -> Result>, Box> @@ -102,8 +103,35 @@ where } } +/// Create a new cache from the provided config. +pub async fn from_config_persistent( + config: &CacheConfig, +) -> Result>, Box> +where + K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static, + V: Clone + Send + Sync + StorageValue + Weighted + 'static, +{ + match config { + CacheConfig::Unbounded(unbounded_config) => { + Ok(Box::new(UnboundedCache::new(unbounded_config))) + } + CacheConfig::Memory(c) => Ok(c.build_memory_persistent().await?), + CacheConfig::Disk(c) => Ok(c.build_hybrid().await? as _), + CacheConfig::Nop => Ok(Box::new(NopCache)), + } +} + +/// Create a new cache for testing purposes. +pub fn new_cache_for_test() -> Box> +where + K: Send + Sync + Clone + Eq + PartialEq + Hash + StorageKey + 'static, + V: Send + Sync + Clone + Weighted + StorageValue + 'static, +{ + Box::new(UnboundedCache::new(&UnboundedCacheConfig::default())) +} + /// Create a new cache for testing purposes. -pub fn new_cache_for_test() -> Box> +pub fn new_non_persistent_cache_for_test() -> Box> where K: Send + Sync + Clone + Eq + PartialEq + Hash + 'static, V: Send + Sync + Clone + Weighted + 'static, diff --git a/rust/cache/src/nop.rs b/rust/cache/src/nop.rs index 1e703c20b452..e699b4b9cc30 100644 --- a/rust/cache/src/nop.rs +++ b/rust/cache/src/nop.rs @@ -1,6 +1,6 @@ use std::hash::Hash; -use super::{CacheError, Weighted}; +use super::{CacheError, StorageKey, StorageValue, Weighted}; /// A zero-configuration cache that doesn't evict. pub struct NopCache; @@ -23,3 +23,10 @@ where Ok(()) } } + +impl super::PersistentCache for NopCache +where + K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static, + V: Clone + Send + Sync + Weighted + StorageValue + 'static, +{ +} diff --git a/rust/cache/src/unbounded.rs b/rust/cache/src/unbounded.rs index 49a3a30e15e6..1ca250cc2f83 100644 --- a/rust/cache/src/unbounded.rs +++ b/rust/cache/src/unbounded.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use parking_lot::RwLock; -use super::{CacheError, Weighted}; +use super::{CacheError, StorageKey, StorageValue, Weighted}; /// A zero-configuration cache that doesn't evict. /// Mostly useful for testing. @@ -12,12 +12,12 @@ use super::{CacheError, Weighted}; pub struct UnboundedCacheConfig {} impl UnboundedCacheConfig { - pub fn build(&self) -> Box> + pub fn build(&self) -> UnboundedCache where K: Clone + Send + Sync + Eq + PartialEq + Hash + 'static, V: Clone + Send + Sync + Clone + Weighted + 'static, { - Box::new(UnboundedCache::new(self)) + UnboundedCache::new(self) } } @@ -67,3 +67,10 @@ where Ok(()) } } + +impl super::PersistentCache for UnboundedCache +where + K: Clone + Send + Sync + Eq + PartialEq + Hash + StorageKey + 'static, + V: Clone + Send + Sync + Weighted + StorageValue + 'static, +{ +} diff --git a/rust/index/benches/full_text.rs b/rust/index/benches/full_text.rs index a7a9e4d9cc32..10d498f3b4b4 100644 --- a/rust/index/benches/full_text.rs +++ b/rust/index/benches/full_text.rs @@ -82,8 +82,8 @@ const BLOCK_SIZE: usize = 8 * 1024 * 1024; // 8MB fn create_blockfile_provider(storage_dir: &str) -> BlockfileProvider { let storage = Storage::Local(LocalStorage::new(storage_dir)); - let block_cache = UnboundedCacheConfig {}.build(); - let sparse_index_cache = UnboundedCacheConfig {}.build(); + let block_cache = Box::new(UnboundedCacheConfig {}.build()) as _; + let sparse_index_cache = Box::new(UnboundedCacheConfig {}.build()) as _; let arrow_blockfile_provider = ArrowBlockfileProvider::new(storage.clone(), BLOCK_SIZE, block_cache, sparse_index_cache); BlockfileProvider::ArrowBlockfileProvider(arrow_blockfile_provider) diff --git a/rust/index/build.rs b/rust/index/build.rs index abe688f789e9..b1bc0e7082f6 100644 --- a/rust/index/build.rs +++ b/rust/index/build.rs @@ -8,7 +8,7 @@ fn main() -> Result<(), Box> { .flag("-std=c++11") .flag("-Ofast") .flag("-DHAVE_CXX0X") - .flag("-fpic") + .flag("-fPIC") .flag("-ftree-vectorize") .flag("-w") .compile("bindings"); diff --git a/rust/index/src/hnsw_provider.rs b/rust/index/src/hnsw_provider.rs index 09aa1cacc1f4..03d3874681d8 100644 --- a/rust/index/src/hnsw_provider.rs +++ b/rust/index/src/hnsw_provider.rs @@ -568,7 +568,7 @@ pub enum HnswIndexProviderFileError { #[cfg(test)] mod tests { use super::*; - use chroma_cache::new_cache_for_test; + use chroma_cache::new_non_persistent_cache_for_test; use chroma_storage::local::LocalStorage; use chroma_types::SegmentType; use std::collections::HashMap; @@ -582,7 +582,7 @@ mod tests { tokio::fs::create_dir_all(&hnsw_tmp_path).await.unwrap(); let storage = Storage::Local(LocalStorage::new(storage_dir.to_str().unwrap())); - let cache = new_cache_for_test(); + let cache = new_non_persistent_cache_for_test(); let provider = HnswIndexProvider::new(storage, hnsw_tmp_path, cache); let segment = Segment { id: Uuid::new_v4(), diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 210d43edb4af..67df4c3b0520 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -338,7 +338,7 @@ mod tests { use crate::log::log::InternalLogRecord; use crate::sysdb::test_sysdb::TestSysDb; use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES; - use chroma_cache::new_cache_for_test; + use chroma_cache::{new_cache_for_test, new_non_persistent_cache_for_test}; use chroma_storage::local::LocalStorage; use chroma_types::{Collection, LogRecord, Operation, OperationRecord, Segment}; use std::collections::HashMap; @@ -527,7 +527,7 @@ mod tests { let block_cache = new_cache_for_test(); let sparse_index_cache = new_cache_for_test(); - let hnsw_cache = new_cache_for_test(); + let hnsw_cache = new_non_persistent_cache_for_test(); let mut manager = CompactionManager::new( scheduler, log, diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index 2e9731254ffc..664352c6b712 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -649,7 +649,7 @@ mod tests { use crate::system; use chroma_blockstore::arrow::config::TEST_MAX_BLOCK_SIZE_BYTES; #[cfg(test)] - use chroma_cache::new_cache_for_test; + use chroma_cache::{new_cache_for_test, new_non_persistent_cache_for_test}; use chroma_proto::debug_client::DebugClient; use chroma_storage::{local::LocalStorage, Storage}; use tempfile::tempdir; @@ -662,7 +662,7 @@ mod tests { let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); let block_cache = new_cache_for_test(); let sparse_index_cache = new_cache_for_test(); - let hnsw_index_cache = new_cache_for_test(); + let hnsw_index_cache = new_non_persistent_cache_for_test(); let port = random_port::PortPicker::new().pick().unwrap(); let mut server = WorkerServer { dispatcher: None,