Skip to content

Commit

Permalink
in hash calc, delete old cache files that will not be used earlier (solana-labs#33432)
Browse files Browse the repository at this point in the history

* in hash calc, delete old cache files that will not be used earlier

* only delete if supposed to

* fmt
  • Loading branch information
jeffwashington authored Oct 9, 2023
1 parent c924719 commit 0526775
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 86 deletions.
179 changes: 105 additions & 74 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ impl<'a> StoreTo<'a> {
}
}

/// Outcome of checking one chunk of storages against the on-disk
/// cache of hash data during a hash-calculation scan.
enum ScanAccountStorageResult {
    /// this data has already been scanned and cached; holds a reference to the
    /// existing cache file so it can be mapped and read later instead of rescanning
    CacheFileAlreadyExists(CacheHashDataFileReference),
    /// this data needs to be scanned and cached; holds the cache file name to
    /// create and the slot range this chunk covers
    CacheFileNeedsToBeCreated((String, Range<Slot>)),
}

#[derive(Default, Debug)]
/// hold alive accounts
/// alive means in the accounts index
Expand Down Expand Up @@ -7222,90 +7229,114 @@ impl AccountsDb {
.saturating_sub(slots_per_epoch);

stats.scan_chunks = splitter.chunk_count;
(0..splitter.chunk_count)
.into_par_iter()
.map(|chunk| {
let mut scanner = scanner.clone();

let cache_files = (0..splitter.chunk_count)
.into_par_iter()
.filter_map(|chunk| {
let range_this_chunk = splitter.get_slot_range(chunk)?;

let file_name = {
let mut load_from_cache = true;
let mut hasher = hash_map::DefaultHasher::new();
bin_range.start.hash(&mut hasher);
bin_range.end.hash(&mut hasher);
let is_first_scan_pass = bin_range.start == 0;

// calculate hash representing all storages in this chunk
for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
if is_first_scan_pass && slot < one_epoch_old {
self.update_old_slot_stats(stats, storage);
}
if !Self::hash_storage_info(&mut hasher, storage, slot) {
load_from_cache = false;
break;
}
let mut load_from_cache = true;
let mut hasher = hash_map::DefaultHasher::new();
bin_range.start.hash(&mut hasher);
bin_range.end.hash(&mut hasher);
let is_first_scan_pass = bin_range.start == 0;

// calculate hash representing all storages in this chunk
let mut empty = true;
for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
empty = false;
if is_first_scan_pass && slot < one_epoch_old {
self.update_old_slot_stats(stats, storage);
}
// we have a hash value for the storages in this chunk
// so, build a file name:
let hash = hasher.finish();
let file_name = format!(
"{}.{}.{}.{}.{:016x}",
range_this_chunk.start,
range_this_chunk.end,
bin_range.start,
bin_range.end,
hash
);
if load_from_cache {
if let Ok(mapped_file) =
cache_hash_data.get_file_reference_to_map_later(&file_name)
{
return Some(mapped_file);
}
if !Self::hash_storage_info(&mut hasher, storage, slot) {
load_from_cache = false;
break;
}
}
if empty {
return None;
}
// we have a hash value for the storages in this chunk
// so, build a file name:
let hash = hasher.finish();
let file_name = format!(
"{}.{}.{}.{}.{:016x}",
range_this_chunk.start,
range_this_chunk.end,
bin_range.start,
bin_range.end,
hash
);
if load_from_cache {
if let Ok(mapped_file) =
cache_hash_data.get_file_reference_to_map_later(&file_name)
{
return Some(ScanAccountStorageResult::CacheFileAlreadyExists(
mapped_file,
));
}
}

// fall through and load normally - we failed to load from a cache file
file_name
};
// fall through and load normally - we failed to load from a cache file but there are storages present
Some(ScanAccountStorageResult::CacheFileNeedsToBeCreated((
file_name,
range_this_chunk,
)))
})
.collect::<Vec<_>>();

let mut init_accum = true;
// load from cache failed, so create the cache file for this chunk
for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
let ancient = slot < oldest_non_ancient_slot;
let (_, scan_us) = measure_us!(if let Some(storage) = storage {
if init_accum {
let range = bin_range.end - bin_range.start;
scanner.init_accum(range);
init_accum = false;
}
scanner.set_slot(slot);
// deletes the old files that will not be used before creating new ones
cache_hash_data.delete_old_cache_files();

Self::scan_single_account_storage(storage, &mut scanner);
});
if ancient {
stats
.sum_ancient_scans_us
.fetch_add(scan_us, Ordering::Relaxed);
stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
stats
.longest_ancient_scan_us
.fetch_max(scan_us, Ordering::Relaxed);
cache_files
.into_par_iter()
.map(|chunk| {
match chunk {
ScanAccountStorageResult::CacheFileAlreadyExists(file) => Some(file),
ScanAccountStorageResult::CacheFileNeedsToBeCreated((
file_name,
range_this_chunk,
)) => {
let mut scanner = scanner.clone();
let mut init_accum = true;
// load from cache failed, so create the cache file for this chunk
for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
let ancient = slot < oldest_non_ancient_slot;
let (_, scan_us) = measure_us!(if let Some(storage) = storage {
if init_accum {
let range = bin_range.end - bin_range.start;
scanner.init_accum(range);
init_accum = false;
}
scanner.set_slot(slot);

Self::scan_single_account_storage(storage, &mut scanner);
});
if ancient {
stats
.sum_ancient_scans_us
.fetch_add(scan_us, Ordering::Relaxed);
stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
stats
.longest_ancient_scan_us
.fetch_max(scan_us, Ordering::Relaxed);
}
}
(!init_accum)
.then(|| {
let r = scanner.scanning_complete();
assert!(!file_name.is_empty());
(!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
// error if we can't write this
cache_hash_data.save(&file_name, &r).unwrap();
cache_hash_data
.get_file_reference_to_map_later(&file_name)
.unwrap()
})
})
.flatten()
}
}
(!init_accum)
.then(|| {
let r = scanner.scanning_complete();
assert!(!file_name.is_empty());
(!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
// error if we can't write this
cache_hash_data.save(&file_name, &r).unwrap();
cache_hash_data
.get_file_reference_to_map_later(&file_name)
.unwrap()
})
})
.flatten()
})
.filter_map(|x| x)
.collect()
Expand Down
28 changes: 16 additions & 12 deletions accounts-db/src/cache_hash_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ pub(crate) struct CacheHashData {

impl Drop for CacheHashData {
fn drop(&mut self) {
if self.should_delete_old_cache_files_on_drop {
self.delete_old_cache_files();
}
self.delete_old_cache_files();
self.stats.report();
}
}
Expand All @@ -224,18 +222,24 @@ impl CacheHashData {
result.get_cache_files();
result
}
fn delete_old_cache_files(&self) {
let old_cache_files = std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
if !old_cache_files.is_empty() {
self.stats
.unused_cache_files
.fetch_add(old_cache_files.len(), Ordering::Relaxed);
for file_name in old_cache_files.iter() {
let result = self.cache_dir.join(file_name);
let _ = fs::remove_file(result);

/// Delete every pre-existing cache file that was not claimed during this run,
/// but only when deletion was requested at construction
/// (`should_delete_old_cache_files_on_drop`). Safe to call repeatedly: the
/// set of pre-existing files is drained on the first effective call.
pub(crate) fn delete_old_cache_files(&self) {
    if !self.should_delete_old_cache_files_on_drop {
        return;
    }
    // Take ownership of the remaining unclaimed file names, leaving an
    // empty set behind so a later call becomes a no-op.
    let unused = std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
    if unused.is_empty() {
        return;
    }
    self.stats
        .unused_cache_files
        .fetch_add(unused.len(), Ordering::Relaxed);
    for file_name in unused.iter() {
        // Best effort: a file that cannot be removed is simply left behind.
        let _ = fs::remove_file(self.cache_dir.join(file_name));
    }
}

fn get_cache_files(&self) {
if self.cache_dir.is_dir() {
let dir = fs::read_dir(&self.cache_dir);
Expand Down

0 comments on commit 0526775

Please sign in to comment.