From 5b62f507d5c23c5e84e26ddad013745328fbf16d Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Sat, 21 Oct 2023 04:49:28 +0000 Subject: [PATCH] add merkle root column in blockstore --- ledger/src/blockstore.rs | 421 +++++++++++++++++++++- ledger/src/blockstore/blockstore_purge.rs | 8 + ledger/src/blockstore_db.rs | 51 +++ ledger/src/blockstore_meta.rs | 27 ++ ledger/src/shred.rs | 4 + 5 files changed, 510 insertions(+), 1 deletion(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index b7a592151e65ab..846344c927d38f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -214,6 +214,7 @@ pub struct Blockstore { program_costs_cf: LedgerColumn, bank_hash_cf: LedgerColumn, optimistic_slots_cf: LedgerColumn, + merkle_root_meta_cf: LedgerColumn, last_root: RwLock, insert_shreds_lock: Mutex<()>, new_shreds_signals: Mutex>>, @@ -315,6 +316,7 @@ impl Blockstore { let program_costs_cf = db.column(); let bank_hash_cf = db.column(); let optimistic_slots_cf = db.column(); + let merkle_root_meta_cf = db.column(); let db = Arc::new(db); @@ -352,6 +354,7 @@ impl Blockstore { program_costs_cf, bank_hash_cf, optimistic_slots_cf, + merkle_root_meta_cf, new_shreds_signals: Mutex::default(), completed_slots_senders: Mutex::default(), shred_timing_point_sender: None, @@ -721,6 +724,7 @@ impl Blockstore { self.program_costs_cf.submit_rocksdb_cf_metrics(); self.bank_hash_cf.submit_rocksdb_cf_metrics(); self.optimistic_slots_cf.submit_rocksdb_cf_metrics(); + self.merkle_root_meta_cf.submit_rocksdb_cf_metrics(); } fn try_shred_recovery( @@ -841,6 +845,7 @@ impl Blockstore { let mut just_inserted_shreds = HashMap::with_capacity(shreds.len()); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); let mut duplicate_shreds = vec![]; @@ -860,6 +865,7 @@ impl Blockstore { match self.check_insert_data_shred( shred, &mut erasure_metas, + 
&mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -897,6 +903,7 @@ impl Blockstore { self.check_insert_coding_shred( shred, &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_inserted_shreds, @@ -943,6 +950,7 @@ impl Blockstore { match self.check_insert_data_shred( shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -1008,6 +1016,10 @@ impl Blockstore { write_batch.put::(erasure_set.store_key(), &erasure_meta)?; } + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch.put::(erasure_set.store_key(), &merkle_root_meta)?; + } + for (&slot, index_working_set_entry) in index_working_set.iter() { if index_working_set_entry.did_insert_occur { write_batch.put::(slot, &index_working_set_entry.index)?; @@ -1165,6 +1177,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, @@ -1204,6 +1217,11 @@ impl Blockstore { .expect("Expect database get to succeed") .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) }); + let _merkle_root_meta = merkle_root_metas.entry(erasure_set).or_insert_with(|| { + self.merkle_root_meta(erasure_set) + .expect("Expect database get to succeed") + .unwrap_or_else(|| MerkleRootMeta::from_shred(&shred)) + }); if !erasure_meta.check_coding_shred(&shred) { metrics.num_coding_shreds_invalid_erasure_config += 1; @@ -1341,6 +1359,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -1366,6 +1385,12 @@ impl Blockstore { ); let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); + let erasure_set = shred.erasure_set(); + let 
_merkle_root_meta = merkle_root_metas.entry(erasure_set).or_insert_with(|| { + self.merkle_root_meta(erasure_set) + .expect("Expect database get to succeed") + .unwrap_or_else(|| MerkleRootMeta::from_shred(&shred)) + }); if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { @@ -1400,7 +1425,6 @@ impl Blockstore { } } - let erasure_set = shred.erasure_set(); let newly_completed_data_sets = self.insert_data_shred( slot_meta, index_meta.data_mut(), @@ -3150,6 +3174,10 @@ impl Blockstore { .unwrap_or(false) } + fn merkle_root_meta(&self, erasure_set: ErasureSetId) -> Result> { + self.merkle_root_meta_cf.get(erasure_set.store_key()) + } + pub fn insert_optimistic_slot( &self, slot: Slot, @@ -6686,6 +6714,394 @@ pub mod tests { ),); } + #[test] + fn test_merkle_root_metas_coding() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let slot = 1; + let index = 12; + let fec_set_index = 11; + let coding_shred = Shred::new_from_parity_shard( + slot, + index, + &[], // parity_shard + fec_set_index, + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us = 0; + assert!(blockstore.check_insert_coding_shred( + coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); 
+ assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.store_key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let new_coding_shred = Shred::new_from_parity_shard( + slot, + index + 1, + &[], // parity_shard + fec_set_index, + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + + assert!(blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + // Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index + ); + + // Add a shred from different fec set + let new_index = fec_set_index + 31; + let new_coding_shred 
= Shred::new_from_parity_shard( + slot, + new_index, + &[], // parity_shard + fec_set_index + 30, // fec_set_index + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + + assert!(blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_coding_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index + ); + } + + #[test] + fn test_merkle_root_metas_data() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let slot = 1; + let index = 12; + let fec_set_index = 11; + let data_shred = Shred::new_from_data( + slot, + index, + 1, // parent_offset + &[1, 1, 1], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index, + ); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut slot_meta_working_set = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us 
= 0; + blockstore + .check_insert_data_shred( + data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.store_key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let new_data_shred = Shred::new_from_data( + slot, + index + 1, + 1, // parent_offset + &[2, 2, 2], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index, + ); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + + blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + // 
Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index + ); + + // Add a shred from different fec set + let new_index = fec_set_index + 31; + let new_data_shred = Shred::new_from_data( + slot, + new_index, + 1, // parent_offset + &[3, 3, 3], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index + 30, + ); + + blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_data_shred.merkle_root().unwrap_or_default() + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index + ); + } + #[test] fn test_check_insert_coding_shred() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -6704,6 +7120,7 @@ pub mod tests { ); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds 
= HashMap::new(); let mut write_batch = blockstore.db.batch().unwrap(); @@ -6711,6 +7128,7 @@ pub mod tests { assert!(blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, @@ -6726,6 +7144,7 @@ pub mod tests { assert!(!blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 9669f8bd305a00..f6b3662ed19e28 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -220,6 +220,10 @@ impl Blockstore { & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok(); match purge_type { PurgeType::Exact => { @@ -329,6 +333,10 @@ impl Blockstore { .db .delete_file_in_range_cf::(from_slot, to_slot) .is_ok() + & self + .db + .delete_file_in_range_cf::(from_slot, to_slot) + .is_ok() } /// Returns true if the special columns, TransactionStatus and diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index b65df82ee00c9e..627e35e88ec82e 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -2,6 +2,7 @@ pub use rocksdb::Direction as IteratorDirection; use { crate::{ blockstore_meta, + blockstore_meta::MerkleRootMeta, blockstore_metrics::{ maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf, BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET, @@ -103,6 +104,8 @@ const BLOCK_HEIGHT_CF: &str = "block_height"; const PROGRAM_COSTS_CF: &str = "program_costs"; /// Column family for optimistic slots const OPTIMISTIC_SLOTS_CF: &str = "optimistic_slots"; +/// Column family for merkle roots +const 
MERKLE_ROOT_CF: &str = "merkle_root_meta"; #[derive(Error, Debug)] pub enum BlockstoreError { @@ -339,6 +342,19 @@ pub mod columns { /// * value type: [`blockstore_meta::OptimisticSlotMetaVersioned`] pub struct OptimisticSlots; + #[derive(Debug)] + /// The merkle root meta column + /// + /// Each merkle shred is part of a merkle tree for + /// its FEC set. This column stores that merkle root and associated + /// meta information about the first shred received. + /// + /// Its index type is (Slot, FEC set index). + /// + /// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u64)` + /// * value type: [`blockstore_meta::MerkleRootMeta`] + pub struct MerkleRootMeta; + // When adding a new column ... // - Add struct below and implement `Column` and `ColumnName` traits // - Add descriptor in Rocks::cf_descriptors() and name in Rocks::columns() @@ -474,6 +490,7 @@ impl Rocks { new_cf_descriptor::(options, oldest_slot), new_cf_descriptor::(options, oldest_slot), new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), ] } @@ -501,6 +518,7 @@ impl Rocks { BlockHeight::NAME, ProgramCosts::NAME, OptimisticSlots::NAME, + MerkleRootMeta::NAME, ] } @@ -1227,6 +1245,39 @@ impl TypedColumn for columns::OptimisticSlots { type Type = blockstore_meta::OptimisticSlotMetaVersioned; } +impl Column for columns::MerkleRootMeta { + type Index = (Slot, u64); + + fn index(key: &[u8]) -> (Slot, u64) { + let slot = BigEndian::read_u64(&key[..8]); + let set_index = BigEndian::read_u64(&key[8..]); + + (slot, set_index) + } + + fn key((slot, set_index): (Slot, u64)) -> Vec { + let mut key = vec![0; 16]; + BigEndian::write_u64(&mut key[..8], slot); + BigEndian::write_u64(&mut key[8..], set_index); + key + } + + fn slot(index: Self::Index) -> Slot { + index.0 + } + + fn as_index(slot: Slot) -> Self::Index { + (slot, 0) + } +} + +impl ColumnName for columns::MerkleRootMeta { + const NAME: &'static str = MERKLE_ROOT_CF; +} +impl TypedColumn for 
columns::MerkleRootMeta { + type Type = MerkleRootMeta; +} + #[derive(Debug)] pub struct Database { backend: Arc, diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 79954ee96b6d04..abc3e5c11af717 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -138,6 +138,14 @@ pub(crate) struct ErasureConfig { num_coding: usize, } +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct MerkleRootMeta { + /// The merkle root + merkle_root: Hash, + /// The first received shred index + first_received_shred_index: u32, +} + #[derive(Deserialize, Serialize)] pub struct DuplicateSlotProof { #[serde(with = "serde_bytes")] @@ -396,6 +404,25 @@ impl ErasureMeta { } } +impl MerkleRootMeta { + pub(crate) fn from_shred(shred: &Shred) -> Self { + Self { + merkle_root: shred.merkle_root().unwrap_or_default(), + first_received_shred_index: shred.index(), + } + } + + #[cfg(test)] + pub(crate) fn merkle_root(&self) -> Hash { + self.merkle_root + } + + #[cfg(test)] + pub(crate) fn first_received_shred_index(&self) -> u32 { + self.first_received_shred_index + } +} + impl DuplicateSlotProof { pub(crate) fn new(shred1: Vec, shred2: Vec) -> Self { DuplicateSlotProof { shred1, shred2 } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 5fda160e29b976..50056cd3ab1e33 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -551,6 +551,10 @@ impl Shred { Self::ShredData(_) => Err(Error::InvalidShredType), } } + + pub fn merkle_root(&self) -> Option { + layout::get_merkle_root(self.payload()) + } } // Helper methods to extract pieces of the shred from the payload