Skip to content

Commit

Permalink
runtime: implement pipelined preparation for delayed receipts
Browse files Browse the repository at this point in the history
  • Loading branch information
nagisa committed Aug 8, 2024
1 parent 66a7c84 commit 9eb4e47
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 40 deletions.
6 changes: 5 additions & 1 deletion runtime/runtime/src/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use near_primitives::receipt::{Receipt, ReceiptEnum};
use near_primitives::types::{EpochInfoProvider, Gas, ShardId};
use near_primitives::version::ProtocolFeature;
use near_store::trie::receipts_column_helper::{
DelayedReceiptQueue, ShardsOutgoingReceiptBuffer, TrieQueue,
DelayedReceiptQueue, ReceiptIterator, ShardsOutgoingReceiptBuffer, TrieQueue,
};
use near_store::{StorageError, TrieAccess, TrieUpdate};
use near_vm_runner::logic::ProtocolVersion;
Expand Down Expand Up @@ -446,6 +446,10 @@ impl DelayedReceiptQueueWrapper {
Ok(receipt)
}

pub(crate) fn peek_iter<'a>(&'a self, trie_update: &'a TrieUpdate) -> ReceiptIterator<'a> {
self.queue.iter(trie_update)
}

pub(crate) fn len(&self) -> u64 {
self.queue.len()
}
Expand Down
161 changes: 122 additions & 39 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,12 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_local_receipts", skip_all, fields(
num_receipts = processing_state.local_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
#[inline(never)]
fn process_local_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1656,7 +1662,7 @@ impl Runtime {
) -> Result<(), RuntimeError> {
let local_processing_start = std::time::Instant::now();
let local_receipts = std::mem::take(&mut processing_state.local_receipts);
let local_receipt_count = local_receipts.len();
let local_receipt_count = processing_state.local_receipts.len();
if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
let (front, back) = local_receipts.as_slices();
Expand All @@ -1667,10 +1673,13 @@ impl Runtime {
let mut prep_lookahead_iter = local_receipts.iter();
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_index =
processing_state.schedule_contract_preparation(&mut prep_lookahead_iter);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for (index, receipt) in local_receipts.iter().enumerate() {
for receipt in local_receipts.iter() {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
Expand All @@ -1682,14 +1691,17 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = next_schedule_index {
if index >= nsi {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_index = processing_state
.schedule_contract_preparation(&mut prep_lookahead_iter)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}
// NOTE: We don't need to validate the local receipt, because it's just validated in
Expand All @@ -1702,6 +1714,10 @@ impl Runtime {
)?
}
}

let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.local_receipts_done(
local_receipt_count as u64,
local_processing_start.elapsed(),
Expand All @@ -1711,6 +1727,12 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_delayed_receipts", skip_all, fields(
num_receipts = processing_state.delayed_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
#[inline(never)]
fn process_delayed_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1723,6 +1745,17 @@ impl Runtime {
let protocol_version = processing_state.protocol_version;
let mut delayed_receipt_count = 0;
let mut processed_delayed_receipts = vec![];

let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

while processing_state.delayed_receipts.len() > 0 {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
Expand All @@ -1731,11 +1764,26 @@ impl Runtime {
{
break;
}

delayed_receipt_count += 1;
let receipt = processing_state
.delayed_receipts
.pop(&mut processing_state.state_update, &processing_state.apply_state.config)?
.expect("queue is not empty");
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
Expand Down Expand Up @@ -1763,6 +1811,9 @@ impl Runtime {
)?;
processed_delayed_receipts.push(receipt);
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.delayed_receipts_done(
delayed_receipt_count,
delayed_processing_start.elapsed(),
Expand All @@ -1773,6 +1824,12 @@ impl Runtime {
Ok(processed_delayed_receipts)
}

#[instrument(target = "runtime", level = "debug", "process_incoming_receipts", skip_all, fields(
num_receipts = processing_state.incoming_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
#[inline(never)]
fn process_incoming_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1791,10 +1848,13 @@ impl Runtime {
let mut prep_lookahead_iter = processing_state.incoming_receipts.iter();
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_index =
processing_state.schedule_contract_preparation(&mut prep_lookahead_iter);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for (index, receipt) in processing_state.incoming_receipts.iter().enumerate() {
for receipt in processing_state.incoming_receipts.iter() {
// Validating new incoming no matter whether we have available gas or not. We don't
// want to store invalid receipts in state as delayed.
validate_receipt(
Expand All @@ -1814,14 +1874,17 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = next_schedule_index {
if index >= nsi {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_index = processing_state
.schedule_contract_preparation(&mut prep_lookahead_iter)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

Expand All @@ -1833,6 +1896,9 @@ impl Runtime {
)?;
}
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.incoming_receipts_done(
processing_state.incoming_receipts.len() as u64,
incoming_processing_start.elapsed(),
Expand Down Expand Up @@ -2151,6 +2217,7 @@ fn missing_chunk_apply_result(
});
}

#[inline(never)]
fn resolve_promise_yield_timeouts(
processing_state: &mut ApplyProcessingReceiptState,
receipt_sink: &mut ReceiptSink,
Expand Down Expand Up @@ -2379,31 +2446,47 @@ struct ApplyProcessingReceiptState<'a> {
pipeline_manager: pipelining::ReceiptPreparationPipeline,
}

impl<'a> ApplyProcessingReceiptState<'a> {
fn schedule_contract_preparation<'b>(
&mut self,
mut iterator: impl Iterator<Item = &'b Receipt>,
) -> Option<usize> {
let scheduled_receipt_offset = iterator.position(|peek| {
let account_id = peek.receiver_id();
let receiver = get_account(&self.state_update, account_id);
let Ok(Some(receiver)) = receiver else {
tracing::error!(
target: "runtime",
message="unable to read receiver of an upcoming local receipt",
?account_id,
receipt=%peek.get_hash()
);
return false;
};
// This returns `true` if work may have been scheduled (thus we currently prepare
// actions in at most 2 "interesting" receipts in parallel due to staggering.)
self.pipeline_manager.submit(peek, &receiver, None)
});
scheduled_receipt_offset
trait MaybeRefReceipt {
fn as_ref(&self) -> &Receipt;
}

impl MaybeRefReceipt for Receipt {
fn as_ref(&self) -> &Receipt {
self
}
}

impl<'a> MaybeRefReceipt for &'a Receipt {
fn as_ref(&self) -> &Receipt {
*self
}
}

/// Schedule a one receipt for contract preparation.
///
/// The caller should call this method again after the returned number of receipts are processed.
fn schedule_contract_preparation<'b, R: MaybeRefReceipt>(
pipeline_manager: &mut pipelining::ReceiptPreparationPipeline,
state_update: &TrieUpdate,
mut iterator: impl Iterator<Item = R>,
) -> Option<usize> {
let scheduled_receipt_offset = iterator.position(|peek| {
let peek = peek.as_ref();
let account_id = peek.receiver_id();
let receiver = get_account(state_update, account_id);
let Ok(Some(receiver)) = receiver else {
// Most likely reason this can happen is because the receipt is for an account that
// does not yet exist. This is a routine occurrence as accounts are created by sending
// some NEAR to a name that's about to be created.
return false;
};
// This returns `true` if work may have been scheduled (thus we currently prepare
// actions in at most 2 "interesting" receipts in parallel due to staggering.)
pipeline_manager.submit(peek, &receiver, None)
})?;
Some(scheduled_receipt_offset.saturating_add(1))
}

/// Interface provided for gas cost estimations.
pub mod estimator {
use super::{ReceiptSink, Runtime};
Expand Down
13 changes: 13 additions & 0 deletions runtime/runtime/src/pipelining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ impl ReceiptPreparationPipeline {
let key = PrepareTaskKey { receipt_id: receipt.get_hash(), action_index };
let Some(task) = self.map.get(&key) else {
let gas_counter = self.gas_counter(view_config.as_ref(), function_call.gas);
tracing::debug!(
target: "runtime::pipelining",
message="function call task was not submitted for preparation",
receipt=%receipt.get_hash(),
action_index,
backtrace = %std::backtrace::Backtrace::force_capture()
);
return prepare_function_call(
&self.storage,
self.contract_cache.as_deref(),
Expand All @@ -252,6 +259,12 @@ impl ReceiptPreparationPipeline {
drop(status_guard);

let gas_counter = self.gas_counter(view_config.as_ref(), function_call.gas);
tracing::trace!(
target: "runtime::pipelining",
message="function call preparation on the main thread",
receipt=%receipt.get_hash(),
action_index
);
let contract = prepare_function_call(
&self.storage,
self.contract_cache.as_deref(),
Expand Down

0 comments on commit 9eb4e47

Please sign in to comment.