Skip to content

Commit

Permalink
runtime: pipelined preparation for incoming and delayed receipts (#11904
Browse files Browse the repository at this point in the history
)

Part of #11319 and the final change in integration with the transaction
runtime as all interesting receipt types are handled now. There are also
receipt types like yield timeouts which only result in generation of new
(delayed) receipts, so they don't need to be handled by this mechanism.
  • Loading branch information
nagisa authored Aug 13, 2024
1 parent 4e1a4c4 commit 781805d
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 45 deletions.
6 changes: 5 additions & 1 deletion runtime/runtime/src/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use near_primitives::receipt::{Receipt, ReceiptEnum};
use near_primitives::types::{EpochInfoProvider, Gas, ShardId};
use near_primitives::version::ProtocolFeature;
use near_store::trie::receipts_column_helper::{
DelayedReceiptQueue, ShardsOutgoingReceiptBuffer, TrieQueue,
DelayedReceiptQueue, ReceiptIterator, ShardsOutgoingReceiptBuffer, TrieQueue,
};
use near_store::{StorageError, TrieAccess, TrieUpdate};
use near_vm_runner::logic::ProtocolVersion;
Expand Down Expand Up @@ -445,6 +445,10 @@ impl DelayedReceiptQueueWrapper {
Ok(receipt)
}

pub(crate) fn peek_iter<'a>(&'a self, trie_update: &'a TrieUpdate) -> ReceiptIterator<'a> {
self.queue.iter(trie_update)
}

