chore: apply suggestions from CR
WenyXu committed Dec 3, 2024
1 parent 985f10e commit a33b7b5
Showing 6 changed files with 50 additions and 36 deletions.
10 changes: 2 additions & 8 deletions src/datanode/src/store.rs
@@ -25,7 +25,7 @@ use std::time::Duration;
use std::{env, path};

use common_base::readable_size::ReadableSize;
-use common_telemetry::{error, info, warn};
+use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
@@ -154,13 +154,7 @@ async fn build_cache_layer(

let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.context(error::InitBackendSnafu)?;
-let moved_cache_layer = cache_layer.clone();
-tokio::spawn(async move {
-    if let Err(err) = moved_cache_layer.recover_cache().await {
-        error!(err; "Failed to recover file cache.")
-    }
-});
-
+cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
path, cache_capacity
33 changes: 25 additions & 8 deletions src/mito2/src/cache/file_cache.rs
@@ -20,7 +20,7 @@ use std::time::{Duration, Instant};

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
-use common_telemetry::{info, warn};
+use common_telemetry::{error, info, warn};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
@@ -188,16 +188,16 @@ impl FileCache {
}
}

-/// Recovers the index from local store.
-pub(crate) async fn recover(&self) -> Result<()> {
-    let now = Instant::now();
-
+async fn recover_inner(&self) -> Result<()> {
let mut lister = self
.local_store
.lister_with(FILE_DIR)
.metakey(Metakey::ContentLength)
.await
.context(OpenDalSnafu)?;

+let now = Instant::now();

// Use i64 for total_size to reduce the risk of overflow.
// It is possible that the total size of the cache is larger than i32::MAX.
let (mut total_size, mut total_keys) = (0i64, 0);
@@ -225,10 +225,23 @@ impl FileCache {
total_size,
now.elapsed()
);

Ok(())
}

+/// Recovers the index from local store.
+pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
+    let moved_self = self.clone();
+    let handle = tokio::spawn(async move {
+        if let Err(err) = moved_self.recover_inner().await {
+            error!(err; "Failed to recover file cache.")
+        }
+    });
+
+    if sync {
+        let _ = handle.await;
+    }
+}

/// Returns the cache file path for the key.
pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
cache_file_path(FILE_DIR, key)
@@ -536,13 +549,17 @@ mod tests {
}

// Recover the cache.
-let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
+let cache = Arc::new(FileCache::new(
+    local_store.clone(),
+    ReadableSize::mb(10),
+    None,
+));
// No entry before recovery.
assert!(cache
.reader(IndexKey::new(region_id, file_ids[0], file_type))
.await
.is_none());
-cache.recover().await.unwrap();
+cache.recover(true).await;

// Check size.
cache.memory_index.run_pending_tasks().await;
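The core of this change in file_cache.rs is a spawn-then-optionally-await pattern: recovery runs on a background tokio task, and the caller chooses via the sync flag whether to block until it finishes. A minimal standalone sketch of that pattern, assuming tokio with the rt-multi-thread and macros features (the Worker type, its String error, and the eprintln logging are illustrative stand-ins, not the mito2 API):

use std::sync::Arc;

struct Worker;

impl Worker {
    // Stand-in for the real recovery logic, which returns a Result in the diff above.
    async fn recover_inner(&self) -> Result<(), String> {
        Ok(())
    }

    // Spawn recovery in the background; wait on the handle only when `sync` is true.
    async fn recover(self: &Arc<Self>, sync: bool) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.recover_inner().await {
                eprintln!("Failed to recover: {err}");
            }
        });

        if sync {
            // A JoinError here would mean the spawned task panicked; it is ignored,
            // mirroring the `let _ = handle.await;` in the diff.
            let _ = handle.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let worker = Arc::new(Worker);
    worker.recover(true).await; // tests: wait until recovery has finished
    worker.recover(false).await; // startup: fire and forget
}

Errors inside the task never reach the caller; they are only logged, which is why the callers in this diff no longer unwrap a Result.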
9 changes: 2 additions & 7 deletions src/mito2/src/cache/write_cache.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
@@ -68,12 +68,7 @@ impl WriteCache {
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
-let moved_file_cache = file_cache.clone();
-tokio::spawn(async move {
-    if let Err(err) = moved_file_cache.recover().await {
-        error!(err; "Failed to recover file cache.")
-    }
-});
+file_cache.recover(false).await;

Ok(Self {
file_cache,
1 change: 1 addition & 0 deletions src/object-store/Cargo.toml
@@ -27,6 +27,7 @@ opendal = { version = "0.49", features = [
"services-s3",
] }
prometheus.workspace = true
+tokio.workspace = true
uuid.workspace = true

[dev-dependencies]
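The new tokio.workspace = true entry pulls the crate's tokio dependency from the workspace-level table rather than pinning a version locally. A hedged sketch of how the two manifests relate, with placeholder version and features rather than GreptimeDB's actual workspace settings:

# Workspace root Cargo.toml (illustrative values)
[workspace.dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

# src/object-store/Cargo.toml, as changed by this commit
[dependencies]
tokio.workspace = true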
27 changes: 17 additions & 10 deletions src/object-store/src/layers/lru_cache.rs
@@ -23,7 +23,7 @@ use opendal::Result;
mod read_cache;
use std::time::Instant;

-use common_telemetry::info;
+use common_telemetry::{error, info};
use read_cache::ReadCache;

/// An opendal layer with local LRU file cache supporting.
@@ -48,16 +48,23 @@ impl<C: Access> LruCacheLayer<C> {
}

/// Recovers cache
-pub async fn recover_cache(&self) -> Result<()> {
+pub async fn recover_cache(&self, sync: bool) {
let now = Instant::now();
-let (entries, bytes) = self.read_cache.recover_cache().await?;
-info!(
-    "Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
-    entries,
-    bytes,
-    now.elapsed()
-);
-Ok(())
+let moved_read_cache = self.read_cache.clone();
+let handle = tokio::spawn(async move {
+    match moved_read_cache.recover_cache().await {
+        Ok((entries, bytes)) => info!(
+            "Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
+            entries,
+            bytes,
+            now.elapsed()
+        ),
+        Err(err) => error!(err; "Failed to recover file cache."),
+    }
+});
+if sync {
+    let _ = handle.await;
+}
}

/// Returns true when the local cache contains the specific file
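Because recover_cache no longer returns a Result, a failed recovery only shows up in the logs, and the sync path discards the Result<_, JoinError> produced by awaiting the JoinHandle. A small sketch of what that discarded value is, using plain tokio rather than the opendal layer (the numbers are arbitrary):

use tokio::task::JoinError;

#[tokio::main]
async fn main() -> Result<(), JoinError> {
    let handle = tokio::spawn(async {
        // background work standing in for cache recovery
        42u32
    });

    // Awaiting a JoinHandle yields Result<T, JoinError>. `let _ = handle.await;`
    // (as in the diff) drops both the value and a potential task panic;
    // propagating the JoinError instead would look like this:
    let value = handle.await?;
    assert_eq!(value, 42);
    Ok(())
}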
6 changes: 3 additions & 3 deletions src/object-store/tests/object_store_test.rs
@@ -234,7 +234,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
let file_cache = Arc::new(builder.build().unwrap());

let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
-cache_layer.recover_cache().await.unwrap();
+cache_layer.recover_cache(true).await;
cache_layer
};

@@ -311,7 +311,7 @@ async fn test_object_store_cache_policy() -> Result<()> {

// create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
-cache_layer.recover_cache().await.unwrap();
+cache_layer.recover_cache(true).await;
let store = store.layer(cache_layer.clone());

// create several object handler.
@@ -440,7 +440,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
drop(cache_layer);
// Test recover
let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
-cache_layer.recover_cache().await.unwrap();
+cache_layer.recover_cache(true).await;

// The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
