Skip to content

Commit

Permalink
feat: recover file cache index asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 3, 2024
1 parent 5bdea1a commit 985f10e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
9 changes: 7 additions & 2 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Duration;
use std::{env, path};

use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
Expand Down Expand Up @@ -153,8 +153,13 @@ async fn build_cache_layer(
.context(error::InitBackendSnafu)?;

let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
let moved_cache_layer = cache_layer.clone();
tokio::spawn(async move {
if let Err(err) = moved_cache_layer.recover_cache().await {
error!(err; "Failed to recover file cache.")
}
});

info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
Expand Down
13 changes: 9 additions & 4 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use common_telemetry::{debug, error, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
Expand Down Expand Up @@ -67,11 +67,16 @@ impl WriteCache {
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity, ttl);
file_cache.recover().await?;
let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
let moved_file_cache = file_cache.clone();
tokio::spawn(async move {
if let Err(err) = moved_file_cache.recover().await {
error!(err; "Failed to recover file cache.")
}
});

Ok(Self {
file_cache: Arc::new(file_cache),
file_cache,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
Expand Down
22 changes: 15 additions & 7 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use opendal::raw::{
};
use opendal::Result;
mod read_cache;
use std::time::Instant;

use common_telemetry::info;
use read_cache::ReadCache;

Expand All @@ -39,17 +41,23 @@ impl<C: Access> Clone for LruCacheLayer<C> {
}

impl<C: Access> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
/// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
let (entries, bytes) = read_cache.recover_cache().await?;
Ok(Self { read_cache })
}

/// Recovers cache
pub async fn recover_cache(&self) -> Result<()> {
let now = Instant::now();
let (entries, bytes) = self.read_cache.recover_cache().await?;
info!(
"Recovered {} entries and total size {} in bytes for LruCacheLayer",
entries, bytes
"Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
entries,
bytes,
now.elapsed()
);

Ok(Self { read_cache })
Ok(())
}

/// Returns true when the local cache contains the specific file
Expand Down
10 changes: 7 additions & 3 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());

LruCacheLayer::new(file_cache, 32).await.unwrap()
let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
cache_layer.recover_cache().await.unwrap();
cache_layer
};

let store = OperatorBuilder::new(store)
Expand Down Expand Up @@ -308,7 +310,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
let cache_store = file_cache.clone();

// create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
cache_layer.recover_cache().await.unwrap();
let store = store.layer(cache_layer.clone());

// create several object handler.
Expand Down Expand Up @@ -436,7 +439,8 @@ async fn test_object_store_cache_policy() -> Result<()> {

drop(cache_layer);
// Test recover
let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
cache_layer.recover_cache().await.unwrap();

// The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
Expand Down

0 comments on commit 985f10e

Please sign in to comment.