Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tanruixiang committed Sep 7, 2023
1 parent 82e454c commit 758f4e6
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ impl DiskCacheStore {

let mut guard = ExecutionGuard::new(|| {
for cache_key in &need_fetch_block_cache_key {
let _ = self.request_notifiers.take_notifiers(&cache_key);
let _ = self.request_notifiers.take_notifiers(cache_key);
}
});

Expand All @@ -645,11 +645,18 @@ impl DiskCacheStore {
.await;

guard.cancel();
drop(guard);

// need take all correspond cache_key's notifiers from request_notifiers to
// prevent future cancelled
let notifiers_vec: Vec<_> = need_fetch_block_cache_key
.iter()
.map(|cache_key| self.request_notifiers.take_notifiers(cache_key).unwrap())
.collect();

let fetched_bytes = match fetched_bytes {
Err(err) => {
for cache_key in &need_fetch_block_cache_key {
let notifiers = self.request_notifiers.take_notifiers(cache_key).unwrap();
for notifiers in notifiers_vec {
for notifier in notifiers {
if let Err(e) = notifier.send(Err(Error::WaitNotifier {
message: err.to_string(),
Expand All @@ -664,14 +671,12 @@ impl DiskCacheStore {
Ok(v) => v,
};

for (bytes, cache_key) in fetched_bytes
for ((bytes, notifiers), cache_key) in fetched_bytes
.into_iter()
.zip(need_fetch_block_cache_key.iter())
.zip(notifiers_vec.into_iter())
.zip(need_fetch_block_cache_key.into_iter())
{
let notifiers = self.request_notifiers.take_notifiers(cache_key).unwrap();
self.cache
.insert_data(cache_key.clone(), bytes.clone())
.await;
self.cache.insert_data(cache_key, bytes.clone()).await;
for notifier in notifiers {
if let Err(e) = notifier.send(Ok(bytes.clone())) {
error!("Failed to send notifier success result, err:{:?}.", e);
Expand Down

0 comments on commit 758f4e6

Please sign in to comment.