Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(state-sync): Remove state_fetch_horizon because it causes nodes to crash #9380

Merged
merged 19 commits on Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 6 additions & 21 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix::Message;
use ansi_term::Color::{Purple, Yellow};
use ansi_term::Color::Purple;
use ansi_term::Style;
use chrono::DateTime;
use chrono::Utc;
Expand Down Expand Up @@ -231,31 +231,16 @@ pub fn format_shard_sync_phase(
ShardSyncStatus::StateDownloadParts => {
let mut num_parts_done = 0;
let mut num_parts_not_done = 0;
let mut text = "".to_string();
for (i, download) in shard_sync_download.downloads.iter().enumerate() {
for download in shard_sync_download.downloads.iter() {
if download.done {
num_parts_done += 1;
continue;
} else {
num_parts_not_done += 1;
}
num_parts_not_done += 1;
text.push_str(&format!(
"[{}: {}, {}, {:?}] ",
paint(&i.to_string(), Yellow.bold(), use_colour),
download.done,
download.state_requests_count,
download.last_target
));
}
format!(
"{} [{}: is_done, requests sent, last target] {} num_parts_done={} num_parts_not_done={}",
paint("PARTS", Purple.bold(), use_colour),
paint("part_id", Yellow.bold(), use_colour),
text,
num_parts_done,
num_parts_not_done
)
format!("num_parts_done={num_parts_done} num_parts_not_done={num_parts_not_done}")
}
status => format!("{:?}", status),
status => format!("{status:?}"),
}
}

