diff --git a/Cargo.lock b/Cargo.lock index d63ec83443f..7c993d27381 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3256,7 +3256,6 @@ dependencies = [ "once_cell", "rand 0.8.5", "reed-solomon-erasure", - "regex", "rust-s3", "serde_json", "strum", diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index 7f57ff57b52..ac9a77d2c74 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -63,8 +63,7 @@ pub struct DownloadStatus { pub state_requests_count: u64, pub last_target: Option, #[serde(skip_serializing, skip_deserializing)] - // Use type `String` as an error to avoid a dependency on the `rust-s3` crate. - pub response: Arc, String>>>>, + pub response: Arc), String>>>>, } impl DownloadStatus { diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index 684ca3d6e6c..7add2bb5916 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -20,7 +20,6 @@ num-rational.workspace = true once_cell.workspace = true rand.workspace = true reed-solomon-erasure.workspace = true -regex.workspace = true rust-s3.workspace = true serde_json.workspace = true strum.workspace = true @@ -29,24 +28,24 @@ thiserror.workspace = true tokio.workspace = true tracing.workspace = true -delay-detector.workspace = true near-async.workspace = true -near-chain-configs.workspace = true near-chain-primitives.workspace = true +near-crypto.workspace = true +near-primitives.workspace = true +near-store.workspace = true +near-chain-configs.workspace = true near-chain.workspace = true -near-chunks.workspace = true near-client-primitives.workspace = true -near-crypto.workspace = true near-dyn-configs.workspace = true -near-epoch-manager.workspace = true near-network.workspace = true -near-o11y.workspace = true -near-performance-metrics-macros.workspace = true -near-performance-metrics.workspace = true near-pool.workspace = true -near-primitives.workspace = true -near-store.workspace = true +near-chunks.workspace = true near-telemetry.workspace = true +near-o11y.workspace = true +near-performance-metrics.workspace = true +near-performance-metrics-macros.workspace = true +near-epoch-manager.workspace = true +delay-detector.workspace = true [dev-dependencies] assert_matches.workspace = true diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 29f622a432e..e8885cac80c 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -252,7 +252,10 @@ impl Client { network_adapter.clone(), config.state_sync_timeout, &config.chain_id, - config.state_sync_config_sync.clone(), + config.state_sync_from_s3_enabled, + &config.state_sync_s3_bucket, + &config.state_sync_s3_region, + config.state_sync_num_concurrent_s3_requests, ); let num_block_producer_seats = config.num_block_producer_seats as usize; let data_parts = runtime_adapter.num_data_parts(); @@ -2130,7 +2133,10 @@ impl Client { network_adapter1, state_sync_timeout, &self.config.chain_id, - self.config.state_sync_config_sync.clone(), + self.config.state_sync_from_s3_enabled, + &self.config.state_sync_s3_bucket, + &self.config.state_sync_s3_region, + self.config.state_sync_num_concurrent_s3_requests, ), new_shard_sync, BlocksCatchUpState::new(sync_hash, epoch_id), diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index 207eded91f3..8de327457a1 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -305,7 +305,7 @@ pub(crate) static NODE_PROTOCOL_UPGRADE_VOTING_START: Lazy = Lazy::new .unwrap() }); -pub(crate) static PRODUCE_CHUNK_TIME: Lazy = Lazy::new(|| { +pub static PRODUCE_CHUNK_TIME: Lazy = Lazy::new(|| { try_create_histogram_vec( "near_produce_chunk_time", "Time taken to produce a chunk", @@ -315,18 +315,17 @@ pub(crate) static PRODUCE_CHUNK_TIME: Lazy = L .unwrap() }); -pub(crate) static VIEW_CLIENT_MESSAGE_TIME: Lazy = - Lazy::new(|| { - try_create_histogram_vec( - "near_view_client_messages_processing_time", - "Time that view client takes to handle different messages", - &["message"], - Some(exponential_buckets(0.001, 2.0, 16).unwrap()), - ) - .unwrap() - }); +pub static VIEW_CLIENT_MESSAGE_TIME: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_view_client_messages_processing_time", + "Time that view client takes to handle different messages", + &["message"], + Some(exponential_buckets(0.001, 2.0, 16).unwrap()), + ) + .unwrap() +}); -pub(crate) static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy = +pub static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy = Lazy::new(|| { try_create_histogram_vec( "near_produce_and_distribute_chunk_time", @@ -356,7 +355,7 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version) .inc(); } -pub(crate) static STATE_SYNC_STAGE: Lazy = Lazy::new(|| { +pub static STATE_SYNC_STAGE: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_stage", "Stage of state sync per shard", @@ -365,17 +364,16 @@ pub(crate) static STATE_SYNC_STAGE: Lazy = Lazy .unwrap() }); -pub(crate) static STATE_SYNC_RETRY_PART: Lazy = - Lazy::new(|| { - try_create_int_counter_vec( - "near_state_sync_retry_part_total", - "Number of part requests retried", - &["shard_id"], - ) - .unwrap() - }); +pub static STATE_SYNC_RETRY_PART: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_retry_part_total", + "Number of part requests retried", + &["shard_id"], + ) + .unwrap() +}); -pub(crate) static STATE_SYNC_PARTS_DONE: Lazy = Lazy::new(|| { +pub static STATE_SYNC_PARTS_DONE: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_parts_done", "Number of parts downloaded", @@ -384,7 +382,7 @@ pub(crate) static STATE_SYNC_PARTS_DONE: Lazy = .unwrap() }); -pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy = Lazy::new(|| { +pub static STATE_SYNC_PARTS_TOTAL: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_parts_per_shard", "Number of parts that need to be downloaded for the shard", @@ -393,37 +391,47 @@ pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy .unwrap() }); -pub(crate) static STATE_SYNC_DISCARD_PARTS: Lazy = +pub static STATE_SYNC_DISCARD_PARTS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_discard_parts_total", + "Number of times all downloaded parts were discarded to try again", + &["shard_id"], + ) + .unwrap() +}); + +pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy = Lazy::new(|| { try_create_int_counter_vec( - "near_state_sync_discard_parts_total", - "Number of times all downloaded parts were discarded to try again", + "near_state_sync_external_parts_done_total", + "Number of parts successfully retrieved from an external storage", &["shard_id"], ) .unwrap() }); -pub(crate) static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy = +pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy = Lazy::new(|| { try_create_int_counter_vec( - "near_state_sync_external_parts_done_total", - "Number of parts successfully retrieved from an external storage", + "near_state_sync_external_parts_failed_total", + "Number of parts failed attempts to retrieve parts from an external storage", &["shard_id"], ) .unwrap() }); -pub(crate) static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy = +pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy = Lazy::new(|| { - try_create_int_counter_vec( - "near_state_sync_external_parts_failed_total", - "Number of parts failed attempts to retrieve parts from an external storage", + try_create_histogram_vec( + "near_state_sync_external_parts_scheduling_delay_sec", + "Delay for a request for parts from an external storage", &["shard_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), ) .unwrap() }); -pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy = +pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy = Lazy::new(|| { try_create_histogram_vec( "near_state_sync_external_parts_request_delay_sec", @@ -434,23 +442,12 @@ pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy = Lazy::new(|| { - try_create_int_counter_vec( +pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy = + Lazy::new(|| { + try_create_int_counter_vec( "near_state_sync_external_parts_size_downloaded_bytes_total", "Amount of bytes downloaded from an external storage when requesting state parts for a shard", &["shard_id"], ) .unwrap() -}); - -pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_dump_put_object_elapsed_sec", - "Time needed to write a part", - &["shard_id"], - Some(exponential_buckets(0.001, 1.6, 25).unwrap()), - ) - .unwrap() -}); + }); diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 880585a8736..5db95e58aa7 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -29,7 +29,6 @@ use near_async::messaging::CanSendAsync; use near_chain::chain::{ApplyStatePartsRequest, StateSplitRequest}; use near_chain::near_chain_primitives; use near_chain::{Chain, RuntimeWithEpochManagerAdapter}; -use near_chain_configs::{ExternalStorageConfig, ExternalStorageLocation, SyncConfig}; use near_client_primitives::types::{ DownloadStatus, ShardSyncDownload, ShardSyncStatus, StateSplitApplyingStatus, }; @@ -47,9 +46,7 @@ use near_primitives::types::{AccountId, EpochHeight, ShardId, StateRoot}; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; use std::collections::HashMap; -use std::io::Write; use std::ops::Add; -use std::path::PathBuf; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use std::time::Duration as TimeDuration; @@ -102,7 +99,7 @@ fn make_account_or_peer_id_or_hash( enum StateSyncInner { /// Request both the state header and state parts from the peers. Peers { - /// Information about which parts were requested from which peer and when. + /// Which parts were requested from which peer and when. last_part_id_requested: HashMap<(AccountOrPeerIdOrHash, ShardId), PendingRequestStatus>, /// Map from which part we requested to whom. requested_target: lru::LruCache<(u64, CryptoHash), AccountOrPeerIdOrHash>, @@ -112,114 +109,14 @@ enum StateSyncInner { PartsFromExternal { /// Chain ID. chain_id: String, + /// Connection to the external storage. + bucket: Arc, /// The number of requests for state parts from external storage that are /// allowed to be started for this shard. requests_remaining: Arc, - /// Connection to the external storage. - external: ExternalConnection, }, } -/// Errors encountered while accessing state parts from external storage. -#[derive(thiserror::Error, Debug)] -pub enum ExternalStorageError { - #[error("S3 request failed: {0}")] - S3Error(#[from] s3::error::S3Error), - #[error("Wrong response code: {0}")] - S3ResponseCode(u16), - #[error("Filesystem error: {0}")] - FilesystemError(#[from] std::io::Error), -} - -/// Connection to the external storage. -#[derive(Clone)] -pub enum ExternalConnection { - S3 { bucket: Arc }, - Filesystem { root_dir: PathBuf }, -} - -impl ExternalConnection { - async fn get_part( - self, - shard_id: ShardId, - location: &str, - ) -> Result, ExternalStorageError> { - let started = StaticClock::utc(); - let result = match self { - ExternalConnection::S3 { bucket } => { - tracing::info!(target: "sync", %shard_id, location, "Getting an object from the external storage"); - let result = bucket.get_object(location.clone()).await; - match result { - Ok(response) => { - tracing::info!(target: "sync", %shard_id, location, response_code = response.status_code(), num_bytes = response.bytes().len(), "S3 request finished"); - if response.status_code() == 200 { - Ok(response.bytes().to_vec()) - } else { - Err(ExternalStorageError::S3ResponseCode(response.status_code())) - } - } - Err(err) => Err(ExternalStorageError::S3Error(err)), - } - } - ExternalConnection::Filesystem { root_dir } => { - let path = root_dir.join(location.clone()); - tracing::info!(target: "sync", %shard_id, ?path, "Reading a file"); - let result = std::fs::read(&path); - result.map_err(ExternalStorageError::FilesystemError) - } - }; - let completed = StaticClock::utc(); - metrics::STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY - .with_label_values(&[&shard_id.to_string()]) - .observe( - completed.signed_duration_since(started).num_nanoseconds().unwrap_or(0) as f64 - / 1e9, - ); - result - } - - pub async fn put_state_part( - &self, - state_part: &[u8], - shard_id: ShardId, - location: &str, - ) -> Result<(), ExternalStorageError> { - let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - match self { - ExternalConnection::S3 { bucket } => { - let put = bucket.put_object(&location, &state_part).await; - match put { - Ok(_) => { - tracing::warn!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3"); - Ok(()) - } - Err(err) => { - tracing::warn!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, ?err, "Failed to write a state part to S3"); - Err(ExternalStorageError::S3Error(err)) - } - } - } - ExternalConnection::Filesystem { root_dir } => { - let path = root_dir.join(location); - if let Some(parent_dir) = path.parent() { - std::fs::create_dir_all(parent_dir) - .map_err(ExternalStorageError::FilesystemError)?; - } - let mut file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .open(&path) - .map_err(ExternalStorageError::FilesystemError)?; - file.write_all(state_part).map_err(ExternalStorageError::FilesystemError)?; - tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to a file"); - Ok(()) - } - } - } -} - /// Helper to track state sync. pub struct StateSync { /// How to retrieve the state data. @@ -248,34 +145,25 @@ impl StateSync { network_adapter: PeerManagerAdapter, timeout: TimeDuration, chain_id: &str, - sync_config: SyncConfig, + state_sync_from_s3_enabled: bool, + s3_bucket: &str, + s3_region: &str, + num_s3_requests_per_shard: u64, ) -> Self { - let inner = match sync_config { - SyncConfig::Peers => StateSyncInner::Peers { + let inner = if state_sync_from_s3_enabled { + let bucket = create_bucket(s3_bucket, s3_region, timeout); + if let Err(err) = bucket { + panic!("Failed to create an S3 bucket: {}", err); + } + StateSyncInner::PartsFromExternal { + chain_id: chain_id.to_string(), + bucket: Arc::new(bucket.unwrap()), + requests_remaining: Arc::new(AtomicI64::new(num_s3_requests_per_shard as i64)), + } + } else { + StateSyncInner::Peers { last_part_id_requested: Default::default(), requested_target: lru::LruCache::new(MAX_PENDING_PART as usize), - }, - SyncConfig::ExternalStorage(ExternalStorageConfig { - location, - num_concurrent_requests, - }) => { - let external = match location { - ExternalStorageLocation::S3 { bucket, region } => { - let bucket = create_bucket(&bucket, ®ion, timeout); - if let Err(err) = bucket { - panic!("Failed to create an S3 bucket: {}", err); - } - ExternalConnection::S3 { bucket: Arc::new(bucket.unwrap()) } - } - ExternalStorageLocation::Filesystem { root_dir } => { - ExternalConnection::Filesystem { root_dir } - } - }; - StateSyncInner::PartsFromExternal { - chain_id: chain_id.to_string(), - requests_remaining: Arc::new(AtomicI64::new(num_concurrent_requests as i64)), - external, - } } }; let timeout = Duration::from_std(timeout).unwrap(); @@ -752,7 +640,7 @@ impl StateSync { ); } } - StateSyncInner::PartsFromExternal { chain_id, requests_remaining, external } => { + StateSyncInner::PartsFromExternal { chain_id, bucket, requests_remaining } => { let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); let epoch_id = sync_block_header.epoch_id(); let epoch_info = chain.runtime_adapter.get_epoch_info(epoch_id).unwrap(); @@ -770,8 +658,8 @@ impl StateSync { epoch_height, state_num_parts, &chain_id.clone(), + bucket.clone(), requests_remaining.clone(), - external.clone(), ); } } @@ -990,7 +878,8 @@ impl StateSync { let part_timeout = now - prev > self.timeout; // Retry parts that failed. if part_timeout || part_download.error { download_timeout |= part_timeout; - if part_timeout || part_download.last_target.is_some() { + if part_timeout || + part_download.last_target != Some(near_client_primitives::types::AccountOrPeerIdOrHash::ExternalStorage) { // Don't immediately retry failed requests from external // storage. Most often error is a state part not // available. That error doesn't get fixed by retrying, @@ -1215,8 +1104,8 @@ fn request_part_from_external_storage( epoch_height: EpochHeight, num_parts: u64, chain_id: &str, + bucket: Arc, requests_remaining: Arc, - external: ExternalConnection, ) { if !allow_request(&requests_remaining) { return; @@ -1231,12 +1120,38 @@ fn request_part_from_external_storage( let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts); let download_response = download.response.clone(); + let scheduled = StaticClock::utc(); near_performance_metrics::actix::spawn("StateSync", { async move { - let result = external.get_part(shard_id, &location).await; + tracing::info!(target: "sync", %shard_id, part_id, location, "Getting an object from the external storage"); + let started = StaticClock::utc(); + metrics::STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY + .with_label_values(&[&shard_id.to_string()]) + .observe( + started.signed_duration_since(scheduled).num_nanoseconds().unwrap_or(0) as f64 + / 1e9, + ); + let result = bucket.get_object(location.clone()).await; + let completed = StaticClock::utc(); finished_request(&requests_remaining); - let mut lock = download_response.lock().unwrap(); - *lock = Some(result.map_err(|err| err.to_string())); + metrics::STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY + .with_label_values(&[&shard_id.to_string()]) + .observe( + completed.signed_duration_since(started).num_nanoseconds().unwrap_or(0) as f64 + / 1e9, + ); + match result { + Ok(response) => { + tracing::info!(target: "sync", %shard_id, part_id, location, response_code = response.status_code(), num_bytes = response.bytes().len(), "S3 request finished"); + let mut lock = download_response.lock().unwrap(); + *lock = Some(Ok((response.status_code(), response.bytes().to_vec()))); + } + Err(err) => { + tracing::info!(target: "sync", %shard_id, part_id, location, ?err, "S3 request failed"); + let mut lock = download_response.lock().unwrap(); + *lock = Some(Err(err.to_string())); + } + } } }); } @@ -1340,7 +1255,8 @@ fn check_external_storage_part_response( let mut err_to_retry = None; match external_storage_response { // HTTP status code 200 means success. - Ok(data) => { + Ok((200, data)) => { + tracing::debug!(target: "sync", %shard_id, part_id, "Got 200 response from external storage"); match chain.set_state_part( shard_id, sync_hash, @@ -1367,6 +1283,11 @@ fn check_external_storage_part_response( } } } + // Other HTTP status codes are considered errors. + Ok((status_code, _)) => { + tracing::debug!(target: "sync", %shard_id, %sync_hash, part_id, status_code, "Wrong response code, expected 200"); + err_to_retry = Some(near_chain::Error::Other(format!("status_code: {}", status_code))); + } // The request failed without reaching the external storage. Err(err) => { err_to_retry = Some(near_chain::Error::Other(err)); @@ -1505,35 +1426,6 @@ impl Iterator for SamplerLimited { } } -// Needs to be in sync with `fn s3_location()`. -pub fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String { - format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id) -} - -pub fn match_filename(s: &str) -> Option { - let re = regex::Regex::new(r"^state_part_(\d{6})_of_(\d{6})$").unwrap(); - re.captures(s) -} - -pub fn is_part_filename(s: &str) -> bool { - match_filename(s).is_some() -} - -pub fn get_num_parts_from_filename(s: &str) -> Option { - if let Some(captures) = match_filename(s) { - if let Some(num_parts) = captures.get(2) { - if let Ok(num_parts) = num_parts.as_str().parse::() { - return Some(num_parts); - } - } - } - None -} - -pub fn part_filename(part_id: u64, num_parts: u64) -> String { - format!("state_part_{:06}_of_{:06}", part_id, num_parts) -} - #[cfg(test)] mod test { diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index b48d882671b..797bb43644b 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -5,7 +5,6 @@ use near_primitives::types::{ }; use near_primitives::version::Version; use std::cmp::{max, min}; -use std::path::PathBuf; use std::time::Duration; pub const TEST_STATE_SYNC_TIMEOUT: u64 = 5; @@ -24,9 +23,6 @@ pub const MIN_GC_NUM_EPOCHS_TO_KEEP: u64 = 3; /// Default number of epochs for which we keep store data pub const DEFAULT_GC_NUM_EPOCHS_TO_KEEP: u64 = 5; -/// Default number of concurrent requests to external storage to fetch state parts. -pub const DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL: u64 = 25; - /// Configuration for garbage collection. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] pub struct GCConfig { @@ -73,80 +69,6 @@ impl GCConfig { } } -fn default_num_concurrent_requests() -> u64 { - DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL -} - -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -pub struct ExternalStorageConfig { - /// Location of state parts. - pub location: ExternalStorageLocation, - /// When fetching state parts from external storage, throttle fetch requests - /// to this many concurrent requests per shard. - #[serde(default = "default_num_concurrent_requests")] - pub num_concurrent_requests: u64, -} - -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -pub enum ExternalStorageLocation { - S3 { - /// Location of state dumps on S3. - bucket: String, - /// Data may only be available in certain locations. - region: String, - }, - Filesystem { - root_dir: PathBuf, - }, -} - -/// Configures how to dump state to external storage. -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -pub struct DumpConfig { - /// Specifies where to write the obtained state parts. - pub location: ExternalStorageLocation, - /// Use in case a node that dumps state to the external storage - /// gets in trouble. - #[serde(skip_serializing_if = "Option::is_none")] - pub restart_dump_for_shards: Option>, - /// How often to check if a new epoch has started. - /// Feel free to set to `None`, defaults are sensible. - #[serde(skip_serializing_if = "Option::is_none")] - pub iteration_delay: Option, -} - -/// Configures how to fetch state parts during state sync. -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -pub enum SyncConfig { - /// Syncs state from the peers without reading anything from external storage. - Peers, - /// Expects parts to be available in external storage. - ExternalStorage(ExternalStorageConfig), -} - -impl Default for SyncConfig { - fn default() -> Self { - Self::Peers - } -} - -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -/// Options for dumping state to S3. -pub struct StateSyncConfig { - #[serde(skip_serializing_if = "Option::is_none")] - /// `none` value disables state dump to external storage. - pub dump: Option, - #[serde(skip_serializing_if = "SyncConfig::is_default")] - pub sync: SyncConfig, -} - -impl SyncConfig { - /// Checks whether the object equals its default value. - fn is_default(&self) -> bool { - matches!(self, Self::Peers) - } -} - /// ClientConfig where some fields can be updated at runtime. #[derive(Clone, serde::Serialize)] pub struct ClientConfig { @@ -244,13 +166,23 @@ pub struct ClientConfig { pub client_background_migration_threads: usize, /// Duration to perform background flat storage creation step. pub flat_storage_creation_period: Duration, + /// If enabled, will dump state of every epoch to external storage. + pub state_sync_dump_enabled: bool, + /// S3 bucket for storing state dumps. + pub state_sync_s3_bucket: String, + /// S3 region for storing state dumps. + pub state_sync_s3_region: String, + /// Restart dumping state of selected shards. + /// Use for troubleshooting of the state dumping process. + pub state_sync_restart_dump_for_shards: Vec, + /// Whether to enable state sync from S3. + /// If disabled will perform state sync from the peers. + pub state_sync_from_s3_enabled: bool, + /// Number of parallel in-flight requests allowed per shard. + pub state_sync_num_concurrent_s3_requests: u64, /// Whether to use the State Sync mechanism. /// If disabled, the node will do Block Sync instead of State Sync. pub state_sync_enabled: bool, - /// Options for dumping state to S3. - pub state_sync_config_dump: Option, - /// Configures how to fetch state parts during state sync. - pub state_sync_config_sync: SyncConfig, } impl ClientConfig { @@ -320,9 +252,13 @@ impl ClientConfig { enable_statistics_export: true, client_background_migration_threads: 1, flat_storage_creation_period: Duration::from_secs(1), + state_sync_dump_enabled: false, + state_sync_s3_bucket: String::new(), + state_sync_s3_region: String::new(), + state_sync_restart_dump_for_shards: vec![], + state_sync_from_s3_enabled: false, + state_sync_num_concurrent_s3_requests: 10, state_sync_enabled: false, - state_sync_config_dump: None, - state_sync_config_sync: SyncConfig::default(), } } } diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index 21042e67d7c..6c8db4cadde 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -2,7 +2,7 @@ use crate::download_file::{run_download_file, FileDownloadError}; use anyhow::{anyhow, bail, Context}; use near_chain_configs::{ get_initial_supply, ClientConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode, - LogSummaryStyle, MutableConfigValue, StateSyncConfig, + LogSummaryStyle, MutableConfigValue, }; use near_config_utils::{ValidationError, ValidationErrors}; use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey, Signer}; @@ -328,7 +328,7 @@ pub struct Config { /// The node usually stops within several seconds after reaching the target height. #[serde(default, skip_serializing_if = "Option::is_none")] pub expected_shutdown: Option, - /// Options for syncing state and dumping state. + /// Options for dumping state of every epoch to S3. #[serde(skip_serializing_if = "Option::is_none")] pub state_sync: Option, /// Whether to use state sync (unreliable and corrupts the DB if fails) or do a block sync instead. @@ -677,17 +677,41 @@ impl NearConfig { enable_statistics_export: config.store.enable_statistics_export, client_background_migration_threads: config.store.background_migration_threads, flat_storage_creation_period: config.store.flat_storage_creation_period, - state_sync_enabled: config.state_sync_enabled.unwrap_or(false), - state_sync_config_dump: config + state_sync_dump_enabled: config + .state_sync + .as_ref() + .map(|x| x.dump_enabled) + .flatten() + .unwrap_or(false), + state_sync_s3_bucket: config + .state_sync + .as_ref() + .map(|x| x.s3_bucket.clone()) + .unwrap_or(String::new()), + state_sync_s3_region: config + .state_sync + .as_ref() + .map(|x| x.s3_region.clone()) + .unwrap_or(String::new()), + state_sync_restart_dump_for_shards: config + .state_sync + .as_ref() + .map(|x| x.restart_dump_for_shards.clone()) + .flatten() + .unwrap_or(vec![]), + state_sync_from_s3_enabled: config .state_sync .as_ref() - .map(|x| x.dump.clone()) - .flatten(), - state_sync_config_sync: config + .map(|x| x.sync_from_s3_enabled) + .flatten() + .unwrap_or(false), + state_sync_num_concurrent_s3_requests: config .state_sync .as_ref() - .map(|x| x.sync.clone()) - .unwrap_or_default(), + .map(|x| x.num_concurrent_s3_requests) + .flatten() + .unwrap_or(100), + state_sync_enabled: config.state_sync_enabled.unwrap_or(false), }, network_config: NetworkConfig::new( config.network, @@ -1510,6 +1534,30 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) - NearConfig::new(config, genesis, signer.into(), validator_signer).unwrap() } +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] +/// Options for dumping state to S3. +pub struct StateSyncConfig { + /// Location of state dumps on S3. + pub s3_bucket: String, + /// Region is very important on S3. + pub s3_region: String, + /// Whether a node should dump state of each epoch to the external storage. + #[serde(skip_serializing_if = "Option::is_none")] + pub dump_enabled: Option, + /// Use carefully in case a node that dumps state to the external storage + /// gets in trouble. + #[serde(skip_serializing_if = "Option::is_none")] + pub restart_dump_for_shards: Option>, + /// If enabled, will download state parts from external storage and not from + /// the peers. + #[serde(skip_serializing_if = "Option::is_none")] + pub sync_from_s3_enabled: Option, + /// When syncing state from S3, throttle requests to this many concurrent + /// requests per shard. + #[serde(skip_serializing_if = "Option::is_none")] + pub num_concurrent_s3_requests: Option, +} + #[test] fn test_init_config_localnet() { // Check that we can initialize the config with multiple shards. diff --git a/nearcore/src/config_validate.rs b/nearcore/src/config_validate.rs index 381daae1e68..13164ea1c8b 100644 --- a/nearcore/src/config_validate.rs +++ b/nearcore/src/config_validate.rs @@ -1,7 +1,4 @@ -use near_chain_configs::{ExternalStorageLocation, SyncConfig}; use near_config_utils::{ValidationError, ValidationErrors}; -use std::collections::HashSet; -use std::path::PathBuf; use crate::config::Config; @@ -80,51 +77,16 @@ impl<'a> ConfigValidator<'a> { } if let Some(state_sync) = &self.config.state_sync { - if let Some(dump_config) = &state_sync.dump { - if let Some(restart_dump_for_shards) = &dump_config.restart_dump_for_shards { - let unique_values: HashSet<_> = restart_dump_for_shards.iter().collect(); - if unique_values.len() != restart_dump_for_shards.len() { - let error_message = format!("'config.state_sync.dump.restart_dump_for_shards' contains duplicate values."); - self.validation_errors.push_config_semantics_error(error_message); - } - } - - match &dump_config.location { - ExternalStorageLocation::S3 { bucket, region } => { - if bucket.is_empty() || region.is_empty() { - let error_message = format!("'config.state_sync.dump.location.s3.bucket' and 'config.state_sync.dump.location.s3.region' need to be specified when 'config.state_sync.dump.location.s3' is present."); - self.validation_errors.push_config_semantics_error(error_message); - } - } - ExternalStorageLocation::Filesystem { root_dir } => { - if root_dir == &PathBuf::new() { - let error_message = format!("'config.state_sync.dump.location.filesystem.root_dir' needs to be specified when 'config.state_sync.dump.location.filesystem' is present."); - self.validation_errors.push_config_semantics_error(error_message); - } - } + if state_sync.dump_enabled.unwrap_or(false) { + if state_sync.s3_bucket.is_empty() || state_sync.s3_region.is_empty() { + let error_message = format!("'config.state_sync.s3_bucket' and 'config.state_sync.s3_region' need to be specified when 'config.state_sync.dump_enabled' is enabled."); + self.validation_errors.push_config_semantics_error(error_message); } } - match &state_sync.sync { - SyncConfig::Peers => {} - SyncConfig::ExternalStorage(config) => { - match &config.location { - ExternalStorageLocation::S3 { bucket, region } => { - if bucket.is_empty() || region.is_empty() { - let error_message = format!("'config.state_sync.sync.external_storage.location.s3.bucket' and 'config.state_sync.sync.external_storage.location.s3.region' need to be specified when 'config.state_sync.sync.external_storage.location.s3' is present."); - self.validation_errors.push_config_semantics_error(error_message); - } - } - ExternalStorageLocation::Filesystem { root_dir } => { - if root_dir == &PathBuf::new() { - let error_message = format!("'config.state_sync.sync.external_storage.location.filesystem.root_dir' needs to be specified when 'config.state_sync.sync.external_storage.location.filesystem' is present."); - self.validation_errors.push_config_semantics_error(error_message); - } - } - } - if config.num_concurrent_requests == 0 { - let error_message = format!("'config.state_sync.sync.external_storage.num_concurrent_requests' needs to be greater than 0"); - self.validation_errors.push_config_semantics_error(error_message); - } + if state_sync.sync_from_s3_enabled.unwrap_or(false) { + if state_sync.s3_bucket.is_empty() || state_sync.s3_region.is_empty() { + let error_message = format!("'config.state_sync.s3_bucket' and 'config.state_sync.s3_region' need to be specified when 'config.state_sync.sync_from_s3_enabled' is enabled."); + self.validation_errors.push_config_semantics_error(error_message); } } } diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 9ca43a659d7..b300aa0f883 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -34,7 +34,7 @@ pub mod dyn_config; mod metrics; pub mod migrations; mod runtime; -pub mod state_sync; +mod state_sync; pub fn get_default_home() -> PathBuf { if let Ok(near_home) = std::env::var("NEAR_HOME") { @@ -274,12 +274,7 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); - let state_sync_dump_handle = spawn_state_sync_dump( - &config.client_config, - chain_genesis, - runtime, - config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()), - )?; + let state_sync_dump_handle = spawn_state_sync_dump(&config, chain_genesis, runtime)?; #[allow(unused_mut)] let mut rpc_servers = Vec::new(); diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 568f66c5358..da26df70963 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -1,57 +1,49 @@ -use crate::metrics; +use crate::{metrics, NearConfig, NightshadeRuntime}; use borsh::BorshSerialize; -use near_chain::{ - Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error, - RuntimeWithEpochManagerAdapter, -}; -use near_chain_configs::{ClientConfig, ExternalStorageLocation}; -use near_client::sync::state::{s3_location, ExternalConnection, StateSync}; +use near_chain::types::RuntimeAdapter; +use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error}; +use near_chain_configs::ClientConfig; +use near_client::sync::state::{s3_location, StateSync}; +use near_epoch_manager::EpochManagerAdapter; use near_primitives::hash::CryptoHash; use near_primitives::state_part::PartId; use near_primitives::syncing::{get_num_state_parts, StatePartKey, StateSyncDumpProgress}; -use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; +use near_primitives::types::{EpochHeight, EpochId, ShardId, StateRoot}; use near_store::DBCol; -use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::time::Duration; /// Starts one a thread per tracked shard. /// Each started thread will be dumping state parts of a single epoch to external storage. pub fn spawn_state_sync_dump( - client_config: &ClientConfig, + config: &NearConfig, chain_genesis: ChainGenesis, - runtime: Arc, - account_id: Option, + runtime: Arc, ) -> anyhow::Result> { - let dump_config = if let Some(dump_config) = client_config.state_sync_config_dump.clone() { - dump_config - } else { - // Dump is not configured, and therefore not enabled. + if !config.client_config.state_sync_dump_enabled { return Ok(None); - }; + } + if config.client_config.state_sync_s3_bucket.is_empty() + || config.client_config.state_sync_s3_region.is_empty() + { + panic!("Enabled dumps of state to external storage. Please specify state_sync.s3_bucket and state_sync.s3_region"); + } tracing::info!(target: "state_sync_dump", "Spawning the state sync dump loop"); - let external = match dump_config.location { - ExternalStorageLocation::S3 { bucket, region } => { - // Credentials to establish a connection are taken from environment variables: - // * `AWS_ACCESS_KEY_ID` - // * `AWS_SECRET_ACCESS_KEY` - let bucket = s3::Bucket::new( - &bucket, - region - .parse::() - .map_err(|err| >::into(err))?, - s3::creds::Credentials::default().map_err(|err| { - tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?"); - >::into(err) - })?, - ).map_err(|err| >::into(err))?; - ExternalConnection::S3 { bucket: Arc::new(bucket) } - } - ExternalStorageLocation::Filesystem { root_dir } => { - ExternalConnection::Filesystem { root_dir } - } - }; + // Create a connection to S3. + let s3_bucket = config.client_config.state_sync_s3_bucket.clone(); + let s3_region = config.client_config.state_sync_s3_region.clone(); + + // Credentials to establish a connection are taken from environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. + let bucket = s3::Bucket::new( + &s3_bucket, + s3_region + .parse::() + .map_err(|err| >::into(err))?, + s3::creds::Credentials::default().map_err(|err| { + tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?"); + >::into(err) + })?, + ).map_err(|err| >::into(err))?; // Determine how many threads to start. // TODO: Handle the case of changing the shard layout. @@ -67,11 +59,10 @@ pub fn spawn_state_sync_dump( runtime.num_shards(&epoch_id) }?; - let chain_id = client_config.chain_id.clone(); - let keep_running = Arc::new(AtomicBool::new(true)); // Start a thread for each shard. let handles = (0..num_shards as usize) .map(|shard_id| { + let client_config = config.client_config.clone(); let runtime = runtime.clone(); let chain_genesis = chain_genesis.clone(); let chain = Chain::new_for_view_client( @@ -86,38 +77,30 @@ pub fn spawn_state_sync_dump( shard_id as ShardId, chain, runtime, - chain_id.clone(), - dump_config.restart_dump_for_shards.clone().unwrap_or_default(), - external.clone(), - dump_config.iteration_delay.unwrap_or(Duration::from_secs(10)).clone(), - account_id.clone(), - keep_running.clone(), + client_config, + bucket.clone(), ))); arbiter_handle }) .collect(); - Ok(Some(StateSyncDumpHandle { handles, keep_running })) + Ok(Some(StateSyncDumpHandle { handles })) } /// Holds arbiter handles controlling the lifetime of the spawned threads. pub struct StateSyncDumpHandle { pub handles: Vec, - keep_running: Arc, } impl Drop for StateSyncDumpHandle { fn drop(&mut self) { - self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed); self.stop() } } impl StateSyncDumpHandle { pub fn stop(&self) { - self.handles.iter().for_each(|handle| { - handle.stop(); - }); + let _: Vec = self.handles.iter().map(|handle| handle.stop()).collect(); } } @@ -127,22 +110,21 @@ impl StateSyncDumpHandle { async fn state_sync_dump( shard_id: ShardId, chain: Chain, - runtime: Arc, - chain_id: String, - restart_dump_for_shards: Vec, - external: ExternalConnection, - iteration_delay: Duration, - account_id: Option, - keep_running: Arc, + runtime: Arc, + config: ClientConfig, + bucket: s3::Bucket, ) { tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop"); - if restart_dump_for_shards.contains(&shard_id) { + if config.state_sync_restart_dump_for_shards.contains(&shard_id) { tracing::debug!(target: "state_sync_dump", shard_id, "Dropped existing progress"); chain.store().set_state_sync_dump_progress(shard_id, None).unwrap(); } - while keep_running.load(std::sync::atomic::Ordering::Relaxed) { + loop { + // Avoid a busy-loop when there is nothing to do. + std::thread::sleep(std::time::Duration::from_secs(10)); + let progress = chain.store().get_state_sync_dump_progress(shard_id); tracing::debug!(target: "state_sync_dump", shard_id, ?progress, "Running StateSyncDump loop iteration"); // The `match` returns the next state of the state machine. @@ -156,12 +138,11 @@ async fn state_sync_dump( shard_id, &chain, &runtime, - &account_id, ) } Err(Error::DBNotFoundErr(_)) | Ok(None) => { // First invocation of this state-machine. See if at least one epoch is available for dumping. - check_new_epoch(None, None, None, shard_id, &chain, &runtime, &account_id) + check_new_epoch(None, None, None, shard_id, &chain, &runtime) } Err(err) => { // Something went wrong, let's retry. @@ -186,7 +167,7 @@ async fn state_sync_dump( let mut res = None; // The actual dumping of state to S3. - tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, ?state_root, parts_dumped, "Creating parts and dumping them"); + tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, parts_dumped, "Creating parts and dumping them"); for part_id in parts_dumped..num_parts { // Dump parts sequentially synchronously. // TODO: How to make it possible to dump state more effectively using multiple nodes? @@ -209,12 +190,17 @@ async fn state_sync_dump( break; } }; - let location = - s3_location(&chain_id, epoch_height, shard_id, part_id, num_parts); + let location = s3_location( + &config.chain_id, + epoch_height, + shard_id, + part_id, + num_parts, + ); if let Err(err) = - external.put_state_part(&state_part, shard_id, &location).await + put_state_part(&location, &state_part, &shard_id, &bucket).await { - res = Some(Error::Other(err.to_string())); + res = Some(err); break; } update_progress( @@ -244,36 +230,44 @@ async fn state_sync_dump( }; // Record the next state of the state machine. - let has_progress = match next_state { + match next_state { Ok(Some(next_state)) => { tracing::debug!(target: "state_sync_dump", shard_id, ?next_state); match chain.store().set_state_sync_dump_progress(shard_id, Some(next_state)) { - Ok(_) => true, + Ok(_) => {} Err(err) => { // This will be retried. tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to set progress"); - false } } } Ok(None) => { // Do nothing. tracing::debug!(target: "state_sync_dump", shard_id, "Idle"); - false } Err(err) => { // Will retry. tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to determine what to do"); - false } - }; - - if !has_progress { - // Avoid a busy-loop when there is nothing to do. - std::thread::sleep(iteration_delay); } } - tracing::debug!(target: "state_sync_dump", shard_id, "DONE"); +} + +async fn put_state_part( + location: &str, + state_part: &[u8], + shard_id: &ShardId, + bucket: &s3::Bucket, +) -> Result { + let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + let put = bucket + .put_object(&location, &state_part) + .await + .map_err(|err| Error::Other(err.to_string())); + tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3"); + put } fn update_progress( @@ -340,7 +334,7 @@ fn set_metrics( /// Obtains and then saves the part data. fn obtain_and_store_state_part( - runtime: &Arc, + runtime: &Arc, shard_id: &ShardId, sync_hash: &CryptoHash, state_root: &StateRoot, @@ -368,8 +362,7 @@ fn start_dumping( sync_hash: CryptoHash, shard_id: ShardId, chain: &Chain, - runtime: &Arc, - account_id: &Option, + runtime: &Arc, ) -> Result, Error> { let epoch_info = runtime.get_epoch_info(&epoch_id)?; let epoch_height = epoch_info.epoch_height(); @@ -378,8 +371,8 @@ fn start_dumping( let state_header = chain.get_state_response_header(shard_id, sync_hash)?; let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage); - if runtime.cares_about_shard(account_id.as_ref(), sync_prev_hash, shard_id, true) { - tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch"); + if runtime.cares_about_shard(None, sync_prev_hash, shard_id, false) { + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch"); // Note that first the state of the state machines gets changes to // `InProgress` and it starts dumping state after a short interval. set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height)); @@ -390,7 +383,7 @@ fn start_dumping( parts_dumped: 0, })) } else { - tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch"); + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch"); Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) })) } } @@ -403,8 +396,7 @@ fn check_new_epoch( num_parts: Option, shard_id: ShardId, chain: &Chain, - runtime: &Arc, - account_id: &Option, + runtime: &Arc, ) -> Result, Error> { let head = chain.head()?; if Some(&head.epoch_id) == epoch_id.as_ref() { @@ -422,116 +414,7 @@ fn check_new_epoch( // Still in the latest dumped epoch. Do nothing. Ok(None) } else { - start_dumping(head.epoch_id, sync_hash, shard_id, &chain, runtime, account_id) + start_dumping(head.epoch_id, sync_hash, shard_id, &chain, runtime) } } } - -#[cfg(test)] -mod tests { - use crate::state_sync::spawn_state_sync_dump; - use near_chain::{ChainGenesis, Provenance}; - use near_chain_configs::{DumpConfig, ExternalStorageLocation}; - use near_client::test_utils::TestEnv; - use near_network::test_utils::wait_or_timeout; - use near_o11y::testonly::init_test_logger; - use near_primitives::hash::CryptoHash; - use near_primitives::state_part::PartId; - use near_primitives::types::BlockHeight; - use std::ops::ControlFlow; - use std::str::FromStr; - use std::time::Duration; - - #[test] - /// Produce several blocks, wait for the state dump thread to notice and - /// write files to a temp dir. - fn test_state_dump() { - init_test_logger(); - - let mut chain_genesis = ChainGenesis::test(); - chain_genesis.epoch_length = 5; - let mut env = TestEnv::builder(chain_genesis.clone()).build(); - let chain = &env.clients[0].chain; - let runtime = chain.runtime_adapter(); - let mut config = env.clients[0].config.clone(); - let root_dir = tempfile::Builder::new().prefix("state_dump").tempdir().unwrap(); - config.state_sync_config_dump = Some(DumpConfig { - location: ExternalStorageLocation::Filesystem { - root_dir: root_dir.path().to_path_buf(), - }, - restart_dump_for_shards: None, - iteration_delay: Some(Duration::from_millis(250)), - }); - - const MAX_HEIGHT: BlockHeight = 15; - - near_actix_test_utils::run_actix(async move { - let _state_sync_dump_handle = spawn_state_sync_dump( - &config, - chain_genesis, - runtime.clone(), - Some("test0".parse().unwrap()), - ) - .unwrap(); - let mut last_block = None; - for i in 1..=MAX_HEIGHT { - let block = env.clients[0].produce_block(i as u64).unwrap().unwrap(); - last_block = Some(block.clone()); - env.process_block(0, block, Provenance::PRODUCED); - } - let epoch_id = runtime.get_epoch_id(last_block.clone().unwrap().hash()).unwrap(); - let epoch_info = runtime.get_epoch_info(&epoch_id).unwrap(); - let epoch_height = epoch_info.epoch_height(); - - let num_shards = runtime.num_shards(&epoch_id).unwrap(); - assert_eq!(num_shards, 1); - assert_eq!(epoch_height, 10); - - wait_or_timeout(100, 10000, || async { - let mut all_parts_present = true; - - assert_eq!(num_shards, 1); - for (part_id, path) in [ - root_dir.path().join( - "chain_id=unittest/epoch_height=10/shard_id=0/state_part_000000_of_000003", - ), - root_dir.path().join( - "chain_id=unittest/epoch_height=10/shard_id=0/state_part_000001_of_000003", - ), - root_dir.path().join( - "chain_id=unittest/epoch_height=10/shard_id=0/state_part_000002_of_000003", - ), - ] - .iter() - .enumerate() - { - match std::fs::read(&path) { - Ok(part) => { - // KeyValueRuntime::validate_state_part() always returns true. - assert!(runtime.validate_state_part( - &CryptoHash::from_str( - "GqPgXpMYEGkvmGjdtEWygRUgYkaVQUfvwB7MfUi9jpZ2" - ) - .unwrap(), - PartId::new(part_id as u64, 3), - &part - )); - } - Err(err) => { - println!("path: {:?}, err: {:?}", path, err); - all_parts_present = false; - } - } - } - if all_parts_present { - ControlFlow::Break(()) - } else { - ControlFlow::Continue(()) - } - }) - .await - .unwrap(); - actix_rt::System::current().stop(); - }); - } -}