Skip to content

Commit

Permalink
feat: resharding config allowing throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
wacban committed Oct 24, 2023
1 parent dbd2eef commit 7c03720
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use chrono::Duration;
use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
use near_chain_configs::StateSplitConfig;
use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::types::BlockHeaderInfo;
Expand Down Expand Up @@ -477,6 +478,8 @@ pub struct Chain {

/// Lets trigger new state snapshots.
state_snapshot_helper: Option<StateSnapshotHelper>,

pub(crate) state_split_config: near_chain_configs::StateSplitConfig,
}

/// Lets trigger new state snapshots.
Expand Down Expand Up @@ -579,6 +582,7 @@ impl Chain {
pending_state_patch: Default::default(),
requested_state_parts: StateRequestTracker::new(),
state_snapshot_helper: None,
state_split_config: StateSplitConfig::default(),
})
}

Expand Down Expand Up @@ -749,6 +753,7 @@ impl Chain {
.state_snapshot_every_n_blocks
.map(|n| (0, n)),
}),
state_split_config: chain_config.state_split_config,
})
}

Expand Down
73 changes: 52 additions & 21 deletions chain/chain/src/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
/// by the client_actor while the heavy resharding build_state_for_split_shards is done by SyncJobsActor
/// so as to not affect client.
use crate::metrics::{
ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
ReshardingStatus, RESHARDING_BATCH_APPLY_TIME, RESHARDING_BATCH_COMMIT_TIME,
RESHARDING_BATCH_COUNT, RESHARDING_BATCH_PREPARE_TIME, RESHARDING_BATCH_SIZE,
RESHARDING_STATUS,
};
use crate::Chain;
use itertools::Itertools;
use near_chain_configs::StateSplitConfig;
use near_chain_primitives::error::Error;
use near_primitives::errors::StorageError::StorageInconsistentState;
use near_primitives::hash::CryptoHash;
Expand All @@ -27,8 +30,6 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::debug;

// This is the approx batch size of the trie key, value pair entries that are written to the child shard trie.
const RESHARDING_BATCH_MEMORY_LIMIT: bytesize::ByteSize = bytesize::ByteSize(300 * bytesize::MIB);
const MAX_RESHARDING_POLL_TIME: Duration = Duration::from_secs(5 * 60 * 60); // 5 hrs

