Skip to content

Commit

Permalink
memtrie 4
Browse files Browse the repository at this point in the history
  • Loading branch information
robin-near committed Nov 2, 2023
1 parent ade198e commit cd49133
Show file tree
Hide file tree
Showing 35 changed files with 1,522 additions and 331 deletions.
31 changes: 31 additions & 0 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,16 @@ impl Chain {
};
store_update.commit()?;

// We must load in-memory tries here, and not inside runtime, because
// if we were initializing from genesis, the runtime would be
// initialized when no blocks or flat storage were initialized. We
// require flat storage in order to load in-memory tries.
// TODO(#9511): The calculation of shard UIDs is not precise in the case
// of resharding. We need to revisit this.
let tip = store.head()?;
let shard_uids = epoch_manager.get_shard_layout(&tip.epoch_id)?.get_shard_uids();
runtime_adapter.load_mem_tries_on_startup(&shard_uids)?;

info!(target: "chain", "Init: header head @ #{} {}; block head @ #{} {}",
header_head.height, header_head.last_block_hash,
block_head.height, block_head.last_block_hash);
Expand Down Expand Up @@ -2413,6 +2423,7 @@ impl Chain {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager();
flat_storage_manager.update_flat_storage_for_shard(shard_uid, &block)?;
self.garbage_collect_memtrie_roots(&block, shard_uid);
}
}

Expand Down Expand Up @@ -2459,6 +2470,17 @@ impl Chain {
Ok(AcceptedBlock { hash: *block.hash(), status: block_status, provenance })
}

fn garbage_collect_memtrie_roots(&self, block: &Block, shard_uid: ShardUId) {
let tries = self.runtime_adapter.get_tries();
let last_final_block = block.header().last_final_block();
if last_final_block != &CryptoHash::default() {
let header = self.store.get_block_header(last_final_block).unwrap();
if let Some(prev_height) = header.prev_height() {
tries.delete_memtrie_roots_up_to_height(shard_uid, prev_height);
}
}
}

/// Preprocess a block before applying chunks, verify that we have the necessary information
/// to process the block an the block is valid.
/// Note that this function does NOT introduce any changes to chain state.
Expand Down Expand Up @@ -3576,6 +3598,7 @@ impl Chain {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
let flat_storage_manager = self.runtime_adapter.get_flat_storage_manager();
flat_storage_manager.update_flat_storage_for_shard(shard_uid, &block)?;
self.garbage_collect_memtrie_roots(&block, shard_uid);
}
}

Expand Down Expand Up @@ -4098,6 +4121,7 @@ impl Chain {
)?;

