From ea77618b05b30bb3eb59df14aadce7202da324df Mon Sep 17 00:00:00 2001 From: wacban Date: Mon, 13 Nov 2023 17:10:26 +0000 Subject: [PATCH 1/4] feat(resharding): implemented error handling and test for it --- chain/chain/src/chain.rs | 2 - chain/chain/src/metrics.rs | 3 + chain/chain/src/resharding.rs | 47 +++-- chain/client/src/client.rs | 1 + chain/client/src/sync/state.rs | 3 +- chain/client/src/sync_jobs_actor.rs | 25 ++- core/chain-configs/src/client_config.rs | 14 ++ pytest/lib/cluster.py | 23 +++ pytest/lib/utils.py | 35 ++-- pytest/tests/sanity/resharding.py | 5 +- .../tests/sanity/resharding_error_handling.py | 165 ++++++++++++++++++ tools/database/src/commands.rs | 5 + tools/database/src/corrupt.rs | 39 +++++ tools/database/src/lib.rs | 1 + tools/database/src/utils.rs | 27 ++- 15 files changed, 356 insertions(+), 39 deletions(-) create mode 100644 pytest/tests/sanity/resharding_error_handling.py create mode 100644 tools/database/src/corrupt.rs diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 22c9183c3c8..b74aa4bd8e2 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -2146,8 +2146,6 @@ impl Chain { "start_process_block_impl", height = block_height) .entered(); - - tracing::debug!(target: "chain", "start process block"); // 0) Before we proceed with any further processing, we first check that the block // hash and signature matches to make sure the block is indeed produced by the assigned // block producer. If not, we drop the block immediately diff --git a/chain/chain/src/metrics.rs b/chain/chain/src/metrics.rs index 57f3c3ff39d..b3163bde6a8 100644 --- a/chain/chain/src/metrics.rs +++ b/chain/chain/src/metrics.rs @@ -148,6 +148,8 @@ pub(crate) enum ReshardingStatus { BuildingState, /// The resharding is finished. Finished, + /// The resharding failed. Manual recovery is necessary! + Failed, } impl From<ReshardingStatus> for i64 { @@ -158,6 +160,7 @@ impl From<ReshardingStatus> for i64 { ReshardingStatus::Scheduled => 0, ReshardingStatus::BuildingState => 1, ReshardingStatus::Finished => 2, + ReshardingStatus::Failed => -1, } } } diff --git a/chain/chain/src/resharding.rs b/chain/chain/src/resharding.rs index 820f68822b5..6f318cd25c0 100644 --- a/chain/chain/src/resharding.rs +++ b/chain/chain/src/resharding.rs @@ -31,8 +31,6 @@ use std::sync::Arc; use std::time::Duration; use tracing::debug; -const MAX_RESHARDING_POLL_TIME: Duration = Duration::from_secs(5 * 60 * 60); // 5 hrs /// StateSplitRequest has all the information needed to start a resharding job. This message is sent /// from ClientActor to SyncJobsActor. We do not want to stall the ClientActor with a long running /// resharding job. The SyncJobsActor is helpful for handling such long running jobs.
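The hard-coded MAX_RESHARDING_POLL_TIME removed above is replaced later in this patch by the configurable max_poll_time in StateSplitConfig. For orientation, a minimal, self-contained sketch of that polling bound follows; the struct and function are illustrative stand-ins, not code taken from the patch.

use std::time::Duration;

// Illustrative stand-in for the config fields added by this patch; the real
// StateSplitConfig lives in core/chain-configs/src/client_config.rs.
struct StateSplitConfig {
    retry_delay: Duration,
    max_poll_time: Duration,
}

// Keep rescheduling the resharding job while the state snapshot is missing,
// but only until the accumulated poll time exceeds the configured maximum.
fn should_retry(curr_poll_time: Duration, config: &StateSplitConfig) -> bool {
    curr_poll_time <= config.max_poll_time
}

Each delayed retry adds retry_delay to curr_poll_time, so max_poll_time caps the total time spent waiting for the snapshot before the failure is reported to the client actor.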
@@ -68,7 +66,8 @@ impl Debug for StateSplitRequest { .field("prev_prev_hash", &self.prev_prev_hash) .field("shard_uid", &self.shard_uid) .field("state_root", &self.state_root) - .field("next_epoch_shard_layout", &self.next_epoch_shard_layout) + .field("next_epoch_shard_layout_version", &self.next_epoch_shard_layout.version()) + .field("curr_poll_time", &self.curr_poll_time) .finish() } } @@ -200,6 +199,7 @@ impl Chain { shard_id: ShardId, state_split_scheduler: &dyn Fn(StateSplitRequest), ) -> Result<(), Error> { + tracing::debug!(target: "resharding", ?shard_id, ?sync_hash, "preprocessing started"); let block_header = self.get_block_header(sync_hash)?; let shard_layout = self.epoch_manager.get_shard_layout(block_header.epoch_id())?; let next_epoch_shard_layout = @@ -233,26 +233,47 @@ impl Chain { /// Function to check whether the snapshot is ready for resharding or not. We return true if the snapshot is not /// ready and we need to retry/reschedule the resharding job. pub fn retry_build_state_for_split_shards(state_split_request: &StateSplitRequest) -> bool { - let StateSplitRequest { tries, prev_prev_hash, curr_poll_time, .. } = state_split_request; - // Do not retry if we have spent more than MAX_RESHARDING_POLL_TIME + let StateSplitRequest { tries, prev_prev_hash, curr_poll_time, config, .. } = + state_split_request; + + // Do not retry if we have spent more than max_poll_time // The error would be caught in build_state_for_split_shards and propagated to client actor - if curr_poll_time > &MAX_RESHARDING_POLL_TIME { + if curr_poll_time > &config.max_poll_time { + tracing::warn!(target: "resharding", ?curr_poll_time, ?config.max_poll_time, "exceeded max poll time while waiting for snapshot"); return false; } - tries.get_state_snapshot(prev_prev_hash).is_err_and(|err| match err { - SnapshotError::SnapshotNotFound(_) => true, - SnapshotError::LockWouldBlock => true, - SnapshotError::IncorrectSnapshotRequested(_, _) => false, - SnapshotError::Other(_) => false, - }) + + let state_snapshot = tries.get_state_snapshot(prev_prev_hash); + if let Err(err) = state_snapshot { + tracing::debug!(target: "resharding", ?err, "state snapshot is not ready"); + return match err { + SnapshotError::SnapshotNotFound(_) => true, + SnapshotError::LockWouldBlock => true, + SnapshotError::IncorrectSnapshotRequested(_, _) => false, + SnapshotError::Other(_) => false, + }; + } + + // The snapshot is Ok, no need to retry.
+ return false; } pub fn build_state_for_split_shards( state_split_request: StateSplitRequest, ) -> StateSplitResponse { - let shard_id = state_split_request.shard_uid.shard_id(); + let shard_uid = state_split_request.shard_uid; + let shard_id = shard_uid.shard_id(); let sync_hash = state_split_request.sync_hash; let new_state_roots = Self::build_state_for_split_shards_impl(state_split_request); + match &new_state_roots { + Ok(_) => {} + Err(err) => { + tracing::error!(target: "resharding", ?err, "Resharding failed, manual recovery is necessary!"); + RESHARDING_STATUS + .with_label_values(&[&shard_uid.to_string()]) + .set(ReshardingStatus::Failed.into()); + } + } StateSplitResponse { shard_id, sync_hash, new_state_roots } } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index ea793cbeff1..e489b59c65d 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2388,6 +2388,7 @@ impl Client { let (state_sync, shards_to_split, blocks_catch_up_state) = self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| { + tracing::debug!(target: "client", ?sync_hash, "inserting new state sync"); notify_state_sync = true; ( StateSync::new( diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 32aab9dde6c..809b4eac222 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -1014,6 +1014,7 @@ impl StateSync { ) -> bool { // If the shard layout is changing in this epoch - we have to apply it right now. if split_states { + tracing::debug!(target: "sync", "scheduling resharding after download"); *shard_sync_download = ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSplitScheduling, @@ -1041,7 +1042,7 @@ impl StateSync { shard_id, state_split_scheduler, )?; - tracing::debug!(target: "sync", %shard_id, %sync_hash, ?me, "State sync split scheduled"); + tracing::debug!(target: "sync", %shard_id, %sync_hash, ?me, "resharding scheduled"); *shard_sync_download = ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSplitApplying }; Ok(()) diff --git a/chain/client/src/sync_jobs_actor.rs b/chain/client/src/sync_jobs_actor.rs index 2b36b3f1231..b4aac92a1bf 100644 --- a/chain/client/src/sync_jobs_actor.rs +++ b/chain/client/src/sync_jobs_actor.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::ClientActor; use actix::AsyncContext; @@ -155,18 +157,29 @@ impl actix::Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor { msg: WithSpanContext<StateSplitRequest>, context: &mut Self::Context, ) -> Self::Result { - let (_span, mut state_split_request) = handler_debug_span!(target: "client", msg); + let (_span, mut state_split_request) = handler_debug_span!(target: "resharding", msg); + + // Wait for the initial delay. It should only be used in tests. + let initial_delay = state_split_request.config.initial_delay; + if state_split_request.curr_poll_time == Duration::ZERO && initial_delay > Duration::ZERO { + tracing::debug!(target: "resharding", ?state_split_request, ?initial_delay, "initial delay"); + state_split_request.curr_poll_time += initial_delay; + context.notify_later(state_split_request.with_span_context(), initial_delay); + return; + } + if Chain::retry_build_state_for_split_shards(&state_split_request) { // Actix implementation let's us send message to ourselves with a delay. // In case snapshots are not ready yet, we will retry resharding later.
- tracing::debug!(target: "client", ?state_split_request, "Snapshot missing, retrying resharding later"); let retry_delay = state_split_request.config.retry_delay; + tracing::debug!(target: "resharding", ?state_split_request, ?retry_delay, "Snapshot missing, retrying resharding later"); state_split_request.curr_poll_time += retry_delay; context.notify_later(state_split_request.with_span_context(), retry_delay); - } else { - tracing::debug!(target: "client", ?state_split_request, "Starting resharding"); - let response = Chain::build_state_for_split_shards(state_split_request); - self.client_addr.do_send(response.with_span_context()); + return; } + + tracing::debug!(target: "resharding", ?state_split_request, "Starting resharding"); + let response = Chain::build_state_for_split_shards(state_split_request); + self.client_addr.do_send(response.with_span_context()); } } diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 8b7a639685e..a85c0f62e39 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -177,6 +177,15 @@ pub struct StateSplitConfig { /// The delay between attempts to start resharding while waiting for the /// state snapshot to become available. pub retry_delay: Duration, + + /// The delay between the resharding request is received and when the actor + /// actually starts working on it. This delay should only be used in tests. + pub initial_delay: Duration, + + /// The maximum time that the actor will wait for the snapshot to be ready, + /// before starting resharding. Do not wait indefinitely since we want to + /// report error early enough for the node maintainer to have time to recover. + pub max_poll_time: Duration, } impl Default for StateSplitConfig { @@ -187,6 +196,11 @@ impl Default for StateSplitConfig { batch_size: bytesize::ByteSize::kb(500), batch_delay: Duration::from_millis(100), retry_delay: Duration::from_secs(10), + initial_delay: Duration::from_secs(0), + // The snapshot typically is available within a minute from the + // epoch start. Set the default higher in case we need to wait for + // state sync. 
+ max_poll_time: Duration::from_secs(2 * 60 * 60), // 2 hours } } } diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index b685c9f313c..38bebf3d055 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -825,6 +825,7 @@ def apply_config_changes(node_dir, client_config_change): 'consensus.block_fetch_horizon', 'consensus.min_block_production_delay', 'consensus.state_sync_timeout', + 'expected_shutdown', 'log_summary_period', 'max_gas_burnt_view', 'rosetta_rpc', @@ -982,3 +983,25 @@ def get_binary_protocol_version(config) -> typing.Optional[int]: if tokens[i] == "protocol" and i + 1 < n: return int(tokens[i + 1]) return None + + +def corrupt_state_snapshot(config, node_dir): + near_root = config['near_root'] + binary_name = config.get('binary_name', 'neard') + binary_path = os.path.join(near_root, binary_name) + + cmd = [ + binary_path, + "--home", + node_dir, + "database", + "corrupt-state-snapshot", + ] + + env = os.environ.copy() + env["RUST_BACKTRACE"] = "1" + env["RUST_LOG"] = "db=warn,db_opener=warn," + env.get("RUST_LOG", "debug") + + out = subprocess.check_output(cmd, text=True, env=env) + + return out diff --git a/pytest/lib/utils.py b/pytest/lib/utils.py index 3c087a2791c..bae0d3b5c09 100644 --- a/pytest/lib/utils.py +++ b/pytest/lib/utils.py @@ -144,25 +144,32 @@ def get_all_metrics(self) -> str: f"Could not fetch metrics from {self.addr}: {response}") return response.content.decode('utf-8') + def get_metric_all_values( + self, metric_name: str) -> typing.List[typing.Tuple[str, str]]: + for family in text_string_to_metric_families(self.get_all_metrics()): + if family.name == metric_name: + return [ + (sample.labels, sample.value) for sample in family.samples + ] + return [] + def get_metric_value( self, metric_name: str, labels: typing.Optional[typing.Dict[str, str]] = None ) -> typing.Optional[str]: - for family in text_string_to_metric_families(self.get_all_metrics()): - if family.name == metric_name: - all_samples = [sample for sample in family.samples] - if not labels: - if len(all_samples) > 1: - raise AssertionError( - f"Too many metric values ({len(all_samples)}) for {metric_name} - please specify a label" - ) - if not all_samples: - return None - return all_samples[0].value - for sample in all_samples: - if sample.labels == labels: - return sample.value + all_samples = self.get_metric_all_values(metric_name) + if not labels: + if len(all_samples) > 1: + raise AssertionError( + f"Too many metric values ({len(all_samples)}) for {metric_name} - please specify a label" + ) + if not all_samples: + return None + return all_samples[0].value + for (sample_labels, sample_value) in all_samples: + if sample_labels == labels: + return sample_value return None def get_int_metric_value( diff --git a/pytest/tests/sanity/resharding.py b/pytest/tests/sanity/resharding.py index 97af3b0765c..ab64e8baefb 100644 --- a/pytest/tests/sanity/resharding.py +++ b/pytest/tests/sanity/resharding.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 - +# # Small test for resharding. Spins up a few nodes from genesis with the previous # shard layout, waits for a few epochs and verifies that the shard layout is # upgraded. 
+# # Usage: # python3 pytest/tests/sanity/resharding.py @@ -62,7 +63,7 @@ def __get_client_config_changes(self, num_nodes): # retry often to start resharding as fast as possible "retry_delay": { "secs": 0, - "nanos": 500_000_000 + "nanos": 100_000_000 } } } diff --git a/pytest/tests/sanity/resharding_error_handling.py b/pytest/tests/sanity/resharding_error_handling.py new file mode 100644 index 00000000000..5109d9b788e --- /dev/null +++ b/pytest/tests/sanity/resharding_error_handling.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 + +# Test for checking error handling during resharding. Spins up a few nodes from +# genesis with the previous shard layout. Stops the nodes in the middle of the +# epoch before resharding and corrupts the state snapshot. Resumes the nodes and +# verifies that the error is reported correctly. +# Usage: +# python3 pytest/tests/sanity/resharding_error_handling.py + +import unittest +import sys +import pathlib + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from configured_logger import logger +from cluster import corrupt_state_snapshot, get_binary_protocol_version, init_cluster, load_config, spin_up_node +from utils import MetricsTracker, poll_blocks, wait_for_blocks +from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version + + +class ReshardingTest(unittest.TestCase): + + def setUp(self) -> None: + self.epoch_length = 10 + self.config = load_config() + self.binary_protocol_version = get_binary_protocol_version(self.config) + assert self.binary_protocol_version is not None + + self.genesis_shard_layout_version = get_genesis_shard_layout_version( + self.binary_protocol_version) + self.target_shard_layout_version = get_target_shard_layout_version( + self.binary_protocol_version) + + self.genesis_num_shards = get_genesis_num_shards( + self.binary_protocol_version) + self.target_num_shards = get_target_num_shards( + self.binary_protocol_version) + + def __get_genesis_config_changes(self): + genesis_config_changes = [ + ["epoch_length", self.epoch_length], + ] + + append_shard_layout_config_changes( + genesis_config_changes, + self.binary_protocol_version, + logger, + ) + + return genesis_config_changes + + def __get_client_config_changes(self, num_nodes): + single = { + "tracked_shards": [0], + # arbitrary long initial delay to not trigger resharding + # will get overwritten before restarting the node + "state_split_config": self.__get_state_split_config(10) + } + return {i: single for i in range(num_nodes)} + + def __get_state_split_config(self, initial_delay): + return { + "batch_size": 1000000, + # don't throttle resharding + "batch_delay": { + "secs": 0, + "nanos": 0, + }, + # retry often to start resharding as fast as possible + "retry_delay": { + "secs": 0, + "nanos": 100_000_000 + }, + "initial_delay": { + "secs": initial_delay, + "nanos": 0 + }, + } + + # timeline by block number + # epoch_length + 2 - snapshot is requested + # epoch_length + 3 - snapshot is finished + # epoch_length + 4 - stop the nodes, corrupt the snapshot, start nodes + # epoch_length + 6s - resharding starts and fails + def test_resharding(self): + logger.info("The resharding test is starting.") + num_nodes = 2 + + genesis_config_changes = self.__get_genesis_config_changes() + client_config_changes = self.__get_client_config_changes(num_nodes) + + near_root, [node0_dir, node1_dir] = init_cluster( + num_nodes=num_nodes, + num_observers=0, + 
num_shards=1, + config=self.config, + genesis_config_changes=genesis_config_changes, + client_config_changes=client_config_changes, + ) + + node0 = spin_up_node( + self.config, + near_root, + node0_dir, + 0, + ) + node1 = spin_up_node( + self.config, + near_root, + node1_dir, + 1, + boot_node=node0, + ) + + logger.info("wait until the snapshot is ready") + wait_for_blocks(node0, target=self.epoch_length + 4) + wait_for_blocks(node1, target=self.epoch_length + 4) + + logger.info("the snapshot should be ready, stopping nodes") + node0.kill(gentle=True) + node1.kill(gentle=True) + + logger.info("corrupting the state snapshot of node0") + output = corrupt_state_snapshot(self.config, node0_dir) + logger.info(f"corrupted state snapshot\n{output}") + + # Update the initial delay to start resharding as soon as possible. + client_config_changes = { + "state_split_config": self.__get_state_split_config(initial_delay=0) + } + node0.change_config(client_config_changes) + node1.change_config(client_config_changes) + + logger.info("restarting nodes") + node0.start() + node1.start(boot_node=node0) + + all_failed_observed = False + + metrics = MetricsTracker(node0) + for height, _ in poll_blocks(node0): + status = metrics.get_metric_all_values("near_resharding_status") + logger.info(f"#{height} resharding status {status}") + + if len(status) > 0: + all_failed = all([s == -1.0 for (_, s) in status]) + all_failed_observed = all_failed_observed or all_failed + + # The node should be able to survive until the end of the epoch even + # though resharding is broken. Only break after the last block of epoch. + if height >= self.epoch_length * 2 + 1: + break + + node0.kill(gentle=True) + node1.kill(gentle=True) + + # Resharding should fail for all shards. + self.assertTrue(all_failed_observed) + + logger.info("The resharding error handling test is finished.") + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/database/src/commands.rs b/tools/database/src/commands.rs index e0fabe763bf..e875f6cc3a0 100644 --- a/tools/database/src/commands.rs +++ b/tools/database/src/commands.rs @@ -1,6 +1,7 @@ use crate::adjust_database::ChangeDbKindCommand; use crate::analyse_data_size_distribution::AnalyseDataSizeDistributionCommand; use crate::compact::RunCompactionCommand; +use crate::corrupt::CorruptStateSnapshotCommand; use crate::make_snapshot::MakeSnapshotCommand; use crate::memtrie::LoadMemTrieCommand; use crate::run_migrations::RunMigrationsCommand; @@ -26,6 +27,9 @@ enum SubCommand { /// Run SST file compaction on database CompactDatabase(RunCompactionCommand), + /// Corrupt the database. 
+ CorruptStateSnapshot(CorruptStateSnapshotCommand), + /// Make snapshot of the database MakeSnapshot(MakeSnapshotCommand), @@ -46,6 +50,7 @@ impl DatabaseCommand { SubCommand::AnalyseDataSizeDistribution(cmd) => cmd.run(home), SubCommand::ChangeDbKind(cmd) => cmd.run(home), SubCommand::CompactDatabase(cmd) => cmd.run(home), + SubCommand::CorruptStateSnapshot(cmd) => cmd.run(home), SubCommand::MakeSnapshot(cmd) => { let near_config = nearcore::config::load_config( &home, diff --git a/tools/database/src/corrupt.rs b/tools/database/src/corrupt.rs new file mode 100644 index 00000000000..5efe3df2c9b --- /dev/null +++ b/tools/database/src/corrupt.rs @@ -0,0 +1,39 @@ +use crate::utils::open_state_snapshot; +use clap::Parser; +use near_primitives::shard_layout::ShardLayout; +use near_store::{flat::FlatStorageManager, ShardUId, StoreUpdate}; +use std::path::PathBuf; + +#[derive(Parser)] +pub(crate) struct CorruptStateSnapshotCommand {} + +impl CorruptStateSnapshotCommand { + pub(crate) fn run(&self, home: &PathBuf) -> anyhow::Result<()> { + let store = open_state_snapshot(home, near_store::Mode::ReadWrite)?; + let flat_storage_manager = FlatStorageManager::new(store.clone()); + + let mut store_update = store.store_update(); + // TODO there must be a better way to get the shard uids + // This only works for the V1 shard layout. + let shard_uids = ShardLayout::get_simple_nightshade_layout().get_shard_uids(); + for shard_uid in shard_uids { + corrupt(&mut store_update, &flat_storage_manager, shard_uid)?; + } + store_update.commit().unwrap(); + + println!("corrupted the state snapshot"); + + Ok(()) + } +} + +fn corrupt( + store_update: &mut StoreUpdate, + flat_storage_manager: &FlatStorageManager, + shard_uid: ShardUId, +) -> Result<(), anyhow::Error> { + flat_storage_manager.create_flat_storage_for_shard(shard_uid)?; + let result = flat_storage_manager.remove_flat_storage_for_shard(shard_uid, store_update)?; + println!("removed flat storage for shard {shard_uid:?} result is {result}"); + Ok(()) +} diff --git a/tools/database/src/lib.rs b/tools/database/src/lib.rs index c575892b5a5..6d7e70f1691 100644 --- a/tools/database/src/lib.rs +++ b/tools/database/src/lib.rs @@ -2,6 +2,7 @@ mod adjust_database; mod analyse_data_size_distribution; pub mod commands; mod compact; +mod corrupt; mod make_snapshot; mod memtrie; mod run_migrations; diff --git a/tools/database/src/utils.rs b/tools/database/src/utils.rs index 6d6a18493a9..558d6b86d9c 100644 --- a/tools/database/src/utils.rs +++ b/tools/database/src/utils.rs @@ -1,10 +1,11 @@ +use std::fs; use std::path::Path; use anyhow::anyhow; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::get_block_shard_uid; use near_store::flat::{store_helper, BlockInfo}; -use near_store::{DBCol, ShardUId, Store}; +use near_store::{DBCol, NodeStorage, ShardUId, Store}; use strum::IntoEnumIterator; pub(crate) fn open_rocksdb( @@ -21,6 +22,30 @@ pub(crate) fn open_rocksdb( Ok(rocksdb) } +pub(crate) fn open_state_snapshot(home: &Path, mode: near_store::Mode) -> anyhow::Result<Store> { + let config = nearcore::config::Config::from_file_skip_validation( + &home.join(nearcore::config::CONFIG_FILENAME), )?; + let store_config = &config.store; + let db_path = store_config.path.as_ref().cloned().unwrap_or_else(|| home.join("data")); + + let state_snapshot_dir = db_path.join("state_snapshot"); + let snapshots: Result<Vec<_>, _> = fs::read_dir(state_snapshot_dir)?.into_iter().collect(); + let snapshots = snapshots?; + let &[snapshot_dir] = &snapshots.as_slice() else { return
Err(anyhow!("found more than one snapshot")); + }; + + let path = snapshot_dir.path(); + println!("state snapshot path {path:?}"); + + let opener = NodeStorage::opener(&path, false, &store_config, None); + let storage = opener.open_in_mode(mode)?; + let store = storage.get_hot_store(); + + Ok(store) +} + pub(crate) fn resolve_column(col_name: &str) -> anyhow::Result { DBCol::iter() .filter(|db_col| <&str>::from(db_col) == col_name) From 03f9e5d4de4b8f7216e5df6c085e50b765a07b84 Mon Sep 17 00:00:00 2001 From: wacban Date: Wed, 15 Nov 2023 13:25:10 +0000 Subject: [PATCH 2/4] self code review --- chain/client/src/sync/state.rs | 1 - chain/client/src/sync_jobs_actor.rs | 5 ++--- nightly/pytest-sanity.txt | 4 ++++ pytest/tests/sanity/resharding.py | 2 -- pytest/tests/sanity/resharding_error_handling.py | 7 +++++-- tools/database/src/commands.rs | 2 +- tools/database/src/corrupt.rs | 2 +- 7 files changed, 13 insertions(+), 10 deletions(-) diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 809b4eac222..8650e276f41 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -1014,7 +1014,6 @@ impl StateSync { ) -> bool { // If the shard layout is changing in this epoch - we have to apply it right now. if split_states { - tracing::debug!(target: "sync", "scheduling resharding after download"); *shard_sync_download = ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSplitScheduling, diff --git a/chain/client/src/sync_jobs_actor.rs b/chain/client/src/sync_jobs_actor.rs index b4aac92a1bf..a16887339bf 100644 --- a/chain/client/src/sync_jobs_actor.rs +++ b/chain/client/src/sync_jobs_actor.rs @@ -1,7 +1,6 @@ -use std::time::Duration; - use crate::ClientActor; use actix::AsyncContext; +use std::time::Duration; use near_chain::chain::{ do_apply_chunks, ApplyStatePartsRequest, ApplyStatePartsResponse, BlockCatchUpRequest, @@ -162,7 +161,7 @@ impl actix::Handler> for SyncJobsActor { // Wait for the initial delay. It should only be used in tests. let initial_delay = state_split_request.config.initial_delay; if state_split_request.curr_poll_time == Duration::ZERO && initial_delay > Duration::ZERO { - tracing::debug!(target: "resharding", ?state_split_request, ?initial_delay, "initial delay"); + tracing::debug!(target: "resharding", ?state_split_request, ?initial_delay, "Waiting for the initial delay"); state_split_request.curr_poll_time += initial_delay; context.notify_later(state_split_request.with_span_context(), initial_delay); return; diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index f23f8204bb1..81948339ba8 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -150,3 +150,7 @@ pytest --timeout=600 sanity/split_storage.py --features nightly # Test for resharding pytest --timeout=120 sanity/resharding.py pytest --timeout=120 sanity/resharding.py --features nightly + +# Test for resharding error handling +pytest --timeout=120 sanity/resharding_error_handling.py +pytest --timeout=120 sanity/resharding_error_handling.py --features nightly diff --git a/pytest/tests/sanity/resharding.py b/pytest/tests/sanity/resharding.py index ab64e8baefb..5d9d1077ecd 100644 --- a/pytest/tests/sanity/resharding.py +++ b/pytest/tests/sanity/resharding.py @@ -1,9 +1,7 @@ #!/usr/bin/env python3 -# # Small test for resharding. Spins up a few nodes from genesis with the previous # shard layout, waits for a few epochs and verifies that the shard layout is # upgraded. 
-# # Usage: # python3 pytest/tests/sanity/resharding.py diff --git a/pytest/tests/sanity/resharding_error_handling.py b/pytest/tests/sanity/resharding_error_handling.py index 5109d9b788e..00f165b0023 100644 --- a/pytest/tests/sanity/resharding_error_handling.py +++ b/pytest/tests/sanity/resharding_error_handling.py @@ -19,7 +19,8 @@ from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version -class ReshardingTest(unittest.TestCase): +# TODO(resharding): refactor the resharding tests to re-use the common logic +class ReshardingErrorHandlingTest(unittest.TestCase): def setUp(self) -> None: self.epoch_length = 10 @@ -82,7 +83,9 @@ def __get_state_split_config(self, initial_delay): # epoch_length + 2 - snapshot is requested # epoch_length + 3 - snapshot is finished # epoch_length + 4 - stop the nodes, corrupt the snapshot, start nodes - # epoch_length + 6s - resharding starts and fails + # epoch_length + 4 - resharding starts and fails + # epoch_length * 2 + 1 - last block while node is still healthy before chain + # upgrades to the new shard layout def test_resharding(self): logger.info("The resharding test is starting.") num_nodes = 2 diff --git a/tools/database/src/commands.rs b/tools/database/src/commands.rs index e875f6cc3a0..f3e656e68e7 100644 --- a/tools/database/src/commands.rs +++ b/tools/database/src/commands.rs @@ -27,7 +27,7 @@ enum SubCommand { /// Run SST file compaction on database CompactDatabase(RunCompactionCommand), - /// Corrupt the database. + /// Corrupt the state snapshot. CorruptStateSnapshot(CorruptStateSnapshotCommand), /// Make snapshot of the database diff --git a/tools/database/src/corrupt.rs b/tools/database/src/corrupt.rs index 5efe3df2c9b..bff6a3ddae5 100644 --- a/tools/database/src/corrupt.rs +++ b/tools/database/src/corrupt.rs @@ -13,7 +13,7 @@ impl CorruptStateSnapshotCommand { let flat_storage_manager = FlatStorageManager::new(store.clone()); let mut store_update = store.store_update(); - // TODO there must be a better way to get the shard uids + // TODO(resharding) there must be a better way to get the shard uids // This only works for the V1 shard layout. let shard_uids = ShardLayout::get_simple_nightshade_layout().get_shard_uids(); for shard_uid in shard_uids { From 91b0b25b238c3ed3986fcbf05cd22c1dd63da09b Mon Sep 17 00:00:00 2001 From: wacban Date: Wed, 15 Nov 2023 13:29:52 +0000 Subject: [PATCH 3/4] self code review --- chain/chain/src/resharding.rs | 2 +- pytest/tests/sanity/resharding.py | 1 + pytest/tests/sanity/resharding_error_handling.py | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/chain/chain/src/resharding.rs b/chain/chain/src/resharding.rs index 6f318cd25c0..486fd856d73 100644 --- a/chain/chain/src/resharding.rs +++ b/chain/chain/src/resharding.rs @@ -268,7 +268,7 @@ impl Chain { match &new_state_roots { Ok(_) => {} Err(err) => { - tracing::error!(target: "resharding", ?err, "Resharding failed, manual recovery is necessary!"); + tracing::error!(target: "resharding", ?shard_uid, ?err, "Resharding failed, manual recovery is necessary!"); RESHARDING_STATUS .with_label_values(&[&shard_uid.to_string()]) .set(ReshardingStatus::Failed.into()); diff --git a/pytest/tests/sanity/resharding.py b/pytest/tests/sanity/resharding.py index 5d9d1077ecd..fb70bec0958 100644 --- a/pytest/tests/sanity/resharding.py +++ b/pytest/tests/sanity/resharding.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 + # Small test for resharding. 
Spins up a few nodes from genesis with the previous # shard layout, waits for a few epochs and verifies that the shard layout is # upgraded. diff --git a/pytest/tests/sanity/resharding_error_handling.py b/pytest/tests/sanity/resharding_error_handling.py index 00f165b0023..e16d19787ad 100644 --- a/pytest/tests/sanity/resharding_error_handling.py +++ b/pytest/tests/sanity/resharding_error_handling.py @@ -4,6 +4,7 @@ # genesis with the previous shard layout. Stops the nodes in the middle of the # epoch before resharding and corrupts the state snapshot. Resumes the nodes and # verifies that the error is reported correctly. + # Usage: # python3 pytest/tests/sanity/resharding_error_handling.py From b6d84855499dcf40718b26ab7251be4326820973 Mon Sep 17 00:00:00 2001 From: wacban Date: Wed, 15 Nov 2023 16:46:49 +0000 Subject: [PATCH 4/4] fix some things --- pytest/lib/cluster.py | 4 +++- pytest/lib/resharding_lib.py | 9 ++++++++ pytest/lib/utils.py | 5 ++-- pytest/tests/sanity/resharding.py | 7 ++++-- .../tests/sanity/resharding_error_handling.py | 8 +++++-- tools/database/src/corrupt.rs | 23 +++++++++++++++---- 6 files changed, 44 insertions(+), 12 deletions(-) diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index 38bebf3d055..b902883005c 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -985,7 +985,7 @@ def get_binary_protocol_version(config) -> typing.Optional[int]: return None -def corrupt_state_snapshot(config, node_dir): +def corrupt_state_snapshot(config, node_dir, shard_layout_version): near_root = config['near_root'] binary_name = config.get('binary_name', 'neard') binary_path = os.path.join(near_root, binary_name) @@ -996,6 +996,8 @@ def corrupt_state_snapshot(config, node_dir): node_dir, "database", "corrupt-state-snapshot", + "--shard-layout-version", + str(shard_layout_version), ] env = os.environ.copy() diff --git a/pytest/lib/resharding_lib.py b/pytest/lib/resharding_lib.py index ff2dc471370..41d19e12a92 100644 --- a/pytest/lib/resharding_lib.py +++ b/pytest/lib/resharding_lib.py @@ -113,3 +113,12 @@ def get_target_num_shards(binary_protocol_version): return 4 assert False + + +def get_epoch_offset(binary_protocol_version): + if binary_protocol_version >= V2_PROTOCOL_VERSION: + return 1 + if binary_protocol_version >= V1_PROTOCOL_VERSION: + return 0 + + assert False diff --git a/pytest/lib/utils.py b/pytest/lib/utils.py index bae0d3b5c09..42b0113e0ba 100644 --- a/pytest/lib/utils.py +++ b/pytest/lib/utils.py @@ -166,7 +166,8 @@ def get_metric_value( ) if not all_samples: return None - return all_samples[0].value + (sample_labels, sample_value) = all_samples[0] + return sample_value for (sample_labels, sample_value) in all_samples: if sample_labels == labels: return sample_value @@ -179,7 +180,7 @@ def get_int_metric_value( ) -> typing.Optional[int]: """Helper function to return the integer value of the metric (as function above returns strings).""" value = self.get_metric_value(metric_name, labels) - if not value: + if value is None: return None return round(float(value)) diff --git a/pytest/tests/sanity/resharding.py b/pytest/tests/sanity/resharding.py index fb70bec0958..9715cbbf963 100644 --- a/pytest/tests/sanity/resharding.py +++ b/pytest/tests/sanity/resharding.py @@ -15,7 +15,7 @@ from configured_logger import logger from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node from utils import MetricsTracker, poll_blocks -from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, 
get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version +from resharding_lib import append_shard_layout_config_changes, get_epoch_offset, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version class ReshardingTest(unittest.TestCase): @@ -36,6 +36,8 @@ def setUp(self) -> None: self.target_num_shards = get_target_num_shards( self.binary_protocol_version) + self.epoch_offset = get_epoch_offset(self.binary_protocol_version) + def __get_genesis_config_changes(self): genesis_config_changes = [ ["epoch_length", self.epoch_length], @@ -114,7 +116,8 @@ def test_resharding(self): # after the block is processed. If there is some delay the shard # layout may change and the assertions below will fail. - if height <= 2 * self.epoch_length + 1: + # TODO(resharding) Why is epoch offset needed here? + if height <= 2 * self.epoch_length + self.epoch_offset: self.assertEqual(version, self.genesis_shard_layout_version) self.assertEqual(num_shards, self.genesis_num_shards) else: diff --git a/pytest/tests/sanity/resharding_error_handling.py b/pytest/tests/sanity/resharding_error_handling.py index e16d19787ad..82424812e97 100644 --- a/pytest/tests/sanity/resharding_error_handling.py +++ b/pytest/tests/sanity/resharding_error_handling.py @@ -126,7 +126,11 @@ def test_resharding(self): node1.kill(gentle=True) logger.info("corrupting the state snapshot of node0") - output = corrupt_state_snapshot(self.config, node0_dir) + output = corrupt_state_snapshot( + self.config, + node0_dir, + self.genesis_shard_layout_version, + ) logger.info(f"corrupted state snapshot\n{output}") # Update the initial delay to start resharding as soon as possible. @@ -153,7 +157,7 @@ def test_resharding(self): # The node should be able to survive until the end of the epoch even # though resharding is broken. Only break after the last block of epoch. - if height >= self.epoch_length * 2 + 1: + if height >= self.epoch_length * 2: break node0.kill(gentle=True) diff --git a/tools/database/src/corrupt.rs b/tools/database/src/corrupt.rs index bff6a3ddae5..d172cc52019 100644 --- a/tools/database/src/corrupt.rs +++ b/tools/database/src/corrupt.rs @@ -1,11 +1,15 @@ use crate::utils::open_state_snapshot; +use anyhow::anyhow; use clap::Parser; -use near_primitives::shard_layout::ShardLayout; +use near_primitives::shard_layout::{ShardLayout, ShardVersion}; use near_store::{flat::FlatStorageManager, ShardUId, StoreUpdate}; use std::path::PathBuf; #[derive(Parser)] -pub(crate) struct CorruptStateSnapshotCommand {} +pub(crate) struct CorruptStateSnapshotCommand { + #[clap(short, long)] + shard_layout_version: ShardVersion, +} impl CorruptStateSnapshotCommand { pub(crate) fn run(&self, home: &PathBuf) -> anyhow::Result<()> { @@ -13,9 +17,18 @@ impl CorruptStateSnapshotCommand { let flat_storage_manager = FlatStorageManager::new(store.clone()); let mut store_update = store.store_update(); - // TODO(resharding) there must be a better way to get the shard uids - // This only works for the V1 shard layout. - let shard_uids = ShardLayout::get_simple_nightshade_layout().get_shard_uids(); + // TODO(resharding) automatically detect the shard version + let shard_uids = match self.shard_layout_version { + 0 => ShardLayout::v0(1, 0).get_shard_uids(), + 1 => ShardLayout::get_simple_nightshade_layout().get_shard_uids(), + 2 => ShardLayout::get_simple_nightshade_layout_v2().get_shard_uids(), + _ => { + return Err(anyhow!( + "Unsupported shard layout version! 
{}", + self.shard_layout_version + )) + } + }; for shard_uid in shard_uids { corrupt(&mut store_update, &flat_storage_manager, shard_uid)?; }