Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: pipelined preparation for incoming and delayed receipts #11904

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion runtime/runtime/src/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use near_primitives::receipt::{Receipt, ReceiptEnum};
use near_primitives::types::{EpochInfoProvider, Gas, ShardId};
use near_primitives::version::ProtocolFeature;
use near_store::trie::receipts_column_helper::{
DelayedReceiptQueue, ShardsOutgoingReceiptBuffer, TrieQueue,
DelayedReceiptQueue, ReceiptIterator, ShardsOutgoingReceiptBuffer, TrieQueue,
};
use near_store::{StorageError, TrieAccess, TrieUpdate};
use near_vm_runner::logic::ProtocolVersion;
Expand Down Expand Up @@ -446,6 +446,10 @@ impl DelayedReceiptQueueWrapper {
Ok(receipt)
}

/// Returns an iterator over the receipts currently in the delayed queue.
///
/// Unlike `pop`, this does not remove anything from the queue — it merely delegates to
/// the underlying queue's `iter`, which borrows `trie_update` immutably. Used to look
/// ahead at upcoming delayed receipts (e.g. for pipelined contract preparation).
pub(crate) fn peek_iter<'a>(&'a self, trie_update: &'a TrieUpdate) -> ReceiptIterator<'a> {
self.queue.iter(trie_update)
}

