From b224412b9d69ece71ecd9ad484ca20606931f243 Mon Sep 17 00:00:00 2001
From: wacban
Date: Thu, 2 Mar 2023 12:11:48 +0000
Subject: [PATCH] feat: use split store in the view client (#8656)

- use split store in the view client - when configured
- removed the get_store(Temperature) method and replaced usages with the
  safer get_cold_store and get_hot_store
- added SplitStorageConfig and used it to configure a few things
---
 nearcore/src/config.rs        |  48 +++++-
 nearcore/src/lib.rs           |  78 ++++++----
 nearcore/src/runtime/mod.rs   | 131 +++++++++++------
 tools/cold-store/src/cli.rs   | 268 ++--------------------------------
 tools/state-viewer/src/cli.rs |   5 +-
 5 files changed, 192 insertions(+), 338 deletions(-)

diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs
index 56be887c4cf..f8bc90dc195 100644
--- a/nearcore/src/config.rs
+++ b/nearcore/src/config.rs
@@ -337,7 +337,9 @@ pub struct Config {
     /// This feature is under development, do not use in production.
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub cold_store: Option<near_store::StoreConfig>,
-
+    /// Configuration for the split storage.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub split_storage: Option<SplitStorageConfig>,
     // TODO(mina86): Remove those two altogether at some point. We need to be
     // somewhat careful though and make sure that we don’t start silently
     // ignoring this option without users setting corresponding store option.
@@ -388,12 +390,56 @@ impl Default for Config {
             use_db_migration_snapshot: None,
             store: near_store::StoreConfig::default(),
             cold_store: None,
+            split_storage: None,
             expected_shutdown: None,
             state_sync: None,
         }
     }
 }
 
+fn default_enable_split_storage_view_client() -> bool {
+    false
+}
+
+fn default_cold_store_initial_migration_batch_size() -> usize {
+    500_000_000
+}
+
+fn default_cold_store_initial_migration_loop_sleep_duration() -> Duration {
+    Duration::from_secs(30)
+}
+
+fn default_cold_store_loop_sleep_duration() -> Duration {
+    Duration::from_secs(1)
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
+pub struct SplitStorageConfig {
+    #[serde(default = "default_enable_split_storage_view_client")]
+    pub enable_split_storage_view_client: bool,
+
+    #[serde(default = "default_cold_store_initial_migration_batch_size")]
+    pub cold_store_initial_migration_batch_size: usize,
+    #[serde(default = "default_cold_store_initial_migration_loop_sleep_duration")]
+    pub cold_store_initial_migration_loop_sleep_duration: Duration,
+
+    #[serde(default = "default_cold_store_loop_sleep_duration")]
+    pub cold_store_loop_sleep_duration: Duration,
+}
+
+impl Default for SplitStorageConfig {
+    fn default() -> Self {
+        SplitStorageConfig {
+            enable_split_storage_view_client: default_enable_split_storage_view_client(),
+            cold_store_initial_migration_batch_size:
+                default_cold_store_initial_migration_batch_size(),
+            cold_store_initial_migration_loop_sleep_duration:
+                default_cold_store_initial_migration_loop_sleep_duration(),
+            cold_store_loop_sleep_duration: default_cold_store_loop_sleep_duration(),
+        }
+    }
+}
+
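For reference, a minimal sketch (not part of the patch) of how the new `split_storage` section behaves under serde, assuming the `SplitStorageConfig` definition above: any field omitted from config.json falls back to its `default_*` function via the `#[serde(default = ...)]` attributes.

    use std::time::Duration;

    fn example() -> serde_json::Result<()> {
        // Only one field is set; the rest take their defaults.
        let raw = r#"{ "enable_split_storage_view_client": true }"#;
        let config: SplitStorageConfig = serde_json::from_str(raw)?;
        assert!(config.enable_split_storage_view_client);
        assert_eq!(config.cold_store_initial_migration_batch_size, 500_000_000);
        assert_eq!(config.cold_store_loop_sleep_duration, Duration::from_secs(1));
        Ok(())
    }
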
 impl Config {
     /// load Config from config.json without panic. Do semantic validation on field values.
     /// If config file issues occur, a ValidationError::ConfigFileError will be returned;
diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs
index 7ea9b4f5eed..7498a01830d 100644
--- a/nearcore/src/lib.rs
+++ b/nearcore/src/lib.rs
@@ -2,7 +2,6 @@
 use crate::cold_storage::spawn_cold_store_loop;
 pub use crate::config::{init_configs, load_config, load_test_config, NearConfig, NEAR_BASE};
 pub use crate::runtime::NightshadeRuntime;
 pub use crate::shard_tracker::TrackedConfig;
-use crate::state_sync::{spawn_state_sync_dump, StateSyncDumpHandle};
 use actix::{Actor, Addr};
 use actix_rt::ArbiterHandle;
 use actix_web;
@@ -13,16 +12,17 @@ use near_async::messaging::{IntoSender, LateBoundSender};
 use near_chain::{Chain, ChainGenesis};
 use near_chunks::shards_manager_actor::start_shards_manager;
 use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
+use near_primitives::time;
+
 use near_network::PeerManagerActor;
 use near_primitives::block::GenesisId;
-use near_primitives::time;
-use near_store::{DBCol, Mode, NodeStorage, StoreOpenerError, Temperature};
+use near_store::metadata::DbKind;
+use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
 use near_telemetry::TelemetryActor;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tokio::sync::broadcast;
-use tracing::info;
-
+use tracing::{info, trace};
 pub mod append_only_map;
 mod cold_storage;
 pub mod config;
@@ -33,7 +33,6 @@ mod metrics;
 pub mod migrations;
 mod runtime;
 mod shard_tracker;
-mod state_sync;
 
 pub fn get_default_home() -> PathBuf {
     if let Ok(near_home) = std::env::var("NEAR_HOME") {
@@ -154,6 +153,33 @@ fn open_storage(home_dir: &Path, near_config: &mut NearConfig) -> anyhow::Result<NodeStorage> {
     Ok(storage)
 }
 
+// Safely get the split store while checking that all conditions to use it are met.
+fn get_split_store(config: &NearConfig, storage: &NodeStorage) -> anyhow::Result<Option<Store>> {
+    // SplitStore should only be used on archival nodes.
+    if !config.config.archive {
+        return Ok(None);
+    }
+
+    // SplitStore should only be used if cold store is configured.
+    if config.config.cold_store.is_none() {
+        return Ok(None);
+    }
+
+    // SplitStore should only be used in the view client if it is enabled.
+    if !config.config.split_storage.as_ref().map_or(false, |c| c.enable_split_storage_view_client) {
+        return Ok(None);
+    }
+
+    // SplitStore should only be used if the migration is finished. The
+    // migration to cold store is finished when the db kind of the hot store is
+    // changed from Archive to Hot.
+    if storage.get_hot_store().get_db_kind()? != Some(DbKind::Hot) {
+        return Ok(None);
+    }
+
+    Ok(storage.get_split_store())
+}
+
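The DbKind check above is the last gate: an archival node starts with its hot DB marked Archive, and the cold store loop re-labels it Hot only once the initial migration to the cold DB has finished. A hedged one-liner capturing that invariant (the helper name is illustrative, not part of the patch):

    // Sketch only: true once the hot DB has been re-labelled from
    // DbKind::Archive to DbKind::Hot by the completed migration.
    fn is_cold_migration_finished(storage: &near_store::NodeStorage) -> anyhow::Result<bool> {
        Ok(storage.get_hot_store().get_db_kind()? == Some(near_store::metadata::DbKind::Hot))
    }
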
 pub struct NearNode {
     pub client: Addr<ClientActor>,
     pub view_client: Addr<ViewClientActor>,
@@ -162,8 +188,6 @@ pub struct NearNode {
     /// The cold_store_loop_handle will only be set if the cold store is configured.
     /// It's a handle to a background thread that copies data from the hot store to the cold store.
     pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
-    /// Contains handles to background threads that may be dumping state to S3.
-    pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
 }
 
 pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
@@ -180,11 +204,17 @@ pub fn start_with_config_and_synchronization(
 ) -> anyhow::Result<NearNode> {
     let store = open_storage(home_dir, &mut config)?;
 
-    let runtime = Arc::new(NightshadeRuntime::from_config(
-        home_dir,
-        store.get_store(Temperature::Hot),
-        &config,
-    ));
+    let runtime =
+        Arc::new(NightshadeRuntime::from_config(home_dir, store.get_hot_store(), &config));
+
+    // Get the split store. If split store is some then create a new runtime for
+    // the view client. Otherwise just re-use the existing runtime.
+    let split_store = get_split_store(&config, &store)?;
+    let view_runtime = if let Some(split_store) = split_store {
+        Arc::new(NightshadeRuntime::from_config(home_dir, split_store, &config))
+    } else {
+        runtime.clone()
+    };
 
     let cold_store_loop_handle = spawn_cold_store_loop(&config, &store, runtime.clone())?;
 
@@ -205,14 +235,14 @@ pub fn start_with_config_and_synchronization(
     let view_client = start_view_client(
         config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
         chain_genesis.clone(),
-        runtime.clone(),
+        view_runtime,
         network_adapter.clone().into(),
         config.client_config.clone(),
         adv.clone(),
     );
     let (client_actor, client_arbiter_handle) = start_client(
         config.client_config.clone(),
-        chain_genesis.clone(),
+        chain_genesis,
         runtime.clone(),
         node_id,
         network_adapter.clone().into(),
@@ -225,22 +255,15 @@ pub fn start_with_config_and_synchronization(
     );
     client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context());
     let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager(
-        runtime.clone(),
+        runtime,
         network_adapter.as_sender(),
         client_adapter_for_shards_manager.as_sender(),
         config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
-        store.get_store(Temperature::Hot),
+        store.get_hot_store(),
         config.client_config.chunk_request_retry_period,
     );
     shards_manager_adapter.bind(shards_manager_actor);
 
-    let state_sync_dump_handle = spawn_state_sync_dump(
-        &config,
-        &chain_genesis.clone(),
-        runtime.clone(),
-        config.network_config.node_id().public_key(),
-    )?;
-
     #[allow(unused_mut)]
     let mut rpc_servers = Vec::new();
     let network_actor = PeerManagerActor::spawn(
@@ -281,7 +304,7 @@ pub fn start_with_config_and_synchronization(
 
     rpc_servers.shrink_to_fit();
 
-    tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated");
+    trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated");
 
     Ok(NearNode {
         client: client_actor,
@@ -289,7 +312,6 @@ pub fn start_with_config_and_synchronization(
         rpc_servers,
         arbiters: vec![client_arbiter_handle, shards_manager_arbiter_handle],
         cold_store_loop_handle,
-        state_sync_dump_handle,
     })
 }
 
@@ -334,7 +356,7 @@ pub fn recompress_storage(home_dir: &Path, opts: RecompressOpts) -> anyhow::Resu
             "Recompressing database");
     info!("Opening database at {}", src_path.display());
-    let src_store = src_opener.open_in_mode(Mode::ReadOnly)?.get_store(Temperature::Hot);
+    let src_store = src_opener.open_in_mode(Mode::ReadOnly)?.get_hot_store();
 
     let final_head_height = if skip_columns.contains(&DBCol::PartialChunks) {
         let tip: Option<Tip> =
@@ -351,7 +373,7 @@ pub fn recompress_storage(home_dir: &Path, opts: RecompressOpts) -> anyhow::Resu
     };
 
     info!("Creating database at {}", dst_path.display());
-    let dst_store = dst_opener.open_in_mode(Mode::Create)?.get_store(Temperature::Hot);
+    let dst_store = dst_opener.open_in_mode(Mode::Create)?.get_hot_store();
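The pattern running through the hunks above is the accessor change from the commit message: the Temperature-keyed get_store is gone in favour of explicit accessors. A rough sketch of the surface as this patch uses it (get_cold_store and get_split_store return Option because a cold DB may not be configured):

    use near_store::{NodeStorage, Store};

    fn stores(storage: &NodeStorage) -> (Store, Option<Store>, Option<Store>) {
        let hot = storage.get_hot_store(); // always present
        let cold = storage.get_cold_store(); // None unless a cold store is configured
        let split = storage.get_split_store(); // hot and cold combined, when available
        (hot, cold, split)
    }
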
 
 const BATCH_SIZE_BYTES: u64 = 150_000_000;
 
diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs
index 358387ae547..32595e631d1 100644
--- a/nearcore/src/runtime/mod.rs
+++ b/nearcore/src/runtime/mod.rs
@@ -48,7 +48,10 @@ use near_primitives::views::{
     AccessKeyInfoView, CallResult, QueryRequest, QueryResponse, QueryResponseKind, ViewApplyState,
     ViewStateResult,
 };
-use near_store::flat::{store_helper, FlatStorage, FlatStorageManager, FlatStorageStatus};
+use near_store::flat_state::ChainAccessForFlatStorage;
+use near_store::flat_state::{
+    store_helper, FlatStateFactory, FlatStorageCreationStatus, FlatStorageState,
+};
 use near_store::metadata::DbKind;
 use near_store::split_state::get_delayed_receipts;
 use near_store::{
@@ -86,7 +89,7 @@ pub struct NightshadeRuntime {
     store: Store,
     tries: ShardTries,
     trie_viewer: TrieViewer,
-    flat_storage_manager: FlatStorageManager,
+    flat_state_factory: FlatStateFactory,
     pub runtime: Runtime,
     epoch_manager: EpochManagerHandle,
     shard_tracker: ShardTracker,
@@ -142,12 +145,12 @@ impl NightshadeRuntime {
         );
         let state_roots =
             Self::initialize_genesis_state_if_needed(store.clone(), home_dir, genesis);
-        let flat_storage_manager = FlatStorageManager::new(store.clone());
+        let flat_state_factory = FlatStateFactory::new(store.clone());
         let tries = ShardTries::new(
             store.clone(),
             trie_config,
             &genesis_config.shard_layout.get_shard_uids(),
-            flat_storage_manager.clone(),
+            flat_state_factory.clone(),
         );
         let epoch_manager =
             EpochManager::new_from_genesis_config(store.clone().into(), &genesis_config)
@@ -163,7 +166,7 @@ impl NightshadeRuntime {
             trie_viewer,
             epoch_manager,
             shard_tracker,
-            flat_storage_manager,
+            flat_state_factory,
             genesis_state_roots: state_roots,
             migration_data: Arc::new(load_migration_data(&genesis.config.chain_id)),
             gc_num_epochs_to_keep: gc_num_epochs_to_keep.max(MIN_GC_NUM_EPOCHS_TO_KEEP),
@@ -270,7 +273,7 @@ impl NightshadeRuntime {
             store.clone(),
             TrieConfig::default(),
             &genesis.config.shard_layout.get_shard_uids(),
-            FlatStorageManager::new(store),
+            FlatStateFactory::new(store),
         );
         let runtime = Runtime::new();
         let runtime_config_store =
@@ -553,6 +556,11 @@ impl NightshadeRuntime {
         metrics::APPLY_CHUNK_DELAY
             .with_label_values(&[&format_total_gas_burnt(total_gas_burnt)])
             .observe(elapsed.as_secs_f64());
+        if total_gas_burnt > 0 {
+            metrics::SECONDS_PER_PETAGAS
+                .with_label_values(&[&shard_id.to_string()])
+                .observe(elapsed.as_secs_f64() * 1e15 / total_gas_burnt as f64);
+        }
         let total_balance_burnt = apply_result
             .stats
             .tx_burnt_amount
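A quick sanity check on the unit conversion in the new metric: gas is recorded in raw gas units and 1 Pgas = 10^15 gas, so multiplying the elapsed seconds by 1e15 and dividing by the gas burnt yields seconds per petagas:

    // Worked example for the SECONDS_PER_PETAGAS observation above.
    fn seconds_per_petagas(elapsed_secs: f64, total_gas_burnt: u64) -> f64 {
        elapsed_secs * 1e15 / total_gas_burnt as f64
    }

    fn main() {
        // A chunk that burnt 100 Tgas (1e14 gas) in 0.2 s records 2.0 s/Pgas.
        let observed = seconds_per_petagas(0.2, 100_000_000_000_000);
        assert!((observed - 2.0).abs() < 1e-9);
    }
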
@@ -746,46 +754,43 @@ impl RuntimeAdapter for NightshadeRuntime {
         Ok(self.tries.get_view_trie_for_shard(shard_uid, state_root))
     }
 
-    fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage> {
-        self.flat_storage_manager.get_flat_storage_for_shard(shard_uid)
+    fn get_flat_storage_for_shard(&self, shard_id: ShardId) -> Option<FlatStorageState> {
+        self.flat_state_factory.get_flat_storage_for_shard(shard_id)
     }
 
-    fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus {
-        store_helper::get_flat_storage_status(&self.store, shard_uid)
+    fn get_flat_storage_creation_status(&self, shard_id: ShardId) -> FlatStorageCreationStatus {
+        store_helper::get_flat_storage_creation_status(&self.store, shard_id)
     }
 
     // TODO (#7327): consider passing flat storage errors here to handle them gracefully
-    fn create_flat_storage_for_shard(&self, shard_uid: ShardUId) {
-        let flat_storage = FlatStorage::new(self.store.clone(), shard_uid);
-        self.flat_storage_manager.add_flat_storage_for_shard(shard_uid, flat_storage);
+    fn create_flat_storage_state_for_shard(
+        &self,
+        shard_id: ShardId,
+        latest_block_height: BlockHeight,
+        chain_access: &dyn ChainAccessForFlatStorage,
+    ) {
+        let flat_storage_state =
+            FlatStorageState::new(self.store.clone(), shard_id, latest_block_height, chain_access);
+        self.flat_state_factory.add_flat_storage_state_for_shard(shard_id, flat_storage_state);
     }
 
-    fn remove_flat_storage_for_shard(
+    fn remove_flat_storage_state_for_shard(
         &self,
-        shard_uid: ShardUId,
+        shard_id: ShardId,
         epoch_id: &EpochId,
     ) -> Result<(), Error> {
         let shard_layout = self.get_shard_layout(epoch_id)?;
-        self.flat_storage_manager
-            .remove_flat_storage_for_shard(shard_uid, shard_layout)
+        self.flat_state_factory
+            .remove_flat_storage_for_shard(shard_id, shard_layout)
             .map_err(|e| Error::StorageError(e))?;
         Ok(())
     }
 
-    fn set_flat_storage_for_genesis(
+    fn set_flat_storage_state_for_genesis(
         &self,
         genesis_block: &CryptoHash,
-        genesis_block_height: BlockHeight,
         genesis_epoch_id: &EpochId,
     ) -> Result<StoreUpdate, Error> {
         let mut store_update = self.store.store_update();
         for shard_id in 0..self.num_shards(genesis_epoch_id)? {
-            let shard_uid = self.shard_id_to_uid(shard_id, genesis_epoch_id)?;
-            self.flat_storage_manager.set_flat_storage_for_genesis(
+            self.flat_state_factory.set_flat_storage_state_for_genesis(
                 &mut store_update,
-                shard_uid,
+                shard_id,
                 genesis_block,
-                genesis_block_height,
             );
         }
         Ok(store_update)
@@ -1393,10 +1398,8 @@ impl RuntimeAdapter for NightshadeRuntime {
         let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, epoch_id)?;
         let mut store_update = tries.store_update();
         tries.apply_all(&trie_changes, shard_uid, &mut store_update);
-        if cfg!(feature = "protocol_feature_flat_state") {
-            debug!(target: "chain", %shard_id, "Inserting {} values to flat storage", flat_state_delta.len());
-            flat_state_delta.apply_to_flat_state(&mut store_update, shard_uid);
-        }
+        debug!(target: "chain", %shard_id, "Inserting {} values to flat storage", flat_state_delta.len());
+        flat_state_delta.apply_to_flat_state(&mut store_update);
 
         self.precompile_contracts(epoch_id, contract_codes)?;
         Ok(store_update.commit()?)
     }
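Note that set_flat_storage_state_for_genesis accumulates the writes for every shard into a single StoreUpdate and leaves the commit to the caller; the expected call pattern, mirrored by the test further down, is roughly:

    // Sketch of the intended call pattern; `runtime` and `genesis_hash` are as
    // in the test environment below.
    let store_update = runtime
        .set_flat_storage_state_for_genesis(&genesis_hash, &EpochId::default())
        .unwrap();
    store_update.commit().unwrap();
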
@@ -1574,7 +1577,6 @@ mod test {
     use near_epoch_manager::shard_tracker::TrackedConfig;
     use near_primitives::test_utils::create_test_signer;
     use near_primitives::types::validator_stake::ValidatorStake;
-    use near_store::flat::{FlatStateChanges, FlatStateDelta, FlatStateDeltaMetadata};
     use num_rational::Ratio;
 
     use crate::config::{GenesisExt, TESTING_INIT_BALANCE, TESTING_INIT_STAKE};
@@ -1593,7 +1595,7 @@ mod test {
         AccountView, CurrentEpochValidatorInfo, EpochValidatorInfo, NextEpochValidatorInfo,
         ValidatorKickoutView,
     };
-    use near_store::NodeStorage;
+    use near_store::{flat_state, FlatStateDelta, NodeStorage};
 
     use super::*;
 
@@ -1655,13 +1657,12 @@ mod test {
             )
             .unwrap();
             let mut store_update = self.store.store_update();
-            let flat_state_changes =
-                FlatStateChanges::from_state_changes(&result.trie_changes.state_changes());
+            let flat_state_delta =
+                FlatStateDelta::from_state_changes(&result.trie_changes.state_changes());
             result.trie_changes.insertions_into(&mut store_update);
             result.trie_changes.state_changes_into(&mut store_update);
-            let shard_uid = self.shard_id_to_uid(shard_id, &EpochId::default()).unwrap();
-            match self.get_flat_storage_for_shard(shard_uid) {
-                Some(flat_storage) => {
-                    let delta = FlatStateDelta {
-                        changes: flat_state_changes,
-                        metadata: FlatStateDeltaMetadata { block: block_info },
-                    };
-                    let new_store_update = flat_storage.add_delta(delta).unwrap();
+            match self.get_flat_storage_for_shard(shard_id) {
+                Some(flat_storage_state) => {
+                    let new_store_update = flat_storage_state
+                        .add_block(&block_hash, flat_state_delta, block_info)
+                        .unwrap();
                     store_update.merge(new_store_update);
                 }
                 None => {}
             }
@@ -1684,6 +1687,43 @@ mod test {
         }
     }
 
+    /// Stores chain data for genesis block to initialize flat storage in test environment.
+    struct MockChainForFlatStorage {
+        height_to_hashes: HashMap<BlockHeight, CryptoHash>,
+        blocks: HashMap<CryptoHash, flat_state::BlockInfo>,
+    }
+
+    impl ChainAccessForFlatStorage for MockChainForFlatStorage {
+        fn get_block_info(&self, block_hash: &CryptoHash) -> flat_state::BlockInfo {
+            self.blocks.get(block_hash).unwrap().clone()
+        }
+
+        fn get_block_hashes_at_height(&self, block_height: BlockHeight) -> HashSet<CryptoHash> {
+            HashSet::from([self.get_block_hash(block_height)])
+        }
+    }
+
+    impl MockChainForFlatStorage {
+        /// Creates mock chain containing only genesis block data.
+        pub fn new(genesis_height: BlockHeight, genesis_hash: CryptoHash) -> Self {
+            Self {
+                height_to_hashes: HashMap::from([(genesis_height, genesis_hash)]),
+                blocks: HashMap::from([(
+                    genesis_hash,
+                    flat_state::BlockInfo {
+                        hash: genesis_hash,
+                        height: genesis_height,
+                        prev_hash: CryptoHash::default(),
+                    },
+                )]),
+            }
+        }
+
+        fn get_block_hash(&self, height: BlockHeight) -> CryptoHash {
+            *self.height_to_hashes.get(&height).unwrap()
+        }
+    }
+
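A short usage note for the mock (hypothetical values, mirroring the test below): it satisfies ChainAccessForFlatStorage, so flat storage creation can resolve block info without a real Chain.

    // Sketch: the mock resolves genesis block info by hash and by height.
    let mock_chain = MockChainForFlatStorage::new(0, genesis_hash);
    let info = mock_chain.get_block_info(&genesis_hash);
    assert_eq!(info.height, 0);
    assert_eq!(mock_chain.get_block_hashes_at_height(0).len(), 1);
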
     /// Environment to test runtime behaviour separate from Chain.
     /// Runtime operates in a mock chain where i-th block is attached to (i-1)-th one, has height `i` and hash
     /// `hash([i])`.
@@ -1787,18 +1827,18 @@ mod test {
             // Create flat storage. Naturally it happens on Chain creation, but here we test only Runtime behaviour
             // and use a mock chain, so we need to initialize flat storage manually.
-            if cfg!(feature = "protocol_feature_flat_state") {
-                let store_update = runtime
-                    .set_flat_storage_for_genesis(&genesis_hash, 0, &EpochId::default())
-                    .unwrap();
-                store_update.commit().unwrap();
-                for shard_id in 0..runtime.num_shards(&EpochId::default()).unwrap() {
-                    let shard_uid = runtime.shard_id_to_uid(shard_id, &EpochId::default()).unwrap();
-                    assert!(matches!(
-                        runtime.get_flat_storage_status(shard_uid),
-                        FlatStorageStatus::Ready(_)
-                    ));
-                    runtime.create_flat_storage_for_shard(shard_uid);
+            let store_update = runtime
+                .set_flat_storage_state_for_genesis(&genesis_hash, &EpochId::default())
+                .unwrap();
+            store_update.commit().unwrap();
+            let mock_chain = MockChainForFlatStorage::new(0, genesis_hash);
+            for shard_id in 0..runtime.num_shards(&EpochId::default()).unwrap() {
+                let status = runtime.get_flat_storage_creation_status(shard_id);
+                if cfg!(feature = "protocol_feature_flat_state") {
+                    assert_eq!(status, FlatStorageCreationStatus::Ready);
+                    runtime.create_flat_storage_state_for_shard(shard_id, 0, &mock_chain);
+                } else {
+                    assert_eq!(status, FlatStorageCreationStatus::DontCreate);
                 }
             }
 
@@ -3066,16 +3106,13 @@ mod test {
             .runtime
             .get_trie_for_shard(0, &env.head.prev_block_hash, Trie::EMPTY_ROOT, true)
             .unwrap();
-        assert_eq!(
-            trie.flat_storage_chunk_view.is_some(),
-            cfg!(feature = "protocol_feature_flat_state")
-        );
+        assert_eq!(trie.flat_state.is_some(), cfg!(feature = "protocol_feature_flat_state"));
 
         let trie = env
             .runtime
             .get_view_trie_for_shard(0, &env.head.prev_block_hash, Trie::EMPTY_ROOT)
             .unwrap();
-        assert!(trie.flat_storage_chunk_view.is_none());
+        assert!(trie.flat_state.is_none());
     }
 
     /// Check that querying trie and flat state gives the same result.
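Worth noting for readers of the test above: cfg! is a boolean expression, not conditional compilation, so both assertion branches are type-checked whichever way the feature flag is set. A tiny illustration of the distinction:

    // cfg!(...) expands to a plain `true`/`false` literal at compile time;
    // unlike #[cfg(...)], the untaken branch still has to compile.
    fn flat_state_enabled() -> bool {
        cfg!(feature = "protocol_feature_flat_state")
    }
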
diff --git a/tools/cold-store/src/cli.rs b/tools/cold-store/src/cli.rs
index a6a2f976f33..a5f11780d03 100644
--- a/tools/cold-store/src/cli.rs
+++ b/tools/cold-store/src/cli.rs
@@ -1,16 +1,13 @@
-use crate::cli::SubCommand::CheckStateRoot;
 use anyhow;
-use anyhow::Context;
 use clap;
 use near_epoch_manager::EpochManagerAdapter;
 use near_primitives::block::Tip;
 use near_primitives::hash::CryptoHash;
 use near_store::cold_storage::{copy_all_data_to_cold, update_cold_db, update_cold_head};
 use near_store::metadata::DbKind;
-use near_store::{DBCol, NodeStorage, Store, StoreOpener};
+use near_store::{DBCol, NodeStorage, Store};
 use near_store::{COLD_HEAD_KEY, FINAL_HEAD_KEY, HEAD_KEY, TAIL_KEY};
 use nearcore::{NearConfig, NightshadeRuntime};
-use rand::seq::SliceRandom;
 use std::io::Result;
 use std::path::Path;
 use std::sync::Arc;
@@ -18,10 +15,6 @@ use strum::IntoEnumIterator;
 
 #[derive(clap::Parser)]
 pub struct ColdStoreCommand {
-    /// By default state viewer opens rocks DB in the read only mode, which allows it to run
-    /// multiple instances in parallel and be sure that no unintended changes get written to the DB.
-    #[clap(long, short = 'w')]
-    readwrite: bool,
     #[clap(subcommand)]
     subcmd: SubCommand,
 }
@@ -46,28 +39,23 @@ enum SubCommand {
     /// - config.cold_store.path points to an existing database with kind Cold
     /// - store_relative_path points to an existing database with kind Rpc
     PrepareHot(PrepareHotCmd),
-    /// Traverse trie and check that every node is in cold db.
-    /// Can start from given state_root or compute previous roots for every chunk in provided block
-    /// and use them as starting point.
-    /// You can provide maximum depth and/or maximum number of vertices to traverse for each root.
-    /// Trie is traversed using DFS with randomly shuffled kids for every node.
-    CheckStateRoot(CheckStateRootCmd),
 }
 
 impl ColdStoreCommand {
     pub fn run(self, home_dir: &Path) -> anyhow::Result<()> {
-        let mode =
-            if self.readwrite { near_store::Mode::ReadWrite } else { near_store::Mode::ReadOnly };
-        let mut near_config = nearcore::config::load_config(
+        let near_config = nearcore::config::load_config(
            &home_dir,
             near_chain_configs::GenesisValidationMode::Full,
         )
         .unwrap_or_else(|e| panic!("Error loading config: {:#}", e));
-        let opener = self.get_opener(home_dir, &mut near_config);
-
-        let storage =
-            opener.open_in_mode(mode).unwrap_or_else(|e| panic!("Error opening storage: {:#}", e));
+        let opener = NodeStorage::opener(
+            home_dir,
+            true,
+            &near_config.config.store,
+            near_config.config.cold_store.as_ref(),
+        );
+        let storage = opener.open().unwrap_or_else(|e| panic!("Error opening storage: {:#}", e));
 
         let hot_runtime =
             NightshadeRuntime::from_config(home_dir, storage.get_hot_store(), &near_config);
@@ -85,53 +73,8 @@ impl ColdStoreCommand {
                 Ok(())
             }
             SubCommand::PrepareHot(cmd) => cmd.run(&storage, &home_dir, &near_config),
-            SubCommand::CheckStateRoot(cmd) => cmd.run(&storage),
         }
     }
-
-    /// Returns opener suitable for subcommand.
-    /// If subcommand is CheckStateRoot, creates checkpoint for cold db
-    /// and modifies `near_config.config.cold_store.path` to path to that checkpoint.
-    /// Then returns opener for dbs at `store.path` and `cold_store.path`.
-    pub fn get_opener<'a>(
-        &'a self,
-        home_dir: &Path,
-        near_config: &'a mut NearConfig,
-    ) -> StoreOpener<'a> {
-        if !near_config.config.archive {
-            tracing::warn!("Expected archive option in config to be set to true.");
-        }
-
-        let opener = NodeStorage::opener(
-            home_dir,
-            near_config.config.archive,
-            &near_config.config.store,
-            near_config.config.cold_store.as_ref(),
-        );
-
-        match self.subcmd {
-            CheckStateRoot(_) => {
-                let (hot_snapshot, cold_snapshot) = opener
-                    .create_snapshots(near_store::Mode::ReadOnly)
-                    .expect("Failed to create snapshots");
-                if let Some(_) = &hot_snapshot.0 {
-                    hot_snapshot.remove().expect("Failed to remove unnecessary hot snapshot");
-                }
-                if let Some(cold_store_config) = near_config.config.cold_store.as_mut() {
-                    cold_store_config.path =
-                        Some(cold_snapshot.0.clone().expect("cold_snapshot should be Some"));
-                }
-            }
-            _ => {}
-        }
-
-        NodeStorage::opener(
-            home_dir,
-            near_config.config.archive,
-            &near_config.config.store,
-            near_config.config.cold_store.as_ref(),
-        )
-    }
 }
 
 #[derive(clap::Parser)]
@@ -438,196 +381,3 @@ impl PrepareHotCmd {
         Ok(())
     }
 }
-
-/// The StateRootSelector is a subcommand that allows the user to select the state root either by block height or by the state root hash.
-#[derive(clap::Subcommand)]
-enum StateRootSelector {
-    Height { height: near_primitives::types::BlockHeight },
-    Hash { hash: CryptoHash },
-}
-
-impl StateRootSelector {
-    pub fn get_hashes(
-        &self,
-        storage: &NodeStorage,
-        cold_store: &Store,
-    ) -> anyhow::Result<Vec<CryptoHash>> {
-        match self {
-            // If height is provided, calculate previous state roots for this block's chunks.
-            StateRootSelector::Height { height } => {
-                let hash_key = {
-                    let height_key = height.to_le_bytes();
-                    storage
-                        .get_hot_store()
-                        .get(DBCol::BlockHeight, &height_key)?
-                        .ok_or(anyhow::anyhow!(
-                            "Failed to find block hash for height {:?}",
-                            height
-                        ))?
-                        .as_slice()
-                        .to_vec()
-                };
-                let block = cold_store
-                    .get_ser::<near_primitives::block::Block>(DBCol::Block, &hash_key)?
-                    .ok_or(anyhow::anyhow!("Failed to find Block: {:?}", hash_key))?;
-                let mut hashes = vec![];
-                for chunk in block.chunks().iter() {
-                    hashes.push(
-                        cold_store
-                            .get_ser::<near_primitives::sharding::ShardChunk>(
-                                DBCol::Chunks,
-                                chunk.chunk_hash().as_bytes(),
-                            )?
-                            .ok_or(anyhow::anyhow!(
-                                "Failed to find Chunk: {:?}",
-                                chunk.chunk_hash()
-                            ))?
-                            .take_header()
-                            .prev_state_root(),
-                    );
-                }
-                Ok(hashes)
-            }
-            // If state root is provided, then just use it.
-            StateRootSelector::Hash { hash } => Ok(vec![*hash]),
-        }
-    }
-}
-
-/// Struct that holds all conditions for node in Trie
-/// to be checked by CheckStateRootCmd::check_trie.
-#[derive(Debug)]
-struct PruneCondition {
-    /// Maximum depth (measured in number of nodes, not trie key length).
-    max_depth: Option<u64>,
-    /// Maximum number of nodes checked for each state_root.
-    max_count: Option<u64>,
-}
-
-/// Struct that holds data related to pruning of node in CheckStateRootCmd::check_trie.
-#[derive(Debug)]
-struct PruneState {
-    /// Depth of node in trie (measured in number of nodes, not trie key length).
-    depth: u64,
-    /// Number of already checked nodes.
-    count: u64,
-}
-
-impl PruneState {
-    pub fn new() -> Self {
-        Self { depth: 0, count: 0 }
-    }
-
-    /// Return `true` if node should be pruned.
-    pub fn should_prune(&self, condition: &PruneCondition) -> bool {
-        if let Some(md) = condition.max_depth {
-            if self.depth > md {
-                return true;
-            }
-        }
-        if let Some(mc) = condition.max_count {
-            if self.count > mc {
-                return true;
-            }
-        }
-        false
-    }
-
-    /// Modify self to reflect going down a tree.
-    /// We increment node count, because we are visiting a new node.
-    pub fn down(&mut self) {
-        self.count += 1;
-        self.depth += 1;
-    }
-
-    /// Modify self to reflect going up a tree.
-    /// We do not change node count, because we already visited parent node before.
-    pub fn up(&mut self) {
-        self.depth -= 1;
-    }
-}
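Although this checker is being deleted, its accounting is easy to misread, so a worked example: count grows on every descent and never shrinks, while depth tracks the current path, so max_count bounds the total nodes visited and max_depth bounds the recursion.

    // Illustration with a hypothetical limit: descend twice, return once.
    let condition = PruneCondition { max_depth: Some(1), max_count: None };
    let mut state = PruneState::new(); // depth 0, count 0 (at the root)
    state.down(); // depth 1, count 1
    assert!(!state.should_prune(&condition)); // depth 1 is not > max_depth 1
    state.down(); // depth 2, count 2
    assert!(state.should_prune(&condition)); // depth 2 > max_depth 1
    state.up(); // depth 1, count stays 2
    assert!(!state.should_prune(&condition));
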
-
-#[derive(clap::Args)]
-struct CheckStateRootCmd {
-    /// Maximum depth (measured in number of nodes, not trie key length) for checking trie.
-    #[clap(long)]
-    max_depth: Option<u64>,
-    /// Maximum number of nodes checked for each state_root.
-    #[clap(long)]
-    max_count: Option<u64>,
-    #[clap(subcommand)]
-    state_root_selector: StateRootSelector,
-}
-
-impl CheckStateRootCmd {
-    pub fn run(self, storage: &NodeStorage) -> anyhow::Result<()> {
-        let cold_store =
-            storage.get_cold_store().ok_or(anyhow::anyhow!("Cold storage is not configured"))?;
-
-        let hashes = self.state_root_selector.get_hashes(storage, &cold_store)?;
-        for hash in hashes.iter() {
-            Self::check_trie(
-                &cold_store,
-                &hash,
-                &mut PruneState::new(),
-                &PruneCondition { max_depth: self.max_depth, max_count: self.max_count },
-            )?;
-        }
-
-        Ok(())
-    }
-
-    /// Check that trie subtree of `hash` is fully present in `store`.
-    fn check_trie(
-        store: &Store,
-        hash: &CryptoHash,
-        prune_state: &mut PruneState,
-        prune_condition: &PruneCondition,
-    ) -> anyhow::Result<()> {
-        tracing::debug!(target: "check_trie", "Checking {:?} at {:?}", hash, prune_state);
-        if prune_state.should_prune(prune_condition) {
-            tracing::debug!(target: "check_trie", "Reached prune condition: {:?}", prune_condition);
-            return Ok(());
-        }
-
-        let bytes = Self::read_state(store, hash.as_ref())
-            .with_context(|| format!("Failed to read raw bytes for hash {:?}", hash))?
-            .with_context(|| format!("Failed to find raw bytes for hash {:?}", hash))?;
-        let node = near_store::RawTrieNodeWithSize::decode(&bytes)?;
-        match node.node {
-            near_store::RawTrieNode::Leaf(..) => {
-                tracing::debug!(target: "check_trie", "Reached leaf node");
-                return Ok(());
-            }
-            near_store::RawTrieNode::Branch(mut children, _) => {
-                children.0.shuffle(&mut rand::thread_rng());
-                for (_, child) in children.iter() {
-                    // Record in prune state that we are visiting a child node
-                    prune_state.down();
-                    // Visit a child node
-                    Self::check_trie(store, child, prune_state, prune_condition)?;
-                    // Record in prune state that we are returning from a child node
-                    prune_state.up();
-                }
-            }
-            near_store::RawTrieNode::Extension(_, child) => {
-                // Record in prune state that we are visiting a child node
-                prune_state.down();
-                // Visit a child node
-                Self::check_trie(store, &child, prune_state, prune_condition)?;
-                // Record in prune state that we are returning from a child node
-                prune_state.up();
-            }
-        }
-        Ok(())
-    }
-
-    fn read_state<'a>(
-        store: &'a Store,
-        trie_key: &'a [u8],
-    ) -> std::io::Result<Option<near_store::db::DBSlice<'a>>> {
-        // As cold db strips shard_uid at the beginning of State key, we can add any 8 u8s as prefix.
-        let cold_state_key = [&[1; 8], trie_key.as_ref()].concat();
-        store.get(DBCol::State, &cold_state_key)
-    }
-}
diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs
index baa55daa44a..1b620f84e89 100644
--- a/tools/state-viewer/src/cli.rs
+++ b/tools/state-viewer/src/cli.rs
@@ -105,8 +105,7 @@ impl StateViewerSubCommand {
         let storage = store_opener.open_in_mode(mode).unwrap();
         let store = match temperature {
             Temperature::Hot => storage.get_hot_store(),
-            // Cold store on it's own is useless in majority of subcommands
-            Temperature::Cold => storage.get_split_store().unwrap(),
+            Temperature::Cold => storage.get_cold_store().unwrap(),
         };
 
         match self {
@@ -410,7 +409,7 @@ impl DumpTxCmd {
 #[derive(clap::Args)]
 pub struct EpochInfoCmd {
     #[clap(subcommand)]
-    epoch_selection: crate::epoch_info::EpochSelection,
+    epoch_selection: epoch_info::EpochSelection,
     /// Displays kickouts of the given validator and expected and missed blocks and chunks produced.
     #[clap(long)]
     validator_account_id: Option<String>,
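One nuance behind the final hunk, as I read the storage API: get_split_store combines the hot and cold databases (reads try hot first, then fall back to cold), which suits the view client, while get_cold_store exposes the cold DB alone, which is what state-viewer subcommands inspecting cold data want. A hedged illustration (`storage` and `recent_block_key` are hypothetical):

    // Sketch: a block past the cold head lives only in the hot DB, so it is
    // visible through the split store but not through the raw cold store.
    let cold = storage.get_cold_store().expect("cold store is configured");
    let split = storage.get_split_store().expect("split store is available");
    assert!(split.get(DBCol::Block, &recent_block_key).unwrap().is_some());
    assert!(cold.get(DBCol::Block, &recent_block_key).unwrap().is_none());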