Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
greged93 committed Jul 3, 2024
1 parent 1f5aa67 commit e1d07f3
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 64 deletions.
2 changes: 1 addition & 1 deletion crates/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub trait BatchExecutor<DB> {
/// Contains the state changes, transaction receipts, and total gas used in the block.
///
/// TODO(mattsse): combine with `ExecutionOutcome`
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct BlockExecutionOutput<T> {
/// The changed state of the block after execution.
pub state: BundleState,
Expand Down
176 changes: 113 additions & 63 deletions crates/exex/exex/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use reth_db_api::database::Database;
use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider};
use reth_evm::execute::{
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_node_api::FullNodeComponents;
use reth_primitives::{Block, BlockNumber};
use reth_primitives::{Block, BlockNumber, BlockWithSenders, Receipt};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{Chain, ExecutionOutcome, FullProvider, ProviderError, TransactionVariant};
use reth_provider::{Chain, FullProvider, ProviderError, TransactionVariant};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ExecutionStageThresholds;
Expand Down Expand Up @@ -193,54 +195,93 @@ where
let chain = Chain::new(blocks, executor.finalize(), None);
Ok(chain)
}
}

/// Returns an iterator that executes each block in the range separately and yields the
/// resulting [`Block`] and [`ExecutionOutcome`].
pub fn into_single_blocks(
&self,
) -> impl Iterator<Item = Result<(Block, ExecutionOutcome), BlockExecutionError>> + '_ {
self.range.clone().map(|block_number| {
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
impl<E, DB, P> From<BackfillJob<E, DB, P>> for SingleBlockBackfillJob<E, DB, P> {
fn from(value: BackfillJob<E, DB, P>) -> Self {
Self {
executor: value.executor,
provider: value.provider,
range: value.range,
_db: PhantomData,
}
}
}

// Fetch the block with senders for execution.
let block_with_senders = self
.provider
.block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
/// Single block Backfill job started for a specific range.
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug)]
pub struct SingleBlockBackfillJob<E, DB, P> {
executor: E,
provider: P,
range: RangeInclusive<BlockNumber>,
_db: PhantomData<DB>,
}

// Configure the executor to use the previous block's state.
let mut executor = self.executor.batch_executor(
StateProviderDatabase::new(
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
),
);
impl<E, DB, P> Iterator for SingleBlockBackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

fn next(&mut self) -> Option<Self::Item> {
self.range.next().map(|block_number| self.execute_block(block_number))
}
}

impl<E, DB, P> SingleBlockBackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
{
fn execute_block(
&self,
block_number: u64,
) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Fetch the block with senders for execution.
let block_with_senders = self
.provider
.block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Configure the executor to use the previous block's state.
let executor = self.executor.executor(StateProviderDatabase::new(
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
));

trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");
trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");

executor
.execute_and_verify_one((&block_with_senders, td).into())?;
let block_execution_output = executor.execute((&block_with_senders, td).into())?;

Ok((block_with_senders.block, executor.finalize()))
})
Ok((block_with_senders, block_execution_output))
}
}

#[cfg(test)]
mod tests {
use crate::BackfillJobFactory;
use crate::{BackfillJobFactory, SingleBlockBackfillJob};
use eyre::OptionExt;
use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_db_common::init::init_genesis;
use reth_evm::execute::{BlockExecutionInput, BlockExecutorProvider, Executor};
use reth_evm::execute::{
BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::{
b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, BlockWithSenders,
Genesis, GenesisAccount, Header, Requests, SealedBlockWithSenders, Transaction, TxEip2930,
TxKind, U256,
Genesis, GenesisAccount, Header, Receipt, Requests, SealedBlockWithSenders, Transaction,
TxEip2930, TxKind, U256,
};
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
Expand All @@ -251,6 +292,18 @@ mod tests {
use secp256k1::Keypair;
use std::sync::Arc;

fn to_execution_outcome(
block_number: u64,
block_execution_output: &BlockExecutionOutput<Receipt>,
) -> ExecutionOutcome {
ExecutionOutcome {
bundle: block_execution_output.state.clone(),
receipts: block_execution_output.receipts.clone().into(),
first_block: block_number,
requests: vec![Requests(block_execution_output.requests.clone())],
}
}

fn chain_spec(address: Address) -> Arc<ChainSpec> {
// Create a chain spec with a genesis state that contains the
// provided sender
Expand All @@ -274,48 +327,43 @@ mod tests {
provider_factory: &ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
block: &BlockWithSenders,
) -> eyre::Result<ExecutionOutcome>
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: reth_db_api::database::Database,
{
let provider = provider_factory.provider()?;

// Execute the block to produce a block execution output
let block_execution_output = EthExecutorProvider::ethereum(chain_spec)
let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec)
.executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)))
.execute(BlockExecutionInput { block, total_difficulty: U256::ZERO })?;
block_execution_output.state.reverts.sort();

// Convert the block execution output to an execution outcome
let mut execution_outcome = ExecutionOutcome {
bundle: block_execution_output.state,
receipts: block_execution_output.receipts.into(),
first_block: block.number,
requests: vec![Requests(block_execution_output.requests)],
};
execution_outcome.bundle.reverts.sort();
// Convert the block execution output to an execution outcome for committing to the database
let execution_outcome = to_execution_outcome(block.number, &block_execution_output);

// Commit the block's execution outcome to the database
let provider_rw = provider_factory.provider_rw()?;
let block = block.clone().seal_slow();
provider_rw.append_blocks_with_state(
vec![block],
execution_outcome.clone(),
execution_outcome,
Default::default(),
Default::default(),
)?;
provider_rw.commit()?;

Ok(execution_outcome)
Ok(block_execution_output)
}

