From 06a3e85e27aed21ab22538042ea1f2a0e732f731 Mon Sep 17 00:00:00 2001
From: Nikolay Kurtov
Date: Wed, 5 Apr 2023 13:49:53 +0200
Subject: [PATCH] Merge

---
 chain/client/src/metrics.rs             | 10 ----------
 chain/client/src/sync/state.rs          | 12 +++++++-----
 core/chain-configs/src/client_config.rs |  9 +++++++--
 nearcore/src/config.rs                  | 26 +++++++++++++++----------
 nearcore/src/metrics.rs                 |  2 +-
 nearcore/src/state_sync.rs              |  5 ++---
 6 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs
index 8de327457a1..535d1106a16 100644
--- a/chain/client/src/metrics.rs
+++ b/chain/client/src/metrics.rs
@@ -441,13 +441,3 @@ pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<HistogramVec> =
         )
         .unwrap()
     });
-
-pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<IntCounterVec> =
-    Lazy::new(|| {
-        try_create_int_counter_vec(
-            "near_state_sync_external_parts_size_downloaded_bytes_total",
-            "Amount of bytes downloaded from an external storage when requesting state parts for a shard",
-            &["shard_id"],
-        )
-        .unwrap()
-    });
diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs
index e93952671f3..49dd1ba29bc 100644
--- a/chain/client/src/sync/state.rs
+++ b/chain/client/src/sync/state.rs
@@ -92,7 +92,7 @@ fn make_account_or_peer_id_or_hash(
         From::AccountId(a) => To::AccountId(a),
         From::PeerId(p) => To::PeerId(p),
         From::Hash(h) => To::Hash(h),
-        From::ExternalStorage() => To::ExternalStorage(),
+        From::ExternalStorage => To::ExternalStorage,
     }
 }
 
@@ -707,7 +707,7 @@ impl StateSync {
                 }
                 download.state_requests_count += 1;
                 download.last_target =
-                    Some(make_account_or_peer_id_or_hash(AccountOrPeerIdOrHash::ExternalStorage()));
+                    Some(make_account_or_peer_id_or_hash(AccountOrPeerIdOrHash::ExternalStorage));
 
                 let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts);
                 let download_response = download.response.clone();
@@ -1236,17 +1236,19 @@ fn check_external_storage_part_response(
                         .with_label_values(&[&shard_id.to_string()])
                         .inc();
                     tracing::warn!(target: "sync", %shard_id, %sync_hash, part_id, ?err, "Failed to save a state part");
-                    err_to_retry = Some(Error::Other("Failed to save a state part".to_string()));
+                    err_to_retry =
+                        Some(near_chain::Error::Other("Failed to save a state part".to_string()));
                 }
             }
         }
         // Other HTTP status codes are considered errors.
         Ok((status_code, _)) => {
-            err_to_retry = Some(Error::Other(format!("status_code: {}", status_code).to_string()));
+            err_to_retry =
+                Some(near_chain::Error::Other(format!("status_code: {}", status_code).to_string()));
         }
         // The request failed without reaching the external storage.
         Err(err) => {
-            err_to_retry = Some(Error::Other(err.to_string()));
+            err_to_retry = Some(near_chain::Error::Other(err.to_string()));
         }
     };
 
diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs
index c71ae93a895..797bb43644b 100644
--- a/core/chain-configs/src/client_config.rs
+++ b/core/chain-configs/src/client_config.rs
@@ -179,7 +179,10 @@ pub struct ClientConfig {
     /// If disabled will perform state sync from the peers.
     pub state_sync_from_s3_enabled: bool,
     /// Number of parallel in-flight requests allowed per shard.
-    pub state_sync_num_s3_requests_per_shard: u64,
+    pub state_sync_num_concurrent_s3_requests: u64,
+    /// Whether to use the State Sync mechanism.
+    /// If disabled, the node will do Block Sync instead of State Sync.
+    pub state_sync_enabled: bool,
 }
 
 impl ClientConfig {
@@ -253,7 +256,9 @@ impl ClientConfig {
             state_sync_s3_bucket: String::new(),
             state_sync_s3_region: String::new(),
             state_sync_restart_dump_for_shards: vec![],
-            state_sync_num_s3_requests_per_shard: 10,
+            state_sync_from_s3_enabled: false,
+            state_sync_num_concurrent_s3_requests: 10,
+            state_sync_enabled: false,
         }
     }
 }
diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs
index 9dac587369a..bdc458a1c6c 100644
--- a/nearcore/src/config.rs
+++ b/nearcore/src/config.rs
@@ -344,8 +344,8 @@ pub struct Config {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub state_sync: Option<StateSyncConfig>,
     /// Whether to use state sync (unreliable and corrupts the DB if fails) or do a block sync instead.
-    #[serde(skip_serializing_if = "is_false")]
-    pub state_sync_enabled: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub state_sync_enabled: Option<bool>,
 }
 
 fn is_false(value: &bool) -> bool {
@@ -384,7 +384,7 @@ impl Default for Config {
             split_storage: None,
             expected_shutdown: None,
             state_sync: None,
-            state_sync_enabled: false,
+            state_sync_enabled: None,
         }
     }
 }
@@ -695,6 +695,7 @@ impl NearConfig {
                     .state_sync
                     .as_ref()
                     .map(|x| x.dump_enabled)
+                    .flatten()
                     .unwrap_or(false),
                 state_sync_s3_bucket: config
                     .state_sync
@@ -709,21 +710,22 @@ impl NearConfig {
                 state_sync_restart_dump_for_shards: config
                     .state_sync
                     .as_ref()
-                    .map(|x| x.drop_state_of_dump.clone())
+                    .map(|x| x.restart_dump_for_shards.clone())
                    .flatten()
                     .unwrap_or(vec![]),
                 state_sync_from_s3_enabled: config
                     .state_sync
                     .as_ref()
                     .map(|x| x.sync_from_s3_enabled)
                     .flatten()
                     .unwrap_or(false),
-                state_sync_num_s3_requests_per_shard: config
+                state_sync_num_concurrent_s3_requests: config
                     .state_sync
                     .as_ref()
-                    .map(|x| x.num_s3_requests_per_shard)
+                    .map(|x| x.num_concurrent_s3_requests)
                     .flatten()
                     .unwrap_or(100),
+                state_sync_enabled: config.state_sync_enabled.unwrap_or(false),
             },
             network_config: NetworkConfig::new(
                 config.network,
@@ -1556,14 +1558,18 @@ pub struct StateSyncConfig {
     /// Whether a node should dump state of each epoch to the external storage.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub dump_enabled: Option<bool>,
-    /// Use carefully in case a node that dumps state to the external storage gets in trouble.
+    /// Use carefully in case a node that dumps state to the external storage
+    /// gets in trouble.
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub drop_state_of_dump: Option<Vec<ShardId>>,
-    /// If enabled, will download state parts from external storage and not from the peers.
+    pub restart_dump_for_shards: Option<Vec<ShardId>>,
+    /// If enabled, will download state parts from external storage and not from
+    /// the peers.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub sync_from_s3_enabled: Option<bool>,
+    /// When syncing state from S3, throttle requests to this many concurrent
+    /// requests per shard.
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub num_s3_requests_per_shard: Option<u64>,
+    pub num_concurrent_s3_requests: Option<u64>,
 }
 
 #[test]
diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs
index 19605ceeca1..125cd923d52 100644
--- a/nearcore/src/metrics.rs
+++ b/nearcore/src/metrics.rs
@@ -70,7 +70,7 @@ pub(crate) static STATE_SYNC_DUMP_SIZE_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
     try_create_int_counter_vec(
         "near_state_sync_dump_size_total",
         "Total size of parts written to S3",
-        &["shard_id"],
+        &["epoch_height", "shard_id"],
     )
     .unwrap()
 });
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 0df51e42802..cf680c9d934 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -119,7 +119,6 @@ async fn state_sync_dump(
     _node_key: PublicKey,
 ) {
     tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop");
-    let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(10));
     if config.state_sync_restart_dump_for_shards.contains(&shard_id) {
         tracing::debug!(target: "state_sync_dump", shard_id, "Dropped existing progress");
     }
@@ -128,7 +127,7 @@ async fn state_sync_dump(
 
     loop {
         // Avoid a busy-loop when there is nothing to do.
-        interval.tick().await;
+        std::thread::sleep(std::time::Duration::from_secs(10));
 
         let progress = chain.store().get_state_sync_dump_progress(shard_id);
         tracing::debug!(target: "state_sync_dump", shard_id, ?progress, "Running StateSyncDump loop iteration");
@@ -287,7 +286,7 @@ fn update_progress(
 ) {
     // Record that a part was obtained and dumped.
     metrics::STATE_SYNC_DUMP_SIZE_TOTAL
-        .with_label_values(&[&shard_id.to_string()])
+        .with_label_values(&[&epoch_height.to_string(), &shard_id.to_string()])
         .inc_by(part_len as u64);
     let next_progress = StateSyncDumpProgress::InProgress {
         epoch_id: epoch_id.clone(),
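
The config plumbing in nearcore/src/config.rs above follows one pattern: every field of StateSyncConfig is an Option, so a config.json with no `state_sync` section and a `state_sync` section with a missing field both fall back to the same default via `.map(...).flatten().unwrap_or(...)`. Below is a minimal, self-contained sketch of that pattern; the struct and field names mirror the patch, but the types are trimmed down and the defaults (100, false) are simply the `unwrap_or` arguments visible in the hunks, not the actual nearcore definitions.

struct StateSyncConfig {
    sync_from_s3_enabled: Option<bool>,
    num_concurrent_s3_requests: Option<u64>,
}

struct Config {
    state_sync: Option<StateSyncConfig>,
    state_sync_enabled: Option<bool>,
}

fn main() {
    // A `state_sync` section is present, but `num_concurrent_s3_requests` is unset.
    let config = Config {
        state_sync: Some(StateSyncConfig {
            sync_from_s3_enabled: Some(true),
            num_concurrent_s3_requests: None,
        }),
        state_sync_enabled: None,
    };

    // `.map` over `Option<&StateSyncConfig>` yields `Option<Option<bool>>`;
    // `.flatten` collapses the nesting, `.unwrap_or` supplies the default.
    let sync_from_s3 = config
        .state_sync
        .as_ref()
        .map(|x| x.sync_from_s3_enabled)
        .flatten()
        .unwrap_or(false);

    // Same shape for the throttling knob: a missing field defaults to 100.
    let num_concurrent_s3_requests = config
        .state_sync
        .as_ref()
        .map(|x| x.num_concurrent_s3_requests)
        .flatten()
        .unwrap_or(100);

    // A top-level `Option<bool>` only needs `unwrap_or`, as in the last hunk
    // of the NearConfig change.
    let state_sync_enabled = config.state_sync_enabled.unwrap_or(false);

    assert!(sync_from_s3);
    assert_eq!(num_concurrent_s3_requests, 100);
    assert!(!state_sync_enabled);
}

The same extraction is often written as `.and_then(|x| x.num_concurrent_s3_requests)`; `map(...).flatten()` is equivalent and keeps each step explicit, which is the style the patch uses.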