diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 6b535d7c30c29b..4570892bf28d49 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -100,43 +100,64 @@ impl LedgerCleanupService { root: Slot, max_ledger_shreds: u64, ) -> (bool, Slot, u64) { - let mut total_slots = Vec::new(); - let mut iterate_time = Measure::start("iterate_time"); - let mut total_shreds = 0; - for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() { - if i == 0 { - debug!("purge: searching from slot: {}", slot); - } - // Unrooted slots are not eligible for cleaning - if slot > root { - break; - } - // This method not exact since non-full slots will have holes - total_shreds += meta.received; - total_slots.push((slot, meta.received)); - } - iterate_time.stop(); + // TODO: de-dupe this constant + let data_shred_cf_name = "data_shred".to_string(); + + let mut live_file_time = Measure::start("live_file_time"); + let live_files = blockstore + .live_files_metadata() + .expect("Blockstore::live_files_metadata()"); + let num_shreds = live_files + .iter() + .filter(|live_file| live_file.column_family_name == data_shred_cf_name) + .map(|file_meta| file_meta.num_entries) + .sum(); + live_file_time.stop(); + + // Using the difference between the lowest and highest slot will result + // in overestimating the number of slots in the blockstore since there + // are likely to be some missing slots, such as when a leader is + // delinquent for their leader slots. + // + // With the below calculations, we will then end up underestimating the + // mean number of shreds per slot present in the blockstore which will + // result in cleaning more slots than necessary to get us + // below max_ledger_shreds. 
+ // + // Given that the service runs on an interval, this is good because it + // means that we are building some headroom so the peak number of alive + // shreds doesn't get too large before the service's next run. + let lowest_slot = blockstore.lowest_slot(); + let highest_slot = blockstore + .highest_slot() + .expect("Blockstore::highest_slot()") + .unwrap_or(lowest_slot); + // The + 1 ensures we count the correct number of slots and that the + // divisor in the subsequent division is strictly >= 1. + let num_slots = highest_slot - lowest_slot + 1; + let mean_shreds_per_slot = num_shreds / num_slots; info!( - "total_slots={} total_shreds={} max_ledger_shreds={}, {}", - total_slots.len(), - total_shreds, - max_ledger_shreds, - iterate_time + "{} alive shreds in slots [{}, {}], mean of {} shreds per slot", + num_shreds, lowest_slot, highest_slot, mean_shreds_per_slot ); - if total_shreds < max_ledger_shreds { - return (false, 0, total_shreds); - } - let mut num_shreds_to_clean = 0; - let mut lowest_cleanup_slot = total_slots[0].0; - for (slot, num_shreds) in total_slots.iter().rev() { - num_shreds_to_clean += *num_shreds; - if num_shreds_to_clean > max_ledger_shreds { - lowest_cleanup_slot = *slot; - break; - } + + if num_shreds <= max_ledger_shreds { + return (false, 0, num_shreds); } - (true, lowest_cleanup_slot, total_shreds) + // Add an extra (mean_shreds_per_slot - 1) in the numerator + // so that our integer division rounds up + let num_slots_to_clean = (num_shreds - max_ledger_shreds + mean_shreds_per_slot - 1) + .checked_div(mean_shreds_per_slot); + + if let Some(num_slots_to_clean) = num_slots_to_clean { + // Ensure we don't cleanup anything past the last root we saw + let lowest_cleanup_slot = std::cmp::min(lowest_slot + num_slots_to_clean - 1, root); + (true, lowest_cleanup_slot, num_shreds) + } else { + warn!("calculated mean of 0 shreds per slot, skipping cleanup"); + (false, 0, num_shreds) + } } fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> 
Result<Slot, RecvTimeoutError> { @@ -276,7 +297,7 @@ mod tests { // Blockstore, so we can make repeated calls on the same slots solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); // Construct and build some shreds for slots [1, 10] let num_slots: u64 = 10; @@ -286,6 +307,12 @@ assert!(shreds_per_slot > 1); blockstore.insert_shreds(shreds, None, false).unwrap(); + // Explicitly drop and re-open Blockstore to force a flush as the method + // used in LedgerCleanupService::find_slots_to_clean() doesn't find + // shreds in memtables, only in SSTs + drop(blockstore); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + // Ensure no cleaning of slots > last_root let last_root = 0; let max_ledger_shreds = 0; @@ -294,11 +321,11 @@ // Slot 0 will exist in blockstore with zero shreds since it is slot // 1's parent. Thus, slot 0 will be identified for clean. 
assert!(should_clean && lowest_purged == 0); - // Now, set max_ledger_shreds to 1 so that slot 0 is left alone + // Now, set max_ledger_shreds to 1, slot 0 still eligible for clean let max_ledger_shreds = 1; let (should_clean, lowest_purged, _) = LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds); - assert!(!should_clean && lowest_purged == 0); + assert!(should_clean && lowest_purged == 0); // Ensure no cleaning if blockstore contains fewer than max_ledger_shreds let last_root = num_slots; @@ -340,7 +367,12 @@ let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let (shreds, _) = make_many_slot_entries(0, 50, 5); blockstore.insert_shreds(shreds, None, false).unwrap(); - let blockstore = Arc::new(blockstore); + drop(blockstore); + + // Explicitly drop and re-open Blockstore to force a flush as the method + // used in LedgerCleanupService::find_slots_to_clean() doesn't find + // shreds in memtables, only in SSTs + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); let (sender, receiver) = unbounded(); //send a signal to kill all but 5 shreds, which will be in the newest slots