fix(nayduck): State sync skip state dump (#10865)
This addresses `sanity/state_sync_fail.py` and
`sanity/gc_after_sync.py`.

`sanity/state_sync_fail.py` - the node was trying to dump after the shard
layout change.
`sanity/gc_after_sync.py` - the test did not have state sync configured and
the epoch was too short for the node to sync to the right epoch.
VanBarbascu authored Mar 28, 2024
1 parent f98f3d9 commit 6a45ef9
Showing 4 changed files with 65 additions and 33 deletions.
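
For orientation, below is a minimal, self-contained sketch of the decision the dumper makes after this change. Every name in it (DumpProgress, DumpAction, ShardLayoutVersion, the decide helper) is a simplified stand-in written for this note, not a nearcore API; the real logic is in get_current_state in the nearcore/src/state_sync.rs diff further down.

// Illustrative sketch only: simplified stand-in types, not nearcore's real ones.
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct EpochId(u64);

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ShardLayoutVersion(u32);

#[derive(Clone, Copy, Debug)]
enum DumpProgress {
    AllDumped { epoch_id: EpochId },
    Skipped { epoch_id: EpochId },
}

#[derive(Debug)]
enum DumpAction {
    Wait,
    Dump { epoch_id: EpochId },
}

// Already dumped or skipped -> Wait; shard layout changed -> record Skipped
// and Wait; otherwise -> Dump. Mirrors the flow of the patched get_current_state.
fn decide(
    progress: &mut HashMap<u64, DumpProgress>, // stand-in for the per-shard progress store
    shard_id: u64,
    prev_epoch_layout: ShardLayoutVersion,
    new_epoch_layout: ShardLayoutVersion,
    new_epoch_id: EpochId,
) -> DumpAction {
    let last_done = match progress.get(&shard_id) {
        Some(DumpProgress::AllDumped { epoch_id }) | Some(DumpProgress::Skipped { epoch_id }) => {
            Some(*epoch_id)
        }
        None => None,
    };
    if last_done == Some(new_epoch_id) {
        return DumpAction::Wait; // latest epoch already handled, idle
    }
    if prev_epoch_layout != new_epoch_layout {
        // Resharding boundary: remember the epoch as Skipped so the next
        // iteration treats it as done, then idle instead of dumping.
        progress.insert(shard_id, DumpProgress::Skipped { epoch_id: new_epoch_id });
        return DumpAction::Wait;
    }
    DumpAction::Dump { epoch_id: new_epoch_id }
}

fn main() {
    let mut progress = HashMap::new();
    // Epoch 7 changes the shard layout: the dump is skipped and recorded.
    let first = decide(&mut progress, 0, ShardLayoutVersion(1), ShardLayoutVersion(2), EpochId(7));
    println!("{first:?}"); // Wait
    // Epoch 7 seen again: already marked Skipped, so still idle.
    let again = decide(&mut progress, 0, ShardLayoutVersion(2), ShardLayoutVersion(2), EpochId(7));
    println!("{again:?}"); // Wait
    // Epoch 8 keeps the layout: the dump proceeds.
    let next = decide(&mut progress, 0, ShardLayoutVersion(2), ShardLayoutVersion(2), EpochId(8));
    println!("{next:?}"); // Dump { epoch_id: EpochId(8) }
}

In the actual patch the skip is persisted as the new StateSyncDumpProgress::Skipped variant via chain.chain_store().set_state_sync_dump_progress, and get_current_state now returns a StateDumpAction instead of an Option, as the diffs below show.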
2 changes: 2 additions & 0 deletions core/primitives/src/state_sync.rs
@@ -267,6 +267,8 @@ pub enum StateSyncDumpProgress {
         epoch_id: EpochId,
         epoch_height: EpochHeight,
     },
+    /// * An epoch dump is skipped in the epoch where shard layout changes
+    Skipped { epoch_id: EpochId, epoch_height: EpochHeight },
     /// Represents the case of an epoch being partially dumped.
     InProgress {
         /// The dumped state corresponds to the state at the beginning of the specified epoch.
83 changes: 55 additions & 28 deletions nearcore/src/state_sync.rs
@@ -202,40 +202,59 @@ fn select_random_part_id_with_index(parts_to_be_dumped: &Vec<u64>) -> (u64, usiz
     (selected_element, selected_idx)
 }
 
+enum StateDumpAction {
+    Wait,
+    Dump { epoch_id: EpochId, epoch_height: EpochHeight, sync_hash: CryptoHash },
+}
+
 fn get_current_state(
     chain: &Chain,
     shard_id: &ShardId,
     shard_tracker: &ShardTracker,
     account_id: &Option<AccountId>,
     epoch_manager: Arc<dyn EpochManagerAdapter>,
-) -> Result<Option<(EpochId, EpochHeight, CryptoHash)>, Error> {
-    let was_last_epoch_dumped = match chain.chain_store().get_state_sync_dump_progress(*shard_id) {
+) -> Result<StateDumpAction, Error> {
+    let was_last_epoch_done = match chain.chain_store().get_state_sync_dump_progress(*shard_id) {
         Ok(StateSyncDumpProgress::AllDumped { epoch_id, .. }) => Some(epoch_id),
+        Ok(StateSyncDumpProgress::Skipped { epoch_id, .. }) => Some(epoch_id),
         _ => None,
     };
 
-    match get_latest_epoch(shard_id, &chain, epoch_manager) {
-        Err(err) => {
-            tracing::debug!(target: "state_sync_dump", shard_id, ?err, "check_latest_epoch failed. Will retry.");
-            Err(err)
-        }
-        Ok((new_epoch_id, new_epoch_height, new_sync_hash)) => {
-            if Some(&new_epoch_id) == was_last_epoch_dumped.as_ref() {
-                tracing::debug!(target: "state_sync_dump", shard_id, ?was_last_epoch_dumped, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "latest epoch is all dumped. No new epoch to dump. Idle");
-                Ok(None)
-            } else if cares_about_shard(
-                chain,
-                shard_id,
-                &new_sync_hash,
-                &shard_tracker,
-                &account_id,
-            )? {
-                Ok(Some((new_epoch_id, new_epoch_height, new_sync_hash)))
-            } else {
-                tracing::debug!(target: "state_sync_dump", shard_id, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "Doesn't care about the shard in the current epoch. Idle");
-                Ok(None)
-            }
-        }
+    let latest_epoch_info = get_latest_epoch(shard_id, &chain, epoch_manager.clone());
+    let LatestEpochInfo {
+        prev_epoch_id,
+        epoch_id: new_epoch_id,
+        epoch_height: new_epoch_height,
+        sync_hash: new_sync_hash,
+    } = latest_epoch_info.map_err(|err| {
+        tracing::error!(target: "state_sync_dump", shard_id, ?err, "Failed to get the latest epoch");
+        err
+    })?;
+
+    if Some(&new_epoch_id) == was_last_epoch_done.as_ref() {
+        tracing::debug!(target: "state_sync_dump", shard_id, ?was_last_epoch_done, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "latest epoch is done. No new epoch to dump. Idle");
+        Ok(StateDumpAction::Wait)
+    } else if epoch_manager.get_shard_layout(&prev_epoch_id)
+        != epoch_manager.get_shard_layout(&new_epoch_id)
+    {
+        tracing::debug!(target: "state_sync_dump", shard_id, ?was_last_epoch_done, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "Shard layout change detected, will skip dumping for this epoch. Idle");
+        chain.chain_store().set_state_sync_dump_progress(
+            *shard_id,
+            Some(StateSyncDumpProgress::Skipped {
+                epoch_id: new_epoch_id,
+                epoch_height: new_epoch_height,
+            }),
+        )?;
+        Ok(StateDumpAction::Wait)
+    } else if cares_about_shard(chain, shard_id, &new_sync_hash, &shard_tracker, &account_id)? {
+        Ok(StateDumpAction::Dump {
+            epoch_id: new_epoch_id,
+            epoch_height: new_epoch_height,
+            sync_hash: new_sync_hash,
+        })
+    } else {
+        tracing::debug!(target: "state_sync_dump", shard_id, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "Doesn't care about the shard in the current epoch. Idle");
+        Ok(StateDumpAction::Wait)
+    }
     }
 }

@@ -310,8 +329,8 @@ async fn state_sync_dump(
                 tracing::error!(target: "state_sync_dump", ?err, ?shard_id, "Failed to get the current state");
                 None
             }
-            Ok(None) => None,
-            Ok(Some((epoch_id, epoch_height, sync_hash))) => {
+            Ok(StateDumpAction::Wait) => None,
+            Ok(StateDumpAction::Dump { epoch_id, epoch_height, sync_hash }) => {
                 let in_progress_data = get_in_progress_data(shard_id, sync_hash, &chain);
                 match in_progress_data {
                     Err(err) => {
@@ -589,12 +608,19 @@ fn cares_about_shard(
     Ok(shard_tracker.care_about_shard(account_id.as_ref(), sync_prev_hash, *shard_id, true))
 }
 
+struct LatestEpochInfo {
+    prev_epoch_id: EpochId,
+    epoch_id: EpochId,
+    epoch_height: EpochHeight,
+    sync_hash: CryptoHash,
+}
+
 /// return epoch_id and sync_hash of the latest complete epoch available locally.
 fn get_latest_epoch(
     shard_id: &ShardId,
     chain: &Chain,
     epoch_manager: Arc<dyn EpochManagerAdapter>,
-) -> Result<(EpochId, EpochHeight, CryptoHash), Error> {
+) -> Result<LatestEpochInfo, Error> {
     let head = chain.head()?;
     tracing::debug!(target: "state_sync_dump", shard_id, "Check if a new complete epoch is available");
     let hash = head.last_block_hash;
@@ -604,8 +630,9 @@
     let final_block_header = chain.get_block_header(&final_hash)?;
     let epoch_id = final_block_header.epoch_id().clone();
     let epoch_info = epoch_manager.get_epoch_info(&epoch_id)?;
+    let prev_epoch_id = epoch_manager.get_prev_epoch_id_from_prev_block(&head.prev_block_hash)?;
     let epoch_height = epoch_info.epoch_height();
     tracing::debug!(target: "state_sync_dump", ?final_hash, ?sync_hash, ?epoch_id, epoch_height, "get_latest_epoch");
 
-    Ok((epoch_id, epoch_height, sync_hash))
+    Ok(LatestEpochInfo { prev_epoch_id, epoch_id, epoch_height, sync_hash })
 }
2 changes: 1 addition & 1 deletion pytest/tests/sanity/gc_after_sync.py
@@ -13,7 +13,7 @@
 import state_sync_lib
 import utils
 
-EPOCH_LENGTH = 20
+EPOCH_LENGTH = 50
 TARGET_HEIGHT = int(EPOCH_LENGTH * 2.5)
 AFTER_SYNC_HEIGHT = EPOCH_LENGTH * 10
 TIMEOUT = 300
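
Both of gc_after_sync.py's wait targets are derived from EPOCH_LENGTH, so the bump from 20 to 50 stretches the whole run and gives the node more room to sync to the right epoch (per the commit message). A quick illustrative recomputation, where derived_heights is a helper written only for this note, not part of the test:

# Illustrative only: recompute the test's derived constants for the old and new epoch length.
def derived_heights(epoch_length):
    target_height = int(epoch_length * 2.5)  # TARGET_HEIGHT in the test
    after_sync_height = epoch_length * 10    # AFTER_SYNC_HEIGHT in the test
    return target_height, after_sync_height

print(derived_heights(20))  # (50, 200)  -- old EPOCH_LENGTH
print(derived_heights(50))  # (125, 500) -- new EPOCH_LENGTH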
11 changes: 7 additions & 4 deletions pytest/tests/sanity/gc_after_sync1.py
@@ -11,24 +11,27 @@
 sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))
 
 from cluster import start_cluster
+import state_sync_lib
 from configured_logger import logger
 import utils
 
-EPOCH_LENGTH = 20
+EPOCH_LENGTH = 30
 TARGET_HEIGHT1 = EPOCH_LENGTH + (EPOCH_LENGTH // 2)
 TARGET_HEIGHT2 = EPOCH_LENGTH * 3 + (EPOCH_LENGTH // 2)
 TARGET_HEIGHT3 = EPOCH_LENGTH * 10 + (EPOCH_LENGTH // 2)
 
-node0_config = {"gc_blocks_limit": 10}
+node0_config, node1_config = state_sync_lib.get_state_sync_configs_pair()
 
-node1_config = {
+node0_config.update({"gc_blocks_limit": 10})
+
+node1_config.update({
     "consensus": {
         "block_fetch_horizon": 10,
         "block_header_fetch_horizon": 10,
     },
     "tracked_shards": [0],
     "gc_blocks_limit": 10,
-}
+})
 
 nodes = start_cluster(
     1, 1, 1, None,
