From 7862c71bb585dbbb125a8746f74abb9ccf0e6657 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 30 Jan 2024 15:56:48 +1100 Subject: [PATCH] Fix tree-states sub-epoch diffs (#5097) --- .../beacon_chain/src/historical_blocks.rs | 2 + beacon_node/src/cli.rs | 4 +- beacon_node/src/config.rs | 19 +-- beacon_node/store/src/config.rs | 59 ++++++-- beacon_node/store/src/hdiff.rs | 33 +++++ beacon_node/store/src/hot_cold_store.rs | 19 ++- beacon_node/store/src/leveldb_store.rs | 28 ++-- beacon_node/store/src/lib.rs | 19 ++- beacon_node/store/src/memory_store.rs | 2 +- beacon_node/store/src/metadata.rs | 5 + beacon_node/store/src/reconstruct.rs | 2 +- database_manager/src/lib.rs | 128 ++++++++++++++++-- 12 files changed, 255 insertions(+), 65 deletions(-) diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 2b24e82faa0..c8c56514461 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -130,6 +130,8 @@ impl BeaconChain { } // Store block roots, including at all skip slots in the freezer DB. + // The block root mapping for `block.slot()` itself was already written when the block + // was stored, above. for slot in (block.slot().as_usize() + 1..prev_block_slot.as_usize()).rev() { cold_batch.push(KeyValueStoreOp::PutKeyValue( get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()), diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index e77d49ec036..23d366d15a5 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -593,9 +593,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("slots-per-restore-point") .long("slots-per-restore-point") .value_name("SLOT_COUNT") - .help("Specifies how often a freezer DB restore point should be stored. \ - Cannot be changed after initialization. 
\ - [default: 8192 (mainnet) or 64 (minimal)]") + .help("Deprecated.") .takes_value(true) ) .arg( diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 7ed9de1d6f8..879d1d22b9c 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -26,7 +26,6 @@ use std::num::NonZeroU16; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; -use store::hdiff::HierarchyConfig; use types::{Checkpoint, Epoch, EthSpec, Hash256, PublicKeyBytes, GRAFFITI_BYTES_LEN}; /// Gets the fully-initialized global client. @@ -435,22 +434,8 @@ pub fn get_config( client_config.store.epochs_per_state_diff = epochs_per_state_diff; } - if let Some(hierarchy_exponents) = - clap_utils::parse_optional::(cli_args, "hierarchy-exponents")? - { - let exponents = hierarchy_exponents - .split(',') - .map(|s| { - s.parse() - .map_err(|e| format!("invalid hierarchy-exponents: {e:?}")) - }) - .collect::, _>>()?; - - if exponents.windows(2).any(|w| w[0] >= w[1]) { - return Err("hierarchy-exponents must be in ascending order".to_string()); - } - - client_config.store.hierarchy_config = HierarchyConfig { exponents }; + if let Some(hierarchy_config) = clap_utils::parse_optional(cli_args, "hierarchy-exponents")? 
{ + client_config.store.hierarchy_config = hierarchy_config; } if let Some(epochs_per_migration) = diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index f1902c50ff2..da4532e0bd2 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -1,5 +1,5 @@ use crate::hdiff::HierarchyConfig; -use crate::{DBColumn, Error, StoreItem}; +use crate::{AnchorInfo, DBColumn, Error, Split, StoreItem}; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -117,15 +117,22 @@ impl StoreConfig { pub fn check_compatibility( &self, on_disk_config: &OnDiskStoreConfig, + split: &Split, + anchor: Option<&AnchorInfo>, ) -> Result<(), StoreConfigError> { let db_config = self.as_disk_config(); - if db_config.ne(on_disk_config) { - return Err(StoreConfigError::IncompatibleStoreConfig { + // Allow changing the hierarchy exponents if no historic states are stored. + if db_config.linear_blocks == on_disk_config.linear_blocks + && (db_config.hierarchy_config == on_disk_config.hierarchy_config + || anchor.map_or(false, |anchor| anchor.no_historic_states_stored(split.slot))) + { + Ok(()) + } else { + Err(StoreConfigError::IncompatibleStoreConfig { config: db_config, on_disk: on_disk_config.clone(), - }); + }) } - Ok(()) } /// Check that the configuration is valid. 
@@ -218,6 +225,8 @@ impl StoreItem for OnDiskStoreConfig { #[cfg(test)] mod test { use super::*; + use crate::{metadata::STATE_UPPER_LIMIT_NO_RETAIN, AnchorInfo, Split}; + use types::{Hash256, Slot}; #[test] fn check_compatibility_ok() { @@ -229,7 +238,10 @@ mod test { linear_blocks: true, hierarchy_config: store_config.hierarchy_config.clone(), }; - assert!(store_config.check_compatibility(&on_disk_config).is_ok()); + let split = Split::default(); + assert!(store_config + .check_compatibility(&on_disk_config, &split, None) + .is_ok()); } #[test] @@ -242,7 +254,10 @@ mod test { linear_blocks: false, hierarchy_config: store_config.hierarchy_config.clone(), }; - assert!(store_config.check_compatibility(&on_disk_config).is_err()); + let split = Split::default(); + assert!(store_config + .check_compatibility(&on_disk_config, &split, None) + .is_err()); } #[test] @@ -257,6 +272,34 @@ mod test { exponents: vec![5, 8, 11, 13, 16, 18, 21], }, }; - assert!(store_config.check_compatibility(&on_disk_config).is_err()); + let split = Split::default(); + assert!(store_config + .check_compatibility(&on_disk_config, &split, None) + .is_err()); + } + + #[test] + fn check_compatibility_hierarchy_config_update() { + let store_config = StoreConfig { + linear_blocks: true, + ..Default::default() + }; + let on_disk_config = OnDiskStoreConfig { + linear_blocks: true, + hierarchy_config: HierarchyConfig { + exponents: vec![5, 8, 11, 13, 16, 18, 21], + }, + }; + let split = Split::default(); + let anchor = AnchorInfo { + anchor_slot: Slot::new(0), + oldest_block_slot: Slot::new(0), + oldest_block_parent: Hash256::zero(), + state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + state_lower_limit: Slot::new(0), + }; + assert!(store_config + .check_compatibility(&on_disk_config, &split, Some(&anchor)) + .is_ok()); } } diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index cdb88d37680..b831df5da04 100644 --- a/beacon_node/store/src/hdiff.rs +++ 
b/beacon_node/store/src/hdiff.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::io::{Read, Write}; +use std::str::FromStr; use types::{BeaconState, ChainSpec, EthSpec, Slot, VList}; use zstd::{Decoder, Encoder}; @@ -22,6 +23,26 @@ pub struct HierarchyConfig { pub exponents: Vec, } +impl FromStr for HierarchyConfig { + type Err = String; + + fn from_str(s: &str) -> Result { + let exponents = s + .split(',') + .map(|s| { + s.parse() + .map_err(|e| format!("invalid hierarchy-exponents: {e:?}")) + }) + .collect::, _>>()?; + + if exponents.windows(2).any(|w| w[0] >= w[1]) { + return Err("hierarchy-exponents must be in ascending order".to_string()); + } + + Ok(HierarchyConfig { exponents }) + } +} + #[derive(Debug)] pub struct HierarchyModuli { moduli: Vec, @@ -267,6 +288,18 @@ impl HierarchyModuli { Ok((slot / last + 1) * last) } } + + /// Return `true` if the database ops for this slot should be committed immediately. + /// + /// This is the case for all diffs in the 2nd lowest layer and above, which are required by diffs + /// in the 1st layer. + pub fn should_commit_immediately(&self, slot: Slot) -> Result { + // If there's only 1 layer of snapshots, then commit only when writing a snapshot. + self.moduli.get(1).map_or_else( + || Ok(slot == self.next_snapshot_slot(slot)?), + |second_layer_moduli| Ok(slot % *second_layer_moduli == 0), + ) + } } #[cfg(test)] diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 400a0d2b66a..03d280fb030 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -346,7 +346,20 @@ impl HotColdDB, LevelDB> { // Ensure that any on-disk config is compatible with the supplied config. if let Some(disk_config) = db.load_config()? 
{ - db.config.check_compatibility(&disk_config)?; + let split = db.get_split_info(); + let anchor = db.get_anchor_info(); + db.config + .check_compatibility(&disk_config, &split, anchor.as_ref())?; + + // Inform user if hierarchy config is changing. + if db.config.hierarchy_config != disk_config.hierarchy_config { + info!( + db.log, + "Updating historic state config"; + "previous_config" => ?disk_config.hierarchy_config, + "new_config" => ?db.config.hierarchy_config, + ); + } } db.store_config()?; @@ -2740,6 +2753,7 @@ impl, Cold: ItemStore> HotColdDB let columns = [ DBColumn::BeaconState, DBColumn::BeaconStateSummary, + DBColumn::BeaconStateDiff, DBColumn::BeaconRestorePoint, DBColumn::BeaconStateRoots, DBColumn::BeaconHistoricalRoots, @@ -2780,6 +2794,9 @@ impl, Cold: ItemStore> HotColdDB self.cold_db.do_atomically(cold_ops)?; } + // In order to reclaim space, we need to compact the freezer DB as well. + self.cold_db.compact()?; + Ok(()) } } diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 05fb3d2d8c8..91416f728fa 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -155,25 +155,15 @@ impl KeyValueStore for LevelDB { self.transaction_mutex.lock() } - /// Compact all values in the states and states flag columns. 
- fn compact(&self) -> Result<(), Error> { - let endpoints = |column: DBColumn| { - ( - BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())), - BytesKey::from_vec(get_key_for_col( - column.as_str(), - Hash256::repeat_byte(0xff).as_bytes(), - )), - ) - }; - - for (start_key, end_key) in [ - endpoints(DBColumn::BeaconState), - endpoints(DBColumn::BeaconStateDiff), - endpoints(DBColumn::BeaconStateSummary), - ] { - self.db.compact(&start_key, &end_key); - } + fn compact_column(&self, column: DBColumn) -> Result<(), Error> { + // Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for + // columns that may change size between sub-databases or schema versions. + let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[])); + let end_key = BytesKey::from_vec(get_key_for_col( + column.as_str(), + &vec![0xff; std::cmp::max(column.key_size(), 32)], + )); + self.db.compact(&start_key, &end_key); Ok(()) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index c1026c68642..ccd719d1cb9 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -79,8 +79,23 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// this method. In future we may implement a safer mandatory locking scheme. fn begin_rw_transaction(&self) -> MutexGuard<()>; - /// Compact the database, freeing space used by deleted items. - fn compact(&self) -> Result<(), Error>; + /// Compact a single column in the database, freeing space used by deleted items. + fn compact_column(&self, column: DBColumn) -> Result<(), Error>; + + /// Compact a default set of columns that are likely to free substantial space. + fn compact(&self) -> Result<(), Error> { + // Compact state and block related columns as they are likely to have the most churn, + // i.e. entries being created and deleted. 
+ for column in [ + DBColumn::BeaconState, + DBColumn::BeaconStateDiff, + DBColumn::BeaconStateSummary, + DBColumn::BeaconBlock, + ] { + self.compact_column(column)?; + } + Ok(()) + } /// Iterate through all keys and values in a particular column. fn iter_column(&self, column: DBColumn) -> ColumnIter { diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index c2e494dce6a..302d2c2add2 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -108,7 +108,7 @@ impl KeyValueStore for MemoryStore { self.transaction_mutex.lock() } - fn compact(&self) -> Result<(), Error> { + fn compact_column(&self, _column: DBColumn) -> Result<(), Error> { Ok(()) } } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index e59b08de30f..4f34c62728b 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -108,6 +108,11 @@ impl AnchorInfo { pub fn block_backfill_complete(&self, target_slot: Slot) -> bool { self.oldest_block_slot <= target_slot } + + /// Return true if no historic states other than genesis are stored in the database. + pub fn no_historic_states_stored(&self, split_slot: Slot) -> bool { + self.state_lower_limit == 0 && self.state_upper_limit >= split_slot + } } impl StoreItem for AnchorInfo { diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 18394078790..744f97b28ce 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -106,7 +106,7 @@ where self.store_cold_state(&state_root, &state, &mut io_batch)?; // If the slot lies on an epoch boundary, commit the batch and update the anchor. - if slot % E::slots_per_epoch() == 0 || slot + 1 == upper_limit_slot { + if self.hierarchy.should_commit_immediately(slot)? 
|| slot + 1 == upper_limit_slot { info!( self.log, "State reconstruction in progress"; diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index c33b9b0dca9..c1d5e1c24d5 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -77,7 +77,15 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("freezer") .long("freezer") .help("Inspect the freezer DB rather than the hot DB") - .takes_value(false), + .takes_value(false) + .conflicts_with("blobs-db"), + ) + .arg( + Arg::with_name("blobs-db") + .long("blobs-db") + .help("Inspect the blobs DB rather than the hot DB") + .takes_value(false) + .conflicts_with("freezer"), ) .arg( Arg::with_name("output-dir") @@ -88,6 +96,34 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { ) } +pub fn compact_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("compact") + .setting(clap::AppSettings::ColoredHelp) + .about("Compact database manually") + .arg( + Arg::with_name("column") + .long("column") + .value_name("TAG") + .help("3-byte column ID (see `DBColumn`)") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("freezer") + .long("freezer") + .help("Compact the freezer DB rather than the hot DB") + .takes_value(false) + .conflicts_with("blobs-db"), + ) + .arg( + Arg::with_name("blobs-db") + .long("blobs-db") + .help("Compact the blobs DB rather than the hot DB") + .takes_value(false) + .conflicts_with("freezer"), + ) +} + pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { App::new("prune-payloads") .alias("prune_payloads") @@ -143,17 +179,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .visible_aliases(&["db"]) .setting(clap::AppSettings::ColoredHelp) .about("Manage a beacon node database") - .arg( - Arg::with_name("slots-per-restore-point") - .long("slots-per-restore-point") - .value_name("SLOT_COUNT") - .help( - "Specifies how often a freezer DB restore point should be stored. \ - Cannot be changed after initialization. 
\ - [default: 2048 (mainnet) or 64 (minimal)]", - ) - .takes_value(true), - ) .arg( Arg::with_name("freezer-dir") .long("freezer-dir") @@ -179,9 +204,25 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("Data directory for the blobs database.") .takes_value(true), ) + .arg( + Arg::with_name("hierarchy-exponents") + .long("hierarchy-exponents") + .value_name("EXPONENTS") + .help("Specifies the frequency for storing full state snapshots and hierarchical \ + diffs in the freezer DB. Accepts a comma-separated list of ascending \ + exponents. Each exponent defines an interval for storing diffs to the layer \ + above. The last exponent defines the interval for full snapshots. \ + For example, a config of '4,8,12' would store a full snapshot every \ + 4096 (2^12) slots, first-level diffs every 256 (2^8) slots, and second-level \ + diffs every 16 (2^4) slots. \ + Cannot be changed after initialization. \ + [default: 5,9,11,13,16,18,21]") + .takes_value(true) + ) .subcommand(migrate_cli_app()) .subcommand(version_cli_app()) .subcommand(inspect_cli_app()) + .subcommand(compact_cli_app()) .subcommand(prune_payloads_app()) .subcommand(prune_blobs_app()) .subcommand(diff_app()) @@ -210,6 +251,10 @@ fn parse_client_config( client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs; } + if let Some(hierarchy_config) = clap_utils::parse_optional(cli_args, "hierarchy-exponents")? { + client_config.store.hierarchy_config = hierarchy_config; + } + Ok(client_config) } @@ -268,6 +313,7 @@ pub struct InspectConfig { skip: Option, limit: Option, freezer: bool, + blobs_db: bool, /// Configures where the inspect output should be stored. 
output_dir: PathBuf, } @@ -278,6 +324,7 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result let skip = clap_utils::parse_optional(cli_args, "skip")?; let limit = clap_utils::parse_optional(cli_args, "limit")?; let freezer = cli_args.is_present("freezer"); + let blobs_db = cli_args.is_present("blobs-db"); let output_dir: PathBuf = clap_utils::parse_optional(cli_args, "output-dir")?.unwrap_or_else(PathBuf::new); @@ -287,6 +334,7 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result skip, limit, freezer, + blobs_db, output_dir, }) } @@ -297,12 +345,15 @@ pub fn inspect_db( ) -> Result<(), String> { let hot_path = client_config.get_db_path(); let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); let mut total = 0; let mut num_keys = 0; let sub_db = if inspect_config.freezer { LevelDB::::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))? + } else if inspect_config.blobs_db { + LevelDB::::open(&blobs_path).map_err(|e| format!("Unable to open blobs DB: {e:?}"))? } else { LevelDB::::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))? 
}; @@ -386,6 +437,50 @@ pub fn inspect_db( Ok(()) } +pub struct CompactConfig { + column: DBColumn, + freezer: bool, + blobs_db: bool, +} + +fn parse_compact_config(cli_args: &ArgMatches) -> Result { + let column = clap_utils::parse_required(cli_args, "column")?; + let freezer = cli_args.is_present("freezer"); + let blobs_db = cli_args.is_present("blobs-db"); + Ok(CompactConfig { + column, + freezer, + blobs_db, + }) +} + +pub fn compact_db( + compact_config: CompactConfig, + client_config: ClientConfig, + log: Logger, +) -> Result<(), Error> { + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + let column = compact_config.column; + + let (sub_db, db_name) = if compact_config.freezer { + (LevelDB::::open(&cold_path)?, "freezer_db") + } else if compact_config.blobs_db { + (LevelDB::::open(&blobs_path)?, "blobs_db") + } else { + (LevelDB::::open(&hot_path)?, "hot_db") + }; + info!( + log, + "Compacting database"; + "db" => db_name, + "column" => ?column + ); + sub_db.compact_column(column)?; + Ok(()) +} + pub struct MigrateConfig { to: SchemaVersion, } @@ -590,7 +685,10 @@ pub fn prune_states( // Check that the user has confirmed they want to proceed. 
if !prune_config.confirm { match db.get_anchor_info() { - Some(anchor_info) if anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => { + Some(anchor_info) + if anchor_info.state_lower_limit == 0 + && anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => + { info!(log, "States have already been pruned"); return Ok(()); } @@ -640,6 +738,10 @@ pub fn run(cli_args: &ArgMatches<'_>, env: Environment) -> Result let inspect_config = parse_inspect_config(cli_args)?; inspect_db::(inspect_config, client_config) } + ("compact", Some(cli_args)) => { + let compact_config = parse_compact_config(cli_args)?; + compact_db::(compact_config, client_config, log).map_err(format_err) + } ("prune-payloads", Some(_)) => { prune_payloads(client_config, &context, log).map_err(format_err) }