diff --git a/Cargo.lock b/Cargo.lock
index 7aa60f9e753..1f0f8c88b25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3464,6 +3464,7 @@ name = "near-chain-configs"
 version = "0.0.0"
 dependencies = [
  "anyhow",
+ "bytesize",
  "chrono",
  "derive_more",
  "near-config-utils",
diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 09939bc1889..dc70566e8be 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -26,6 +26,7 @@ use chrono::Duration;
 use crossbeam_channel::{unbounded, Receiver, Sender};
 use itertools::Itertools;
 use lru::LruCache;
+use near_chain_configs::StateSplitConfig;
 use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
 use near_epoch_manager::shard_tracker::ShardTracker;
 use near_epoch_manager::types::BlockHeaderInfo;
@@ -477,6 +478,8 @@ pub struct Chain {

     /// Lets trigger new state snapshots.
     state_snapshot_helper: Option<StateSnapshotHelper>,
+
+    pub(crate) state_split_config: near_chain_configs::StateSplitConfig,
 }

 /// Lets trigger new state snapshots.
@@ -579,6 +582,7 @@ impl Chain {
             pending_state_patch: Default::default(),
             requested_state_parts: StateRequestTracker::new(),
             state_snapshot_helper: None,
+            state_split_config: StateSplitConfig::default(),
         })
     }

@@ -749,6 +753,7 @@ impl Chain {
                     .state_snapshot_every_n_blocks
                     .map(|n| (0, n)),
             }),
+            state_split_config: chain_config.state_split_config,
         })
     }

diff --git a/chain/chain/src/resharding.rs b/chain/chain/src/resharding.rs
index f07b66df4a8..27a4476dc65 100644
--- a/chain/chain/src/resharding.rs
+++ b/chain/chain/src/resharding.rs
@@ -4,10 +4,13 @@
 /// by the client_actor while the heavy resharding build_state_for_split_shards is done by SyncJobsActor
 /// so as to not affect client.
 use crate::metrics::{
-    ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
+    ReshardingStatus, RESHARDING_BATCH_APPLY_TIME, RESHARDING_BATCH_COMMIT_TIME,
+    RESHARDING_BATCH_COUNT, RESHARDING_BATCH_PREPARE_TIME, RESHARDING_BATCH_SIZE,
+    RESHARDING_STATUS,
 };
 use crate::Chain;
 use itertools::Itertools;
+use near_chain_configs::StateSplitConfig;
 use near_chain_primitives::error::Error;
 use near_primitives::errors::StorageError::StorageInconsistentState;
 use near_primitives::hash::CryptoHash;
@@ -27,8 +30,6 @@ use std::sync::Arc;
 use std::time::Duration;
 use tracing::debug;

-// This is the approx batch size of the trie key, value pair entries that are written to the child shard trie.
-const RESHARDING_BATCH_MEMORY_LIMIT: bytesize::ByteSize = bytesize::ByteSize(300 * bytesize::MIB);
 const MAX_RESHARDING_POLL_TIME: Duration = Duration::from_secs(5 * 60 * 60); // 5 hrs

 /// StateSplitRequest has all the information needed to start a resharding job. This message is sent
@@ -51,6 +52,8 @@ pub struct StateSplitRequest {
     pub next_epoch_shard_layout: ShardLayout,
     // Time we've spent polling for the state snapshot to be ready. We autofail after a certain time.
     pub curr_poll_time: Duration,
+    // Configuration for resharding. Can be used to throttle resharding if needed.
+    pub config: StateSplitConfig,
 }

 // Skip `runtime_adapter`, because it's a complex object that has complex logic
@@ -108,8 +111,9 @@ struct TrieUpdateBatch {
 }

 // Function to return batches of trie key, value pairs from flat storage iter. We return None at the end of iter.
-// The batch size is roughly RESHARDING_BATCH_MEMORY_LIMIT (300 MB)
+// The batch size is roughly `config.batch_size`.
 fn get_trie_update_batch(
+    config: &StateSplitConfig,
     iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
 ) -> Option<TrieUpdateBatch> {
     let mut size: u64 = 0;
@@ -117,7 +121,7 @@ fn get_trie_update_batch(
     while let Some((key, value)) = iter.next() {
         size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
         entries.push((key, value));
-        if size > RESHARDING_BATCH_MEMORY_LIMIT.as_u64() {
+        if size > config.batch_size.as_u64() {
             break;
         }
     }
@@ -129,6 +133,7 @@
 }

 fn apply_delayed_receipts<'a>(
+    config: &StateSplitConfig,
     tries: &ShardTries,
     orig_shard_uid: ShardUId,
     orig_state_root: StateRoot,
@@ -140,7 +145,7 @@ fn apply_delayed_receipts<'a>(
     let mut start_index = None;
     let mut new_state_roots = state_roots;
     while let Some((next_index, receipts)) =
-        get_delayed_receipts(&orig_trie_update, start_index, RESHARDING_BATCH_MEMORY_LIMIT)?
+        get_delayed_receipts(&orig_trie_update, start_index, config.batch_size)?
     {
         let (store_update, updated_state_roots) = tries.apply_delayed_receipts_to_split_states(
             &new_state_roots,
@@ -203,6 +208,7 @@ impl Chain {
             state_root,
             next_epoch_shard_layout,
             curr_poll_time: Duration::ZERO,
+            config: self.state_split_config.clone(),
         });

         for shard_uid in child_shard_uids.unwrap_or_default() {
@@ -250,10 +256,11 @@ impl Chain {
             shard_uid,
             state_root,
             next_epoch_shard_layout,
+            config,
             ..
         } = state_split_request;

-        tracing::debug!(target: "resharding", ?shard_uid, "build_state_for_split_shards_impl starting");
+        tracing::debug!(target: "resharding", ?config, ?shard_uid, "build_state_for_split_shards_impl starting");

         let shard_id = shard_uid.shard_id();
         let new_shards = next_epoch_shard_layout
@@ -317,25 +324,49 @@ impl Chain {
         let checked_account_id_to_shard_uid =
             get_checked_account_id_to_shard_uid_fn(shard_uid, new_shards, next_epoch_shard_layout);

+        let shard_uid_string = shard_uid.to_string();
+        let metrics_labels = [shard_uid_string.as_str()];
+
         // Once we build the iterator, we break it into batches using the get_trie_update_batch function.
-        while let Some(batch) = get_trie_update_batch(&mut iter) {
+        loop {
+            // Prepare the batch.
+            let batch = {
+                let histogram = RESHARDING_BATCH_PREPARE_TIME.with_label_values(&metrics_labels);
+                let _timer = histogram.start_timer();
+                let Some(batch) = get_trie_update_batch(&config, &mut iter) else { break };
+                batch
+            };
+
+            // Apply the batch - add values to the split states.
             let TrieUpdateBatch { entries, size } = batch;
-            // TODO(#9435): This is highly inefficient as for each key in the batch, we are parsing the account_id
-            // A better way would be to use the boundary account to construct the from and to key range for flat storage iterator
-            let (store_update, new_state_roots) = tries.add_values_to_split_states(
-                &state_roots,
-                entries,
-                &checked_account_id_to_shard_uid,
-            )?;
-            state_roots = new_state_roots;
-            store_update.commit()?;
-            RESHARDING_BATCH_COUNT.with_label_values(&[shard_uid.to_string().as_str()]).inc();
-            RESHARDING_BATCH_SIZE
-                .with_label_values(&[shard_uid.to_string().as_str()])
-                .add(size as i64)
+            let store_update = {
+                let histogram = RESHARDING_BATCH_APPLY_TIME.with_label_values(&metrics_labels);
+                let _timer = histogram.start_timer();
+                // TODO(#9435): This is highly inefficient as for each key in the batch, we are parsing the account_id
+                // A better way would be to use the boundary account to construct the from and to key range for flat storage iterator
+                let (store_update, new_state_roots) = tries.add_values_to_split_states(
+                    &state_roots,
+                    entries,
+                    &checked_account_id_to_shard_uid,
+                )?;
+                state_roots = new_state_roots;
+                store_update
+            };
+
+            // Commit the store update.
+            {
+                let histogram = RESHARDING_BATCH_COMMIT_TIME.with_label_values(&metrics_labels);
+                let _timer = histogram.start_timer();
+                store_update.commit()?;
+            }
+
+            RESHARDING_BATCH_COUNT.with_label_values(&metrics_labels).inc();
+            RESHARDING_BATCH_SIZE.with_label_values(&metrics_labels).add(size as i64);
+            std::thread::sleep(config.batch_delay);
         }

         state_roots = apply_delayed_receipts(
+            &config,
             &tries,
             shard_uid,
             state_root,
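For readers skimming the new loop in `build_state_for_split_shards_impl`: the shape is prepare a size-bounded batch, apply it, commit it, then sleep. Below is a minimal, dependency-free sketch of that pattern. `ThrottleConfig`, `process_batches` and the `println!` are illustrative stand-ins for `StateSplitConfig`, the real loop and the `RESHARDING_BATCH_*` histograms; they are not part of the nearcore API.

```rust
use std::time::{Duration, Instant};

/// Illustrative stand-in for the two knobs in `StateSplitConfig`.
struct ThrottleConfig {
    batch_size_bytes: u64,
    batch_delay: Duration,
}

/// Drain `iter` in size-bounded batches, pausing between batches so the
/// background job leaves room for regular node operation.
fn process_batches(
    config: &ThrottleConfig,
    iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
    mut commit: impl FnMut(Vec<(Vec<u8>, Option<Vec<u8>>)>),
) {
    loop {
        // Prepare the batch: pull entries until the soft size limit is exceeded.
        let prepare_start = Instant::now();
        let mut size: u64 = 0;
        let mut entries = Vec::new();
        while let Some((key, value)) = iter.next() {
            size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
            entries.push((key, value));
            if size > config.batch_size_bytes {
                break;
            }
        }
        if entries.is_empty() {
            break;
        }
        let prepare_time = prepare_start.elapsed();

        // Apply and commit the batch. The real loop times these phases
        // separately and records them in Prometheus histograms.
        let commit_start = Instant::now();
        commit(entries);
        let commit_time = commit_start.elapsed();
        println!("batch of {size} bytes: prepare {prepare_time:?}, commit {commit_time:?}");

        // Throttle: pay the configured delay after every batch.
        std::thread::sleep(config.batch_delay);
    }
}

fn main() {
    let config = ThrottleConfig { batch_size_bytes: 4096, batch_delay: Duration::from_millis(10) };
    let mut iter = (0u32..1000).map(|i| (i.to_le_bytes().to_vec(), Some(vec![0u8; 64])));
    process_batches(&config, &mut iter, |entries| drop(entries));
}
```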
diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs
index baf31036f86..57396ef484e 100644
--- a/chain/chain/src/types.rs
+++ b/chain/chain/src/types.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
 use borsh::{BorshDeserialize, BorshSerialize};
 use chrono::DateTime;
 use chrono::Utc;
+use near_chain_configs::StateSplitConfig;
 use near_primitives::sandbox::state_patch::SandboxStatePatch;
 use near_store::flat::FlatStorageManager;
 use num_rational::Rational32;
@@ -211,6 +212,7 @@ pub struct ChainConfig {
     /// Currently used for flat storage background creation.
     pub background_migration_threads: usize,
     pub state_snapshot_every_n_blocks: Option<u64>,
+    pub state_split_config: StateSplitConfig,
 }

 impl ChainConfig {
@@ -219,6 +221,7 @@
             save_trie_changes: true,
             background_migration_threads: 1,
             state_snapshot_every_n_blocks: None,
+            state_split_config: StateSplitConfig::default(),
         }
     }
 }
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index c7b53657c63..b7c9ae28d20 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -225,6 +225,7 @@ impl Client {
             save_trie_changes: config.save_trie_changes,
             background_migration_threads: config.client_background_migration_threads,
             state_snapshot_every_n_blocks: config.state_snapshot_every_n_blocks,
+            state_split_config: config.state_split_config,
         };
         let chain = Chain::new(
             epoch_manager.clone(),
diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs
index feb5ea52a4a..4413cb9e1a2 100644
--- a/chain/client/src/test_utils/setup.rs
+++ b/chain/client/src/test_utils/setup.rs
@@ -20,7 +20,7 @@ use near_chain::state_snapshot_actor::MakeSnapshotCallback;
 use near_chain::test_utils::{KeyValueRuntime, MockEpochManager, ValidatorSchedule};
 use near_chain::types::{ChainConfig, RuntimeAdapter};
 use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
-use near_chain_configs::ClientConfig;
+use near_chain_configs::{ClientConfig, StateSplitConfig};
 use near_chunks::adapter::ShardsManagerRequestFromClient;
 use near_chunks::client::ShardsManagerResponse;
 use near_chunks::shards_manager_actor::start_shards_manager;
@@ -114,6 +114,7 @@ pub fn setup(
             save_trie_changes: true,
             background_migration_threads: 1,
             state_snapshot_every_n_blocks: None,
+            state_split_config: StateSplitConfig::default(),
         },
         None,
     )
@@ -236,6 +237,7 @@ pub fn setup_only_view(
             save_trie_changes: true,
             background_migration_threads: 1,
             state_snapshot_every_n_blocks: None,
+            state_split_config: StateSplitConfig::default(),
         },
         None,
     )
@@ -1004,6 +1006,7 @@ pub fn setup_synchronous_shards_manager(
             save_trie_changes: true,
             background_migration_threads: 1,
             state_snapshot_every_n_blocks: None,
+            state_split_config: StateSplitConfig::default(),
         },
         // irrelevant
         None,
     )
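The three test setups above construct `ChainConfig` literally, so an integration test that wants non-default throttling can do the same and change only the new field. A hypothetical helper (the aggressive values are purely for illustration):

```rust
use near_chain::types::ChainConfig;
use near_chain_configs::StateSplitConfig;
use std::time::Duration;

// Hypothetical test helper: same values as the test configs above, but with
// deliberately heavy throttling so resharding progress is slow and easy to observe.
fn throttled_chain_config() -> ChainConfig {
    ChainConfig {
        save_trie_changes: true,
        background_migration_threads: 1,
        state_snapshot_every_n_blocks: None,
        state_split_config: StateSplitConfig {
            batch_size: bytesize::ByteSize::kb(100),
            batch_delay: Duration::from_millis(500),
        },
    }
}
```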
diff --git a/core/chain-configs/Cargo.toml b/core/chain-configs/Cargo.toml
index 1c59b650c4a..6af0802a453 100644
--- a/core/chain-configs/Cargo.toml
+++ b/core/chain-configs/Cargo.toml
@@ -11,6 +11,7 @@ publish = true

 [dependencies]
 anyhow.workspace = true
+bytesize.workspace = true
 chrono.workspace = true
 derive_more.workspace = true
 num-rational.workspace = true
@@ -31,10 +32,6 @@ nightly_protocol = [
   "near-o11y/nightly_protocol",
   "near-primitives/nightly_protocol",
 ]
-nightly = [
-  "nightly_protocol",
-  "near-o11y/nightly",
-  "near-primitives/nightly",
-]
+nightly = ["nightly_protocol", "near-o11y/nightly", "near-primitives/nightly"]
 default = []
 metrics = ["near-o11y"]
diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs
index 552efc78133..0f08e242018 100644
--- a/core/chain-configs/src/client_config.rs
+++ b/core/chain-configs/src/client_config.rs
@@ -162,6 +162,24 @@ impl SyncConfig {
     }
 }

+#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)]
+pub struct StateSplitConfig {
+    /// The soft limit on the size of a single batch. The batch size can be
+    /// decreased if resharding is consuming too many resources and interfering
+    /// with regular node operation.
+    pub batch_size: bytesize::ByteSize,
+    /// The delay between writing batches to the db. The batch delay can be
+    /// increased if resharding is consuming too many resources and interfering
+    /// with regular node operation.
+    pub batch_delay: Duration,
+}
+
+impl Default for StateSplitConfig {
+    fn default() -> Self {
+        Self { batch_size: bytesize::ByteSize::mb(30), batch_delay: Duration::from_millis(100) }
+    }
+}
+
 /// ClientConfig where some fields can be updated at runtime.
 #[derive(Clone, serde::Serialize)]
 pub struct ClientConfig {
@@ -272,6 +290,8 @@ pub struct ClientConfig {
     pub transaction_pool_size_limit: Option<u64>,
     // Allows more detailed logging, for example a list of orphaned blocks.
     pub enable_multiline_logging: bool,
+    // Configuration for resharding.
+    pub state_split_config: StateSplitConfig,
 }

 impl ClientConfig {
@@ -347,6 +367,7 @@ impl ClientConfig {
             state_snapshot_every_n_blocks: None,
             transaction_pool_size_limit: None,
             enable_multiline_logging: false,
+            state_split_config: StateSplitConfig::default(),
         }
     }
 }
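The defaults above (30 MB batches, 100 ms delay) cap the child-shard write rate at roughly 300 MB/s, and the real rate is lower because the delay is paid on top of each batch's prepare, apply and commit time. If that still proves too aggressive on a given node, a more conservative setting trades resharding speed for headroom; a hypothetical example, not a recommendation from this change:

```rust
use near_chain_configs::StateSplitConfig;
use std::time::Duration;

// Halving the batch size and doubling the delay lowers the worst-case write
// rate from ~300 MB/s (30 MB per 100 ms) to ~75 MB/s (15 MB per 200 ms).
fn conservative_resharding_config() -> StateSplitConfig {
    StateSplitConfig {
        batch_size: bytesize::ByteSize::mb(15),
        batch_delay: Duration::from_millis(200),
    }
}
```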
diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs
index 27e7dcbaa79..e6ecef7ae44 100644
--- a/core/chain-configs/src/lib.rs
+++ b/core/chain-configs/src/lib.rs
@@ -7,7 +7,7 @@ mod updateable_config;

 pub use client_config::{
     ClientConfig, DumpConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig,
-    LogSummaryStyle, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
+    LogSummaryStyle, StateSplitConfig, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
     MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT,
 };
 pub use genesis_config::{
diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs
index a6bfd3ac874..9a2d8c32c2d 100644
--- a/nearcore/src/config.rs
+++ b/nearcore/src/config.rs
@@ -3,7 +3,7 @@ use crate::dyn_config::LOG_CONFIG_FILENAME;
 use anyhow::{anyhow, bail, Context};
 use near_chain_configs::{
     get_initial_supply, ClientConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode,
-    LogSummaryStyle, MutableConfigValue, StateSyncConfig,
+    LogSummaryStyle, MutableConfigValue, StateSplitConfig, StateSyncConfig,
 };
 use near_config_utils::{ValidationError, ValidationErrors};
 use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey, Signer};
@@ -346,6 +346,7 @@ pub struct Config {
     /// chunks and underutilizing the capacity of the network.
     #[serde(default = "default_transaction_pool_size_limit")]
     pub transaction_pool_size_limit: Option<u64>,
+    pub state_split_config: StateSplitConfig,
 }

 fn is_false(value: &bool) -> bool {
@@ -386,6 +387,7 @@ impl Default for Config {
             state_sync_enabled: None,
             transaction_pool_size_limit: default_transaction_pool_size_limit(),
             enable_multiline_logging: None,
+            state_split_config: StateSplitConfig::default(),
         }
     }
 }
@@ -683,6 +685,7 @@ impl NearConfig {
                 state_snapshot_every_n_blocks: None,
                 transaction_pool_size_limit: config.transaction_pool_size_limit,
                 enable_multiline_logging: config.enable_multiline_logging.unwrap_or(true),
+                state_split_config: config.state_split_config,
             },
             network_config: NetworkConfig::new(
                 config.network,
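One operational consequence of exposing `state_split_config` in the node config: the delay alone puts a floor on how long resharding takes, independent of disk speed, so the knobs should not be turned up blindly. A rough, illustrative estimate (not nearcore code):

```rust
use std::time::Duration;

/// Lower bound on resharding time coming purely from the throttle: one
/// `batch_delay` is paid per `batch_size` bytes of flat state, on top of the
/// actual read/apply/commit work.
fn min_throttle_time(state_bytes: u64, batch_bytes: u64, batch_delay: Duration) -> Duration {
    let batches = (state_bytes + batch_bytes - 1) / batch_bytes;
    batch_delay * batches as u32
}

fn main() {
    // With the defaults (30 MB batches, 100 ms delay), 100 GB of state spends
    // at least ~333 seconds just sleeping between batches.
    let floor = min_throttle_time(100_000_000_000, 30_000_000, Duration::from_millis(100));
    println!("{floor:?}");
}
```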
diff --git a/tools/speedy_sync/src/main.rs b/tools/speedy_sync/src/main.rs
index 29109ce4bb3..d3950af8f34 100644
--- a/tools/speedy_sync/src/main.rs
+++ b/tools/speedy_sync/src/main.rs
@@ -1,7 +1,7 @@
 use borsh::{BorshDeserialize, BorshSerialize};
 use near_chain::types::{ChainConfig, Tip};
 use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
-use near_chain_configs::GenesisValidationMode;
+use near_chain_configs::{GenesisValidationMode, StateSplitConfig};
 use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
 use near_epoch_manager::types::EpochInfoAggregator;
 use near_epoch_manager::EpochManager;
@@ -244,6 +244,7 @@ fn load_snapshot(load_cmd: LoadCmd) {
             save_trie_changes: config.client_config.save_trie_changes,
             background_migration_threads: 1,
             state_snapshot_every_n_blocks: None,
+            state_split_config: StateSplitConfig::default(),
         },
         None,
     )