diff --git a/runtime/runtime/src/lib.rs b/runtime/runtime/src/lib.rs
index 80ff2f2e5e8..1197b122be1 100644
--- a/runtime/runtime/src/lib.rs
+++ b/runtime/runtime/src/lib.rs
@@ -13,6 +13,7 @@ pub use crate::verifier::{
 use config::total_prepaid_send_fees;
 pub use congestion_control::bootstrap_congestion_info;
 use congestion_control::ReceiptSink;
+use metrics::ApplyMetrics;
 pub use near_crypto;
 use near_parameters::{ActionCosts, RuntimeConfig};
 pub use near_primitives;
@@ -1360,41 +1361,54 @@ impl Runtime {
         // the check is not necessary. It’s defence in depth to make sure any
         // future refactoring won’t break the condition.
         assert!(cfg!(feature = "sandbox") || state_patch.is_empty());
-        let protocol_version = apply_state.current_protocol_version;
-        let mut prefetcher = TriePrefetcher::new_if_enabled(&trie);
-        let mut state_update = TrieUpdate::new(trie);
-        let mut total = TotalResourceGuard {
-            span: tracing::Span::current(),
-            // This contains the gas "burnt" for refund receipts. Even though we don't actually
-            // charge any gas for refund receipts, we still count the gas use towards the block gas
-            // limit
-            gas: 0,
-            compute: 0,
-        };
-        if let Some(prefetcher) = &mut prefetcher {
+        // What this function does can be broken down conceptually into the following steps:
+        // 1. Update validator accounts.
+        // 2. Apply migrations.
+        // 3. Process transactions.
+        // 4. Process receipts.
+        // 5. Validate and apply the state update.
+
+        let mut processing_state =
+            ApplyProcessingState::new(&apply_state, trie, epoch_info_provider, transactions);
+
+        if let Some(prefetcher) = &mut processing_state.prefetcher {
             // Prefetcher is allowed to fail
             _ = prefetcher.prefetch_transactions_data(transactions);
         }
-        let mut stats = ApplyStats::default();
-
+        // Step 1: update validator accounts.
         if let Some(validator_accounts_update) = validator_accounts_update {
             self.update_validator_accounts(
-                &mut state_update,
+                &mut processing_state.state_update,
                 validator_accounts_update,
-                &mut stats,
+                &mut processing_state.stats,
             )?;
         }
 
+        // Step 2: apply migrations.
         let (gas_used_for_migrations, mut receipts_to_restore) = self
             .apply_migrations(
-                &mut state_update,
+                &mut processing_state.state_update,
                 &apply_state.migration_data,
                 &apply_state.migration_flags,
-                protocol_version,
+                processing_state.protocol_version,
             )
             .map_err(RuntimeError::StorageError)?;
+        processing_state.total.add(gas_used_for_migrations, gas_used_for_migrations)?;
+
+        let delayed_receipts = DelayedReceiptQueueWrapper::new(DelayedReceiptQueue::load(
+            &processing_state.state_update,
+        )?);
+
+        // If the chunk is missing, exit early and don't process any receipts.
+        if !apply_state.is_new_chunk
+            && processing_state.protocol_version
+                >= ProtocolFeature::FixApplyChunks.protocol_version()
+        {
+            return missing_chunk_apply_result(&delayed_receipts, processing_state);
+        }
+
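The migration step above charges the same value to both counters because migrations have no separately metered compute cost. A minimal, self-contained sketch of that resource-guard pattern (hypothetical types, not nearcore's actual `TotalResourceGuard`):

```rust
/// Sketch of the accumulation pattern: burnt gas and compute usage are
/// tracked together, and overflow surfaces as an error instead of a panic.
#[derive(Default, Debug)]
struct ResourceGuard {
    gas: u64,
    compute: u64,
}

impl ResourceGuard {
    fn add(&mut self, gas: u64, compute: u64) -> Result<(), &'static str> {
        self.gas = self.gas.checked_add(gas).ok_or("gas overflow")?;
        self.compute = self.compute.checked_add(compute).ok_or("compute overflow")?;
        Ok(())
    }
}

fn main() {
    let mut total = ResourceGuard::default();
    // Migrations have no separate compute measurement, so the same value is
    // charged to both counters, mirroring
    // `total.add(gas_used_for_migrations, gas_used_for_migrations)`.
    total.add(1_000, 1_000).unwrap();
    assert_eq!(total.gas, total.compute);
    println!("{total:?}");
}
```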
         // If we have receipts that need to be restored, prepend them to the list of incoming receipts
         let incoming_receipts = if receipts_to_restore.is_empty() {
             incoming_receipts
@@ -1403,68 +1417,95 @@ impl Runtime {
             receipts_to_restore.as_slice()
         };
 
-        let delayed_receipts_queue = DelayedReceiptQueue::load(&state_update)?;
-        let mut delayed_receipts = DelayedReceiptQueueWrapper::new(delayed_receipts_queue);
-
-        if !apply_state.is_new_chunk
-            && protocol_version >= ProtocolFeature::FixApplyChunks.protocol_version()
-        {
-            let (trie, trie_changes, state_changes) = state_update.finalize()?;
-            let proof = trie.recorded_storage();
-
-            // For old chunks, copy the congestion info exactly as it came in,
-            // potentially returning `None` even if the congestion control
-            // feature is enabled for the protocol version.
-            let congestion_info = apply_state
-                .congestion_info
-                .get(&apply_state.shard_id)
-                .map(|extended_info| extended_info.congestion_info);
-
-            return Ok(ApplyResult {
-                state_root: trie_changes.new_root,
-                trie_changes,
-                validator_proposals: vec![],
-                outgoing_receipts: vec![],
-                outcomes: vec![],
-                state_changes,
-                stats,
-                processed_delayed_receipts: vec![],
-                processed_yield_timeouts: vec![],
-                proof,
-                delayed_receipts_count: delayed_receipts.len(),
-                metrics: None,
-                congestion_info,
-            });
-        }
-
         let mut outgoing_receipts = Vec::new();
-        let mut validator_proposals = vec![];
-        let mut local_receipts = vec![];
-        let mut outcomes = vec![];
-        let mut processed_delayed_receipts = vec![];
-        let mut own_congestion_info =
-            apply_state.own_congestion_info(protocol_version, &state_update)?;
-        let mut metrics = metrics::ApplyMetrics::default();
+        let mut processing_state =
+            processing_state.into_processing_receipt_state(incoming_receipts, delayed_receipts);
+        let mut own_congestion_info = apply_state.own_congestion_info(
+            processing_state.protocol_version,
+            &processing_state.state_update,
+        )?;
         let mut receipt_sink = ReceiptSink::new(
-            protocol_version,
-            &state_update.trie,
+            processing_state.protocol_version,
+            &processing_state.state_update.trie,
             apply_state,
             &mut own_congestion_info,
             &mut outgoing_receipts,
         )?;
-
         // Forward buffered receipts from previous chunks.
-        receipt_sink.forward_from_buffer(&mut state_update, apply_state)?;
+        receipt_sink.forward_from_buffer(&mut processing_state.state_update, apply_state)?;
+
+        // Step 3: process transactions.
+        let local_receipts = self.process_transactions(&mut processing_state, &mut receipt_sink)?;
+
+        // Step 4: process receipts.
+        let process_receipts_result =
+            self.process_receipts(&mut processing_state, &mut receipt_sink, &local_receipts)?;
+
+        // After receipt processing is done, report metrics on outgoing buffers
+        // and on congestion indicators.
+        metrics::report_congestion_metrics(
+            &receipt_sink,
+            apply_state.shard_id,
+            &apply_state.config.congestion_control_config,
+        );
+
+        // Step 5: validate and apply the state update.
+        self.validate_apply_state_update(
+            processing_state,
+            process_receipts_result,
+            own_congestion_info,
+            validator_accounts_update,
+            state_patch,
+            outgoing_receipts,
+        )
+    }
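With setup and early exit out of the way, `apply` now reads as a linear five-step pipeline over a single state struct instead of a dozen loose mutable locals. A runnable sketch of that shape, with hypothetical stand-in types:

```rust
// Hypothetical, simplified stand-ins for the processing-state pattern.
struct ProcessingState {
    gas: u64,
    outcomes: Vec<String>,
}

fn update_validator_accounts(state: &mut ProcessingState) { state.gas += 1; }
fn apply_migrations(state: &mut ProcessingState) { state.gas += 1; }
fn process_transactions(state: &mut ProcessingState) -> Vec<String> {
    state.outcomes.push("tx outcome".into());
    vec!["local receipt".into()]
}
fn process_receipts(state: &mut ProcessingState, local: &[String]) {
    state.outcomes.extend(local.iter().cloned());
}
fn finalize(state: ProcessingState) -> (u64, Vec<String>) {
    (state.gas, state.outcomes)
}

fn main() {
    let mut state = ProcessingState { gas: 0, outcomes: Vec::new() };
    update_validator_accounts(&mut state);        // Step 1
    apply_migrations(&mut state);                 // Step 2
    let local = process_transactions(&mut state); // Step 3
    process_receipts(&mut state, &local);         // Step 4
    let (gas, outcomes) = finalize(state);        // Step 5 consumes the state
    println!("gas={gas}, outcomes={outcomes:?}");
}
```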
-        total.add(gas_used_for_migrations, gas_used_for_migrations)?;
 
+    fn apply_state_patch(&self, state_update: &mut TrieUpdate, state_patch: SandboxStatePatch) {
+        if state_patch.is_empty() {
+            return;
+        }
+        for record in state_patch {
+            match record {
+                StateRecord::Account { account_id, account } => {
+                    set_account(state_update, account_id, &account);
+                }
+                StateRecord::Data { account_id, data_key, value } => {
+                    state_update.set(TrieKey::ContractData { key: data_key.into(), account_id }, value.into());
+                }
+                StateRecord::Contract { account_id, code } => {
+                    let acc = get_account(state_update, &account_id).expect("Failed to read state").expect("Code state record should be preceded by the corresponding account record");
+                    // Recompute contract code hash.
+                    let code = ContractCode::new(code, None);
+                    set_code(state_update, account_id, &code);
+                    assert_eq!(*code.hash(), acc.code_hash());
+                }
+                StateRecord::AccessKey { account_id, public_key, access_key } => {
+                    set_access_key(state_update, account_id, public_key, &access_key);
+                }
+                _ => unimplemented!("patch_state can only patch Account, AccessKey, Contract and Data kind of StateRecord")
+            }
+        }
+        state_update.commit(StateChangeCause::Migration);
+    }
+
+    /// Processes a collection of transactions. Returns the receipts generated during processing.
+    fn process_transactions<'a>(
+        &self,
+        processing_state: &mut ApplyProcessingReceiptState<'a>,
+        receipt_sink: &mut ReceiptSink,
+    ) -> Result<Vec<Receipt>, RuntimeError> {
+        let total = &mut processing_state.total;
+        let apply_state = &mut processing_state.apply_state;
+        let state_update = &mut processing_state.state_update;
+        let mut local_receipts = Vec::new();
 
-        for signed_transaction in transactions {
+        for signed_transaction in processing_state.transactions {
             let (receipt, outcome_with_id) = self.process_transaction(
-                &mut state_update,
+                state_update,
                 apply_state,
                 signed_transaction,
-                &mut stats,
+                &mut processing_state.stats,
             )?;
             if receipt.receiver_id() == signed_transaction.transaction.signer_id() {
                 local_receipts.push(receipt);
@@ -1472,11 +1513,10 @@ impl Runtime {
                 receipt_sink.forward_or_buffer_receipt(
                     receipt,
                     apply_state,
-                    &mut state_update,
-                    epoch_info_provider,
+                    state_update,
+                    processing_state.epoch_info_provider,
                 )?;
             }
-
             total.add(
                 outcome_with_id.outcome.gas_burnt,
                 outcome_with_id
@@ -1484,142 +1524,168 @@ impl Runtime {
                     .compute_usage
                     .expect("`process_transaction` must populate compute usage"),
             )?;
-            if !checked_feature!("stable", ComputeCosts, protocol_version) {
+            if !checked_feature!("stable", ComputeCosts, processing_state.protocol_version) {
                 assert_eq!(total.compute, total.gas, "Compute usage must match burnt gas");
             }
-
-            outcomes.push(outcome_with_id);
+            processing_state.outcomes.push(outcome_with_id);
         }
-        metrics.tx_processing_done(total.gas, total.compute);
-
-        let mut process_receipt = |receipt: &Receipt,
-                                   state_update: &mut TrieUpdate,
-                                   total: &mut TotalResourceGuard|
-         -> Result<_, RuntimeError> {
-            let span = tracing::debug_span!(
-                target: "runtime",
-                "process_receipt",
-                receipt_id = %receipt.receipt_id(),
-                predecessor = %receipt.predecessor_id(),
-                receiver = %receipt.receiver_id(),
-                gas_burnt = tracing::field::Empty,
-                compute_usage = tracing::field::Empty,
-            )
-            .entered();
-            let node_counter_before = state_update.trie().get_trie_nodes_count();
-            let recorded_storage_size_before =
-                state_update.trie().recorded_storage_size();
-            let storage_proof_size_upper_bound_before =
-                state_update.trie().recorded_storage_size_upper_bound();
-            let result = self.process_receipt(
-                state_update,
-                apply_state,
-                receipt,
-                &mut receipt_sink,
-                &mut validator_proposals,
-                &mut stats,
-                epoch_info_provider,
-            );
-            let node_counter_after = state_update.trie().get_trie_nodes_count();
-            tracing::trace!(target: "runtime", ?node_counter_before, ?node_counter_after);
-
-            let recorded_storage_diff = state_update
-                .trie()
-                .recorded_storage_size()
-                .saturating_sub(recorded_storage_size_before)
-                as f64;
-            let recorded_storage_upper_bound_diff = state_update
-                .trie()
-                .recorded_storage_size_upper_bound()
-                .saturating_sub(storage_proof_size_upper_bound_before)
-                as f64;
-            metrics::RECEIPT_RECORDED_SIZE.observe(recorded_storage_diff);
-            metrics::RECEIPT_RECORDED_SIZE_UPPER_BOUND.observe(recorded_storage_upper_bound_diff);
-            let recorded_storage_proof_ratio =
-                recorded_storage_upper_bound_diff / f64::max(1.0, recorded_storage_diff);
-            // Record the ratio only for large receipts, small receipts can have a very high ratio,
-            // but the ratio is not that important for them.
-            if recorded_storage_upper_bound_diff > 100_000. {
-                metrics::RECEIPT_RECORDED_SIZE_UPPER_BOUND_RATIO
-                    .observe(recorded_storage_proof_ratio);
-            }
-            if let Some(outcome_with_id) = result? {
-                let gas_burnt = outcome_with_id.outcome.gas_burnt;
-                let compute_usage = outcome_with_id
-                    .outcome
-                    .compute_usage
-                    .expect("`process_receipt` must populate compute usage");
-                total.add(gas_burnt, compute_usage)?;
-                span.record("gas_burnt", gas_burnt);
-                span.record("compute_usage", compute_usage);
+        processing_state.metrics.tx_processing_done(total.gas, total.compute);
+        Ok(local_receipts)
+    }
 
-                if !checked_feature!("stable", ComputeCosts, protocol_version) {
-                    assert_eq!(total.compute, total.gas, "Compute usage must match burnt gas");
-                }
-                outcomes.push(outcome_with_id);
+    /// This function wraps [Runtime::process_receipt]. It adds a tracing span around the latter
+    /// and populates various metrics.
+    fn process_receipt_with_metrics<'a>(
+        &self,
+        receipt: &Receipt,
+        processing_state: &mut ApplyProcessingReceiptState<'a>,
+        mut receipt_sink: &mut ReceiptSink,
+        mut validator_proposals: &mut Vec<ValidatorStake>,
+    ) -> Result<(), RuntimeError> {
+        let span = tracing::debug_span!(
+            target: "runtime",
+            "process_receipt",
+            receipt_id = %receipt.receipt_id(),
+            predecessor = %receipt.predecessor_id(),
+            receiver = %receipt.receiver_id(),
+            gas_burnt = tracing::field::Empty,
+            compute_usage = tracing::field::Empty,
+        )
+        .entered();
+        let total = &mut processing_state.total;
+        let state_update = &mut processing_state.state_update;
+        let node_counter_before = state_update.trie().get_trie_nodes_count();
+        let recorded_storage_size_before = state_update.trie().recorded_storage_size();
+        let storage_proof_size_upper_bound_before =
+            state_update.trie().recorded_storage_size_upper_bound();
+        let result = self.process_receipt(
+            state_update,
+            processing_state.apply_state,
+            receipt,
+            &mut receipt_sink,
+            &mut validator_proposals,
+            &mut processing_state.stats,
+            processing_state.epoch_info_provider,
+        );
+        let node_counter_after = state_update.trie().get_trie_nodes_count();
+        tracing::trace!(target: "runtime", ?node_counter_before, ?node_counter_after);
+
+        let recorded_storage_diff = state_update
+            .trie()
+            .recorded_storage_size()
+            .saturating_sub(recorded_storage_size_before)
+            as f64;
+        let recorded_storage_upper_bound_diff = state_update
+            .trie()
+            .recorded_storage_size_upper_bound()
+            .saturating_sub(storage_proof_size_upper_bound_before)
+            as f64;
+        metrics::RECEIPT_RECORDED_SIZE.observe(recorded_storage_diff);
+        metrics::RECEIPT_RECORDED_SIZE_UPPER_BOUND.observe(recorded_storage_upper_bound_diff);
+        let recorded_storage_proof_ratio =
+            recorded_storage_upper_bound_diff / f64::max(1.0, recorded_storage_diff);
+        // Record the ratio only for large receipts, small receipts can have a very high ratio,
+        // but the ratio is not that important for them.
+        if recorded_storage_upper_bound_diff > 100_000. {
+            metrics::RECEIPT_RECORDED_SIZE_UPPER_BOUND_RATIO.observe(recorded_storage_proof_ratio);
+        }
+        if let Some(outcome_with_id) = result? {
+            let gas_burnt = outcome_with_id.outcome.gas_burnt;
+            let compute_usage = outcome_with_id
+                .outcome
+                .compute_usage
+                .expect("`process_receipt` must populate compute usage");
+            total.add(gas_burnt, compute_usage)?;
+            span.record("gas_burnt", gas_burnt);
+            span.record("compute_usage", compute_usage);
+
+            if !checked_feature!("stable", ComputeCosts, processing_state.protocol_version) {
+                assert_eq!(total.compute, total.gas, "Compute usage must match burnt gas");
             }
-            Ok(())
-        };
-
-        // TODO(#8859): Introduce a dedicated `compute_limit` for the chunk.
-        // For now compute limit always matches the gas limit.
-        let compute_limit = apply_state.gas_limit.unwrap_or(Gas::max_value());
-        let proof_size_limit =
-            if checked_feature!("stable", StateWitnessSizeLimit, protocol_version) {
-                Some(apply_state.config.witness_config.main_storage_proof_size_soft_limit)
-            } else {
-                None
-            };
+            processing_state.outcomes.push(outcome_with_id);
+        }
+        Ok(())
+    }
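The wrapper above attributes state-witness growth to individual receipts by diffing the trie recorder's counters around the call. A simplified, runnable sketch of that before/after measurement (the `Recorder` type is a hypothetical stand-in; the 100 kB threshold mirrors the diff):

```rust
// Hypothetical stand-in for the real trie recorder.
struct Recorder {
    recorded: u64,
    upper_bound: u64,
}

fn observe_receipt(recorder: &Recorder, before: (u64, u64)) {
    let actual = recorder.recorded.saturating_sub(before.0) as f64;
    let upper = recorder.upper_bound.saturating_sub(before.1) as f64;
    // Small receipts can have a wildly high upper-bound/actual ratio, so the
    // ratio is only worth recording past a size threshold.
    if upper > 100_000.0 {
        let ratio = upper / f64::max(1.0, actual);
        println!("witness-size ratio for large receipt: {ratio:.2}");
    }
}

fn main() {
    let mut recorder = Recorder { recorded: 0, upper_bound: 0 };
    let before = (recorder.recorded, recorder.upper_bound);
    // ... process one receipt, growing the recorded sizes ...
    recorder.recorded += 120_000;
    recorder.upper_bound += 300_000;
    observe_receipt(&recorder, before);
}
```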
-        // We first process local receipts. They contain staking, local contract calls, etc.
+    fn process_local_receipts<'a>(
+        &self,
+        mut processing_state: &mut ApplyProcessingReceiptState<'a>,
+        receipt_sink: &mut ReceiptSink,
+        compute_limit: u64,
+        proof_size_limit: Option<usize>,
+        validator_proposals: &mut Vec<ValidatorStake>,
+        local_receipts: &'a [Receipt],
+    ) -> Result<(), RuntimeError> {
         let local_processing_start = std::time::Instant::now();
-        if let Some(prefetcher) = &mut prefetcher {
+        if let Some(prefetcher) = &mut processing_state.prefetcher {
             // Prefetcher is allowed to fail
             _ = prefetcher.prefetch_receipts_data(&local_receipts);
         }
         for receipt in local_receipts.iter() {
-            if total.compute >= compute_limit
+            if processing_state.total.compute >= compute_limit
                 || proof_size_limit.is_some_and(|limit| {
-                    state_update.trie.recorded_storage_size_upper_bound() > limit
+                    processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
                 })
             {
-                delayed_receipts.push(&mut state_update, receipt, &apply_state.config)?;
+                processing_state.delayed_receipts.push(
+                    &mut processing_state.state_update,
+                    receipt,
+                    &processing_state.apply_state.config,
+                )?;
             } else {
                 // NOTE: We don't need to validate the local receipt, because it's just validated in
                 // the `verify_and_charge_transaction`.
-                process_receipt(receipt, &mut state_update, &mut total)?;
+                self.process_receipt_with_metrics(
+                    receipt,
+                    &mut processing_state,
+                    receipt_sink,
+                    validator_proposals,
+                )?
             }
         }
-        metrics.local_receipts_done(
+        processing_state.metrics.local_receipts_done(
             local_receipts.len() as u64,
             local_processing_start.elapsed(),
-            total.gas,
-            total.compute,
+            processing_state.total.gas,
+            processing_state.total.compute,
         );
+        Ok(())
+    }
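The gating rule in this loop, which reappears for delayed and incoming receipts below, is: once the chunk's compute budget or the storage-proof soft limit is hit, remaining receipts are pushed to the delayed queue rather than dropped. A self-contained sketch of that decision, with simplified stand-in types:

```rust
// Hypothetical, simplified budget type; not nearcore's actual code.
struct Budget {
    compute_used: u64,
    compute_limit: u64,
    proof_size: usize,
    proof_size_limit: Option<usize>,
}

impl Budget {
    fn exhausted(&self) -> bool {
        self.compute_used >= self.compute_limit
            || self.proof_size_limit.is_some_and(|limit| self.proof_size > limit)
    }
}

fn main() {
    let mut delayed = Vec::new();
    let mut processed = Vec::new();
    // Proof size already exceeds the soft limit in this example.
    let budget = Budget {
        compute_used: 950,
        compute_limit: 1_000,
        proof_size: 4_000_000,
        proof_size_limit: Some(3_000_000),
    };
    for receipt in ["r1", "r2"] {
        if budget.exhausted() {
            delayed.push(receipt); // picked up again in a later chunk
        } else {
            processed.push(receipt);
        }
    }
    assert_eq!(delayed.len(), 2);
}
```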
-        // Then we process the delayed receipts. It's a backlog of receipts from the past blocks.
+    fn process_delayed_receipts<'a>(
+        &self,
+        mut processing_state: &mut ApplyProcessingReceiptState<'a>,
+        receipt_sink: &mut ReceiptSink,
+        compute_limit: u64,
+        proof_size_limit: Option<usize>,
+        validator_proposals: &mut Vec<ValidatorStake>,
+    ) -> Result<Vec<Receipt>, RuntimeError> {
         let delayed_processing_start = std::time::Instant::now();
+        let protocol_version = processing_state.protocol_version;
         let mut delayed_receipt_count = 0;
-        while delayed_receipts.len() > 0 {
-            if total.compute >= compute_limit
+        let mut processed_delayed_receipts = vec![];
+        while processing_state.delayed_receipts.len() > 0 {
+            if processing_state.total.compute >= compute_limit
                 || proof_size_limit.is_some_and(|limit| {
-                    state_update.trie.recorded_storage_size_upper_bound() > limit
+                    processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
                 })
             {
                 break;
             }
             delayed_receipt_count += 1;
-            let receipt = delayed_receipts
-                .pop(&mut state_update, &apply_state.config)?
+            let receipt = processing_state
+                .delayed_receipts
+                .pop(&mut processing_state.state_update, &processing_state.apply_state.config)?
                 .expect("queue is not empty");
-            if let Some(prefetcher) = &mut prefetcher {
+            if let Some(prefetcher) = &mut processing_state.prefetcher {
                 // Prefetcher is allowed to fail
                 _ = prefetcher.prefetch_receipts_data(std::slice::from_ref(&receipt));
             }
 
             // Validating the delayed receipt. If it fails, it's likely the state is inconsistent.
             validate_receipt(
-                &apply_state.config.wasm_config.limit_config,
+                &processing_state.apply_state.config.wasm_config.limit_config,
                 &receipt,
                 protocol_version,
             )
@@ -1630,145 +1696,163 @@ impl Runtime {
                 ))
             })?;
 
-            process_receipt(&receipt, &mut state_update, &mut total)?;
+            self.process_receipt_with_metrics(
+                &receipt,
+                &mut processing_state,
+                receipt_sink,
+                validator_proposals,
+            )?;
             processed_delayed_receipts.push(receipt);
         }
-        metrics.delayed_receipts_done(
+        processing_state.metrics.delayed_receipts_done(
             delayed_receipt_count,
             delayed_processing_start.elapsed(),
-            total.gas,
-            total.compute,
+            processing_state.total.gas,
+            processing_state.total.compute,
         );
 
-        // And then we process the new incoming receipts. These are receipts from other shards.
+        Ok(processed_delayed_receipts)
+    }
+
+    fn process_incoming_receipts<'a>(
+        &self,
+        mut processing_state: &mut ApplyProcessingReceiptState<'a>,
+        receipt_sink: &mut ReceiptSink,
+        compute_limit: u64,
+        proof_size_limit: Option<usize>,
+        validator_proposals: &mut Vec<ValidatorStake>,
+    ) -> Result<(), RuntimeError> {
         let incoming_processing_start = std::time::Instant::now();
-        if let Some(prefetcher) = &mut prefetcher {
+        let protocol_version = processing_state.protocol_version;
+        if let Some(prefetcher) = &mut processing_state.prefetcher {
             // Prefetcher is allowed to fail
-            _ = prefetcher.prefetch_receipts_data(&incoming_receipts);
+            _ = prefetcher.prefetch_receipts_data(&processing_state.incoming_receipts);
         }
-        for receipt in incoming_receipts.iter() {
+        for receipt in processing_state.incoming_receipts.iter() {
             // Validating new incoming no matter whether we have available gas or not. We don't
             // want to store invalid receipts in state as delayed.
             validate_receipt(
-                &apply_state.config.wasm_config.limit_config,
+                &processing_state.apply_state.config.wasm_config.limit_config,
                 receipt,
                 protocol_version,
             )
             .map_err(RuntimeError::ReceiptValidationError)?;
 
-            if total.compute >= compute_limit
+            if processing_state.total.compute >= compute_limit
                 || proof_size_limit.is_some_and(|limit| {
-                    state_update.trie.recorded_storage_size_upper_bound() > limit
+                    processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
                 })
             {
-                delayed_receipts.push(&mut state_update, receipt, &apply_state.config)?;
+                processing_state.delayed_receipts.push(
+                    &mut processing_state.state_update,
+                    receipt,
+                    &processing_state.apply_state.config,
+                )?;
             } else {
-                process_receipt(receipt, &mut state_update, &mut total)?;
+                self.process_receipt_with_metrics(
+                    &receipt,
+                    &mut processing_state,
+                    receipt_sink,
+                    validator_proposals,
+                )?;
            }
        }
-        metrics.incoming_receipts_done(
-            incoming_receipts.len() as u64,
+        processing_state.metrics.incoming_receipts_done(
+            processing_state.incoming_receipts.len() as u64,
             incoming_processing_start.elapsed(),
-            total.gas,
-            total.compute,
+            processing_state.total.gas,
+            processing_state.total.compute,
        );
+        Ok(())
+    }
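The delayed-receipt loop above drains a strictly FIFO backlog and only up to the budget, so whatever is not processed survives to the next chunk. A runnable sketch using a plain `VecDeque` in place of the trie-backed `DelayedReceiptQueueWrapper`:

```rust
use std::collections::VecDeque;

// Simplified stand-in: receipts are just u32 ids, cost is flat per receipt.
fn drain_backlog(queue: &mut VecDeque<u32>, compute_limit: u64, cost: u64) -> Vec<u32> {
    let mut compute_used = 0u64;
    let mut processed = Vec::new();
    while !queue.is_empty() {
        if compute_used >= compute_limit {
            break; // whatever remains stays queued for the next chunk
        }
        let receipt = queue.pop_front().expect("queue is not empty");
        compute_used += cost;
        processed.push(receipt);
    }
    processed
}

fn main() {
    let mut queue: VecDeque<u32> = (0..10).collect();
    let processed = drain_backlog(&mut queue, 3, 1);
    assert_eq!(processed, vec![0, 1, 2]);
    assert_eq!(queue.len(), 7); // backlog persists across chunks
}
```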
-        // Resolve timed-out PromiseYield receipts
-        let mut promise_yield_indices: PromiseYieldIndices =
-            get(&state_update, &TrieKey::PromiseYieldIndices)?.unwrap_or_default();
-        let initial_promise_yield_indices = promise_yield_indices.clone();
-        let mut new_receipt_index: usize = 0;
-
-        let mut processed_yield_timeouts = vec![];
-        let mut timeout_receipts = vec![];
-        let yield_processing_start = std::time::Instant::now();
-        while promise_yield_indices.first_index < promise_yield_indices.next_available_index {
-            if total.compute >= compute_limit
-                || proof_size_limit.is_some_and(|limit| {
-                    state_update.trie.recorded_storage_size_upper_bound() > limit
-                })
-            {
-                break;
-            }
+    /// Processes all receipts (local, delayed and incoming).
+    /// Returns a structure containing the result of the processing.
+    fn process_receipts<'a>(
+        &self,
+        processing_state: &mut ApplyProcessingReceiptState<'a>,
+        receipt_sink: &mut ReceiptSink,
+        local_receipts: &'a [Receipt],
+    ) -> Result<ProcessReceiptsResult, RuntimeError> {
+        let mut validator_proposals = vec![];
+        let protocol_version = processing_state.protocol_version;
+        let apply_state = &processing_state.apply_state;
 
-            let queue_entry_key =
-                TrieKey::PromiseYieldTimeout { index: promise_yield_indices.first_index };
+        // TODO(#8859): Introduce a dedicated `compute_limit` for the chunk.
+        // For now compute limit always matches the gas limit.
+        let compute_limit = apply_state.gas_limit.unwrap_or(Gas::max_value());
+        let proof_size_limit =
+            if checked_feature!("stable", StateWitnessSizeLimit, protocol_version) {
+                Some(apply_state.config.witness_config.main_storage_proof_size_soft_limit)
+            } else {
+                None
+            };
 
-            let queue_entry = get::<PromiseYieldTimeout>(&state_update, &queue_entry_key)?
-                .ok_or_else(|| {
-                    StorageError::StorageInconsistentState(format!(
-                        "PromiseYield timeout queue entry #{} should be in the state",
-                        promise_yield_indices.first_index
-                    ))
-                })?;
+        // We first process local receipts. They contain staking, local contract calls, etc.
+        self.process_local_receipts(
+            processing_state,
+            receipt_sink,
+            compute_limit,
+            proof_size_limit,
+            &mut validator_proposals,
+            local_receipts,
+        )?;
 
-            // Queue entries are ordered by expires_at
-            if queue_entry.expires_at > apply_state.block_height {
-                break;
-            }
+        // Then we process the delayed receipts. It's a backlog of receipts from the past blocks.
+        let processed_delayed_receipts = self.process_delayed_receipts(
+            processing_state,
+            receipt_sink,
+            compute_limit,
+            proof_size_limit,
+            &mut validator_proposals,
+        )?;
 
-            // Check if the yielded promise still needs to be resolved
-            let promise_yield_key = TrieKey::PromiseYieldReceipt {
-                receiver_id: queue_entry.account_id.clone(),
-                data_id: queue_entry.data_id,
-            };
-            if state_update.contains_key(&promise_yield_key)? {
-                let new_receipt_id = create_receipt_id_from_receipt_id(
-                    protocol_version,
-                    &queue_entry.data_id,
-                    &apply_state.prev_block_hash,
-                    &apply_state.block_hash,
-                    new_receipt_index,
-                );
-                new_receipt_index += 1;
-
-                // Create a PromiseResume receipt to resolve the timed-out yield.
-                let resume_receipt = Receipt::V0(ReceiptV0 {
-                    predecessor_id: queue_entry.account_id.clone(),
-                    receiver_id: queue_entry.account_id.clone(),
-                    receipt_id: new_receipt_id,
-                    receipt: ReceiptEnum::PromiseResume(DataReceipt {
-                        data_id: queue_entry.data_id,
-                        data: None,
-                    }),
-                });
+        // And then we process the new incoming receipts. These are receipts from other shards.
+        self.process_incoming_receipts(
+            processing_state,
+            receipt_sink,
+            compute_limit,
+            proof_size_limit,
+            &mut validator_proposals,
+        )?;
 
-                // The receipt is destined for the local shard and will be placed in the outgoing
-                // receipts buffer. It is possible that there is already an outgoing receipt resolving
-                // this yield if `yield_resume` was invoked by some receipt which was processed in
-                // the current chunk. The ordering will be maintained because the receipts are
-                // destined for the same shard; the timeout will be processed second and discarded.
-                receipt_sink.forward_or_buffer_receipt(
-                    resume_receipt.clone(),
-                    apply_state,
-                    &mut state_update,
-                    epoch_info_provider,
-                )?;
-                timeout_receipts.push(resume_receipt);
-            }
+        // Resolve timed-out PromiseYield receipts
+        let promise_yield_result = resolve_promise_yield_timeouts(
+            processing_state,
+            receipt_sink,
+            compute_limit,
+            proof_size_limit,
+        )?;
 
-            processed_yield_timeouts.push(queue_entry);
-            state_update.remove(queue_entry_key);
-            // Math checked above: first_index is less than next_available_index
-            promise_yield_indices.first_index += 1;
-        }
-        metrics.yield_timeouts_done(
-            processed_yield_timeouts.len() as u64,
-            yield_processing_start.elapsed(),
-            total.gas,
-            total.compute,
-        );
-        // After receipt processing is done, report metrics on outgoing buffers
-        // and on congestion indicators.
-        metrics::report_congestion_metrics(
-            &receipt_sink,
-            apply_state.shard_id,
-            &apply_state.config.congestion_control_config,
-        );
+        Ok(ProcessReceiptsResult {
+            promise_yield_result,
+            validator_proposals,
+            processed_delayed_receipts,
+        })
+    }
 
+    fn validate_apply_state_update<'a>(
+        &self,
+        processing_state: ApplyProcessingReceiptState<'a>,
+        process_receipts_result: ProcessReceiptsResult,
+        mut own_congestion_info: Option<CongestionInfo>,
+        validator_accounts_update: &Option<ValidatorAccountsUpdate>,
+        state_patch: SandboxStatePatch,
+        outgoing_receipts: Vec<Receipt>,
+    ) -> Result<ApplyResult, RuntimeError> {
         let _span = tracing::debug_span!(target: "runtime", "apply_commit").entered();
+        let apply_state = processing_state.apply_state;
+        let mut state_update = processing_state.state_update;
+        let delayed_receipts = processing_state.delayed_receipts;
+        let promise_yield_result = process_receipts_result.promise_yield_result;
 
-        if promise_yield_indices != initial_promise_yield_indices {
-            set(&mut state_update, TrieKey::PromiseYieldIndices, &promise_yield_indices);
+        if promise_yield_result.promise_yield_indices
+            != promise_yield_result.initial_promise_yield_indices
+        {
+            set(
+                &mut state_update,
+                TrieKey::PromiseYieldIndices,
+                &promise_yield_result.promise_yield_indices,
+            );
         }
 
         // Congestion info needs a final touch to select an allowed shard if
@@ -1797,11 +1881,11 @@ impl Runtime {
             &apply_state.config,
             &state_update,
             validator_accounts_update,
-            incoming_receipts,
-            &timeout_receipts,
-            transactions,
+            processing_state.incoming_receipts,
+            &promise_yield_result.timeout_receipts,
+            processing_state.transactions,
             &outgoing_receipts,
-            &stats,
+            &processing_state.stats,
         )?;
 
         state_update.commit(StateChangeCause::UpdatedDelayedReceipts);
@@ -1810,7 +1894,8 @@ impl Runtime {
             state_update.trie.recorded_storage_size_upper_bound() as f64;
         metrics::CHUNK_RECORDED_SIZE_UPPER_BOUND.observe(chunk_recorded_size_upper_bound);
         let (trie, trie_changes, state_changes) = state_update.finalize()?;
-        if let Some(prefetcher) = &prefetcher {
+
+        if let Some(prefetcher) = &processing_state.prefetcher {
             // Only clear the prefetcher queue after finalize is done because as part of receipt
             // processing we also prefetch account data and access keys that are accessed in
             // finalize. This data can take a very long time otherwise if not prefetched.
@@ -1829,7 +1914,7 @@ impl Runtime {
             // The order is deterministically changed.
             let mut unique_proposals = vec![];
             let mut account_ids = HashSet::new();
-            for proposal in validator_proposals.into_iter().rev() {
+            for proposal in process_receipts_result.validator_proposals.into_iter().rev() {
                 let account_id = proposal.account_id();
                 if !account_ids.contains(account_id) {
                     account_ids.insert(account_id.clone());
@@ -1843,50 +1928,24 @@ impl Runtime {
         metrics::CHUNK_RECORDED_SIZE_UPPER_BOUND_RATIO
             .observe(chunk_recorded_size_upper_bound / f64::max(1.0, chunk_recorded_size));
         let proof = trie.recorded_storage();
+        let processed_delayed_receipts = process_receipts_result.processed_delayed_receipts;
+        let processed_yield_timeouts = promise_yield_result.processed_yield_timeouts;
         Ok(ApplyResult {
             state_root,
             trie_changes,
             validator_proposals: unique_proposals,
             outgoing_receipts,
-            outcomes,
+            outcomes: processing_state.outcomes,
             state_changes,
-            stats,
+            stats: processing_state.stats,
             processed_delayed_receipts,
             processed_yield_timeouts,
             proof,
             delayed_receipts_count,
-            metrics: Some(metrics),
+            metrics: Some(processing_state.metrics),
             congestion_info: own_congestion_info,
         })
     }
-
-    fn apply_state_patch(&self, state_update: &mut TrieUpdate, state_patch: SandboxStatePatch) {
-        if state_patch.is_empty() {
-            return;
-        }
-        for record in state_patch {
-            match record {
-                StateRecord::Account { account_id, account } => {
-                    set_account(state_update, account_id, &account);
-                }
-                StateRecord::Data { account_id, data_key, value } => {
-                    state_update.set(TrieKey::ContractData { key: data_key.into(), account_id }, value.into());
-                }
-                StateRecord::Contract { account_id, code } => {
-                    let acc = get_account(state_update, &account_id).expect("Failed to read state").expect("Code state record should be preceded by the corresponding account record");
-                    // Recompute contract code hash.
-                    let code = ContractCode::new(code, None);
-                    set_code(state_update, account_id, &code);
-                    assert_eq!(*code.hash(), acc.code_hash());
-                }
-                StateRecord::AccessKey { account_id, public_key, access_key } => {
-                    set_access_key(state_update, account_id, public_key, &access_key);
-                }
-                _ => unimplemented!("patch_state can only patch Account, AccessKey, Contract and Data kind of StateRecord")
-            }
-        }
-        state_update.commit(StateChangeCause::Migration);
-    }
 }
 
 impl ApplyState {
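The proposal loop above deduplicates by walking the list in reverse and keeping the first occurrence per account, so the account's most recent proposal in the chunk wins. A runnable sketch with simplified types:

```rust
use std::collections::HashSet;

// Simplified stand-in: (account, stake) pairs instead of ValidatorStake.
fn dedup_last_wins(proposals: Vec<(&str, u64)>) -> Vec<(&str, u64)> {
    let mut unique = Vec::new();
    let mut seen = HashSet::new();
    for (account, stake) in proposals.into_iter().rev() {
        // `insert` returns true only on first sight of the account.
        if seen.insert(account) {
            unique.push((account, stake));
        }
    }
    // Note the output order is deterministically changed (reversed),
    // matching the comment in the diff.
    unique
}

fn main() {
    let proposals = vec![("alice", 10), ("bob", 20), ("alice", 30)];
    // alice's later proposal (30) supersedes her earlier one (10).
    assert_eq!(dedup_last_wins(proposals), vec![("alice", 30), ("bob", 20)]);
}
```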
@@ -1967,6 +2026,140 @@ fn action_transfer_or_implicit_account_creation(
     })
 }
 
+fn missing_chunk_apply_result(
+    delayed_receipts: &DelayedReceiptQueueWrapper,
+    processing_state: ApplyProcessingState,
+) -> Result<ApplyResult, RuntimeError> {
+    let (trie, trie_changes, state_changes) = processing_state.state_update.finalize()?;
+    let proof = trie.recorded_storage();
+
+    // For old chunks, copy the congestion info exactly as it came in,
+    // potentially returning `None` even if the congestion control
+    // feature is enabled for the protocol version.
+    let congestion_info = processing_state
+        .apply_state
+        .congestion_info
+        .get(&processing_state.apply_state.shard_id)
+        .map(|extended_info| extended_info.congestion_info);
+
+    return Ok(ApplyResult {
+        state_root: trie_changes.new_root,
+        trie_changes,
+        validator_proposals: vec![],
+        outgoing_receipts: vec![],
+        outcomes: vec![],
+        state_changes,
+        stats: processing_state.stats,
+        processed_delayed_receipts: vec![],
+        processed_yield_timeouts: vec![],
+        proof,
+        delayed_receipts_count: delayed_receipts.len(),
+        metrics: None,
+        congestion_info,
+    });
+}
+
+fn resolve_promise_yield_timeouts(
+    processing_state: &mut ApplyProcessingReceiptState,
+    receipt_sink: &mut ReceiptSink,
+    compute_limit: u64,
+    proof_size_limit: Option<usize>,
+) -> Result<ResolvePromiseYieldTimeoutsResult, RuntimeError> {
+    let mut state_update = &mut processing_state.state_update;
+    let total = &mut processing_state.total;
+    let apply_state = &processing_state.apply_state;
+
+    let mut promise_yield_indices: PromiseYieldIndices =
+        get(state_update, &TrieKey::PromiseYieldIndices)?.unwrap_or_default();
+    let initial_promise_yield_indices = promise_yield_indices.clone();
+    let mut new_receipt_index: usize = 0;
+
+    let mut processed_yield_timeouts = vec![];
+    let mut timeout_receipts = vec![];
+    let yield_processing_start = std::time::Instant::now();
+    while promise_yield_indices.first_index < promise_yield_indices.next_available_index {
+        if total.compute >= compute_limit
+            || proof_size_limit
+                .is_some_and(|limit| state_update.trie.recorded_storage_size_upper_bound() > limit)
+        {
+            break;
+        }
+
+        let queue_entry_key =
+            TrieKey::PromiseYieldTimeout { index: promise_yield_indices.first_index };
+
+        let queue_entry =
+            get::<PromiseYieldTimeout>(state_update, &queue_entry_key)?.ok_or_else(|| {
+                StorageError::StorageInconsistentState(format!(
+                    "PromiseYield timeout queue entry #{} should be in the state",
+                    promise_yield_indices.first_index
+                ))
+            })?;
+
+        // Queue entries are ordered by expires_at
+        if queue_entry.expires_at > apply_state.block_height {
+            break;
+        }
+
+        // Check if the yielded promise still needs to be resolved
+        let promise_yield_key = TrieKey::PromiseYieldReceipt {
+            receiver_id: queue_entry.account_id.clone(),
+            data_id: queue_entry.data_id,
+        };
+        if state_update.contains_key(&promise_yield_key)? {
+            let new_receipt_id = create_receipt_id_from_receipt_id(
+                processing_state.protocol_version,
+                &queue_entry.data_id,
+                &apply_state.prev_block_hash,
+                &apply_state.block_hash,
+                new_receipt_index,
+            );
+            new_receipt_index += 1;
+
+            // Create a PromiseResume receipt to resolve the timed-out yield.
+            let resume_receipt = Receipt::V0(ReceiptV0 {
+                predecessor_id: queue_entry.account_id.clone(),
+                receiver_id: queue_entry.account_id.clone(),
+                receipt_id: new_receipt_id,
+                receipt: ReceiptEnum::PromiseResume(DataReceipt {
+                    data_id: queue_entry.data_id,
+                    data: None,
+                }),
+            });
+
+            // The receipt is destined for the local shard and will be placed in the outgoing
+            // receipts buffer. It is possible that there is already an outgoing receipt resolving
+            // this yield if `yield_resume` was invoked by some receipt which was processed in
+            // the current chunk. The ordering will be maintained because the receipts are
+            // destined for the same shard; the timeout will be processed second and discarded.
+            receipt_sink.forward_or_buffer_receipt(
+                resume_receipt.clone(),
+                apply_state,
+                &mut state_update,
+                processing_state.epoch_info_provider,
+            )?;
+            timeout_receipts.push(resume_receipt);
+        }
+
+        processed_yield_timeouts.push(queue_entry);
+        state_update.remove(queue_entry_key);
+        // Math checked above: first_index is less than next_available_index
+        promise_yield_indices.first_index += 1;
+    }
+    processing_state.metrics.yield_timeouts_done(
+        processed_yield_timeouts.len() as u64,
+        yield_processing_start.elapsed(),
+        total.gas,
+        total.compute,
+    );
+    Ok(ResolvePromiseYieldTimeoutsResult {
+        timeout_receipts,
+        initial_promise_yield_indices,
+        promise_yield_indices,
+        processed_yield_timeouts,
+    })
+}
+
 struct TotalResourceGuard {
     gas: u64,
     compute: u64,
@@ -1988,6 +2181,101 @@ impl TotalResourceGuard {
     }
 }
 
+struct ProcessReceiptsResult {
+    promise_yield_result: ResolvePromiseYieldTimeoutsResult,
+    validator_proposals: Vec<ValidatorStake>,
+    processed_delayed_receipts: Vec<Receipt>,
+}
+
+struct ResolvePromiseYieldTimeoutsResult {
+    timeout_receipts: Vec<Receipt>,
+    initial_promise_yield_indices: PromiseYieldIndices,
+    promise_yield_indices: PromiseYieldIndices,
+    processed_yield_timeouts: Vec<PromiseYieldTimeout>,
+}
+
+/// This struct is a convenient way to hold the processing state during [Runtime::apply].
+struct ApplyProcessingState<'a> {
+    protocol_version: ProtocolVersion,
+    apply_state: &'a ApplyState,
+    prefetcher: Option<TriePrefetcher>,
+    state_update: TrieUpdate,
+    epoch_info_provider: &'a dyn EpochInfoProvider,
+    transactions: &'a [SignedTransaction],
+    total: TotalResourceGuard,
+    stats: ApplyStats,
+}
+
+impl<'a> ApplyProcessingState<'a> {
+    fn new(
+        apply_state: &'a ApplyState,
+        trie: Trie,
+        epoch_info_provider: &'a dyn EpochInfoProvider,
+        transactions: &'a [SignedTransaction],
+    ) -> Self {
+        let protocol_version = apply_state.current_protocol_version;
+        let prefetcher = TriePrefetcher::new_if_enabled(&trie);
+        let state_update = TrieUpdate::new(trie);
+        let total = TotalResourceGuard {
+            span: tracing::Span::current(),
+            // This contains the gas "burnt" for refund receipts. Even though we don't actually
+            // charge any gas for refund receipts, we still count the gas use towards the block gas
+            // limit
+            gas: 0,
+            compute: 0,
+        };
+        let stats = ApplyStats::default();
+        Self {
+            protocol_version,
+            apply_state,
+            prefetcher,
+            state_update,
+            epoch_info_provider,
+            transactions,
+            total,
+            stats,
+        }
+    }
+
+    fn into_processing_receipt_state(
+        self,
+        incoming_receipts: &'a [Receipt],
+        delayed_receipts: DelayedReceiptQueueWrapper,
+    ) -> ApplyProcessingReceiptState<'a> {
+        ApplyProcessingReceiptState {
+            protocol_version: self.protocol_version,
+            apply_state: self.apply_state,
+            prefetcher: self.prefetcher,
+            state_update: self.state_update,
+            epoch_info_provider: self.epoch_info_provider,
+            transactions: self.transactions,
+            total: self.total,
+            stats: self.stats,
+            outcomes: Vec::new(),
+            metrics: metrics::ApplyMetrics::default(),
+            incoming_receipts,
+            delayed_receipts,
+        }
+    }
+}
+
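`into_processing_receipt_state` consumes the setup-phase struct by value, so the compiler rules out accidentally touching pre-receipt state after the transition. A minimal sketch of that consuming conversion, with hypothetical types:

```rust
// Hypothetical two-phase state types, not nearcore's actual structs.
struct Setup {
    gas: u64,
}

struct ReceiptPhase {
    gas: u64,
    outcomes: Vec<String>,
}

impl Setup {
    // Takes `self` by value: the setup phase is moved into the next phase.
    fn into_receipt_phase(self) -> ReceiptPhase {
        ReceiptPhase { gas: self.gas, outcomes: Vec::new() }
    }
}

fn main() {
    let setup = Setup { gas: 7 };
    let phase = setup.into_receipt_phase();
    // `setup` is moved here; any further use would be a compile error.
    assert_eq!(phase.gas, 7);
    assert!(phase.outcomes.is_empty());
}
```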
+/// Similar to [ApplyProcessingState], with the difference that this contains extra state used
+/// by receipt processing.
+struct ApplyProcessingReceiptState<'a> {
+    protocol_version: ProtocolVersion,
+    apply_state: &'a ApplyState,
+    prefetcher: Option<TriePrefetcher>,
+    state_update: TrieUpdate,
+    epoch_info_provider: &'a dyn EpochInfoProvider,
+    transactions: &'a [SignedTransaction],
+    total: TotalResourceGuard,
+    stats: ApplyStats,
+    outcomes: Vec<ExecutionOutcomeWithId>,
+    metrics: ApplyMetrics,
+    incoming_receipts: &'a [Receipt],
+    delayed_receipts: DelayedReceiptQueueWrapper,
+}
+
 #[cfg(test)]
 mod tests {
     use assert_matches::assert_matches;