From 6acd57cb1a94f6a3004b1c82a09eaa4643aef8fe Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Sat, 11 Nov 2023 21:14:18 -0500 Subject: [PATCH] add merkle root meta column to blockstore (#33979) * add merkle root meta column to blockstore * pr feedback: remove write/reads to column * pr feedback: u64 -> u32 + revert * pr feedback: fec_set_index u32, use Self::Index * pr feedback: key size 16 -> 12 (cherry picked from commit e457c0287906f313ad0284e0f317c5d00a42d467) # Conflicts: # ledger/src/blockstore.rs --- ledger/src/blockstore.rs | 8 ++++ ledger/src/blockstore/blockstore_purge.rs | 8 ++++ ledger/src/blockstore_db.rs | 51 +++++++++++++++++++++++ ledger/src/blockstore_meta.rs | 10 +++++ 4 files changed, 77 insertions(+) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 2a5ea113fd4928..fda97e14881670 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -193,7 +193,12 @@ pub struct Blockstore { program_costs_cf: LedgerColumn, bank_hash_cf: LedgerColumn, optimistic_slots_cf: LedgerColumn, +<<<<<<< HEAD last_root: RwLock, +======= + max_root: AtomicU64, + merkle_root_meta_cf: LedgerColumn, +>>>>>>> e457c02879 (add merkle root meta column to blockstore (#33979)) insert_shreds_lock: Mutex<()>, new_shreds_signals: Mutex>>, completed_slots_senders: Mutex>, @@ -301,6 +306,7 @@ impl Blockstore { let program_costs_cf = db.column(); let bank_hash_cf = db.column(); let optimistic_slots_cf = db.column(); + let merkle_root_meta_cf = db.column(); let db = Arc::new(db); @@ -353,6 +359,7 @@ impl Blockstore { program_costs_cf, bank_hash_cf, optimistic_slots_cf, + merkle_root_meta_cf, new_shreds_signals: Mutex::default(), completed_slots_senders: Mutex::default(), shred_timing_point_sender: None, @@ -721,6 +728,7 @@ impl Blockstore { self.program_costs_cf.submit_rocksdb_cf_metrics(); self.bank_hash_cf.submit_rocksdb_cf_metrics(); self.optimistic_slots_cf.submit_rocksdb_cf_metrics(); + self.merkle_root_meta_cf.submit_rocksdb_cf_metrics(); } fn try_shred_recovery( diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 9757d4b8d3b090..7dd4e729c330fe 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -214,6 +214,10 @@ impl Blockstore { & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok(); let mut w_active_transaction_status_index = self.active_transaction_status_index.write().unwrap(); @@ -337,6 +341,10 @@ impl Blockstore { .db .delete_file_in_range_cf::(from_slot, to_slot) .is_ok() + & self + .db + .delete_file_in_range_cf::(from_slot, to_slot) + .is_ok() } /// Purges special columns (using a non-Slot primary-index) exactly, by diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 14be3095b212d7..5973e0e76a1153 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -2,6 +2,7 @@ pub use rocksdb::Direction as IteratorDirection; use { crate::{ blockstore_meta, + blockstore_meta::MerkleRootMeta, blockstore_metrics::{ maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf, BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET, @@ -102,6 +103,8 @@ const BLOCK_HEIGHT_CF: &str = "block_height"; const PROGRAM_COSTS_CF: &str = "program_costs"; /// Column family for optimistic slots const OPTIMISTIC_SLOTS_CF: &str = "optimistic_slots"; +/// Column family for merkle roots +const MERKLE_ROOT_META_CF: &str = "merkle_root_meta"; #[derive(Error, Debug)] pub enum BlockstoreError { @@ -322,6 +325,19 @@ pub mod columns { /// * value type: [`blockstore_meta::OptimisticSlotMetaVersioned`] pub struct OptimisticSlots; + #[derive(Debug)] + /// The merkle root meta column + /// + /// Each merkle shred is part of a merkle tree for + /// its FEC set. This column stores that merkle root and associated + /// meta information about the first shred received. + /// + /// Its index type is (Slot, fec_set_index). + /// + /// * index type: `crate::shred::ErasureSetId` `(Slot, fec_set_index: u32)` + /// * value type: [`blockstore_meta::MerkleRootMeta`]` + pub struct MerkleRootMeta; + // When adding a new column ... // - Add struct below and implement `Column` and `ColumnName` traits // - Add descriptor in Rocks::cf_descriptors() and name in Rocks::columns() @@ -446,6 +462,7 @@ impl Rocks { new_cf_descriptor::(options, oldest_slot), new_cf_descriptor::(options, oldest_slot), new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), ] } @@ -473,6 +490,7 @@ impl Rocks { BlockHeight::NAME, ProgramCosts::NAME, OptimisticSlots::NAME, + MerkleRootMeta::NAME, ] } @@ -1072,6 +1090,39 @@ impl TypedColumn for columns::OptimisticSlots { type Type = blockstore_meta::OptimisticSlotMetaVersioned; } +impl Column for columns::MerkleRootMeta { + type Index = (Slot, /*fec_set_index:*/ u32); + + fn index(key: &[u8]) -> Self::Index { + let slot = BigEndian::read_u64(&key[..8]); + let fec_set_index = BigEndian::read_u32(&key[8..]); + + (slot, fec_set_index) + } + + fn key((slot, fec_set_index): Self::Index) -> Vec { + let mut key = vec![0; 12]; + BigEndian::write_u64(&mut key[..8], slot); + BigEndian::write_u32(&mut key[8..], fec_set_index); + key + } + + fn slot((slot, _fec_set_index): Self::Index) -> Slot { + slot + } + + fn as_index(slot: Slot) -> Self::Index { + (slot, 0) + } +} + +impl ColumnName for columns::MerkleRootMeta { + const NAME: &'static str = MERKLE_ROOT_META_CF; +} +impl TypedColumn for columns::MerkleRootMeta { + type Type = MerkleRootMeta; +} + #[derive(Debug)] pub struct Database { backend: Arc, diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 65a5c2c2ed0757..625150847aa429 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -138,6 +138,16 @@ pub(crate) struct ErasureConfig { num_coding: usize, } +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct MerkleRootMeta { + /// The merkle root + merkle_root: Hash, + /// The first received shred index + first_received_shred_index: u32, + /// The shred type of the first received shred + first_received_shred_type: ShredType, +} + #[derive(Deserialize, Serialize)] pub struct DuplicateSlotProof { #[serde(with = "serde_bytes")]