fn blocks_and_execution_outcomes<DB>(
fn blocks_and_execution_outputs<DB>(
provider_factory: ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
key_pair: Keypair,
) -> eyre::Result<Vec<(SealedBlockWithSenders, ExecutionOutcome)>>
) -> eyre::Result<Vec<(SealedBlockWithSenders, BlockExecutionOutput<Receipt>)>>
where
DB: reth_db_api::database::Database,
{
Expand Down Expand Up @@ -379,15 +427,15 @@ mod tests {
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;

let outcome1 =
let block_output1 =
execute_block_and_commit_to_database(&provider_factory, chain_spec.clone(), &block1)?;
let outcome2 =
let block_output2 =
execute_block_and_commit_to_database(&provider_factory, chain_spec, &block2)?;

let block1 = block1.seal_slow();
let block2 = block2.seal_slow();

Ok(vec![(block1, outcome1), (block2, outcome2)])
Ok(vec![(block1, block_output1), (block2, block_output2)])
}

#[test]
Expand All @@ -408,9 +456,10 @@ mod tests {
Arc::new(NoopBlockchainTree::default()),
)?;

let blocks_and_execution_outcomes =
blocks_and_execution_outcomes(provider_factory, chain_spec, key_pair)?;
let (block1, outcome_single) = blocks_and_execution_outcomes.first().unwrap().clone();
let blocks_and_execution_outputs =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
let execution_outcome = to_execution_outcome(block.number, block_execution_output);

// Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db);
Expand All @@ -422,14 +471,14 @@ mod tests {
assert_eq!(chains.len(), 1);
let mut chain = chains.into_iter().next().unwrap();
chain.execution_outcome_mut().bundle.reverts.sort();
assert_eq!(chain.blocks(), &[(1, block1)].into());
assert_eq!(chain.execution_outcome(), &outcome_single);
assert_eq!(chain.blocks(), &[(1, block.clone())].into());
assert_eq!(chain.execution_outcome(), &execution_outcome);

Ok(())
}

#[test]
fn test_into_single_blocks() -> eyre::Result<()> {
fn test_single_block_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
Expand All @@ -447,12 +496,13 @@ mod tests {
)?;

let blocks_and_execution_outcomes =
blocks_and_execution_outcomes(provider_factory, chain_spec, key_pair)?;
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

// Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db);
let job = factory.backfill(1..=1);
let block_execution_it = job.into_single_blocks();
let single_job: SingleBlockBackfillJob<_, _, _> = job.into();
let block_execution_it = single_job.into_iter();

// Assert that the backfill job only produces a single block
let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
Expand All @@ -461,15 +511,15 @@ mod tests {
// Assert that the backfill job single block iterator produces the expected output for each
// block
for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
let (block, mut outcome) = res?;
outcome.bundle.reverts.sort();
let (block, mut execution_output) = res?;
execution_output.state.reverts.sort();

let sealed_block_with_senders = blocks_and_execution_outcomes[i].0.clone();
let expected_block = sealed_block_with_senders.unseal().block;
let expected_outcome = &blocks_and_execution_outcomes[i].1;
let expected_block = sealed_block_with_senders.unseal();
let expected_output = &blocks_and_execution_outcomes[i].1;

assert_eq!(block, expected_block);
assert_eq!(outcome, expected_outcome.clone());
assert_eq!(&execution_output, expected_output);
}

Ok(())
Expand Down

0 comments on commit e1d07f3

Please sign in to comment.