Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: implement a receipt preparation pipeline #11839

Merged
merged 15 commits into from
Aug 7, 2024
4 changes: 2 additions & 2 deletions runtime/runtime/src/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::{
safe_add_compute, safe_add_gas, total_prepaid_exec_fees, total_prepaid_gas,
total_prepaid_send_fees,
};
use crate::ext::{ExternalError, RuntimeContractExt, RuntimeExt};
use crate::ext::{ContractStorage, ExternalError, RuntimeContractExt, RuntimeExt};
use crate::receipt_manager::ReceiptManager;
use crate::{metrics, ActionResult, ApplyState};
use near_crypto::PublicKey;
Expand Down Expand Up @@ -183,7 +183,7 @@ pub(crate) fn prepare_function_call(
view_config.is_some(),
);
let code_ext = RuntimeContractExt {
trie_update: state_update,
storage: ContractStorage::Transaction(state_update),
account_id,
account,
chain_id: &epoch_info_provider.chain_id(),
Expand Down
31 changes: 27 additions & 4 deletions runtime/runtime/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use near_primitives::trie_key::{trie_key_parsers, TrieKey};
use near_primitives::types::{AccountId, Balance, EpochId, EpochInfoProvider, Gas, TrieCacheMode};
use near_primitives::utils::create_receipt_id_from_action_hash;
use near_primitives::version::ProtocolVersion;
use near_store::{has_promise_yield_receipt, KeyLookupMode, TrieUpdate, TrieUpdateValuePtr};
use near_store::{
has_promise_yield_receipt, KeyLookupMode, TrieStorage, TrieUpdate, TrieUpdateValuePtr,
};
use near_vm_runner::logic::errors::{AnyError, VMLogicError};
use near_vm_runner::logic::types::ReceiptIndex;
use near_vm_runner::logic::{External, StorageGetMode, ValuePtr};
Expand Down Expand Up @@ -361,8 +363,19 @@ impl<'a> External for RuntimeExt<'a> {
}
}

pub(crate) enum ContractStorage<'a> {
/// Access the transaction first.
Transaction(&'a TrieUpdate),
/// Access the storage layer directly, forgoing the transaction.
///
/// When using this, care must be taken to not accidentally access outdated contract code (i.e.
/// old code for an account that has deployed a contract but for which the code has not been
/// committed yet.)
DB(&'a dyn TrieStorage),
}

pub(crate) struct RuntimeContractExt<'a> {
pub(crate) trie_update: &'a TrieUpdate,
pub(crate) storage: ContractStorage<'a>,
pub(crate) account_id: &'a AccountId,
pub(crate) account: &'a Account,
pub(crate) chain_id: &'a str,
Expand All @@ -389,7 +402,17 @@ impl<'a> Contract for RuntimeContractExt<'a> {
true => Some(TrieCacheMode::CachingShard),
false => None,
};
let _guard = self.trie_update.with_trie_cache_mode(mode);
self.trie_update.get_code(self.account_id.clone(), code_hash).map(Arc::new)
match self.storage {
ContractStorage::Transaction(trie_update) => {
let _guard = trie_update.with_trie_cache_mode(mode);
trie_update.get_code(self.account_id.clone(), code_hash).map(Arc::new)
}
ContractStorage::DB(db) => match db.retrieve_raw_bytes(&code_hash) {
Ok(raw_code) => {
Some(Arc::new(ContractCode::new(raw_code.to_vec(), Some(code_hash))))
}
Err(_) => None,
},
}
}
}
37 changes: 26 additions & 11 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ mod prefetch;
pub mod receipt_manager;
pub mod state_viewer;
mod verifier;
mod pipelining;

const EXPECT_ACCOUNT_EXISTS: &str = "account exists, checked above";