pub(crate) fn len(&self) -> u64 {
self.queue.len()
}
Expand Down
206 changes: 180 additions & 26 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,11 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_local_receipts", skip_all, fields(
num_receipts = processing_state.local_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_local_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1656,7 +1661,7 @@ impl Runtime {
) -> Result<(), RuntimeError> {
let local_processing_start = std::time::Instant::now();
let local_receipts = std::mem::take(&mut processing_state.local_receipts);
let local_receipt_count = local_receipts.len();
let local_receipt_count = processing_state.local_receipts.len();
if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
let (front, back) = local_receipts.as_slices();
Expand All @@ -1665,30 +1670,15 @@ impl Runtime {
}

let mut prep_lookahead_iter = local_receipts.iter();
let mut schedule_preparation = |pstate: &mut ApplyProcessingReceiptState| {
let scheduled_receipt_offset = prep_lookahead_iter.position(|peek| {
let account_id = peek.receiver_id();
let receiver = get_account(&pstate.state_update, account_id);
let Ok(Some(receiver)) = receiver else {
tracing::error!(
target: "runtime",
message="unable to read receiver of an upcoming local receipt",
?account_id,
receipt=%peek.get_hash()
);
return false;
};
// This returns `true` if work may have been scheduled (thus we currently prepare
// actions in at most 2 "interesting" receipts in parallel due to staggering.)
pstate.pipeline_manager.submit(peek, &receiver, None)
});
scheduled_receipt_offset
};
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_index = schedule_preparation(&mut processing_state);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for (index, receipt) in local_receipts.iter().enumerate() {
for receipt in local_receipts.iter() {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
Expand All @@ -1700,13 +1690,17 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = next_schedule_index {
if index >= nsi {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_index = schedule_preparation(&mut processing_state)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}
// NOTE: We don't need to validate the local receipt, because it's just validated in
Expand All @@ -1719,6 +1713,10 @@ impl Runtime {
)?
}
}

let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.local_receipts_done(
local_receipt_count as u64,
local_processing_start.elapsed(),
Expand All @@ -1728,6 +1726,11 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_delayed_receipts", skip_all, fields(
num_receipts = processing_state.delayed_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_delayed_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1740,6 +1743,17 @@ impl Runtime {
let protocol_version = processing_state.protocol_version;
let mut delayed_receipt_count = 0;
let mut processed_delayed_receipts = vec![];

let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

while processing_state.delayed_receipts.len() > 0 {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
Expand All @@ -1748,11 +1762,26 @@ impl Runtime {
{
break;
}

delayed_receipt_count += 1;
let receipt = processing_state
.delayed_receipts
.pop(&mut processing_state.state_update, &processing_state.apply_state.config)?
.expect("queue is not empty");
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
Expand Down Expand Up @@ -1780,6 +1809,9 @@ impl Runtime {
)?;
processed_delayed_receipts.push(receipt);
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.delayed_receipts_done(
delayed_receipt_count,
delayed_processing_start.elapsed(),
Expand All @@ -1790,6 +1822,11 @@ impl Runtime {
Ok(processed_delayed_receipts)
}

#[instrument(target = "runtime", level = "debug", "process_incoming_receipts", skip_all, fields(
num_receipts = processing_state.incoming_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_incoming_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1804,6 +1841,16 @@ impl Runtime {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&processing_state.incoming_receipts);
}

let mut prep_lookahead_iter = processing_state.incoming_receipts.iter();
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for receipt in processing_state.incoming_receipts.iter() {
// Validating new incoming no matter whether we have available gas or not. We don't
// want to store invalid receipts in state as delayed.
Expand All @@ -1824,6 +1871,20 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

self.process_receipt_with_metrics(
&receipt,
&mut processing_state,
Expand All @@ -1832,6 +1893,9 @@ impl Runtime {
)?;
}
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.incoming_receipts_done(
processing_state.incoming_receipts.len() as u64,
incoming_processing_start.elapsed(),
Expand Down Expand Up @@ -2378,6 +2442,96 @@ struct ApplyProcessingReceiptState<'a> {
pipeline_manager: pipelining::ReceiptPreparationPipeline,
}

trait MaybeRefReceipt {
fn as_ref(&self) -> &Receipt;
}

impl MaybeRefReceipt for Receipt {
fn as_ref(&self) -> &Receipt {
self
}
}

impl<'a> MaybeRefReceipt for &'a Receipt {
fn as_ref(&self) -> &Receipt {
*self
}
}

/// Schedule a one receipt for contract preparation.
///
/// The caller should call this method again after the returned number of receipts from `iterator`
/// are processed.
fn schedule_contract_preparation<'b, R: MaybeRefReceipt>(
pipeline_manager: &mut pipelining::ReceiptPreparationPipeline,
state_update: &TrieUpdate,
mut iterator: impl Iterator<Item = R>,
) -> Option<usize> {
let scheduled_receipt_offset = iterator.position(|peek| {
let peek = peek.as_ref();
let account_id = peek.receiver_id();
let receiver = get_account(state_update, account_id);
let Ok(Some(receiver)) = receiver else {
// Most likely reason this can happen is because the receipt is for an account that
// does not yet exist. This is a routine occurrence as accounts are created by sending
// some NEAR to a name that's about to be created.
return false;
};

// We need to inspect each receipt recursively in case these are data receipts, thus a
// function.
fn handle_receipt(
mgr: &mut ReceiptPreparationPipeline,
state_update: &TrieUpdate,
receiver: &Account,
account_id: &AccountId,
receipt: &Receipt,
) -> bool {
match receipt.receipt() {
ReceiptEnum::Action(_) | ReceiptEnum::PromiseYield(_) => {
// This returns `true` if work may have been scheduled (thus we currently
// prepare actions in at most 2 "interesting" receipts in parallel due to
// staggering.)
mgr.submit(receipt, &receiver, None)
}
ReceiptEnum::Data(dr) => {
let key = TrieKey::PostponedReceiptId {
receiver_id: account_id.clone(),
data_id: dr.data_id,
};
let Ok(Some(rid)) = get::<CryptoHash>(state_update, &key) else {
return false;
};
let key = TrieKey::PendingDataCount {
receiver_id: account_id.clone(),
receipt_id: rid,
};
let Ok(Some(data_count)) = get::<u32>(state_update, &key) else {
return false;
};
if data_count > 1 {
return false;
}
let Ok(Some(pr)) = get_postponed_receipt(state_update, account_id, rid) else {
return false;
};
return handle_receipt(mgr, state_update, receiver, account_id, &pr);
}
ReceiptEnum::PromiseResume(dr) => {
let Ok(Some(yr)) =
get_promise_yield_receipt(state_update, account_id, dr.data_id)
else {
return false;
};
return handle_receipt(mgr, state_update, receiver, account_id, &yr);
}
}
}
handle_receipt(pipeline_manager, state_update, &receiver, account_id, peek)
})?;
Some(scheduled_receipt_offset.saturating_add(1))
}

/// Interface provided for gas cost estimations.
pub mod estimator {
use super::{ReceiptSink, Runtime};
Expand Down
Loading

0 comments on commit 781805d

Please sign in to comment.