Blockstore: clean/save old TransactionMemos sensibly (solana-labs#33678)
* Convert OldestSlot to named struct

* Add clean_slot_0 to OldestSlot

* Set AtomicBool to true when all primary-index keys returning slot 0 should be purged

* Add PurgedFilter::clean_slot_0

* Use clean_slot_0 to preserve deprecated TransactionMemos

* Also set AtomicBool to true immediately on boot, if highest_primary_index_slot.is_none

* Add test

* Fixup test
CriesofCarrots authored Oct 13, 2023
1 parent 53925b6 commit 01a3b1b
Showing 3 changed files with 137 additions and 5 deletions.
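
The rule the new clean_slot_0 flag adds to the compaction filter can be summed up in a small standalone sketch. The should_keep helper below is hypothetical and written only for illustration; the real logic lives in PurgedSlotFilter::filter in ledger/src/blockstore_db.rs further down. Deprecated TransactionMemos entries carry no slot in their key and decode to slot 0, so they are preserved until the blockstore confirms no deprecated primary-index data remains.

// Minimal sketch (hypothetical helper, not part of the Blockstore API) of the
// purge decision this commit introduces.
fn should_keep(slot_in_key: u64, oldest_slot: u64, clean_slot_0: bool) -> bool {
    // Keep anything at or above the purge boundary, and also keep slot-0 keys
    // (what deprecated, Slot-less TransactionMemos keys decode to) unless the
    // clean_slot_0 flag says they may now be purged.
    slot_in_key >= oldest_slot || (slot_in_key == 0 && !clean_slot_0)
}

fn main() {
    let oldest_slot = 5;
    // Deprecated memos decode to slot 0 and survive while clean_slot_0 is false...
    assert!(should_keep(0, oldest_slot, false));
    // ...and are purged once no deprecated primary-index data is left.
    assert!(!should_keep(0, oldest_slot, true));
    // Current memos are keyed by their real slot and purge purely by oldest_slot.
    assert!(!should_keep(4, oldest_slot, true));
    assert!(should_keep(5, oldest_slot, false));
}

The test added in ledger/src/blockstore/blockstore_purge.rs exercises exactly these cases: purge at slot 0, purge at oldest_slot with clean_slot_0 unset, and purge with clean_slot_0 set.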
5 changes: 5 additions & 0 deletions ledger/src/blockstore.rs
@@ -2154,6 +2154,8 @@ impl Blockstore {
}
if highest_primary_index_slot.is_some() {
self.set_highest_primary_index_slot(highest_primary_index_slot);
} else {
self.db.set_clean_slot_0(true);
}
Ok(())
}
@@ -2167,6 +2169,9 @@ impl Blockstore {
self.transaction_status_index_cf.delete(1)?;
}
}
if w_highest_primary_index_slot.is_none() {
self.db.set_clean_slot_0(true);
}
Ok(())
}

105 changes: 105 additions & 0 deletions ledger/src/blockstore/blockstore_purge.rs
@@ -990,4 +990,109 @@ pub mod tests {
}
assert_eq!(count, max_slot - (oldest_slot - 1));
}

#[test]
fn test_purge_transaction_memos_compaction_filter() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let oldest_slot = 5;

fn random_signature() -> Signature {
use rand::Rng;

let mut key = [0u8; 64];
rand::thread_rng().fill(&mut key[..]);
Signature::from(key)
}

// Insert some deprecated TransactionMemos
blockstore
.transaction_memos_cf
.put_deprecated(random_signature(), &"this is a memo".to_string())
.unwrap();
blockstore
.transaction_memos_cf
.put_deprecated(random_signature(), &"another memo".to_string())
.unwrap();
// Set clean_slot_0 to false, since we have deprecated memos
blockstore.db.set_clean_slot_0(false);

// Insert some current TransactionMemos
blockstore
.transaction_memos_cf
.put(
(random_signature(), oldest_slot - 1),
&"this is a new memo in slot 4".to_string(),
)
.unwrap();
blockstore
.transaction_memos_cf
.put(
(random_signature(), oldest_slot),
&"this is a memo in slot 5 ".to_string(),
)
.unwrap();

let first_index = {
let mut memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
memos_iterator.next().unwrap().unwrap().0
};
let last_index = {
let mut memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::End);
memos_iterator.next().unwrap().unwrap().0
};

// Purge at slot 0 should not affect any memos
blockstore.db.set_oldest_slot(0);
blockstore
.db
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
let memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
let mut count = 0;
for item in memos_iterator {
let _item = item.unwrap();
count += 1;
}
assert_eq!(count, 4);

// Purge at oldest_slot without clean_slot_0 only purges the current memo at slot 4
blockstore.db.set_oldest_slot(oldest_slot);
blockstore
.db
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
let memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
let mut count = 0;
for item in memos_iterator {
let (key, _value) = item.unwrap();
let slot = <cf::TransactionMemos as Column>::index(&key).1;
assert!(slot == 0 || slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 3);

// Purge at oldest_slot with clean_slot_0 purges deprecated memos
blockstore.db.set_clean_slot_0(true);
blockstore
.db
.compact_range_cf::<cf::TransactionMemos>(&first_index, &last_index);
let memos_iterator = blockstore
.transaction_memos_cf
.iterator_cf_raw_key(IteratorMode::Start);
let mut count = 0;
for item in memos_iterator {
let (key, _value) = item.unwrap();
let slot = <cf::TransactionMemos as Column>::index(&key).1;
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 1);
}
}
32 changes: 27 additions & 5 deletions ledger/src/blockstore_db.rs
@@ -40,7 +40,7 @@ use {
marker::PhantomData,
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
},
@@ -348,15 +348,18 @@ pub mod columns {
}

#[derive(Default, Clone, Debug)]
struct OldestSlot(Arc<AtomicU64>);
struct OldestSlot {
slot: Arc<AtomicU64>,
clean_slot_0: Arc<AtomicBool>,
}

impl OldestSlot {
pub fn set(&self, oldest_slot: Slot) {
// this is independently used for compaction_filter without any data dependency.
// also, compaction_filters are created via its factories, creating short-lived copies of
// this atomic value for the single job of compaction. So, Relaxed store can be justified
// in total
self.0.store(oldest_slot, Ordering::Relaxed);
self.slot.store(oldest_slot, Ordering::Relaxed);
}

pub fn get(&self) -> Slot {
@@ -365,7 +368,15 @@ impl OldestSlot {
// requirement at the moment
// also eventual propagation (very Relaxed) load is Ok, because compaction by nature doesn't
// require strictly synchronized semantics in this regard
self.0.load(Ordering::Relaxed)
self.slot.load(Ordering::Relaxed)
}

pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
self.clean_slot_0.store(clean_slot_0, Ordering::Relaxed);
}

pub(crate) fn get_clean_slot_0(&self) -> bool {
self.clean_slot_0.load(Ordering::Relaxed)
}
}

@@ -1427,6 +1438,10 @@ impl Database {
self.backend.oldest_slot.set(oldest_slot);
}

pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) {
self.backend.oldest_slot.set_clean_slot_0(clean_slot_0);
}

pub fn live_files_metadata(&self) -> Result<Vec<LiveFile>> {
self.backend.live_files_metadata()
}
@@ -1835,6 +1850,10 @@ impl<'a> WriteBatch<'a> {
struct PurgedSlotFilter<C: Column + ColumnName> {
/// The oldest slot to keep; any slot < oldest_slot will be removed
oldest_slot: Slot,
/// Whether to preserve keys that return slot 0, even when oldest_slot > 0.
// This is used to delete old column data that wasn't keyed with a Slot, and so always returns
// `C::slot() == 0`
clean_slot_0: bool,
name: CString,
_phantom: PhantomData<C>,
}
@@ -1844,7 +1863,7 @@ impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
use rocksdb::CompactionDecision::*;

let slot_in_key = C::slot(C::index(key));
if slot_in_key >= self.oldest_slot {
if slot_in_key >= self.oldest_slot || (slot_in_key == 0 && !self.clean_slot_0) {
Keep
} else {
Remove
@@ -1867,8 +1886,10 @@ impl<C: Column + ColumnName> CompactionFilterFactory for PurgedSlotFilterFactory

fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
let copied_oldest_slot = self.oldest_slot.get();
let copied_clean_slot_0 = self.oldest_slot.get_clean_slot_0();
PurgedSlotFilter::<C> {
oldest_slot: copied_oldest_slot,
clean_slot_0: copied_clean_slot_0,
name: CString::new(format!(
"purged_slot_filter({}, {:?})",
C::NAME,
@@ -2113,6 +2134,7 @@ pub mod tests {
is_manual_compaction: true,
};
let oldest_slot = OldestSlot::default();
oldest_slot.set_clean_slot_0(true);

let mut factory = PurgedSlotFilterFactory::<ShredData> {
oldest_slot: oldest_slot.clone(),