diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eafa24785a..b9d76de3218 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ ### Non-protocol Changes +* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789) + ## 1.33.0 ### Protocol Changes diff --git a/Cargo.lock b/Cargo.lock index 1f42e3693b2..7c993d27381 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3256,6 +3256,7 @@ dependencies = [ "once_cell", "rand 0.8.5", "reed-solomon-erasure", + "rust-s3", "serde_json", "strum", "sysinfo", diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index e389a376007..dedc1369b1c 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -3139,7 +3139,10 @@ impl Chain { let state_root = *chunk.take_header().take_inner().prev_state_root(); if !self.runtime_adapter.validate_state_part(&state_root, part_id, data) { byzantine_assert!(false); - return Err(Error::Other("set_state_part failed: validate_state_part failed".into())); + return Err(Error::Other(format!( + "set_state_part failed: validate_state_part failed. state_root={:?}", + state_root + ))); } // Saving the part data. diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index 5436c1041a9..ac9a77d2c74 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -22,7 +22,7 @@ pub use near_primitives::views::{StatusResponse, StatusSyncInfo}; use once_cell::sync::OnceCell; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; /// Combines errors coming from chain, tx pool and block producer. #[derive(Debug, thiserror::Error)] @@ -50,6 +50,7 @@ pub enum AccountOrPeerIdOrHash { AccountId(AccountId), PeerId(PeerId), Hash(CryptoHash), + ExternalStorage, } #[derive(Debug, serde::Serialize)] @@ -61,18 +62,39 @@ pub struct DownloadStatus { pub done: bool, pub state_requests_count: u64, pub last_target: Option, + #[serde(skip_serializing, skip_deserializing)] + pub response: Arc), String>>>>, +} + +impl DownloadStatus { + pub fn new(now: DateTime) -> Self { + Self { + start_time: now, + prev_update_time: now, + run_me: Arc::new(AtomicBool::new(true)), + error: false, + done: false, + state_requests_count: 0, + last_target: None, + response: Arc::new(Mutex::new(None)), + } + } } impl Clone for DownloadStatus { + /// Clones an object, but it clones the value of `run_me` instead of the + /// `Arc` that wraps that value. fn clone(&self) -> Self { DownloadStatus { start_time: self.start_time, prev_update_time: self.prev_update_time, + // Creates a new `Arc` holding the same value. run_me: Arc::new(AtomicBool::new(self.run_me.load(Ordering::SeqCst))), error: self.error, done: self.done, state_requests_count: self.state_requests_count, last_target: self.last_target.clone(), + response: self.response.clone(), } } } @@ -90,6 +112,21 @@ pub enum ShardSyncStatus { StateSyncDone, } +impl ShardSyncStatus { + pub fn repr(&self) -> u8 { + match self { + ShardSyncStatus::StateDownloadHeader => 0, + ShardSyncStatus::StateDownloadParts => 1, + ShardSyncStatus::StateDownloadScheduling => 2, + ShardSyncStatus::StateDownloadApplying => 3, + ShardSyncStatus::StateDownloadComplete => 4, + ShardSyncStatus::StateSplitScheduling => 5, + ShardSyncStatus::StateSplitApplying(_) => 6, + ShardSyncStatus::StateSyncDone => 7, + } + } +} + /// Manually implement compare for ShardSyncStatus to compare only based on variant name impl PartialEq for ShardSyncStatus { fn eq(&self, other: &Self) -> bool { @@ -164,25 +201,26 @@ pub struct ShardSyncDownload { } impl ShardSyncDownload { - /// Creates a instance of self which includes initial statuses for shard sync and download at the given time. - pub fn new(now: DateTime) -> Self { + /// Creates a instance of self which includes initial statuses for shard state header download at the given time. + pub fn new_download_state_header(now: DateTime) -> Self { Self { - downloads: vec![ - DownloadStatus { - start_time: now, - prev_update_time: now, - run_me: Arc::new(AtomicBool::new(true)), - error: false, - done: false, - state_requests_count: 0, - last_target: None, - }; - 1 - ], + downloads: vec![DownloadStatus::new(now)], status: ShardSyncStatus::StateDownloadHeader, } } + + /// Creates a instance of self which includes initial statuses for shard state parts download at the given time. + pub fn new_download_state_parts(now: DateTime, num_parts: u64) -> Self { + // Avoid using `vec![x; num_parts]`, because each element needs to have + // its own independent value of `response`. + let mut downloads = Vec::with_capacity(num_parts as usize); + for _ in 0..num_parts { + downloads.push(DownloadStatus::new(now)); + } + Self { downloads, status: ShardSyncStatus::StateDownloadParts } + } } + /// Various status sync can be in, whether it's fast sync or archival. #[derive(Clone, Debug, strum::AsRefStr)] pub enum SyncStatus { diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index a07766d503e..7add2bb5916 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -20,6 +20,7 @@ num-rational.workspace = true once_cell.workspace = true rand.workspace = true reed-solomon-erasure.workspace = true +rust-s3.workspace = true serde_json.workspace = true strum.workspace = true sysinfo.workspace = true diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 57594fcca26..e8885cac80c 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -248,7 +248,15 @@ impl Client { config.archive, config.state_sync_enabled, ); - let state_sync = StateSync::new(network_adapter.clone(), config.state_sync_timeout); + let state_sync = StateSync::new( + network_adapter.clone(), + config.state_sync_timeout, + &config.chain_id, + config.state_sync_from_s3_enabled, + &config.state_sync_s3_bucket, + &config.state_sync_s3_region, + config.state_sync_num_concurrent_s3_requests, + ); let num_block_producer_seats = config.num_block_producer_seats as usize; let data_parts = runtime_adapter.num_data_parts(); let parity_parts = runtime_adapter.num_total_parts() - data_parts; @@ -2121,7 +2129,15 @@ impl Client { let (state_sync, new_shard_sync, blocks_catch_up_state) = self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| { ( - StateSync::new(network_adapter1, state_sync_timeout), + StateSync::new( + network_adapter1, + state_sync_timeout, + &self.config.chain_id, + self.config.state_sync_from_s3_enabled, + &self.config.state_sync_s3_bucket, + &self.config.state_sync_s3_region, + self.config.state_sync_num_concurrent_s3_requests, + ), new_shard_sync, BlocksCatchUpState::new(sync_hash, epoch_id), ) diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index 4c3d202c06a..8de327457a1 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -354,3 +354,100 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version) ]) .inc(); } + +pub static STATE_SYNC_STAGE: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_state_sync_stage", + "Stage of state sync per shard", + &["shard_id"], + ) + .unwrap() +}); + +pub static STATE_SYNC_RETRY_PART: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_retry_part_total", + "Number of part requests retried", + &["shard_id"], + ) + .unwrap() +}); + +pub static STATE_SYNC_PARTS_DONE: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_state_sync_parts_done", + "Number of parts downloaded", + &["shard_id"], + ) + .unwrap() +}); + +pub static STATE_SYNC_PARTS_TOTAL: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_state_sync_parts_per_shard", + "Number of parts that need to be downloaded for the shard", + &["shard_id"], + ) + .unwrap() +}); + +pub static STATE_SYNC_DISCARD_PARTS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_discard_parts_total", + "Number of times all downloaded parts were discarded to try again", + &["shard_id"], + ) + .unwrap() +}); + +pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy = + Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_external_parts_done_total", + "Number of parts successfully retrieved from an external storage", + &["shard_id"], + ) + .unwrap() + }); + +pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy = + Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_external_parts_failed_total", + "Number of parts failed attempts to retrieve parts from an external storage", + &["shard_id"], + ) + .unwrap() + }); + +pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy = + Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_external_parts_scheduling_delay_sec", + "Delay for a request for parts from an external storage", + &["shard_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() + }); + +pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy = + Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_external_parts_request_delay_sec", + "Latency of state part requests to external storage", + &["shard_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() + }); + +pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy = + Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_external_parts_size_downloaded_bytes_total", + "Amount of bytes downloaded from an external storage when requesting state parts for a shard", + &["shard_id"], + ) + .unwrap() + }); diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 378be79df37..5db95e58aa7 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -20,6 +20,7 @@ //! here to depend more on local peers instead. //! +use crate::metrics; use ansi_term::Color::{Purple, Yellow}; use ansi_term::Style; use chrono::{DateTime, Duration, Utc}; @@ -41,12 +42,12 @@ use near_primitives::shard_layout::ShardUId; use near_primitives::state_part::PartId; use near_primitives::static_clock::StaticClock; use near_primitives::syncing::{get_num_state_parts, ShardStateSyncResponse}; -use near_primitives::types::{AccountId, ShardId, StateRoot}; +use near_primitives::types::{AccountId, EpochHeight, ShardId, StateRoot}; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; use std::collections::HashMap; use std::ops::Add; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use std::time::Duration as TimeDuration; @@ -94,35 +95,83 @@ fn make_account_or_peer_id_or_hash( } } +/// How to retrieve the state data. +enum StateSyncInner { + /// Request both the state header and state parts from the peers. + Peers { + /// Which parts were requested from which peer and when. + last_part_id_requested: HashMap<(AccountOrPeerIdOrHash, ShardId), PendingRequestStatus>, + /// Map from which part we requested to whom. + requested_target: lru::LruCache<(u64, CryptoHash), AccountOrPeerIdOrHash>, + }, + /// Requests the state header from peers but gets the state parts from an + /// external storage. + PartsFromExternal { + /// Chain ID. + chain_id: String, + /// Connection to the external storage. + bucket: Arc, + /// The number of requests for state parts from external storage that are + /// allowed to be started for this shard. + requests_remaining: Arc, + }, +} + /// Helper to track state sync. pub struct StateSync { + /// How to retrieve the state data. + inner: StateSyncInner, + + /// Is used for communication with the peers. network_adapter: PeerManagerAdapter, + /// When the "sync block" was requested. + /// The "sync block" is the last block of the previous epoch, i.e. `prev_hash` of the `sync_hash` block. last_time_block_requested: Option>, - last_part_id_requested: HashMap<(AccountOrPeerIdOrHash, ShardId), PendingRequestStatus>, - /// Map from which part we requested to whom. - requested_target: lru::LruCache<(u64, CryptoHash), AccountOrPeerIdOrHash>, - /// Timeout (set in config - by default to 60 seconds) is used to figure out how long we should wait /// for the answer from the other node before giving up. timeout: Duration, - /// Maps shard_id to result of applying downloaded state + /// Maps shard_id to result of applying downloaded state. state_parts_apply_results: HashMap>, - /// Maps shard_id to result of splitting state for resharding + /// Maps shard_id to result of splitting state for resharding. split_state_roots: HashMap, near_chain::Error>>, } impl StateSync { - pub fn new(network_adapter: PeerManagerAdapter, timeout: TimeDuration) -> Self { + pub fn new( + network_adapter: PeerManagerAdapter, + timeout: TimeDuration, + chain_id: &str, + state_sync_from_s3_enabled: bool, + s3_bucket: &str, + s3_region: &str, + num_s3_requests_per_shard: u64, + ) -> Self { + let inner = if state_sync_from_s3_enabled { + let bucket = create_bucket(s3_bucket, s3_region, timeout); + if let Err(err) = bucket { + panic!("Failed to create an S3 bucket: {}", err); + } + StateSyncInner::PartsFromExternal { + chain_id: chain_id.to_string(), + bucket: Arc::new(bucket.unwrap()), + requests_remaining: Arc::new(AtomicI64::new(num_s3_requests_per_shard as i64)), + } + } else { + StateSyncInner::Peers { + last_part_id_requested: Default::default(), + requested_target: lru::LruCache::new(MAX_PENDING_PART as usize), + } + }; + let timeout = Duration::from_std(timeout).unwrap(); StateSync { + inner, network_adapter, last_time_block_requested: None, - last_part_id_requested: Default::default(), - requested_target: lru::LruCache::new(MAX_PENDING_PART as usize), - timeout: Duration::from_std(timeout).unwrap(), + timeout, state_parts_apply_results: HashMap::new(), split_state_roots: HashMap::new(), } @@ -198,11 +247,14 @@ impl StateSync { let shard_sync_download = new_shard_sync.entry(shard_id).or_insert_with(|| { run_shard_state_download = true; update_sync_status = true; - ShardSyncDownload::new(now) + ShardSyncDownload::new_download_state_header(now) }); let old_status = shard_sync_download.status.clone(); let mut shard_sync_done = false; + metrics::STATE_SYNC_STAGE + .with_label_values(&[&shard_id.to_string()]) + .set(shard_sync_download.status.repr() as i64); match &shard_sync_download.status { ShardSyncStatus::StateDownloadHeader => { (download_timeout, run_shard_state_download) = self @@ -215,8 +267,16 @@ impl StateSync { )?; } ShardSyncStatus::StateDownloadParts => { - (download_timeout, run_shard_state_download) = - self.sync_shards_download_parts_status(shard_sync_download, now); + let res = self.sync_shards_download_parts_status( + shard_id, + shard_sync_download, + sync_hash, + chain, + now, + ); + download_timeout = res.0; + run_shard_state_download = res.1; + update_sync_status |= res.2; } ShardSyncStatus::StateDownloadScheduling => { self.sync_shards_download_scheduling_status( @@ -290,13 +350,13 @@ impl StateSync { // Execute syncing for shard `shard_id` if run_shard_state_download { update_sync_status = true; - *shard_sync_download = self.request_shard( + self.request_shard( me, shard_id, chain, runtime_adapter, sync_hash, - shard_sync_download.clone(), + shard_sync_download, highest_height_peers, )?; } @@ -347,25 +407,6 @@ impl StateSync { } } - fn sent_request_part( - &mut self, - target: AccountOrPeerIdOrHash, - part_id: u64, - shard_id: ShardId, - sync_hash: CryptoHash, - ) { - // FIXME: something is wrong - the index should have a shard_id too. - self.requested_target.put((part_id, sync_hash), target.clone()); - - let timeout = self.timeout; - self.last_part_id_requested - .entry((target, shard_id)) - .and_modify(|pending_request| { - pending_request.missing_parts += 1; - }) - .or_insert_with(|| PendingRequestStatus::new(timeout)); - } - // Function called when our node receives the network response with a part. pub fn received_requested_part( &mut self, @@ -373,17 +414,24 @@ impl StateSync { shard_id: ShardId, sync_hash: CryptoHash, ) { - let key = (part_id, sync_hash); - // Check that it came from the target that we requested it from. - if let Some(target) = self.requested_target.get(&key) { - if self.last_part_id_requested.get_mut(&(target.clone(), shard_id)).map_or( - false, - |request| { - request.missing_parts = request.missing_parts.saturating_sub(1); - request.missing_parts == 0 - }, - ) { - self.last_part_id_requested.remove(&(target.clone(), shard_id)); + match &mut self.inner { + StateSyncInner::Peers { last_part_id_requested, requested_target } => { + let key = (part_id, sync_hash); + // Check that it came from the target that we requested it from. + if let Some(target) = requested_target.get(&key) { + if last_part_id_requested.get_mut(&(target.clone(), shard_id)).map_or( + false, + |request| { + request.missing_parts = request.missing_parts.saturating_sub(1); + request.missing_parts == 0 + }, + ) { + last_part_id_requested.remove(&(target.clone(), shard_id)); + } + } + } + StateSyncInner::PartsFromExternal { .. } => { + // Do nothing. } } } @@ -400,14 +448,12 @@ impl StateSync { sync_hash: CryptoHash, highest_height_peers: &[HighestHeightPeerInfo], ) -> Result, near_chain::Error> { - // Remove candidates from pending list if request expired due to timeout - self.last_part_id_requested.retain(|_, request| !request.expired()); - let prev_block_hash = *chain.get_block_header(&sync_hash)?.prev_hash(); let epoch_hash = runtime_adapter.get_epoch_id_from_prev_block(&prev_block_hash)?; - Ok(runtime_adapter - .get_epoch_block_producers_ordered(&epoch_hash, &sync_hash)? + let block_producers = + runtime_adapter.get_epoch_block_producers_ordered(&epoch_hash, &sync_hash)?; + let peers = block_producers .iter() .filter_map(|(validator_stake, _slashed)| { let account_id = validator_stake.account_id(); @@ -437,12 +483,33 @@ impl StateSync { } else { None } - })) - .filter(|candidate| { - // If we still have a pending request from this node - don't add another one. - !self.last_part_id_requested.contains_key(&(candidate.clone(), shard_id)) - }) - .collect::>()) + })); + Ok(self.select_peers(peers.collect(), shard_id)?) + } + + /// Avoids peers that already have outstanding requests for parts. + fn select_peers( + &mut self, + peers: Vec, + shard_id: ShardId, + ) -> Result, near_chain::Error> { + let res = match &mut self.inner { + StateSyncInner::Peers { + last_part_id_requested, + requested_target: _requested_target, + } => { + last_part_id_requested.retain(|_, request| !request.expired()); + peers + .into_iter() + .filter(|candidate| { + // If we still have a pending request from this node - don't add another one. + !last_part_id_requested.contains_key(&(candidate.clone(), shard_id)) + }) + .collect::>() + } + StateSyncInner::PartsFromExternal { .. } => peers, + }; + Ok(res) } /// Returns new ShardSyncDownload if successful, otherwise returns given shard_sync_download @@ -453,9 +520,9 @@ impl StateSync { chain: &Chain, runtime_adapter: &Arc, sync_hash: CryptoHash, - shard_sync_download: ShardSyncDownload, + shard_sync_download: &mut ShardSyncDownload, highest_height_peers: &[HighestHeightPeerInfo], - ) -> Result { + ) -> Result<(), near_chain::Error> { let possible_targets = self.possible_targets( me, shard_id, @@ -467,19 +534,17 @@ impl StateSync { if possible_targets.is_empty() { // In most cases it means that all the targets are currently busy (that we have a pending request with them). - return Ok(shard_sync_download); + return Ok(()); } // Downloading strategy starts here - let mut new_shard_sync_download = shard_sync_download.clone(); - match shard_sync_download.status { ShardSyncStatus::StateDownloadHeader => { self.request_shard_header( shard_id, sync_hash, &possible_targets, - &mut new_shard_sync_download, + shard_sync_download, ); } ShardSyncStatus::StateDownloadParts => { @@ -487,15 +552,17 @@ impl StateSync { shard_id, sync_hash, possible_targets, - &mut new_shard_sync_download, + shard_sync_download, + chain, ); } _ => {} } - Ok(new_shard_sync_download) + Ok(()) } + /// Makes a StateRequestHeader header to one of the peers. fn request_shard_header( &mut self, shard_id: ShardId, @@ -528,61 +595,74 @@ impl StateSync { ); } + /// Makes requests to download state parts for the given epoch of the given shard. fn request_shard_parts( &mut self, shard_id: ShardId, sync_hash: CryptoHash, possible_targets: Vec, new_shard_sync_download: &mut ShardSyncDownload, + chain: &Chain, ) { - // We'll select all the 'highest' peers + validators as candidates (excluding those that gave us timeout in the past). - // And for each one of them, we'll ask for up to 16 (MAX_STATE_PART_REQUEST) parts. - let possible_targets_sampler = - SamplerLimited::new(possible_targets, MAX_STATE_PART_REQUEST); - // Iterate over all parts that needs to be requested (i.e. download.run_me is true). // Parts are ordered such that its index match its part_id. - // Finally, for every part that needs to be requested it is selected one peer (target) randomly - // to request the part from. - // IMPORTANT: here we use 'zip' with possible_target_sampler - which is limited. So at any moment we'll not request more than - // possible_targets.len() * MAX_STATE_PART_REQUEST parts. - for ((part_id, download), target) in new_shard_sync_download - .downloads - .iter_mut() - .enumerate() - .filter(|(_, download)| download.run_me.load(Ordering::SeqCst)) - .zip(possible_targets_sampler) - { - self.sent_request_part(target.clone(), part_id as u64, shard_id, sync_hash); - download.run_me.store(false, Ordering::SeqCst); - download.state_requests_count += 1; - download.last_target = Some(make_account_or_peer_id_or_hash(target.clone())); - let run_me = download.run_me.clone(); - - near_performance_metrics::actix::spawn( - std::any::type_name::(), - self.network_adapter - .send_async(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::StateRequestPart { - shard_id, - sync_hash, - part_id: part_id as u64, - target: target.clone(), - }, - )) - .then(move |result| { - // TODO: possible optimization - in the current code, even if one of the targets it not present in the network graph - // (so we keep getting RouteNotFound) - we'll still keep trying to assign parts to it. - // Fortunately only once every 60 seconds (timeout value). - if let Ok(NetworkResponses::RouteNotFound) = - result.map(|f| f.as_network_response()) - { - // Send a StateRequestPart on the next iteration - run_me.store(true, Ordering::SeqCst); - } - future::ready(()) - }), - ); + match &mut self.inner { + StateSyncInner::Peers { last_part_id_requested, requested_target } => { + // We'll select all the 'highest' peers + validators as candidates (excluding those that gave us timeout in the past). + // And for each one of them, we'll ask for up to 16 (MAX_STATE_PART_REQUEST) parts. + let possible_targets_sampler = + SamplerLimited::new(possible_targets, MAX_STATE_PART_REQUEST); + + // For every part that needs to be requested it is selected one + // peer (target) randomly to request the part from. + // IMPORTANT: here we use 'zip' with possible_target_sampler - + // which is limited. So at any moment we'll not request more + // than possible_targets.len() * MAX_STATE_PART_REQUEST parts. + for ((part_id, download), target) in + parts_to_fetch(new_shard_sync_download).zip(possible_targets_sampler) + { + sent_request_part( + target.clone(), + part_id, + shard_id, + sync_hash, + last_part_id_requested, + requested_target, + self.timeout, + ); + request_part_from_peers( + part_id, + target, + download, + shard_id, + sync_hash, + &self.network_adapter, + ); + } + } + StateSyncInner::PartsFromExternal { chain_id, bucket, requests_remaining } => { + let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); + let epoch_id = sync_block_header.epoch_id(); + let epoch_info = chain.runtime_adapter.get_epoch_info(epoch_id).unwrap(); + let epoch_height = epoch_info.epoch_height(); + + let shard_state_header = chain.get_state_header(shard_id, sync_hash).unwrap(); + let state_num_parts = + get_num_state_parts(shard_state_header.state_root_node().memory_usage); + + for (part_id, download) in parts_to_fetch(new_shard_sync_download) { + request_part_from_external_storage( + part_id, + download, + shard_id, + epoch_height, + state_num_parts, + &chain_id.clone(), + bucket.clone(), + requests_remaining.clone(), + ); + } + } } } @@ -605,7 +685,7 @@ impl StateSync { use_colour: bool, ) -> Result { let _span = tracing::debug_span!(target: "sync", "run", sync = "StateSync").entered(); - tracing::debug!(target: "sync", %sync_hash, ?tracking_shards, "syncing state"); + tracing::trace!(target: "sync", %sync_hash, ?tracking_shards, "syncing state"); let prev_hash = *chain.get_block_header(&sync_hash)?.prev_hash(); let now = StaticClock::utc(); @@ -738,21 +818,8 @@ impl StateSync { get_num_state_parts(shard_state_header.state_root_node().memory_usage); // If the header was downloaded successfully - move to phase 2 (downloading parts). // Create the vector with entry for each part. - *shard_sync_download = ShardSyncDownload { - downloads: vec![ - DownloadStatus { - start_time: now, - prev_update_time: now, - run_me: Arc::new(AtomicBool::new(true)), - error: false, - done: false, - state_requests_count: 0, - last_target: None, - }; - state_num_parts as usize - ], - status: ShardSyncStatus::StateDownloadParts, - }; + *shard_sync_download = + ShardSyncDownload::new_download_state_parts(now, state_num_parts); run_shard_state_download = true; } else { let prev = shard_sync_download.downloads[0].prev_update_time; @@ -773,45 +840,82 @@ impl StateSync { /// Checks if the parts are downloaded. /// If download of all parts is complete, then moves forward to `StateDownloadScheduling`. - /// Returns `(download_timeout, run_shard_state_download)` where: + /// Returns `(download_timeout, run_shard_state_download, update_sync_status)` where: /// * `download_timeout` means that the state header request timed out (and needs to be retried). /// * `run_shard_state_download` means that header or part download requests need to run for this shard. + /// * `update_sync_status` means that something changed in `ShardSyncDownload` and it needs to be persisted. fn sync_shards_download_parts_status( &mut self, + shard_id: ShardId, shard_sync_download: &mut ShardSyncDownload, + sync_hash: CryptoHash, + chain: &mut Chain, now: DateTime, - ) -> (bool, bool) { + ) -> (bool, bool, bool) { // Step 2 - download all the parts (each part is usually around 1MB). let mut download_timeout = false; let mut run_shard_state_download = false; + let mut update_sync_status = false; let mut parts_done = true; - for part_download in shard_sync_download.downloads.iter_mut() { + let num_parts = shard_sync_download.downloads.len(); + let mut num_parts_done = 0; + for (part_id, part_download) in shard_sync_download.downloads.iter_mut().enumerate() { + if !part_download.done { + // Check if a download from an external storage is finished. + update_sync_status |= check_external_storage_part_response( + part_id as u64, + num_parts as u64, + shard_id, + sync_hash, + part_download, + chain, + ); + } if !part_download.done { parts_done = false; let prev = part_download.prev_update_time; - let error = part_download.error; - let part_timeout = now - prev > self.timeout; - // Retry parts that failed. - if part_timeout || error { + let part_timeout = now - prev > self.timeout; // Retry parts that failed. + if part_timeout || part_download.error { download_timeout |= part_timeout; - part_download.run_me.store(true, Ordering::SeqCst); - part_download.error = false; - part_download.prev_update_time = now; + if part_timeout || + part_download.last_target != Some(near_client_primitives::types::AccountOrPeerIdOrHash::ExternalStorage) { + // Don't immediately retry failed requests from external + // storage. Most often error is a state part not + // available. That error doesn't get fixed by retrying, + // but rather by waiting. + metrics::STATE_SYNC_RETRY_PART + .with_label_values(&[&shard_id.to_string()]) + .inc(); + part_download.run_me.store(true, Ordering::SeqCst); + part_download.error = false; + part_download.prev_update_time = now; + update_sync_status = true; + } } if part_download.run_me.load(Ordering::SeqCst) { run_shard_state_download = true; } } + if part_download.done { + num_parts_done += 1; + } } + metrics::STATE_SYNC_PARTS_DONE + .with_label_values(&[&shard_id.to_string()]) + .set(num_parts_done); + metrics::STATE_SYNC_PARTS_TOTAL + .with_label_values(&[&shard_id.to_string()]) + .set(num_parts as i64); // If all parts are done - we can move towards scheduling. if parts_done { *shard_sync_download = ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateDownloadScheduling, }; + update_sync_status = true; } - (download_timeout, run_shard_state_download) + (download_timeout, run_shard_state_download, update_sync_status) } fn sync_shards_download_scheduling_status( @@ -844,8 +948,9 @@ impl StateSync { Err(err) => { // Cannot finalize the downloaded state. // The reasonable behavior here is to start from the very beginning. + metrics::STATE_SYNC_DISCARD_PARTS.with_label_values(&[&shard_id.to_string()]).inc(); tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "State sync finalizing error"); - *shard_sync_download = ShardSyncDownload::new(now); + *shard_sync_download = ShardSyncDownload::new_download_state_header(now); chain.clear_downloaded_parts(shard_id, sync_hash, state_num_parts)?; } } @@ -874,8 +979,11 @@ impl StateSync { Err(err) => { // Cannot finalize the downloaded state. // The reasonable behavior here is to start from the very beginning. + metrics::STATE_SYNC_DISCARD_PARTS + .with_label_values(&[&shard_id.to_string()]) + .inc(); tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "State sync finalizing error"); - *shard_sync_download = ShardSyncDownload::new(now); + *shard_sync_download = ShardSyncDownload::new_download_state_header(now); let shard_state_header = chain.get_state_header(shard_id, sync_hash)?; let state_num_parts = get_num_state_parts(shard_state_header.state_root_node().memory_usage); @@ -959,9 +1067,258 @@ impl StateSync { } } -fn paint(s: &str, colour: Style, use_colour: bool) -> String { - if use_colour { - colour.paint(s).to_string() +fn create_bucket( + bucket: &str, + region: &str, + timeout: TimeDuration, +) -> Result { + let mut bucket = s3::Bucket::new( + bucket, + region.parse::().map_err(|err| near_chain::Error::Other(err.to_string()))?, + s3::creds::Credentials::anonymous() + .map_err(|err| near_chain::Error::Other(err.to_string()))?, + ) + .map_err(|err| near_chain::Error::Other(err.to_string()))?; + // Ensure requests finish in finite amount of time. + bucket.set_request_timeout(Some(timeout)); + Ok(bucket) +} + +/// Returns parts that still need to be fetched. +fn parts_to_fetch( + new_shard_sync_download: &mut ShardSyncDownload, +) -> impl Iterator { + new_shard_sync_download + .downloads + .iter_mut() + .enumerate() + .filter(|(_, download)| download.run_me.load(Ordering::SeqCst)) + .map(|(part_id, download)| (part_id as u64, download)) +} + +/// Starts an asynchronous network request to external storage to fetch the given state part. +fn request_part_from_external_storage( + part_id: u64, + download: &mut DownloadStatus, + shard_id: ShardId, + epoch_height: EpochHeight, + num_parts: u64, + chain_id: &str, + bucket: Arc, + requests_remaining: Arc, +) { + if !allow_request(&requests_remaining) { + return; + } else { + if !download.run_me.swap(false, Ordering::SeqCst) { + tracing::info!(target: "sync", %shard_id, part_id, "run_me is already false"); + return; + } + } + download.state_requests_count += 1; + download.last_target = None; + + let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts); + let download_response = download.response.clone(); + let scheduled = StaticClock::utc(); + near_performance_metrics::actix::spawn("StateSync", { + async move { + tracing::info!(target: "sync", %shard_id, part_id, location, "Getting an object from the external storage"); + let started = StaticClock::utc(); + metrics::STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY + .with_label_values(&[&shard_id.to_string()]) + .observe( + started.signed_duration_since(scheduled).num_nanoseconds().unwrap_or(0) as f64 + / 1e9, + ); + let result = bucket.get_object(location.clone()).await; + let completed = StaticClock::utc(); + finished_request(&requests_remaining); + metrics::STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY + .with_label_values(&[&shard_id.to_string()]) + .observe( + completed.signed_duration_since(started).num_nanoseconds().unwrap_or(0) as f64 + / 1e9, + ); + match result { + Ok(response) => { + tracing::info!(target: "sync", %shard_id, part_id, location, response_code = response.status_code(), num_bytes = response.bytes().len(), "S3 request finished"); + let mut lock = download_response.lock().unwrap(); + *lock = Some(Ok((response.status_code(), response.bytes().to_vec()))); + } + Err(err) => { + tracing::info!(target: "sync", %shard_id, part_id, location, ?err, "S3 request failed"); + let mut lock = download_response.lock().unwrap(); + *lock = Some(Err(err.to_string())); + } + } + } + }); +} + +/// Asynchronously requests a state part from a suitable peer. +fn request_part_from_peers( + part_id: u64, + target: AccountOrPeerIdOrHash, + download: &mut DownloadStatus, + shard_id: ShardId, + sync_hash: CryptoHash, + network_adapter: &PeerManagerAdapter, +) { + download.run_me.store(false, Ordering::SeqCst); + download.state_requests_count += 1; + download.last_target = Some(make_account_or_peer_id_or_hash(target.clone())); + let run_me = download.run_me.clone(); + + near_performance_metrics::actix::spawn( + "StateSync", + network_adapter + .send_async(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, target }, + )) + .then(move |result| { + // TODO: possible optimization - in the current code, even if one of the targets it not present in the network graph + // (so we keep getting RouteNotFound) - we'll still keep trying to assign parts to it. + // Fortunately only once every 60 seconds (timeout value). + if let Ok(NetworkResponses::RouteNotFound) = result.map(|f| f.as_network_response()) + { + // Send a StateRequestPart on the next iteration + run_me.store(true, Ordering::SeqCst); + } + future::ready(()) + }), + ); +} + +fn sent_request_part( + target: AccountOrPeerIdOrHash, + part_id: u64, + shard_id: ShardId, + sync_hash: CryptoHash, + last_part_id_requested: &mut HashMap<(AccountOrPeerIdOrHash, ShardId), PendingRequestStatus>, + requested_target: &mut lru::LruCache<(u64, CryptoHash), AccountOrPeerIdOrHash>, + timeout: Duration, +) { + // FIXME: something is wrong - the index should have a shard_id too. + requested_target.put((part_id, sync_hash), target.clone()); + last_part_id_requested + .entry((target, shard_id)) + .and_modify(|pending_request| { + pending_request.missing_parts += 1; + }) + .or_insert_with(|| PendingRequestStatus::new(timeout)); +} + +/// Verifies that one more concurrent request can be started. +fn allow_request(requests_remaining: &AtomicI64) -> bool { + let remaining = requests_remaining.fetch_sub(1, Ordering::SeqCst); + if remaining <= 0 { + requests_remaining.fetch_add(1, Ordering::SeqCst); + false + } else { + true + } +} + +fn finished_request(requests_remaining: &AtomicI64) { + requests_remaining.fetch_add(1, Ordering::SeqCst); +} + +/// Works around how data requests to external storage are done. +/// The response is stored on the DownloadStatus object. +/// This function investigates if the response is available and updates `done` and `error` appropriately. +/// If the response is successful, then also writes the state part to the DB. +/// +/// Returns whether something changed in `DownloadStatus` which means it needs to be persisted. +fn check_external_storage_part_response( + part_id: u64, + num_parts: u64, + shard_id: ShardId, + sync_hash: CryptoHash, + part_download: &mut DownloadStatus, + chain: &mut Chain, +) -> bool { + let external_storage_response = { + let mut lock = part_download.response.lock().unwrap(); + if let Some(response) = lock.clone() { + tracing::debug!(target: "sync", %shard_id, part_id, "Got response from external storage"); + // Remove the response from DownloadStatus, because + // we're going to write state parts to DB and don't need to keep + // them in `DownloadStatus`. + *lock = None; + response + } else { + return false; + } + }; + + let mut err_to_retry = None; + match external_storage_response { + // HTTP status code 200 means success. + Ok((200, data)) => { + tracing::debug!(target: "sync", %shard_id, part_id, "Got 200 response from external storage"); + match chain.set_state_part( + shard_id, + sync_hash, + PartId::new(part_id as u64, num_parts as u64), + &data, + ) { + Ok(_) => { + metrics::STATE_SYNC_EXTERNAL_PARTS_DONE + .with_label_values(&[&shard_id.to_string()]) + .inc(); + metrics::STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED + .with_label_values(&[&shard_id.to_string()]) + .inc_by(data.len() as u64); + part_download.done = true; + tracing::debug!(target: "sync", %shard_id, part_id, ?part_download, "Set state part success"); + } + Err(err) => { + metrics::STATE_SYNC_EXTERNAL_PARTS_FAILED + .with_label_values(&[&shard_id.to_string()]) + .inc(); + tracing::warn!(target: "sync", %shard_id, %sync_hash, part_id, ?err, "Failed to save a state part"); + err_to_retry = + Some(near_chain::Error::Other("Failed to save a state part".to_string())); + } + } + } + // Other HTTP status codes are considered errors. + Ok((status_code, _)) => { + tracing::debug!(target: "sync", %shard_id, %sync_hash, part_id, status_code, "Wrong response code, expected 200"); + err_to_retry = Some(near_chain::Error::Other(format!("status_code: {}", status_code))); + } + // The request failed without reaching the external storage. + Err(err) => { + err_to_retry = Some(near_chain::Error::Other(err)); + } + }; + + if let Some(err) = err_to_retry { + tracing::debug!(target: "sync", %shard_id, %sync_hash, part_id, ?err, "Failed to get a part from external storage, will retry"); + part_download.error = true; + } + true +} + +/// Construct a location on the external storage. +pub fn s3_location( + chain_id: &str, + epoch_height: u64, + shard_id: u64, + part_id: u64, + num_parts: u64, +) -> String { + format!( + "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}", + chain_id, epoch_height, shard_id, part_id, num_parts + ) +} + +/// Applies style if `use_colour` is enabled. +fn paint(s: &str, style: Style, use_style: bool) -> String { + if use_style { + style.paint(s).to_string() } else { s.to_string() } @@ -977,8 +1334,15 @@ fn format_shard_sync_phase(shard_sync_download: &ShardSyncDownload, use_colour: shard_sync_download.downloads[0].last_target ), ShardSyncStatus::StateDownloadParts => { + let mut num_parts_done = 0; + let mut num_parts_not_done = 0; let mut text = "".to_string(); for (i, download) in shard_sync_download.downloads.iter().enumerate() { + if download.done { + num_parts_done += 1; + continue; + } + num_parts_not_done += 1; text.push_str(&format!( "[{}: {}, {}, {:?}] ", paint(&i.to_string(), Yellow.bold(), use_colour), @@ -988,10 +1352,12 @@ fn format_shard_sync_phase(shard_sync_download: &ShardSyncDownload, use_colour: )); } format!( - "{} [{}: is_done, requests sent, last target] {}", + "{} [{}: is_done, requests sent, last target] {} num_parts_done={} num_parts_not_done={}", paint("PARTS", Purple.bold(), use_colour), paint("part_id", Yellow.bold(), use_colour), - text + text, + num_parts_done, + num_parts_not_done ) } _ => unreachable!("timeout cannot happen when all state is downloaded"), @@ -1083,8 +1449,15 @@ mod test { // Start a new state sync - and check that it asks for a header. fn test_ask_for_header() { let mock_peer_manager = Arc::new(MockPeerManagerAdapter::default()); - let mut state_sync = - StateSync::new(mock_peer_manager.clone().into(), TimeDuration::from_secs(1)); + let mut state_sync = StateSync::new( + mock_peer_manager.clone().into(), + TimeDuration::from_secs(1), + "chain_id", + false, + "", + "", + 100, + ); let mut new_shard_sync = HashMap::new(); let (mut chain, kv, signer) = test_utils::setup(); diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 3d7ee29b827..797bb43644b 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -175,6 +175,11 @@ pub struct ClientConfig { /// Restart dumping state of selected shards. /// Use for troubleshooting of the state dumping process. pub state_sync_restart_dump_for_shards: Vec, + /// Whether to enable state sync from S3. + /// If disabled will perform state sync from the peers. + pub state_sync_from_s3_enabled: bool, + /// Number of parallel in-flight requests allowed per shard. + pub state_sync_num_concurrent_s3_requests: u64, /// Whether to use the State Sync mechanism. /// If disabled, the node will do Block Sync instead of State Sync. pub state_sync_enabled: bool, @@ -251,7 +256,9 @@ impl ClientConfig { state_sync_s3_bucket: String::new(), state_sync_s3_region: String::new(), state_sync_restart_dump_for_shards: vec![], - state_sync_enabled: true, + state_sync_from_s3_enabled: false, + state_sync_num_concurrent_s3_requests: 10, + state_sync_enabled: false, } } } diff --git a/docs/misc/state_sync_from_s3.md b/docs/misc/state_sync_from_s3.md new file mode 100644 index 00000000000..22eb70d6d4e --- /dev/null +++ b/docs/misc/state_sync_from_s3.md @@ -0,0 +1,55 @@ +# Experimental: Sync state from External Storage + +## Purpose + +Current implementation of state sync (see +https://github.com/near/nearcore/blob/master/docs/architecture/how/sync.md for +details) doesn't allow the nodes to reliably perform state sync for testnet or +mainnet. + +That's why a new solution for state sync is being designed. +This is a short-term solution that is needed to let nodes sync and let chunk +only producers to switch tracked shards. +The experimental code is will not be kept for long and will be replaced with a +decentralized solution. + +## How-to + +[#8789](https://github.com/near/nearcore/pull/8789) adds an experimental option +to sync state from external storage. At the moment only S3 is +supported as external storage. + +To enable, add this to your `config.json` file: + +```json +"state_sync_enabled": true, +"state_sync": { + "s3_bucket": "my-bucket", + "s3_region": "eu-central-1", + "sync_from_s3_enabled": true +} +``` + +And run your node with environment variables `AWS_ACCESS_KEY_ID` and +`AWS_SECRET_ACCESS_KEY`: +```shell +AWS_ACCESS_KEY_ID="MY_ACCESS_KEY" AWS_SECRET_ACCESS_KEY="MY_AWS_SECRET_ACCESS_KEY" ./neard run +``` + +## Implementation Details + +The new implementation expects to find state parts as files on an S3 storage. + +The sync mechanism proceeds to download state parts mostly-sequentially from S3. +In case the state part is not available, the request will be retried after a +delay defined by `state_sync_timeout`, which by default is 1 minute. + +State parts are location on S3 at the following location: +``` +"chain_id={chain_id}/epoch_height={epoch_height}/shard_id={shard_id}/state_part_{part_id:06}_of_{num_parts:06}", +``` +for example `chain_id=testnet/epoch_height=1790/shard_id=2/state_part_032642_of_065402` + +After all state parts are downloaded, the node applies them, which replaces the existing State of the node. + +Currently, both downloading and applying state parts work rather quickly. diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index 2ca1f01e359..6c8db4cadde 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -332,8 +332,8 @@ pub struct Config { #[serde(skip_serializing_if = "Option::is_none")] pub state_sync: Option, /// Whether to use state sync (unreliable and corrupts the DB if fails) or do a block sync instead. - #[serde(skip_serializing_if = "is_false")] - pub state_sync_enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub state_sync_enabled: Option, } fn is_false(value: &bool) -> bool { @@ -370,7 +370,7 @@ impl Default for Config { split_storage: None, expected_shutdown: None, state_sync: None, - state_sync_enabled: false, + state_sync_enabled: None, } } } @@ -680,20 +680,38 @@ impl NearConfig { state_sync_dump_enabled: config .state_sync .as_ref() - .map_or(false, |x| x.dump_enabled.unwrap_or(false)), + .map(|x| x.dump_enabled) + .flatten() + .unwrap_or(false), state_sync_s3_bucket: config .state_sync .as_ref() - .map_or(String::new(), |x| x.s3_bucket.clone()), + .map(|x| x.s3_bucket.clone()) + .unwrap_or(String::new()), state_sync_s3_region: config .state_sync .as_ref() - .map_or(String::new(), |x| x.s3_region.clone()), + .map(|x| x.s3_region.clone()) + .unwrap_or(String::new()), state_sync_restart_dump_for_shards: config .state_sync .as_ref() - .map_or(vec![], |x| x.drop_state_of_dump.clone().unwrap_or(vec![])), - state_sync_enabled: config.state_sync_enabled, + .map(|x| x.restart_dump_for_shards.clone()) + .flatten() + .unwrap_or(vec![]), + state_sync_from_s3_enabled: config + .state_sync + .as_ref() + .map(|x| x.sync_from_s3_enabled) + .flatten() + .unwrap_or(false), + state_sync_num_concurrent_s3_requests: config + .state_sync + .as_ref() + .map(|x| x.num_concurrent_s3_requests) + .flatten() + .unwrap_or(100), + state_sync_enabled: config.state_sync_enabled.unwrap_or(false), }, network_config: NetworkConfig::new( config.network, @@ -1519,12 +1537,25 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) - #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] /// Options for dumping state to S3. pub struct StateSyncConfig { + /// Location of state dumps on S3. pub s3_bucket: String, + /// Region is very important on S3. pub s3_region: String, + /// Whether a node should dump state of each epoch to the external storage. #[serde(skip_serializing_if = "Option::is_none")] pub dump_enabled: Option, + /// Use carefully in case a node that dumps state to the external storage + /// gets in trouble. + #[serde(skip_serializing_if = "Option::is_none")] + pub restart_dump_for_shards: Option>, + /// If enabled, will download state parts from external storage and not from + /// the peers. + #[serde(skip_serializing_if = "Option::is_none")] + pub sync_from_s3_enabled: Option, + /// When syncing state from S3, throttle requests to this many concurrent + /// requests per shard. #[serde(skip_serializing_if = "Option::is_none")] - pub drop_state_of_dump: Option>, + pub num_concurrent_s3_requests: Option, } #[test] diff --git a/nearcore/src/config_validate.rs b/nearcore/src/config_validate.rs index d025dd38af6..13164ea1c8b 100644 --- a/nearcore/src/config_validate.rs +++ b/nearcore/src/config_validate.rs @@ -30,14 +30,14 @@ impl<'a> ConfigValidator<'a> { fn validate_all_conditions(&mut self) { if self.config.archive == false && self.config.save_trie_changes == Some(false) { let error_message = format!("Configuration with archive = false and save_trie_changes = false is not supported because non-archival nodes must save trie changes in order to do do garbage collection."); - self.validation_errors.push_config_semantics_error(error_message) + self.validation_errors.push_config_semantics_error(error_message); } // Checking that if cold storage is configured, trie changes are definitely saved. // Unlike in the previous case, None is not a valid option here. if self.config.cold_store.is_some() && self.config.save_trie_changes != Some(true) { let error_message = format!("cold_store is configured, but save_trie_changes is {:?}. Trie changes should be saved to support cold storage.", self.config.save_trie_changes); - self.validation_errors.push_config_semantics_error(error_message) + self.validation_errors.push_config_semantics_error(error_message); } if self.config.consensus.min_block_production_delay @@ -48,7 +48,7 @@ impl<'a> ConfigValidator<'a> { self.config.consensus.min_block_production_delay, self.config.consensus.max_block_production_delay ); - self.validation_errors.push_config_semantics_error(error_message) + self.validation_errors.push_config_semantics_error(error_message); } if self.config.consensus.min_block_production_delay @@ -59,13 +59,13 @@ impl<'a> ConfigValidator<'a> { self.config.consensus.min_block_production_delay, self.config.consensus.max_block_wait_delay ); - self.validation_errors.push_config_semantics_error(error_message) + self.validation_errors.push_config_semantics_error(error_message); } if self.config.consensus.header_sync_expected_height_per_second == 0 { let error_message = format!("consensus.header_sync_expected_height_per_second should not be 0"); - self.validation_errors.push_config_semantics_error(error_message) + self.validation_errors.push_config_semantics_error(error_message); } if self.config.gc.gc_blocks_limit == 0 @@ -73,7 +73,22 @@ impl<'a> ConfigValidator<'a> { || self.config.gc.gc_num_epochs_to_keep == 0 { let error_message = format!("gc config values should all be greater than 0, but gc_blocks_limit is {:?}, gc_fork_clean_step is {}, gc_num_epochs_to_keep is {}.", self.config.gc.gc_blocks_limit, self.config.gc.gc_fork_clean_step, self.config.gc.gc_num_epochs_to_keep); - self.validation_errors.push_config_semantics_error(error_message) + self.validation_errors.push_config_semantics_error(error_message); + } + + if let Some(state_sync) = &self.config.state_sync { + if state_sync.dump_enabled.unwrap_or(false) { + if state_sync.s3_bucket.is_empty() || state_sync.s3_region.is_empty() { + let error_message = format!("'config.state_sync.s3_bucket' and 'config.state_sync.s3_region' need to be specified when 'config.state_sync.dump_enabled' is enabled."); + self.validation_errors.push_config_semantics_error(error_message); + } + } + if state_sync.sync_from_s3_enabled.unwrap_or(false) { + if state_sync.s3_bucket.is_empty() || state_sync.s3_region.is_empty() { + let error_message = format!("'config.state_sync.s3_bucket' and 'config.state_sync.s3_region' need to be specified when 'config.state_sync.sync_from_s3_enabled' is enabled."); + self.validation_errors.push_config_semantics_error(error_message); + } + } } } diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 235f6f97533..2be7e5785f6 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -274,12 +274,7 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); - let state_sync_dump_handle = spawn_state_sync_dump( - &config, - chain_genesis, - runtime, - config.network_config.node_id().public_key(), - )?; + let state_sync_dump_handle = spawn_state_sync_dump(&config, chain_genesis, runtime)?; #[allow(unused_mut)] let mut rpc_servers = Vec::new(); diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs index 19605ceeca1..125cd923d52 100644 --- a/nearcore/src/metrics.rs +++ b/nearcore/src/metrics.rs @@ -70,7 +70,7 @@ pub(crate) static STATE_SYNC_DUMP_SIZE_TOTAL: Lazy = Lazy::new(|| try_create_int_counter_vec( "near_state_sync_dump_size_total", "Total size of parts written to S3", - &["shard_id"], + &["epoch_height", "shard_id"], ) .unwrap() }); diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 0df51e42802..da26df70963 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -3,8 +3,7 @@ use borsh::BorshSerialize; use near_chain::types::RuntimeAdapter; use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error}; use near_chain_configs::ClientConfig; -use near_client::sync::state::StateSync; -use near_crypto::PublicKey; +use near_client::sync::state::{s3_location, StateSync}; use near_epoch_manager::EpochManagerAdapter; use near_primitives::hash::CryptoHash; use near_primitives::state_part::PartId; @@ -19,7 +18,6 @@ pub fn spawn_state_sync_dump( config: &NearConfig, chain_genesis: ChainGenesis, runtime: Arc, - node_key: &PublicKey, ) -> anyhow::Result> { if !config.client_config.state_sync_dump_enabled { return Ok(None); @@ -81,7 +79,6 @@ pub fn spawn_state_sync_dump( runtime, client_config, bucket.clone(), - node_key.clone(), ))); arbiter_handle }) @@ -116,10 +113,8 @@ async fn state_sync_dump( runtime: Arc, config: ClientConfig, bucket: s3::Bucket, - _node_key: PublicKey, ) { tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop"); - let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(10)); if config.state_sync_restart_dump_for_shards.contains(&shard_id) { tracing::debug!(target: "state_sync_dump", shard_id, "Dropped existing progress"); @@ -128,7 +123,7 @@ async fn state_sync_dump( loop { // Avoid a busy-loop when there is nothing to do. - interval.tick().await; + std::thread::sleep(std::time::Duration::from_secs(10)); let progress = chain.store().get_state_sync_dump_progress(shard_id); tracing::debug!(target: "state_sync_dump", shard_id, ?progress, "Running StateSyncDump loop iteration"); @@ -287,7 +282,7 @@ fn update_progress( ) { // Record that a part was obtained and dumped. metrics::STATE_SYNC_DUMP_SIZE_TOTAL - .with_label_values(&[&shard_id.to_string()]) + .with_label_values(&[&epoch_height.to_string(), &shard_id.to_string()]) .inc_by(part_len as u64); let next_progress = StateSyncDumpProgress::InProgress { epoch_id: epoch_id.clone(), @@ -423,16 +418,3 @@ fn check_new_epoch( } } } - -fn s3_location( - chain_id: &str, - epoch_height: u64, - shard_id: u64, - part_id: u64, - num_parts: u64, -) -> String { - format!( - "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}", - chain_id, epoch_height, shard_id, part_id, num_parts - ) -}