Skip to content

Commit

Permalink
Revert "runtime: pipelined preparation for incoming and delayed recei…
Browse files Browse the repository at this point in the history
…pts (#11904)"

This reverts commit 781805d.
  • Loading branch information
shreyan-gupta committed Aug 14, 2024
1 parent 2394e0c commit 52625eb
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 300 deletions.
6 changes: 1 addition & 5 deletions runtime/runtime/src/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use near_primitives::receipt::{Receipt, ReceiptEnum};
use near_primitives::types::{EpochInfoProvider, Gas, ShardId};
use near_primitives::version::ProtocolFeature;
use near_store::trie::receipts_column_helper::{
DelayedReceiptQueue, ReceiptIterator, ShardsOutgoingReceiptBuffer, TrieQueue,
DelayedReceiptQueue, ShardsOutgoingReceiptBuffer, TrieQueue,
};
use near_store::{StorageError, TrieAccess, TrieUpdate};
use near_vm_runner::logic::ProtocolVersion;
Expand Down Expand Up @@ -445,10 +445,6 @@ impl DelayedReceiptQueueWrapper {
Ok(receipt)
}

pub(crate) fn peek_iter<'a>(&'a self, trie_update: &'a TrieUpdate) -> ReceiptIterator<'a> {
self.queue.iter(trie_update)
}

pub(crate) fn len(&self) -> u64 {
self.queue.len()
}
Expand Down
206 changes: 26 additions & 180 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,11 +1646,6 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_local_receipts", skip_all, fields(
num_receipts = processing_state.local_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_local_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1661,7 +1656,7 @@ impl Runtime {
) -> Result<(), RuntimeError> {
let local_processing_start = std::time::Instant::now();
let local_receipts = std::mem::take(&mut processing_state.local_receipts);
let local_receipt_count = processing_state.local_receipts.len();
let local_receipt_count = local_receipts.len();
if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
let (front, back) = local_receipts.as_slices();
Expand All @@ -1670,15 +1665,30 @@ impl Runtime {
}

let mut prep_lookahead_iter = local_receipts.iter();
let mut schedule_preparation = |pstate: &mut ApplyProcessingReceiptState| {
let scheduled_receipt_offset = prep_lookahead_iter.position(|peek| {
let account_id = peek.receiver_id();
let receiver = get_account(&pstate.state_update, account_id);
let Ok(Some(receiver)) = receiver else {
tracing::error!(
target: "runtime",
message="unable to read receiver of an upcoming local receipt",
?account_id,
receipt=%peek.get_hash()
);
return false;
};
// This returns `true` if work may have been scheduled (thus we currently prepare
// actions in at most 2 "interesting" receipts in parallel due to staggering.)
pstate.pipeline_manager.submit(peek, &receiver, None)
});
scheduled_receipt_offset
};
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
let mut next_schedule_index = schedule_preparation(&mut processing_state);

for receipt in local_receipts.iter() {
for (index, receipt) in local_receipts.iter().enumerate() {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
processing_state.state_update.trie.recorded_storage_size_upper_bound() > limit
Expand All @@ -1690,17 +1700,13 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
if let Some(nsi) = next_schedule_index {
if index >= nsi {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
next_schedule_index = schedule_preparation(&mut processing_state)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
}
}
// NOTE: We don't need to validate the local receipt, because it's just validated in
Expand All @@ -1713,10 +1719,6 @@ impl Runtime {
)?
}
}

let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.local_receipts_done(
local_receipt_count as u64,
local_processing_start.elapsed(),
Expand All @@ -1726,11 +1728,6 @@ impl Runtime {
Ok(())
}

#[instrument(target = "runtime", level = "debug", "process_delayed_receipts", skip_all, fields(
num_receipts = processing_state.delayed_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_delayed_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1743,17 +1740,6 @@ impl Runtime {
let protocol_version = processing_state.protocol_version;
let mut delayed_receipt_count = 0;
let mut processed_delayed_receipts = vec![];

let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

while processing_state.delayed_receipts.len() > 0 {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
Expand All @@ -1762,26 +1748,11 @@ impl Runtime {
{
break;
}

delayed_receipt_count += 1;
let receipt = processing_state
.delayed_receipts
.pop(&mut processing_state.state_update, &processing_state.apply_state.config)?
.expect("queue is not empty");
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
Expand Down Expand Up @@ -1809,9 +1780,6 @@ impl Runtime {
)?;
processed_delayed_receipts.push(receipt);
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.delayed_receipts_done(
delayed_receipt_count,
delayed_processing_start.elapsed(),
Expand All @@ -1822,11 +1790,6 @@ impl Runtime {
Ok(processed_delayed_receipts)
}

#[instrument(target = "runtime", level = "debug", "process_incoming_receipts", skip_all, fields(
num_receipts = processing_state.incoming_receipts.len(),
gas_burnt = tracing::field::Empty,
compute_usage = tracing::field::Empty,
))]
fn process_incoming_receipts<'a>(
&self,
mut processing_state: &mut ApplyProcessingReceiptState<'a>,
Expand All @@ -1841,16 +1804,6 @@ impl Runtime {
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&processing_state.incoming_receipts);
}

let mut prep_lookahead_iter = processing_state.incoming_receipts.iter();
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for receipt in processing_state.incoming_receipts.iter() {
// Validating new incoming no matter whether we have available gas or not. We don't
// want to store invalid receipts in state as delayed.
Expand All @@ -1871,20 +1824,6 @@ impl Runtime {
&processing_state.apply_state.config,
)?;
} else {
if let Some(nsi) = &mut next_schedule_after {
*nsi = nsi.saturating_sub(1);
if *nsi == 0 {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_after = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}
}

self.process_receipt_with_metrics(
&receipt,
&mut processing_state,
Expand All @@ -1893,9 +1832,6 @@ impl Runtime {
)?;
}
}
let span = tracing::Span::current();
span.record("gas_burnt", processing_state.total.gas);
span.record("compute_usage", processing_state.total.compute);
processing_state.metrics.incoming_receipts_done(
processing_state.incoming_receipts.len() as u64,
incoming_processing_start.elapsed(),
Expand Down Expand Up @@ -2442,96 +2378,6 @@ struct ApplyProcessingReceiptState<'a> {
pipeline_manager: pipelining::ReceiptPreparationPipeline,
}

trait MaybeRefReceipt {
fn as_ref(&self) -> &Receipt;
}

impl MaybeRefReceipt for Receipt {
fn as_ref(&self) -> &Receipt {
self
}
}

impl<'a> MaybeRefReceipt for &'a Receipt {
fn as_ref(&self) -> &Receipt {
*self
}
}

/// Schedule a one receipt for contract preparation.
///
/// The caller should call this method again after the returned number of receipts from `iterator`
/// are processed.
fn schedule_contract_preparation<'b, R: MaybeRefReceipt>(
pipeline_manager: &mut pipelining::ReceiptPreparationPipeline,
state_update: &TrieUpdate,
mut iterator: impl Iterator<Item = R>,
) -> Option<usize> {
let scheduled_receipt_offset = iterator.position(|peek| {
let peek = peek.as_ref();
let account_id = peek.receiver_id();
let receiver = get_account(state_update, account_id);
let Ok(Some(receiver)) = receiver else {
// Most likely reason this can happen is because the receipt is for an account that
// does not yet exist. This is a routine occurrence as accounts are created by sending
// some NEAR to a name that's about to be created.
return false;
};

// We need to inspect each receipt recursively in case these are data receipts, thus a
// function.
fn handle_receipt(
mgr: &mut ReceiptPreparationPipeline,
state_update: &TrieUpdate,
receiver: &Account,
account_id: &AccountId,
receipt: &Receipt,
) -> bool {
match receipt.receipt() {
ReceiptEnum::Action(_) | ReceiptEnum::PromiseYield(_) => {
// This returns `true` if work may have been scheduled (thus we currently
// prepare actions in at most 2 "interesting" receipts in parallel due to
// staggering.)
mgr.submit(receipt, &receiver, None)
}
ReceiptEnum::Data(dr) => {
let key = TrieKey::PostponedReceiptId {
receiver_id: account_id.clone(),
data_id: dr.data_id,
};
let Ok(Some(rid)) = get::<CryptoHash>(state_update, &key) else {
return false;
};
let key = TrieKey::PendingDataCount {
receiver_id: account_id.clone(),
receipt_id: rid,
};
let Ok(Some(data_count)) = get::<u32>(state_update, &key) else {
return false;
};
if data_count > 1 {
return false;
}
let Ok(Some(pr)) = get_postponed_receipt(state_update, account_id, rid) else {
return false;
};
return handle_receipt(mgr, state_update, receiver, account_id, &pr);
}
ReceiptEnum::PromiseResume(dr) => {
let Ok(Some(yr)) =
get_promise_yield_receipt(state_update, account_id, dr.data_id)
else {
return false;
};
return handle_receipt(mgr, state_update, receiver, account_id, &yr);
}
}
}
handle_receipt(pipeline_manager, state_update, &receiver, account_id, peek)
})?;
Some(scheduled_receipt_offset.saturating_add(1))
}

/// Interface provided for gas cost estimations.
pub mod estimator {
use super::{ReceiptSink, Runtime};
Expand Down
Loading

0 comments on commit 52625eb

Please sign in to comment.