Skip to content

Commit

Permalink
feat: State Sync from External Storage (#8789)
Browse files Browse the repository at this point in the history
* Adds functionality to get state parts as files from S3
* Fixes an off-by-one-block error in state dumping to S3
* * In State Dump
* * In state-viewer
* Latency metric for functions `fn apply_state_part()` and `fn obtain_state_part()`
* New sub-sub-command `neard view-state state-parts read-state-header` to read state header stored in the DB.
  • Loading branch information
nikurt committed Apr 14, 2023
1 parent 3def342 commit dc64dd6
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 578 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ pub struct DownloadStatus {
pub state_requests_count: u64,
pub last_target: Option<AccountOrPeerIdOrHash>,
#[serde(skip_serializing, skip_deserializing)]
// Use type `String` as an error to avoid a dependency on the `rust-s3` crate.
pub response: Arc<Mutex<Option<Result<Vec<u8>, String>>>>,
pub response: Arc<Mutex<Option<Result<(u16, Vec<u8>), String>>>>,
}

impl DownloadStatus {
Expand Down
21 changes: 10 additions & 11 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
reed-solomon-erasure.workspace = true
regex.workspace = true
rust-s3.workspace = true
serde_json.workspace = true
strum.workspace = true
Expand All @@ -29,24 +28,24 @@ thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

delay-detector.workspace = true
near-async.workspace = true
near-chain-configs.workspace = true
near-chain-primitives.workspace = true
near-crypto.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-chain-configs.workspace = true
near-chain.workspace = true
near-chunks.workspace = true
near-client-primitives.workspace = true
near-crypto.workspace = true
near-dyn-configs.workspace = true
near-epoch-manager.workspace = true
near-network.workspace = true
near-o11y.workspace = true
near-performance-metrics-macros.workspace = true
near-performance-metrics.workspace = true
near-pool.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-chunks.workspace = true
near-telemetry.workspace = true
near-o11y.workspace = true
near-performance-metrics.workspace = true
near-performance-metrics-macros.workspace = true
near-epoch-manager.workspace = true
delay-detector.workspace = true

[dev-dependencies]
assert_matches.workspace = true
Expand Down
10 changes: 8 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ impl Client {
network_adapter.clone(),
config.state_sync_timeout,
&config.chain_id,
config.state_sync_config_sync.clone(),
config.state_sync_from_s3_enabled,
&config.state_sync_s3_bucket,
&config.state_sync_s3_region,
config.state_sync_num_concurrent_s3_requests,
);
let num_block_producer_seats = config.num_block_producer_seats as usize;
let data_parts = runtime_adapter.num_data_parts();
Expand Down Expand Up @@ -2130,7 +2133,10 @@ impl Client {
network_adapter1,
state_sync_timeout,
&self.config.chain_id,
self.config.state_sync_config_sync.clone(),
self.config.state_sync_from_s3_enabled,
&self.config.state_sync_s3_bucket,
&self.config.state_sync_s3_region,
self.config.state_sync_num_concurrent_s3_requests,
),
new_shard_sync,
BlocksCatchUpState::new(sync_hash, epoch_id),
Expand Down
97 changes: 47 additions & 50 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ pub(crate) static NODE_PROTOCOL_UPGRADE_VOTING_START: Lazy<IntGauge> = Lazy::new
.unwrap()
});

pub(crate) static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
pub static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_produce_chunk_time",
"Time taken to produce a chunk",
Expand All @@ -315,18 +315,17 @@ pub(crate) static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = L
.unwrap()
});

pub(crate) static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_view_client_messages_processing_time",
"Time that view client takes to handle different messages",
&["message"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});
pub static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_view_client_messages_processing_time",
"Time that view client takes to handle different messages",
&["message"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});

pub(crate) static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> =
pub static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_produce_and_distribute_chunk_time",
Expand Down Expand Up @@ -356,7 +355,7 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version)
.inc();
}

pub(crate) static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_stage",
"Stage of state sync per shard",
Expand All @@ -365,17 +364,16 @@ pub(crate) static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy
.unwrap()
});

pub(crate) static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_retry_part_total",
"Number of part requests retried",
&["shard_id"],
)
.unwrap()
});
pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_retry_part_total",
"Number of part requests retried",
&["shard_id"],
)
.unwrap()
});

pub(crate) static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_done",
"Number of parts downloaded",
Expand All @@ -384,7 +382,7 @@ pub(crate) static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> =
.unwrap()
});

pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_per_shard",
"Number of parts that need to be downloaded for the shard",
Expand All @@ -393,37 +391,47 @@ pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec>
.unwrap()
});

pub(crate) static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> =
pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_discard_parts_total",
"Number of times all downloaded parts were discarded to try again",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_discard_parts_total",
"Number of times all downloaded parts were discarded to try again",
"near_state_sync_external_parts_done_total",
"Number of parts successfully retrieved from an external storage",
&["shard_id"],
)
.unwrap()
});

pub(crate) static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts successfully retrieved from an external storage",
"near_state_sync_external_parts_failed_total",
"Number of parts failed attempts to retrieve parts from an external storage",
&["shard_id"],
)
.unwrap()
});

pub(crate) static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Number of parts failed attempts to retrieve parts from an external storage",
try_create_histogram_vec(
"near_state_sync_external_parts_scheduling_delay_sec",
"Delay for a request for parts from an external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
Expand All @@ -434,23 +442,12 @@ pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metri
.unwrap()
});

pub(crate) static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<
near_o11y::metrics::IntCounterVec,
> = Lazy::new(|| {
try_create_int_counter_vec(
pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_size_downloaded_bytes_total",
"Amount of bytes downloaded from an external storage when requesting state parts for a shard",
&["shard_id"],
)
.unwrap()
});

pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_put_object_elapsed_sec",
"Time needed to write a part",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
});
Loading

0 comments on commit dc64dd6

Please sign in to comment.