Skip to content

Commit

Permalink
feat: State Sync from External Storage (#8789)
Browse files Browse the repository at this point in the history
* Adds functionality to get state parts as files from S3
* Fixes an off-by-one-block error in state dumping to S3
* * In State Dump
* * In state-viewer
* Latency metric for functions `fn apply_state_part()` and `fn obtain_state_part()`
* New sub-sub-command `neard view-state state-parts read-state-header` to read state header stored in the DB.
  • Loading branch information
nikurt committed Apr 18, 2023
1 parent 1b83813 commit f8bb69f
Show file tree
Hide file tree
Showing 15 changed files with 825 additions and 209 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

### Non-protocol Changes

* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789)

## 1.33.0

### Protocol Changes
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3139,7 +3139,10 @@ impl Chain {
let state_root = *chunk.take_header().take_inner().prev_state_root();
if !self.runtime_adapter.validate_state_part(&state_root, part_id, data) {
byzantine_assert!(false);
return Err(Error::Other("set_state_part failed: validate_state_part failed".into()));
return Err(Error::Other(format!(
"set_state_part failed: validate_state_part failed. state_root={:?}",
state_root
)));
}

// Saving the part data.
Expand Down
68 changes: 53 additions & 15 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use near_primitives::views::{StatusResponse, StatusSyncInfo};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};

/// Combines errors coming from chain, tx pool and block producer.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -50,6 +50,7 @@ pub enum AccountOrPeerIdOrHash {
AccountId(AccountId),
PeerId(PeerId),
Hash(CryptoHash),
ExternalStorage,
}

#[derive(Debug, serde::Serialize)]
Expand All @@ -61,18 +62,39 @@ pub struct DownloadStatus {
pub done: bool,
pub state_requests_count: u64,
pub last_target: Option<AccountOrPeerIdOrHash>,
#[serde(skip_serializing, skip_deserializing)]
pub response: Arc<Mutex<Option<Result<(u16, Vec<u8>), String>>>>,
}

impl DownloadStatus {
    /// Builds a fresh download-status record whose clocks both start at `now`.
    ///
    /// The download starts enabled (`run_me == true`), with no error, nothing
    /// done, zero issued requests, no last target and an empty response slot.
    pub fn new(now: DateTime<Utc>) -> Self {
        // Each status owns its own cancellation flag and response buffer.
        let run_me = Arc::new(AtomicBool::new(true));
        let response = Arc::new(Mutex::new(None));
        Self {
            start_time: now,
            prev_update_time: now,
            run_me,
            error: false,
            done: false,
            state_requests_count: 0,
            last_target: None,
            response,
        }
    }
}

impl Clone for DownloadStatus {
/// Clones an object, but it clones the value of `run_me` instead of the
/// `Arc` that wraps that value.
fn clone(&self) -> Self {
DownloadStatus {
start_time: self.start_time,
prev_update_time: self.prev_update_time,
// Creates a new `Arc` holding the same value.
run_me: Arc::new(AtomicBool::new(self.run_me.load(Ordering::SeqCst))),
error: self.error,
done: self.done,
state_requests_count: self.state_requests_count,
last_target: self.last_target.clone(),
response: self.response.clone(),
}
}
}
Expand All @@ -90,6 +112,21 @@ pub enum ShardSyncStatus {
StateSyncDone,
}

impl ShardSyncStatus {
pub fn repr(&self) -> u8 {
match self {
ShardSyncStatus::StateDownloadHeader => 0,
ShardSyncStatus::StateDownloadParts => 1,
ShardSyncStatus::StateDownloadScheduling => 2,
ShardSyncStatus::StateDownloadApplying => 3,
ShardSyncStatus::StateDownloadComplete => 4,
ShardSyncStatus::StateSplitScheduling => 5,
ShardSyncStatus::StateSplitApplying(_) => 6,
ShardSyncStatus::StateSyncDone => 7,
}
}
}

