fix: Save genesis bootstrap congestion info into DB and reuse it at node restarts (#11724)

This addresses [#11702](#11702).

The issue happens for forknet and local testing, where the network starts
from the genesis block and the genesis protocol version already supports
congestion control. In that case, every time the node restarts, it
attempts to re-generate the genesis congestion info. However, this is
done using the state roots at genesis; these state roots are garbage
collected, which causes the bootstrapping to fail.

We fix this by storing the genesis congestion info (a CongestionInfo per
shard) in the DB. If it is present, the node uses it directly instead of
attempting to re-compute it from the genesis state roots.

We store the congestion info in the BlockMisc column under the
`GENESIS_CONGESTION_INFO_KEY` key.
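
In effect, the bootstrap path becomes read-or-compute-and-persist. Below is a minimal sketch of that flow using the `near_store` helpers added by this PR; the `compute_from_genesis_state_roots` closure is a hypothetical stand-in for the per-shard bootstrap loop in `chain.rs` (see the diff below for the actual change):

```rust
use near_primitives::congestion_info::CongestionInfo;
use near_store::{get_genesis_congestion_infos, set_genesis_congestion_infos, Store};

fn load_or_bootstrap_genesis_congestion_infos(
    store: &Store,
    // Hypothetical stand-in for bootstrapping from the genesis state roots.
    compute_from_genesis_state_roots: impl FnOnce() -> std::io::Result<Vec<CongestionInfo>>,
) -> std::io::Result<Vec<CongestionInfo>> {
    // Fast path: reuse the infos previously saved in DBCol::BlockMisc.
    if let Some(saved) = get_genesis_congestion_infos(store)? {
        return Ok(saved);
    }
    // Slow path: bootstrap from the genesis state roots (only possible before they
    // are GC'd) and persist the result so later restarts no longer need them.
    let infos = compute_from_genesis_state_roots()?;
    let mut store_update = store.store_update();
    set_genesis_congestion_infos(&mut store_update, &infos);
    store_update.commit()?;
    Ok(infos)
}
```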

This PR also adds a TestLoop test to check that the congestion info is
saved (and not cleaned up by GC), and re-enables the nayduck tests that
were previously disabled because of this problem.
tayfunelmas authored Jul 5, 2024
1 parent dfd2ca0 commit 20af8fe
Showing 7 changed files with 137 additions and 35 deletions.
67 changes: 40 additions & 27 deletions chain/chain/src/chain.rs
@@ -3943,54 +3943,67 @@ fn get_genesis_congestion_infos_impl(
    runtime: &dyn RuntimeAdapter,
    state_roots: &Vec<CryptoHash>,
) -> Result<Vec<Option<CongestionInfo>>, Error> {
-    let prev_hash = CryptoHash::default();
-    let epoch_id = epoch_manager.get_epoch_id_from_prev_block(&prev_hash)?;
-    let protocol_version = epoch_manager.get_epoch_protocol_version(&epoch_id)?;
+    let genesis_prev_hash = CryptoHash::default();
+    let genesis_epoch_id = epoch_manager.get_epoch_id_from_prev_block(&genesis_prev_hash)?;
+    let genesis_protocol_version = epoch_manager.get_epoch_protocol_version(&genesis_epoch_id)?;
+    // If congestion control is not enabled at the genesis block, we return None (congestion info) for each shard.
+    if !ProtocolFeature::CongestionControl.enabled(genesis_protocol_version) {
+        return Ok(std::iter::repeat(None).take(state_roots.len()).collect());
+    }

+    // Since the congestion info is already bootstrapped in statelessnet, skip another bootstrap.
+    // TODO: This is temporary mitigation for the failing genesis congestion info due to garbage
+    // collected genesis state roots. It can be removed after the statelessnet network is turned down.
+    if let Ok(protocol_config) = runtime.get_protocol_config(&genesis_epoch_id) {
+        if protocol_config.genesis_config.chain_id == near_primitives::chains::STATELESSNET {
+            return Ok(std::iter::repeat(None).take(state_roots.len()).collect());
+        }
+    }

+    // Check we had already computed the congestion infos from the genesis state roots.
+    if let Some(saved_infos) = near_store::get_genesis_congestion_infos(runtime.store())? {
+        tracing::debug!(target: "chain", "Reading genesis congestion infos from database.");
+        return Ok(saved_infos.into_iter().map(Option::Some).collect());
+    }

-    let mut result = vec![];
+    let mut new_infos = vec![];
    for (shard_id, &state_root) in state_roots.iter().enumerate() {
        let shard_id = shard_id as ShardId;
-        let congestion_info = get_congestion_info(
+        let congestion_info = get_genesis_congestion_info(
            runtime,
-            protocol_version,
-            &prev_hash,
+            genesis_protocol_version,
+            &genesis_prev_hash,
            shard_id,
            state_root,
-            &epoch_id,
        )?;
-        result.push(congestion_info);
+        new_infos.push(congestion_info);
    }
-    Ok(result)

+    // Store it in DB so that we can read it later, instead of recomputing from genesis state roots.
+    // Note that this is necessary because genesis state roots will be garbage-collected and will not
+    // be available, for example, when the node restarts later.
+    tracing::debug!(target: "chain", "Saving genesis congestion infos to database.");
+    let mut store_update = runtime.store().store_update();
+    near_store::set_genesis_congestion_infos(&mut store_update, &new_infos);
+    store_update.commit()?;

+    Ok(new_infos.into_iter().map(Option::Some).collect())
}

-fn get_congestion_info(
+fn get_genesis_congestion_info(
    runtime: &dyn RuntimeAdapter,
    protocol_version: ProtocolVersion,
    prev_hash: &CryptoHash,
    shard_id: ShardId,
    state_root: StateRoot,
-    epoch_id: &EpochId,
-) -> Result<Option<CongestionInfo>, Error> {
-    if !ProtocolFeature::CongestionControl.enabled(protocol_version) {
-        return Ok(None);
-    }

-    // Since the congestion info is already bootstrapped in statelessnet, skip another bootstrap.
-    // TODO: This is temporary mitigation for the failing genesis congestion info due to garbage
-    // collected genesis state roots. It can be removed after the statelessnet network is turned down.
-    if let Ok(protocol_config) = runtime.get_protocol_config(&epoch_id) {
-        if protocol_config.genesis_config.chain_id == near_primitives::chains::STATELESSNET {
-            return Ok(None);
-        }
-    }

+) -> Result<CongestionInfo, Error> {
    // Get the view trie because it's possible that the chain is ahead of
    // genesis and doesn't have this block in flat state and memtrie.
    let trie = runtime.get_view_trie_for_shard(shard_id, prev_hash, state_root)?;
    let runtime_config = runtime.get_runtime_config(protocol_version)?;
    let congestion_info = bootstrap_congestion_info(&trie, &runtime_config, shard_id)?;
    tracing::debug!(target: "chain", ?shard_id, ?state_root, ?congestion_info, "Computed genesis congestion info.");
-    Ok(Some(congestion_info))
+    Ok(congestion_info)
}

fn shard_id_out_of_bounds(shard_id: ShardId) -> Error {
1 change: 1 addition & 0 deletions core/store/src/db.rs
@@ -34,6 +34,7 @@ pub const LATEST_KNOWN_KEY: &[u8; 12] = b"LATEST_KNOWN";
pub const LARGEST_TARGET_HEIGHT_KEY: &[u8; 21] = b"LARGEST_TARGET_HEIGHT";
pub const GENESIS_JSON_HASH_KEY: &[u8; 17] = b"GENESIS_JSON_HASH";
pub const GENESIS_STATE_ROOTS_KEY: &[u8; 19] = b"GENESIS_STATE_ROOTS";
pub const GENESIS_CONGESTION_INFO_KEY: &[u8] = b"GENESIS_CONGESTION_INFO_KEY";
pub const COLD_HEAD_KEY: &[u8; 9] = b"COLD_HEAD";
pub const STATE_SYNC_DUMP_KEY: &[u8; 15] = b"STATE_SYNC_DUMP";
pub const STATE_SNAPSHOT_KEY: &[u8; 18] = b"STATE_SNAPSHOT_KEY";
15 changes: 15 additions & 0 deletions core/store/src/lib.rs
@@ -12,6 +12,7 @@ pub use crate::trie::{
};
use borsh::{BorshDeserialize, BorshSerialize};
pub use columns::DBCol;
use db::GENESIS_CONGESTION_INFO_KEY;
pub use db::{
CHUNK_TAIL_KEY, COLD_HEAD_KEY, FINAL_HEAD_KEY, FORK_TAIL_KEY, GENESIS_JSON_HASH_KEY,
GENESIS_STATE_ROOTS_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY,
@@ -21,6 +22,7 @@ use metadata::{DbKind, DbVersion, KIND_KEY, VERSION_KEY};
use near_crypto::PublicKey;
use near_fmt::{AbbrBytes, StorageKey};
use near_primitives::account::{AccessKey, Account};
use near_primitives::congestion_info::CongestionInfo;
pub use near_primitives::errors::{MissingTrieValueContext, StorageError};
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::{
@@ -1011,6 +1013,10 @@ pub fn get_genesis_state_roots(store: &Store) -> io::Result<Option<Vec<StateRoot
store.get_ser::<Vec<StateRoot>>(DBCol::BlockMisc, GENESIS_STATE_ROOTS_KEY)
}

pub fn get_genesis_congestion_infos(store: &Store) -> io::Result<Option<Vec<CongestionInfo>>> {
store.get_ser::<Vec<CongestionInfo>>(DBCol::BlockMisc, GENESIS_CONGESTION_INFO_KEY)
}

pub fn get_genesis_hash(store: &Store) -> io::Result<Option<CryptoHash>> {
store.get_ser::<CryptoHash>(DBCol::BlockMisc, GENESIS_JSON_HASH_KEY)
}
@@ -1027,6 +1033,15 @@ pub fn set_genesis_state_roots(store_update: &mut StoreUpdate, genesis_roots: &[
.expect("Borsh cannot fail");
}

pub fn set_genesis_congestion_infos(
store_update: &mut StoreUpdate,
congestion_infos: &[CongestionInfo],
) {
store_update
.set_ser(DBCol::BlockMisc, GENESIS_CONGESTION_INFO_KEY, &congestion_infos)
.expect("Borsh cannot fail");
}

fn option_to_not_found<T, F>(res: io::Result<Option<T>>, field_name: F) -> io::Result<T>
where
F: std::string::ToString,
72 changes: 72 additions & 0 deletions integration-tests/src/test_loop/tests/congestion_control_genesis_bootstrap.rs
@@ -0,0 +1,72 @@
use near_async::time::Duration;
use near_chain::ChainStoreAccess;
use near_chain_configs::test_genesis::TestGenesisBuilder;
use near_client::Client;
use near_o11y::testonly::init_test_logger;
use near_primitives::types::AccountId;
use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};

use crate::test_loop::builder::TestLoopBuilder;
use crate::test_loop::env::TestLoopEnv;
use crate::test_loop::utils::ONE_NEAR;

const NUM_SHARDS: usize = 4;

/// This test checks that the genesis congestion control info is saved into the DB and not cleaned up during GC,
/// so that the client can use it to bootstrap the genesis congestion control info after restarting.
/// Restarting the node is not checked here; that is covered by the python/nayduck tests.
#[test]
fn test_congestion_control_genesis_bootstrap() {
    if !ProtocolFeature::CongestionControl.enabled(PROTOCOL_VERSION) {
        return;
    }

    init_test_logger();

    let builder = TestLoopBuilder::new();

    let initial_balance = 10000 * ONE_NEAR;
    let accounts = ["test0", "test1"];
    let clients: Vec<AccountId> = accounts.iter().map(|account| account.parse().unwrap()).collect();

    let mut genesis_builder = TestGenesisBuilder::new();
    genesis_builder
        .genesis_time_from_clock(&builder.clock())
        .protocol_version_latest()
        .shard_layout_simple_v1(&["account3", "account5", "account7"])
        .validators_desired_roles(&accounts[0..1], &accounts[1..2])
        .minimum_validators_per_shard(1);

    for i in 0..clients.len() {
        genesis_builder.add_user_account_simple(clients[i].clone(), initial_balance);
    }

    let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } =
        builder.genesis(genesis_builder.build()).clients(clients.clone()).build();

    test_loop.run_for(Duration::seconds(5));

    for i in 0..clients.len() {
        check_genesis_congestion_info_in_store(
            &mut test_loop.data.get_mut(&node_datas[i].client_sender.actor_handle()).client,
        );
    }

    TestLoopEnv { test_loop, datas: node_datas, tempdir }
        .shutdown_and_drain_remaining_events(Duration::seconds(20));
}

fn check_genesis_congestion_info_in_store(client: &mut Client) {
    let gc_config = client.config.gc.clone();
    client.chain.clear_data(&gc_config).unwrap();

    let infos = near_store::get_genesis_congestion_infos(client.chain.chain_store().store())
        .unwrap()
        .unwrap();
    assert_eq!(infos.len(), NUM_SHARDS);
    for i in 0..NUM_SHARDS {
        assert_eq!(infos[i].buffered_receipts_gas(), 0);
        assert_eq!(infos[i].delayed_receipts_gas(), 0);
        assert_eq!(infos[i].receipt_bytes(), 0);
    }
}
1 change: 1 addition & 0 deletions integration-tests/src/test_loop/tests/mod.rs
@@ -1,5 +1,6 @@
mod chunk_validator_kickout;
pub mod congestion_control;
pub mod congestion_control_genesis_bootstrap;
pub mod in_memory_tries;
pub mod multinode_stateless_validators;
pub mod multinode_test_loop_example;
5 changes: 2 additions & 3 deletions nightly/pytest-sanity.txt
@@ -188,9 +188,8 @@ pytest sanity/slow_chunk.py --features nightly
# TODO(congestion_control) - enable pytest on stabilization
# pytest sanity/congestion_control.py
pytest sanity/congestion_control.py --features nightly
-# TODO(#11702) Enable these after fixing the issue and stabilization.
-# pytest sanity/congestion_control_genesis_bootstrap.py
-# pytest sanity/congestion_control_genesis_bootstrap.py --features nightly
+pytest sanity/congestion_control_genesis_bootstrap.py
+pytest sanity/congestion_control_genesis_bootstrap.py --features nightly

# Tests the correct operation of the view client without using memtries (#11312).
pytest sanity/rpc_view_history.py
11 changes: 6 additions & 5 deletions pytest/tests/sanity/memtrie_disktrie_switch.py
@@ -119,11 +119,12 @@ def test(self):
        self.__restart_nodes(enable_memtries=False)
        self.__random_workload_until(target_height)

-        # TODO(#11675): Fix MissingTrieValue error and re-enable this step of the test.
-        # target_height = self.__next_target_height(num_epochs=1)
-        # logger.info(f"Step 3: Restarting nodes with memtries enabled until height {target_height}")
-        # self.__restart_nodes(enable_memtries=True)
-        # self.__random_workload_until(target_height)
+        target_height = self.__next_target_height(num_epochs=1)
+        logger.info(
+            f"Step 3: Restarting nodes with memtries enabled until height {target_height}"
+        )
+        self.__restart_nodes(enable_memtries=True)
+        self.__random_workload_until(target_height)

        self.__wait_for_txs(self.txs, assert_all_accepted=False)
        logger.info("Test ended")
