Skip to content

Commit

Permalink
Merge
Browse the repository at this point in the history
  • Loading branch information
nikurt committed Apr 5, 2023
1 parent 07841b5 commit 06a3e85
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 31 deletions.
10 changes: 0 additions & 10 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,3 @@ pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy<near_o11y::metrics::His
)
.unwrap()
});

/// Running total of bytes fetched from external storage while downloading
/// state parts, labeled by `shard_id`.
pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy<near_o11y::metrics::IntCounterVec> =
    Lazy::new(|| {
        // Registration can only fail on a duplicate/invalid metric definition,
        // which is a programming error — hence the unwrap.
        let counter = try_create_int_counter_vec(
            "near_state_sync_external_parts_size_downloaded_bytes_total",
            "Amount of bytes downloaded from an external storage when requesting state parts for a shard",
            &["shard_id"],
        );
        counter.unwrap()
    });
12 changes: 7 additions & 5 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn make_account_or_peer_id_or_hash(
From::AccountId(a) => To::AccountId(a),
From::PeerId(p) => To::PeerId(p),
From::Hash(h) => To::Hash(h),
From::ExternalStorage() => To::ExternalStorage(),
From::ExternalStorage => To::ExternalStorage,
}
}

Expand Down Expand Up @@ -707,7 +707,7 @@ impl StateSync {
}
download.state_requests_count += 1;
download.last_target =
Some(make_account_or_peer_id_or_hash(AccountOrPeerIdOrHash::ExternalStorage()));
Some(make_account_or_peer_id_or_hash(AccountOrPeerIdOrHash::ExternalStorage));

let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts);
let download_response = download.response.clone();
Expand Down Expand Up @@ -1236,17 +1236,19 @@ fn check_external_storage_part_response(
.with_label_values(&[&shard_id.to_string()])
.inc();
tracing::warn!(target: "sync", %shard_id, %sync_hash, part_id, ?err, "Failed to save a state part");
err_to_retry = Some(Error::Other("Failed to save a state part".to_string()));
err_to_retry =
Some(near_chain::Error::Other("Failed to save a state part".to_string()));
}
}
}
// Other HTTP status codes are considered errors.
Ok((status_code, _)) => {
err_to_retry = Some(Error::Other(format!("status_code: {}", status_code).to_string()));
err_to_retry =
Some(near_chain::Error::Other(format!("status_code: {}", status_code).to_string()));
}
// The request failed without reaching the external storage.
Err(err) => {
err_to_retry = Some(Error::Other(err.to_string()));
err_to_retry = Some(near_chain::Error::Other(err.to_string()));
}
};

Expand Down
9 changes: 7 additions & 2 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ pub struct ClientConfig {
/// If disabled will perform state sync from the peers.
pub state_sync_from_s3_enabled: bool,
/// Number of parallel in-flight requests allowed per shard.
pub state_sync_num_s3_requests_per_shard: u64,
pub state_sync_num_concurrent_s3_requests: u64,
/// Whether to use the State Sync mechanism.
/// If disabled, the node will do Block Sync instead of State Sync.
pub state_sync_enabled: bool,
}

impl ClientConfig {
Expand Down Expand Up @@ -253,7 +256,9 @@ impl ClientConfig {
state_sync_s3_bucket: String::new(),
state_sync_s3_region: String::new(),
state_sync_restart_dump_for_shards: vec![],
state_sync_num_s3_requests_per_shard: 10,
state_sync_from_s3_enabled: false,
state_sync_num_concurrent_s3_requests: 10,
state_sync_enabled: false,
}
}
}
26 changes: 16 additions & 10 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ pub struct Config {
#[serde(skip_serializing_if = "Option::is_none")]
pub state_sync: Option<StateSyncConfig>,
/// Whether to use state sync (unreliable and corrupts the DB if fails) or do a block sync instead.
#[serde(skip_serializing_if = "is_false")]
pub state_sync_enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub state_sync_enabled: Option<bool>,
}

