Skip to content

Commit

Permalink
refactor guard
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Sep 11, 2023
1 parent 758f4e6 commit 76b533e
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,22 +633,30 @@ impl DiskCacheStore {
rxs.push(rx);
}

let mut guard = ExecutionGuard::new(|| {
for cache_key in &need_fetch_block_cache_key {
let _ = self.request_notifiers.take_notifiers(cache_key);
}
});
if rxs.is_empty() {
// All ranges are not first, return directly.
return Ok(rxs);
}

let fetched_bytes = self
.underlying_store
.get_ranges(location, &need_fetch_block[..])
.await;
let fetched_bytes = {
// This guard will ensure notifiers being taken out when futures get cancelled
// during `get_ranges`.
let mut guard = ExecutionGuard::new(|| {
for cache_key in &need_fetch_block_cache_key {
let _ = self.request_notifiers.take_notifiers(cache_key);
}
});

let bytes = self
.underlying_store
.get_ranges(location, &need_fetch_block[..])
.await;

guard.cancel();
drop(guard);
guard.cancel();
bytes
};

// need take all correspond cache_key's notifiers from request_notifiers to
// prevent future cancelled
// Take all cache_key's notifiers out from request_notifiers immediately.
let notifiers_vec: Vec<_> = need_fetch_block_cache_key
.iter()
.map(|cache_key| self.request_notifiers.take_notifiers(cache_key).unwrap())
Expand Down

0 comments on commit 76b533e

Please sign in to comment.