pub(crate) fn len(&self) -> u64 {
self.queue.len()
}
Expand Down
206 changes: 180 additions & 26 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,11 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_local_receipts", skip_all, fields(
num_receipts = processing_state.local_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_local_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1656,7 +1661,7 @@ impl Runtime {
) -> Result<(), RuntimeError> {
let local_processing_start = std::time::Instant::now();
let local_receipts = std::mem::take(&mut processing_state.local_receipts);
let local_receipt_count = local_receipts.len();
let local_receipt_count = processing_state.local_receipts.len();
if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
let (front, back) = local_receipts.as_slices();
Expand All @@ -1665,30 +1670,15 @@ impl Runtime {
}

let mut prep_lookahead_iter = local_receipts.iter();
let mut schedule_preparation = |pstate: &mut ApplyProcessingReceiptState| {
let scheduled_receipt_offset = prep_lookahead_iter.position(|peek| {
let account_id = peek.receiver_id();
let receiver = get_account(&pstate.state_update, account_id);
let Ok(Some(receiver)) = receiver else {
tracing::error!(
target: "runtime",
message="unable to read receiver of an upcoming local receipt",
?account_id,
receipt=%peek.get_hash()
);
return false;
};
// This returns `true` if work may have been scheduled (thus we currently prepare
// actions in at most 2 "interesting" receipts in parallel due to staggering.)
pstate.pipeline_manager.submit(peek, &receiver, None)
});
scheduled_receipt_offset
};
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_index = schedule_preparation(&mut processing_state);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for (index, receipt) in local_receipts.iter().enumerate() {
for receipt in local_receipts.iter() {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
Expand All @@ -1700,13 +1690,17 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = next_schedule_index {
if index >= nsi {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_index = schedule_preparation(&mut processing_state)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}
// NOTE: We don't need to validate the local receipt, because it's just validated in
Expand All @@ -1719,6 +1713,10 @@ impl Runtime {
)?
}
}

let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.local_receipts_done(
local_receipt_count as u64,
local_processing_start.elapsed(),
Expand All @@ -1728,6 +1726,11 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_delayed_receipts", skip_all, fields(
num_receipts = processing_state.delayed_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_delayed_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1740,6 +1743,17 @@ impl Runtime {
let protocol_version = processing_state.protocol_version;
let mut delayed_receipt_count = 0;
let mut processed_delayed_receipts = vec![];

let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

while processing_state.delayed_receipts.len() > 0 {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
Expand All @@ -1748,11 +1762,26 @@ impl Runtime {
{
break;
}

delayed_receipt_count += 1;
let receipt = processing_state
.delayed_receipts
.pop(&mut processing_state.state_update, &processing_state.apply_state.config)?
.expect("queue is not empty");
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
Expand Down Expand Up @@ -1780,6 +1809,9 @@ impl Runtime {
)?;
processed_delayed_receipts.push(receipt);
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.delayed_receipts_done(
delayed_receipt_count,
delayed_processing_start.elapsed(),
Expand All @@ -1790,6 +1822,11 @@ impl Runtime {
Ok(processed_delayed_receipts)
}

#[instrument(target = "runtime", level = "debug", "process_incoming_receipts", skip_all, fields(
num_receipts = processing_state.incoming_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_incoming_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1804,6 +1841,16 @@ impl Runtime {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&processing_state.incoming_receipts);
}

let mut prep_lookahead_iter = processing_state.incoming_receipts.iter();
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for receipt in processing_state.incoming_receipts.iter() {
// Validating new incoming no matter whether we have available gas or not. We don't
// want to store invalid receipts in state as delayed.
Expand All @@ -1824,6 +1871,20 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

Comment on lines +1874 to +1887
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This piece of code seems to be repeating for every receipt type, is there any room to refactor it? I know it's already in a pretty good state but if you can reduce it to a single invocation it would be awesome :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I've been banging my head as to how to best deduplicate this and what I have here is the best I could come up with outside of trying to come up with a way to unify how the receipt types themselves are processed. Perhaps we ought to do that eventually, but it is also non-trivial (esp. due to how different is the handling of delayed receipts.)

self.process_receipt_with_metrics(
&receipt,
&mut processing_state,
Expand All @@ -1832,6 +1893,9 @@ impl Runtime {
)?;
}
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.incoming_receipts_done(
processing_state.incoming_receipts.len() as u64,
incoming_processing_start.elapsed(),
Expand Down Expand Up @@ -2378,6 +2442,96 @@ struct ApplyProcessingReceiptState<'a> {
pipeline_manager: pipelining::ReceiptPreparationPipeline,
}

/// Abstraction over owned receipts and references to receipts.
///
/// Allows `schedule_contract_preparation` to accept iterators that yield either
/// `Receipt` (e.g. from a trie-backed queue) or `&Receipt` (e.g. from a slice).
trait MaybeRefReceipt {
    /// Borrow the underlying [`Receipt`].
    fn as_ref(&self) -> &Receipt;
}

impl MaybeRefReceipt for Receipt {
    fn as_ref(&self) -> &Receipt {
        self
    }
}

impl<'a> MaybeRefReceipt for &'a Receipt {
    fn as_ref(&self) -> &Receipt {
        *self
    }
}

/// Schedule a one receipt for contract preparation.
///
/// The caller should call this method again after the returned number of receipts from `iterator`
/// are processed.
fn schedule_contract_preparation<'b, R: MaybeRefReceipt>(
pipeline_manager: &mut pipelining::ReceiptPreparationPipeline,
state_update: &TrieUpdate,
mut iterator: impl Iterator<Item = R>,
) -> Option<usize> {
let scheduled_receipt_offset = iterator.position(|peek| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think a for loop with an early return would be more readable. I guess it depends on individual preference so up to you.

let peek = peek.as_ref();
let account_id = peek.receiver_id();
let receiver = get_account(state_update, account_id);
let Ok(Some(receiver)) = receiver else {
// Most likely reason this can happen is because the receipt is for an account that
// does not yet exist. This is a routine occurrence as accounts are created by sending
// some NEAR to a name that's about to be created.
return false;
};

// We need to inspect each receipt recursively in case these are data receipts, thus a
// function.
fn handle_receipt(
mgr: &mut ReceiptPreparationPipeline,
state_update: &TrieUpdate,
receiver: &Account,
account_id: &AccountId,
receipt: &Receipt,
) -> bool {
match receipt.receipt() {
ReceiptEnum::Action(_) | ReceiptEnum::PromiseYield(_) => {
// This returns `true` if work may have been scheduled (thus we currently
// prepare actions in at most 2 "interesting" receipts in parallel due to
// staggering.)
mgr.submit(receipt, &receiver, None)
}
ReceiptEnum::Data(dr) => {
let key = TrieKey::PostponedReceiptId {
receiver_id: account_id.clone(),
data_id: dr.data_id,
};
let Ok(Some(rid)) = get::<CryptoHash>(state_update, &key) else {
return false;
};
let key = TrieKey::PendingDataCount {
receiver_id: account_id.clone(),
receipt_id: rid,
};
let Ok(Some(data_count)) = get::<u32>(state_update, &key) else {
return false;
};
if data_count > 1 {
return false;
}
let Ok(Some(pr)) = get_postponed_receipt(state_update, account_id, rid) else {
return false;
};
return handle_receipt(mgr, state_update, receiver, account_id, &pr);
}
ReceiptEnum::PromiseResume(dr) => {
let Ok(Some(yr)) =
get_promise_yield_receipt(state_update, account_id, dr.data_id)
else {
return false;
};
return handle_receipt(mgr, state_update, receiver, account_id, &yr);
}
}
}
handle_receipt(pipeline_manager, state_update, &receiver, account_id, peek)
})?;
Some(scheduled_receipt_offset.saturating_add(1))
}

/// Interface provided for gas cost estimations.
pub mod estimator {
use super::{ReceiptSink, Runtime};
Expand Down
Loading
Loading