From cd49133c93a01cd5f2ed446b6aba04a9f7ce4f22 Mon Sep 17 00:00:00 2001 From: robin-near Date: Tue, 31 Oct 2023 21:24:11 -0700 Subject: [PATCH] memtrie 4 --- chain/chain/src/chain.rs | 31 ++ chain/chain/src/store.rs | 1 + chain/chain/src/test_utils/kv_runtime.rs | 16 +- chain/chain/src/tests/gc.rs | 1 + chain/chain/src/types.rs | 4 + chain/client/src/test_utils/test_env.rs | 29 +- .../client/src/test_utils/test_env_builder.rs | 9 +- core/primitives/src/errors.rs | 2 + core/store/src/config.rs | 15 + core/store/src/flat/mod.rs | 2 + core/store/src/flat/storage.rs | 102 +---- core/store/src/flat/test_utils.rs | 100 +++++ core/store/src/test_utils.rs | 143 +++++-- core/store/src/trie/accounting_cache.rs | 3 +- core/store/src/trie/config.rs | 19 +- core/store/src/trie/iterator.rs | 4 +- core/store/src/trie/mem/loading.rs | 322 +++++++++++++++- core/store/src/trie/mem/lookup.rs | 35 +- core/store/src/trie/mem/metrics.rs | 11 +- core/store/src/trie/mem/mod.rs | 6 +- core/store/src/trie/mem/updating.rs | 41 +- core/store/src/trie/mod.rs | 166 +++++--- core/store/src/trie/shard_tries.rs | 142 +++++-- core/store/src/trie/state_parts.rs | 6 +- core/store/src/trie/trie_recording.rs | 160 +++++++- core/store/src/trie/trie_tests.rs | 4 +- core/store/src/trie/update.rs | 15 +- integration-tests/src/runtime_utils.rs | 8 +- .../src/tests/client/features.rs | 1 + .../tests/client/features/in_memory_tries.rs | 361 ++++++++++++++++++ .../src/tests/client/state_snapshot.rs | 3 + nearcore/src/runtime/mod.rs | 34 ++ nearcore/src/test_utils.rs | 51 ++- tools/state-viewer/src/contract_accounts.rs | 5 +- tools/state-viewer/src/state_changes.rs | 1 + 35 files changed, 1522 insertions(+), 331 deletions(-) create mode 100644 core/store/src/flat/test_utils.rs create mode 100644 integration-tests/src/tests/client/features/in_memory_tries.rs diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 57e1cf914ef..8ac47b1ea4f 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -702,6 +702,16 @@ impl Chain { }; store_update.commit()?; + // We must load in-memory tries here, and not inside runtime, because + // if we were initializing from genesis, the runtime would be + // initialized when no blocks or flat storage were initialized. We + // require flat storage in order to load in-memory tries. + // TODO(#9511): The calculation of shard UIDs is not precise in the case + // of resharding. We need to revisit this. + let tip = store.head()?; + let shard_uids = epoch_manager.get_shard_layout(&tip.epoch_id)?.get_shard_uids(); + runtime_adapter.load_mem_tries_on_startup(&shard_uids)?; + info!(target: "chain", "Init: header head @ #{} {}; block head @ #{} {}", header_head.height, header_head.last_block_hash, block_head.height, block_head.last_block_hash); @@ -2413,6 +2423,7 @@ impl Chain { let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?; let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager(); flat_storage_manager.update_flat_storage_for_shard(shard_uid, &block)?; + self.garbage_collect_memtrie_roots(&block, shard_uid); } } @@ -2459,6 +2470,17 @@ impl Chain { Ok(AcceptedBlock { hash: *block.hash(), status: block_status, provenance }) } + fn garbage_collect_memtrie_roots(&self, block: &Block, shard_uid: ShardUId) { + let tries = self.runtime_adapter.get_tries(); + let last_final_block = block.header().last_final_block(); + if last_final_block != &CryptoHash::default() { + let header = self.store.get_block_header(last_final_block).unwrap(); + if let Some(prev_height) = header.prev_height() { + tries.delete_memtrie_roots_up_to_height(shard_uid, prev_height); + } + } + } + /// Preprocess a block before applying chunks, verify that we have the necessary information /// to process the block an the block is valid. /// Note that this function does NOT introduce any changes to chain state. @@ -3576,6 +3598,7 @@ impl Chain { let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?; let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager(); flat_storage_manager.update_flat_storage_for_shard(shard_uid, &block)?; + self.garbage_collect_memtrie_roots(&block, shard_uid); } } @@ -4098,6 +4121,7 @@ impl Chain { )?; let block_hash = *block.hash(); + let block_height = block.header().height(); let challenges_result = block.header().challenges_result().clone(); let block_timestamp = block.header().raw_timestamp(); let next_gas_price = prev_block.header().next_gas_price(); @@ -4143,6 +4167,7 @@ impl Chain { epoch_manager.as_ref(), runtime.as_ref(), &block_hash, + block_height, &prev_block_hash, &apply_result, split_state_roots, @@ -4179,6 +4204,7 @@ impl Chain { let new_extra = self.get_chunk_extra(&prev_block_hash, &shard_uid)?; let block_hash = *block.hash(); + let block_height = block.header().height(); let challenges_result = block.header().challenges_result().clone(); let block_timestamp = block.header().raw_timestamp(); let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash)?; @@ -4230,6 +4256,7 @@ impl Chain { epoch_manager.as_ref(), runtime.as_ref(), &block_hash, + block_height, &prev_block_hash, &apply_result, split_state_roots, @@ -4263,6 +4290,7 @@ impl Chain { let state_changes = self.store().get_state_changes_for_split_states(block.hash(), shard_id)?; let block_hash = *block.hash(); + let block_height = block.header().height(); Ok(Some(Box::new(move |parent_span| -> Result { let _span = tracing::debug_span!( target: "chain", @@ -4273,6 +4301,7 @@ impl Chain { .entered(); let results = runtime.apply_update_to_split_states( &block_hash, + block_height, split_state_roots, &next_epoch_shard_layout, state_changes, @@ -5166,6 +5195,7 @@ impl<'a> ChainUpdate<'a> { epoch_manager: &dyn EpochManagerAdapter, runtime_adapter: &dyn RuntimeAdapter, block_hash: &CryptoHash, + block_height: BlockHeight, prev_block_hash: &CryptoHash, apply_result: &ApplyTransactionResult, split_state_roots: Option>, @@ -5182,6 +5212,7 @@ impl<'a> ChainUpdate<'a> { if let Some(state_roots) = split_state_roots { let split_state_results = runtime_adapter.apply_update_to_split_states( block_hash, + block_height, state_roots, &next_epoch_shard_layout, state_changes, diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 8432d0764c4..d4113720040 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -3140,6 +3140,7 @@ impl<'a> ChainStoreUpdate<'a> { // from the store. let mut deletions_store_update = self.store().store_update(); for mut wrapped_trie_changes in self.trie_changes.drain(..) { + wrapped_trie_changes.apply_mem_changes(); wrapped_trie_changes.insertions_into(&mut store_update); wrapped_trie_changes.deletions_into(&mut deletions_store_update); wrapped_trie_changes.state_changes_into(&mut store_update); diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 84ff449a541..1e5e2d1cf1f 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -7,6 +7,7 @@ use borsh::{BorshDeserialize, BorshSerialize}; use near_epoch_manager::types::BlockHeaderInfo; use near_epoch_manager::{EpochManagerAdapter, RngSeed}; use near_primitives::state_part::PartId; +use near_store::test_utils::TestTriesBuilder; use num_rational::Ratio; use near_chain_configs::{ProtocolConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP}; @@ -41,7 +42,7 @@ use near_primitives::views::{ QueryRequest, QueryResponse, QueryResponseKind, ViewStateResult, }; use near_store::{ - set_genesis_hash, set_genesis_state_roots, DBCol, ShardTries, Store, StoreUpdate, Trie, + set_genesis_hash, set_genesis_state_roots, DBCol, ShardTries, StorageError,Store, StoreUpdate, Trie, TrieChanges, WrappedTrieChanges, }; @@ -334,7 +335,10 @@ impl KeyValueRuntime { let num_shards = epoch_manager.num_shards(&EpochId::default()).unwrap(); let epoch_length = epoch_manager.get_epoch_config(&EpochId::default()).unwrap().epoch_length; - let tries = ShardTries::test(store.clone(), num_shards); + let tries = TestTriesBuilder::new() + .with_store(store.clone()) + .with_shard_layout(0, num_shards) + .build(); let mut initial_amounts = HashMap::new(); for (i, validator_stake) in epoch_manager .validators_by_valset @@ -1019,7 +1023,7 @@ impl RuntimeAdapter for KeyValueRuntime { &self, shard_id: ShardId, storage_config: RuntimeStorageConfig, - _height: BlockHeight, + height: BlockHeight, _block_timestamp: u64, _prev_block_hash: &CryptoHash, block_hash: &CryptoHash, @@ -1172,6 +1176,7 @@ impl RuntimeAdapter for KeyValueRuntime { TrieChanges::empty(state_root), Default::default(), *block_hash, + height, ), new_root: state_root, outcomes: tx_results, @@ -1363,10 +1368,15 @@ impl RuntimeAdapter for KeyValueRuntime { fn apply_update_to_split_states( &self, _block_hash: &CryptoHash, + _block_height: BlockHeight, _state_roots: HashMap, _next_shard_layout: &ShardLayout, _state_changes: StateChangesForSplitStates, ) -> Result, Error> { Ok(vec![]) } + + fn load_mem_tries_on_startup(&self, _shard_uids: &[ShardUId]) -> Result<(), StorageError> { + Ok(()) + } } diff --git a/chain/chain/src/tests/gc.rs b/chain/chain/src/tests/gc.rs index d120f5a8566..898894d8396 100644 --- a/chain/chain/src/tests/gc.rs +++ b/chain/chain/src/tests/gc.rs @@ -139,6 +139,7 @@ fn do_fork( trie_changes, Default::default(), *block.hash(), + block.header().height(), ); store_update.save_trie_changes(wrapped_trie_changes); diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 8abb046a387..45bee8a06f2 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -6,6 +6,7 @@ use chrono::Utc; use near_chain_configs::StateSplitConfig; use near_primitives::sandbox::state_patch::SandboxStatePatch; use near_store::flat::FlatStorageManager; +use near_store::StorageError; use num_rational::Rational32; use near_chain_configs::{Genesis, ProtocolConfig}; @@ -392,6 +393,7 @@ pub trait RuntimeAdapter: Send + Sync { fn apply_update_to_split_states( &self, block_hash: &CryptoHash, + block_height: BlockHeight, state_roots: HashMap, next_shard_layout: &ShardLayout, state_changes: StateChangesForSplitStates, @@ -426,6 +428,8 @@ pub trait RuntimeAdapter: Send + Sync { ) -> bool; fn get_protocol_config(&self, epoch_id: &EpochId) -> Result; + + fn load_mem_tries_on_startup(&self, shard_uids: &[ShardUId]) -> Result<(), StorageError>; } /// The last known / checked height and time when we have processed it. diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs index 6c8020ab9c4..522d6a06cb3 100644 --- a/chain/client/src/test_utils/test_env.rs +++ b/chain/client/src/test_utils/test_env.rs @@ -22,7 +22,6 @@ use near_primitives::epoch_manager::RngSeed; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; use near_primitives::runtime::config::RuntimeConfig; -use near_primitives::shard_layout::ShardUId; use near_primitives::sharding::PartialEncodedChunk; use near_primitives::test_utils::create_test_signer; use near_primitives::transaction::{Action, FunctionCallAction, SignedTransaction}; @@ -315,13 +314,17 @@ impl TestEnv { } pub fn query_account(&mut self, account_id: AccountId) -> AccountView { - let head = self.clients[0].chain.head().unwrap(); - let last_block = self.clients[0].chain.get_block(&head.last_block_hash).unwrap(); - let last_chunk_header = &last_block.chunks()[0]; - let response = self.clients[0] + let client = &self.clients[0]; + let head = client.chain.head().unwrap(); + let last_block = client.chain.get_block(&head.last_block_hash).unwrap(); + let shard_id = + client.epoch_manager.account_id_to_shard_id(&account_id, &head.epoch_id).unwrap(); + let shard_uid = client.epoch_manager.shard_id_to_uid(shard_id, &head.epoch_id).unwrap(); + let last_chunk_header = &last_block.chunks()[shard_id as usize]; + let response = client .runtime_adapter .query( - ShardUId::single_shard(), + shard_uid, &last_chunk_header.prev_state_root(), last_block.header().height(), last_block.header().raw_timestamp(), @@ -338,13 +341,17 @@ impl TestEnv { } pub fn query_state(&mut self, account_id: AccountId) -> Vec { - let head = self.clients[0].chain.head().unwrap(); - let last_block = self.clients[0].chain.get_block(&head.last_block_hash).unwrap(); - let last_chunk_header = &last_block.chunks()[0]; - let response = self.clients[0] + let client = &self.clients[0]; + let head = client.chain.head().unwrap(); + let last_block = client.chain.get_block(&head.last_block_hash).unwrap(); + let shard_id = + client.epoch_manager.account_id_to_shard_id(&account_id, &head.epoch_id).unwrap(); + let shard_uid = client.epoch_manager.shard_id_to_uid(shard_id, &head.epoch_id).unwrap(); + let last_chunk_header = &last_block.chunks()[shard_id as usize]; + let response = client .runtime_adapter .query( - ShardUId::single_shard(), + shard_uid, &last_chunk_header.prev_state_root(), last_block.header().height(), last_block.header().raw_timestamp(), diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index b122a6bd49c..495e075c94f 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -19,7 +19,7 @@ use near_network::test_utils::MockPeerManagerAdapter; use near_primitives::epoch_manager::{AllEpochConfigTestOverrides, RngSeed}; use near_primitives::types::{AccountId, NumShards}; use near_store::test_utils::create_test_store; -use near_store::{NodeStorage, ShardUId, Store, StoreConfig}; +use near_store::{NodeStorage, ShardUId, Store, StoreConfig, TrieConfig}; use super::setup::{setup_client_with_runtime, setup_synchronous_shards_manager}; use super::test_env::TestEnv; @@ -299,11 +299,13 @@ impl TestEnvBuilder { pub fn internal_initialize_nightshade_runtimes( self, runtime_configs: Vec, + trie_configs: Vec, nightshade_runtime_creator: impl Fn( PathBuf, Store, Arc, RuntimeConfigStore, + TrieConfig, ) -> Arc, ) -> Self { let builder = self.ensure_home_dirs().ensure_epoch_managers().ensure_stores(); @@ -312,15 +314,16 @@ impl TestEnvBuilder { builder.stores.clone().unwrap(), builder.epoch_managers.clone().unwrap(), runtime_configs, + trie_configs, )) - .map(|(home_dir, store, epoch_manager, runtime_config)| { + .map(|(home_dir, store, epoch_manager, runtime_config, trie_config)| { let epoch_manager = match epoch_manager { EpochManagerKind::Mock(_) => { panic!("NightshadeRuntime can only be instantiated with EpochManagerHandle") } EpochManagerKind::Handle(handle) => handle, }; - nightshade_runtime_creator(home_dir, store, epoch_manager, runtime_config) + nightshade_runtime_creator(home_dir, store, epoch_manager, runtime_config, trie_config) }) .collect(); builder.runtimes(runtimes) diff --git a/core/primitives/src/errors.rs b/core/primitives/src/errors.rs index 16ba9beee1c..1fe4434d070 100644 --- a/core/primitives/src/errors.rs +++ b/core/primitives/src/errors.rs @@ -111,6 +111,8 @@ pub enum StorageError { /// We guarantee that such block cannot become final, thus block processing /// must resume normally. FlatStorageBlockNotSupported(String), + /// In-memory trie could not be loaded for some reason. + MemTrieLoadingError(String), } impl std::fmt::Display for StorageError { diff --git a/core/store/src/config.rs b/core/store/src/config.rs index 61b9a2149ee..d2ae7c1e4e1 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -66,6 +66,17 @@ pub struct StoreConfig { /// This config option is temporary and will be removed once flat storage is implemented. pub sweat_prefetch_senders: Vec, + /// List of shard UIDs for which we should load the tries in memory. If resharding happens, + /// any descendant shard of these shards are loaded into memory. + pub load_mem_tries_for_shards: Vec, + /// If true, load mem tries for all shards. + pub load_mem_tries_for_all_shards: bool, + /// Maximum size, in number of bytes, of a single shard in memory. + /// This amount is reserved upfront with mmap. If the machine does not have + /// that much RAM, enable memory overcommit. The actual memory usage is only + /// the real size of the loaded tries. + pub max_mem_tries_size_per_shard: usize, + /// Path where to create RocksDB checkpoints during database migrations or /// `false` to disable that feature. /// @@ -262,6 +273,10 @@ impl Default for StoreConfig { "sweat_the_oracle.testnet".to_owned(), ], + load_mem_tries_for_shards: vec!["s3.v1".to_owned()], + load_mem_tries_for_all_shards: false, + max_mem_tries_size_per_shard: 16 * 1024 * 1024 * 1024, + migration_snapshot: Default::default(), // We checked that this number of threads doesn't impact diff --git a/core/store/src/flat/mod.rs b/core/store/src/flat/mod.rs index 5e30aa71efe..d195bf7e3ee 100644 --- a/core/store/src/flat/mod.rs +++ b/core/store/src/flat/mod.rs @@ -32,6 +32,8 @@ mod manager; mod metrics; mod storage; pub mod store_helper; +#[cfg(test)] +pub mod test_utils; mod types; pub use chunk_view::FlatStorageChunkView; diff --git a/core/store/src/flat/storage.rs b/core/store/src/flat/storage.rs index 077d59345dc..2dab259aa69 100644 --- a/core/store/src/flat/storage.rs +++ b/core/store/src/flat/storage.rs @@ -487,7 +487,8 @@ mod tests { }; use crate::flat::manager::FlatStorageManager; use crate::flat::storage::FlatStorageInner; - use crate::flat::types::{BlockInfo, FlatStorageError}; + use crate::flat::test_utils::MockChain; + use crate::flat::types::FlatStorageError; use crate::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus}; use crate::test_utils::create_test_store; use crate::StorageError; @@ -501,105 +502,6 @@ mod tests { use rand::{thread_rng, Rng}; use std::collections::HashMap; - struct MockChain { - height_to_hashes: HashMap, - blocks: HashMap, - head_height: BlockHeight, - } - - impl MockChain { - fn get_block_info(&self, block_hash: &CryptoHash) -> BlockInfo { - *self.blocks.get(block_hash).unwrap() - } - - fn block_hash(height: BlockHeight) -> CryptoHash { - hash(&borsh::to_vec(&height).unwrap()) - } - - /// Build a chain with given set of heights and a function mapping block heights to heights of their parents. - fn build( - heights: Vec, - get_parent: fn(BlockHeight) -> Option, - ) -> MockChain { - let height_to_hashes: HashMap<_, _> = heights - .iter() - .cloned() - .map(|height| (height, MockChain::block_hash(height))) - .collect(); - let blocks = heights - .iter() - .cloned() - .map(|height| { - let hash = *height_to_hashes.get(&height).unwrap(); - let prev_hash = match get_parent(height) { - None => CryptoHash::default(), - Some(parent_height) => *height_to_hashes.get(&parent_height).unwrap(), - }; - (hash, BlockInfo { hash, height, prev_hash }) - }) - .collect(); - MockChain { height_to_hashes, blocks, head_height: *heights.last().unwrap() } - } - - // Create a chain with no forks with length n. - fn linear_chain(n: usize) -> MockChain { - Self::build( - (0..n as BlockHeight).collect(), - |i| if i == 0 { None } else { Some(i - 1) }, - ) - } - - // Create a linear chain of length n where blocks with odd numbers are skipped: - // 0 -> 2 -> 4 -> ... - fn linear_chain_with_skips(n: usize) -> MockChain { - Self::build((0..n as BlockHeight).map(|i| i * 2).collect(), |i| { - if i == 0 { - None - } else { - Some(i - 2) - } - }) - } - - // Create a chain with two forks, where blocks 1 and 2 have a parent block 0, and each next block H - // has a parent block H-2: - // 0 |-> 1 -> 3 -> 5 -> ... - // --> 2 -> 4 -> 6 -> ... - fn chain_with_two_forks(n: usize) -> MockChain { - Self::build((0..n as BlockHeight).collect(), |i| { - if i == 0 { - None - } else { - Some(i.max(2) - 2) - } - }) - } - - fn get_block_hash(&self, height: BlockHeight) -> CryptoHash { - *self.height_to_hashes.get(&height).unwrap() - } - - fn get_block(&self, height: BlockHeight) -> BlockInfo { - self.blocks[&self.height_to_hashes[&height]] - } - - /// create a new block on top the current chain head, return the new block hash - fn create_block(&mut self) -> CryptoHash { - let hash = MockChain::block_hash(self.head_height + 1); - self.height_to_hashes.insert(self.head_height + 1, hash); - self.blocks.insert( - hash, - BlockInfo { - hash, - height: self.head_height + 1, - prev_hash: self.get_block_hash(self.head_height), - }, - ); - self.head_height += 1; - hash - } - } - #[test] fn flat_storage_errors() { // Create a chain with two forks. Set flat head to be at block 0. diff --git a/core/store/src/flat/test_utils.rs b/core/store/src/flat/test_utils.rs new file mode 100644 index 00000000000..5f5b3a0e0dd --- /dev/null +++ b/core/store/src/flat/test_utils.rs @@ -0,0 +1,100 @@ +use super::BlockInfo; +use near_primitives::hash::{hash, CryptoHash}; +use near_primitives::types::BlockHeight; +use std::collections::HashMap; + +pub struct MockChain { + height_to_hashes: HashMap, + blocks: HashMap, + head_height: BlockHeight, +} + +impl MockChain { + pub fn get_block_info(&self, block_hash: &CryptoHash) -> BlockInfo { + *self.blocks.get(block_hash).unwrap() + } + + pub fn block_hash(height: BlockHeight) -> CryptoHash { + hash(&borsh::to_vec(&height).unwrap()) + } + + /// Build a chain with given set of heights and a function mapping block heights to heights of their parents. + pub fn build( + heights: Vec, + get_parent: fn(BlockHeight) -> Option, + ) -> MockChain { + let height_to_hashes: HashMap<_, _> = + heights.iter().cloned().map(|height| (height, MockChain::block_hash(height))).collect(); + let blocks = heights + .iter() + .cloned() + .map(|height| { + let hash = *height_to_hashes.get(&height).unwrap(); + let prev_hash = match get_parent(height) { + None => CryptoHash::default(), + Some(parent_height) => *height_to_hashes.get(&parent_height).unwrap(), + }; + (hash, BlockInfo { hash, height, prev_hash }) + }) + .collect(); + MockChain { height_to_hashes, blocks, head_height: *heights.last().unwrap() } + } + + // Create a chain with no forks with length n. + pub fn linear_chain(n: usize) -> MockChain { + Self::build((0..n as BlockHeight).collect(), |i| if i == 0 { None } else { Some(i - 1) }) + } + + // Create a linear chain of length n where blocks with odd numbers are skipped: + // 0 -> 2 -> 4 -> ... + pub fn linear_chain_with_skips(n: usize) -> MockChain { + Self::build((0..n as BlockHeight).map(|i| i * 2).collect(), |i| { + if i == 0 { + None + } else { + Some(i - 2) + } + }) + } + + // Create a chain with two forks, where blocks 1 and 2 have a parent block 0, and each next block H + // has a parent block H-2: + // 0 |-> 1 -> 3 -> 5 -> ... + // --> 2 -> 4 -> 6 -> ... + pub fn chain_with_two_forks(n: usize) -> MockChain { + Self::build( + (0..n as BlockHeight).collect(), + |i| { + if i == 0 { + None + } else { + Some(i.max(2) - 2) + } + }, + ) + } + + pub fn get_block_hash(&self, height: BlockHeight) -> CryptoHash { + *self.height_to_hashes.get(&height).unwrap() + } + + pub fn get_block(&self, height: BlockHeight) -> BlockInfo { + self.blocks[&self.height_to_hashes[&height]] + } + + /// create a new block on top the current chain head, return the new block hash + pub fn create_block(&mut self) -> CryptoHash { + let hash = MockChain::block_hash(self.head_height + 1); + self.height_to_hashes.insert(self.head_height + 1, hash); + self.blocks.insert( + hash, + BlockInfo { + hash, + height: self.head_height + 1, + prev_hash: self.get_block_hash(self.head_height), + }, + ); + self.head_height += 1; + hash + } +} diff --git a/core/store/src/test_utils.rs b/core/store/src/test_utils.rs index 795f309abc5..0aeecb1397d 100644 --- a/core/store/src/test_utils.rs +++ b/core/store/src/test_utils.rs @@ -1,22 +1,25 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use itertools::Itertools; -use near_primitives::state::{FlatStateValue, ValueRef}; -use near_primitives::trie_key::TrieKey; -use rand::seq::SliceRandom; -use rand::Rng; - use crate::db::TestDB; -use crate::flat::{store_helper, BlockInfo, FlatStorageReadyStatus, FlatStorageStatus}; +use crate::flat::{ + store_helper, BlockInfo, FlatStorageManager, FlatStorageReadyStatus, FlatStorageStatus, +}; use crate::metadata::{DbKind, DbVersion, DB_VERSION}; -use crate::{get, get_delayed_receipt_indices, DBCol, NodeStorage, ShardTries, Store}; +use crate::{ + get, get_delayed_receipt_indices, DBCol, NodeStorage, ShardTries, StateSnapshotConfig, Store, + TrieConfig, +}; +use itertools::Itertools; use near_primitives::account::id::AccountId; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::receipt::{DataReceipt, Receipt, ReceiptEnum}; use near_primitives::shard_layout::{ShardUId, ShardVersion}; +use near_primitives::state::{FlatStateValue, ValueRef}; +use near_primitives::trie_key::TrieKey; use near_primitives::types::{NumShards, StateRoot}; +use rand::seq::SliceRandom; +use rand::Rng; +use std::collections::HashMap; use std::str::from_utf8; +use std::sync::Arc; /// Creates an in-memory node storage. /// @@ -61,44 +64,100 @@ pub fn create_test_store() -> Store { create_test_node_storage(DB_VERSION, DbKind::RPC).get_hot_store() } -/// Creates a Trie using an in-memory database. -pub fn create_tries() -> ShardTries { - create_tries_complex(0, 1) +pub struct TestTriesBuilder { + store: Option, + shard_version: ShardVersion, + num_shards: NumShards, + enable_flat_storage: bool, + enable_in_memory_tries: bool, } -pub fn create_tries_complex(shard_version: ShardVersion, num_shards: NumShards) -> ShardTries { - let store = create_test_store(); - ShardTries::test_shard_version(store, shard_version, num_shards) -} +impl TestTriesBuilder { + pub fn new() -> Self { + Self { + store: None, + shard_version: 0, + num_shards: 1, + enable_flat_storage: false, + enable_in_memory_tries: false, + } + } -pub fn create_tries_with_flat_storage() -> ShardTries { - create_tries_complex_with_flat_storage(0, 1) -} + pub fn with_store(mut self, store: Store) -> Self { + self.store = Some(store); + self + } -pub fn create_tries_complex_with_flat_storage( - shard_version: ShardVersion, - num_shards: NumShards, -) -> ShardTries { - let tries = create_tries_complex(shard_version, num_shards); - let mut store_update = tries.store_update(); - for shard_id in 0..num_shards { - let shard_uid = ShardUId { version: shard_version, shard_id: shard_id.try_into().unwrap() }; - store_helper::set_flat_storage_status( - &mut store_update, - shard_uid, - FlatStorageStatus::Ready(FlatStorageReadyStatus { - flat_head: BlockInfo::genesis(CryptoHash::default(), 0), - }), - ); + pub fn with_shard_layout(mut self, shard_version: ShardVersion, num_shards: NumShards) -> Self { + self.shard_version = shard_version; + self.num_shards = num_shards; + self } - store_update.commit().unwrap(); - let flat_storage_manager = tries.get_flat_storage_manager(); - for shard_id in 0..num_shards { - let shard_uid = ShardUId { version: shard_version, shard_id: shard_id.try_into().unwrap() }; - flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); + pub fn with_flat_storage(mut self) -> Self { + self.enable_flat_storage = true; + self } - tries + + pub fn with_in_memory_tries(mut self) -> Self { + self.enable_in_memory_tries = true; + self + } + + pub fn build(self) -> ShardTries { + let store = self.store.unwrap_or_else(create_test_store); + let shard_uids = (0..self.num_shards) + .map(|shard_id| ShardUId { shard_id: shard_id as u32, version: self.shard_version }) + .collect::>(); + let flat_storage_manager = FlatStorageManager::new(store.clone()); + let tries = ShardTries::new( + store, + TrieConfig { + load_mem_tries_for_all_shards: self.enable_in_memory_tries, + max_mem_tries_size_per_shard: 100 * 1024 * 1024, + ..Default::default() + }, + &shard_uids, + flat_storage_manager, + StateSnapshotConfig::default(), + ); + if self.enable_flat_storage { + let mut store_update = tries.store_update(); + for shard_id in 0..self.num_shards { + let shard_uid = ShardUId { + version: self.shard_version, + shard_id: shard_id.try_into().unwrap(), + }; + store_helper::set_flat_storage_status( + &mut store_update, + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { + flat_head: BlockInfo::genesis(CryptoHash::default(), 0), + }), + ); + } + store_update.commit().unwrap(); + + let flat_storage_manager = tries.get_flat_storage_manager(); + for shard_id in 0..self.num_shards { + let shard_uid = ShardUId { + version: self.shard_version, + shard_id: shard_id.try_into().unwrap(), + }; + flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); + } + } + if self.enable_in_memory_tries { + tries.load_mem_tries_for_enabled_shards(&shard_uids).unwrap(); + } + tries + } +} + +/// Creates a Trie using an in-memory database. +/// Deprecated. New users please use TestTriesBuilder::new().build(). +pub fn create_tries() -> ShardTries { + TestTriesBuilder::new().build() } pub fn test_populate_trie( diff --git a/core/store/src/trie/accounting_cache.rs b/core/store/src/trie/accounting_cache.rs index 153efc7e1bd..92a2c502766 100644 --- a/core/store/src/trie/accounting_cache.rs +++ b/core/store/src/trie/accounting_cache.rs @@ -1,3 +1,4 @@ +use crate::{metrics, TrieStorage}; use near_o11y::metrics::prometheus; use near_o11y::metrics::prometheus::core::{GenericCounter, GenericGauge}; use near_primitives::errors::StorageError; @@ -7,8 +8,6 @@ use near_vm_runner::logic::TrieNodesCount; use std::collections::HashMap; use std::sync::Arc; -use crate::{metrics, TrieStorage}; - /// Deterministic cache to store trie nodes that have been accessed so far /// during the cache's lifetime. It is used for deterministic gas accounting /// so that previously accessed trie nodes and values are charged at a diff --git a/core/store/src/trie/config.rs b/core/store/src/trie/config.rs index 59669b41305..2b9e8343a57 100644 --- a/core/store/src/trie/config.rs +++ b/core/store/src/trie/config.rs @@ -1,5 +1,6 @@ use crate::config::TrieCacheConfig; use crate::StoreConfig; +use near_primitives::shard_layout::ShardUId; use near_primitives::types::AccountId; use std::str::FromStr; use tracing::error; @@ -20,7 +21,7 @@ pub(crate) const DEFAULT_SHARD_CACHE_DELETIONS_QUEUE_CAPACITY: usize = const TRIE_LIMIT_CACHED_VALUE_SIZE: usize = 1000; /// Stores necessary configuration for the creation of tries. -#[derive(Default)] +#[derive(Clone, Default)] pub struct TrieConfig { pub shard_cache_config: TrieCacheConfig, pub view_shard_cache_config: TrieCacheConfig, @@ -30,6 +31,14 @@ pub struct TrieConfig { pub sweat_prefetch_receivers: Vec, /// List of allowed predecessor accounts for SWEAT prefetching. pub sweat_prefetch_senders: Vec, + /// List of shards we will load into memory. + pub load_mem_tries_for_shards: Vec, + pub load_mem_tries_for_all_shards: bool, + /// Maximum size, in number of bytes, of a single shard in memory. + /// This amount is reserved upfront with mmap. If the machine does not have + /// that much RAM, enable memory overcommit. The actual memory usage is only + /// the real size of the loaded tries. + pub max_mem_tries_size_per_shard: usize, } impl TrieConfig { @@ -53,6 +62,14 @@ impl TrieConfig { Err(e) => error!(target: "config", "invalid account id {account}: {e}"), } } + for shard_uid in &config.load_mem_tries_for_shards { + match ShardUId::from_str(shard_uid) { + Ok(shard_uid) => this.load_mem_tries_for_shards.push(shard_uid), + Err(e) => error!(target: "config", "invalid shard uid {shard_uid}: {e}"), + } + } + this.load_mem_tries_for_all_shards = config.load_mem_tries_for_all_shards; + this.max_mem_tries_size_per_shard = config.max_mem_tries_size_per_shard; this } diff --git a/core/store/src/trie/iterator.rs b/core/store/src/trie/iterator.rs index ac97f6e507d..537a2a6b789 100644 --- a/core/store/src/trie/iterator.rs +++ b/core/store/src/trie/iterator.rs @@ -433,7 +433,7 @@ mod tests { use rand::Rng; use crate::test_utils::{ - create_tries, create_tries_complex, gen_changes, simplify_changes, test_populate_trie, + create_tries, gen_changes, simplify_changes, test_populate_trie, TestTriesBuilder, }; use crate::trie::iterator::IterStep; use crate::trie::nibble_slice::NibbleSlice; @@ -613,7 +613,7 @@ mod tests { fn gen_random_trie( rng: &mut rand::rngs::ThreadRng, ) -> (Vec<(Vec, Option>)>, BTreeMap, Vec>, Trie) { - let tries = create_tries_complex(1, 2); + let tries = TestTriesBuilder::new().with_shard_layout(1, 2).build(); let shard_uid = ShardUId { version: 1, shard_id: 0 }; let trie_changes = gen_changes(rng, 10); let trie_changes = simplify_changes(&trie_changes); diff --git a/core/store/src/trie/mem/loading.rs b/core/store/src/trie/mem/loading.rs index 026fe380f5c..f878e8970a1 100644 --- a/core/store/src/trie/mem/loading.rs +++ b/core/store/src/trie/mem/loading.rs @@ -1,13 +1,20 @@ use super::node::MemTrieNodeId; use super::MemTries; -use crate::flat::store_helper::decode_flat_state_db_key; +use crate::flat::store_helper::{ + decode_flat_state_db_key, get_all_deltas_metadata, get_delta_changes, get_flat_storage_status, +}; +use crate::flat::{FlatStorageError, FlatStorageStatus}; use crate::trie::mem::construction::TrieConstructor; +use crate::trie::mem::updating::apply_memtrie_changes; use crate::{DBCol, Store}; +use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::ShardUId; +use near_primitives::shard_layout::{get_block_shard_uid, ShardUId}; use near_primitives::state::FlatStateValue; +use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::BlockHeight; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use std::collections::BTreeSet; use std::time::Instant; use tracing::{debug, info}; @@ -19,10 +26,10 @@ pub fn load_trie_from_flat_state( state_root: CryptoHash, block_height: BlockHeight, maximum_arena_size: usize, -) -> anyhow::Result { +) -> Result { let mut tries = MemTries::new(maximum_arena_size, shard_uid); - tries.construct_root(block_height, |arena| -> anyhow::Result> { + tries.construct_root(block_height, |arena| -> Result, StorageError> { info!(target: "memtrie", shard_uid=%shard_uid, "Loading trie from flat state..."); let load_start = Instant::now(); let mut recon = TrieConstructor::new(arena); @@ -30,8 +37,13 @@ pub fn load_trie_from_flat_state( for item in store .iter_prefix_ser::(DBCol::FlatState, &borsh::to_vec(&shard_uid).unwrap()) { - let (key, value) = item?; - let (_, key) = decode_flat_state_db_key(&key)?; + let (key, value) = item.map_err(|err| { + FlatStorageError::StorageInternalError(format!("Error iterating over FlatState: {err}")) + })?; + let (_, key) = decode_flat_state_db_key(&key).map_err(|err| { + FlatStorageError::StorageInternalError(format!( + "invalid FlatState key format: {err}" + ))})?; recon.add_leaf(&key, value); num_keys_loaded += 1; if num_keys_loaded % 1000000 == 0 { @@ -75,21 +87,119 @@ pub fn load_trie_from_flat_state( Ok(tries) } +fn get_state_root( + store: &Store, + block_hash: CryptoHash, + shard_uid: ShardUId, +) -> Result { + let chunk_extra = store + .get_ser::(DBCol::ChunkExtra, &get_block_shard_uid(&block_hash, &shard_uid)) + .map_err(|err| { + StorageError::StorageInconsistentState(format!( + "Cannot fetch ChunkExtra for block {} in shard {}: {:?}", + block_hash, shard_uid, err + )) + })? + .ok_or_else(|| { + StorageError::StorageInconsistentState(format!( + "No ChunkExtra for block {} in shard {}", + block_hash, shard_uid + )) + })?; + Ok(*chunk_extra.state_root()) +} + +/// Constructs in-memory tries for the given shard, so that they represent the +/// same information as the flat storage, including the final state and the +/// deltas. The returned tries would contain a root for each block that the +/// flat storage currently has, i.e. one for the final block, and one for each +/// block that flat storage has a delta for, possibly in more than one fork. +pub fn load_trie_from_flat_state_and_delta( + store: &Store, + shard_uid: ShardUId, + maximum_arena_size: usize, +) -> Result { + debug!(target: "memtrie", "Loading base trie for {} from flat state...", shard_uid); + let flat_head = match get_flat_storage_status(&store, shard_uid)? { + FlatStorageStatus::Ready(status) => status.flat_head, + other => { + return Err(StorageError::MemTrieLoadingError(format!( + "Cannot load memtries when flat storage is not ready for shard {}, actual status: {:?}", + shard_uid, other + ))); + } + }; + + let mut mem_tries = load_trie_from_flat_state( + &store, + shard_uid, + get_state_root(store, flat_head.hash, shard_uid)?, + flat_head.height, + maximum_arena_size, + ) + .unwrap(); + + debug!(target: "memtrie", "Loading flat state deltas for {}...", shard_uid); + // We load the deltas in order of height, so that we always have the previous state root + // already loaded. + let mut sorted_deltas: BTreeSet<(BlockHeight, CryptoHash, CryptoHash)> = Default::default(); + for delta in get_all_deltas_metadata(&store, shard_uid).unwrap() { + sorted_deltas.insert((delta.block.height, delta.block.hash, delta.block.prev_hash)); + } + + debug!(target: "memtrie", "{} deltas to apply for {}", sorted_deltas.len(), shard_uid); + for (height, hash, prev_hash) in sorted_deltas.into_iter() { + let delta = get_delta_changes(&store, shard_uid, hash).unwrap(); + if let Some(changes) = delta { + let old_state_root = get_state_root(store, prev_hash, shard_uid)?; + let new_state_root = get_state_root(store, hash, shard_uid)?; + + let mut trie_update = mem_tries.update(old_state_root, false)?; + for (key, value) in changes.0 { + match value { + Some(value) => { + trie_update.insert_memtrie_only(&key, value); + } + None => trie_update.delete(&key), + }; + } + + let mem_trie_changes = trie_update.to_mem_trie_changes_only(); + let new_root_after_apply = + apply_memtrie_changes(&mut mem_tries, &mem_trie_changes, height); + assert_eq!(new_root_after_apply, new_state_root); + } + debug!(target:"memtrie", "Applied memtrie changes for height {}, shard {}", height, shard_uid); + } + + debug!(target: "memtrie", "Done loading for {}", shard_uid); + Ok(mem_tries) +} + #[cfg(test)] mod tests { + use crate::flat::test_utils::MockChain; + use crate::flat::{store_helper, BlockInfo, FlatStorageReadyStatus, FlatStorageStatus}; use crate::test_utils::{ - create_tries, simplify_changes, test_populate_flat_storage, test_populate_trie, + create_test_store, create_tries, simplify_changes, test_populate_flat_storage, + test_populate_trie, TestTriesBuilder, }; use crate::trie::mem::loading::load_trie_from_flat_state; use crate::trie::mem::lookup::memtrie_lookup; - use crate::{KeyLookupMode, NibbleSlice, Trie, TrieUpdate}; + use crate::{DBCol, NibbleSlice, ShardTries, Store, Trie, TrieUpdate}; use near_primitives::hash::CryptoHash; - use near_primitives::shard_layout::ShardUId; + use near_primitives::shard_layout::{get_block_shard_uid, ShardUId}; + use near_primitives::state::FlatStateValue; + use near_primitives::trie_key::TrieKey; + use near_primitives::types::chunk_extra::ChunkExtra; + use near_primitives::types::StateChangeCause; use near_vm_runner::logic::TrieNodesCount; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use std::collections::HashSet; + use super::load_trie_from_flat_state_and_delta; + fn check(keys: Vec>) { let shard_tries = create_tries(); let shard_uid = ShardUId::single_shard(); @@ -120,16 +230,26 @@ mod tests { return; } - let trie_update = TrieUpdate::new(shard_tries.get_trie_for_shard(shard_uid, state_root)); + let trie_update = TrieUpdate::new(shard_tries.get_trie_with_block_hash_for_shard( + shard_uid, + state_root, + &CryptoHash::default(), + false, + )); trie_update.set_trie_cache_mode(near_primitives::types::TrieCacheMode::CachingChunk); let trie = trie_update.trie(); let root = in_memory_trie.get_root(&state_root).unwrap(); let mut cache = HashSet::new(); let mut nodes_count = TrieNodesCount { db_reads: 0, mem_reads: 0 }; for key in keys.iter() { - let actual_value_ref = memtrie_lookup(root, key, Some(&mut cache), &mut nodes_count) - .map(|v| v.to_value_ref()); - let expected_value_ref = trie.get_ref(key, KeyLookupMode::Trie).unwrap(); + let actual_value_ref = memtrie_lookup(root, key, |_, hash, _| { + if cache.insert(hash) { + nodes_count.db_reads += 1; + } else { + nodes_count.mem_reads += 1; + } + }); + let expected_value_ref = trie.get_flat_value(key).unwrap(); assert_eq!(actual_value_ref, expected_value_ref, "{:?}", NibbleSlice::new(key)); assert_eq!(&nodes_count, &trie.get_trie_nodes_count()); } @@ -253,4 +373,180 @@ mod tests { fn test_memtrie_rand_large_data() { check_random(32, 100000, 1); } + + #[test] + fn test_memtrie_load_with_delta() { + let test_key = TrieKey::ContractData { + account_id: "test_account".parse().unwrap(), + key: b"test_key".to_vec(), + }; + let test_val0 = b"test_val0".to_vec(); + let test_val1 = b"test_val1".to_vec(); + let test_val2 = b"test_val2".to_vec(); + let test_val3 = b"test_val3".to_vec(); + let test_val4 = b"test_val4".to_vec(); + + // A chain with two forks. + // 0 |-> 1 -> 3 + // --> 2 -> 4 + let chain = MockChain::chain_with_two_forks(5); + let store = create_test_store(); + let shard_tries = TestTriesBuilder::new().with_store(store.clone()).build(); + let shard_uid = ShardUId { version: 1, shard_id: 1 }; + + // Populate the initial flat storage state at block 0. + let mut store_update = shard_tries.store_update(); + store_helper::set_flat_storage_status( + &mut store_update, + shard_uid, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head: chain.get_block(0) }), + ); + store_helper::set_flat_state_value( + &mut store_update, + shard_uid, + test_key.to_vec(), + Some(FlatStateValue::inlined(&test_val0)), + ); + store_update.commit().unwrap(); + + // Populate the initial trie at block 0 too. + let state_root_0 = test_populate_trie( + &shard_tries, + &Trie::EMPTY_ROOT, + shard_uid, + vec![(test_key.to_vec(), Some(test_val0.clone()))], + ); + write_chunk_extra(&store, chain.get_block(0).hash, shard_uid, state_root_0); + + // Apply four changes to the trie, and for each, a flat storage delta. + let state_root_1 = apply_trie_changes( + &shard_tries, + shard_uid, + state_root_0, + chain.get_block(1), + vec![(test_key.clone(), test_val1.clone())], + ); + write_chunk_extra(&store, chain.get_block(1).hash, shard_uid, state_root_1); + + let state_root_2 = apply_trie_changes( + &shard_tries, + shard_uid, + state_root_0, + chain.get_block(2), + vec![(test_key.clone(), test_val2.clone())], + ); + write_chunk_extra(&store, chain.get_block(2).hash, shard_uid, state_root_2); + + let state_root_3 = apply_trie_changes( + &shard_tries, + shard_uid, + state_root_1, + chain.get_block(3), + vec![(test_key.clone(), test_val3.clone())], + ); + write_chunk_extra(&store, chain.get_block(3).hash, shard_uid, state_root_3); + + let state_root_4 = apply_trie_changes( + &shard_tries, + shard_uid, + state_root_2, + chain.get_block(4), + vec![(test_key.clone(), test_val4.clone())], + ); + write_chunk_extra(&store, chain.get_block(4).hash, shard_uid, state_root_4); + + // Load into memory. It should load the base flat state (block 0), plus all + // four deltas. We'll check against the state roots at each block; they should + // all exist in the loaded memtrie. + let mem_tries = + load_trie_from_flat_state_and_delta(&store, shard_uid, 10 * 1024 * 1024).unwrap(); + + assert_eq!( + memtrie_lookup( + mem_tries.get_root(&state_root_0).unwrap(), + &test_key.to_vec(), + |_, _, _| {} + ), + Some(FlatStateValue::inlined(&test_val0)) + ); + assert_eq!( + memtrie_lookup( + mem_tries.get_root(&state_root_1).unwrap(), + &test_key.to_vec(), + |_, _, _| {} + ), + Some(FlatStateValue::inlined(&test_val1)) + ); + assert_eq!( + memtrie_lookup( + mem_tries.get_root(&state_root_2).unwrap(), + &test_key.to_vec(), + |_, _, _| {} + ), + Some(FlatStateValue::inlined(&test_val2)) + ); + assert_eq!( + memtrie_lookup( + mem_tries.get_root(&state_root_3).unwrap(), + &test_key.to_vec(), + |_, _, _| {} + ), + Some(FlatStateValue::inlined(&test_val3)) + ); + assert_eq!( + memtrie_lookup( + mem_tries.get_root(&state_root_4).unwrap(), + &test_key.to_vec(), + |_, _, _| {} + ), + Some(FlatStateValue::inlined(&test_val4)) + ); + } + + fn apply_trie_changes( + tries: &ShardTries, + shard_uid: ShardUId, + old_state_root: CryptoHash, + block: BlockInfo, + changes: Vec<(TrieKey, Vec)>, + ) -> CryptoHash { + let mut trie_update = tries.new_trie_update(shard_uid, old_state_root); + for (key, value) in changes { + trie_update.set(key, value); + } + trie_update + .commit(StateChangeCause::TransactionProcessing { tx_hash: CryptoHash::default() }); + let (_, trie_changes, state_changes) = trie_update.finalize().unwrap(); + let mut store_update = tries.store_update(); + tries.apply_insertions(&trie_changes, shard_uid, &mut store_update); + store_update.merge( + tries + .get_flat_storage_manager() + .save_flat_state_changes( + block.hash, + block.prev_hash, + block.height, + shard_uid, + &state_changes, + ) + .unwrap(), + ); + store_update.commit().unwrap(); + + trie_changes.new_root + } + + fn write_chunk_extra( + store: &Store, + block_hash: CryptoHash, + shard_uid: ShardUId, + state_root: CryptoHash, + ) { + let chunk_extra = ChunkExtra::new(&state_root, CryptoHash::default(), Vec::new(), 0, 0, 0); + let mut store_update = store.store_update(); + store_update + .set_ser(DBCol::ChunkExtra, &get_block_shard_uid(&block_hash, &shard_uid), &chunk_extra) + .unwrap(); + store_update.commit().unwrap(); + } } diff --git a/core/store/src/trie/mem/lookup.rs b/core/store/src/trie/mem/lookup.rs index b49de56b095..919ecf285ae 100644 --- a/core/store/src/trie/mem/lookup.rs +++ b/core/store/src/trie/mem/lookup.rs @@ -1,37 +1,28 @@ +use super::metrics::MEM_TRIE_NUM_LOOKUPS; use super::node::{MemTrieNodePtr, MemTrieNodeView}; use crate::NibbleSlice; -use near_primitives::hash::CryptoHash; +use near_primitives::hash::{hash, CryptoHash}; use near_primitives::state::FlatStateValue; -use near_vm_runner::logic::TrieNodesCount; -use std::collections::HashSet; /// Performs a lookup in an in-memory trie, while taking care of cache /// accounting for gas calculation purposes. pub fn memtrie_lookup( root: MemTrieNodePtr<'_>, key: &[u8], - mut accessed_cache: Option<&mut HashSet>, - nodes_count: &mut TrieNodesCount, + mut node_access_callback: impl FnMut(bool, CryptoHash, Vec), ) -> Option { + MEM_TRIE_NUM_LOOKUPS.inc(); let mut nibbles = NibbleSlice::new(key); let mut node = root; - loop { - // This logic is carried from on-disk trie. It must remain unchanged, - // because gas accounting is dependent on these counters. - if let Some(cache) = &mut accessed_cache { - if cache.insert(node.view().node_hash()) { - nodes_count.db_reads += 1; - } else { - nodes_count.mem_reads += 1; - } - } else { - nodes_count.db_reads += 1; - } - match node.view() { + let result = loop { + let view = node.view(); + let raw_node_serialized = borsh::to_vec(&view.to_raw_trie_node_with_size()).unwrap(); + node_access_callback(false, view.node_hash(), raw_node_serialized); + match view { MemTrieNodeView::Leaf { extension, value } => { if nibbles == NibbleSlice::from_encoded(extension.raw_slice()).0 { - return Some(value.to_flat_value()); + break value.to_flat_value(); } else { return None; } @@ -58,7 +49,7 @@ pub fn memtrie_lookup( } MemTrieNodeView::BranchWithValue { children, value, .. } => { if nibbles.is_empty() { - return Some(value.to_flat_value()); + break value.to_flat_value(); } let first = nibbles.at(0); nibbles = nibbles.mid(1); @@ -68,5 +59,9 @@ pub fn memtrie_lookup( }; } } + }; + if let FlatStateValue::Inlined(value) = &result { + node_access_callback(true, hash(value), value.clone()); } + Some(result) } diff --git a/core/store/src/trie/mem/metrics.rs b/core/store/src/trie/mem/metrics.rs index 26b67ab2f4d..ce1d76c78e8 100644 --- a/core/store/src/trie/mem/metrics.rs +++ b/core/store/src/trie/mem/metrics.rs @@ -1,5 +1,6 @@ use near_o11y::metrics::{ - try_create_int_counter_vec, try_create_int_gauge_vec, IntCounterVec, IntGaugeVec, + try_create_int_counter, try_create_int_counter_vec, try_create_int_gauge_vec, IntCounter, + IntCounterVec, IntGaugeVec, }; use once_cell::sync::Lazy; @@ -20,3 +21,11 @@ pub static MEM_TRIE_NUM_NODES_CREATED_FROM_UPDATES: Lazy = Lazy:: ) .unwrap() }); + +pub static MEM_TRIE_NUM_LOOKUPS: Lazy = Lazy::new(|| { + try_create_int_counter( + "near_mem_trie_num_lookups", + "Number of in-memory trie lookups (number of keys looked up)", + ) + .unwrap() +}); diff --git a/core/store/src/trie/mem/mod.rs b/core/store/src/trie/mem/mod.rs index d844eb7fad9..988fc41852f 100644 --- a/core/store/src/trie/mem/mod.rs +++ b/core/store/src/trie/mem/mod.rs @@ -13,7 +13,7 @@ mod construction; mod flexible_data; pub mod loading; pub mod lookup; -mod metrics; +pub mod metrics; pub mod node; pub mod updating; @@ -86,7 +86,7 @@ impl MemTries { self.roots.entry(state_root).or_default().push(mem_root); } MEM_TRIE_NUM_ROOTS - .with_label_values(&[&self.shard_uid.shard_id.to_string()]) + .with_label_values(&[&self.shard_uid.to_string()]) .set(self.roots.len() as i64); } @@ -131,7 +131,7 @@ impl MemTries { debug_assert!(false, "Deleting non-existent root: {}", state_root); } MEM_TRIE_NUM_ROOTS - .with_label_values(&[&self.shard_uid.shard_id.to_string()]) + .with_label_values(&[&self.shard_uid.to_string()]) .set(self.roots.len() as i64); } diff --git a/core/store/src/trie/mem/updating.rs b/core/store/src/trie/mem/updating.rs index 6161420a681..833b6ebac9d 100644 --- a/core/store/src/trie/mem/updating.rs +++ b/core/store/src/trie/mem/updating.rs @@ -743,7 +743,6 @@ impl<'a> MemTrieUpdate<'a> { /// Converts the changes to memtrie changes. Also returns the list of new nodes inserted, /// in hash and serialized form. fn to_mem_trie_changes_internal( - block_height: BlockHeight, shard_uid: String, arena: &ArenaMemory, updated_nodes: Vec>, @@ -762,7 +761,7 @@ impl<'a> MemTrieUpdate<'a> { .map(|(node_id, hash, _)| (*node_id, *hash)) .collect(); ( - MemTrieChanges { node_ids_with_hashes, updated_nodes, block_height }, + MemTrieChanges { node_ids_with_hashes, updated_nodes }, nodes_hashes_and_serialized .into_iter() .map(|(_, hash, serialized)| (hash, serialized)) @@ -771,20 +770,20 @@ impl<'a> MemTrieUpdate<'a> { } /// Converts the updates to memtrie changes only. - pub fn to_mem_trie_changes_only(self, block_height: BlockHeight) -> MemTrieChanges { + pub fn to_mem_trie_changes_only(self) -> MemTrieChanges { let Self { arena, updated_nodes, shard_uid, .. } = self; let (mem_trie_changes, _) = - Self::to_mem_trie_changes_internal(block_height, shard_uid, arena, updated_nodes); + Self::to_mem_trie_changes_internal(shard_uid, arena, updated_nodes); mem_trie_changes } /// Converts the updates to trie changes as well as memtrie changes. - pub fn to_trie_changes(self, block_height: BlockHeight) -> TrieChanges { + pub fn to_trie_changes(self) -> TrieChanges { let Self { root, arena, shard_uid, trie_refcount_changes, updated_nodes } = self; let mut trie_refcount_changes = trie_refcount_changes.expect("Cannot to_trie_changes for memtrie changes only"); let (mem_trie_changes, hashes_and_serialized) = - Self::to_mem_trie_changes_internal(block_height, shard_uid, arena, updated_nodes); + Self::to_mem_trie_changes_internal(shard_uid, arena, updated_nodes); // We've accounted for the dereferenced nodes, as well as value addition/subtractions. // The only thing left is to increment refcount for all new nodes. @@ -809,9 +808,13 @@ impl<'a> MemTrieUpdate<'a> { /// Applies the given memtrie changes to the in-memory trie data structure. /// Returns the new root hash. -pub fn apply_memtrie_changes(memtries: &mut MemTries, changes: &MemTrieChanges) -> CryptoHash { +pub fn apply_memtrie_changes( + memtries: &mut MemTries, + changes: &MemTrieChanges, + block_height: BlockHeight, +) -> CryptoHash { memtries - .construct_root(changes.block_height, |arena| { + .construct_root(block_height, |arena| { let mut last_node_id: Option = None; let map_to_new_node_id = |node_id: OldOrUpdatedNodeId, old_to_new_map: &HashMap< @@ -869,7 +872,7 @@ pub fn apply_memtrie_changes(memtries: &mut MemTries, changes: &MemTrieChanges) #[cfg(test)] mod tests { - use crate::test_utils::create_test_store; + use crate::test_utils::TestTriesBuilder; use crate::trie::mem::lookup::memtrie_lookup; use crate::trie::mem::updating::apply_memtrie_changes; use crate::trie::mem::MemTries; @@ -878,7 +881,6 @@ mod tests { use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; use near_primitives::state::{FlatStateValue, ValueRef}; - use near_vm_runner::logic::TrieNodesCount; use rand::Rng; use std::collections::{HashMap, HashSet}; @@ -893,8 +895,7 @@ mod tests { impl TestTries { fn new(check_deleted_keys: bool) -> Self { let mem = MemTries::new(100 * 1024 * 1024, ShardUId::single_shard()); - let store = create_test_store(); - let disk = ShardTries::test(store, 1); + let disk = TestTriesBuilder::new().build(); Self { mem, disk, @@ -916,7 +917,7 @@ mod tests { update.delete(&key); } } - update.to_trie_changes(0) + update.to_trie_changes() } fn make_memtrie_changes_only( @@ -934,7 +935,7 @@ mod tests { update.delete(&key); } } - update.to_mem_trie_changes_only(0) + update.to_mem_trie_changes_only() } fn make_disk_changes_only( @@ -959,7 +960,7 @@ mod tests { assert_eq!(disk_changes, all_changes); // Then apply the changes and check consistency of new state roots. - let new_state_root_from_mem = apply_memtrie_changes(&mut self.mem, &memtrie_changes); + let new_state_root_from_mem = apply_memtrie_changes(&mut self.mem, &memtrie_changes, 0); let mut store_update = self.disk.store_update(); let new_state_root_from_disk = self.disk.apply_all(&disk_changes, ShardUId::single_shard(), &mut store_update); @@ -989,14 +990,8 @@ mod tests { }; let disk_trie = self.disk.get_trie_for_shard(ShardUId::single_shard(), self.state_root); - let memtrie_result = memtrie_root.and_then(|memtrie_root| { - memtrie_lookup( - memtrie_root, - key, - None, - &mut TrieNodesCount { db_reads: 0, mem_reads: 0 }, - ) - }); + let memtrie_result = memtrie_root + .and_then(|memtrie_root| memtrie_lookup(memtrie_root, key, |_, _, _| {})); let disk_result = disk_trie.get_ref(key, KeyLookupMode::Trie).unwrap(); if let Some(value_ref) = value_ref { let memtrie_value_ref = memtrie_result diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index 2607f830970..6c61651b776 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -1,5 +1,7 @@ use self::accounting_cache::TrieAccountingCache; +use self::mem::lookup::memtrie_lookup; use self::mem::updating::{UpdatedMemTrieNode, UpdatedMemTrieNodeId}; +use self::mem::MemTries; use self::trie_recording::TrieRecorder; use self::trie_storage::TrieMemoryPartialStorage; use crate::flat::{FlatStateChanges, FlatStorageChunkView}; @@ -25,7 +27,7 @@ use near_primitives::state_record::StateRecord; use near_primitives::trie_key::trie_key_parsers::parse_account_id_prefix; use near_primitives::trie_key::TrieKey; pub use near_primitives::types::TrieNodesCount; -use near_primitives::types::{AccountId, BlockHeight, StateRoot, StateRootNode}; +use near_primitives::types::{AccountId, StateRoot, StateRootNode}; use near_vm_runner::ContractCode; pub use raw_node::{Children, RawTrieNode, RawTrieNodeWithSize}; use std::cell::RefCell; @@ -34,7 +36,7 @@ use std::fmt::Write; use std::hash::Hash; use std::rc::Rc; use std::str; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; pub mod accounting_cache; mod config; @@ -330,6 +332,7 @@ impl std::fmt::Debug for TrieNode { pub struct Trie { storage: Rc, + memtries: Option>>, root: StateRoot, /// If present, flat storage is used to look up keys (if asked for). /// Otherwise, we would crawl through the trie. @@ -486,7 +489,6 @@ impl TrieRefcountDeltaMap { pub struct MemTrieChanges { node_ids_with_hashes: Vec<(UpdatedMemTrieNodeId, CryptoHash)>, updated_nodes: Vec>, - block_height: BlockHeight, } /// @@ -568,6 +570,15 @@ impl Trie { storage: Rc, root: StateRoot, flat_storage_chunk_view: Option, + ) -> Self { + Self::new_with_memtries(storage, None, root, flat_storage_chunk_view) + } + + pub fn new_with_memtries( + storage: Rc, + memtries: Option>>, + root: StateRoot, + flat_storage_chunk_view: Option, ) -> Self { let accounting_cache = match storage.as_caching_storage() { Some(caching_storage) => RefCell::new(TrieAccountingCache::new(Some(( @@ -578,6 +589,7 @@ impl Trie { }; Trie { storage, + memtries, root, charge_gas_for_trie_node_access: flat_storage_chunk_view.is_none(), flat_storage_chunk_view, @@ -1108,20 +1120,25 @@ impl Trie { // as they are needed to prove the value. Also, it's important that this lookup // is done even if the key was not found, because intermediate trie nodes may be // needed to prove the non-existence of the key. - let value_ref_from_trie = - self.lookup_from_state_column(NibbleSlice::new(key), false)?; - match &value { - Some(FlatStateValue::Inlined(value)) => { - assert!(value_ref_from_trie.is_some()); - let value_from_trie = - self.retrieve_value(&value_ref_from_trie.unwrap().hash)?; - assert_eq!(&value_from_trie, value); - } - Some(FlatStateValue::Ref(value_ref)) => { - assert_eq!(value_ref_from_trie.as_ref(), Some(value_ref)); - } - None => { - assert!(value_ref_from_trie.is_none()); + if self.memtries.is_some() { + let value_from_trie = self.lookup_from_memory(key, ref_only, false)?; + assert_eq!(&value_from_trie, &value); + } else { + let value_ref_from_trie = + self.lookup_from_state_column(NibbleSlice::new(key), false)?; + match &value { + Some(FlatStateValue::Inlined(value)) => { + assert!(value_ref_from_trie.is_some()); + let value_from_trie = + self.retrieve_value(&value_ref_from_trie.unwrap().hash)?; + assert_eq!(&value_from_trie, value); + } + Some(FlatStateValue::Ref(value_ref)) => { + assert_eq!(value_ref_from_trie.as_ref(), Some(value_ref)); + } + None => { + assert!(value_ref_from_trie.is_none()); + } } } } else { @@ -1192,6 +1209,42 @@ impl Trie { } } + fn lookup_from_memory( + &self, + key: &[u8], + ref_only: bool, + use_accounting_cache: bool, + ) -> Result, StorageError> { + if self.root == Self::EMPTY_ROOT { + return Ok(None); + } + let lock = self.memtries.as_ref().unwrap().read().unwrap(); + let root = lock.get_root(&self.root).ok_or_else(|| { + StorageError::StorageInconsistentState(format!( + "Failed to find root node {} in memtrie", + self.root + )) + })?; + + let result = memtrie_lookup(root, key, |is_leaf, hash, data| { + if ref_only && is_leaf { + return; + } + let data: Arc<[u8]> = data.into(); + if use_accounting_cache || is_leaf { + self.accounting_cache.borrow_mut().retroactively_account(hash, data.clone()); + } + if let Some(recorder) = &self.recorder { + recorder.borrow_mut().record(&hash, data); + } + }); + if ref_only { + Ok(result.map(|value| FlatStateValue::Ref(value.to_value_ref()))) + } else { + Ok(result) + } + } + /// For debugging only. Returns the raw node at the given path starting from the root. /// The format of the nibbles parameter is that each element represents 4 bits of the /// path. (Even though we use a u8 for each element, we only use the lower 4 bits.) @@ -1285,7 +1338,11 @@ impl Trie { mode == KeyLookupMode::FlatStorage && self.flat_storage_chunk_view.is_some(); let charge_gas_for_trie_node_access = mode == KeyLookupMode::Trie || self.charge_gas_for_trie_node_access; - if use_flat_storage { + if self.memtries.is_some() { + Ok(self + .lookup_from_memory(key, true, charge_gas_for_trie_node_access)? + .map(|value| value.to_value_ref())) + } else if use_flat_storage { Ok(self.lookup_from_flat_storage(key, true)?.map(|value| value.to_value_ref())) } else { self.lookup_from_state_column(NibbleSlice::new(key), charge_gas_for_trie_node_access) @@ -1295,8 +1352,10 @@ impl Trie { /// Retrieves a value, which may or may not be the complete value, for the given key. /// If the full value could be obtained cheaply it is returned; otherwise only the reference /// is returned. - fn get_flat_value(&self, key: &[u8]) -> Result, StorageError> { - if self.flat_storage_chunk_view.is_some() { + pub fn get_flat_value(&self, key: &[u8]) -> Result, StorageError> { + if self.memtries.is_some() { + self.lookup_from_memory(key, false, self.charge_gas_for_trie_node_access) + } else if self.flat_storage_chunk_view.is_some() { self.lookup_from_flat_storage(key, false) } else { Ok(self @@ -1321,21 +1380,38 @@ impl Trie { where I: IntoIterator, Option>)>, { - let mut memory = NodesStorage::new(); - let mut root_node = self.move_node_to_mutable(&mut memory, &self.root)?; - for (key, value) in changes { - let key = NibbleSlice::new(&key); - root_node = match value { - Some(arr) => self.insert(&mut memory, root_node, key, arr), - None => self.delete(&mut memory, root_node, key), - }?; - } + match &self.memtries { + Some(memtries) => { + let guard = memtries.read().unwrap(); + let mut trie_update = guard.update(self.root, true)?; + for (key, value) in changes { + match value { + Some(arr) => { + trie_update.insert(&key, arr); + } + None => trie_update.delete(&key), + } + } + Ok(trie_update.to_trie_changes()) + } + None => { + let mut memory = NodesStorage::new(); + let mut root_node = self.move_node_to_mutable(&mut memory, &self.root)?; + for (key, value) in changes { + let key = NibbleSlice::new(&key); + root_node = match value { + Some(arr) => self.insert(&mut memory, root_node, key, arr), + None => self.delete(&mut memory, root_node, key), + }?; + } - #[cfg(test)] - { - self.memory_usage_verify(&memory, NodeHandle::InMemory(root_node)); + #[cfg(test)] + { + self.memory_usage_verify(&memory, NodeHandle::InMemory(root_node)); + } + Trie::flatten_nodes(&self.root, memory, root_node) + } } - Trie::flatten_nodes(&self.root, memory, root_node) } pub fn iter<'a>(&'a self) -> Result, StorageError> { @@ -1401,8 +1477,8 @@ mod tests { use rand::Rng; use crate::test_utils::{ - create_test_store, create_tries, create_tries_complex, gen_changes, simplify_changes, - test_populate_trie, + create_test_store, create_tries, gen_changes, simplify_changes, test_populate_trie, + TestTriesBuilder, }; use crate::{DBCol, MissingTrieValueContext}; @@ -1434,7 +1510,7 @@ mod tests { #[test] fn test_basic_trie() { // test trie version > 0 - let tries = create_tries_complex(SHARD_VERSION, 2); + let tries = TestTriesBuilder::new().with_shard_layout(SHARD_VERSION, 2).build(); let shard_uid = ShardUId { version: SHARD_VERSION, shard_id: 0 }; let trie = tries.get_trie_for_shard(shard_uid, Trie::EMPTY_ROOT); assert_eq!(trie.get(&[122]), Ok(None)); @@ -1454,7 +1530,7 @@ mod tests { #[test] fn test_trie_iter() { - let tries = create_tries_complex(SHARD_VERSION, 2); + let tries = TestTriesBuilder::new().with_shard_layout(SHARD_VERSION, 2).build(); let shard_uid = ShardUId { version: SHARD_VERSION, shard_id: 0 }; let pairs = vec![ (b"a".to_vec(), Some(b"111".to_vec())), @@ -1488,7 +1564,7 @@ mod tests { #[test] fn test_trie_leaf_into_branch() { - let tries = create_tries_complex(SHARD_VERSION, 2); + let tries = TestTriesBuilder::new().with_shard_layout(SHARD_VERSION, 2).build(); let shard_uid = ShardUId { version: SHARD_VERSION, shard_id: 0 }; let changes = vec![ (b"dog".to_vec(), Some(b"puppy".to_vec())), @@ -1697,7 +1773,7 @@ mod tests { #[test] fn test_trie_restart() { let store = create_test_store(); - let tries = ShardTries::test(store.clone(), 1); + let tries = TestTriesBuilder::new().with_store(store.clone()).build(); let empty_root = Trie::EMPTY_ROOT; let changes = vec![ (b"doge".to_vec(), Some(b"coin".to_vec())), @@ -1709,7 +1785,7 @@ mod tests { ]; let root = test_populate_trie(&tries, &empty_root, ShardUId::single_shard(), changes); - let tries2 = ShardTries::test(store, 1); + let tries2 = TestTriesBuilder::new().with_store(store).build(); let trie2 = tries2.get_trie_for_shard(ShardUId::single_shard(), root); assert_eq!(trie2.get(b"doge"), Ok(Some(b"coin".to_vec()))); } @@ -1717,8 +1793,7 @@ mod tests { // TODO: somehow also test that we don't record unnecessary nodes #[test] fn test_trie_recording_reads() { - let store = create_test_store(); - let tries = ShardTries::test(store, 1); + let tries = TestTriesBuilder::new().build(); let empty_root = Trie::EMPTY_ROOT; let changes = vec![ (b"doge".to_vec(), Some(b"coin".to_vec())), @@ -1750,8 +1825,7 @@ mod tests { #[test] fn test_trie_recording_reads_update() { - let store = create_test_store(); - let tries = ShardTries::test(store, 1); + let tries = TestTriesBuilder::new().build(); let empty_root = Trie::EMPTY_ROOT; let changes = vec![ (b"doge".to_vec(), Some(b"coin".to_vec())), @@ -1786,7 +1860,7 @@ mod tests { #[test] fn test_dump_load_trie() { let store = create_test_store(); - let tries = ShardTries::test(store.clone(), 1); + let tries = TestTriesBuilder::new().with_store(store.clone()).build(); let empty_root = Trie::EMPTY_ROOT; let changes = vec![ (b"doge".to_vec(), Some(b"coin".to_vec())), @@ -1797,7 +1871,7 @@ mod tests { store.save_state_to_file(&dir.path().join("test.bin")).unwrap(); let store2 = create_test_store(); store2.load_state_from_file(&dir.path().join("test.bin")).unwrap(); - let tries2 = ShardTries::test(store2, 1); + let tries2 = TestTriesBuilder::new().with_store(store2).build(); let trie2 = tries2.get_trie_for_shard(ShardUId::single_shard(), root); assert_eq!(trie2.get(b"doge").unwrap().unwrap(), b"coin"); } diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 48741217cfd..63b892fe590 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -1,28 +1,32 @@ +use super::mem::MemTries; +use super::state_snapshot::{StateSnapshot, StateSnapshotConfig}; +use super::TrieRefcountSubtraction; use crate::flat::{FlatStorageManager, FlatStorageStatus}; use crate::trie::config::TrieConfig; +use crate::trie::mem::loading::load_trie_from_flat_state_and_delta; +use crate::trie::mem::updating::apply_memtrie_changes; use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage}; use crate::trie::{TrieRefcountAddition, POISONED_LOCK_ERR}; use crate::{metrics, DBCol, PrefetchApi}; use crate::{Store, StoreUpdate, Trie, TrieChanges, TrieUpdate}; - use near_primitives::errors::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::shard_layout::{self, ShardUId, ShardVersion}; +use near_primitives::shard_layout::{self, ShardUId}; use near_primitives::trie_key::TrieKey; use near_primitives::types::{ - NumShards, RawStateChange, RawStateChangesWithTrieKey, StateChangeCause, StateRoot, + BlockHeight, RawStateChange, RawStateChangesWithTrieKey, StateChangeCause, StateRoot, }; +use rayon::prelude::{IntoParallelIterator, ParallelIterator}; use std::collections::HashMap; use std::rc::Rc; use std::sync::{Arc, RwLock}; - -use super::state_snapshot::{StateSnapshot, StateSnapshotConfig}; -use super::TrieRefcountSubtraction; +use tracing::info; struct ShardTriesInner { store: Store, trie_config: TrieConfig, + mem_tries: RwLock>>>, /// Cache reserved for client actor to use caches: RwLock>, /// Cache for readers. @@ -56,6 +60,7 @@ impl ShardTries { ShardTries(Arc::new(ShardTriesInner { store, trie_config, + mem_tries: RwLock::new(HashMap::new()), caches: RwLock::new(caches), view_caches: RwLock::new(view_caches), flat_storage_manager, @@ -65,29 +70,6 @@ impl ShardTries { })) } - /// Create `ShardTries` with a fixed number of shards with shard version 0. - /// - /// If your test cares about the shard version, use `test_shard_version` instead. - pub fn test(store: Store, num_shards: NumShards) -> Self { - let shard_version = 0; - Self::test_shard_version(store, shard_version, num_shards) - } - - pub fn test_shard_version(store: Store, version: ShardVersion, num_shards: NumShards) -> Self { - assert_ne!(0, num_shards); - let shard_uids: Vec = - (0..num_shards as u32).map(|shard_id| ShardUId { shard_id, version }).collect(); - let trie_config = TrieConfig::default(); - - ShardTries::new( - store.clone(), - trie_config, - &shard_uids, - FlatStorageManager::new(store), - StateSnapshotConfig::default(), - ) - } - /// Create caches for all shards according to the trie config. fn create_initial_caches( config: &TrieConfig, @@ -159,8 +141,8 @@ impl ShardTries { )); let flat_storage_chunk_view = block_hash .and_then(|block_hash| self.0.flat_storage_manager.chunk_view(shard_uid, block_hash)); - - Trie::new(storage, state_root, flat_storage_chunk_view) + let memtries = self.get_mem_tries(shard_uid); + Trie::new_with_memtries(storage, memtries, state_root, flat_storage_chunk_view) } pub fn get_trie_for_shard(&self, shard_uid: ShardUId, state_root: StateRoot) -> Trie { @@ -348,6 +330,29 @@ impl ShardTries { self.apply_all_inner(trie_changes, shard_uid, true, store_update) } + pub fn apply_memtrie_changes( + &self, + trie_changes: &TrieChanges, + shard_uid: ShardUId, + block_height: BlockHeight, + ) { + if let Some(memtries) = self.get_mem_tries(shard_uid) { + apply_memtrie_changes( + &mut memtries.write().unwrap(), + trie_changes + .mem_trie_changes + .as_ref() + .expect("Memtrie changes must be present if memtrie is loaded"), + block_height, + ); + } else { + assert!( + trie_changes.mem_trie_changes.is_none(), + "Memtrie changes must not be present if memtrie is not loaded" + ); + } + } + /// Returns the status of the given shard of flat storage in the state snapshot. /// `sync_prev_prev_hash` needs to match the block hash that identifies that snapshot. pub fn get_snapshot_flat_storage_status( @@ -358,6 +363,59 @@ impl ShardTries { let (_store, manager) = self.get_state_snapshot(&sync_prev_prev_hash)?; Ok(manager.get_flat_storage_status(shard_uid)) } + + /// Should be called upon startup to load in-memory tries for enabled shards. + pub fn load_mem_tries_for_enabled_shards( + &self, + shard_uids: &[ShardUId], + ) -> Result<(), StorageError> { + let trie_config = &self.0.trie_config; + let shard_uids_to_load = shard_uids + .iter() + .copied() + .filter(|shard_uid| { + trie_config.load_mem_tries_for_all_shards + || trie_config.load_mem_tries_for_shards.contains(shard_uid) + }) + .collect::>(); + let store = self.0.store.clone(); + info!(target: "memtrie", "Loading tries to memory for shards {:?}...", shard_uids); + shard_uids_to_load + .into_par_iter() + .map(|shard_uid| -> Result<(), StorageError> { + let mem_tries = load_trie_from_flat_state_and_delta( + &store, + shard_uid, + trie_config.max_mem_tries_size_per_shard, + )?; + self.0 + .mem_tries + .write() + .unwrap() + .insert(shard_uid, Arc::new(RwLock::new(mem_tries))); + Ok(()) + }) + .collect::>>() + .into_iter() + .collect::>()?; + + info!(target: "memtrie", "Memtries loading complete for shards {:?}", shard_uids); + Ok(()) + } + + /// Retrieves the in-memory tries for the shard. + pub fn get_mem_tries(&self, shard_uid: ShardUId) -> Option>> { + let guard = self.0.mem_tries.write().unwrap(); + guard.get(&shard_uid).cloned() + } + + /// Garbage collects the in-memory tries for the shard up to (and including) the given + /// height. + pub fn delete_memtrie_roots_up_to_height(&self, shard_uid: ShardUId, height: BlockHeight) { + if let Some(memtries) = self.get_mem_tries(shard_uid) { + memtries.write().unwrap().delete_until_height(height); + } + } } pub struct WrappedTrieChanges { @@ -366,6 +424,7 @@ pub struct WrappedTrieChanges { trie_changes: TrieChanges, state_changes: Vec, block_hash: CryptoHash, + block_height: BlockHeight, } // Partial implementation. Skips `tries` due to its complexity and @@ -378,6 +437,7 @@ impl std::fmt::Debug for WrappedTrieChanges { .field("trie_changes", &"") .field("state_changes", &"") .field("block_hash", &self.block_hash) + .field("block_height", &self.block_height) .finish() } } @@ -389,14 +449,26 @@ impl WrappedTrieChanges { trie_changes: TrieChanges, state_changes: Vec, block_hash: CryptoHash, + block_height: BlockHeight, ) -> Self { - WrappedTrieChanges { tries, shard_uid, trie_changes, state_changes, block_hash } + WrappedTrieChanges { + tries, + shard_uid, + trie_changes, + state_changes, + block_hash, + block_height, + } } pub fn state_changes(&self) -> &[RawStateChangesWithTrieKey] { &self.state_changes } + pub fn apply_mem_changes(&self) { + self.tries.apply_memtrie_changes(&self.trie_changes, self.shard_uid, self.block_height); + } + /// Save insertions of trie nodes into Store. pub fn insertions_into(&self, store_update: &mut StoreUpdate) { self.tries.apply_insertions(&self.trie_changes, self.shard_uid, store_update) @@ -684,6 +756,9 @@ mod test { enable_receipt_prefetching: false, sweat_prefetch_receivers: Vec::new(), sweat_prefetch_senders: Vec::new(), + load_mem_tries_for_shards: Vec::new(), + load_mem_tries_for_all_shards: false, + max_mem_tries_size_per_shard: 0, }; let shard_uids = Vec::from([ShardUId { shard_id: 0, version: 0 }]); let shard_uid = *shard_uids.first().unwrap(); @@ -734,6 +809,9 @@ mod test { enable_receipt_prefetching: false, sweat_prefetch_receivers: Vec::new(), sweat_prefetch_senders: Vec::new(), + load_mem_tries_for_shards: Vec::new(), + load_mem_tries_for_all_shards: false, + max_mem_tries_size_per_shard: 0, }; let shard_uids = Vec::from([ShardUId { shard_id: 0, version: 0 }]); let shard_uid = *shard_uids.first().unwrap(); diff --git a/core/store/src/trie/state_parts.rs b/core/store/src/trie/state_parts.rs index 52e39705be8..5b46f379049 100644 --- a/core/store/src/trie/state_parts.rs +++ b/core/store/src/trie/state_parts.rs @@ -519,9 +519,7 @@ mod tests { use near_primitives::hash::{hash, CryptoHash}; - use crate::test_utils::{ - create_tries, create_tries_with_flat_storage, gen_changes, test_populate_trie, - }; + use crate::test_utils::{create_tries, gen_changes, test_populate_trie, TestTriesBuilder}; use crate::trie::iterator::CrumbStatus; use crate::trie::{ TrieRefcountAddition, TrieRefcountDeltaMap, TrieRefcountSubtraction, ValueHandle, @@ -1149,7 +1147,7 @@ mod tests { fn get_trie_nodes_for_part_with_flat_storage() { let value_len = 1000usize; - let tries = create_tries_with_flat_storage(); + let tries = TestTriesBuilder::new().with_flat_storage().build(); let shard_uid = ShardUId::single_shard(); let block_hash = CryptoHash::default(); let part_id = PartId::new(1, 3); diff --git a/core/store/src/trie/trie_recording.rs b/core/store/src/trie/trie_recording.rs index bff264958b6..63f49f74cab 100644 --- a/core/store/src/trie/trie_recording.rs +++ b/core/store/src/trie/trie_recording.rs @@ -28,12 +28,13 @@ impl TrieRecorder { #[cfg(test)] mod trie_recording_tests { use crate::test_utils::{ - create_tries_complex_with_flat_storage, gen_larger_changes, simplify_changes, - test_populate_flat_storage, test_populate_trie, + gen_larger_changes, simplify_changes, test_populate_flat_storage, test_populate_trie, + TestTriesBuilder, }; - use crate::Trie; + use crate::{DBCol, Trie}; use near_primitives::hash::CryptoHash; - use near_primitives::shard_layout::ShardUId; + use near_primitives::shard_layout::{get_block_shard_uid, ShardUId}; + use near_primitives::types::chunk_extra::ChunkExtra; use near_vm_runner::logic::TrieNodesCount; use std::collections::HashMap; @@ -45,7 +46,7 @@ mod trie_recording_tests { fn test_trie_recording_consistency(enable_accounting_cache: bool, use_missing_keys: bool) { let mut rng = rand::thread_rng(); for _ in 0..NUM_ITERATIONS_PER_TEST { - let tries = create_tries_complex_with_flat_storage(1, 2); + let tries = TestTriesBuilder::new().with_shard_layout(1, 2).with_flat_storage().build(); let shard_uid = ShardUId { version: 1, shard_id: 0 }; let trie_changes = gen_larger_changes(&mut rng, 50); @@ -136,7 +137,7 @@ mod trie_recording_tests { ) { let mut rng = rand::thread_rng(); for _ in 0..NUM_ITERATIONS_PER_TEST { - let tries = create_tries_complex_with_flat_storage(1, 2); + let tries = TestTriesBuilder::new().with_shard_layout(1, 2).with_flat_storage().build(); let shard_uid = ShardUId { version: 1, shard_id: 0 }; let trie_changes = gen_larger_changes(&mut rng, 50); @@ -250,4 +251,151 @@ mod trie_recording_tests { fn test_trie_recording_consistency_with_flat_storage_with_accounting_cache_and_missing_keys() { test_trie_recording_consistency_with_flat_storage(true, true); } + + // Do the same test but now with in-memory tries. + fn test_trie_recording_consistency_with_in_memory_tries( + enable_accounting_cache: bool, + use_missing_keys: bool, + ) { + let mut rng = rand::thread_rng(); + for _ in 0..NUM_ITERATIONS_PER_TEST { + let tries_for_building = TestTriesBuilder::new().with_flat_storage().build(); + + let shard_uid = ShardUId::single_shard(); + let trie_changes = gen_larger_changes(&mut rng, 50); + let trie_changes = simplify_changes(&trie_changes); + if trie_changes.is_empty() { + continue; + } + let state_root = test_populate_trie( + &tries_for_building, + &Trie::EMPTY_ROOT, + shard_uid, + trie_changes.clone(), + ); + test_populate_flat_storage( + &tries_for_building, + shard_uid, + &CryptoHash::default(), + &CryptoHash::default(), + &trie_changes, + ); + + // ChunkExtra is needed for in-memory trie loading code to query state roots. + let chunk_extra = + ChunkExtra::new(&state_root, CryptoHash::default(), Vec::new(), 0, 0, 0); + let mut update_for_chunk_extra = tries_for_building.store_update(); + update_for_chunk_extra + .set_ser( + DBCol::ChunkExtra, + &get_block_shard_uid(&CryptoHash::default(), &shard_uid), + &chunk_extra, + ) + .unwrap(); + update_for_chunk_extra.commit().unwrap(); + + let data_in_trie = trie_changes + .iter() + .map(|(key, value)| (key.clone(), value.clone().unwrap())) + .collect::>(); + let keys_to_test_with = trie_changes + .iter() + .map(|(key, _)| { + let mut key = key.clone(); + if use_missing_keys { + key.push(100); + } + key + }) + .collect::>(); + + let tries = TestTriesBuilder::new() + .with_store(tries_for_building.get_store()) + .with_flat_storage() + .with_in_memory_tries() + .build(); + + // First, check that the trie is using flat storage, so that counters are all zero. + // Only use get_ref(), because get() will actually dereference values which can + // cause trie reads. + let trie = tries.get_trie_with_block_hash_for_shard( + shard_uid, + state_root, + &CryptoHash::default(), + false, + ); + for key in &keys_to_test_with { + trie.get_ref(&key, crate::KeyLookupMode::FlatStorage).unwrap(); + } + assert_eq!(trie.get_trie_nodes_count(), TrieNodesCount { db_reads: 0, mem_reads: 0 }); + + // Now, let's capture the baseline node counts - this is what will happen + // in production. + let trie = tries.get_trie_with_block_hash_for_shard( + shard_uid, + state_root, + &CryptoHash::default(), + false, + ); + trie.accounting_cache.borrow_mut().set_enabled(enable_accounting_cache); + for key in &keys_to_test_with { + assert_eq!(trie.get(key).unwrap(), data_in_trie.get(key).cloned()); + } + let baseline_trie_nodes_count = trie.get_trie_nodes_count(); + println!("Baseline trie nodes count: {:?}", baseline_trie_nodes_count); + + // Let's do this again, but this time recording reads. We'll make sure + // the counters are exactly the same even when we're recording. + let trie = tries + .get_trie_with_block_hash_for_shard( + shard_uid, + state_root, + &CryptoHash::default(), + false, + ) + .recording_reads(); + trie.accounting_cache.borrow_mut().set_enabled(enable_accounting_cache); + for key in &keys_to_test_with { + assert_eq!(trie.get(key).unwrap(), data_in_trie.get(key).cloned()); + } + assert_eq!(trie.get_trie_nodes_count(), baseline_trie_nodes_count); + + // Now, let's check that when doing the same lookups with the captured partial storage, + // we still get the same counters. + let partial_storage = trie.recorded_storage().unwrap(); + println!( + "Partial storage has {} nodes from {} entries", + partial_storage.nodes.len(), + trie_changes.len() + ); + let trie = Trie::from_recorded_storage(partial_storage, state_root, true); + trie.accounting_cache.borrow_mut().set_enabled(enable_accounting_cache); + for key in &keys_to_test_with { + assert_eq!(trie.get(key).unwrap(), data_in_trie.get(key).cloned()); + } + assert_eq!(trie.get_trie_nodes_count(), baseline_trie_nodes_count); + } + } + + #[test] + fn test_trie_recording_consistency_with_in_memory_tries_no_accounting_cache() { + test_trie_recording_consistency_with_in_memory_tries(false, false); + } + + #[test] + fn test_trie_recording_consistency_with_in_memory_tries_with_accounting_cache() { + test_trie_recording_consistency_with_in_memory_tries(true, false); + } + + #[test] + fn test_trie_recording_consistency_with_in_memory_tries_no_accounting_cache_with_missing_keys() + { + test_trie_recording_consistency_with_in_memory_tries(false, true); + } + + #[test] + fn test_trie_recording_consistency_with_in_memory_tries_with_accounting_cache_and_missing_keys() + { + test_trie_recording_consistency_with_in_memory_tries(true, true); + } } diff --git a/core/store/src/trie/trie_tests.rs b/core/store/src/trie/trie_tests.rs index ff2afc9df27..55dd7de708b 100644 --- a/core/store/src/trie/trie_tests.rs +++ b/core/store/src/trie/trie_tests.rs @@ -1,4 +1,4 @@ -use crate::test_utils::{create_tries_complex, gen_changes, simplify_changes, test_populate_trie}; +use crate::test_utils::{gen_changes, simplify_changes, test_populate_trie, TestTriesBuilder}; use crate::trie::trie_storage::{TrieMemoryPartialStorage, TrieStorage}; use crate::{PartialStorage, Trie, TrieUpdate}; use assert_matches::assert_matches; @@ -101,7 +101,7 @@ where fn test_reads_with_incomplete_storage() { let mut rng = rand::thread_rng(); for _ in 0..50 { - let tries = create_tries_complex(1, 2); + let tries = TestTriesBuilder::new().with_shard_layout(1, 2).build(); let shard_uid = ShardUId { version: 1, shard_id: 0 }; let trie_changes = gen_changes(&mut rng, 20); let trie_changes = simplify_changes(&trie_changes); diff --git a/core/store/src/trie/update.rs b/core/store/src/trie/update.rs index 28de5a37839..b162277c0eb 100644 --- a/core/store/src/trie/update.rs +++ b/core/store/src/trie/update.rs @@ -3,7 +3,6 @@ use super::Trie; use crate::trie::{KeyLookupMode, TrieChanges}; use crate::StorageError; use near_primitives::hash::CryptoHash; -use near_primitives::state::ValueRef; use near_primitives::trie_key::TrieKey; use near_primitives::types::{ RawStateChange, RawStateChanges, RawStateChangesWithTrieKey, StateChangeCause, StateRoot, @@ -75,11 +74,9 @@ impl TrieUpdate { } } - self.trie.get_ref(&key, mode).map(|option| { - option.map(|ValueRef { length, hash }| { - TrieUpdateValuePtr::HashAndSize(&self.trie, length, hash) - }) - }) + Ok(self.trie.get_ref(&key, mode)?.map(|value_ref| { + TrieUpdateValuePtr::HashAndSize(&self.trie, value_ref.length, value_ref.hash) + })) } pub fn get(&self, key: &TrieKey) -> Result>, StorageError> { @@ -169,7 +166,7 @@ impl crate::TrieAccess for TrieUpdate { #[cfg(test)] mod tests { - use crate::test_utils::{create_tries, create_tries_complex}; + use crate::test_utils::{create_tries, TestTriesBuilder}; use super::*; use crate::ShardUId; @@ -182,7 +179,7 @@ mod tests { #[test] fn trie() { - let tries = create_tries_complex(SHARD_VERSION, 2); + let tries = TestTriesBuilder::new().with_shard_layout(SHARD_VERSION, 2).build(); let root = Trie::EMPTY_ROOT; let mut trie_update = tries.new_trie_update(COMPLEX_SHARD_UID, root); trie_update.set(test_key(b"dog".to_vec()), b"puppy".to_vec()); @@ -209,7 +206,7 @@ mod tests { #[test] fn trie_remove() { - let tries = create_tries_complex(SHARD_VERSION, 2); + let tries = TestTriesBuilder::new().with_shard_layout(SHARD_VERSION, 2).build(); // Delete non-existing element. let mut trie_update = tries.new_trie_update(COMPLEX_SHARD_UID, Trie::EMPTY_ROOT); diff --git a/integration-tests/src/runtime_utils.rs b/integration-tests/src/runtime_utils.rs index 599c43e23b6..af007bb73fd 100644 --- a/integration-tests/src/runtime_utils.rs +++ b/integration-tests/src/runtime_utils.rs @@ -6,7 +6,7 @@ use near_primitives::state_record::{state_record_to_account_id, StateRecord}; use near_primitives::types::AccountId; use near_primitives::types::StateRoot; use near_store::genesis::GenesisStateApplier; -use near_store::test_utils::create_tries_complex_with_flat_storage; +use near_store::test_utils::TestTriesBuilder; use near_store::{ShardTries, TrieUpdate}; use nearcore::config::GenesisExt; use node_runtime::config::RuntimeConfig; @@ -34,8 +34,10 @@ pub fn get_test_trie_viewer() -> (TrieViewer, TrieUpdate) { pub fn get_runtime_and_trie_from_genesis(genesis: &Genesis) -> (Runtime, ShardTries, StateRoot) { let shard_layout = &genesis.config.shard_layout; - let tries = - create_tries_complex_with_flat_storage(shard_layout.version(), shard_layout.num_shards()); + let tries = TestTriesBuilder::new() + .with_shard_layout(shard_layout.version(), shard_layout.num_shards()) + .with_flat_storage() + .build(); let runtime = Runtime::new(); let mut account_ids: HashSet = HashSet::new(); genesis.for_each_record(|record: &StateRecord| { diff --git a/integration-tests/src/tests/client/features.rs b/integration-tests/src/tests/client/features.rs index 758b2064a1c..81da1a41ec9 100644 --- a/integration-tests/src/tests/client/features.rs +++ b/integration-tests/src/tests/client/features.rs @@ -10,6 +10,7 @@ mod delegate_action; mod fix_contract_loading_cost; mod fix_storage_usage; mod flat_storage; +mod in_memory_tries; mod increase_deployment_cost; mod increase_storage_compute_cost; mod limit_contract_functions_number; diff --git a/integration-tests/src/tests/client/features/in_memory_tries.rs b/integration-tests/src/tests/client/features/in_memory_tries.rs new file mode 100644 index 00000000000..2a99db64f06 --- /dev/null +++ b/integration-tests/src/tests/client/features/in_memory_tries.rs @@ -0,0 +1,361 @@ +use std::collections::{HashMap, HashSet}; + +use near_chain::{ChainGenesis, Provenance}; +use near_chain_configs::{Genesis, GenesisConfig, GenesisRecords}; +use near_client::test_utils::TestEnv; +use near_client::ProcessTxResponse; +use near_o11y::testonly::init_test_logger; +use near_primitives::block::Tip; +use near_primitives::shard_layout::ShardLayout; +use near_primitives::state_record::StateRecord; +use near_primitives::test_utils::{create_test_signer, create_user_test_signer}; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{AccountInfo, EpochId}; +use near_primitives_core::account::{AccessKey, Account}; +use near_primitives_core::hash::CryptoHash; +use near_primitives_core::types::AccountId; +use near_primitives_core::version::PROTOCOL_VERSION; +use near_store::test_utils::create_test_store; +use near_store::trie::mem::metrics::MEM_TRIE_NUM_ROOTS; +use near_store::{ShardUId, TrieConfig}; +use nearcore::test_utils::TestEnvNightshadeSetupExt; +use rand::seq::IteratorRandom; +use rand::{thread_rng, Rng}; + +const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; + +#[test] +fn test_in_memory_trie_node_consistency() { + // Recommended to run with RUST_LOG=memtrie=debug,chunks=error,info + init_test_logger(); + let validator_stake = 1000000 * ONE_NEAR; + let initial_balance = 10000 * ONE_NEAR; + let accounts = + (0..100).map(|i| format!("account{}", i).parse().unwrap()).collect::>(); + let mut genesis_config = GenesisConfig { + // Use the latest protocol version. Otherwise, the version may be too + // old that e.g. blocks don't even store previous heights. + protocol_version: PROTOCOL_VERSION, + // Some arbitrary starting height. Doesn't matter. + genesis_height: 10000, + // We'll test with 4 shards. This can be any number, but we want to test + // the case when some shards are loaded into memory and others are not. + // We pick the boundaries so that each shard would get some transactions. + shard_layout: ShardLayout::v1( + vec!["account3", "account5", "account7"] + .into_iter() + .map(|a| a.parse().unwrap()) + .collect(), + None, + 1, + ), + // We're going to send NEAR between accounts and then assert at the end + // that these transactions have been processed correctly, so here we set + // the gas price to 0 so that we don't have to calculate gas cost. + min_gas_price: 0, + max_gas_price: 0, + // Set the block gas limit high enough so we don't have to worry about + // transactions being throttled. + gas_limit: 100000000000000, + // Set the validity period high enough so even if a transaction gets + // included a few blocks later it won't be rejected. + transaction_validity_period: 1000, + // Make two validators. In this test we don't care about validators but + // the TestEnv framework works best if all clients are validators. So + // since we are using two clients, make two validators. + validators: vec![ + AccountInfo { + account_id: accounts[0].clone(), + amount: validator_stake, + public_key: create_test_signer(accounts[0].as_str()).public_key(), + }, + AccountInfo { + account_id: accounts[1].clone(), + amount: validator_stake, + public_key: create_test_signer(accounts[1].as_str()).public_key(), + }, + ], + // We don't care about epoch transitions in this test, and epoch + // transitions means validator selection, which can kick out validators + // (due to our test purposefully skipping blocks to create forks), and + // that's annoying to deal with. So set this to a high value to stay + // within a single epoch. + epoch_length: 10000, + // The genesis requires this, so set it to something arbitrary. + protocol_treasury_account: accounts[2].clone(), + // Simply make all validators block producers. + num_block_producer_seats: 2, + // Make all validators produce chunks for all shards. + minimum_validators_per_shard: 2, + // Even though not used for the most recent protocol version, + // this must still have the same length as the number of shards, + // or else the genesis fails validation. + num_block_producer_seats_per_shard: vec![2, 2, 2, 2], + ..Default::default() + }; + + // We'll now create the initial records. We'll set up 100 accounts, each + // with some initial balance. We'll add an access key to each account so + // we can send transactions from them. + let mut records = Vec::new(); + for (i, account) in accounts.iter().enumerate() { + // The staked amount must be consistent with validators from genesis. + let staked = if i < 2 { validator_stake } else { 0 }; + records.push(StateRecord::Account { + account_id: account.clone(), + account: Account::new(initial_balance, staked, CryptoHash::default(), 0), + }); + records.push(StateRecord::AccessKey { + account_id: account.clone(), + public_key: create_user_test_signer(&account).public_key, + access_key: AccessKey::full_access(), + }); + // The total supply must be correct to pass validation. + genesis_config.total_supply += initial_balance + staked; + } + let genesis = Genesis::new(genesis_config, GenesisRecords(records)).unwrap(); + let chain_genesis = ChainGenesis::new(&genesis); + + // Create two stores, one for each node. We'll be reusing the stores later + // to emulate node restarts. + let stores = vec![create_test_store(), create_test_store()]; + let mut env = TestEnv::builder(chain_genesis.clone()) + .clients(vec!["account0".parse().unwrap(), "account1".parse().unwrap()]) + .stores(stores.clone()) + .real_epoch_managers(&genesis.config) + .track_all_shards() + .nightshade_runtimes_with_trie_config( + &genesis, + vec![ + TrieConfig::default(), // client 0 does not load in-memory tries + TrieConfig { + // client 1 loads two of four shards into in-memory tries + load_mem_tries_for_shards: vec![ + ShardUId { version: 1, shard_id: 0 }, + ShardUId { version: 1, shard_id: 2 }, + ], + max_mem_tries_size_per_shard: 100 * 1024 * 1024, + ..Default::default() + }, + ], + ) + .build(); + + // Sanity check that we should have two block producers. + assert_eq!( + env.clients[0] + .epoch_manager + .get_epoch_block_producers_ordered( + &EpochId::default(), + &env.clients[0].chain.head().unwrap().last_block_hash + ) + .unwrap() + .len(), + 2 + ); + + // First, start up the nodes from genesis. This ensures that in-memory + // tries works correctly when starting up an empty node for the first time. + let mut nonces = + accounts.iter().map(|account| (account.clone(), 0)).collect::>(); + let mut balances = accounts + .iter() + .map(|account| (account.clone(), initial_balance)) + .collect::>(); + + run_chain_for_some_blocks_while_sending_money_around(&mut env, &mut nonces, &mut balances, 100); + // Sanity check that in-memory tries are loaded, and garbage collected properly. + // We should have 1 unique root because the last few blocks should all have the + // same state roots. + assert_eq!(MEM_TRIE_NUM_ROOTS.get_metric_with_label_values(&["s0.v1"]).unwrap().get(), 1); + assert_eq!(MEM_TRIE_NUM_ROOTS.get_metric_with_label_values(&["s2.v1"]).unwrap().get(), 1); + + // Restart nodes, and change some configs. + drop(env); + let mut env = TestEnv::builder(chain_genesis.clone()) + .clients(vec!["account0".parse().unwrap(), "account1".parse().unwrap()]) + .stores(stores.clone()) + .real_epoch_managers(&genesis.config) + .track_all_shards() + .nightshade_runtimes_with_trie_config( + &genesis, + vec![ + TrieConfig::default(), + TrieConfig { + load_mem_tries_for_shards: vec![ + ShardUId { version: 1, shard_id: 0 }, + ShardUId { version: 1, shard_id: 1 }, // shard 2 changed to shard 1. + ], + max_mem_tries_size_per_shard: 100 * 1024 * 1024, + ..Default::default() + }, + ], + ) + .build(); + run_chain_for_some_blocks_while_sending_money_around(&mut env, &mut nonces, &mut balances, 100); + assert_eq!(MEM_TRIE_NUM_ROOTS.get_metric_with_label_values(&["s0.v1"]).unwrap().get(), 1); + assert_eq!(MEM_TRIE_NUM_ROOTS.get_metric_with_label_values(&["s1.v1"]).unwrap().get(), 1); + + // Restart again, but this time flip the nodes. + drop(env); + let mut env = TestEnv::builder(chain_genesis) + .clients(vec!["account0".parse().unwrap(), "account1".parse().unwrap()]) + .stores(stores) + .real_epoch_managers(&genesis.config) + .track_all_shards() + .nightshade_runtimes_with_trie_config( + &genesis, + vec![ + // client 0 now loads in-memory tries + TrieConfig { + load_mem_tries_for_shards: vec![ + ShardUId { version: 1, shard_id: 1 }, + ShardUId { version: 1, shard_id: 3 }, + ], + max_mem_tries_size_per_shard: 100 * 1024 * 1024, + ..Default::default() + }, + // client 1 no longer loads in-memory tries + TrieConfig::default(), + ], + ) + .build(); + run_chain_for_some_blocks_while_sending_money_around(&mut env, &mut nonces, &mut balances, 100); + assert_eq!(MEM_TRIE_NUM_ROOTS.get_metric_with_label_values(&["s0.v1"]).unwrap().get(), 1); + assert_eq!(MEM_TRIE_NUM_ROOTS.get_metric_with_label_values(&["s3.v1"]).unwrap().get(), 1); +} + +// Returns the block producer for the height of head + height_offset. +fn get_block_producer(env: &TestEnv, head: &Tip, height_offset: u64) -> AccountId { + let client = &env.clients[0]; + let epoch_manager = &client.epoch_manager; + let parent_hash = &head.last_block_hash; + let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash).unwrap(); + let height = head.height + height_offset; + let block_producer = epoch_manager.get_block_producer(&epoch_id, height).unwrap(); + block_producer +} + +/// Runs the chain for some number of blocks, sending money around randomly between +/// the test accounts, updating the corresponding nonces and balances. At the end, +/// check that the balances are correct, i.e. the transactions have been executed +/// correctly. If this runs successfully, it would also mean that the two nodes +/// being tested are consistent with each other. If, for example, there is a state +/// root mismatch issue, the two nodes would not be able to apply each others' +/// blocks because the block hashes would be different. +fn run_chain_for_some_blocks_while_sending_money_around( + env: &mut TestEnv, + nonces: &mut HashMap, + balances: &mut HashMap, + num_rounds: usize, +) { + // Run the chain for some extra blocks, to ensure that all transactions are + // included in the chain and are executed completely. + for round in 0..(num_rounds + 10) { + let heads = env + .clients + .iter() + .map(|client| client.chain.head().unwrap().last_block_hash) + .collect::>(); + assert_eq!(heads.len(), 1, "All clients should have the same head"); + let tip = env.clients[0].chain.head().unwrap(); + + if round < num_rounds { + // Make 50 random transactions that send money between random accounts. + for _ in 0..50 { + let sender = nonces.keys().choose(&mut thread_rng()).unwrap().clone(); + let receiver = nonces.keys().choose(&mut thread_rng()).unwrap().clone(); + let nonce = nonces.get_mut(&sender).unwrap(); + *nonce += 1; + + let txn = SignedTransaction::send_money( + *nonce, + sender.clone(), + receiver.clone(), + &create_user_test_signer(&sender), + ONE_NEAR, + tip.last_block_hash, + ); + match env.clients[0].process_tx(txn, false, false) { + ProcessTxResponse::NoResponse => panic!("No response"), + ProcessTxResponse::InvalidTx(err) => panic!("Invalid tx: {}", err), + _ => {} + } + *balances.get_mut(&sender).unwrap() -= ONE_NEAR; + *balances.get_mut(&receiver).unwrap() += ONE_NEAR; + } + } + + let cur_block_producer = get_block_producer(&env, &tip, 1); + let next_block_producer = get_block_producer(&env, &tip, 2); + println!("Producing block at height {} by {}", tip.height + 1, cur_block_producer); + let block = env.client(&cur_block_producer).produce_block(tip.height + 1).unwrap().unwrap(); + + // Let's produce some skip blocks too so that we test that in-memory tries are able to + // deal with forks. + // At the end, finish with a bunch of non-skip blocks so that we can test that in-memory + // trie garbage collection works properly (final block is N - 2 so we should keep no more + // than 3 roots). + let mut skip_block = None; + if cur_block_producer != next_block_producer + && round < num_rounds + && thread_rng().gen_bool(0.5) + { + println!( + "Producing skip block at height {} by {}", + tip.height + 2, + next_block_producer + ); + // Produce some skip blocks too so that we test that in-memory tries are able to deal + // with forks. + skip_block = Some( + env.client(&next_block_producer).produce_block(tip.height + 2).unwrap().unwrap(), + ); + } + + // Apply height + 1 block. + for i in 0..env.clients.len() { + println!( + " Applying block at height {} at {}", + block.header().height(), + env.get_client_id(i) + ); + let blocks_processed = + env.clients[i].process_block_test(block.clone().into(), Provenance::NONE).unwrap(); + assert_eq!(blocks_processed, vec![*block.hash()]); + } + // Apply skip block if one was produced. + if let Some(skip_block) = skip_block { + for i in 0..env.clients.len() { + println!( + " Applying skip block at height {} at {}", + skip_block.header().height(), + env.get_client_id(i) + ); + let blocks_processed = env.clients[i] + .process_block_test(skip_block.clone().into(), Provenance::NONE) + .unwrap(); + assert_eq!(blocks_processed, vec![*skip_block.hash()]); + } + } + + // Send partial encoded chunks around so that the newly produced chunks + // can be included and processed in the next block. Having to do this + // sucks, because this test has nothing to do with partial encoded + // chunks, but it is the unfortunate reality when using TestEnv with + // multiple nodes. + env.process_partial_encoded_chunks(); + for j in 0..env.clients.len() { + env.process_shards_manager_responses_and_finish_processing_blocks(j); + } + } + + for (account, balance) in balances { + assert_eq!( + env.query_balance(account.clone()), + *balance, + "Balance mismatch for {}", + account, + ); + } +} diff --git a/integration-tests/src/tests/client/state_snapshot.rs b/integration-tests/src/tests/client/state_snapshot.rs index 398ead29b78..7ee25e1b1ac 100644 --- a/integration-tests/src/tests/client/state_snapshot.rs +++ b/integration-tests/src/tests/client/state_snapshot.rs @@ -45,6 +45,9 @@ impl StateSnaptshotTestEnv { enable_receipt_prefetching: false, sweat_prefetch_receivers: Vec::new(), sweat_prefetch_senders: Vec::new(), + load_mem_tries_for_shards: Vec::new(), + load_mem_tries_for_all_shards: false, + max_mem_tries_size_per_shard: 0, }; let flat_storage_manager = FlatStorageManager::new(store.clone()); let shard_uids = [ShardUId::single_shard()]; diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 4b8aac8c356..b06b7ad57d1 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -192,6 +192,33 @@ impl NightshadeRuntime { ) } + pub fn test_with_trie_config( + home_dir: &Path, + store: Store, + genesis_config: &GenesisConfig, + epoch_manager: Arc, + trie_config: TrieConfig, + state_snapshot_type: StateSnapshotType, + ) -> Arc { + Self::new( + store, + genesis_config, + epoch_manager, + None, + None, + None, + DEFAULT_GC_NUM_EPOCHS_TO_KEEP, + trie_config, + StateSnapshotConfig { + state_snapshot_type, + home_dir: home_dir.to_path_buf(), + hot_store_path: PathBuf::from("data"), + state_snapshot_subdir: PathBuf::from("state_snapshot"), + compaction_enabled: false, + }, + ) + } + pub fn test( home_dir: &Path, store: Store, @@ -432,6 +459,7 @@ impl NightshadeRuntime { apply_result.trie_changes, apply_result.state_changes, *block_hash, + apply_state.block_height, ), new_root: apply_result.state_root, outcomes: apply_result.outcomes, @@ -1054,6 +1082,7 @@ impl RuntimeAdapter for NightshadeRuntime { fn apply_update_to_split_states( &self, block_hash: &CryptoHash, + block_height: BlockHeight, state_roots: HashMap, next_epoch_shard_layout: &ShardLayout, state_changes_for_split_states: StateChangesForSplitStates, @@ -1081,6 +1110,7 @@ impl RuntimeAdapter for NightshadeRuntime { trie_changes, state_changes, *block_hash, + block_height, ); applied_split_state_results.push(ApplySplitStateResult { shard_uid, @@ -1192,6 +1222,10 @@ impl RuntimeAdapter for NightshadeRuntime { let epoch_manager = self.epoch_manager.read(); Ok(epoch_manager.will_shard_layout_change(parent_hash)?) } + + fn load_mem_tries_on_startup(&self, shard_uids: &[ShardUId]) -> Result<(), StorageError> { + self.tries.load_mem_tries_for_enabled_shards(shard_uids) + } } impl node_runtime::adapter::ViewRuntimeAdapter for NightshadeRuntime { diff --git a/nearcore/src/test_utils.rs b/nearcore/src/test_utils.rs index 6850f0cc36c..ec748389b89 100644 --- a/nearcore/src/test_utils.rs +++ b/nearcore/src/test_utils.rs @@ -4,7 +4,7 @@ use near_client::test_utils::TestEnvBuilder; use near_epoch_manager::EpochManagerHandle; use near_primitives::runtime::config_store::RuntimeConfigStore; use near_store::genesis::initialize_genesis_state; -use near_store::Store; +use near_store::{Store, TrieConfig}; use std::path::PathBuf; use std::sync::Arc; @@ -17,6 +17,11 @@ pub trait TestEnvNightshadeSetupExt { genesis: &Genesis, runtime_configs: Vec, ) -> Self; + fn nightshade_runtimes_with_trie_config( + self, + genesis: &Genesis, + trie_configs: Vec, + ) -> Self; } impl TestEnvNightshadeSetupExt for TestEnvBuilder { @@ -34,7 +39,8 @@ impl TestEnvNightshadeSetupExt for TestEnvBuilder { let nightshade_runtime_creator = |home_dir: PathBuf, store: Store, epoch_manager: Arc, - runtime_config: RuntimeConfigStore| + runtime_config: RuntimeConfigStore, + _| -> Arc { // TODO: It's not ideal to initialize genesis state with the nightshade runtime here for tests // Tests that don't use nightshade runtime have genesis initialized in kv_runtime. @@ -50,6 +56,45 @@ impl TestEnvNightshadeSetupExt for TestEnvBuilder { state_snapshot_type.clone(), ) }; - self.internal_initialize_nightshade_runtimes(runtime_configs, nightshade_runtime_creator) + let dummy_trie_configs = vec![TrieConfig::default(); self.num_clients()]; + self.internal_initialize_nightshade_runtimes( + runtime_configs, + dummy_trie_configs, + nightshade_runtime_creator, + ) + } + + fn nightshade_runtimes_with_trie_config( + self, + genesis: &Genesis, + trie_configs: Vec, + ) -> Self { + let state_snapshot_type = self.state_snapshot_type(); + let nightshade_runtime_creator = |home_dir: PathBuf, + store: Store, + epoch_manager: Arc, + _, + trie_config: TrieConfig| + -> Arc { + // TODO: It's not ideal to initialize genesis state with the nightshade runtime here for tests + // Tests that don't use nightshade runtime have genesis initialized in kv_runtime. + // We should instead try to do this while configuring store. + let home_dir = home_dir.as_path(); + initialize_genesis_state(store.clone(), genesis, Some(home_dir)); + NightshadeRuntime::test_with_trie_config( + home_dir, + store, + &genesis.config, + epoch_manager, + trie_config, + state_snapshot_type.clone(), + ) + }; + let dummy_runtime_configs = vec![RuntimeConfigStore::test(); self.num_clients()]; + self.internal_initialize_nightshade_runtimes( + dummy_runtime_configs, + trie_configs, + nightshade_runtime_creator, + ) } } diff --git a/tools/state-viewer/src/contract_accounts.rs b/tools/state-viewer/src/contract_accounts.rs index 95bf5d9c8c8..d13e92472b9 100644 --- a/tools/state-viewer/src/contract_accounts.rs +++ b/tools/state-viewer/src/contract_accounts.rs @@ -493,8 +493,9 @@ mod tests { use near_primitives::types::AccountId; use near_store::test_utils::{ create_test_store, test_populate_store, test_populate_store_rc, test_populate_trie, + TestTriesBuilder, }; - use near_store::{DBCol, ShardTries, ShardUId, Store, Trie}; + use near_store::{DBCol, ShardUId, Store, Trie}; use std::fmt::Write; #[test] @@ -635,7 +636,7 @@ mod tests { let store = create_test_store(); test_populate_store(&store, store_data); test_populate_store_rc(&store, store_data_rc); - let tries = ShardTries::test_shard_version(store.clone(), 0, 1); + let tries = TestTriesBuilder::new().with_store(store.clone()).build(); let root = test_populate_trie(&tries, &Trie::EMPTY_ROOT, ShardUId::single_shard(), trie_data); let trie = tries.get_trie_for_shard(ShardUId::single_shard(), root); diff --git a/tools/state-viewer/src/state_changes.rs b/tools/state-viewer/src/state_changes.rs index 9dee25a6c7a..4ec39df8877 100644 --- a/tools/state-viewer/src/state_changes.rs +++ b/tools/state-viewer/src/state_changes.rs @@ -189,6 +189,7 @@ fn apply_state_changes( trie_update, state_changes, *block_hash, + block_height, ); let mut store_update = chain_store.store_update(); store_update.save_trie_changes(wrapped_trie_changes);