fix(state-sync): Remove state_fetch_horizon because it causes nodes to crash (near#9380)

The main issue is that `find_sync_hash()` is not consistent with `check_state_needed()`.
`check_state_needed()` compares the `epoch_id` of `header_head` and `head`.
`find_sync_hash()` moves back `state_fetch_horizon` blocks before determining which epoch to sync to.
This can lead to a node deciding to state sync and then downloading state it already has.
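
To make the mismatch concrete, here is a minimal, self-contained sketch (simplified, invented types, not the real nearcore API) of how the two checks can disagree near an epoch boundary:

```rust
// Minimal sketch with invented types (not the real nearcore API) of how the
// two checks could disagree near an epoch boundary.

#[derive(Clone, Copy, PartialEq, Debug)]
struct EpochId(u64);

#[derive(Clone, Copy)]
struct Block {
    epoch_id: EpochId,
}

// What `check_state_needed` effectively asks: are head and header_head in
// different epochs?
fn state_sync_needed(head: Block, header_head: Block) -> bool {
    head.epoch_id != header_head.epoch_id
}

// Which epoch the *old* `find_sync_hash` targeted: step `horizon` blocks back
// from header_head first, then take that block's epoch.
fn old_sync_target_epoch(chain: &[Block], horizon: usize) -> EpochId {
    let header_head = chain.len() - 1;
    chain[header_head.saturating_sub(horizon)].epoch_id
}

fn main() {
    // Heights 0..=9 belong to epoch 0, heights 10..=12 to epoch 1.
    let chain: Vec<Block> = (0..13u64)
        .map(|h| Block { epoch_id: EpochId(if h < 10 { 0 } else { 1 }) })
        .collect();

    let head = chain[9]; // the node has fully synced epoch 0
    let header_head = chain[12]; // headers already reach into epoch 1

    // The node decides it needs to state sync, because the epochs differ...
    assert!(state_sync_needed(head, header_head));

    // ...but with a state_fetch_horizon of 5 the old logic targets height 7,
    // which is still in epoch 0: the node re-downloads state it already has.
    assert_eq!(old_sync_target_epoch(&chain, 5), EpochId(0));
}
```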

I've observed that it leads to a crash in this complicated scenario:
* State sync to epoch X.
* Finalize the state sync and execute the block at height H.
* Check orphans and unblock a block at height H+1.
* At the same time, check whether the node needs a state sync, wrongly conclude that it does, and reset Flat Storage to prepare it to execute the block at height H again.
* The orphaned block gets executed, but Flat Storage doesn't support height H+1, leading to a panic (see the sketch below).
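
The panic in the last step can be illustrated with a toy model (invented types, not nearcore's actual Flat Storage, which is far more involved). Running it panics on the final line, which is exactly the failure mode described above:

```rust
// Toy model (invented types, not the real nearcore Flat Storage) of the race:
// flat storage can only serve the block right above its head, so resetting it
// for a redundant state sync while an unblocked orphan is applied panics.

struct FlatStorage {
    head_height: u64,
}

impl FlatStorage {
    /// Applying a block requires flat storage to sit exactly at its parent.
    fn apply_block(&mut self, height: u64) {
        assert_eq!(
            self.head_height + 1,
            height,
            "flat storage head is at {}, cannot apply block at {}",
            self.head_height,
            height
        );
        self.head_height = height;
    }

    /// What the spurious second state sync effectively did: rewind the head
    /// so that the block at `sync_height` can be executed (again).
    fn reset_for_state_sync(&mut self, sync_height: u64) {
        self.head_height = sync_height - 1;
    }
}

fn main() {
    let mut flat = FlatStorage { head_height: 0 };

    // State sync to epoch X finishes; the block at height H = 100 is executed.
    flat.reset_for_state_sync(100);
    flat.apply_block(100);

    // The node wrongly decides it must state sync again and resets flat
    // storage to prepare for executing height H once more...
    flat.reset_for_state_sync(100);

    // ...while the orphan unblocked at H + 1 gets executed: panic, because
    // flat storage now supports height 100, not 101.
    flat.apply_block(101);
}
```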
nikurt committed Aug 24, 2023
1 parent 9037544 commit a67dcf5
Showing 18 changed files with 279 additions and 95 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

27 changes: 6 additions & 21 deletions chain/client-primitives/src/types.rs
@@ -1,5 +1,5 @@
 use actix::Message;
-use ansi_term::Color::{Purple, Yellow};
+use ansi_term::Color::Purple;
 use ansi_term::Style;
 use chrono::DateTime;
 use chrono::Utc;
@@ -231,31 +231,16 @@ pub fn format_shard_sync_phase(
         ShardSyncStatus::StateDownloadParts => {
             let mut num_parts_done = 0;
             let mut num_parts_not_done = 0;
-            let mut text = "".to_string();
-            for (i, download) in shard_sync_download.downloads.iter().enumerate() {
+            for download in shard_sync_download.downloads.iter() {
                 if download.done {
                     num_parts_done += 1;
-                    continue;
+                } else {
+                    num_parts_not_done += 1;
                 }
-                num_parts_not_done += 1;
-                text.push_str(&format!(
-                    "[{}: {}, {}, {:?}] ",
-                    paint(&i.to_string(), Yellow.bold(), use_colour),
-                    download.done,
-                    download.state_requests_count,
-                    download.last_target
-                ));
             }
-            format!(
-                "{} [{}: is_done, requests sent, last target] {} num_parts_done={} num_parts_not_done={}",
-                paint("PARTS", Purple.bold(), use_colour),
-                paint("part_id", Yellow.bold(), use_colour),
-                text,
-                num_parts_done,
-                num_parts_not_done
-            )
+            format!("num_parts_done={num_parts_done} num_parts_not_done={num_parts_not_done}")
         }
-        status => format!("{:?}", status),
+        status => format!("{status:?}"),
     }
 }

31 changes: 11 additions & 20 deletions chain/client/src/client_actor.rs
@@ -1491,30 +1491,21 @@ impl ClientActor {
     ///
     /// The selected block will always be the first block on a new epoch:
     /// <https://github.com/nearprotocol/nearcore/issues/2021#issuecomment-583039862>.
-    ///
-    /// To prevent syncing from a fork, we move `state_fetch_horizon` steps backwards and use that epoch.
-    /// Usually `state_fetch_horizon` is much less than the expected number of produced blocks on an epoch,
-    /// so this is only relevant on epoch boundaries.
    fn find_sync_hash(&mut self) -> Result<CryptoHash, near_chain::Error> {
         let header_head = self.client.chain.header_head()?;
-        let mut sync_hash = header_head.prev_block_hash;
-        for _ in 0..self.client.config.state_fetch_horizon {
-            sync_hash = *self.client.chain.get_block_header(&sync_hash)?.prev_hash();
-        }
-        let mut epoch_start_sync_hash =
+        let sync_hash = header_head.last_block_hash;
+        let epoch_start_sync_hash =
             StateSync::get_epoch_start_sync_hash(&mut self.client.chain, &sync_hash)?;
-
-        if &epoch_start_sync_hash == self.client.chain.genesis().hash() {
-            // If we are within `state_fetch_horizon` blocks of the second epoch, the sync hash will
-            // be the first block of the first epoch (or, the genesis block). Due to implementation
-            // details of the state sync, we can't state sync to the genesis block, so redo the
-            // search without going back `state_fetch_horizon` blocks.
-            epoch_start_sync_hash = StateSync::get_epoch_start_sync_hash(
-                &mut self.client.chain,
-                &header_head.last_block_hash,
-            )?;
-            assert_ne!(&epoch_start_sync_hash, self.client.chain.genesis().hash());
-        }
+        let genesis_hash = self.client.chain.genesis().hash();
+        tracing::debug!(
+            target: "sync",
+            ?header_head,
+            ?sync_hash,
+            ?epoch_start_sync_hash,
+            ?genesis_hash,
+            "find_sync_hash");
+        assert_ne!(&epoch_start_sync_hash, genesis_hash);
         Ok(epoch_start_sync_hash)
     }

12 changes: 12 additions & 0 deletions chain/client/src/sync/block.rs
@@ -96,6 +96,18 @@ impl BlockSync {
             && !self.archive
             && self.state_sync_enabled
         {
+            tracing::debug!(
+                target: "sync",
+                head_epoch_id = ?head.epoch_id,
+                header_head_epoch_id = ?header_head.epoch_id,
+                head_next_epoch_id = ?head.next_epoch_id,
+                head_height = head.height,
+                header_head_height = header_head.height,
+                header_head_height_sub = header_head.height.saturating_sub(self.block_fetch_horizon),
+                archive = self.archive,
+                state_sync_enabled = self.state_sync_enabled,
+                block_fetch_horizon = self.block_fetch_horizon,
+                "Switched from block sync to state sync");
             // Epochs are different and we are too far from horizon, State Sync is needed
             return Ok(true);
         }
58 changes: 25 additions & 33 deletions chain/client/src/sync/state.rs
@@ -33,6 +33,7 @@ use near_chain::near_chain_primitives;
 use near_chain::resharding::StateSplitRequest;
 use near_chain::Chain;
 use near_chain_configs::{ExternalStorageConfig, ExternalStorageLocation, SyncConfig};
+use near_client_primitives::types::format_shard_sync_phase_per_shard;
 use near_client_primitives::types::{
     format_shard_sync_phase, DownloadStatus, ShardSyncDownload, ShardSyncStatus,
 };
@@ -297,9 +298,6 @@ impl StateSync {
 
         let old_status = shard_sync_download.status.clone();
         let mut shard_sync_done = false;
-        metrics::STATE_SYNC_STAGE
-            .with_label_values(&[&shard_id.to_string()])
-            .set(shard_sync_download.status.repr() as i64);
         match &shard_sync_download.status {
             ShardSyncStatus::StateDownloadHeader => {
                 (download_timeout, run_shard_state_download) = self
@@ -312,12 +310,8 @@
                     )?;
             }
             ShardSyncStatus::StateDownloadParts => {
-                let res = self.sync_shards_download_parts_status(
-                    shard_id,
-                    shard_sync_download,
-                    sync_hash,
-                    now,
-                );
+                let res =
+                    self.sync_shards_download_parts_status(shard_id, shard_sync_download, now);
                 download_timeout = res.0;
                 run_shard_state_download = res.1;
                 update_sync_status |= res.2;
@@ -342,13 +336,8 @@
                     )?;
             }
             ShardSyncStatus::StateDownloadComplete => {
-                shard_sync_done = self.sync_shards_download_complete_status(
-                    split_states,
-                    shard_id,
-                    shard_sync_download,
-                    sync_hash,
-                    chain,
-                )?;
+                shard_sync_done = self
+                    .sync_shards_download_complete_status(split_states, shard_sync_download);
             }
             ShardSyncStatus::StateSplitScheduling => {
                 debug_assert!(split_states);
@@ -374,6 +363,14 @@
                 shard_sync_done = true;
             }
         }
+        let stage = if shard_sync_done {
+            // Update the state sync stage metric, because maybe we'll not
+            // enter this function again.
+            ShardSyncStatus::StateSyncDone.repr()
+        } else {
+            shard_sync_download.status.repr()
+        };
+        metrics::STATE_SYNC_STAGE.with_label_values(&[&shard_id.to_string()]).set(stage as i64);
         all_done &= shard_sync_done;
 
         if download_timeout {
@@ -407,6 +404,13 @@
             }
             update_sync_status |= shard_sync_download.status != old_status;
         }
+        if update_sync_status {
+            // Print debug messages only if something changed.
+            // Otherwise it spams the debug logs.
+            tracing::debug!(
+                target: "sync",
+                progress_per_shard = ?format_shard_sync_phase_per_shard(new_shard_sync, false));
+        }
 
         Ok((update_sync_status, all_done))
     }
@@ -951,7 +955,6 @@
         &mut self,
         shard_id: ShardId,
         shard_sync_download: &mut ShardSyncDownload,
-        sync_hash: CryptoHash,
         now: DateTime<Utc>,
     ) -> (bool, bool, bool) {
         // Step 2 - download all the parts (each part is usually around 1MB).
@@ -991,7 +994,6 @@
                 num_parts_done += 1;
             }
         }
-        tracing::debug!(target: "sync", %shard_id, %sync_hash, num_parts_done, parts_done);
         metrics::STATE_SYNC_PARTS_DONE
             .with_label_values(&[&shard_id.to_string()])
             .set(num_parts_done);
@@ -1058,8 +1060,7 @@
     ) -> Result<(), near_chain::Error> {
         // Keep waiting until our shard is on the list of results
         // (these are set via callback from ClientActor - both for sync and catchup).
-        let result = self.state_parts_apply_results.remove(&shard_id);
-        if let Some(result) = result {
+        if let Some(result) = self.state_parts_apply_results.remove(&shard_id) {
             match chain.set_state_finalize(shard_id, sync_hash, result) {
                 Ok(()) => {
                     *shard_sync_download = ShardSyncDownload {
@@ -1088,30 +1089,21 @@
     fn sync_shards_download_complete_status(
         &mut self,
         split_states: bool,
-        shard_id: ShardId,
         shard_sync_download: &mut ShardSyncDownload,
-        sync_hash: CryptoHash,
-        chain: &mut Chain,
-    ) -> Result<bool, near_chain::Error> {
-        let shard_state_header = chain.get_state_header(shard_id, sync_hash)?;
-        let state_num_parts =
-            get_num_state_parts(shard_state_header.state_root_node().memory_usage);
-        chain.clear_downloaded_parts(shard_id, sync_hash, state_num_parts)?;
-
-        let mut shard_sync_done = false;
+    ) -> bool {
         // If the shard layout is changing in this epoch - we have to apply it right now.
         if split_states {
             *shard_sync_download = ShardSyncDownload {
                 downloads: vec![],
                 status: ShardSyncStatus::StateSplitScheduling,
-            }
+            };
+            false
         } else {
             // If there is no layout change - we're done.
             *shard_sync_download =
                 ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSyncDone };
-            shard_sync_done = true;
+            true
         }
-        Ok(shard_sync_done)
     }
 
     fn sync_shards_state_split_scheduling_status(
Empty file.
3 changes: 0 additions & 3 deletions core/chain-configs/src/client_config.rs
@@ -212,8 +212,6 @@ pub struct ClientConfig {
     pub ttl_account_id_router: Duration,
     /// Horizon at which instead of fetching block, fetch full state.
     pub block_fetch_horizon: BlockHeightDelta,
-    /// Horizon to step from the latest block when fetching state.
-    pub state_fetch_horizon: NumBlocks,
     /// Time between check to perform catchup.
     pub catchup_step_period: Duration,
     /// Time between checking to re-request chunks.
@@ -318,7 +316,6 @@ impl ClientConfig {
             num_block_producer_seats,
             ttl_account_id_router: Duration::from_secs(60 * 60),
             block_fetch_horizon: 50,
-            state_fetch_horizon: 5,
             catchup_step_period: Duration::from_millis(1),
             chunk_request_retry_period: min(
                 Duration::from_millis(100),
15 changes: 13 additions & 2 deletions core/o11y/src/lib.rs
@@ -188,13 +188,18 @@ fn add_simple_log_layer<S, W>(
     filter: EnvFilter,
     writer: W,
     ansi: bool,
+    with_span_events: bool,
     subscriber: S,
 ) -> SimpleLogLayer<S, W>
 where
     S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
     W: for<'writer> fmt::MakeWriter<'writer> + 'static,
 {
-    let layer = fmt::layer().with_ansi(ansi).with_writer(writer).with_filter(filter);
+    let layer = fmt::layer()
+        .with_ansi(ansi)
+        .with_span_events(get_fmt_span(with_span_events))
+        .with_writer(writer)
+        .with_filter(filter);
 
     subscriber.with(layer)
 }
@@ -337,7 +342,13 @@ pub fn default_subscriber(
     };
 
     let subscriber = tracing_subscriber::registry();
-    let subscriber = add_simple_log_layer(env_filter, make_writer, color_output, subscriber);
+    let subscriber = add_simple_log_layer(
+        env_filter,
+        make_writer,
+        color_output,
+        options.log_span_events,
+        subscriber,
+    );
 
     #[allow(unused_mut)]
     let mut io_trace_guard = None;
3 changes: 0 additions & 3 deletions integration-tests/src/tests/nearcore/sync_state_nodes.rs
@@ -349,8 +349,6 @@ fn sync_empty_state() {
                     Duration::from_millis(200);
                 near2.client_config.max_block_production_delay =
                     Duration::from_millis(400);
-                near2.client_config.state_fetch_horizon =
-                    state_sync_horizon;
                 near2.client_config.block_header_fetch_horizon =
                     block_header_fetch_horizon;
                 near2.client_config.block_fetch_horizon =
@@ -482,7 +480,6 @@ fn sync_state_dump() {
                     Duration::from_millis(300);
                 near2.client_config.max_block_production_delay =
                     Duration::from_millis(600);
-                near2.client_config.state_fetch_horizon = state_sync_horizon;
                 near2.client_config.block_header_fetch_horizon =
                     block_header_fetch_horizon;
                 near2.client_config.block_fetch_horizon = block_fetch_horizon;
1 change: 0 additions & 1 deletion nearcore/res/example-config-gc.json
@@ -78,7 +78,6 @@
   },
   "produce_empty_blocks": true,
   "block_fetch_horizon": 50,
-  "state_fetch_horizon": 5,
   "block_header_fetch_horizon": 50,
   "catchup_step_period": {
     "secs": 0,
1 change: 0 additions & 1 deletion nearcore/res/example-config-no-gc.json
@@ -78,7 +78,6 @@
   },
   "produce_empty_blocks": true,
   "block_fetch_horizon": 50,
-  "state_fetch_horizon": 5,
   "block_header_fetch_horizon": 50,
   "catchup_step_period": {
     "secs": 0,