Expand Down Expand Up @@ -974,14 +975,18 @@ impl Runtime {

fn process_receipt(
&self,
state_update: &mut TrieUpdate,
apply_state: &ApplyState,
processing_state: &mut ApplyProcessingReceiptState,
receipt: &Receipt,
receipt_sink: &mut ReceiptSink,
validator_proposals: &mut Vec<ValidatorStake>,
stats: &mut ApplyStats,
epoch_info_provider: &(dyn EpochInfoProvider),
) -> Result<Option<ExecutionOutcomeWithId>, RuntimeError> {
let ApplyProcessingReceiptState {
ref mut state_update,
apply_state,
epoch_info_provider,
ref mut stats,
..
} = *processing_state;
let account_id = receipt.receiver_id();
match receipt.receipt() {
ReceiptEnum::Data(ref data_receipt) => {
Expand Down Expand Up @@ -1574,24 +1579,22 @@ impl Runtime {
compute_usage = tracing::field::Empty,
)
.entered();
let total = &mut processing_state.total;
let state_update = &mut processing_state.state_update;
let node_counter_before = state_update.trie().get_trie_nodes_count();
let recorded_storage_size_before = state_update.trie().recorded_storage_size();
let storage_proof_size_upper_bound_before =
state_update.trie().recorded_storage_size_upper_bound();
let result = self.process_receipt(
state_update,
processing_state.apply_state,
processing_state,
receipt,
&mut receipt_sink,
&mut validator_proposals,
&mut processing_state.stats,
processing_state.epoch_info_provider,
);

let total = &mut processing_state.total;
let state_update = &mut processing_state.state_update;
let node_counter_after = state_update.trie().get_trie_nodes_count();
tracing::trace!(target: "runtime", ?node_counter_before, ?node_counter_after);

let recorded_storage_diff = state_update
.trie()
.recorded_storage_size()
Expand Down Expand Up @@ -2288,7 +2291,16 @@ impl<'a> ApplyProcessingState<'a> {
incoming_receipts: &'a [Receipt],
delayed_receipts: DelayedReceiptQueueWrapper,
) -> ApplyProcessingReceiptState<'a> {
let pipeline_manager = pipelining::ReceiptPreparationPipeline::new(
Arc::clone(&self.apply_state.config),
self.apply_state.cache.as_ref().map(|v| v.handle()),
self.epoch_info_provider.chain_id(),
self.apply_state.current_protocol_version,
todo!(),
// self.state_update.trie.storage, // TODO??
nagisa marked this conversation as resolved.
Show resolved Hide resolved
);
ApplyProcessingReceiptState {
pipeline_manager,
protocol_version: self.protocol_version,
apply_state: self.apply_state,
prefetcher: self.prefetcher,
Expand Down Expand Up @@ -2322,12 +2334,15 @@ struct ApplyProcessingReceiptState<'a> {
local_receipts: VecDeque<Receipt>,
incoming_receipts: &'a [Receipt],
delayed_receipts: DelayedReceiptQueueWrapper,

pipeline_manager: pipelining::ReceiptPreparationPipeline,
}

impl<'a> ApplyProcessingReceiptState<'a> {
/// Obtain the next receipt that should be executed.
fn next_local_receipt(&mut self) -> Option<Receipt> {
self.local_receipts.pop_front()
let receipt = self.local_receipts.pop_front()?;
Some(receipt)
}
}

Expand Down
182 changes: 182 additions & 0 deletions runtime/runtime/src/pipelining.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use near_parameters::RuntimeConfig;
use near_primitives::account::Account;
use near_primitives::action::Action;
use near_primitives::config::ViewConfig;
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::{Receipt, ReceiptEnum};
use near_primitives::types::AccountId;
use near_vm_runner::logic::GasCounter;
use near_vm_runner::{ContractRuntimeCache, PreparedContract};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Condvar, Mutex};

use crate::ext::{ContractStorage, RuntimeContractExt};

pub(crate) struct ReceiptPreparationPipeline {
/// Mapping from a Receipt's ID to a parallel "task" to prepare the receipt's data.
///
/// The task itself may be run in the current thread, a separate thread or forced in any other
/// way.
map: BTreeMap<PrepareTaskKey, Arc<PrepareTask>>,

/// List of Receipt receiver IDs that must not be prepared for this chunk.
///
/// This solves an issue wherein the pipelining implementation only has access to the committed
/// storage (read: data as a result of applying the previous chunk,) and not the state that has
/// been built up as a result of processing the current chunk. One notable thing that may have
/// occurred there is a contract deployment. Once that happens, we can no longer get the
/// "current" contract code for the account.
///
/// However, even if we had access to the transaction of the current chunk and were able to
/// access the new code, there's a risk of a race between when the deployment is executed
/// and when a parallel preparation may occur, leading back to needing to hold prefetching of
/// that account's contracts until the deployment is executed.
///
/// As deployments are a relatively rare event, it is probably just fine to entirely disable
/// pipelining for the account in question for that particular block. This field implements
/// exactly that.
///
/// In the future, however, it may make sense to either move the responsibility of executing
/// deployment actions to this pipelining thingy OR, even better, modify the protocol such that
/// contract deployments in block N only take effect in the block N+1 as that, among other
/// things, would give the runtime more time to compile the contract.
block_accounts: BTreeSet<AccountId>,

/// The Runtime config for these pipelining requests.
config: Arc<RuntimeConfig>,

/// The contract cache.
contract_cache: Option<Box<dyn ContractRuntimeCache>>,

/// The chain ID being processed.
chain_id: String,

/// Protocol version for this chunk.
protocol_version: u32,

/// Storage.
storage: Arc<dyn near_store::TrieStorage>,
nagisa marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct PrepareTaskKey {
receipt_id: CryptoHash,
action_index: usize,
}

struct PrepareTask {
status: Mutex<PrepareTaskStatus>,
condvar: Condvar,
}

enum PrepareTaskStatus {
FunctionCallTask { method: String },
Working,
Done(Box<dyn PreparedContract>),
}

impl ReceiptPreparationPipeline {
pub(crate) fn new(
config: Arc<RuntimeConfig>,
contract_cache: Option<Box<dyn ContractRuntimeCache>>,
chain_id: String,
protocol_version: u32,
storage: Arc<dyn near_store::TrieStorage>,
) -> Self {
Self {
map: Default::default(),
block_accounts: Default::default(),
config,
contract_cache,
chain_id,
protocol_version,
storage,
}
}
/// Submit a receipt to the "pipeline" for preparation of likely eventual execution.
///
/// Note that not all receipts submitted here must be actually handled in some way. That said,
/// while it is perfectly fine to not use the results of submitted work (e.g. because a
/// applying a chunk ran out of gas or compute cost,) this work would eventually get lost, so
/// for the most part it is best to submit work with limited look-ahead.
Ekleog-NEAR marked this conversation as resolved.
Show resolved Hide resolved
///
/// Returns `true` if the receipt is interesting and work has been scheduled for its
/// preparation.
pub(crate) fn submit(
&mut self,
receipt: &Receipt,
account: &Account,
view_config: Option<ViewConfig>,
) -> bool {
let account_id = receipt.receiver_id();
if self.block_accounts.contains(account_id) {
return false;
}
let actions = match receipt.receipt() {
ReceiptEnum::Action(a) | ReceiptEnum::PromiseYield(a) => &a.actions,
ReceiptEnum::Data(_) | ReceiptEnum::PromiseResume(_) => return false,
};
let mut any_function_calls = false;
for (action_index, action) in actions.iter().enumerate() {
match action {
Action::DeployContract(_) => {
self.block_accounts.insert(account_id.clone());
break;
Ekleog-NEAR marked this conversation as resolved.
Show resolved Hide resolved
}
Action::FunctionCall(function_call) => {
let key = PrepareTaskKey { receipt_id: receipt.get_hash(), action_index };
let entry = match self.map.entry(key) {
std::collections::btree_map::Entry::Vacant(v) => v,
// Already been submitted.
std::collections::btree_map::Entry::Occupied(_) => continue,
};
let max_gas_burnt = match view_config {
Some(ViewConfig { max_gas_burnt }) => max_gas_burnt,
None => self.config.wasm_config.limit_config.max_gas_burnt,
};
let gas_counter = GasCounter::new(
self.config.wasm_config.ext_costs.clone(),
max_gas_burnt,
self.config.wasm_config.regular_op_cost,
function_call.gas,
view_config.is_some(),
);
// TODO: This part needs to be executed in a thread eventually.
let code_ext = RuntimeContractExt {
storage: ContractStorage::DB(&*self.storage),
account_id,
account,
chain_id: &self.chain_id,
current_protocol_version: self.protocol_version,
};
let contract = near_vm_runner::prepare(
&code_ext,
Arc::clone(&self.config.wasm_config),
self.contract_cache.as_deref(),
gas_counter,
&function_call.method_name,
);
let task = Arc::new(PrepareTask {
status: Mutex::new(PrepareTaskStatus::Done(contract)),
condvar: Condvar::new(),
});
// let task = Arc::new(Mutex::new(PrepareTaskStatus::FunctionCallTask {
// method: function_call.method_name.clone(),
// }));
entry.insert(Arc::clone(&task));
any_function_calls = true;
}
// No need to handle this receipt as it only generates other new receipts.
Action::Delegate(_) => {}
// No handling for these.
Action::CreateAccount(_)
| Action::Transfer(_)
| Action::Stake(_)
| Action::AddKey(_)
| Action::DeleteKey(_)
| Action::DeleteAccount(_) => {}
}
}
return any_function_calls;
}
}
Loading