From b192923af2f7a0ff37fde2fe3a8ec5b96b9af9cd Mon Sep 17 00:00:00 2001 From: Trisfald Date: Fri, 16 Aug 2024 19:11:05 +0200 Subject: [PATCH] Revert "runtime: implement a receipt preparation pipeline (#11839)" This reverts commit 08cec9d66adfd8b6fbbf49a0b9f6b212df912e66. --- core/store/src/contract.rs | 52 --- core/store/src/lib.rs | 10 +- core/store/src/trie/update.rs | 28 +- .../src/tests/runtime/state_viewer.rs | 9 +- runtime/runtime/src/actions.rs | 102 +++--- runtime/runtime/src/ext.rs | 17 +- runtime/runtime/src/lib.rs | 112 ++---- runtime/runtime/src/pipelining.rs | 323 ------------------ runtime/runtime/src/state_viewer/mod.rs | 53 ++- 9 files changed, 161 insertions(+), 545 deletions(-) delete mode 100644 core/store/src/contract.rs delete mode 100644 runtime/runtime/src/pipelining.rs diff --git a/core/store/src/contract.rs b/core/store/src/contract.rs deleted file mode 100644 index b0f5ffca0a5..00000000000 --- a/core/store/src/contract.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::TrieStorage; -use near_primitives::hash::CryptoHash; -use near_vm_runner::ContractCode; -use std::collections::BTreeMap; -use std::sync::{Arc, Mutex}; - -/// Reads contract code from the trie by its hash. -/// -/// Cloning is cheap. -#[derive(Clone)] -pub struct ContractStorage { - storage: Arc, - - /// During an apply of a single chunk contracts may be deployed through the - /// `Action::DeployContract`. - /// - /// Unfortunately `TrieStorage` does not have a way to write to the underlying storage, and the - /// `TrieUpdate` will only write the contract once the whole transaction is committed at the - /// end of the chunk's apply. - /// - /// As a temporary work-around while we're still involving `Trie` for `ContractCode` storage, - /// we'll keep a list of such deployed contracts here. Once the contracts are no longer part of - /// The State this field should be removed, and the `Storage::store` function should be - /// adjusted to write out the contract into the relevant part of the database immediately - /// (without going through transactional storage operations and such). - uncommitted_deploys: Arc>>, -} - -impl ContractStorage { - pub fn new(storage: Arc) -> Self { - Self { storage, uncommitted_deploys: Default::default() } - } - - pub fn get(&self, code_hash: CryptoHash) -> Option { - { - let guard = self.uncommitted_deploys.lock().expect("no panics"); - if let Some(v) = guard.get(&code_hash) { - return Some(ContractCode::new(v.code().to_vec(), Some(code_hash))); - } - } - - match self.storage.retrieve_raw_bytes(&code_hash) { - Ok(raw_code) => Some(ContractCode::new(raw_code.to_vec(), Some(code_hash))), - Err(_) => None, - } - } - - pub fn store(&self, code: ContractCode) { - let mut guard = self.uncommitted_deploys.lock().expect("no panics"); - guard.insert(*code.hash(), code); - } -} diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index e9b16d1b9ba..081b2010959 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -46,7 +46,6 @@ use strum; pub mod cold_storage; mod columns; pub mod config; -pub mod contract; pub mod db; pub mod flat; pub mod genesis; @@ -971,6 +970,15 @@ pub fn set_code(state_update: &mut TrieUpdate, account_id: AccountId, code: &Con state_update.set(TrieKey::ContractCode { account_id }, code.code().to_vec()); } +pub fn get_code( + trie: &dyn TrieAccess, + account_id: &AccountId, + code_hash: Option, +) -> Result, StorageError> { + let key = TrieKey::ContractCode { account_id: account_id.clone() }; + trie.get(&key).map(|opt| opt.map(|code| ContractCode::new(code, code_hash))) +} + /// Removes account, code and all access keys associated to it. pub fn remove_account( state_update: &mut TrieUpdate, diff --git a/core/store/src/trie/update.rs b/core/store/src/trie/update.rs index a6c102d4203..d5632e0a5b0 100644 --- a/core/store/src/trie/update.rs +++ b/core/store/src/trie/update.rs @@ -1,19 +1,41 @@ pub use self::iterator::TrieUpdateIterator; use super::accounting_cache::TrieAccountingCacheSwitch; use super::{OptimizedValueRef, Trie, TrieWithReadLock}; -use crate::contract::ContractStorage; use crate::trie::{KeyLookupMode, TrieChanges}; -use crate::StorageError; +use crate::{StorageError, TrieStorage}; use near_primitives::hash::CryptoHash; use near_primitives::trie_key::TrieKey; use near_primitives::types::{ AccountId, RawStateChange, RawStateChanges, RawStateChangesWithTrieKey, StateChangeCause, StateRoot, TrieCacheMode, }; +use near_vm_runner::ContractCode; use std::collections::BTreeMap; +use std::sync::Arc; mod iterator; +/// Reads contract code from the trie by its hash. +/// Currently, uses `TrieStorage`. Consider implementing separate logic for +/// requesting and compiling contracts, as any contract code read and +/// compilation is a major bottleneck during chunk execution. +struct ContractStorage { + storage: Arc, +} + +impl ContractStorage { + fn new(storage: Arc) -> Self { + Self { storage } + } + + pub fn get(&self, code_hash: CryptoHash) -> Option { + match self.storage.retrieve_raw_bytes(&code_hash) { + Ok(raw_code) => Some(ContractCode::new(raw_code.to_vec(), Some(code_hash))), + Err(_) => None, + } + } +} + /// Key-value update. Contains a TrieKey and a value. pub struct TrieKeyValueUpdate { pub trie_key: TrieKey, @@ -27,7 +49,7 @@ pub type TrieUpdates = BTreeMap, TrieKeyValueUpdate>; /// TODO (#7327): rename to StateUpdate pub struct TrieUpdate { pub trie: Trie, - pub contract_storage: ContractStorage, + contract_storage: ContractStorage, committed: RawStateChanges, prospective: TrieUpdates, } diff --git a/integration-tests/src/tests/runtime/state_viewer.rs b/integration-tests/src/tests/runtime/state_viewer.rs index 062429a89e1..8c71577aa72 100644 --- a/integration-tests/src/tests/runtime/state_viewer.rs +++ b/integration-tests/src/tests/runtime/state_viewer.rs @@ -1,12 +1,11 @@ use std::{collections::HashMap, io, sync::Arc}; use borsh::BorshDeserialize; -use near_vm_runner::ContractCode; use crate::runtime_utils::{get_runtime_and_trie, get_test_trie_viewer, TEST_SHARD_UID}; use near_primitives::{ account::Account, - hash::CryptoHash, + hash::{hash as sha256, CryptoHash}, serialize::to_base64, trie_key::trie_key_parsers, types::{AccountId, StateRoot}, @@ -375,14 +374,12 @@ fn test_view_state_with_large_contract() { let (_, tries, root) = get_runtime_and_trie(); let mut state_update = tries.new_trie_update(TEST_SHARD_UID, root); let contract_code = [0; Account::MAX_ACCOUNT_DELETION_STORAGE_USAGE as usize].to_vec(); - let code = ContractCode::new(contract_code, None); set_account( &mut state_update, alice_account(), - &Account::new(0, 0, 0, *code.hash(), 50_001, PROTOCOL_VERSION), + &Account::new(0, 0, 0, sha256(&contract_code), 50_001, PROTOCOL_VERSION), ); - // FIXME: this really should use the deploy action. - state_update.contract_storage.store(code); + state_update.set(TrieKey::ContractCode { account_id: alice_account() }, contract_code); let trie_viewer = TrieViewer::new(Some(50_000), None); let result = trie_viewer.view_state(&state_update, &alice_account(), b"", false); assert!(result.is_ok()); diff --git a/runtime/runtime/src/actions.rs b/runtime/runtime/src/actions.rs index ac6956298c0..6cc9807ed53 100644 --- a/runtime/runtime/src/actions.rs +++ b/runtime/runtime/src/actions.rs @@ -2,7 +2,7 @@ use crate::config::{ safe_add_compute, safe_add_gas, total_prepaid_exec_fees, total_prepaid_gas, total_prepaid_send_fees, }; -use crate::ext::{ExternalError, RuntimeExt}; +use crate::ext::{ExternalError, RuntimeContractExt, RuntimeExt}; use crate::receipt_manager::ReceiptManager; use crate::{metrics, ActionResult, ApplyState}; use near_crypto::PublicKey; @@ -30,15 +30,16 @@ use near_primitives::version::{ }; use near_primitives_core::account::id::AccountType; use near_store::{ - enqueue_promise_yield_timeout, get_access_key, get_promise_yield_indices, remove_access_key, - remove_account, set_access_key, set_code, set_promise_yield_indices, StorageError, TrieUpdate, + enqueue_promise_yield_timeout, get_access_key, get_code, get_promise_yield_indices, + remove_access_key, remove_account, set_access_key, set_code, set_promise_yield_indices, + StorageError, TrieUpdate, }; use near_vm_runner::logic::errors::{ CompilationError, FunctionCallError, InconsistentStateError, VMRunnerError, }; use near_vm_runner::logic::{VMContext, VMOutcome}; +use near_vm_runner::ContractCode; use near_vm_runner::{precompile_contract, PreparedContract}; -use near_vm_runner::{ContractCode, ContractRuntimeCache}; use near_wallet_contract::{wallet_contract, wallet_contract_magic_bytes}; use std::sync::Arc; @@ -160,6 +161,44 @@ pub(crate) fn execute_function_call( Ok(outcome) } +pub(crate) fn prepare_function_call( + state_update: &TrieUpdate, + apply_state: &ApplyState, + account: &Account, + account_id: &AccountId, + function_call: &FunctionCallAction, + config: &RuntimeConfig, + epoch_info_provider: &(dyn EpochInfoProvider), + view_config: Option, +) -> Box { + let max_gas_burnt = match view_config { + Some(ViewConfig { max_gas_burnt }) => max_gas_burnt, + None => config.wasm_config.limit_config.max_gas_burnt, + }; + let gas_counter = near_vm_runner::logic::GasCounter::new( + config.wasm_config.ext_costs.clone(), + max_gas_burnt, + config.wasm_config.regular_op_cost, + function_call.gas, + view_config.is_some(), + ); + let code_ext = RuntimeContractExt { + trie_update: state_update, + account_id, + account, + chain_id: &epoch_info_provider.chain_id(), + current_protocol_version: apply_state.current_protocol_version, + }; + let contract = near_vm_runner::prepare( + &code_ext, + Arc::clone(&config.wasm_config), + apply_state.cache.as_deref(), + gas_counter, + &function_call.method_name, + ); + contract +} + pub(crate) fn action_function_call( state_update: &mut TrieUpdate, apply_state: &ApplyState, @@ -606,12 +645,11 @@ pub(crate) fn action_deploy_contract( account: &mut Account, account_id: &AccountId, deploy_contract: &DeployContractAction, - config: Arc, - cache: Option<&dyn ContractRuntimeCache>, + apply_state: &ApplyState, ) -> Result<(), StorageError> { let _span = tracing::debug_span!(target: "runtime", "action_deploy_contract").entered(); let code = ContractCode::new(deploy_contract.code.clone(), None); - let prev_code = state_update.contract_storage.get(account.code_hash()); + let prev_code = get_code(state_update, account_id, Some(account.code_hash()))?; let prev_code_length = prev_code.map(|code| code.code().len() as u64).unwrap_or_default(); account.set_storage_usage(account.storage_usage().saturating_sub(prev_code_length)); account.set_storage_usage( @@ -623,20 +661,16 @@ pub(crate) fn action_deploy_contract( })?, ); account.set_code_hash(*code.hash()); - // Legacy: populate the mapping from `AccountId => sha256(code)` thus making contracts part of - // The State. For the time being we are also relying on the `TrieUpdate` to actually write the - // contracts into the storage as part of the commit routine, however no code should be relying - // that the contracts are written to The State. set_code(state_update, account_id.clone(), &code); - // Precompile the contract and store result (compiled code or error) in the contract runtime - // cache. - // Note, that contract compilation costs are already accounted in deploy cost using special - // logic in estimator (see get_runtime_config() function). - precompile_contract(&code, config, cache).ok(); - // Inform the `store::contract::Storage` about the new deploy (so that the `get` method can - // return the contract before the contract is written out to the underlying storage as part of - // the `TrieUpdate` commit.) - state_update.contract_storage.store(code); + // Precompile the contract and store result (compiled code or error) in the database. + // Note, that contract compilation costs are already accounted in deploy cost using + // special logic in estimator (see get_runtime_config() function). + precompile_contract( + &code, + Arc::clone(&apply_state.config.wasm_config), + apply_state.cache.as_deref(), + ) + .ok(); Ok(()) } @@ -653,7 +687,7 @@ pub(crate) fn action_delete_account( if current_protocol_version >= ProtocolFeature::DeleteActionRestriction.protocol_version() { let account = account.as_ref().unwrap(); let mut account_storage_usage = account.storage_usage(); - let contract_code = state_update.contract_storage.get(account.code_hash()); + let contract_code = get_code(state_update, account_id, Some(account.code_hash()))?; if let Some(code) = contract_code { // account storage usage should be larger than code size let code_len = code.code().len() as u64; @@ -1156,8 +1190,10 @@ mod tests { use near_primitives::action::delegate::NonDelegateAction; use near_primitives::congestion_info::BlockCongestionInfo; use near_primitives::errors::InvalidAccessKeyError; + use near_primitives::hash::hash; use near_primitives::runtime::migration_data::MigrationFlags; use near_primitives::transaction::CreateAccountAction; + use near_primitives::trie_key::TrieKey; use near_primitives::types::{EpochId, StateChangeCause}; use near_primitives_core::version::PROTOCOL_VERSION; use near_store::set_account; @@ -1319,25 +1355,11 @@ mod tests { let mut state_update = tries.new_trie_update(ShardUId::single_shard(), CryptoHash::default()); let account_id = "alice".parse::().unwrap(); - let deploy_action = DeployContractAction { code: [0; 10_000].to_vec() }; - let mut account = - Account::new(100, 0, 0, CryptoHash::default(), storage_usage, PROTOCOL_VERSION); - let apply_state = create_apply_state(0); - let res = action_deploy_contract( - &mut state_update, - &mut account, - &account_id, - &deploy_action, - Arc::clone(&apply_state.config.wasm_config), - None, - ); - assert!(res.is_ok()); - test_delete_large_account( - &account_id, - &account.code_hash(), - storage_usage, - &mut state_update, - ) + let trie_key = TrieKey::ContractCode { account_id: account_id.clone() }; + let empty_contract = [0; 10_000].to_vec(); + let contract_hash = hash(&empty_contract); + state_update.set(trie_key, empty_contract); + test_delete_large_account(&account_id, &contract_hash, storage_usage, &mut state_update) } #[test] diff --git a/runtime/runtime/src/ext.rs b/runtime/runtime/src/ext.rs index 2cef823b472..99b8c155d13 100644 --- a/runtime/runtime/src/ext.rs +++ b/runtime/runtime/src/ext.rs @@ -6,10 +6,9 @@ use near_primitives::checked_feature; use near_primitives::errors::{EpochError, StorageError}; use near_primitives::hash::CryptoHash; use near_primitives::trie_key::{trie_key_parsers, TrieKey}; -use near_primitives::types::{AccountId, Balance, EpochId, EpochInfoProvider, Gas}; +use near_primitives::types::{AccountId, Balance, EpochId, EpochInfoProvider, Gas, TrieCacheMode}; use near_primitives::utils::create_receipt_id_from_action_hash; use near_primitives::version::ProtocolVersion; -use near_store::contract::ContractStorage; use near_store::{has_promise_yield_receipt, KeyLookupMode, TrieUpdate, TrieUpdateValuePtr}; use near_vm_runner::logic::errors::{AnyError, VMLogicError}; use near_vm_runner::logic::types::ReceiptIndex; @@ -363,21 +362,22 @@ impl<'a> External for RuntimeExt<'a> { } pub(crate) struct RuntimeContractExt<'a> { - pub(crate) storage: ContractStorage, + pub(crate) trie_update: &'a TrieUpdate, pub(crate) account_id: &'a AccountId, - pub(crate) code_hash: CryptoHash, + pub(crate) account: &'a Account, pub(crate) chain_id: &'a str, pub(crate) current_protocol_version: ProtocolVersion, } impl<'a> Contract for RuntimeContractExt<'a> { fn hash(&self) -> CryptoHash { - self.code_hash + self.account.code_hash() } fn get_code(&self) -> Option> { let account_id = self.account_id; let code_hash = self.hash(); + let version = self.current_protocol_version; let chain_id = self.chain_id; if checked_feature!("stable", EthImplicitAccounts, self.current_protocol_version) && account_id.get_account_type() == AccountType::EthImplicitAccount @@ -385,6 +385,11 @@ impl<'a> Contract for RuntimeContractExt<'a> { { return Some(wallet_contract(&chain_id)); } - self.storage.get(code_hash).map(Arc::new) + let mode = match checked_feature!("stable", ChunkNodesCache, version) { + true => Some(TrieCacheMode::CachingShard), + false => None, + }; + let _guard = self.trie_update.with_trie_cache_mode(mode); + self.trie_update.get_code(self.account_id.clone(), code_hash).map(Arc::new) } } diff --git a/runtime/runtime/src/lib.rs b/runtime/runtime/src/lib.rs index 7d86b770c4a..82e62a0be5f 100644 --- a/runtime/runtime/src/lib.rs +++ b/runtime/runtime/src/lib.rs @@ -63,7 +63,6 @@ pub use near_vm_runner::with_ext_cost_counter; use near_vm_runner::ContractCode; use near_vm_runner::ContractRuntimeCache; use near_vm_runner::ProfileDataV3; -use pipelining::ReceiptPreparationPipeline; use std::cmp::max; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; @@ -77,7 +76,6 @@ mod congestion_control; mod conversions; pub mod ext; mod metrics; -mod pipelining; mod prefetch; pub mod receipt_manager; pub mod state_viewer; @@ -369,7 +367,6 @@ impl Runtime { action: &Action, state_update: &mut TrieUpdate, apply_state: &ApplyState, - preparation_pipeline: &ReceiptPreparationPipeline, account: &mut Option, actor_id: &mut AccountId, receipt: &Receipt, @@ -435,16 +432,19 @@ impl Runtime { account.as_mut().expect(EXPECT_ACCOUNT_EXISTS), account_id, deploy_contract, - Arc::clone(&apply_state.config.wasm_config), - apply_state.cache.as_deref(), + apply_state, )?; } Action::FunctionCall(function_call) => { let account = account.as_mut().expect(EXPECT_ACCOUNT_EXISTS); - let contract = preparation_pipeline.get_contract( - receipt, - account.code_hash(), - action_index, + let contract = prepare_function_call( + state_update, + apply_state, + account, + account_id, + function_call, + &apply_state.config, + epoch_info_provider, None, ); let is_last_action = action_index + 1 == actions.len(); @@ -559,7 +559,6 @@ impl Runtime { &self, state_update: &mut TrieUpdate, apply_state: &ApplyState, - preparation_pipeline: &ReceiptPreparationPipeline, receipt: &Receipt, receipt_sink: &mut ReceiptSink, validator_proposals: &mut Vec, @@ -630,7 +629,6 @@ impl Runtime { action, state_update, apply_state, - preparation_pipeline, &mut account, &mut actor_id, receipt, @@ -978,19 +976,14 @@ impl Runtime { fn process_receipt( &self, - processing_state: &mut ApplyProcessingReceiptState, + state_update: &mut TrieUpdate, + apply_state: &ApplyState, receipt: &Receipt, receipt_sink: &mut ReceiptSink, validator_proposals: &mut Vec, + stats: &mut ApplyStats, + epoch_info_provider: &(dyn EpochInfoProvider), ) -> Result, RuntimeError> { - let ApplyProcessingReceiptState { - ref mut state_update, - apply_state, - epoch_info_provider, - ref pipeline_manager, - ref mut stats, - .. - } = *processing_state; let account_id = receipt.receiver_id(); match receipt.receipt() { ReceiptEnum::Data(ref data_receipt) => { @@ -1053,7 +1046,6 @@ impl Runtime { .apply_action_receipt( state_update, apply_state, - pipeline_manager, &ready_receipt, receipt_sink, validator_proposals, @@ -1108,7 +1100,6 @@ impl Runtime { .apply_action_receipt( state_update, apply_state, - pipeline_manager, receipt, receipt_sink, validator_proposals, @@ -1160,7 +1151,6 @@ impl Runtime { .apply_action_receipt( state_update, apply_state, - pipeline_manager, &yield_receipt, receipt_sink, validator_proposals, @@ -1586,22 +1576,24 @@ impl Runtime { compute_usage = tracing::field::Empty, ) .entered(); + let total = &mut processing_state.total; let state_update = &mut processing_state.state_update; let node_counter_before = state_update.trie().get_trie_nodes_count(); let recorded_storage_size_before = state_update.trie().recorded_storage_size(); let storage_proof_size_upper_bound_before = state_update.trie().recorded_storage_size_upper_bound(); let result = self.process_receipt( - processing_state, + state_update, + processing_state.apply_state, receipt, &mut receipt_sink, &mut validator_proposals, + &mut processing_state.stats, + processing_state.epoch_info_provider, ); - - let total = &mut processing_state.total; - let state_update = &mut processing_state.state_update; let node_counter_after = state_update.trie().get_trie_nodes_count(); tracing::trace!(target: "runtime", ?node_counter_before, ?node_counter_after); + let recorded_storage_diff = state_update .trie() .recorded_storage_size() @@ -1655,40 +1647,14 @@ impl Runtime { validator_proposals: &mut Vec, ) -> Result<(), RuntimeError> { let local_processing_start = std::time::Instant::now(); - let local_receipts = std::mem::take(&mut processing_state.local_receipts); - let local_receipt_count = local_receipts.len(); + let local_receipt_count = processing_state.local_receipts.len(); if let Some(prefetcher) = &mut processing_state.prefetcher { // Prefetcher is allowed to fail - let (front, back) = local_receipts.as_slices(); + let (front, back) = processing_state.local_receipts.as_slices(); _ = prefetcher.prefetch_receipts_data(front); _ = prefetcher.prefetch_receipts_data(back); } - - let mut prep_lookahead_iter = local_receipts.iter(); - let mut schedule_preparation = |pstate: &mut ApplyProcessingReceiptState| { - let scheduled_receipt_offset = prep_lookahead_iter.position(|peek| { - let account_id = peek.receiver_id(); - let receiver = get_account(&pstate.state_update, account_id); - let Ok(Some(receiver)) = receiver else { - tracing::error!( - target: "runtime", - message="unable to read receiver of an upcoming local receipt", - ?account_id, - receipt=%peek.get_hash() - ); - return false; - }; - // This returns `true` if work may have been scheduled (thus we currently prepare - // actions in at most 2 "interesting" receipts in parallel due to staggering.) - pstate.pipeline_manager.submit(peek, &receiver, None) - }); - scheduled_receipt_offset - }; - // Advance the preparation by one step (stagger it) so that we're preparing one interesting - // receipt in advance. - let mut next_schedule_index = schedule_preparation(&mut processing_state); - - for (index, receipt) in local_receipts.iter().enumerate() { + while let Some(receipt) = processing_state.next_local_receipt() { if processing_state.total.compute >= compute_limit || proof_size_limit.is_some_and(|limit| { processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit @@ -1700,15 +1666,6 @@ impl Runtime { &processing_state.apply_state.config, )?; } else { - if let Some(nsi) = next_schedule_index { - if index >= nsi { - // We're about to process a receipt that has been submitted for - // preparation, so lets submit the next one in anticipation that it might - // be processed too (it might also be not if we run out of gas/compute.) - next_schedule_index = schedule_preparation(&mut processing_state) - .and_then(|adv| nsi.checked_add(1)?.checked_add(adv)); - } - } // NOTE: We don't need to validate the local receipt, because it's just validated in // the `verify_and_charge_transaction`. self.process_receipt_with_metrics( @@ -2333,15 +2290,7 @@ impl<'a> ApplyProcessingState<'a> { incoming_receipts: &'a [Receipt], delayed_receipts: DelayedReceiptQueueWrapper, ) -> ApplyProcessingReceiptState<'a> { - let pipeline_manager = pipelining::ReceiptPreparationPipeline::new( - Arc::clone(&self.apply_state.config), - self.apply_state.cache.as_ref().map(|v| v.handle()), - self.epoch_info_provider.chain_id(), - self.apply_state.current_protocol_version, - self.state_update.contract_storage.clone(), - ); ApplyProcessingReceiptState { - pipeline_manager, protocol_version: self.protocol_version, apply_state: self.apply_state, prefetcher: self.prefetcher, @@ -2375,14 +2324,19 @@ struct ApplyProcessingReceiptState<'a> { local_receipts: VecDeque, incoming_receipts: &'a [Receipt], delayed_receipts: DelayedReceiptQueueWrapper, - pipeline_manager: pipelining::ReceiptPreparationPipeline, +} + +impl<'a> ApplyProcessingReceiptState<'a> { + /// Obtain the next receipt that should be executed. + fn next_local_receipt(&mut self) -> Option { + self.local_receipts.pop_front() + } } /// Interface provided for gas cost estimations. pub mod estimator { use super::{ReceiptSink, Runtime}; use crate::congestion_control::ReceiptSinkV2; - use crate::pipelining::ReceiptPreparationPipeline; use crate::{ApplyState, ApplyStats}; use near_primitives::congestion_info::CongestionInfo; use near_primitives::errors::RuntimeError; @@ -2414,17 +2368,9 @@ pub mod estimator { outgoing_buffers: ShardsOutgoingReceiptBuffer::load(&state_update.trie)?, outgoing_receipts, }); - let empty_pipeline = ReceiptPreparationPipeline::new( - std::sync::Arc::clone(&apply_state.config), - apply_state.cache.as_ref().map(|c| c.handle()), - epoch_info_provider.chain_id(), - apply_state.current_protocol_version, - state_update.contract_storage.clone(), - ); Runtime {}.apply_action_receipt( state_update, apply_state, - &empty_pipeline, receipt, &mut receipt_sink, validator_proposals, diff --git a/runtime/runtime/src/pipelining.rs b/runtime/runtime/src/pipelining.rs deleted file mode 100644 index c874766dc89..00000000000 --- a/runtime/runtime/src/pipelining.rs +++ /dev/null @@ -1,323 +0,0 @@ -#![allow(dead_code)] - -use crate::ext::RuntimeContractExt; -use near_parameters::RuntimeConfig; -use near_primitives::account::Account; -use near_primitives::action::Action; -use near_primitives::config::ViewConfig; -use near_primitives::hash::CryptoHash; -use near_primitives::receipt::{Receipt, ReceiptEnum}; -use near_primitives::types::{AccountId, Gas}; -use near_store::contract::ContractStorage; -use near_vm_runner::logic::{GasCounter, ProtocolVersion}; -use near_vm_runner::{ContractRuntimeCache, PreparedContract}; -use std::collections::{BTreeMap, BTreeSet}; -use std::sync::{Arc, Condvar, Mutex}; - -pub(crate) struct ReceiptPreparationPipeline { - /// Mapping from a Receipt's ID to a parallel "task" to prepare the receipt's data. - /// - /// The task itself may be run in the current thread, a separate thread or forced in any other - /// way. - map: BTreeMap>, - - /// List of Receipt receiver IDs that must not be prepared for this chunk. - /// - /// This solves an issue wherein the pipelining implementation only has access to the committed - /// storage (read: data as a result of applying the previous chunk,) and not the state that has - /// been built up as a result of processing the current chunk. One notable thing that may have - /// occurred there is a contract deployment. Once that happens, we can no longer get the - /// "current" contract code for the account. - /// - /// However, even if we had access to the transaction of the current chunk and were able to - /// access the new code, there's a risk of a race between when the deployment is executed - /// and when a parallel preparation may occur, leading back to needing to hold prefetching of - /// that account's contracts until the deployment is executed. - /// - /// As deployments are a relatively rare event, it is probably just fine to entirely disable - /// pipelining for the account in question for that particular block. This field implements - /// exactly that. - /// - /// In the future, however, it may make sense to either move the responsibility of executing - /// deployment actions to this pipelining thingy OR, even better, modify the protocol such that - /// contract deployments in block N only take effect in the block N+1 as that, among other - /// things, would give the runtime more time to compile the contract. - block_accounts: BTreeSet, - - /// The Runtime config for these pipelining requests. - config: Arc, - - /// The contract cache. - contract_cache: Option>, - - /// The chain ID being processed. - chain_id: String, - - /// Protocol version for this chunk. - protocol_version: u32, - - /// Storage for WASM code. - storage: ContractStorage, -} - -#[derive(PartialEq, Eq, PartialOrd, Ord)] -struct PrepareTaskKey { - receipt_id: CryptoHash, - action_index: usize, -} - -struct PrepareTask { - status: Mutex, - condvar: Condvar, -} - -enum PrepareTaskStatus { - Pending, - Working, - Prepared(Box), - Finished, -} - -impl ReceiptPreparationPipeline { - pub(crate) fn new( - config: Arc, - contract_cache: Option>, - chain_id: String, - protocol_version: u32, - storage: ContractStorage, - ) -> Self { - Self { - map: Default::default(), - block_accounts: Default::default(), - config, - contract_cache, - chain_id, - protocol_version, - storage, - } - } - - /// Submit a receipt to the "pipeline" for preparation of likely eventual execution. - /// - /// Note that not all receipts submitted here must be actually handled in some way. That said, - /// while it is perfectly fine to not use the results of submitted work (e.g. because a - /// applying a chunk ran out of gas or compute cost,) this work would eventually get lost, so - /// for the most part it is best to submit work with limited look-ahead. - /// - /// Returns `true` if the receipt is interesting and that pipelining has acted on it in some - /// way. Currently `true` is returned for any receipts containing `Action::DeployContract` (in - /// which case no further processing for the receiver account will be done), and - /// `Action::FunctionCall` (provided the account has not been blocked.) - pub(crate) fn submit( - &mut self, - receipt: &Receipt, - account: &Account, - view_config: Option, - ) -> bool { - let account_id = receipt.receiver_id(); - if self.block_accounts.contains(account_id) { - return false; - } - let actions = match receipt.receipt() { - ReceiptEnum::Action(a) | ReceiptEnum::PromiseYield(a) => &a.actions, - ReceiptEnum::Data(_) | ReceiptEnum::PromiseResume(_) => return false, - }; - let mut any_function_calls = false; - for (action_index, action) in actions.iter().enumerate() { - let account_id = account_id.clone(); - match action { - Action::DeployContract(_) => { - // FIXME: instead of blocking these accounts, move the handling of - // deploy action into here, so that the necessary data dependencies can be - // established. - return self.block_accounts.insert(account_id); - } - Action::FunctionCall(function_call) => { - let key = PrepareTaskKey { receipt_id: receipt.get_hash(), action_index }; - let gas_counter = self.gas_counter(view_config.as_ref(), function_call.gas); - let entry = match self.map.entry(key) { - std::collections::btree_map::Entry::Vacant(v) => v, - // Already been submitted. - // TODO: Warning? - std::collections::btree_map::Entry::Occupied(_) => continue, - }; - let config = Arc::clone(&self.config.wasm_config); - let cache = self.contract_cache.as_ref().map(|c| c.handle()); - let storage = self.storage.clone(); - let chain_id = self.chain_id.clone(); - let protocol_version = self.protocol_version; - let code_hash = account.code_hash(); - let status = Mutex::new(PrepareTaskStatus::Pending); - let task = Arc::new(PrepareTask { status, condvar: Condvar::new() }); - let method_name = function_call.method_name.clone(); - entry.insert(Arc::clone(&task)); - // FIXME: don't spawn all tasks at once. We want to keep some capacity for - // other things and also to control (in a way) the concurrency here. - rayon::spawn_fifo(move || { - let task_status = { - let mut status = task.status.lock().expect("mutex lock"); - std::mem::replace(&mut *status, PrepareTaskStatus::Working) - }; - match &task_status { - PrepareTaskStatus::Pending => {} - PrepareTaskStatus::Working => return, - // TODO: seeing Prepared here may mean there's double spawning for the - // same receipt index. Maybe output a warning? - PrepareTaskStatus::Prepared(..) => return, - PrepareTaskStatus::Finished => return, - }; - let contract = prepare_function_call( - &storage, - cache.as_deref(), - &chain_id, - protocol_version, - config, - gas_counter, - code_hash, - &account_id, - &method_name, - ); - - let mut status = task.status.lock().expect("mutex lock"); - *status = PrepareTaskStatus::Prepared(contract); - task.condvar.notify_all(); - }); - any_function_calls = true; - } - // No need to handle this receipt as it only generates other new receipts. - Action::Delegate(_) => {} - // No handling for these. - Action::CreateAccount(_) - | Action::Transfer(_) - | Action::Stake(_) - | Action::AddKey(_) - | Action::DeleteKey(_) - | Action::DeleteAccount(_) => {} - #[cfg(feature = "protocol_feature_nonrefundable_transfer_nep491")] - Action::NonrefundableStorageTransfer(_) => {} - } - } - return any_function_calls; - } - - /// Obtain the prepared contract for the provided receipt. - /// - /// If the contract is currently being prepared this function will block waiting for the - /// preparation to complete. - /// - /// If the preparation hasn't been started yet (either because it hasn't been scheduled for any - /// reason, or because the pipeline didn't make it in time), this function will prepare the - /// contract in the calling thread. - pub(crate) fn get_contract( - &self, - receipt: &Receipt, - code_hash: CryptoHash, - action_index: usize, - view_config: Option, - ) -> Box { - let account_id = receipt.receiver_id(); - let action = match receipt.receipt() { - ReceiptEnum::Action(r) | ReceiptEnum::PromiseYield(r) => r - .actions - .get(action_index) - .expect("indexing receipt actions by an action_index failed!"), - ReceiptEnum::Data(_) | ReceiptEnum::PromiseResume(_) => { - panic!("attempting to get_contract with a non-action receipt!?") - } - }; - let Action::FunctionCall(function_call) = action else { - panic!("referenced receipt action is not a function call!"); - }; - let key = PrepareTaskKey { receipt_id: receipt.get_hash(), action_index }; - let Some(task) = self.map.get(&key) else { - let gas_counter = self.gas_counter(view_config.as_ref(), function_call.gas); - return prepare_function_call( - &self.storage, - self.contract_cache.as_deref(), - &self.chain_id, - self.protocol_version, - Arc::clone(&self.config.wasm_config), - gas_counter, - code_hash, - &account_id, - &function_call.method_name, - ); - }; - let mut status_guard = task.status.lock().unwrap(); - loop { - let current = std::mem::replace(&mut *status_guard, PrepareTaskStatus::Working); - match current { - PrepareTaskStatus::Pending => { - *status_guard = PrepareTaskStatus::Finished; - drop(status_guard); - - let gas_counter = self.gas_counter(view_config.as_ref(), function_call.gas); - let contract = prepare_function_call( - &self.storage, - self.contract_cache.as_deref(), - &self.chain_id, - self.protocol_version, - Arc::clone(&self.config.wasm_config), - gas_counter, - code_hash, - &account_id, - &function_call.method_name, - ); - return contract; - } - PrepareTaskStatus::Working => { - status_guard = task.condvar.wait(status_guard).unwrap(); - continue; - } - PrepareTaskStatus::Prepared(c) => { - *status_guard = PrepareTaskStatus::Finished; - return c; - } - PrepareTaskStatus::Finished => { - *status_guard = PrepareTaskStatus::Finished; - // Don't poison the lock. - drop(status_guard); - panic!("attempting to get_contract that has already been taken"); - } - } - } - } - - fn gas_counter(&self, view_config: Option<&ViewConfig>, gas: Gas) -> GasCounter { - let max_gas_burnt = match view_config { - Some(ViewConfig { max_gas_burnt }) => *max_gas_burnt, - None => self.config.wasm_config.limit_config.max_gas_burnt, - }; - GasCounter::new( - self.config.wasm_config.ext_costs.clone(), - max_gas_burnt, - self.config.wasm_config.regular_op_cost, - gas, - view_config.is_some(), - ) - } -} - -fn prepare_function_call( - contract_storage: &ContractStorage, - cache: Option<&dyn ContractRuntimeCache>, - - chain_id: &str, - protocol_version: ProtocolVersion, - config: Arc, - gas_counter: GasCounter, - - code_hash: CryptoHash, - account_id: &AccountId, - method_name: &str, -) -> Box { - let code_ext = RuntimeContractExt { - storage: contract_storage.clone(), - account_id, - code_hash, - chain_id, - current_protocol_version: protocol_version, - }; - let contract = near_vm_runner::prepare(&code_ext, config, cache, gas_counter, method_name); - contract -} diff --git a/runtime/runtime/src/state_viewer/mod.rs b/runtime/runtime/src/state_viewer/mod.rs index b1f51eff5d4..4ba62c44dea 100644 --- a/runtime/runtime/src/state_viewer/mod.rs +++ b/runtime/runtime/src/state_viewer/mod.rs @@ -1,14 +1,13 @@ use crate::actions::execute_function_call; use crate::ext::RuntimeExt; -use crate::pipelining::ReceiptPreparationPipeline; use crate::receipt_manager::ReceiptManager; -use crate::ApplyState; +use crate::{prepare_function_call, ApplyState}; use near_crypto::{KeyType, PublicKey}; use near_parameters::RuntimeConfigStore; use near_primitives::account::{AccessKey, Account}; use near_primitives::borsh::BorshDeserialize; use near_primitives::hash::CryptoHash; -use near_primitives::receipt::{ActionReceipt, Receipt, ReceiptEnum, ReceiptV1}; +use near_primitives::receipt::ActionReceipt; use near_primitives::runtime::migration_data::{MigrationData, MigrationFlags}; use near_primitives::transaction::FunctionCallAction; use near_primitives::trie_key::trie_key_parsers; @@ -18,7 +17,7 @@ use near_primitives::types::{ use near_primitives::version::PROTOCOL_VERSION; use near_primitives::views::{StateItem, ViewStateResult}; use near_primitives_core::config::ViewConfig; -use near_store::{get_access_key, get_account, TrieUpdate}; +use near_store::{get_access_key, get_account, get_code, TrieUpdate}; use near_vm_runner::logic::{ProtocolVersion, ReturnData}; use near_vm_runner::{ContractCode, ContractRuntimeCache}; use std::{str, sync::Arc, time::Instant}; @@ -90,7 +89,7 @@ impl TrieViewer { account_id: &AccountId, ) -> Result { let account = self.view_account(state_update, account_id)?; - state_update.contract_storage.get(account.code_hash()).ok_or_else(|| { + get_code(state_update, account_id, Some(account.code_hash()))?.ok_or_else(|| { errors::ViewContractCodeError::NoContractCode { contract_account_id: account_id.clone(), } @@ -147,9 +146,7 @@ impl TrieViewer { ) -> Result { match get_account(state_update, account_id)? { Some(account) => { - let code_len = state_update - .contract_storage - .get(account.code_hash()) + let code_len = get_code(state_update, account_id, Some(account.code_hash()))? .map(|c| c.code().len() as u64) .unwrap_or_default(); if let Some(limit) = self.state_size_limit { @@ -226,37 +223,31 @@ impl TrieViewer { migration_flags: MigrationFlags::default(), congestion_info: Default::default(), }; - let function_call = FunctionCallAction { - method_name: method_name.to_string(), - args: args.to_vec(), - gas: self.max_gas_burnt_view, - deposit: 0, - }; let action_receipt = ActionReceipt { signer_id: originator_id.clone(), signer_public_key: public_key, gas_price: 0, output_data_receivers: vec![], input_data_ids: vec![], - actions: vec![function_call.clone().into()], + actions: vec![], + }; + let function_call = FunctionCallAction { + method_name: method_name.to_string(), + args: args.to_vec(), + gas: self.max_gas_burnt_view, + deposit: 0, }; - let receipt = Receipt::V1(ReceiptV1 { - predecessor_id: contract_id.clone(), - receiver_id: contract_id.clone(), - receipt_id: empty_hash, - receipt: ReceiptEnum::Action(action_receipt.clone()), - priority: 0, - }); - let pipeline = ReceiptPreparationPipeline::new( - Arc::clone(config), - apply_state.cache.as_ref().map(|v| v.handle()), - epoch_info_provider.chain_id(), - apply_state.current_protocol_version, - state_update.contract_storage.clone(), - ); let view_config = Some(ViewConfig { max_gas_burnt: self.max_gas_burnt_view }); - let contract = pipeline.get_contract(&receipt, account.code_hash(), 0, view_config.clone()); - + let contract = prepare_function_call( + &state_update, + &apply_state, + &account, + &contract_id, + &function_call, + config, + epoch_info_provider, + view_config.clone(), + ); let mut runtime_ext = RuntimeExt::new( &mut state_update, &mut receipt_manager,