Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid error when disk cache miss #790

Merged
merged 2 commits into from
Mar 30, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 78 additions & 6 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! An ObjectStore implementation with disk as cache.
//! The disk cache is a read-through caching, with page as its minimal cache
Expand Down Expand Up @@ -169,14 +169,23 @@ impl DiskCache {
self.update_cache(filename, None).await
}

async fn get(&self, key: &str) -> Result<Option<Bytes>> {
async fn get(&self, key: &str) -> Option<Bytes> {
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved
let mut cache = self.cache.lock().await;
if cache.get(key).is_some() {
// TODO: release lock when doing IO
return self.read_bytes(key).await.map(Some);
match self.read_bytes(key).await {
Ok(v) => Some(v),
Err(e) => {
error!(
"Read disk cache failed but ignored, key:{}, err:{}.",
key, e
);
None
}
}
} else {
None
}

Ok(None)
}

async fn persist_bytes(&self, filename: &str, value: Bytes) -> Result<()> {
Expand Down Expand Up @@ -437,7 +446,7 @@ impl ObjectStore for DiskCacheStore {
let mut missing_ranges = Vec::new();
for range in aligned_ranges {
let cache_key = Self::cache_key(location, &range);
if let Some(bytes) = self.cache.get(&cache_key).await? {
if let Some(bytes) = self.cache.get(&cache_key).await {
ranged_bytes.insert(range.start, bytes);
} else {
missing_ranges.push(range);
Expand All @@ -447,6 +456,7 @@ impl ObjectStore for DiskCacheStore {
for range in missing_ranges {
let range_start = range.start;
let cache_key = Self::cache_key(location, &range);
// TODO: we should use get_ranges here.
let bytes = self.underlying_store.get_range(location, range).await?;
self.cache.insert(cache_key, bytes.clone()).await?;
ranged_bytes.insert(range_start, bytes);
Expand Down Expand Up @@ -782,4 +792,66 @@ mod test {
assert_eq!(actual, expect);
}
}

/// Checks that a corrupted on-disk cache file does not break reads.
///
/// The test writes an object through the store, reads it back, overwrites the
/// beginning of every cache file belonging to that object, and then verifies
/// that:
/// 1. a subsequent `get_range` still returns the original bytes, and
/// 2. the cache files no longer hold the corrupted content afterwards.
#[tokio::test]
async fn corrupted_disk_cache() {
    let StoreWithCacheDir {
        inner: store,
        cache_dir,
    } = prepare_store(16, 1024).await;
    let test_file_name = "corrupted_disk_cache_file";
    let test_file_path = Path::from(test_file_name);
    let test_file_bytes = Bytes::from("corrupted_disk_cache_file_data");

    // Put data into the store and read it back so the cache loads the data.
    store
        .put(&test_file_path, test_file_bytes.clone())
        .await
        .unwrap();

    // The read is expected to leave the data in the disk cache.
    let got_bytes = store
        .get_range(&test_file_path, 0..test_file_bytes.len())
        .await
        .unwrap();
    assert_eq!(got_bytes, test_file_bytes);

    // Corrupt the files in the cache dir and remember what each corrupted file
    // looks like, so the refresh can be verified against real content later.
    let mut corrupted_files = Vec::new();
    let mut cache_read_dir = tokio::fs::read_dir(cache_dir.as_ref()).await.unwrap();
    while let Some(entry) = cache_read_dir.next_entry().await.unwrap() {
        let path_buf = entry.path();
        let path = path_buf.to_str().unwrap();
        if path.contains(test_file_name) {
            let mut file = tokio::fs::OpenOptions::new()
                .write(true)
                .open(path)
                .await
                .unwrap();
            file.write_all(b"corrupted").await.unwrap();
            // `write_all` on a tokio file may return before the data reaches
            // the file; flush so the corruption is visible to the next read.
            file.flush().await.unwrap();
            drop(file);

            // The file is opened without truncation, so only a prefix is
            // overwritten; snapshot the whole corrupted content.
            let corrupted_content = tokio::fs::read(&path_buf).await.unwrap();
            corrupted_files.push((path_buf, corrupted_content));
        }
    }
    // Guard against the test silently passing without corrupting anything.
    assert!(
        !corrupted_files.is_empty(),
        "no cache file found for {test_file_name}"
    );

    // Reading must still succeed and return the original data even though the
    // cached copies are corrupted.
    let got_bytes = store
        .get_range(&test_file_path, 0..test_file_bytes.len())
        .await
        .unwrap();
    assert_eq!(got_bytes, test_file_bytes);

    // The cache should be updated: no file may still hold its corrupted content.
    for (path_buf, corrupted_content) in corrupted_files {
        let mut file = tokio::fs::OpenOptions::new()
            .read(true)
            .open(&path_buf)
            .await
            .unwrap();
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await.unwrap();
        assert_ne!(buffer, corrupted_content);
    }
}
}