Skip to content

Commit

Permalink
runtime: implement pipelined preparation for delayed receipts
Browse files Browse the repository at this point in the history
  • Loading branch information
nagisa committed Aug 7, 2024
1 parent ef009e5 commit b05c4dc
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 34 deletions.
9 changes: 8 additions & 1 deletion runtime/runtime/src/congestion_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use near_primitives::receipt::{Receipt, ReceiptEnum};
use near_primitives::types::{EpochInfoProvider, Gas, ShardId};
use near_primitives::version::ProtocolFeature;
use near_store::trie::receipts_column_helper::{
DelayedReceiptQueue, ShardsOutgoingReceiptBuffer, TrieQueue,
DelayedReceiptQueue, ReceiptIterator, ShardsOutgoingReceiptBuffer, TrieQueue
};
use near_store::{StorageError, TrieAccess, TrieUpdate};
use near_vm_runner::logic::ProtocolVersion;
Expand Down Expand Up @@ -446,6 +446,13 @@ impl DelayedReceiptQueueWrapper {
Ok(receipt)
}

pub(crate) fn peek_iter<'a>(
&'a self,
trie_update: &'a TrieUpdate,
) -> ReceiptIterator<'a> {
self.queue.iter(trie_update)
}

pub(crate) fn len(&self) -> u64 {
self.queue.len()
}
Expand Down
118 changes: 85 additions & 33 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,7 @@ impl Runtime {
) -> Result<(), RuntimeError> {
let local_processing_start = std::time::Instant::now();
let local_receipts = std::mem::take(&mut processing_state.local_receipts);
let local_receipt_count = local_receipts.len();
let local_receipt_count = processing_state.local_receipts.len();
if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
let (front, back) = local_receipts.as_slices();
Expand All @@ -1665,8 +1665,11 @@ impl Runtime {
let mut prep_lookahead_iter = local_receipts.iter();
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_index =
processing_state.schedule_contract_preparation(&mut prep_lookahead_iter);
let mut next_schedule_index = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for (index, receipt) in local_receipts.iter().enumerate() {
if processing_state.total.compute >= compute_limit
Expand All @@ -1685,9 +1688,12 @@ impl Runtime {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_index = processing_state
.schedule_contract_preparation(&mut prep_lookahead_iter)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
next_schedule_index = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
}
}
// NOTE: We don't need to validate the local receipt, because it's just validated in
Expand Down Expand Up @@ -1721,6 +1727,17 @@ impl Runtime {
let protocol_version = processing_state.protocol_version;
let mut delayed_receipt_count = 0;
let mut processed_delayed_receipts = vec![];

let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
let mut scheduled_receipt_offset = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

while processing_state.delayed_receipts.len() > 0 {
if processing_state.total.compute >= compute_limit
|| proof_size_limit.is_some_and(|limit| {
Expand All @@ -1729,11 +1746,24 @@ impl Runtime {
{
break;
}

delayed_receipt_count += 1;
let receipt = processing_state
.delayed_receipts
.pop(&mut processing_state.state_update, &processing_state.apply_state.config)?
.expect("queue is not empty");
scheduled_receipt_offset = scheduled_receipt_offset.and_then(|o| o.checked_sub(1));
if scheduled_receipt_offset == Some(0) {
let mut prep_lookahead_iter = processing_state
.delayed_receipts
.peek_iter(&processing_state.state_update)
.map_while(Result::ok);
scheduled_receipt_offset = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);
}

if let Some(prefetcher) = &mut processing_state.prefetcher {
// Prefetcher is allowed to fail
Expand Down Expand Up @@ -1789,8 +1819,11 @@ impl Runtime {
let mut prep_lookahead_iter = processing_state.incoming_receipts.iter();
// Advance the preparation by one step (stagger it) so that we're preparing one interesting
// receipt in advance.
let mut next_schedule_index =
processing_state.schedule_contract_preparation(&mut prep_lookahead_iter);
let mut next_schedule_index = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
);

for (index, receipt) in processing_state.incoming_receipts.iter().enumerate() {
// Validating new incoming no matter whether we have available gas or not. We don't
Expand All @@ -1817,9 +1850,12 @@ impl Runtime {
// We're about to process a receipt that has been submitted for
// preparation, so lets submit the next one in anticipation that it might
// be processed too (it might also be not if we run out of gas/compute.)
next_schedule_index = processing_state
.schedule_contract_preparation(&mut prep_lookahead_iter)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
next_schedule_index = schedule_contract_preparation(
&mut processing_state.pipeline_manager,
&processing_state.state_update,
&mut prep_lookahead_iter,
)
.and_then(|adv| nsi.checked_add(1)?.checked_add(adv));
}
}

Expand Down Expand Up @@ -2377,31 +2413,47 @@ struct ApplyProcessingReceiptState<'a> {
pipeline_manager: pipelining::ReceiptPreparationPipeline,
}

impl<'a> ApplyProcessingReceiptState<'a> {
fn schedule_contract_preparation<'b>(
&mut self,
mut iterator: impl Iterator<Item = &'b Receipt>,
) -> Option<usize> {
let scheduled_receipt_offset = iterator.position(|peek| {
let account_id = peek.receiver_id();
let receiver = get_account(&self.state_update, account_id);
let Ok(Some(receiver)) = receiver else {
tracing::error!(
target: "runtime",
message="unable to read receiver of an upcoming local receipt",
?account_id,
receipt=%peek.get_hash()
);
return false;
};
// This returns `true` if work may have been scheduled (thus we currently prepare
// actions in at most 2 "interesting" receipts in parallel due to staggering.)
self.pipeline_manager.submit(peek, &receiver, None)
});
scheduled_receipt_offset
trait MaybeRefReceipt {
fn as_ref(&self) -> &Receipt;
}

impl MaybeRefReceipt for Receipt {
fn as_ref(&self) -> &Receipt {
self
}
}

impl<'a> MaybeRefReceipt for &'a Receipt {
fn as_ref(&self) -> &Receipt {
*self
}
}

fn schedule_contract_preparation<'b, R: MaybeRefReceipt>(
pipeline_manager: &mut pipelining::ReceiptPreparationPipeline,
state_update: &TrieUpdate,
mut iterator: impl Iterator<Item = R>,
) -> Option<usize> {
let scheduled_receipt_offset = iterator.position(|peek| {
let peek = peek.as_ref();
let account_id = peek.receiver_id();
let receiver = get_account(state_update, account_id);
let Ok(Some(receiver)) = receiver else {
tracing::error!(
target: "runtime",
message="unable to read receiver of an upcoming local receipt",
?account_id,
receipt=%peek.get_hash()
);
return false;
};
// This returns `true` if work may have been scheduled (thus we currently prepare
// actions in at most 2 "interesting" receipts in parallel due to staggering.)
pipeline_manager.submit(peek, &receiver, None)
});
scheduled_receipt_offset
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
Expand Down

0 comments on commit b05c4dc

Please sign in to comment.