feat: stateless validation jobs in test mode (#10248)
This is the next step for #9982.
Here I introduce jobs which perform stateless validation of a newly
received chunk by executing its transactions and receipts.
Later these jobs will run against the state witness, but for now I just
lay the foundation by running them against state data in the DB. All
passing tests verify that the old and new jobs generate the same result
(a minimal sketch of that check follows).
The final switch will happen when the stateful jobs are replaced with
stateless ones.
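
Roughly, the consistency check the tests rely on looks like this. This is a minimal sketch with simplified stand-in types; `ChunkExtra`, the key types, and `check_consistency` here are illustrations, not the actual nearcore definitions:

```rust
// Minimal sketch of the stateful-vs-stateless consistency check; all types
// here are simplified stand-ins, not the actual nearcore definitions.
use std::collections::HashMap;

type CryptoHash = [u8; 32];
type ShardId = u64;

#[derive(Debug, PartialEq, Eq)]
struct ChunkExtra {
    state_root: CryptoHash,
    gas_used: u64,
}

/// Assert that the stateless jobs reproduced the stateful results for every
/// (block, shard) pair they processed.
fn check_consistency(
    stateful: &HashMap<(CryptoHash, ShardId), ChunkExtra>,
    stateless: &HashMap<(CryptoHash, ShardId), ChunkExtra>,
) {
    for (key, extra) in stateless {
        assert_eq!(
            Some(extra),
            stateful.get(key),
            "For stateless validation, chunk extras for block and shard {key:?} do not match"
        );
    }
}
```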

### Details

This doesn't introduce any load on the stable version. On the nightly
version there will be `num_shards` extra jobs which check that stateless
validation results are consistent with stateful execution. But since we
use nightly only for testing, the overhead shouldn't matter much.

I add more fields to the `ShardContext` structure to simplify the code. Some
of them are needed to break early if there is a resharding, and that logic is
the same for both kinds of jobs (see the sketch below).
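
A hedged sketch of that shared early exit: the `ShardContext` fields mirror the `update_shard.rs` diff below, while `ShardUpdateJob` and `build_update_job` are hypothetical names I use only for illustration:

```rust
// Hedged sketch of the early exit shared by stateful and stateless jobs.
// The struct mirrors the update_shard.rs diff below; `ShardUpdateJob` and
// `build_update_job` are hypothetical illustrations.
struct ShardContext {
    cares_about_shard_this_epoch: bool,
    will_shard_layout_change: bool,
    should_apply_transactions: bool,
    need_to_split_states: bool,
}

enum ShardUpdateJob {
    /// Apply txs and receipts for the shard (stateful or stateless flavor).
    ApplyChunk,
    /// Only split state changes into child shards during resharding.
    StateSplit,
}

fn build_update_job(ctx: &ShardContext) -> Option<ShardUpdateJob> {
    if !ctx.should_apply_transactions {
        // Break early: no txs to apply; schedule at most a state-split job.
        return ctx.need_to_split_states.then_some(ShardUpdateJob::StateSplit);
    }
    Some(ShardUpdateJob::ApplyChunk)
}
```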

`StorageDataSource::DbTrieOnly` is introduced to read data only from the
trie in stateless jobs. This is annoying but still needed when there are a
lot of missing chunks and the flat storage head has moved above the block at
which the previous chunk was created. Once the state witness is implemented,
`Recorded` will be used instead.
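
The essence of the `DbTrieOnly` handling, mirroring the `nearcore/src/runtime` and `core/store` diffs below: read through trie nodes in the DB without flat storage, but skip gas charging for trie-node accesses so costs match what flat storage would have produced. The `Trie` here is a simplified stand-in for the nearcore type:

```rust
// Sketch of the DbTrieOnly idea from the runtime diff below; `Trie` is a
// simplified stand-in, not the nearcore type.
struct Trie {
    charge_gas_for_trie_node_access: bool,
}

impl Trie {
    /// Temporary helper, mirroring the trie/mod.rs diff below.
    fn dont_charge_gas_for_trie_node_access(&mut self) {
        self.charge_gas_for_trie_node_access = false;
    }
}

/// Prepare a trie for a stateless-validation job: reads go through trie
/// nodes in the DB (no flat storage), but gas costs are simulated as if
/// flat storage were enabled by not charging for trie node accesses.
fn trie_for_stateless_job(mut trie: Trie) -> Trie {
    trie.dont_charge_gas_for_trie_node_access();
    trie
}
```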

## Testing

* Failing to update `current_chunk_extra` along the way leads to >20 tests
failing in process_blocks, with errors like ``assertion `left == right`
failed: For stateless validation, chunk extras for block
CMV88CBcnKoxa7eTnkG64psLoJzpW9JeAhFrZBVv6zDc and shard s3.v2 do not
match...`` (see the sketch after this list).
* If I update `current_chunk_extra` only once,
`tests::client::resharding::test_latest_protocol_missing_chunks_high_missing_prob`
fails; that test was introduced specifically for this case. This also helped
me realize that `validate_chunk_with_chunk_extra` is still needed, but I
will reintroduce it later.
* Nayduck: ~https://nayduck.near.org/#/run/3293 - +10 nightly tests
failing, will take a look~ https://nayduck.near.org/#/run/3300
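
A hedged reconstruction of the invariant behind those failures: the stateless job must carry the running chunk extra forward through every block between two chunks, not update it just once. `Block`, `ChunkExtra`, and `apply_new_chunk` below are simplified stand-ins, not nearcore APIs:

```rust
// Hedged reconstruction of the invariant behind the failures above; all
// types and functions are simplified stand-ins, not nearcore APIs.
#[derive(Debug, PartialEq)]
struct ChunkExtra {
    state_root: u64,
}

struct Block {
    has_new_chunk: bool,
}

/// Stand-in for executing a chunk's txs and receipts on top of `prev`.
fn apply_new_chunk(_block: &Block, prev: &ChunkExtra) -> ChunkExtra {
    ChunkExtra { state_root: prev.state_root + 1 }
}

/// Walk every block between two chunks, updating the running chunk extra at
/// each block that actually has a new chunk. Updating it only once (or not
/// at all) makes the final result diverge from stateful execution whenever
/// chunks are missing in between.
fn replay(blocks: &[Block], mut current_chunk_extra: ChunkExtra) -> ChunkExtra {
    for block in blocks {
        if block.has_new_chunk {
            current_chunk_extra = apply_new_chunk(block, &current_chunk_extra);
        }
    }
    current_chunk_extra
}
```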

---------

Co-authored-by: Longarithm <the.aleksandr.logunov@gmail.com>
Longarithm authored Dec 12, 2023
1 parent 5ce0d8a commit 5595df6
Showing 9 changed files with 512 additions and 114 deletions.
496 changes: 413 additions & 83 deletions chain/chain/src/chain.rs

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions chain/chain/src/types.rs
@@ -248,7 +248,14 @@ impl ChainGenesis {
}

pub enum StorageDataSource {
/// Full state data is present in DB.
Db,
/// Trie is present in DB and flat storage is not.
/// Used for testing stateless validation jobs, should be removed after
/// stateless validation release.
DbTrieOnly,
/// State data is supplied from state witness, there is no state data
/// stored on disk.
Recorded(PartialStorage),
}

79 changes: 52 additions & 27 deletions chain/chain/src/update_shard.rs
@@ -2,7 +2,7 @@ use crate::crypto_hash_timer::CryptoHashTimer;
use crate::types::{
ApplySplitStateResult, ApplySplitStateResultOrStateChanges, ApplyTransactionResult,
ApplyTransactionsBlockContext, ApplyTransactionsChunkContext, RuntimeAdapter,
RuntimeStorageConfig,
RuntimeStorageConfig, StorageDataSource,
};
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
@@ -45,8 +45,20 @@ pub struct StateSplitResult {
pub(crate) results: Vec<ApplySplitStateResult>,
}

/// Result of processing shard update, covering both stateful and stateless scenarios.
#[derive(Debug)]
pub enum ApplyChunkResult {
pub enum ShardUpdateResult {
/// Stateful scenario - processed update for a single block.
Stateful(ShardBlockUpdateResult),
/// Stateless scenario - processed update based on state witness in a chunk.
/// Contains `ChunkExtra`s - results for processing updates corresponding
/// to state witness.
Stateless(Vec<(CryptoHash, ShardUId, ChunkExtra)>),
}

/// Result for a shard update for a single block.
#[derive(Debug)]
pub enum ShardBlockUpdateResult {
NewChunk(NewChunkResult),
OldChunk(OldChunkResult),
StateSplit(StateSplitResult),
@@ -61,12 +73,14 @@ pub(crate) struct NewChunkData {
pub split_state_roots: Option<SplitStateRoots>,
pub block: ApplyTransactionsBlockContext,
pub is_first_block_with_chunk_of_version: bool,
pub storage_context: StorageContext,
}

pub(crate) struct OldChunkData {
pub prev_chunk_extra: ChunkExtra,
pub split_state_roots: Option<SplitStateRoots>,
pub block: ApplyTransactionsBlockContext,
pub storage_context: StorageContext,
}

pub(crate) struct StateSplitData {
@@ -94,8 +108,21 @@ pub(crate) enum ShardUpdateReason {
/// Information about shard to update.
pub(crate) struct ShardContext {
pub shard_uid: ShardUId,
/// Whether node cares about shard in this epoch.
pub cares_about_shard_this_epoch: bool,
/// Whether shard layout changes in the next epoch.
pub will_shard_layout_change: bool,
/// Whether transactions should be applied.
pub should_apply_transactions: bool,
/// See comment in `get_update_shard_job`.
pub need_to_split_states: bool,
}

/// Information about storage used for applying txs and receipts.
pub(crate) struct StorageContext {
/// Data source used for processing shard update.
pub storage_data_source: StorageDataSource,
pub state_patch: SandboxStatePatch,
}

/// Processes shard update with given block and shard.
@@ -106,14 +133,13 @@ pub(crate) fn process_shard_update(
epoch_manager: &dyn EpochManagerAdapter,
shard_update_reason: ShardUpdateReason,
shard_context: ShardContext,
state_patch: SandboxStatePatch,
) -> Result<ApplyChunkResult, Error> {
) -> Result<ShardBlockUpdateResult, Error> {
match shard_update_reason {
ShardUpdateReason::NewChunk(data) => {
apply_new_chunk(parent_span, data, shard_context, state_patch, runtime, epoch_manager)
apply_new_chunk(parent_span, data, shard_context, runtime, epoch_manager)
}
ShardUpdateReason::OldChunk(data) => {
apply_old_chunk(parent_span, data, shard_context, state_patch, runtime, epoch_manager)
apply_old_chunk(parent_span, data, shard_context, runtime, epoch_manager)
}
ShardUpdateReason::StateSplit(data) => {
apply_state_split(parent_span, data, shard_context.shard_uid, runtime, epoch_manager)
@@ -126,19 +152,19 @@ pub(crate) fn process_shard_update(
fn apply_new_chunk(
parent_span: &tracing::Span,
data: NewChunkData,
shard_info: ShardContext,
state_patch: SandboxStatePatch,
shard_context: ShardContext,
runtime: &dyn RuntimeAdapter,
epoch_manager: &dyn EpochManagerAdapter,
) -> Result<ApplyChunkResult, Error> {
) -> Result<ShardBlockUpdateResult, Error> {
let NewChunkData {
block,
chunk,
receipts,
split_state_roots,
is_first_block_with_chunk_of_version,
storage_context,
} = data;
let shard_id = shard_info.shard_uid.shard_id();
let shard_id = shard_context.shard_uid.shard_id();
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
@@ -152,8 +178,8 @@ fn apply_new_chunk(
let storage_config = RuntimeStorageConfig {
state_root: *chunk_inner.prev_state_root(),
use_flat_storage: true,
source: crate::types::StorageDataSource::Db,
state_patch,
source: storage_context.storage_data_source,
state_patch: storage_context.state_patch,
record_storage: false,
};
match runtime.apply_transactions(
@@ -170,7 +196,7 @@ fn apply_new_chunk(
chunk.transactions(),
) {
Ok(apply_result) => {
let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change {
let apply_split_result_or_state_changes = if shard_context.will_shard_layout_change {
Some(apply_split_state_changes(
epoch_manager,
runtime,
@@ -181,9 +207,9 @@ fn apply_new_chunk(
} else {
None
};
Ok(ApplyChunkResult::NewChunk(NewChunkResult {
Ok(ShardBlockUpdateResult::NewChunk(NewChunkResult {
gas_limit,
shard_uid: shard_info.shard_uid,
shard_uid: shard_context.shard_uid,
apply_result,
apply_split_result_or_state_changes,
}))
@@ -198,13 +224,12 @@ fn apply_new_chunk(
fn apply_old_chunk(
parent_span: &tracing::Span,
data: OldChunkData,
shard_info: ShardContext,
state_patch: SandboxStatePatch,
shard_context: ShardContext,
runtime: &dyn RuntimeAdapter,
epoch_manager: &dyn EpochManagerAdapter,
) -> Result<ApplyChunkResult, Error> {
let OldChunkData { prev_chunk_extra, split_state_roots, block } = data;
let shard_id = shard_info.shard_uid.shard_id();
) -> Result<ShardBlockUpdateResult, Error> {
let OldChunkData { prev_chunk_extra, split_state_roots, block, storage_context } = data;
let shard_id = shard_context.shard_uid.shard_id();
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
@@ -215,8 +240,8 @@ fn apply_old_chunk(
let storage_config = RuntimeStorageConfig {
state_root: *prev_chunk_extra.state_root(),
use_flat_storage: true,
source: crate::types::StorageDataSource::Db,
state_patch,
source: storage_context.storage_data_source,
state_patch: storage_context.state_patch,
record_storage: false,
};
match runtime.apply_transactions(
@@ -233,7 +258,7 @@ fn apply_old_chunk(
&[],
) {
Ok(apply_result) => {
let apply_split_result_or_state_changes = if shard_info.will_shard_layout_change {
let apply_split_result_or_state_changes = if shard_context.will_shard_layout_change {
Some(apply_split_state_changes(
epoch_manager,
runtime,
@@ -244,8 +269,8 @@ fn apply_old_chunk(
} else {
None
};
Ok(ApplyChunkResult::OldChunk(OldChunkResult {
shard_uid: shard_info.shard_uid,
Ok(ShardBlockUpdateResult::OldChunk(OldChunkResult {
shard_uid: shard_context.shard_uid,
apply_result,
apply_split_result_or_state_changes,
}))
@@ -261,7 +286,7 @@ fn apply_state_split(
shard_uid: ShardUId,
runtime: &dyn RuntimeAdapter,
epoch_manager: &dyn EpochManagerAdapter,
) -> Result<ApplyChunkResult, Error> {
) -> Result<ShardBlockUpdateResult, Error> {
let StateSplitData { split_state_roots, state_changes, block_height: height, block_hash } =
data;
let shard_id = shard_uid.shard_id();
@@ -281,7 +306,7 @@ fn apply_state_split(
&next_epoch_shard_layout,
state_changes,
)?;
Ok(ApplyChunkResult::StateSplit(StateSplitResult { shard_uid, results }))
Ok(ShardBlockUpdateResult::StateSplit(StateSplitResult { shard_uid, results }))
}

/// Process ApplyTransactionResult to apply changes to split states
5 changes: 4 additions & 1 deletion chain/client/src/client_actor.rs
@@ -22,6 +22,7 @@ use crate::{metrics, StatusResponse, SyncAdapter};
use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler};
use actix_rt::ArbiterHandle;
use chrono::{DateTime, Utc};
use itertools::Itertools;
use near_async::messaging::{CanSend, Sender};
use near_chain::chain::{
ApplyStatePartsRequest, ApplyStatePartsResponse, BlockCatchUpRequest, BlockCatchUpResponse,
@@ -1889,7 +1890,9 @@ impl Handler<WithSpanContext<BlockCatchUpResponse>> for ClientActor {
self.client.catchup_state_syncs.get_mut(&msg.sync_hash)
{
assert!(blocks_catch_up_state.scheduled_blocks.remove(&msg.block_hash));
blocks_catch_up_state.processed_blocks.insert(msg.block_hash, msg.results);
blocks_catch_up_state
.processed_blocks
.insert(msg.block_hash, msg.results.into_iter().map(|res| res.1).collect_vec());
} else {
panic!("block catch up processing result from unknown sync hash");
}
6 changes: 5 additions & 1 deletion chain/client/src/test_utils/client.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, RwLock};

use crate::Client;
use actix_rt::{Arbiter, System};
use itertools::Itertools;
use near_chain::chain::{do_apply_chunks, BlockCatchUpRequest};
use near_chain::resharding::StateSplitRequest;
use near_chain::test_utils::{wait_for_all_blocks_in_processing, wait_for_block_in_processing};
@@ -242,7 +243,10 @@ pub fn run_catchup(
)?;
let mut catchup_done = true;
for msg in block_messages.write().unwrap().drain(..) {
let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work);
let results = do_apply_chunks(msg.block_hash, msg.block_height, msg.work)
.into_iter()
.map(|res| res.1)
.collect_vec();
if let Some((_, _, blocks_catch_up_state)) =
client.catchup_state_syncs.get_mut(&msg.sync_hash)
{
6 changes: 6 additions & 0 deletions core/primitives/src/challenge.rs
@@ -17,6 +17,12 @@ pub enum PartialState {
TrieValues(Vec<TrieValue>),
}

impl Default for PartialState {
fn default() -> Self {
PartialState::TrieValues(vec![])
}
}

impl PartialState {
pub fn len(&self) -> usize {
let Self::TrieValues(values) = self;
7 changes: 6 additions & 1 deletion core/store/src/trie/mod.rs
@@ -60,7 +60,7 @@ pub mod update;
const POISONED_LOCK_ERR: &str = "The lock was poisoned.";

/// For fraud proofs
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PartialStorage {
pub nodes: PartialState,
}
@@ -649,6 +649,11 @@ impl Trie {
}
}

/// Temporary helper, must be removed after stateless validation release.
pub fn dont_charge_gas_for_trie_node_access(&mut self) {
self.charge_gas_for_trie_node_access = false;
}

/// Makes a new trie that has everything the same except that access
/// through that trie accumulates a state proof for all nodes accessed.
pub fn recording_reads(&self) -> Self {
8 changes: 7 additions & 1 deletion integration-tests/src/tests/client/process_blocks.rs
@@ -11,9 +11,12 @@ use near_async::messaging::IntoSender;
use near_chain::chain::ApplyStatePartsRequest;
use near_chain::test_utils::ValidatorSchedule;
use near_chain::types::{LatestKnown, RuntimeAdapter};
#[cfg(not(feature = "nightly"))]
use near_chain::validate::validate_chunk_with_chunk_extra;
#[cfg(not(feature = "nightly"))]
use near_chain::ChainStore;
use near_chain::{
Block, BlockProcessingArtifact, ChainGenesis, ChainStore, ChainStoreAccess, Error, Provenance,
Block, BlockProcessingArtifact, ChainGenesis, ChainStoreAccess, Error, Provenance,
};
use near_chain_configs::{Genesis, DEFAULT_GC_NUM_EPOCHS_TO_KEEP};
use near_chunks::test_utils::MockClientAdapterForShardsManager;
@@ -2252,7 +2255,10 @@ fn test_block_height_processed_orphan() {
assert!(env.clients[0].chain.mut_store().is_height_processed(block_height).unwrap());
}

// Disabled until stateless validation release, because the test relies on
// logging which is impacted by the release process.
#[test]
#[cfg(not(feature = "nightly"))]
fn test_validate_chunk_extra() {
let mut capture = near_o11y::testonly::TracingCapture::enable();

12 changes: 12 additions & 0 deletions nearcore/src/runtime/mod.rs
@@ -856,6 +856,18 @@ impl RuntimeAdapter for NightshadeRuntime {
storage_config.state_root,
storage_config.use_flat_storage,
)?,
StorageDataSource::DbTrieOnly => {
// If there is no flat storage on disk, use trie but simulate costs with enabled
// flat storage by not charging gas for trie nodes.
let mut trie = self.get_trie_for_shard(
shard_id,
&block.prev_block_hash,
storage_config.state_root,
false,
)?;
trie.dont_charge_gas_for_trie_node_access();
trie
}
StorageDataSource::Recorded(storage) => Trie::from_recorded_storage(
storage,
storage_config.state_root,
