From 4a4d9529b67bab7e01c5ef982c4b3579433b3ae3 Mon Sep 17 00:00:00 2001 From: Steven Czabaniuk Date: Wed, 7 Dec 2022 17:14:45 -0600 Subject: [PATCH] Reduce the amount of IO that LedgerCleanupService performs Currently, the cleanup service counts the number of shreds in the database by iterating the entire SlotMeta column and reading the number of received shreds for each slot. This gives us a fairly accurate count at the expense of performing a good amount of IO. Instead of counting the individual slots, use the live_files() rust-rocksdb entrypoint that we expose in Blockstore. This API allows us to get the number of entries (shreds) in the data shred column family by reading file metadata. This is much more efficient from IO perspective. --- core/src/ledger_cleanup_service.rs | 106 +++++++++++++++++++---------- 1 file changed, 69 insertions(+), 37 deletions(-) diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 6b535d7c30c29b..4570892bf28d49 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -100,43 +100,64 @@ impl LedgerCleanupService { root: Slot, max_ledger_shreds: u64, ) -> (bool, Slot, u64) { - let mut total_slots = Vec::new(); - let mut iterate_time = Measure::start("iterate_time"); - let mut total_shreds = 0; - for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() { - if i == 0 { - debug!("purge: searching from slot: {}", slot); - } - // Unrooted slots are not eligible for cleaning - if slot > root { - break; - } - // This method not exact since non-full slots will have holes - total_shreds += meta.received; - total_slots.push((slot, meta.received)); - } - iterate_time.stop(); + // TODO: de-dupe this constant + let data_shred_cf_name = "data_shred".to_string(); + + let mut live_file_time = Measure::start("live_file_time"); + let live_files = blockstore + .live_files_metadata() + .expect("Blockstore::live_files_metadata()"); + let num_shreds = live_files + .iter() + .filter(|live_file| live_file.column_family_name == data_shred_cf_name) + .map(|file_meta| file_meta.num_entries) + .sum(); + live_file_time.stop(); + + // Using the difference between the lowest and highest slot will result + // in overestimating the number of slots in the blockstore since there + // are likely to be some missing slots, such as when a leader is + // delinquent for their leader slots. + // + // With the below calculations, we will then end up underestimating the + // mean number of shreds per slot present in the blockstore which will + // result in cleaning more slots than necessary to get us + // below max_ledger_shreds. + // + // Given that the service runs on an interval, this is good because it + // means that we are building some headroom so the peak number of alive + // shreds doesn't get too large before the service's next run. + let lowest_slot = blockstore.lowest_slot(); + let highest_slot = blockstore + .highest_slot() + .expect("Blockstore::highest_slot()") + .unwrap_or(lowest_slot); + // The + 1 ensures we count the correct number of slots and that the + // divisor in the subsequent division is strictly >= 1. + let num_slots = highest_slot - lowest_slot + 1; + let mean_shreds_per_slot = num_shreds / num_slots; info!( - "total_slots={} total_shreds={} max_ledger_shreds={}, {}", - total_slots.len(), - total_shreds, - max_ledger_shreds, - iterate_time + "{} alive shreds in slots [{}, {}], mean of {} shreds per slot", + num_shreds, lowest_slot, highest_slot, mean_shreds_per_slot ); - if total_shreds < max_ledger_shreds { - return (false, 0, total_shreds); - } - let mut num_shreds_to_clean = 0; - let mut lowest_cleanup_slot = total_slots[0].0; - for (slot, num_shreds) in total_slots.iter().rev() { - num_shreds_to_clean += *num_shreds; - if num_shreds_to_clean > max_ledger_shreds { - lowest_cleanup_slot = *slot; - break; - } + + if num_shreds <= max_ledger_shreds { + return (false, 0, num_shreds); } - (true, lowest_cleanup_slot, total_shreds) + // Add an extra (mean_shreds_per_slot - 1) in the numerator + // so that our integer division rounds up + let num_slots_to_clean = (num_shreds - max_ledger_shreds + mean_shreds_per_slot - 1) + .checked_div(mean_shreds_per_slot); + + if let Some(num_slots_to_clean) = num_slots_to_clean { + // Ensure we don't cleanup anything past the last root we saw + let lowest_cleanup_slot = std::cmp::min(lowest_slot + num_slots_to_clean - 1, root); + (true, lowest_cleanup_slot, num_shreds) + } else { + warn!("calculated mean of 0 shreds per slot, skipping cleanup"); + (false, 0, num_shreds) + } } fn receive_new_roots(new_root_receiver: &Receiver) -> Result { @@ -276,7 +297,7 @@ mod tests { // Blockstore, so we can make repeated calls on the same slots solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); // Construct and build some shreds for slots [1, 10] let num_slots: u64 = 10; @@ -286,6 +307,12 @@ mod tests { assert!(shreds_per_slot > 1); blockstore.insert_shreds(shreds, None, false).unwrap(); + // Explicitly drop and re-open Blockstore to force a flush as the method + // used in LedgerCleanupService::find_slots_to_clean(() doesn't find + // shreds in memtables, only in SST's + drop(blockstore); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + // Ensure no cleaning of slots > last_root let last_root = 0; let max_ledger_shreds = 0; @@ -294,11 +321,11 @@ mod tests { // Slot 0 will exist in blockstore with zero shreds since it is slot // 1's parent. Thus, slot 0 will be identified for clean. assert!(should_clean && lowest_purged == 0); - // Now, set max_ledger_shreds to 1 so that slot 0 is left alone + // Now, set max_ledger_shreds to 1, slot 0 still eligible for clean let max_ledger_shreds = 1; let (should_clean, lowest_purged, _) = LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds); - assert!(!should_clean && lowest_purged == 0); + assert!(should_clean && lowest_purged == 0); // Ensure no cleaning if blockstore contains fewer than max_ledger_shreds let last_root = num_slots; @@ -340,7 +367,12 @@ mod tests { let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let (shreds, _) = make_many_slot_entries(0, 50, 5); blockstore.insert_shreds(shreds, None, false).unwrap(); - let blockstore = Arc::new(blockstore); + drop(blockstore); + + // Explicitly drop and re-open Blockstore to force a flush as the method + // used in LedgerCleanupService::find_slots_to_clean(() doesn't find + // shreds in memtables, only in SST's + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); let (sender, receiver) = unbounded(); //send a signal to kill all but 5 shreds, which will be in the newest slots