fn is_false(value: &bool) -> bool {
Expand Down Expand Up @@ -384,7 +384,7 @@ impl Default for Config {
split_storage: None,
expected_shutdown: None,
state_sync: None,
state_sync_enabled: false,
state_sync_enabled: None,
}
}
}
Expand Down Expand Up @@ -695,6 +695,7 @@ impl NearConfig {
.state_sync
.as_ref()
.map(|x| x.dump_enabled)
.flatten()
.unwrap_or(false),
state_sync_s3_bucket: config
.state_sync
Expand All @@ -709,7 +710,7 @@ impl NearConfig {
state_sync_restart_dump_for_shards: config
.state_sync
.as_ref()
.map(|x| x.drop_state_of_dump.clone())
.map(|x| x.restart_dump_for_shards.clone())
.flatten()
.unwrap_or(vec![]),
state_sync_from_s3_enabled: config
Expand All @@ -718,12 +719,13 @@ impl NearConfig {
.map(|x| x.sync_from_s3_enabled)
.flatten()
.unwrap_or(false),
state_sync_num_s3_requests_per_shard: config
state_sync_num_concurrent_s3_requests: config
.state_sync
.as_ref()
.map(|x| x.num_s3_requests_per_shard)
.map(|x| x.num_concurrent_s3_requests)
.flatten()
.unwrap_or(100),
state_sync_enabled: config.state_sync_enabled.unwrap_or(false),
},
network_config: NetworkConfig::new(
config.network,
Expand Down Expand Up @@ -1556,14 +1558,18 @@ pub struct StateSyncConfig {
/// Whether a node should dump state of each epoch to the external storage.
#[serde(skip_serializing_if = "Option::is_none")]
pub dump_enabled: Option<bool>,
/// Use carefully in case a node that dumps state to the external storage gets in trouble.
/// Use carefully in case a node that dumps state to the external storage
/// gets in trouble.
#[serde(skip_serializing_if = "Option::is_none")]
pub drop_state_of_dump: Option<Vec<ShardId>>,
/// If enabled, will download state parts from external storage and not from the peers.
pub restart_dump_for_shards: Option<Vec<ShardId>>,
/// If enabled, will download state parts from external storage and not from
/// the peers.
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_from_s3_enabled: Option<bool>,
/// When syncing state from S3, throttle requests to this many concurrent
/// requests per shard.
#[serde(skip_serializing_if = "Option::is_none")]
pub num_s3_requests_per_shard: Option<u64>,
pub num_concurrent_s3_requests: Option<u64>,
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion nearcore/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(crate) static STATE_SYNC_DUMP_SIZE_TOTAL: Lazy<IntCounterVec> = Lazy::new(||
try_create_int_counter_vec(
"near_state_sync_dump_size_total",
"Total size of parts written to S3",
&["shard_id"],
&["epoch_height", "shard_id"],
)
.unwrap()
});
Expand Down
5 changes: 2 additions & 3 deletions nearcore/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ async fn state_sync_dump(
_node_key: PublicKey,
) {
tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop");
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(10));

if config.state_sync_restart_dump_for_shards.contains(&shard_id) {
tracing::debug!(target: "state_sync_dump", shard_id, "Dropped existing progress");
Expand All @@ -128,7 +127,7 @@ async fn state_sync_dump(

loop {
// Avoid a busy-loop when there is nothing to do.
interval.tick().await;
std::thread::sleep(std::time::Duration::from_secs(10));

let progress = chain.store().get_state_sync_dump_progress(shard_id);
tracing::debug!(target: "state_sync_dump", shard_id, ?progress, "Running StateSyncDump loop iteration");
Expand Down Expand Up @@ -287,7 +286,7 @@ fn update_progress(
) {
// Record that a part was obtained and dumped.
metrics::STATE_SYNC_DUMP_SIZE_TOTAL
.with_label_values(&[&shard_id.to_string()])
.with_label_values(&[&epoch_height.to_string(), &shard_id.to_string()])
.inc_by(part_len as u64);
let next_progress = StateSyncDumpProgress::InProgress {
epoch_id: epoch_id.clone(),
Expand Down

0 comments on commit 06a3e85

Please sign in to comment.