/// Manually implement compare for ShardSyncStatus to compare only based on variant name
impl PartialEq<Self> for ShardSyncStatus {
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -164,25 +201,26 @@ pub struct ShardSyncDownload {
}

impl ShardSyncDownload {
/// Creates a instance of self which includes initial statuses for shard sync and download at the given time.
pub fn new(now: DateTime<Utc>) -> Self {
/// Creates a instance of self which includes initial statuses for shard state header download at the given time.
pub fn new_download_state_header(now: DateTime<Utc>) -> Self {
Self {
downloads: vec![
DownloadStatus {
start_time: now,
prev_update_time: now,
run_me: Arc::new(AtomicBool::new(true)),
error: false,
done: false,
state_requests_count: 0,
last_target: None,
};
1
],
downloads: vec![DownloadStatus::new(now)],
status: ShardSyncStatus::StateDownloadHeader,
}
}

/// Creates an instance of self which includes initial statuses for shard state parts download at the given time.
pub fn new_download_state_parts(now: DateTime<Utc>, num_parts: u64) -> Self {
    // Every `DownloadStatus` must own an independent `response` slot, so the
    // statuses are constructed one by one rather than via `vec![x; num_parts]`
    // (which would clone a single value and share its inner `Arc`s).
    let downloads = (0..num_parts).map(|_| DownloadStatus::new(now)).collect();
    Self { downloads, status: ShardSyncStatus::StateDownloadParts }
}
}

/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Clone, Debug, strum::AsRefStr)]
pub enum SyncStatus {
Expand Down
1 change: 1 addition & 0 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
reed-solomon-erasure.workspace = true
rust-s3.workspace = true
serde_json.workspace = true
strum.workspace = true
sysinfo.workspace = true
Expand Down
20 changes: 18 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,15 @@ impl Client {
config.archive,
config.state_sync_enabled,
);
let state_sync = StateSync::new(network_adapter.clone(), config.state_sync_timeout);
let state_sync = StateSync::new(
network_adapter.clone(),
config.state_sync_timeout,
&config.chain_id,
config.state_sync_from_s3_enabled,
&config.state_sync_s3_bucket,
&config.state_sync_s3_region,
config.state_sync_num_concurrent_s3_requests,
);
let num_block_producer_seats = config.num_block_producer_seats as usize;
let data_parts = runtime_adapter.num_data_parts();
let parity_parts = runtime_adapter.num_total_parts() - data_parts;
Expand Down Expand Up @@ -2121,7 +2129,15 @@ impl Client {
let (state_sync, new_shard_sync, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
(
StateSync::new(network_adapter1, state_sync_timeout),
StateSync::new(
network_adapter1,
state_sync_timeout,
&self.config.chain_id,
self.config.state_sync_from_s3_enabled,
&self.config.state_sync_s3_bucket,
&self.config.state_sync_s3_region,
self.config.state_sync_num_concurrent_s3_requests,
),
new_shard_sync,
BlocksCatchUpState::new(sync_hash, epoch_id),
)
Expand Down
97 changes: 97 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,100 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version)
])
.inc();
}

/// Current stage of state sync, per shard. The gauge value is a small
/// integer encoding the stage (presumably `ShardSyncStatus::repr()` — the
/// call site is not visible here; confirm against the setter).
pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
    try_create_int_gauge_vec(
        "near_state_sync_stage",
        "Stage of state sync per shard",
        &["shard_id"],
    )
    .unwrap()
});

/// Counts how many times a state-part request had to be retried, per shard.
pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
    try_create_int_counter_vec(
        "near_state_sync_retry_part_total",
        "Number of part requests retried",
        &["shard_id"],
    )
    .unwrap()
});

/// Number of state parts downloaded so far for the shard (gauge).
pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
    try_create_int_gauge_vec(
        "near_state_sync_parts_done",
        "Number of parts downloaded",
        &["shard_id"],
    )
    .unwrap()
});

/// Total number of parts the shard's state is split into.
/// NOTE(review): the static is named `..._PARTS_TOTAL` but the exported
/// metric is `near_state_sync_parts_per_shard` — confirm this is intentional.
pub static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
    try_create_int_gauge_vec(
        "near_state_sync_parts_per_shard",
        "Number of parts that need to be downloaded for the shard",
        &["shard_id"],
    )
    .unwrap()
});

/// Counts how many times all already-downloaded parts were thrown away so
/// the shard's download could start over, per shard.
pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
    try_create_int_counter_vec(
        "near_state_sync_discard_parts_total",
        "Number of times all downloaded parts were discarded to try again",
        &["shard_id"],
    )
    .unwrap()
});

/// Parts successfully fetched from external storage (e.g. S3), per shard.
pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
    Lazy::new(|| {
        try_create_int_counter_vec(
            "near_state_sync_external_parts_done_total",
            "Number of parts successfully retrieved from an external storage",
            &["shard_id"],
        )
        .unwrap()
    });

/// Failed attempts to fetch parts from external storage, per shard.
pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
    Lazy::new(|| {
        try_create_int_counter_vec(
            "near_state_sync_external_parts_failed_total",
            "Number of parts failed attempts to retrieve parts from an external storage",
            &["shard_id"],
        )
        .unwrap()
    });

/// Histogram (seconds; exponential buckets from 1 ms, doubling, 20 buckets)
/// of how long an external-storage part request waited before being issued —
/// presumably measured from scheduling to dispatch; verify at the call site.
pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
    Lazy::new(|| {
        try_create_histogram_vec(
            "near_state_sync_external_parts_scheduling_delay_sec",
            "Delay for a request for parts from an external storage",
            &["shard_id"],
            Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
        )
        .unwrap()
    });

/// Histogram (seconds; same bucket layout as the scheduling delay) of the
/// latency of individual state-part requests to external storage.
pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
    Lazy::new(|| {
        try_create_histogram_vec(
            "near_state_sync_external_parts_request_delay_sec",
            "Latency of state part requests to external storage",
            &["shard_id"],
            Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
        )
        .unwrap()
    });

/// Running total of bytes of state-part data downloaded from external
/// storage, per shard.
pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<near_o11y::metrics::IntCounterVec> =
    Lazy::new(|| {
        try_create_int_counter_vec(
            "near_state_sync_external_parts_size_downloaded_bytes_total",
            "Amount of bytes downloaded from an external storage when requesting state parts for a shard",
            &["shard_id"],
        )
        .unwrap()
    });
Loading

0 comments on commit f8bb69f

Please sign in to comment.