From 4a447658b358bc855f779f36a03dcf80d99bfac2 Mon Sep 17 00:00:00 2001
From: nikurt <86772482+nikurt@users.noreply.github.com>
Date: Mon, 7 Aug 2023 19:16:57 +0200
Subject: [PATCH] fix(state-sync): Remove `state_fetch_horizon` because it causes nodes to crash (#9380)

The main issue is that `find_sync_hash()` is not consistent with `check_state_needed()`: `check_state_needed()` compares the epoch_id of header_head and head, while `find_sync_hash()` moves back a number of blocks before determining which epoch to sync. This can lead to a node deciding to state sync and then downloading the same state again.

I've observed that it leads to a crash in this complicated scenario:
* State sync to epoch X.
* Finalize the state sync and execute the block at height H.
* Check orphans and unblock a block at height H+1.
* At the same time, check whether the node needs a state sync, decide that it does, and reset Flat Storage to prepare it to execute a block at height H.
* The orphaned block gets executed, but flat storage doesn't support height H+1, leading to a panic.
---
Cargo.lock | 1 + chain/client-primitives/src/types.rs | 27 +-- chain/client/src/client_actor.rs | 31 +-- chain/client/src/sync/block.rs | 12 ++ chain/client/src/sync/state.rs | 58 +++--- chain/client/src/sync/state_sync_actor.rs | 0 core/chain-configs/src/client_config.rs | 3 - core/o11y/src/lib.rs | 15 +- .../src/tests/nearcore/sync_state_nodes.rs | 3 - nearcore/res/example-config-gc.json | 1 - nearcore/res/example-config-no-gc.json | 1 - nearcore/src/config.rs | 16 +- nightly/pytest-sanity.txt | 2 + pytest/lib/cluster.py | 3 + pytest/tests/sanity/gc_after_sync1.py | 1 - .../tests/sanity/state_sync_epoch_boundary.py | 196 ++++++++++++++++++ pytest/tests/stress/stress.py | 2 +- tools/flat-storage/Cargo.toml | 2 +- 18 files changed, 279 insertions(+), 95 deletions(-) delete mode 100644 chain/client/src/sync/state_sync_actor.rs create mode 100755 pytest/tests/sanity/state_sync_epoch_boundary.py diff --git a/Cargo.lock b/Cargo.lock index bebade802a6..7a60292fa91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3749,6 +3749,7 @@ dependencies = [ "nearcore", "rayon", "tqdm", + "tracing", ] [[package]] diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index 9794a76b9ed..46a0ba160b4 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -1,5 +1,5 @@ use actix::Message; -use ansi_term::Color::{Purple, Yellow}; +use ansi_term::Color::Purple; use ansi_term::Style; use chrono::DateTime; use chrono::Utc; @@ -231,31 +231,16 @@ pub fn format_shard_sync_phase( ShardSyncStatus::StateDownloadParts => { let mut num_parts_done = 0; let mut num_parts_not_done = 0; - let mut text = "".to_string(); - for (i, download) in shard_sync_download.downloads.iter().enumerate() { + for download in shard_sync_download.downloads.iter() { if download.done { num_parts_done += 1; - continue; + } else { + num_parts_not_done += 1; } - num_parts_not_done += 1; - text.push_str(&format!( - "[{}: {}, {}, {:?}] ", - paint(&i.to_string(), Yellow.bold(), use_colour), - download.done, - download.state_requests_count, - download.last_target - )); } - format!( - "{} [{}: is_done, requests sent, last target] {} num_parts_done={} num_parts_not_done={}", - paint("PARTS", Purple.bold(), use_colour), - paint("part_id", Yellow.bold(), use_colour), - text, - num_parts_done, - num_parts_not_done - ) + format!("num_parts_done={num_parts_done} num_parts_not_done={num_parts_not_done}") } - status => 
format!("{status:?}"), } } diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 3c4383df159..5f3ebc2032e 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -1491,30 +1491,21 @@ impl ClientActor { /// /// The selected block will always be the first block on a new epoch: /// . - /// - /// To prevent syncing from a fork, we move `state_fetch_horizon` steps backwards and use that epoch. - /// Usually `state_fetch_horizon` is much less than the expected number of produced blocks on an epoch, - /// so this is only relevant on epoch boundaries. fn find_sync_hash(&mut self) -> Result { let header_head = self.client.chain.header_head()?; - let mut sync_hash = header_head.prev_block_hash; - for _ in 0..self.client.config.state_fetch_horizon { - sync_hash = *self.client.chain.get_block_header(&sync_hash)?.prev_hash(); - } - let mut epoch_start_sync_hash = + let sync_hash = header_head.last_block_hash; + let epoch_start_sync_hash = StateSync::get_epoch_start_sync_hash(&mut self.client.chain, &sync_hash)?; - if &epoch_start_sync_hash == self.client.chain.genesis().hash() { - // If we are within `state_fetch_horizon` blocks of the second epoch, the sync hash will - // be the first block of the first epoch (or, the genesis block). Due to implementation - // details of the state sync, we can't state sync to the genesis block, so redo the - // search without going back `state_fetch_horizon` blocks. - epoch_start_sync_hash = StateSync::get_epoch_start_sync_hash( - &mut self.client.chain, - &header_head.last_block_hash, - )?; - assert_ne!(&epoch_start_sync_hash, self.client.chain.genesis().hash()); - } + let genesis_hash = self.client.chain.genesis().hash(); + tracing::debug!( + target: "sync", + ?header_head, + ?sync_hash, + ?epoch_start_sync_hash, + ?genesis_hash, + "find_sync_hash"); + assert_ne!(&epoch_start_sync_hash, genesis_hash); Ok(epoch_start_sync_hash) } diff --git a/chain/client/src/sync/block.rs b/chain/client/src/sync/block.rs index 85bf04868c9..5189dbd1a22 100644 --- a/chain/client/src/sync/block.rs +++ b/chain/client/src/sync/block.rs @@ -96,6 +96,18 @@ impl BlockSync { && !self.archive && self.state_sync_enabled { + tracing::debug!( + target: "sync", + head_epoch_id = ?head.epoch_id, + header_head_epoch_id = ?header_head.epoch_id, + head_next_epoch_id = ?head.next_epoch_id, + head_height = head.height, + header_head_height = header_head.height, + header_head_height_sub = header_head.height.saturating_sub(self.block_fetch_horizon), + archive = self.archive, + state_sync_enabled = self.state_sync_enabled, + block_fetch_horizon = self.block_fetch_horizon, + "Switched from block sync to state sync"); // Epochs are different and we are too far from horizon, State Sync is needed return Ok(true); } diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index b5630e27b98..4ffa3cd8903 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -33,6 +33,7 @@ use near_chain::near_chain_primitives; use near_chain::resharding::StateSplitRequest; use near_chain::Chain; use near_chain_configs::{ExternalStorageConfig, ExternalStorageLocation, SyncConfig}; +use near_client_primitives::types::format_shard_sync_phase_per_shard; use near_client_primitives::types::{ format_shard_sync_phase, DownloadStatus, ShardSyncDownload, ShardSyncStatus, }; @@ -297,9 +298,6 @@ impl StateSync { let old_status = shard_sync_download.status.clone(); let mut shard_sync_done = false; - metrics::STATE_SYNC_STAGE - 
.with_label_values(&[&shard_id.to_string()]) - .set(shard_sync_download.status.repr() as i64); match &shard_sync_download.status { ShardSyncStatus::StateDownloadHeader => { (download_timeout, run_shard_state_download) = self @@ -312,12 +310,8 @@ impl StateSync { )?; } ShardSyncStatus::StateDownloadParts => { - let res = self.sync_shards_download_parts_status( - shard_id, - shard_sync_download, - sync_hash, - now, - ); + let res = + self.sync_shards_download_parts_status(shard_id, shard_sync_download, now); download_timeout = res.0; run_shard_state_download = res.1; update_sync_status |= res.2; @@ -342,13 +336,8 @@ impl StateSync { )?; } ShardSyncStatus::StateDownloadComplete => { - shard_sync_done = self.sync_shards_download_complete_status( - split_states, - shard_id, - shard_sync_download, - sync_hash, - chain, - )?; + shard_sync_done = self + .sync_shards_download_complete_status(split_states, shard_sync_download); } ShardSyncStatus::StateSplitScheduling => { debug_assert!(split_states); @@ -374,6 +363,14 @@ impl StateSync { shard_sync_done = true; } } + let stage = if shard_sync_done { + // Update the state sync stage metric, because maybe we'll not + // enter this function again. + ShardSyncStatus::StateSyncDone.repr() + } else { + shard_sync_download.status.repr() + }; + metrics::STATE_SYNC_STAGE.with_label_values(&[&shard_id.to_string()]).set(stage as i64); all_done &= shard_sync_done; if download_timeout { @@ -407,6 +404,13 @@ impl StateSync { } update_sync_status |= shard_sync_download.status != old_status; } + if update_sync_status { + // Print debug messages only if something changed. + // Otherwise it spams the debug logs. + tracing::debug!( + target: "sync", + progress_per_shard = ?format_shard_sync_phase_per_shard(new_shard_sync, false)); + } Ok((update_sync_status, all_done)) } @@ -951,7 +955,6 @@ impl StateSync { &mut self, shard_id: ShardId, shard_sync_download: &mut ShardSyncDownload, - sync_hash: CryptoHash, now: DateTime, ) -> (bool, bool, bool) { // Step 2 - download all the parts (each part is usually around 1MB). @@ -991,7 +994,6 @@ impl StateSync { num_parts_done += 1; } } - tracing::trace!(target: "sync", %shard_id, %sync_hash, num_parts_done, parts_done); metrics::STATE_SYNC_PARTS_DONE .with_label_values(&[&shard_id.to_string()]) .set(num_parts_done); @@ -1058,8 +1060,7 @@ impl StateSync { ) -> Result<(), near_chain::Error> { // Keep waiting until our shard is on the list of results // (these are set via callback from ClientActor - both for sync and catchup). - let result = self.state_parts_apply_results.remove(&shard_id); - if let Some(result) = result { + if let Some(result) = self.state_parts_apply_results.remove(&shard_id) { match chain.set_state_finalize(shard_id, sync_hash, result) { Ok(()) => { *shard_sync_download = ShardSyncDownload { @@ -1088,30 +1089,21 @@ impl StateSync { fn sync_shards_download_complete_status( &mut self, split_states: bool, - shard_id: ShardId, shard_sync_download: &mut ShardSyncDownload, - sync_hash: CryptoHash, - chain: &mut Chain, - ) -> Result { - let shard_state_header = chain.get_state_header(shard_id, sync_hash)?; - let state_num_parts = - get_num_state_parts(shard_state_header.state_root_node().memory_usage); - chain.clear_downloaded_parts(shard_id, sync_hash, state_num_parts)?; - - let mut shard_sync_done = false; + ) -> bool { // If the shard layout is changing in this epoch - we have to apply it right now. 
if split_states { *shard_sync_download = ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSplitScheduling, - } + }; + false } else { // If there is no layout change - we're done. *shard_sync_download = ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSyncDone }; - shard_sync_done = true; + true } - Ok(shard_sync_done) } fn sync_shards_state_split_scheduling_status( diff --git a/chain/client/src/sync/state_sync_actor.rs b/chain/client/src/sync/state_sync_actor.rs deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 948dc35c50c..717fe6c3b06 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -212,8 +212,6 @@ pub struct ClientConfig { pub ttl_account_id_router: Duration, /// Horizon at which instead of fetching block, fetch full state. pub block_fetch_horizon: BlockHeightDelta, - /// Horizon to step from the latest block when fetching state. - pub state_fetch_horizon: NumBlocks, /// Time between check to perform catchup. pub catchup_step_period: Duration, /// Time between checking to re-request chunks. @@ -318,7 +316,6 @@ impl ClientConfig { num_block_producer_seats, ttl_account_id_router: Duration::from_secs(60 * 60), block_fetch_horizon: 50, - state_fetch_horizon: 5, catchup_step_period: Duration::from_millis(1), chunk_request_retry_period: min( Duration::from_millis(100), diff --git a/core/o11y/src/lib.rs b/core/o11y/src/lib.rs index 2a255e6d2d7..bf3ec6993a3 100644 --- a/core/o11y/src/lib.rs +++ b/core/o11y/src/lib.rs @@ -188,13 +188,18 @@ fn add_simple_log_layer( filter: EnvFilter, writer: W, ansi: bool, + with_span_events: bool, subscriber: S, ) -> SimpleLogLayer where S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, W: for<'writer> fmt::MakeWriter<'writer> + 'static, { - let layer = fmt::layer().with_ansi(ansi).with_writer(writer).with_filter(filter); + let layer = fmt::layer() + .with_ansi(ansi) + .with_span_events(get_fmt_span(with_span_events)) + .with_writer(writer) + .with_filter(filter); subscriber.with(layer) } @@ -337,7 +342,13 @@ pub fn default_subscriber( }; let subscriber = tracing_subscriber::registry(); - let subscriber = add_simple_log_layer(env_filter, make_writer, color_output, subscriber); + let subscriber = add_simple_log_layer( + env_filter, + make_writer, + color_output, + options.log_span_events, + subscriber, + ); #[allow(unused_mut)] let mut io_trace_guard = None; diff --git a/integration-tests/src/tests/nearcore/sync_state_nodes.rs b/integration-tests/src/tests/nearcore/sync_state_nodes.rs index 755ba7ff431..deb502709ab 100644 --- a/integration-tests/src/tests/nearcore/sync_state_nodes.rs +++ b/integration-tests/src/tests/nearcore/sync_state_nodes.rs @@ -349,8 +349,6 @@ fn sync_empty_state() { Duration::from_millis(200); near2.client_config.max_block_production_delay = Duration::from_millis(400); - near2.client_config.state_fetch_horizon = - state_sync_horizon; near2.client_config.block_header_fetch_horizon = block_header_fetch_horizon; near2.client_config.block_fetch_horizon = @@ -482,7 +480,6 @@ fn sync_state_dump() { Duration::from_millis(300); near2.client_config.max_block_production_delay = Duration::from_millis(600); - near2.client_config.state_fetch_horizon = state_sync_horizon; near2.client_config.block_header_fetch_horizon = block_header_fetch_horizon; near2.client_config.block_fetch_horizon = block_fetch_horizon; diff --git 
a/nearcore/res/example-config-gc.json b/nearcore/res/example-config-gc.json index 251e054aa1b..97addfeebee 100644 --- a/nearcore/res/example-config-gc.json +++ b/nearcore/res/example-config-gc.json @@ -78,7 +78,6 @@ }, "produce_empty_blocks": true, "block_fetch_horizon": 50, - "state_fetch_horizon": 5, "block_header_fetch_horizon": 50, "catchup_step_period": { "secs": 0, diff --git a/nearcore/res/example-config-no-gc.json b/nearcore/res/example-config-no-gc.json index 3239f65a91a..64b0ea07631 100644 --- a/nearcore/res/example-config-no-gc.json +++ b/nearcore/res/example-config-no-gc.json @@ -78,7 +78,6 @@ }, "produce_empty_blocks": true, "block_fetch_horizon": 50, - "state_fetch_horizon": 5, "block_header_fetch_horizon": 50, "catchup_step_period": { "secs": 0, diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index ae677c815e7..bfe36fdd513 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -69,9 +69,6 @@ pub const MAX_BLOCK_WAIT_DELAY: u64 = 6_000; /// Horizon at which instead of fetching block, fetch full state. const BLOCK_FETCH_HORIZON: BlockHeightDelta = 50; -/// Horizon to step from the latest block when fetching state. -const STATE_FETCH_HORIZON: NumBlocks = 5; - /// Behind this horizon header fetch kicks in. const BLOCK_HEADER_FETCH_HORIZON: BlockHeightDelta = 50; @@ -181,6 +178,10 @@ fn default_view_client_threads() -> usize { 4 } +fn default_log_summary_period() -> Duration { + Duration::from_secs(10) +} + fn default_doomslug_step_period() -> Duration { Duration::from_millis(100) } @@ -213,8 +214,6 @@ pub struct Consensus { pub produce_empty_blocks: bool, /// Horizon at which instead of fetching block, fetch full state. pub block_fetch_horizon: BlockHeightDelta, - /// Horizon to step from the latest block when fetching state. - pub state_fetch_horizon: NumBlocks, /// Behind this horizon header fetch kicks in. pub block_header_fetch_horizon: BlockHeightDelta, /// Time between check to perform catchup. @@ -259,7 +258,6 @@ impl Default for Consensus { max_block_wait_delay: Duration::from_millis(MAX_BLOCK_WAIT_DELAY), produce_empty_blocks: true, block_fetch_horizon: BLOCK_FETCH_HORIZON, - state_fetch_horizon: STATE_FETCH_HORIZON, block_header_fetch_horizon: BLOCK_HEADER_FETCH_HORIZON, catchup_step_period: Duration::from_millis(CATCHUP_STEP_PERIOD), chunk_request_retry_period: Duration::from_millis(CHUNK_REQUEST_RETRY_PERIOD), @@ -308,6 +306,8 @@ pub struct Config { #[serde(skip_serializing_if = "Option::is_none")] pub save_trie_changes: Option, pub log_summary_style: LogSummaryStyle, + #[serde(default = "default_log_summary_period")] + pub log_summary_period: Duration, // Allows more detailed logging, for example a list of orphaned blocks. pub enable_multiline_logging: Option, /// Garbage collection configuration. 
@@ -377,6 +377,7 @@ impl Default for Config { archive: false, save_trie_changes: None, log_summary_style: LogSummaryStyle::Colored, + log_summary_period: default_log_summary_period(), gc: GCConfig::default(), epoch_sync_enabled: true, view_client_threads: default_view_client_threads(), @@ -658,14 +659,13 @@ impl NearConfig { .header_sync_expected_height_per_second, state_sync_timeout: config.consensus.state_sync_timeout, min_num_peers: config.consensus.min_num_peers, - log_summary_period: Duration::from_secs(10), + log_summary_period: config.log_summary_period, produce_empty_blocks: config.consensus.produce_empty_blocks, epoch_length: genesis.config.epoch_length, num_block_producer_seats: genesis.config.num_block_producer_seats, ttl_account_id_router: config.network.ttl_account_id_router, // TODO(1047): this should be adjusted depending on the speed of sync of state. block_fetch_horizon: config.consensus.block_fetch_horizon, - state_fetch_horizon: config.consensus.state_fetch_horizon, block_header_fetch_horizon: config.consensus.block_header_fetch_horizon, catchup_step_period: config.consensus.catchup_step_period, chunk_request_retry_period: config.consensus.chunk_request_retry_period, diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index 3f7de18d15a..02bbac22ad5 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -63,6 +63,8 @@ pytest --timeout=10m sanity/block_sync_archival.py pytest --timeout=10m sanity/block_sync_archival.py --features nightly pytest --timeout=120 sanity/block_sync_flat_storage.py pytest --timeout=120 sanity/block_sync_flat_storage.py --features nightly +pytest --timeout=240 sanity/state_sync_epoch_boundary.py +pytest --timeout=240 sanity/state_sync_epoch_boundary.py --features nightly pytest --timeout=240 sanity/validator_switch.py pytest --timeout=240 sanity/validator_switch.py --features nightly pytest --timeout=240 sanity/rpc_state_changes.py diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index 8f0a98f2b8a..f62e83cd501 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -815,11 +815,14 @@ def apply_config_changes(node_dir, client_config_change): # when None. allowed_missing_configs = ( 'archive', + 'log_summary_period', 'max_gas_burnt_view', 'rosetta_rpc', 'save_trie_changes', 'split_storage', + 'state_sync', 'state_sync_enabled', + 'state_sync_timeout', 'store.state_snapshot_enabled', 'tracked_shard_schedule', ) diff --git a/pytest/tests/sanity/gc_after_sync1.py b/pytest/tests/sanity/gc_after_sync1.py index 892dd834503..995c5453c12 100755 --- a/pytest/tests/sanity/gc_after_sync1.py +++ b/pytest/tests/sanity/gc_after_sync1.py @@ -25,7 +25,6 @@ "consensus": { "block_fetch_horizon": 10, "block_header_fetch_horizon": 10, - "state_fetch_horizon": 0 }, "tracked_shards": [0], "gc_blocks_limit": 10, diff --git a/pytest/tests/sanity/state_sync_epoch_boundary.py b/pytest/tests/sanity/state_sync_epoch_boundary.py new file mode 100755 index 00000000000..82eb3440b1c --- /dev/null +++ b/pytest/tests/sanity/state_sync_epoch_boundary.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +# Spins up one validating node. +# Spins a non-validating node that tracks some shards and the set of tracked shards changes regularly. +# The node gets stopped, and gets restarted close to an epoch boundary but in a way to trigger epoch sync. +# +# This test is a regression test to ensure that the node doesn't panic during +# function execution during block sync after a state sync. 
+ +import pathlib +import random +import sys +import tempfile + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import init_cluster, spin_up_node, load_config, apply_config_changes +import account +import transaction +import utils + +from configured_logger import logger + +EPOCH_LENGTH = 50 + +state_parts_dir = str(pathlib.Path(tempfile.gettempdir()) / 'state_parts') + +config0 = { + 'enable_multiline_logging': False, + 'log_summary_period': { + 'secs': 0, + 'nanos': 100000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'dump': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + }, + 'iteration_delay': { + 'secs': 0, + 'nanos': 100000000 + }, + } + }, + 'store.state_snapshot_enabled': True, + 'tracked_shards': [0], +} +config1 = { + 'enable_multiline_logging': False, + 'log_summary_period': { + 'secs': 0, + 'nanos': 100000000 + }, + 'log_summary_style': 'plain', + 'state_sync': { + 'sync': { + 'ExternalStorage': { + 'location': { + 'Filesystem': { + 'root_dir': state_parts_dir + } + } + } + } + }, + 'state_sync_enabled': True, + 'state_sync_timeout': { + 'secs': 0, + 'nanos': 500000000 + }, + 'tracked_shard_schedule': [[0, 2, 3], [0, 2, 3], [0, 1], [0, 1], [0, 1], + [0, 1]], + 'tracked_shards': [], +} +logger.info(f'state_parts_dir: {state_parts_dir}') +logger.info(f'config0: {config0}') +logger.info(f'config1: {config1}') + +config = load_config() +near_root, node_dirs = init_cluster(1, 1, 4, config, + [["epoch_length", EPOCH_LENGTH]], { + 0: config0, + 1: config1 + }) + +boot_node = spin_up_node(config, near_root, node_dirs[0], 0) +logger.info('started boot_node') +node1 = spin_up_node(config, near_root, node_dirs[1], 1, boot_node=boot_node) +logger.info('started node1') + +contract = utils.load_test_contract() + +latest_block_hash = boot_node.get_latest_block().hash_bytes +deploy_contract_tx = transaction.sign_deploy_contract_tx( + boot_node.signer_key, contract, 10, latest_block_hash) +result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) +assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + +latest_block_hash = boot_node.get_latest_block().hash_bytes +deploy_contract_tx = transaction.sign_deploy_contract_tx( + node1.signer_key, contract, 10, latest_block_hash) +result = boot_node.send_tx_and_wait(deploy_contract_tx, 10) +assert 'result' in result and 'error' not in result, ( + 'Expected "result" and no "error" in response, got: {}'.format(result)) + + +def epoch_height(block_height): + if block_height == 0: + return 0 + if block_height <= EPOCH_LENGTH: + # According to the protocol specifications, there are two epochs with height 1. + return "1*" + return int((block_height - 1) / EPOCH_LENGTH) + + +# Generates traffic for all possible shards. +# Assumes that `test0`, `test1`, `near` all belong to different shards. 
+def random_workload_until(target, nonce, keys, target_node): + last_height = -1 + while True: + nonce += 1 + + last_block = target_node.get_latest_block() + height = last_block.height + if height > target: + break + if height != last_height: + logger.info(f'@{height}, epoch_height: {epoch_height(height)}') + last_height = height + + last_block_hash = boot_node.get_latest_block().hash_bytes + if (len(keys) > 100 and random.random() < 0.2) or len(keys) > 1000: + key = keys[random.randint(0, len(keys) - 1)] + call_function('read', key, nonce, boot_node.signer_key, + last_block_hash) + call_function('read', key, nonce, node1.signer_key, last_block_hash) + elif random.random() < 0.5: + if random.random() < 0.3: + key_from, account_to = boot_node.signer_key, node1.signer_key.account_id + elif random.random() < 0.3: + key_from, account_to = boot_node.signer_key, "near" + elif random.random() < 0.5: + key_from, account_to = node1.signer_key, boot_node.signer_key.account_id + else: + key_from, account_to = node1.signer_key, "near" + payment_tx = transaction.sign_payment_tx(key_from, account_to, 1, + nonce, last_block_hash) + boot_node.send_tx(payment_tx).get('result') + else: + key = random_u64() + keys.append(key) + call_function('write', key, nonce, boot_node.signer_key, + last_block_hash) + call_function('write', key, nonce, node1.signer_key, + last_block_hash) + return (nonce, keys) + + +def random_u64(): + return bytes(random.randint(0, 255) for _ in range(8)) + + +def call_function(op, key, nonce, signer_key, last_block_hash): + if op == 'read': + args = key + fn = 'read_value' + else: + args = key + random_u64() + fn = 'write_key_value' + + tx = transaction.sign_function_call_tx(signer_key, signer_key.account_id, + fn, args, 300 * account.TGAS, 0, + nonce, last_block_hash) + return boot_node.send_tx(tx).get('result') + + +nonce, keys = random_workload_until(EPOCH_LENGTH - 5, 1, [], boot_node) + +node1_height = node1.get_latest_block().height +logger.info(f'node1@{node1_height}') +node1.kill() +logger.info(f'killed node1') + +# Run node0 more to trigger block sync in node1. +nonce, keys = random_workload_until(int(EPOCH_LENGTH * 2.7), nonce, keys, + boot_node) + +# Node1 is now behind and needs to do header sync and block sync. +node1.start(boot_node=boot_node) +node1_height = node1.get_latest_block().height +logger.info(f'started node1@{node1_height}') + +nonce, keys = random_workload_until(int(EPOCH_LENGTH * 3.1), nonce, keys, node1) diff --git a/pytest/tests/stress/stress.py b/pytest/tests/stress/stress.py index 8c96ed6c2ff..5b3545cd62d 100755 --- a/pytest/tests/stress/stress.py +++ b/pytest/tests/stress/stress.py @@ -47,7 +47,7 @@ # Is only applicable in the scenarios where we expect failures in tx sends. SEND_TX_ATTEMPTS = 10 -# Block_header_fetch_horizon + state_fetch_horizon (which is equalto 5) need to be shorter than the epoch length. +# Block_header_fetch_horizon need to be shorter than the epoch length. # otherwise say epoch boundaries are H and H'. If the current height is H' + eps, and a node finished header sync at # H' + eps - block_header_fetch_horizon, and then rolled state_fetch_horizon back, it will end up before H, and will # try to state sync at the beginning of the epoch *two* epochs ago. No node will respond to such state requests. 
diff --git a/tools/flat-storage/Cargo.toml b/tools/flat-storage/Cargo.toml index b2d49aba6d6..35e6080057e 100644 --- a/tools/flat-storage/Cargo.toml +++ b/tools/flat-storage/Cargo.toml @@ -13,8 +13,8 @@ anyhow.workspace = true borsh.workspace = true clap.workspace = true rayon.workspace = true - tqdm.workspace = true +tracing.workspace = true near-chain.workspace = true near-chain-configs.workspace = true
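
Illustration (not part of the patch): the commit message describes how the old `find_sync_hash()` could disagree with the epoch comparison done in block sync. Below is a minimal standalone sketch of that arithmetic, assuming idealized fixed-length epochs purely for illustration; the constants are taken from defaults touched by this patch (EPOCH_LENGTH = 50 in the new pytest, state_fetch_horizon = 5 in the removed ClientConfig default).

    // Standalone sketch, not nearcore code. With the old behavior, stepping
    // `state_fetch_horizon` blocks back from a header head that sits just past
    // an epoch boundary lands in the previous epoch, so the sync hash search
    // could start in an epoch the node had already synced.
    fn epoch_index(height: u64, epoch_length: u64) -> u64 {
        height / epoch_length
    }

    fn main() {
        let epoch_length = 50; // EPOCH_LENGTH used by the new pytest
        let state_fetch_horizon = 5; // the removed ClientConfig default
        let header_head_height = 52; // a height just past an epoch boundary
        let stepped_back_height = header_head_height - state_fetch_horizon; // 47

        // The epoch the step-back lands in differs from the epoch that made
        // block sync decide a state sync was needed.
        assert_ne!(
            epoch_index(header_head_height, epoch_length),
            epoch_index(stepped_back_height, epoch_length)
        );
    }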