diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 6b535d7c30c29b..e5f5f204c6d946 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -8,7 +8,7 @@ use { crossbeam_channel::{Receiver, RecvTimeoutError}, solana_ledger::{ blockstore::{Blockstore, PurgeType}, - blockstore_db::Result as BlockstoreResult, + blockstore_db::{Result as BlockstoreResult, DATA_SHRED_CF}, }, solana_measure::measure::Measure, solana_sdk::clock::Slot, @@ -100,43 +100,61 @@ impl LedgerCleanupService { root: Slot, max_ledger_shreds: u64, ) -> (bool, Slot, u64) { - let mut total_slots = Vec::new(); - let mut iterate_time = Measure::start("iterate_time"); - let mut total_shreds = 0; - for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() { - if i == 0 { - debug!("purge: searching from slot: {}", slot); - } - // Unrooted slots are not eligible for cleaning - if slot > root { - break; - } - // This method not exact since non-full slots will have holes - total_shreds += meta.received; - total_slots.push((slot, meta.received)); - } - iterate_time.stop(); + let data_shred_cf_name = DATA_SHRED_CF.to_string(); + + let live_files = blockstore + .live_files_metadata() + .expect("Blockstore::live_files_metadata()"); + let num_shreds = live_files + .iter() + .filter(|live_file| live_file.column_family_name == data_shred_cf_name) + .map(|file_meta| file_meta.num_entries) + .sum(); + + // Using the difference between the lowest and highest slot seen will + // result in overestimating the number of slots in the blockstore since + // there are likely to be some missing slots, such as when a leader is + // delinquent for their leader slots. + // + // With the below calculations, we will then end up underestimating the + // mean number of shreds per slot present in the blockstore which will + // result in cleaning more slots than necessary to get us + // below max_ledger_shreds. 
+ // + // Given that the service runs on an interval, this is good because it + // means that we are building some headroom so the peak number of alive + // shreds doesn't get too large before the service's next run. + let lowest_slot = blockstore.lowest_slot(); + let highest_slot = blockstore + .highest_slot() + .expect("Blockstore::highest_slot()") + .unwrap_or(lowest_slot); + // The + 1 ensures we count the correct number of slots and that the + // divisor in the subsequent division is strictly >= 1. + let num_slots = highest_slot - lowest_slot + 1; + let mean_shreds_per_slot = num_shreds / num_slots; info!( - "total_slots={} total_shreds={} max_ledger_shreds={}, {}", - total_slots.len(), - total_shreds, - max_ledger_shreds, - iterate_time + "{} alive shreds in slots [{}, {}], mean of {} shreds per slot", + num_shreds, lowest_slot, highest_slot, mean_shreds_per_slot ); - if total_shreds < max_ledger_shreds { - return (false, 0, total_shreds); - } - let mut num_shreds_to_clean = 0; - let mut lowest_cleanup_slot = total_slots[0].0; - for (slot, num_shreds) in total_slots.iter().rev() { - num_shreds_to_clean += *num_shreds; - if num_shreds_to_clean > max_ledger_shreds { - lowest_cleanup_slot = *slot; - break; - } + + if num_shreds <= max_ledger_shreds { + return (false, 0, num_shreds); } - (true, lowest_cleanup_slot, total_shreds) + // Add an extra (mean_shreds_per_slot - 1) in the numerator + // so that our integer division rounds up + let num_slots_to_clean = (num_shreds - max_ledger_shreds + mean_shreds_per_slot - 1) + .checked_div(mean_shreds_per_slot); + + if let Some(num_slots_to_clean) = num_slots_to_clean { + // Ensure we don't cleanup anything past the last root we saw + let lowest_cleanup_slot = std::cmp::min(lowest_slot + num_slots_to_clean - 1, root); + (true, lowest_cleanup_slot, num_shreds) + } else { + warn!("calculated mean of 0 shreds per slot, skipping cleanup"); + (false, 0, num_shreds) + } } fn receive_new_roots(new_root_receiver: &Receiver) -> 
Result { @@ -270,13 +288,26 @@ mod tests { solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete}, }; + + fn flush_blockstore_contents_to_disk(blockstore: Blockstore) -> Blockstore { + // The find_slots_to_clean() routine uses a method that queries data + // from RocksDB SST files. On a running validator, these are created + // fairly regularly as new data is coming in and contents of memory are + // pushed to disk. In a unit test environment, we aren't pushing nearly + // enough data for this to happen organically. So, instead open and + // close the Blockstore which will perform the flush to SSTs. + let ledger_path = blockstore.ledger_path().clone(); + drop(blockstore); + Blockstore::open(&ledger_path).unwrap() + } + #[test] fn test_find_slots_to_clean() { // LedgerCleanupService::find_slots_to_clean() does not modify the // Blockstore, so we can make repeated calls on the same slots solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); // Construct and build some shreds for slots [1, 10] let num_slots: u64 = 10; @@ -286,6 +317,9 @@ mod tests { assert!(shreds_per_slot > 1); blockstore.insert_shreds(shreds, None, false).unwrap(); + // Initiate a flush so inserted shreds are found by find_slots_to_clean() + let blockstore = Arc::new(flush_blockstore_contents_to_disk(blockstore)); + // Ensure no cleaning of slots > last_root let last_root = 0; let max_ledger_shreds = 0; @@ -294,11 +328,11 @@ mod tests { // Slot 0 will exist in blockstore with zero shreds since it is slot // 1's parent. Thus, slot 0 will be identified for clean. 
assert!(should_clean && lowest_purged == 0); - // Now, set max_ledger_shreds to 1 so that slot 0 is left alone + // Now, set max_ledger_shreds to 1, slot 0 is still eligible for clean let max_ledger_shreds = 1; let (should_clean, lowest_purged, _) = LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds); - assert!(!should_clean && lowest_purged == 0); + assert!(should_clean && lowest_purged == 0); // Ensure no cleaning if blockstore contains fewer than max_ledger_shreds let last_root = num_slots; @@ -340,7 +374,9 @@ mod tests { let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let (shreds, _) = make_many_slot_entries(0, 50, 5); blockstore.insert_shreds(shreds, None, false).unwrap(); - let blockstore = Arc::new(blockstore); + + // Initiate a flush so inserted shreds are found by find_slots_to_clean() + let blockstore = Arc::new(flush_blockstore_contents_to_disk(blockstore)); let (sender, receiver) = unbounded(); //send a signal to kill all but 5 shreds, which will be in the newest slots diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 7801babc93a774..e4d8c3ab6fc954 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -69,7 +69,7 @@ const ROOT_CF: &str = "root"; /// Column family for indexes const INDEX_CF: &str = "index"; /// Column family for Data Shreds -const DATA_SHRED_CF: &str = "data_shred"; +pub const DATA_SHRED_CF: &str = "data_shred"; /// Column family for Code Shreds const CODE_SHRED_CF: &str = "code_shred"; /// Column family for Transaction Status