Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: State sync from local filesystem #8913

Merged
merged 24 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Non-protocol Changes

* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789)
* Node can sync State from local storage. [#8913](https://github.com/near/nearcore/pull/8913)

## 1.33.0

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ pub struct DownloadStatus {
pub state_requests_count: u64,
pub last_target: Option<AccountOrPeerIdOrHash>,
#[serde(skip_serializing, skip_deserializing)]
pub response: Arc<Mutex<Option<Result<(u16, Vec<u8>), String>>>>,
// Use type `String` as an error to avoid a dependency on the `rust-s3` crate.
pub response: Arc<Mutex<Option<Result<Vec<u8>, String>>>>,
ppca marked this conversation as resolved.
Show resolved Hide resolved
}

impl DownloadStatus {
Expand Down
21 changes: 11 additions & 10 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
reed-solomon-erasure.workspace = true
regex.workspace = true
rust-s3.workspace = true
serde_json.workspace = true
strum.workspace = true
Expand All @@ -28,24 +29,24 @@ thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

delay-detector.workspace = true
near-async.workspace = true
near-chain-primitives.workspace = true
near-crypto.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-chain-configs.workspace = true
near-chain-primitives.workspace = true
near-chain.workspace = true
near-chunks.workspace = true
near-client-primitives.workspace = true
near-crypto.workspace = true
near-dyn-configs.workspace = true
near-epoch-manager.workspace = true
near-network.workspace = true
near-pool.workspace = true
near-chunks.workspace = true
near-telemetry.workspace = true
near-o11y.workspace = true
near-performance-metrics.workspace = true
near-performance-metrics-macros.workspace = true
near-epoch-manager.workspace = true
delay-detector.workspace = true
near-performance-metrics.workspace = true
near-pool.workspace = true
near-primitives.workspace = true
near-store.workspace = true
near-telemetry.workspace = true

[dev-dependencies]
assert_matches.workspace = true
Expand Down
10 changes: 2 additions & 8 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,7 @@ impl Client {
network_adapter.clone(),
config.state_sync_timeout,
&config.chain_id,
config.state_sync_from_s3_enabled,
&config.state_sync_s3_bucket,
&config.state_sync_s3_region,
config.state_sync_num_concurrent_s3_requests,
&config.state_sync_config_sync,
);
let num_block_producer_seats = config.num_block_producer_seats as usize;
let data_parts = runtime_adapter.num_data_parts();
Expand Down Expand Up @@ -2133,10 +2130,7 @@ impl Client {
network_adapter1,
state_sync_timeout,
&self.config.chain_id,
self.config.state_sync_from_s3_enabled,
&self.config.state_sync_s3_bucket,
&self.config.state_sync_s3_region,
self.config.state_sync_num_concurrent_s3_requests,
&self.config.state_sync_config_sync,
),
new_shard_sync,
BlocksCatchUpState::new(sync_hash, epoch_id),
Expand Down
6 changes: 5 additions & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,11 @@ impl Handler<WithSpanContext<Status>> for ClientActor {
sync_status: format!(
"{} ({})",
self.client.sync_status.as_variant_name().to_string(),
display_sync_status(&self.client.sync_status, &self.client.chain.head()?,),
display_sync_status(
&self.client.sync_status,
&self.client.chain.head()?,
&self.client.config.state_sync_config_sync
),
),
catchup_status: self.client.get_catchup_status()?,
current_head_status: head.clone().into(),
Expand Down
30 changes: 19 additions & 11 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config_updater::ConfigUpdater;
use crate::{metrics, SyncStatus};
use actix::Addr;
use itertools::Itertools;
use near_chain_configs::{ClientConfig, LogSummaryStyle};
use near_chain_configs::{ClientConfig, LogSummaryStyle, SyncConfig};
use near_network::types::NetworkInfo;
use near_primitives::block::Tip;
use near_primitives::network::PeerId;
Expand Down Expand Up @@ -280,7 +280,8 @@ impl InfoHelper {

let s = |num| if num == 1 { "" } else { "s" };

let sync_status_log = Some(display_sync_status(sync_status, head));
let sync_status_log =
Some(display_sync_status(sync_status, head, &client_config.state_sync_config_sync));

let catchup_status_log = display_catchup_status(catchup_status);

Expand Down Expand Up @@ -488,7 +489,11 @@ pub fn display_catchup_status(catchup_status: Vec<CatchupStatusView>) -> String
.join("\n")
}

pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String {
pub fn display_sync_status(
sync_status: &SyncStatus,
head: &Tip,
state_sync_config: &SyncConfig,
) -> String {
metrics::SYNC_STATUS.set(sync_status.repr() as i64);
match sync_status {
SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height),
Expand Down Expand Up @@ -533,14 +538,17 @@ pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String {
for (shard_id, shard_status) in shard_statuses {
write!(res, "[{}: {}]", shard_id, shard_status.status.to_string(),).unwrap();
}
// TODO #8719
tracing::warn!(target: "stats",
"The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\
Suggestions:\n\
* Download a recent data snapshot and restart the node.\n\
* Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\
\n\
A better implementation of State Sync is work in progress.");
if matches!(state_sync_config, SyncConfig::Peers) {
// TODO #8719
tracing::warn!(
target: "stats",
"The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\
Suggestions:\n\
* Download a recent data snapshot and restart the node.\n\
* Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\
\n\
A better implementation of State Sync is work in progress.");
}
res
}
SyncStatus::StateSyncDone => "State sync done".to_string(),
Expand Down
97 changes: 50 additions & 47 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ pub(crate) static NODE_PROTOCOL_UPGRADE_VOTING_START: Lazy<IntGauge> = Lazy::new
.unwrap()
});

pub static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
pub(crate) static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
ppca marked this conversation as resolved.
Show resolved Hide resolved
try_create_histogram_vec(
"near_produce_chunk_time",
"Time taken to produce a chunk",
Expand All @@ -315,17 +315,18 @@ pub static PRODUCE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::ne
.unwrap()
});

pub static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_view_client_messages_processing_time",
"Time that view client takes to handle different messages",
&["message"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});
pub(crate) static VIEW_CLIENT_MESSAGE_TIME: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_view_client_messages_processing_time",
"Time that view client takes to handle different messages",
&["message"],
Some(exponential_buckets(0.001, 2.0, 16).unwrap()),
)
.unwrap()
});