let block_hash = *block.hash();
let block_height = block.header().height();
let challenges_result = block.header().challenges_result().clone();
let block_timestamp = block.header().raw_timestamp();
let next_gas_price = prev_block.header().next_gas_price();
Expand Down Expand Up @@ -4143,6 +4167,7 @@ impl Chain {
epoch_manager.as_ref(),
runtime.as_ref(),
&block_hash,
block_height,
&prev_block_hash,
&apply_result,
split_state_roots,
Expand Down Expand Up @@ -4179,6 +4204,7 @@ impl Chain {
let new_extra = self.get_chunk_extra(&prev_block_hash, &shard_uid)?;

let block_hash = *block.hash();
let block_height = block.header().height();
let challenges_result = block.header().challenges_result().clone();
let block_timestamp = block.header().raw_timestamp();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash)?;
Expand Down Expand Up @@ -4230,6 +4256,7 @@ impl Chain {
epoch_manager.as_ref(),
runtime.as_ref(),
&block_hash,
block_height,
&prev_block_hash,
&apply_result,
split_state_roots,
Expand Down Expand Up @@ -4263,6 +4290,7 @@ impl Chain {
let state_changes =
self.store().get_state_changes_for_split_states(block.hash(), shard_id)?;
let block_hash = *block.hash();
let block_height = block.header().height();
Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
let _span = tracing::debug_span!(
target: "chain",
Expand All @@ -4273,6 +4301,7 @@ impl Chain {
.entered();
let results = runtime.apply_update_to_split_states(
&block_hash,
block_height,
split_state_roots,
&next_epoch_shard_layout,
state_changes,
Expand Down Expand Up @@ -5166,6 +5195,7 @@ impl<'a> ChainUpdate<'a> {
epoch_manager: &dyn EpochManagerAdapter,
runtime_adapter: &dyn RuntimeAdapter,
block_hash: &CryptoHash,
block_height: BlockHeight,
prev_block_hash: &CryptoHash,
apply_result: &ApplyTransactionResult,
split_state_roots: Option<HashMap<ShardUId, StateRoot>>,
Expand All @@ -5182,6 +5212,7 @@ impl<'a> ChainUpdate<'a> {
if let Some(state_roots) = split_state_roots {
let split_state_results = runtime_adapter.apply_update_to_split_states(
block_hash,
block_height,
state_roots,
&next_epoch_shard_layout,
state_changes,
Expand Down
1 change: 1 addition & 0 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3140,6 +3140,7 @@ impl<'a> ChainStoreUpdate<'a> {
// from the store.
let mut deletions_store_update = self.store().store_update();
for mut wrapped_trie_changes in self.trie_changes.drain(..) {
wrapped_trie_changes.apply_mem_changes();
wrapped_trie_changes.insertions_into(&mut store_update);
wrapped_trie_changes.deletions_into(&mut deletions_store_update);
wrapped_trie_changes.state_changes_into(&mut store_update);
Expand Down
16 changes: 13 additions & 3 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use borsh::{BorshDeserialize, BorshSerialize};
use near_epoch_manager::types::BlockHeaderInfo;
use near_epoch_manager::{EpochManagerAdapter, RngSeed};
use near_primitives::state_part::PartId;
use near_store::test_utils::TestTriesBuilder;
use num_rational::Ratio;

use near_chain_configs::{ProtocolConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP};
Expand Down Expand Up @@ -41,7 +42,7 @@ use near_primitives::views::{
QueryRequest, QueryResponse, QueryResponseKind, ViewStateResult,
};
use near_store::{
set_genesis_hash, set_genesis_state_roots, DBCol, ShardTries, Store, StoreUpdate, Trie,
set_genesis_hash, set_genesis_state_roots, DBCol, ShardTries, StorageError,Store, StoreUpdate, Trie,
TrieChanges, WrappedTrieChanges,
};

Expand Down Expand Up @@ -334,7 +335,10 @@ impl KeyValueRuntime {
let num_shards = epoch_manager.num_shards(&EpochId::default()).unwrap();
let epoch_length =
epoch_manager.get_epoch_config(&EpochId::default()).unwrap().epoch_length;
let tries = ShardTries::test(store.clone(), num_shards);
let tries = TestTriesBuilder::new()
.with_store(store.clone())
.with_shard_layout(0, num_shards)
.build();
let mut initial_amounts = HashMap::new();
for (i, validator_stake) in epoch_manager
.validators_by_valset
Expand Down Expand Up @@ -1019,7 +1023,7 @@ impl RuntimeAdapter for KeyValueRuntime {
&self,
shard_id: ShardId,
storage_config: RuntimeStorageConfig,
_height: BlockHeight,
height: BlockHeight,
_block_timestamp: u64,
_prev_block_hash: &CryptoHash,
block_hash: &CryptoHash,
Expand Down Expand Up @@ -1172,6 +1176,7 @@ impl RuntimeAdapter for KeyValueRuntime {
TrieChanges::empty(state_root),
Default::default(),
*block_hash,
height,
),
new_root: state_root,
outcomes: tx_results,
Expand Down Expand Up @@ -1363,10 +1368,15 @@ impl RuntimeAdapter for KeyValueRuntime {
fn apply_update_to_split_states(
&self,
_block_hash: &CryptoHash,
_block_height: BlockHeight,
_state_roots: HashMap<ShardUId, StateRoot>,
_next_shard_layout: &ShardLayout,
_state_changes: StateChangesForSplitStates,
) -> Result<Vec<ApplySplitStateResult>, Error> {
Ok(vec![])
}

fn load_mem_tries_on_startup(&self, _shard_uids: &[ShardUId]) -> Result<(), StorageError> {
Ok(())
}
}
1 change: 1 addition & 0 deletions chain/chain/src/tests/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ fn do_fork(
trie_changes,
Default::default(),
*block.hash(),
block.header().height(),
);
store_update.save_trie_changes(wrapped_trie_changes);

Expand Down
4 changes: 4 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use chrono::Utc;
use near_chain_configs::StateSplitConfig;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_store::flat::FlatStorageManager;
use near_store::StorageError;
use num_rational::Rational32;

use near_chain_configs::{Genesis, ProtocolConfig};
Expand Down Expand Up @@ -392,6 +393,7 @@ pub trait RuntimeAdapter: Send + Sync {
fn apply_update_to_split_states(
&self,
block_hash: &CryptoHash,
block_height: BlockHeight,
state_roots: HashMap<ShardUId, StateRoot>,
next_shard_layout: &ShardLayout,
state_changes: StateChangesForSplitStates,
Expand Down Expand Up @@ -426,6 +428,8 @@ pub trait RuntimeAdapter: Send + Sync {
) -> bool;

fn get_protocol_config(&self, epoch_id: &EpochId) -> Result<ProtocolConfig, Error>;

fn load_mem_tries_on_startup(&self, shard_uids: &[ShardUId]) -> Result<(), StorageError>;
}

/// The last known / checked height and time when we have processed it.
Expand Down
29 changes: 18 additions & 11 deletions chain/client/src/test_utils/test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use near_primitives::epoch_manager::RngSeed;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::runtime::config::RuntimeConfig;
use near_primitives::shard_layout::ShardUId;
use near_primitives::sharding::PartialEncodedChunk;
use near_primitives::test_utils::create_test_signer;
use near_primitives::transaction::{Action, FunctionCallAction, SignedTransaction};
Expand Down Expand Up @@ -315,13 +314,17 @@ impl TestEnv {
}

pub fn query_account(&mut self, account_id: AccountId) -> AccountView {
let head = self.clients[0].chain.head().unwrap();
let last_block = self.clients[0].chain.get_block(&head.last_block_hash).unwrap();
let last_chunk_header = &last_block.chunks()[0];
let response = self.clients[0]
let client = &self.clients[0];
let head = client.chain.head().unwrap();
let last_block = client.chain.get_block(&head.last_block_hash).unwrap();
let shard_id =
client.epoch_manager.account_id_to_shard_id(&account_id, &head.epoch_id).unwrap();
let shard_uid = client.epoch_manager.shard_id_to_uid(shard_id, &head.epoch_id).unwrap();
let last_chunk_header = &last_block.chunks()[shard_id as usize];
let response = client
.runtime_adapter
.query(
ShardUId::single_shard(),
shard_uid,
&last_chunk_header.prev_state_root(),
last_block.header().height(),
last_block.header().raw_timestamp(),
Expand All @@ -338,13 +341,17 @@ impl TestEnv {
}

pub fn query_state(&mut self, account_id: AccountId) -> Vec<StateItem> {
let head = self.clients[0].chain.head().unwrap();
let last_block = self.clients[0].chain.get_block(&head.last_block_hash).unwrap();
let last_chunk_header = &last_block.chunks()[0];
let response = self.clients[0]
let client = &self.clients[0];
let head = client.chain.head().unwrap();
let last_block = client.chain.get_block(&head.last_block_hash).unwrap();
let shard_id =
client.epoch_manager.account_id_to_shard_id(&account_id, &head.epoch_id).unwrap();
let shard_uid = client.epoch_manager.shard_id_to_uid(shard_id, &head.epoch_id).unwrap();
let last_chunk_header = &last_block.chunks()[shard_id as usize];
let response = client
.runtime_adapter
.query(
ShardUId::single_shard(),
shard_uid,
&last_chunk_header.prev_state_root(),
last_block.header().height(),
last_block.header().raw_timestamp(),
Expand Down
9 changes: 6 additions & 3 deletions chain/client/src/test_utils/test_env_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use near_network::test_utils::MockPeerManagerAdapter;
use near_primitives::epoch_manager::{AllEpochConfigTestOverrides, RngSeed};
use near_primitives::types::{AccountId, NumShards};
use near_store::test_utils::create_test_store;
use near_store::{NodeStorage, ShardUId, Store, StoreConfig};
use near_store::{NodeStorage, ShardUId, Store, StoreConfig, TrieConfig};

use super::setup::{setup_client_with_runtime, setup_synchronous_shards_manager};
use super::test_env::TestEnv;
Expand Down Expand Up @@ -299,11 +299,13 @@ impl TestEnvBuilder {
pub fn internal_initialize_nightshade_runtimes(
self,
runtime_configs: Vec<RuntimeConfigStore>,
trie_configs: Vec<TrieConfig>,
nightshade_runtime_creator: impl Fn(
PathBuf,
Store,
Arc<EpochManagerHandle>,
RuntimeConfigStore,
TrieConfig,
) -> Arc<dyn RuntimeAdapter>,
) -> Self {
let builder = self.ensure_home_dirs().ensure_epoch_managers().ensure_stores();
Expand All @@ -312,15 +314,16 @@ impl TestEnvBuilder {
builder.stores.clone().unwrap(),
builder.epoch_managers.clone().unwrap(),
runtime_configs,
trie_configs,
))
.map(|(home_dir, store, epoch_manager, runtime_config)| {
.map(|(home_dir, store, epoch_manager, runtime_config, trie_config)| {
let epoch_manager = match epoch_manager {
EpochManagerKind::Mock(_) => {
panic!("NightshadeRuntime can only be instantiated with EpochManagerHandle")
}
EpochManagerKind::Handle(handle) => handle,
};
nightshade_runtime_creator(home_dir, store, epoch_manager, runtime_config)
nightshade_runtime_creator(home_dir, store, epoch_manager, runtime_config, trie_config)
})
.collect();
builder.runtimes(runtimes)
Expand Down
2 changes: 2 additions & 0 deletions core/primitives/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ pub enum StorageError {
/// We guarantee that such block cannot become final, thus block processing
/// must resume normally.
FlatStorageBlockNotSupported(String),
/// In-memory trie could not be loaded for some reason.
MemTrieLoadingError(String),
}

impl std::fmt::Display for StorageError {
Expand Down
15 changes: 15 additions & 0 deletions core/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ pub struct StoreConfig {
/// This config option is temporary and will be removed once flat storage is implemented.
pub sweat_prefetch_senders: Vec<String>,

/// List of shard UIDs for which we should load the tries in memory. If resharding happens,
/// any descendant shard of these shards are loaded into memory.
pub load_mem_tries_for_shards: Vec<String>,
/// If true, load mem tries for all shards.
pub load_mem_tries_for_all_shards: bool,
/// Maximum size, in number of bytes, of a single shard in memory.
/// This amount is reserved upfront with mmap. If the machine does not have
/// that much RAM, enable memory overcommit. The actual memory usage is only
/// the real size of the loaded tries.
pub max_mem_tries_size_per_shard: usize,

/// Path where to create RocksDB checkpoints during database migrations or
/// `false` to disable that feature.
///
Expand Down Expand Up @@ -262,6 +273,10 @@ impl Default for StoreConfig {
"sweat_the_oracle.testnet".to_owned(),
],

load_mem_tries_for_shards: vec!["s3.v1".to_owned()],
load_mem_tries_for_all_shards: false,
max_mem_tries_size_per_shard: 16 * 1024 * 1024 * 1024,

migration_snapshot: Default::default(),

// We checked that this number of threads doesn't impact
Expand Down
2 changes: 2 additions & 0 deletions core/store/src/flat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ mod manager;
mod metrics;
mod storage;
pub mod store_helper;
#[cfg(test)]
pub mod test_utils;
mod types;

pub use chunk_view::FlatStorageChunkView;
Expand Down
Loading

0 comments on commit cd49133

Please sign in to comment.