Expand Down
30 changes: 10 additions & 20 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1465,30 +1465,20 @@ impl ClientActor {
///
/// The selected block will always be the first block on a new epoch:
/// <https://github.com/nearprotocol/nearcore/issues/2021#issuecomment-583039862>.
///
/// To prevent syncing from a fork, we move `state_fetch_horizon` steps backwards and use that epoch.
/// Usually `state_fetch_horizon` is much less than the expected number of produced blocks on an epoch,
/// so this is only relevant on epoch boundaries.
fn find_sync_hash(&mut self) -> Result<CryptoHash, near_chain::Error> {
let header_head = self.client.chain.header_head()?;
let mut sync_hash = header_head.prev_block_hash;
for _ in 0..self.client.config.state_fetch_horizon {
nikurt marked this conversation as resolved.
Show resolved Hide resolved
sync_hash = *self.client.chain.get_block_header(&sync_hash)?.prev_hash();
}
let mut epoch_start_sync_hash =
let sync_hash = header_head.last_block_hash;
let epoch_start_sync_hash =
StateSync::get_epoch_start_sync_hash(&mut self.client.chain, &sync_hash)?;

if &epoch_start_sync_hash == self.client.chain.genesis().hash() {
// If we are within `state_fetch_horizon` blocks of the second epoch, the sync hash will
// be the first block of the first epoch (or, the genesis block). Due to implementation
// details of the state sync, we can't state sync to the genesis block, so redo the
// search without going back `state_fetch_horizon` blocks.
epoch_start_sync_hash = StateSync::get_epoch_start_sync_hash(
&mut self.client.chain,
&header_head.last_block_hash,
)?;
assert_ne!(&epoch_start_sync_hash, self.client.chain.genesis().hash());
}
tracing::debug!(
target: "sync",
?header_head,
?sync_hash,
?epoch_start_sync_hash,
genesis_hash = ?self.client.chain.genesis().hash(),
nikurt marked this conversation as resolved.
Show resolved Hide resolved
"find_sync_hash");
assert_ne!(&epoch_start_sync_hash, self.client.chain.genesis().hash());
Ok(epoch_start_sync_hash)
}

Expand Down
12 changes: 12 additions & 0 deletions chain/client/src/sync/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ impl BlockSync {
&& !self.archive
&& self.state_sync_enabled
{
tracing::debug!(
target: "sync",
head_epoch_id = ?head.epoch_id,
header_head_epoch_id = ?header_head.epoch_id,
head_next_epoch_id = ?head.next_epoch_id,
head_height = head.height,
header_head_height = header_head.height,
header_head_height_sub = header_head.height.saturating_sub(self.block_fetch_horizon),
archive = self.archive,
state_sync_enabled = self.state_sync_enabled,
block_fetch_horizon = self.block_fetch_horizon,
"Switched from block sync to state sync");
// Epochs are different and we are too far from horizon, State Sync is needed
return Ok(true);
}
Expand Down
58 changes: 25 additions & 33 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use near_chain::near_chain_primitives;
use near_chain::resharding::StateSplitRequest;
use near_chain::Chain;
use near_chain_configs::{ExternalStorageConfig, ExternalStorageLocation, SyncConfig};
use near_client_primitives::types::format_shard_sync_phase_per_shard;
use near_client_primitives::types::{
format_shard_sync_phase, DownloadStatus, ShardSyncDownload, ShardSyncStatus,
};
Expand Down Expand Up @@ -297,9 +298,6 @@ impl StateSync {

let old_status = shard_sync_download.status.clone();
let mut shard_sync_done = false;
metrics::STATE_SYNC_STAGE
.with_label_values(&[&shard_id.to_string()])
.set(shard_sync_download.status.repr() as i64);
match &shard_sync_download.status {
ShardSyncStatus::StateDownloadHeader => {
(download_timeout, run_shard_state_download) = self
Expand All @@ -312,12 +310,8 @@ impl StateSync {
)?;
}
ShardSyncStatus::StateDownloadParts => {
let res = self.sync_shards_download_parts_status(
shard_id,
shard_sync_download,
sync_hash,
now,
);
let res =
self.sync_shards_download_parts_status(shard_id, shard_sync_download, now);
download_timeout = res.0;
run_shard_state_download = res.1;
update_sync_status |= res.2;
Expand All @@ -342,13 +336,8 @@ impl StateSync {
)?;
}
ShardSyncStatus::StateDownloadComplete => {
shard_sync_done = self.sync_shards_download_complete_status(
split_states,
shard_id,
shard_sync_download,
sync_hash,
chain,
)?;
shard_sync_done = self
.sync_shards_download_complete_status(split_states, shard_sync_download);
}
ShardSyncStatus::StateSplitScheduling => {
debug_assert!(split_states);
Expand All @@ -374,6 +363,14 @@ impl StateSync {
shard_sync_done = true;
}
}
let stage = if shard_sync_done {
// Update the state sync stage metric, because maybe we'll not
// enter this function again.
ShardSyncStatus::StateSyncDone.repr()
} else {
shard_sync_download.status.repr()
};
metrics::STATE_SYNC_STAGE.with_label_values(&[&shard_id.to_string()]).set(stage as i64);
all_done &= shard_sync_done;

if download_timeout {
Expand Down Expand Up @@ -407,6 +404,13 @@ impl StateSync {
}
update_sync_status |= shard_sync_download.status != old_status;
}
if update_sync_status {
// Print debug messages only if something changed.
// Otherwise it spams the debug logs.
tracing::debug!(
target: "sync",
progress_per_shard = ?format_shard_sync_phase_per_shard(new_shard_sync, false));
}

Ok((update_sync_status, all_done))
}
Expand Down Expand Up @@ -951,7 +955,6 @@ impl StateSync {
&mut self,
shard_id: ShardId,
shard_sync_download: &mut ShardSyncDownload,
sync_hash: CryptoHash,
now: DateTime<Utc>,
) -> (bool, bool, bool) {
// Step 2 - download all the parts (each part is usually around 1MB).
Expand Down Expand Up @@ -991,7 +994,6 @@ impl StateSync {
num_parts_done += 1;
}
}
tracing::debug!(target: "sync", %shard_id, %sync_hash, num_parts_done, parts_done);
metrics::STATE_SYNC_PARTS_DONE
.with_label_values(&[&shard_id.to_string()])
.set(num_parts_done);
Expand Down Expand Up @@ -1058,8 +1060,7 @@ impl StateSync {
) -> Result<(), near_chain::Error> {
// Keep waiting until our shard is on the list of results
// (these are set via callback from ClientActor - both for sync and catchup).
let result = self.state_parts_apply_results.remove(&shard_id);
if let Some(result) = result {
if let Some(result) = self.state_parts_apply_results.remove(&shard_id) {
match chain.set_state_finalize(shard_id, sync_hash, result) {
Ok(()) => {
*shard_sync_download = ShardSyncDownload {
Expand Down Expand Up @@ -1088,30 +1089,21 @@ impl StateSync {
fn sync_shards_download_complete_status(
&mut self,
split_states: bool,
shard_id: ShardId,
shard_sync_download: &mut ShardSyncDownload,
sync_hash: CryptoHash,
chain: &mut Chain,
) -> Result<bool, near_chain::Error> {
let shard_state_header = chain.get_state_header(shard_id, sync_hash)?;
let state_num_parts =
get_num_state_parts(shard_state_header.state_root_node().memory_usage);
chain.clear_downloaded_parts(shard_id, sync_hash, state_num_parts)?;

let mut shard_sync_done = false;
) -> bool {
// If the shard layout is changing in this epoch - we have to apply it right now.
if split_states {
*shard_sync_download = ShardSyncDownload {
downloads: vec![],
status: ShardSyncStatus::StateSplitScheduling,
}
};
false
} else {
// If there is no layout change - we're done.
*shard_sync_download =
ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSyncDone };
shard_sync_done = true;
true
}
Ok(shard_sync_done)
}

fn sync_shards_state_split_scheduling_status(
Expand Down
Empty file.
3 changes: 0 additions & 3 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ pub struct ClientConfig {
pub ttl_account_id_router: Duration,
/// Horizon at which instead of fetching block, fetch full state.
pub block_fetch_horizon: BlockHeightDelta,
/// Horizon to step from the latest block when fetching state.
pub state_fetch_horizon: NumBlocks,
/// Time between check to perform catchup.
pub catchup_step_period: Duration,
/// Time between checking to re-request chunks.
Expand Down Expand Up @@ -318,7 +316,6 @@ impl ClientConfig {
num_block_producer_seats,
ttl_account_id_router: Duration::from_secs(60 * 60),
block_fetch_horizon: 50,
state_fetch_horizon: 5,
catchup_step_period: Duration::from_millis(1),
chunk_request_retry_period: min(
Duration::from_millis(100),
Expand Down
15 changes: 13 additions & 2 deletions core/o11y/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,18 @@ fn add_simple_log_layer<S, W>(
filter: EnvFilter,
writer: W,
ansi: bool,
with_span_events: bool,
subscriber: S,
) -> SimpleLogLayer<S, W>
where
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
W: for<'writer> fmt::MakeWriter<'writer> + 'static,
{
let layer = fmt::layer().with_ansi(ansi).with_writer(writer).with_filter(filter);
let layer = fmt::layer()
.with_ansi(ansi)
.with_span_events(get_fmt_span(with_span_events))
.with_writer(writer)
.with_filter(filter);

subscriber.with(layer)
}
Expand Down Expand Up @@ -337,7 +342,13 @@ pub fn default_subscriber(
};

let subscriber = tracing_subscriber::registry();
let subscriber = add_simple_log_layer(env_filter, make_writer, color_output, subscriber);
let subscriber = add_simple_log_layer(
env_filter,
make_writer,
color_output,
options.log_span_events,
subscriber,
);

#[allow(unused_mut)]
let mut io_trace_guard = None;
Expand Down
3 changes: 0 additions & 3 deletions integration-tests/src/tests/nearcore/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,6 @@ fn sync_empty_state() {
Duration::from_millis(200);
near2.client_config.max_block_production_delay =
Duration::from_millis(400);
near2.client_config.state_fetch_horizon =
state_sync_horizon;
near2.client_config.block_header_fetch_horizon =
block_header_fetch_horizon;
near2.client_config.block_fetch_horizon =
Expand Down Expand Up @@ -482,7 +480,6 @@ fn sync_state_dump() {
Duration::from_millis(300);
near2.client_config.max_block_production_delay =
Duration::from_millis(600);
near2.client_config.state_fetch_horizon = state_sync_horizon;
near2.client_config.block_header_fetch_horizon =
block_header_fetch_horizon;
near2.client_config.block_fetch_horizon = block_fetch_horizon;
Expand Down
1 change: 0 additions & 1 deletion nearcore/res/example-config-gc.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
},
"produce_empty_blocks": true,
"block_fetch_horizon": 50,
"state_fetch_horizon": 5,
"block_header_fetch_horizon": 50,
"catchup_step_period": {
"secs": 0,
Expand Down
1 change: 0 additions & 1 deletion nearcore/res/example-config-no-gc.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
},
"produce_empty_blocks": true,
"block_fetch_horizon": 50,
"state_fetch_horizon": 5,
"block_header_fetch_horizon": 50,
"catchup_step_period": {
"secs": 0,
Expand Down
Loading