From 3baaa495fd461bfe93469735ca6ecfde75a970a3 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 8 Feb 2024 17:28:09 +0800 Subject: [PATCH 1/4] Store linear blocks in freezer db Co-authored-by: Michael Sproul --- Cargo.lock | 1 + Cargo.toml | 1 + .../beacon_chain/src/beacon_block_streamer.rs | 2 +- beacon_node/beacon_chain/src/beacon_chain.rs | 14 +- .../src/beacon_fork_choice_store.rs | 2 +- beacon_node/beacon_chain/src/builder.rs | 4 +- .../beacon_chain/src/canonical_head.rs | 4 +- beacon_node/beacon_chain/src/fork_revert.rs | 2 +- .../src/light_client_server_cache.rs | 15 +- beacon_node/beacon_chain/src/migrate.rs | 2 +- .../src/pre_finalization_cache.rs | 2 +- .../src/schema_change/migration_schema_v18.rs | 2 +- beacon_node/src/cli.rs | 10 ++ beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/config.rs | 27 ++++ beacon_node/store/src/errors.rs | 1 + beacon_node/store/src/hot_cold_store.rs | 129 +++++++++++++++++- beacon_node/store/src/impls.rs | 1 + .../store/src/impls/frozen_block_slot.rs | 19 +++ beacon_node/store/src/iter.rs | 7 +- beacon_node/store/src/lib.rs | 14 +- beacon_node/store/src/reconstruct.rs | 2 +- 22 files changed, 228 insertions(+), 34 deletions(-) create mode 100644 beacon_node/store/src/impls/frozen_block_slot.rs diff --git a/Cargo.lock b/Cargo.lock index 1a6784b84f9..f1d2afad66b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7985,6 +7985,7 @@ dependencies = [ "strum", "tempfile", "types", + "zstd", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b942d1719e2..79f44bc4ef0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -181,6 +181,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] } warp = { version = "0.3.7", default-features = false, features = ["tls"] } zeroize = { version = "1", features = ["zeroize_derive"] } zip = "0.6" +zstd = "0.11.2" # Local crates. 
account_utils = { path = "common/account_utils" } diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index f0a68b6be55..0feefa8976f 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -441,7 +441,7 @@ impl BeaconBlockStreamer { continue; } - match streamer.beacon_chain.store.try_get_full_block(&root) { + match streamer.beacon_chain.store.try_get_full_block(&root, None) { Err(e) => db_blocks.push((root, Err(e.into()))), Ok(opt_block) => db_blocks.push(( root, diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 77e1bc095ed..f5e19332aeb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -924,8 +924,14 @@ impl BeaconChain { ) -> Result>, Error> { let root = self.block_root_at_slot(request_slot, skips)?; + // Only hint the slot if we expect a block at this exact slot. + let slot_hint = match skips { + WhenSlotSkipped::Prev => None, + WhenSlotSkipped::None => Some(request_slot), + }; + if let Some(block_root) = root { - Ok(self.store.get_blinded_block(&block_root)?) + Ok(self.store.get_blinded_block(&block_root, slot_hint)?) } else { Ok(None) } @@ -1180,7 +1186,7 @@ impl BeaconChain { ) -> Result>, Error> { // Load block from database, returning immediately if we have the full block w payload // stored. - let blinded_block = match self.store.try_get_full_block(block_root)? { + let blinded_block = match self.store.try_get_full_block(block_root, None)? { Some(DatabaseBlock::Full(block)) => return Ok(Some(block)), Some(DatabaseBlock::Blinded(block)) => block, None => return Ok(None), @@ -1248,7 +1254,7 @@ impl BeaconChain { &self, block_root: &Hash256, ) -> Result>, Error> { - Ok(self.store.get_blinded_block(block_root)?) + Ok(self.store.get_blinded_block(block_root, None)?) 
} /// Return the status of a block as it progresses through the various caches of the beacon @@ -6379,7 +6385,7 @@ impl BeaconChain { let beacon_block = self .store - .get_blinded_block(&beacon_block_root)? + .get_blinded_block(&beacon_block_root, None)? .ok_or_else(|| { Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) })?; diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 2a42b49b422..900c6b1d8c3 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -315,7 +315,7 @@ where metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES); let justified_block = self .store - .get_blinded_block(&self.justified_checkpoint.root) + .get_blinded_block(&self.justified_checkpoint.root, None) .map_err(Error::FailedToReadBlock)? .ok_or(Error::MissingBlock(self.justified_checkpoint.root))? .deconstruct() diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 90461b8f03e..dc1427eb035 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -292,7 +292,7 @@ where .ok_or("Fork choice not found in store")?; let genesis_block = store - .get_blinded_block(&chain.genesis_block_root) + .get_blinded_block(&chain.genesis_block_root, Some(Slot::new(0))) .map_err(|e| descriptive_db_error("genesis block", &e))? .ok_or("Genesis block not found in store")?; let genesis_state = store @@ -732,7 +732,7 @@ where // Try to decode the head block according to the current fork, if that fails, try // to backtrack to before the most recent fork. 
let (head_block_root, head_block, head_reverted) = - match store.get_full_block(&initial_head_block_root) { + match store.get_full_block(&initial_head_block_root, None) { Ok(Some(block)) => (initial_head_block_root, block, false), Ok(None) => return Err("Head block not found in store".into()), Err(StoreError::SszDecodeError(_)) => { diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index a84cfab298d..aa782870463 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -295,7 +295,7 @@ impl CanonicalHead { let fork_choice_view = fork_choice.cached_fork_choice_view(); let beacon_block_root = fork_choice_view.head_block_root; let beacon_block = store - .get_full_block(&beacon_block_root)? + .get_full_block(&beacon_block_root, None)? .ok_or(Error::MissingBeaconBlock(beacon_block_root))?; let current_slot = fork_choice.fc_store().get_current_slot(); let (_, beacon_state) = store @@ -651,7 +651,7 @@ impl BeaconChain { let mut new_snapshot = { let beacon_block = self .store - .get_full_block(&new_view.head_block_root)? + .get_full_block(&new_view.head_block_root, None)? .ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?; let (_, beacon_state) = self diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 8d1c29f46f6..04f4540fe41 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -105,7 +105,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It let finalized_checkpoint = head_state.finalized_checkpoint(); let finalized_block_root = finalized_checkpoint.root; let finalized_block = store - .get_full_block(&finalized_block_root) + .get_full_block(&finalized_block_root, None) .map_err(|e| format!("Error loading finalized block: {:?}", e))? 
.ok_or_else(|| { format!( diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs index ca029057373..7f8f205fd31 100644 --- a/beacon_node/beacon_chain/src/light_client_server_cache.rs +++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs @@ -84,13 +84,12 @@ impl LightClientServerCache { let signature_slot = block_slot; let attested_block_root = block_parent_root; - let attested_block = - store - .get_full_block(attested_block_root)? - .ok_or(BeaconChainError::DBInconsistent(format!( - "Block not available {:?}", - attested_block_root - )))?; + let attested_block = store.get_full_block(attested_block_root, None)?.ok_or( + BeaconChainError::DBInconsistent(format!( + "Block not available {:?}", + attested_block_root + )), + )?; let cached_parts = self.get_or_compute_prev_block_cache( store.clone(), @@ -130,7 +129,7 @@ impl LightClientServerCache { if is_latest_finality & !cached_parts.finalized_block_root.is_zero() { // Immediately after checkpoint sync the finalized block may not be available yet. if let Some(finalized_block) = - store.get_full_block(&cached_parts.finalized_block_root)? + store.get_full_block(&cached_parts.finalized_block_root, None)? 
{ *self.latest_finality_update.write() = Some(LightClientFinalityUpdate::new( &attested_block, diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 08b2a51720d..daa05d7abc8 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -527,7 +527,7 @@ impl, Cold: ItemStore> BackgroundMigrator block.state_root(), Ok(None) => { return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into()) diff --git a/beacon_node/beacon_chain/src/pre_finalization_cache.rs b/beacon_node/beacon_chain/src/pre_finalization_cache.rs index 22b76e026cb..3b337d4228b 100644 --- a/beacon_node/beacon_chain/src/pre_finalization_cache.rs +++ b/beacon_node/beacon_chain/src/pre_finalization_cache.rs @@ -73,7 +73,7 @@ impl BeaconChain { } // 2. Check on disk. - if self.store.get_blinded_block(&block_root)?.is_some() { + if self.store.get_blinded_block(&block_root, None)?.is_some() { cache.block_roots.put(block_root, ()); return Ok(true); } diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs index 04a9da84128..07294f6951f 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs @@ -17,7 +17,7 @@ fn get_slot_clock( log: &Logger, ) -> Result, Error> { let spec = db.get_chain_spec(); - let Some(genesis_block) = db.get_blinded_block(&Hash256::zero())? else { + let Some(genesis_block) = db.get_blinded_block(&Hash256::zero(), Some(Slot::new(0)))? 
else { error!(log, "Missing genesis block"); return Ok(None); }; diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 40a343a7fe4..0ec5b0d5777 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -922,6 +922,16 @@ pub fn cli_app() -> Command { .default_value("true") .display_order(0) ) + .arg( + Arg::new("compression-level") + .long("compression-level") + .value_name("LEVEL") + .help("Compression level (-99 to 22) for zstd compression applied to states on disk \ + [default: 1]. You may change the compression level freely without re-syncing.") + .action(ArgAction::Set) + .default_value("1") + .display_order(0) + ) .arg( Arg::new("prune-payloads") .long("prune-payloads") diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 7bf1ef76bef..5a11df817f1 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -25,3 +25,4 @@ lru = { workspace = true } sloggers = { workspace = true } directory = { workspace = true } strum = { workspace = true } +zstd = { workspace = true } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index d43999d8220..24d1a5fe8b0 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -10,6 +10,8 @@ pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192; pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(5); pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128); +pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1; +const EST_COMPRESSION_FACTOR: usize = 2; pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1); pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1; pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0; @@ -25,6 +27,8 @@ pub struct StoreConfig { pub block_cache_size: NonZeroUsize, /// Maximum number of states to store in the in-memory state cache. 
pub state_cache_size: NonZeroUsize, + /// Compression level for blocks, state diffs and other compressed values. + pub compression_level: i32, /// Maximum number of states from freezer database to store in the in-memory state cache. pub historic_state_cache_size: NonZeroUsize, /// Whether to compact the database on initialization. @@ -33,6 +37,8 @@ pub struct StoreConfig { pub compact_on_prune: bool, /// Whether to prune payloads on initialization and finalization. pub prune_payloads: bool, + /// Whether to store finalized blocks compressed and linearised in the freezer database. + pub linear_blocks: bool, /// Whether to prune blobs older than the blob data availability boundary. pub prune_blobs: bool, /// Frequency of blob pruning in epochs. Default: 1 (every epoch). @@ -61,10 +67,12 @@ impl Default for StoreConfig { slots_per_restore_point_set_explicitly: false, block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, state_cache_size: DEFAULT_STATE_CACHE_SIZE, + compression_level: DEFAULT_COMPRESSION_LEVEL, historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE, compact_on_init: false, compact_on_prune: true, prune_payloads: true, + linear_blocks: true, prune_blobs: true, epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE, blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS, @@ -91,6 +99,25 @@ impl StoreConfig { } Ok(()) } + + /// Estimate the size of `len` bytes after compression at the current compression level. + pub fn estimate_compressed_size(&self, len: usize) -> usize { + if self.compression_level == 0 { + len + } else { + len / EST_COMPRESSION_FACTOR + } + } + + /// Estimate the size of `len` compressed bytes after decompression at the current compression + /// level. 
+ pub fn estimate_decompressed_size(&self, len: usize) -> usize { + if self.compression_level == 0 { + len + } else { + len * EST_COMPRESSION_FACTOR + } + } } impl StoreItem for OnDiskStoreConfig { diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 91e6a920ba3..559c7f782f7 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -43,6 +43,7 @@ pub enum Error { computed: Hash256, }, BlockReplayError(BlockReplayError), + Compression(std::io::Error), AddPayloadLogicError, SlotClockUnavailableForMigration, InvalidKey, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 9c247c983a9..0de7ececbcf 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -6,7 +6,10 @@ use crate::config::{ PREV_DEFAULT_SLOTS_PER_RESTORE_POINT, }; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; -use crate::impls::beacon_state::{get_full_state, store_full_state}; +use crate::impls::{ + beacon_state::{get_full_state, store_full_state}, + frozen_block_slot::FrozenBlockSlot, +}; use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; @@ -35,12 +38,15 @@ use state_processing::{ SlotProcessingError, }; use std::cmp::min; +use std::convert::TryInto; +use std::io::{Read, Write}; use std::marker::PhantomData; use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; use std::time::Duration; use types::*; +use zstd::{Decoder, Encoder}; /// On-disk database that stores finalized states efficiently. /// @@ -432,6 +438,7 @@ impl, Cold: ItemStore> HotColdDB pub fn try_get_full_block( &self, block_root: &Hash256, + slot: Option, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); @@ -442,7 +449,7 @@ impl, Cold: ItemStore> HotColdDB } // Load the blinded block. 
- let Some(blinded_block) = self.get_blinded_block(block_root)? else { + let Some(blinded_block) = self.get_blinded_block(block_root, slot)? else { return Ok(None); }; @@ -490,8 +497,9 @@ impl, Cold: ItemStore> HotColdDB pub fn get_full_block( &self, block_root: &Hash256, + slot: Option, ) -> Result>, Error> { - match self.try_get_full_block(block_root)? { + match self.try_get_full_block(block_root, slot)? { Some(DatabaseBlock::Full(block)) => Ok(Some(block)), Some(DatabaseBlock::Blinded(block)) => Err( HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot()) @@ -522,12 +530,115 @@ impl, Cold: ItemStore> HotColdDB pub fn get_blinded_block( &self, block_root: &Hash256, - ) -> Result>>, Error> { + slot: Option, + ) -> Result>, Error> { + let split = self.get_split_info(); + if let Some(slot) = slot { + if (slot < split.slot || slot == 0) && *block_root != split.block_root { + // To the freezer DB. + self.get_cold_blinded_block_by_slot(slot) + } else { + self.get_hot_blinded_block(block_root) + } + } else { + match self.get_hot_blinded_block(block_root)? { + Some(block) => Ok(Some(block)), + None => self.get_cold_blinded_block_by_root(block_root), + } + } + } + + pub fn get_hot_blinded_block( + &self, + block_root: &Hash256, + ) -> Result>, Error> { self.get_block_with(block_root, |bytes| { SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec) }) } + pub fn get_cold_blinded_block_by_root( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + // Load slot. + if let Some(FrozenBlockSlot(block_slot)) = self.cold_db.get(block_root)? { + self.get_cold_blinded_block_by_slot(block_slot) + } else { + Ok(None) + } + } + + pub fn get_cold_blinded_block_by_slot( + &self, + slot: Slot, + ) -> Result>, Error> { + let Some(bytes) = self.cold_db.get_bytes( + DBColumn::BeaconBlockFrozen.into(), + &slot.as_u64().to_be_bytes(), + )? 
+ else { + return Ok(None); + }; + + let mut ssz_bytes = Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len())); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + Ok(Some(SignedBeaconBlock::from_ssz_bytes( + &ssz_bytes, &self.spec, + )?)) + } + + pub fn put_cold_blinded_block( + &self, + block_root: &Hash256, + block: &SignedBlindedBeaconBlock, + ) -> Result<(), Error> { + let mut ops = Vec::with_capacity(2); + self.blinded_block_as_cold_kv_store_ops(block_root, block, &mut ops)?; + self.cold_db.do_atomically(ops) + } + + pub fn blinded_block_as_cold_kv_store_ops( + &self, + block_root: &Hash256, + block: &SignedBlindedBeaconBlock, + kv_store_ops: &mut Vec, + ) -> Result<(), Error> { + // Write the block root to slot mapping. + let slot = block.slot(); + kv_store_ops.push(FrozenBlockSlot(slot).as_kv_store_op(*block_root)); + + // Write the slot to block root mapping. + kv_store_ops.push(KeyValueStoreOp::PutKeyValue( + get_key_for_col( + DBColumn::BeaconBlockRoots.into(), + &slot.as_u64().to_be_bytes(), + ), + block_root.as_bytes().to_vec(), + )); + + // Write the block keyed by slot. + let db_key = get_key_for_col( + DBColumn::BeaconBlockFrozen.into(), + &slot.as_u64().to_be_bytes(), + ); + + let ssz_bytes = block.as_ssz_bytes(); + let mut compressed_value = + Vec::with_capacity(self.config.estimate_compressed_size(ssz_bytes.len())); + let mut encoder = Encoder::new(&mut compressed_value, self.config.compression_level) + .map_err(Error::Compression)?; + encoder.write_all(&ssz_bytes).map_err(Error::Compression)?; + encoder.finish().map_err(Error::Compression)?; + + kv_store_ops.push(KeyValueStoreOp::PutKeyValue(db_key, compressed_value)); + + Ok(()) + } + /// Fetch a block from the store, ignoring which fork variant it *should* be for. 
pub fn get_block_any_variant>( &self, @@ -589,10 +700,14 @@ impl, Cold: ItemStore> HotColdDB .key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } - /// Determine whether a block exists in the database. + /// Determine whether a block exists in the database (hot *or* cold). pub fn block_exists(&self, block_root: &Hash256) -> Result { - self.hot_db - .key_exists(DBColumn::BeaconBlock.into(), block_root.as_bytes()) + Ok(self + .hot_db + .key_exists(DBColumn::BeaconBlock.into(), block_root.as_bytes())? + || self + .cold_db + .key_exists(DBColumn::BeaconBlock.into(), block_root.as_bytes())?) } /// Delete a block from the store and the block cache. diff --git a/beacon_node/store/src/impls.rs b/beacon_node/store/src/impls.rs index 736585a72aa..b2af9a408ef 100644 --- a/beacon_node/store/src/impls.rs +++ b/beacon_node/store/src/impls.rs @@ -1,2 +1,3 @@ pub mod beacon_state; pub mod execution_payload; +pub mod frozen_block_slot; diff --git a/beacon_node/store/src/impls/frozen_block_slot.rs b/beacon_node/store/src/impls/frozen_block_slot.rs new file mode 100644 index 00000000000..67d11b4f081 --- /dev/null +++ b/beacon_node/store/src/impls/frozen_block_slot.rs @@ -0,0 +1,19 @@ +use crate::{DBColumn, Error, StoreItem}; +use ssz::{Decode, Encode}; +use types::Slot; + +pub struct FrozenBlockSlot(pub Slot); + +impl StoreItem for FrozenBlockSlot { + fn db_column() -> DBColumn { + DBColumn::BeaconBlock + } + + fn as_store_bytes(&self) -> Vec { + self.0.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(FrozenBlockSlot(Slot::from_ssz_bytes(bytes)?)) + } +} diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 03090ca14c5..6c0f2529830 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -189,7 +189,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, block_hash: Hash256, ) -> Result { let block = store - .get_blinded_block(&block_hash)? 
+ .get_blinded_block(&block_hash, None)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; let state = store .get_state(&block.state_root(), Some(block.slot()))? @@ -286,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> let block = if self.decode_any_variant { self.store.get_block_any_variant(&block_root) } else { - self.store.get_blinded_block(&block_root) + self.store.get_blinded_block(&block_root, None) }? .ok_or(Error::BlockNotFound(block_root))?; self.next_block_root = block.message().parent_root(); @@ -329,7 +329,8 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, E, fn do_next(&mut self) -> Result>>, Error> { if let Some(result) = self.roots.next() { let (root, _slot) = result?; - self.roots.inner.store.get_blinded_block(&root) + // Don't use slot hint here as it could be a skipped slot. + self.roots.inner.store.get_blinded_block(&root, None) } else { Ok(None) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 66032d89c52..aca93df0e5f 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -222,8 +222,19 @@ pub enum DBColumn { /// For data related to the database itself. #[strum(serialize = "bma")] BeaconMeta, + /// Data related to blocks. + /// + /// - Key: `Hash256` block root. + /// - Value in hot DB: SSZ-encoded blinded block. + /// - Value in cold DB: 8-byte slot of block. #[strum(serialize = "blk")] BeaconBlock, + /// Frozen beacon blocks. + /// + /// - Key: 8-byte slot. + /// - Value: ZSTD-compressed SSZ-encoded blinded block. + #[strum(serialize = "bbf")] + BeaconBlockFrozen, #[strum(serialize = "blb")] BeaconBlob, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). 
@@ -312,7 +323,8 @@ impl DBColumn { | Self::BeaconStateRoots | Self::BeaconHistoricalRoots | Self::BeaconHistoricalSummaries - | Self::BeaconRandaoMixes => 8, + | Self::BeaconRandaoMixes + | Self::BeaconBlockFrozen => 8, } } } diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 8ef4886565c..050efa4c2e6 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -75,7 +75,7 @@ where None } else { Some( - self.get_blinded_block(&block_root)? + self.get_blinded_block(&block_root, Some(slot))? .ok_or(Error::BlockNotFound(block_root))?, ) }; From 241818fb3d1d1c76a5d08a4fc1c0563e951969b7 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 10 Jun 2024 16:10:44 +0200 Subject: [PATCH 2/4] Migrate blocks --- beacon_node/store/src/hot_cold_store.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 0de7ececbcf..cb95be7c431 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -139,6 +139,7 @@ pub enum HotColdDBError { proposed_split_slot: Slot, }, MissingStateToFreeze(Hash256), + MissingBlockToFreeze(Hash256), MissingRestorePointHash(u64), MissingRestorePoint(Hash256), MissingColdStateSummary(Hash256), @@ -2706,6 +2707,14 @@ pub fn migrate_database, Cold: ItemStore>( }) { let (block_root, state_root, slot) = maybe_tuple?; + // Delete block from hot DB + hot_db_ops.push(StoreOp::DeleteBlock(block_root)); + // Write block to cold DB + let block = store + .get_hot_blinded_block(&block_root)? + .ok_or(HotColdDBError::MissingBlockToFreeze(block_root))?; + store.blinded_block_as_cold_kv_store_ops(&block_root, &block, &mut cold_db_ops)?; + // Delete the execution payload if payload pruning is enabled. 
At a skipped slot we may // delete the payload for the finalized block itself, but that's OK as we only guarantee // that payloads are present for slots >= the split slot. The payload fetching code is also From 8b99c51035978d0d742befacddb4a119d1ed5c6d Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 10 Jun 2024 16:24:31 +0200 Subject: [PATCH 3/4] Fix tests --- beacon_node/beacon_chain/src/builder.rs | 2 +- .../beacon_chain/tests/attestation_verification.rs | 2 +- .../beacon_chain/tests/payload_invalidation.rs | 2 +- beacon_node/beacon_chain/tests/store_tests.rs | 12 ++++++------ 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index dc1427eb035..0601ea32ef4 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1212,7 +1212,7 @@ mod test { assert_eq!( chain .store - .get_blinded_block(&Hash256::zero()) + .get_blinded_block(&Hash256::zero(), None) .expect("should read db") .expect("should find genesis block"), block.clone_as_blinded(), diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 1463d1c5c15..df581382235 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -1059,7 +1059,7 @@ async fn attestation_that_skips_epochs() { let block_slot = harness .chain .store - .get_blinded_block(&block_root) + .get_blinded_block(&block_root, None) .expect("should not error getting block") .expect("should find attestation block") .message() diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 0c36d21f2ec..2de2867b36b 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ 
-316,7 +316,7 @@ impl InvalidPayloadRig { self.harness .chain .store - .get_full_block(&block_root) + .get_full_block(&block_root, None) .unwrap() .unwrap(), *block, diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 5da92573f77..e1c3cd82bd5 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -608,7 +608,7 @@ async fn epoch_boundary_state_attestation_processing() { // load_epoch_boundary_state is idempotent! let block_root = attestation.data.beacon_block_root; let block = store - .get_blinded_block(&block_root) + .get_blinded_block(&block_root, None) .unwrap() .expect("block exists"); let epoch_boundary_state = store @@ -849,7 +849,7 @@ async fn delete_blocks_and_states() { ); let faulty_head_block = store - .get_blinded_block(&faulty_head.into()) + .get_blinded_block(&faulty_head.into(), None) .expect("no errors") .expect("faulty head block exists"); @@ -891,7 +891,7 @@ async fn delete_blocks_and_states() { break; } store.delete_block(&block_root).unwrap(); - assert_eq!(store.get_blinded_block(&block_root).unwrap(), None); + assert_eq!(store.get_blinded_block(&block_root, None).unwrap(), None); } // Deleting frozen states should do nothing @@ -1135,7 +1135,7 @@ fn get_state_for_block(harness: &TestHarness, block_root: Hash256) -> BeaconStat let head_block = harness .chain .store - .get_blinded_block(&block_root) + .get_blinded_block(&block_root, None) .unwrap() .unwrap(); harness @@ -2355,7 +2355,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { let wss_block = harness .chain .store - .get_full_block(&wss_block_root) + .get_full_block(&wss_block_root, None) .unwrap() .unwrap(); let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap(); @@ -2576,7 +2576,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .unwrap() .map(Result::unwrap) { - let block = 
store.get_blinded_block(&block_root).unwrap().unwrap(); + let block = store.get_blinded_block(&block_root, None).unwrap().unwrap(); if block_root != prev_block_root { assert_eq!(block.slot(), slot); } From cf13c969aa11a1cee7662d470e60febad49cb9f2 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 10 Jun 2024 16:59:05 +0200 Subject: [PATCH 4/4] Add migration --- beacon_node/beacon_chain/src/schema_change.rs | 8 ++ .../src/schema_change/migration_schema_v20.rs | 108 ++++++++++++++++++ beacon_node/store/src/metadata.rs | 2 +- 3 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 63eb72c43ab..afbde222d8c 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -2,6 +2,7 @@ mod migration_schema_v17; mod migration_schema_v18; mod migration_schema_v19; +mod migration_schema_v20; use crate::beacon_chain::BeaconChainTypes; use crate::types::ChainSpec; @@ -78,6 +79,13 @@ pub fn migrate_schema( let ops = migration_schema_v19::downgrade_from_v19::(db.clone(), log)?; db.store_schema_version_atomically(to, ops) } + (SchemaVersion(19), SchemaVersion(20)) => { + let ops = migration_schema_v20::upgrade_to_v20::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(20), SchemaVersion(19)) => { + unimplemented!() + } // Anything else is an error. 
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs new file mode 100644 index 00000000000..bbeb40622fb --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs @@ -0,0 +1,108 @@ +use crate::BeaconChainTypes; +use slog::{info, Logger}; +use std::sync::Arc; +use store::{get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp}; +use types::{Hash256, Slot}; + +/// Chunk size for freezer block roots in the old database schema. +const OLD_SCHEMA_CHUNK_SIZE: u64 = 128; + +fn old_schema_chunk_key(cindex: u64) -> [u8; 8] { + (cindex + 1).to_be_bytes() +} + +pub fn upgrade_to_v20( + db: Arc>, + log: Logger, +) -> Result, Error> { + info!(log, "Upgrading freezer database schema"); + upgrade_freezer_database::(&db, &log)?; + + // No hot DB changes + return Ok(vec![]); +} + +fn upgrade_freezer_database( + db: &HotColdDB, + log: &Logger, +) -> Result<(), Error> { + let mut cold_db_ops = vec![]; + + // Re-write the beacon block roots array. + let mut freezer_block_roots = vec![]; + let oldest_block_slot = db.get_oldest_block_slot(); + let mut current_slot = oldest_block_slot; + + for result in db + .cold_db + .iter_column::>(DBColumn::BeaconBlockRoots) + { + let (chunk_key, chunk_bytes) = result?; + + // Stage this chunk for deletion. + cold_db_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconBlockRoots.into(), + &chunk_key, + ))); + + // Skip the 0x0 key which is for the genesis block. + if chunk_key.iter().all(|b| *b == 0u8) { + continue; + } + // Skip the 0x00..01 key which is for slot 0. 
+ if chunk_key == old_schema_chunk_key(0).as_slice() && current_slot != 0 { + continue; + } + + let current_chunk_index = current_slot.as_u64() / OLD_SCHEMA_CHUNK_SIZE; + if chunk_key != old_schema_chunk_key(current_chunk_index).as_slice() { + return Err(Error::DBError { + message: format!( + "expected chunk index {} but got {:?}", + current_chunk_index, chunk_key + ), + }); + } + + for (i, block_root_bytes) in chunk_bytes.chunks_exact(32).enumerate() { + let block_root = Hash256::from_slice(block_root_bytes); + + if block_root.is_zero() { + continue; + } + + let slot = Slot::new(current_chunk_index * OLD_SCHEMA_CHUNK_SIZE + i as u64); + if slot != current_slot { + return Err(Error::DBError { + message: format!( + "expected block root for slot {} but got {}", + current_slot, slot + ), + }); + } + freezer_block_roots.push((slot, block_root)); + current_slot += 1; + } + } + + // Write the freezer block roots in the new schema. + for (slot, block_root) in freezer_block_roots { + cold_db_ops.push(KeyValueStoreOp::PutKeyValue( + get_key_for_col( + DBColumn::BeaconBlockRoots.into(), + &slot.as_u64().to_be_bytes(), + ), + block_root.as_bytes().to_vec(), + )); + } + + db.cold_db.do_atomically(cold_db_ops)?; + info!( + log, + "Freezer database upgrade complete"; + "oldest_block_slot" => oldest_block_slot, + "newest_block_slot" => current_slot - 1 + ); + + Ok(()) +} diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 1675051bd80..116926ad3f1 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(19); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(20); // All the keys that get stored under the `BeaconMeta` column. //