Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configure num threads for flat storage creation #8088

Merged
merged 8 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, GCMode};
use crate::types::{
AcceptedBlock, ApplySplitStateResult, ApplySplitStateResultOrStateChanges,
ApplyTransactionResult, Block, BlockEconomicsConfig, BlockHeader, BlockHeaderInfo, BlockStatus,
ChainGenesis, Provenance, RuntimeAdapter,
ChainConfig, ChainGenesis, Provenance, RuntimeAdapter,
};
use crate::validate::{
validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
Expand Down Expand Up @@ -545,11 +545,12 @@ impl Chain {
runtime_adapter: Arc<dyn RuntimeAdapter>,
chain_genesis: &ChainGenesis,
doomslug_threshold_mode: DoomslugThresholdMode,
save_trie_changes: bool,
chain_config: ChainConfig,
) -> Result<Chain, Error> {
// Get runtime initial state and create genesis block out of it.
let (store, state_roots) = runtime_adapter.genesis_state();
let mut store = ChainStore::new(store, chain_genesis.height, save_trie_changes);
let mut store =
ChainStore::new(store, chain_genesis.height, chain_config.save_trie_changes);
let genesis_chunks = genesis_chunks(
state_roots.clone(),
runtime_adapter.num_shards(&EpochId::default())?,
Expand Down Expand Up @@ -653,7 +654,11 @@ impl Chain {
store_update.commit()?;

// Create flat storage or initiate migration to flat storage.
let flat_storage_creator = FlatStorageCreator::new(runtime_adapter.clone(), &store);
let flat_storage_creator = FlatStorageCreator::new(
runtime_adapter.clone(),
&store,
chain_config.background_work_threads,
);

info!(target: "chain", "Init: header head @ #{} {}; block head @ #{} {}",
header_head.height, header_head.last_block_hash,
Expand Down
13 changes: 7 additions & 6 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use near_chain_primitives::Error;
use near_primitives::types::{BlockHeight, ShardId};
#[cfg(feature = "protocol_feature_flat_state")]
use near_store::flat_state::store_helper;
use near_store::flat_state::{FlatStorageStateStatus, NUM_PARTS_IN_ONE_STEP};
use near_store::flat_state::FlatStorageStateStatus;
use std::sync::Arc;
#[cfg(feature = "protocol_feature_flat_state")]
use tracing::debug;
Expand Down Expand Up @@ -109,7 +109,11 @@ pub struct FlatStorageCreator {
}

impl FlatStorageCreator {
pub fn new(runtime_adapter: Arc<dyn RuntimeAdapter>, chain_store: &ChainStore) -> Option<Self> {
pub fn new(
runtime_adapter: Arc<dyn RuntimeAdapter>,
chain_store: &ChainStore,
num_threads: usize,
) -> Option<Self> {
let chain_head = chain_store.head().unwrap();
let num_shards = runtime_adapter.num_shards(&chain_head.epoch_id).unwrap();
let start_height = chain_head.height;
Expand Down Expand Up @@ -138,10 +142,7 @@ impl FlatStorageCreator {
if creation_needed {
Some(Self {
shard_creators,
pool: rayon::ThreadPoolBuilder::new()
.num_threads(NUM_PARTS_IN_ONE_STEP as usize)
.build()
.unwrap(),
pool: rayon::ThreadPoolBuilder::new().num_threads(num_threads).build().unwrap(),
})
} else {
None
Expand Down
10 changes: 8 additions & 2 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,7 @@ mod tests {
use crate::store::{ChainStoreAccess, GCMode};
use crate::store_validator::StoreValidator;
use crate::test_utils::{KeyValueRuntime, ValidatorSchedule};
use crate::types::ChainConfig;
use crate::{Chain, ChainGenesis, DoomslugThresholdMode, RuntimeAdapter};

fn get_chain() -> Chain {
Expand All @@ -3003,8 +3004,13 @@ mod tests {
.block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]]);
let runtime_adapter =
Arc::new(KeyValueRuntime::new_with_validators(store, vs, epoch_length));
Chain::new(runtime_adapter, &chain_genesis, DoomslugThresholdMode::NoApprovals, true)
.unwrap()
Chain::new(
runtime_adapter,
&chain_genesis,
DoomslugThresholdMode::NoApprovals,
ChainConfig::default(),
)
.unwrap()
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion chain/chain/src/store_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ mod tests {
use near_store::test_utils::create_test_store;

use crate::test_utils::KeyValueRuntime;
use crate::types::ChainConfig;
use crate::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode};

use super::*;
Expand All @@ -395,7 +396,7 @@ mod tests {
runtime_adapter.clone(),
&chain_genesis,
DoomslugThresholdMode::NoApprovals,
true,
ChainConfig::default(),
)
.unwrap();
(chain, StoreValidator::new(None, genesis, runtime_adapter, store, false))
Expand Down
7 changes: 4 additions & 3 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ use crate::block_processing_utils::BlockNotInPoolError;
use crate::chain::Chain;
use crate::store::ChainStoreAccess;
use crate::types::{
AcceptedBlock, ApplySplitStateResult, ApplyTransactionResult, BlockHeaderInfo, ChainGenesis,
AcceptedBlock, ApplySplitStateResult, ApplyTransactionResult, BlockHeaderInfo, ChainConfig,
ChainGenesis,
};
use crate::{BlockHeader, DoomslugThresholdMode, RuntimeAdapter};
use crate::{BlockProcessingArtifact, Provenance};
Expand Down Expand Up @@ -1425,7 +1426,7 @@ pub fn setup_with_tx_validity_period(
protocol_version: PROTOCOL_VERSION,
},
DoomslugThresholdMode::NoApprovals,
true,
ChainConfig::default(),
)
.unwrap();
let test_account = "test".parse::<AccountId>().unwrap();
Expand Down Expand Up @@ -1465,7 +1466,7 @@ pub fn setup_with_validators(
protocol_version: PROTOCOL_VERSION,
},
DoomslugThresholdMode::NoApprovals,
true,
ChainConfig::default(),
)
.unwrap();
(chain, runtime, signers)
Expand Down
10 changes: 8 additions & 2 deletions chain/chain/src/tests/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use crate::chain::Chain;
use crate::test_utils::{KeyValueRuntime, ValidatorSchedule};
use crate::types::{ChainGenesis, Tip};
use crate::types::{ChainConfig, ChainGenesis, Tip};
use crate::DoomslugThresholdMode;

use near_chain_configs::GCConfig;
Expand Down Expand Up @@ -30,7 +30,13 @@ fn get_chain_with_epoch_length_and_num_shards(
.block_producers_per_epoch(vec![vec!["test1".parse().unwrap()]])
.num_shards(num_shards);
let runtime_adapter = Arc::new(KeyValueRuntime::new_with_validators(store, vs, epoch_length));
Chain::new(runtime_adapter, &chain_genesis, DoomslugThresholdMode::NoApprovals, true).unwrap()
Chain::new(
runtime_adapter,
&chain_genesis,
DoomslugThresholdMode::NoApprovals,
ChainConfig::default(),
)
.unwrap()
}

// Build a chain of num_blocks on top of prev_block
Expand Down
14 changes: 14 additions & 0 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ pub struct ChainGenesis {
pub protocol_version: ProtocolVersion,
}

pub struct ChainConfig {
/// Whether we should save `TrieChanges` on disk or not.
Longarithm marked this conversation as resolved.
Show resolved Hide resolved
pub save_trie_changes: bool,
/// Number of threads to execute auxiliary background work.
/// Currently used for flat storage background creation.
pub background_work_threads: usize,
}

impl Default for ChainConfig {
fn default() -> Self {
Self { save_trie_changes: true, background_work_threads: 1 }
}
}
Longarithm marked this conversation as resolved.
Show resolved Hide resolved

impl ChainGenesis {
pub fn new(genesis: &Genesis) -> Self {
Self {
Expand Down
7 changes: 5 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use near_chain::chain::{
OrphanMissingChunks, StateSplitRequest, TX_ROUTING_HEIGHT_HORIZON,
};
use near_chain::test_utils::format_hash;
use near_chain::types::LatestKnown;
use near_chain::types::{ChainConfig, LatestKnown};
use near_chain::{
BlockProcessingArtifact, BlockStatus, Chain, ChainGenesis, ChainStoreAccess,
DoneApplyChunkCallback, Doomslug, DoomslugThresholdMode, Provenance, RuntimeAdapter,
Expand Down Expand Up @@ -188,7 +188,10 @@ impl Client {
runtime_adapter.clone(),
&chain_genesis,
doomslug_threshold_mode,
!config.archive,
ChainConfig {
save_trie_changes: !config.archive,
background_work_threads: config.client_background_work_threads,
},
)?;
let me = validator_signer.as_ref().map(|x| x.validator_id().clone());
let shards_mgr = ShardsManager::new(
Expand Down
10 changes: 8 additions & 2 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use near_chain::test_utils::{KeyValueRuntime, ValidatorSchedule};
use near_chain::types::ChainConfig;
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_network::test_utils::peer_id_from_seed;
use near_primitives::version::PROTOCOL_VERSION;
Expand Down Expand Up @@ -539,8 +540,13 @@ mod tests {
protocol_version: PROTOCOL_VERSION,
};
let doomslug_threshold_mode = DoomslugThresholdMode::TwoThirds;
let chain =
Chain::new(runtime.clone(), &chain_genesis, doomslug_threshold_mode, true).unwrap();
let chain = Chain::new(
runtime.clone(),
&chain_genesis,
doomslug_threshold_mode,
ChainConfig::default(),
)
.unwrap();

let telemetry = info_helper.telemetry_info(
&chain.head().unwrap(),
Expand Down
18 changes: 15 additions & 3 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use near_chain::test_utils::{
wait_for_all_blocks_in_processing, wait_for_block_in_processing, KeyValueRuntime,
ValidatorSchedule,
};
use near_chain::types::ChainConfig;
use near_chain::{
Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Provenance, RuntimeAdapter,
};
Expand Down Expand Up @@ -217,8 +218,13 @@ pub fn setup(
} else {
DoomslugThresholdMode::NoApprovals
};
let chain =
Chain::new(runtime.clone(), &chain_genesis, doomslug_threshold_mode, !archive).unwrap();
let chain = Chain::new(
runtime.clone(),
&chain_genesis,
doomslug_threshold_mode,
ChainConfig { save_trie_changes: !archive, background_work_threads: 1 },
)
.unwrap();
let genesis_block = chain.get_block(&chain.genesis().hash().clone()).unwrap();

let signer = Arc::new(InMemoryValidatorSigner::from_seed(
Expand Down Expand Up @@ -302,7 +308,13 @@ pub fn setup_only_view(
} else {
DoomslugThresholdMode::NoApprovals
};
Chain::new(runtime.clone(), &chain_genesis, doomslug_threshold_mode, !archive).unwrap();
Chain::new(
runtime.clone(),
&chain_genesis,
doomslug_threshold_mode,
ChainConfig { save_trie_changes: !archive, background_work_threads: 1 },
)
.unwrap();

let signer = Arc::new(InMemoryValidatorSigner::from_seed(
account_id.clone(),
Expand Down
3 changes: 3 additions & 0 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub struct ClientConfig {
pub max_gas_burnt_view: Option<Gas>,
/// Re-export storage layer statistics as prometheus metrics.
pub enable_statistics_export: bool,
/// Number of threads to execute auxiliary background work in client.
pub client_background_work_threads: usize,
}

impl ClientConfig {
Expand Down Expand Up @@ -215,6 +217,7 @@ impl ClientConfig {
trie_viewer_state_size_limit: None,
max_gas_burnt_view: None,
enable_statistics_export: true,
client_background_work_threads: 1,
}
}
}
9 changes: 9 additions & 0 deletions core/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ pub struct StoreConfig {
/// be copied between the databases.
#[serde(skip_serializing_if = "MigrationSnapshot::is_default")]
pub migration_snapshot: MigrationSnapshot,

/// Number of threads to execute background work with storage.
/// Needed to create flat storage which need to happen in parallel
/// with block processing.
pub background_work_threads: usize,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -204,6 +209,10 @@ impl Default for StoreConfig {
],

migration_snapshot: Default::default(),

// We checked that this number of threads doesn't impact
// regular block processing significantly.
background_work_threads: 8,
}
}
}
Expand Down
19 changes: 15 additions & 4 deletions integration-tests/src/genesis_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use tempfile::tempdir;

use near_chain::types::ChainConfig;
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::Genesis;
use near_primitives::block::{Block, BlockHeader};
Expand All @@ -20,8 +21,13 @@ pub fn genesis_header(genesis: &Genesis) -> BlockHeader {
let store = create_test_store();
let chain_genesis = ChainGenesis::new(genesis);
let runtime = Arc::new(NightshadeRuntime::test(dir.path(), store, genesis));
let chain =
Chain::new(runtime, &chain_genesis, DoomslugThresholdMode::TwoThirds, true).unwrap();
let chain = Chain::new(
runtime,
&chain_genesis,
DoomslugThresholdMode::TwoThirds,
ChainConfig::default(),
)
.unwrap();
chain.genesis().clone()
}

Expand All @@ -31,7 +37,12 @@ pub fn genesis_block(genesis: &Genesis) -> Block {
let store = create_test_store();
let chain_genesis = ChainGenesis::new(genesis);
let runtime = Arc::new(NightshadeRuntime::test(dir.path(), store, genesis));
let chain =
Chain::new(runtime, &chain_genesis, DoomslugThresholdMode::TwoThirds, true).unwrap();
let chain = Chain::new(
runtime,
&chain_genesis,
DoomslugThresholdMode::TwoThirds,
ChainConfig::default(),
)
.unwrap();
chain.get_block(&chain.genesis().hash().clone()).unwrap()
}
1 change: 1 addition & 0 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ impl NearConfig {
trie_viewer_state_size_limit: config.trie_viewer_state_size_limit,
max_gas_burnt_view: config.max_gas_burnt_view,
enable_statistics_export: config.store.enable_statistics_export,
client_background_work_threads: config.store.background_work_threads,
},
network_config: NetworkConfig::new(
config.network,
Expand Down
7 changes: 5 additions & 2 deletions tools/speedy_sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use borsh::{BorshDeserialize, BorshSerialize};
use clap::Parser;
use near_chain::types::Tip;
use near_chain::types::{ChainConfig, Tip};
use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode};
use near_chain_configs::GenesisValidationMode;
use near_epoch_manager::types::EpochInfoAggregator;
Expand Down Expand Up @@ -237,7 +237,10 @@ fn load_snapshot(load_cmd: LoadCmd) {
runtime.clone(),
&chain_genesis,
DoomslugThresholdMode::TwoThirds,
!config.client_config.archive,
ChainConfig {
save_trie_changes: !config.client_config.archive,
background_work_threads: 1,
},
)
.unwrap();

Expand Down