pub static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> =
pub(crate) static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_produce_and_distribute_chunk_time",
Expand Down Expand Up @@ -355,7 +356,7 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version)
.inc();
}

pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_stage",
"Stage of state sync per shard",
Expand All @@ -364,16 +365,17 @@ pub static STATE_SYNC_STAGE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|
.unwrap()
});

pub static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_retry_part_total",
"Number of part requests retried",
&["shard_id"],
)
.unwrap()
});
pub(crate) static STATE_SYNC_RETRY_PART: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_retry_part_total",
"Number of part requests retried",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_done",
"Number of parts downloaded",
Expand All @@ -382,7 +384,7 @@ pub static STATE_SYNC_PARTS_DONE: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::
.unwrap()
});

pub static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_per_shard",
"Number of parts that need to be downloaded for the shard",
Expand All @@ -391,47 +393,37 @@ pub static STATE_SYNC_PARTS_TOTAL: Lazy<near_o11y::metrics::IntGaugeVec> = Lazy:
.unwrap()
});

pub static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_discard_parts_total",
"Number of times all downloaded parts were discarded to try again",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
pub(crate) static STATE_SYNC_DISCARD_PARTS: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_done_total",
"Number of parts successfully retrieved from an external storage",
"near_state_sync_discard_parts_total",
"Number of times all downloaded parts were discarded to try again",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Number of parts failed attempts to retrieve parts from an external storage",
"near_state_sync_external_parts_done_total",
"Number of parts successfully retrieved from an external storage",
&["shard_id"],
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_scheduling_delay_sec",
"Delay for a request for parts from an external storage",
try_create_int_counter_vec(
"near_state_sync_external_parts_failed_total",
"Number of parts failed attempts to retrieve parts from an external storage",
nikurt marked this conversation as resolved.
Show resolved Hide resolved
&["shard_id"],
Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
)
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::HistogramVec> =
Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_external_parts_request_delay_sec",
Expand All @@ -442,12 +434,23 @@ pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::His
.unwrap()
});

pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<near_o11y::metrics::IntCounterVec> =
Lazy::new(|| {
try_create_int_counter_vec(
pub(crate) static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<
near_o11y::metrics::IntCounterVec,
> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_external_parts_size_downloaded_bytes_total",
"Amount of bytes downloaded from an external storage when requesting state parts for a shard",
&["shard_id"],
)
.unwrap()
});
});

/// Histogram metric `near_state_sync_dump_put_object_elapsed_sec`, labeled by `shard_id`.
///
/// Per its help text, it records the time needed to write a part.
/// NOTE(review): presumably observed in seconds (the name ends in `_sec`) by the
/// state-dump code path; the call sites are not visible here, so confirm.
///
/// The metric is constructed inside the `Lazy` initializer; both `unwrap()` calls
/// panic if bucket construction or metric creation returns an error.
pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_put_object_elapsed_sec",
"Time needed to write a part",
&["shard_id"],
// NOTE(review): assuming the usual `exponential_buckets(start, factor, count)`
// argument order, this is 25 buckets starting at 0.001 and growing by 1.6x; confirm.
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
Loading