/// StateSplitRequest has all the information needed to start a resharding job. This message is sent
Expand All @@ -51,6 +52,8 @@ pub struct StateSplitRequest {
pub next_epoch_shard_layout: ShardLayout,
// Time we've spent polling for the state snapshot to be ready. We autofail after a certain time.
pub curr_poll_time: Duration,
// Configuration for resharding. Can be used to throttle resharding if needed.
pub config: StateSplitConfig,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
Expand Down Expand Up @@ -108,16 +111,17 @@ struct TrieUpdateBatch {
}

// Function to return batches of trie key, value pairs from flat storage iter. We return None at the end of iter.
// The batch size is roughly RESHARDING_BATCH_MEMORY_LIMIT (300 MB)
// The batch size is roughly batch_memory_limit.
fn get_trie_update_batch(
config: &StateSplitConfig,
iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
) -> Option<TrieUpdateBatch> {
let mut size: u64 = 0;
let mut entries = Vec::new();
while let Some((key, value)) = iter.next() {
size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
entries.push((key, value));
if size > RESHARDING_BATCH_MEMORY_LIMIT.as_u64() {
if size > config.batch_size.as_u64() {
break;
}
}
Expand All @@ -129,6 +133,7 @@ fn get_trie_update_batch(
}

fn apply_delayed_receipts<'a>(
config: &StateSplitConfig,
tries: &ShardTries,
orig_shard_uid: ShardUId,
orig_state_root: StateRoot,
Expand All @@ -140,7 +145,7 @@ fn apply_delayed_receipts<'a>(
let mut start_index = None;
let mut new_state_roots = state_roots;
while let Some((next_index, receipts)) =
get_delayed_receipts(&orig_trie_update, start_index, RESHARDING_BATCH_MEMORY_LIMIT)?
get_delayed_receipts(&orig_trie_update, start_index, config.batch_size)?
{
let (store_update, updated_state_roots) = tries.apply_delayed_receipts_to_split_states(
&new_state_roots,
Expand Down Expand Up @@ -203,6 +208,7 @@ impl Chain {
state_root,
next_epoch_shard_layout,
curr_poll_time: Duration::ZERO,
config: self.state_split_config.clone(),
});

for shard_uid in child_shard_uids.unwrap_or_default() {
Expand Down Expand Up @@ -250,10 +256,11 @@ impl Chain {
shard_uid,
state_root,
next_epoch_shard_layout,
config,
..
} = state_split_request;

tracing::debug!(target: "resharding", ?shard_uid, "build_state_for_split_shards_impl starting");
tracing::debug!(target: "resharding", ?config, ?shard_uid, "build_state_for_split_shards_impl starting");

let shard_id = shard_uid.shard_id();
let new_shards = next_epoch_shard_layout
Expand Down Expand Up @@ -317,25 +324,49 @@ impl Chain {
let checked_account_id_to_shard_uid =
get_checked_account_id_to_shard_uid_fn(shard_uid, new_shards, next_epoch_shard_layout);

let shard_uid_string = shard_uid.to_string();
let metrics_labels = [shard_uid_string.as_str()];

// Once we build the iterator, we break it into batches using the get_trie_update_batch function.
while let Some(batch) = get_trie_update_batch(&mut iter) {
loop {
// Prepare the batch.
let batch = {
let histogram = RESHARDING_BATCH_PREPARE_TIME.with_label_values(&metrics_labels);
let _timer = histogram.start_timer();
let Some(batch) = get_trie_update_batch(&config, &mut iter) else { break };
batch
};

// Apply the batch - add values to the split states.
let TrieUpdateBatch { entries, size } = batch;
// TODO(#9435): This is highly inefficient as for each key in the batch, we are parsing the account_id
// A better way would be to use the boundary account to construct the from and to key range for flat storage iterator
let (store_update, new_state_roots) = tries.add_values_to_split_states(
&state_roots,
entries,
&checked_account_id_to_shard_uid,
)?;
state_roots = new_state_roots;
store_update.commit()?;
RESHARDING_BATCH_COUNT.with_label_values(&[shard_uid.to_string().as_str()]).inc();
RESHARDING_BATCH_SIZE
.with_label_values(&[shard_uid.to_string().as_str()])
.add(size as i64)
let store_update = {
let histogram = RESHARDING_BATCH_APPLY_TIME.with_label_values(&metrics_labels);
let _timer = histogram.start_timer();
// TODO(#9435): This is highly inefficient as for each key in the batch, we are parsing the account_id
// A better way would be to use the boundary account to construct the from and to key range for flat storage iterator
let (store_update, new_state_roots) = tries.add_values_to_split_states(
&state_roots,
entries,
&checked_account_id_to_shard_uid,
)?;
state_roots = new_state_roots;
store_update
};

// Commit the store update.
{
let histogram = RESHARDING_BATCH_COMMIT_TIME.with_label_values(&metrics_labels);
let _timer = histogram.start_timer();
store_update.commit()?;
}

RESHARDING_BATCH_COUNT.with_label_values(&metrics_labels).inc();
RESHARDING_BATCH_SIZE.with_label_values(&metrics_labels).add(size as i64);
std::thread::sleep(config.batch_delay);
}

state_roots = apply_delayed_receipts(
&config,
&tries,
shard_uid,
state_root,
Expand Down
3 changes: 3 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use borsh::{BorshDeserialize, BorshSerialize};
use chrono::DateTime;
use chrono::Utc;
use near_chain_configs::StateSplitConfig;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_store::flat::FlatStorageManager;
use num_rational::Rational32;
Expand Down Expand Up @@ -211,6 +212,7 @@ pub struct ChainConfig {
/// Currently used for flat storage background creation.
pub background_migration_threads: usize,
pub state_snapshot_every_n_blocks: Option<u64>,
pub state_split_config: StateSplitConfig,
}

impl ChainConfig {
Expand All @@ -219,6 +221,7 @@ impl ChainConfig {
save_trie_changes: true,
background_migration_threads: 1,
state_snapshot_every_n_blocks: None,
state_split_config: StateSplitConfig::default(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ impl Client {
save_trie_changes: config.save_trie_changes,
background_migration_threads: config.client_background_migration_threads,
state_snapshot_every_n_blocks: config.state_snapshot_every_n_blocks,
state_split_config: config.state_split_config,
};
let chain = Chain::new(
epoch_manager.clone(),
Expand Down
5 changes: 4 additions & 1 deletion chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use near_chain::state_snapshot_actor::MakeSnapshotCallback;
use near_chain::test_utils::{KeyValueRuntime, MockEpochManager, ValidatorSchedule};
use near_chain::types::{ChainConfig, RuntimeAdapter};
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::ClientConfig;
use near_chain_configs::{ClientConfig, StateSplitConfig};
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::client::ShardsManagerResponse;
use near_chunks::shards_manager_actor::start_shards_manager;
Expand Down Expand Up @@ -114,6 +114,7 @@ pub fn setup(
save_trie_changes: true,
background_migration_threads: 1,
state_snapshot_every_n_blocks: None,
state_split_config: StateSplitConfig::default(),
},
None,
)
Expand Down Expand Up @@ -236,6 +237,7 @@ pub fn setup_only_view(
save_trie_changes: true,
background_migration_threads: 1,
state_snapshot_every_n_blocks: None,
state_split_config: StateSplitConfig::default(),
},
None,
)
Expand Down Expand Up @@ -1004,6 +1006,7 @@ pub fn setup_synchronous_shards_manager(
save_trie_changes: true,
background_migration_threads: 1,
state_snapshot_every_n_blocks: None,
state_split_config: StateSplitConfig::default(),
}, // irrelevant
None,
)
Expand Down
7 changes: 2 additions & 5 deletions core/chain-configs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ publish = true

[dependencies]
anyhow.workspace = true
bytesize.workspace = true
chrono.workspace = true
derive_more.workspace = true
num-rational.workspace = true
Expand All @@ -31,10 +32,6 @@ nightly_protocol = [
"near-o11y/nightly_protocol",
"near-primitives/nightly_protocol",
]
nightly = [
"nightly_protocol",
"near-o11y/nightly",
"near-primitives/nightly",
]
nightly = ["nightly_protocol", "near-o11y/nightly", "near-primitives/nightly"]
default = []
metrics = ["near-o11y"]
21 changes: 21 additions & 0 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,24 @@ impl SyncConfig {
}
}

/// Tunable parameters for resharding (state split), used to throttle how
/// aggressively state is copied into the child shards.
///
/// The value is small and `Copy`; it is carried from the node configuration
/// into the resharding request so the batching loop can read it.
// NOTE(review): no `#[serde(default)]` is declared on this struct, so serde
// will presumably require both fields to be present whenever this section is
// deserialized — confirm that existing config files still load.
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)]
pub struct StateSplitConfig {
/// The soft limit on the size of a single batch. The batch size can be
/// decreased if resharding is consuming too many resources and interfering
/// with regular node operation.
///
/// It is a soft limit because a batch is only closed after the accumulated
/// key and value bytes exceed this value, so a batch may overshoot it by
/// one entry.
pub batch_size: bytesize::ByteSize,
/// The delay between writing batches to the db. The batch delay can be
/// increased if resharding is consuming too many resources and interfering
/// with regular node operation.
///
/// The resharding loop sleeps on its own thread for this long after each
/// batch has been committed.
pub batch_delay: Duration,
}

impl Default for StateSplitConfig {
    /// Returns the out-of-the-box throttling settings: batches of 30 MB with a
    /// 100 ms pause between consecutive batches.
    fn default() -> Self {
        let batch_size = bytesize::ByteSize::mb(30);
        let batch_delay = Duration::from_millis(100);
        Self { batch_size, batch_delay }
    }
}

/// ClientConfig where some fields can be updated at runtime.
#[derive(Clone, serde::Serialize)]
pub struct ClientConfig {
Expand Down Expand Up @@ -272,6 +290,8 @@ pub struct ClientConfig {
pub transaction_pool_size_limit: Option<u64>,
// Allows more detailed logging, for example a list of orphaned blocks.
pub enable_multiline_logging: bool,
// Configuration for resharding.
pub state_split_config: StateSplitConfig,
}

impl ClientConfig {
Expand Down Expand Up @@ -347,6 +367,7 @@ impl ClientConfig {
state_snapshot_every_n_blocks: None,
transaction_pool_size_limit: None,
enable_multiline_logging: false,
state_split_config: StateSplitConfig::default(),
}
}
}
2 changes: 1 addition & 1 deletion core/chain-configs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod updateable_config;

pub use client_config::{
ClientConfig, DumpConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig,
LogSummaryStyle, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
LogSummaryStyle, StateSplitConfig, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT,
};
pub use genesis_config::{
Expand Down
5 changes: 4 additions & 1 deletion nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::dyn_config::LOG_CONFIG_FILENAME;
use anyhow::{anyhow, bail, Context};
use near_chain_configs::{
get_initial_supply, ClientConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode,
LogSummaryStyle, MutableConfigValue, StateSyncConfig,
LogSummaryStyle, MutableConfigValue, StateSplitConfig, StateSyncConfig,
};
use near_config_utils::{ValidationError, ValidationErrors};
use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey, Signer};
Expand Down Expand Up @@ -346,6 +346,7 @@ pub struct Config {
/// chunks and underutilizing the capacity of the network.
#[serde(default = "default_transaction_pool_size_limit")]
pub transaction_pool_size_limit: Option<u64>,
pub state_split_config: StateSplitConfig,
}

fn is_false(value: &bool) -> bool {
Expand Down Expand Up @@ -386,6 +387,7 @@ impl Default for Config {
state_sync_enabled: None,
transaction_pool_size_limit: default_transaction_pool_size_limit(),
enable_multiline_logging: None,
state_split_config: StateSplitConfig::default(),
}
}
}
Expand Down Expand Up @@ -683,6 +685,7 @@ impl NearConfig {
state_snapshot_every_n_blocks: None,
transaction_pool_size_limit: config.transaction_pool_size_limit,
enable_multiline_logging: config.enable_multiline_logging.unwrap_or(true),
state_split_config: config.state_split_config,
},
network_config: NetworkConfig::new(
config.network,
Expand Down
3 changes: 2 additions & 1 deletion tools/speedy_sync/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use borsh::{BorshDeserialize, BorshSerialize};
use near_chain::types::{ChainConfig, Tip};
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::GenesisValidationMode;
use near_chain_configs::{GenesisValidationMode, StateSplitConfig};
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::types::EpochInfoAggregator;
use near_epoch_manager::EpochManager;
Expand Down Expand Up @@ -244,6 +244,7 @@ fn load_snapshot(load_cmd: LoadCmd) {
save_trie_changes: config.client_config.save_trie_changes,
background_migration_threads: 1,
state_snapshot_every_n_blocks: None,
state_split_config: StateSplitConfig::default(),
},
None,
)
Expand Down

0 comments on commit 7c03720

Please sign in to comment.