Skip to content
This repository has been archived by the owner on Feb 6, 2025. It is now read-only.

Commit

Permalink
fix: cannot find parent block during livesync (#111)
Browse files Browse the repository at this point in the history
* fix: cannot find parent block during livesync

* fix some test issues

* fix some test issues

* try to fix deadlock issue

* try to fix dead loop issue

* refine codes

* fix an issue

* fix issue & refactor

* fix some review comments
  • Loading branch information
forcodedancing authored Aug 20, 2024
1 parent 380dbb2 commit cdd7cea
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 82 deletions.
4 changes: 2 additions & 2 deletions bin/reth/src/commands/debug_cmd/build_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ impl Command {
#[cfg(not(feature = "bsc"))]
let executor = block_executor!(provider_factory.chain_spec()).executor(db);

let BlockExecutionOutput { state, receipts, requests, .. } =
executor.execute((&block_with_senders.clone().unseal(), U256::MAX).into())?;
let BlockExecutionOutput { state, receipts, requests, .. } = executor
.execute((&block_with_senders.clone().unseal(), U256::MAX, None).into())?;
let execution_outcome = ExecutionOutcome::new(
state,
receipts.into(),
Expand Down
1 change: 1 addition & 0 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl Command {
.with_recovered_senders()
.ok_or(BlockValidationError::SenderRecoveryError)?,
merkle_block_td + block.difficulty,
None,
)
.into(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/commands/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl Command {
provider_rw.static_file_provider().clone(),
),
));
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td, None).into())?;
executor.finalize().write_to_storage(&provider_rw, None, OriginalValuesKnown::Yes)?;

let checkpoint = Some(StageCheckpoint::new(
Expand Down
14 changes: 11 additions & 3 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use reth_evm::execute::{BlockExecutionOutput, BlockExecutorProvider, Executor};
use reth_execution_errors::BlockExecutionError;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{
BlockHash, BlockNumber, ForkBlock, GotExpected, SealedBlockWithSenders, SealedHeader, U256,
BlockHash, BlockNumber, ForkBlock, GotExpected, Header, SealedBlockWithSenders, SealedHeader,
B256, U256,
};
use reth_provider::{
providers::{BundleStateProvider, ConsistentDbView},
Expand All @@ -25,7 +26,7 @@ use reth_revm::database::StateProviderDatabase;
use reth_trie::updates::TrieUpdates;
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
ops::{Deref, DerefMut},
time::Instant,
};
Expand Down Expand Up @@ -92,6 +93,7 @@ impl AppendableChain {
let (bundle_state, trie_updates) = Self::validate_and_execute(
block.clone(),
parent_header,
None,
state_provider,
externals,
block_attachment,
Expand Down Expand Up @@ -138,6 +140,7 @@ impl AppendableChain {
let (block_state, _) = Self::validate_and_execute(
block.clone(),
parent,
None,
bundle_state_data,
externals,
BlockAttachment::HistoricalFork,
Expand Down Expand Up @@ -170,6 +173,7 @@ impl AppendableChain {
fn validate_and_execute<EDP, DB, E>(
block: SealedBlockWithSenders,
parent_block: &SealedHeader,
ancestor_blocks: Option<&HashMap<B256, Header>>,
bundle_state_data_provider: EDP,
externals: &TreeExternals<DB, E>,
block_attachment: BlockAttachment,
Expand Down Expand Up @@ -209,7 +213,7 @@ impl AppendableChain {
let block_hash = block.hash();
let block = block.unseal();

let state = executor.execute((&block, U256::MAX).into())?;
let state = executor.execute((&block, U256::MAX, ancestor_blocks).into())?;
let BlockExecutionOutput { state, receipts, requests, .. } = state;
externals
.consensus
Expand Down Expand Up @@ -285,6 +289,9 @@ impl AppendableChain {
{
let parent_block = self.chain.tip();

let ancestor_blocks =
self.headers().map(|h| return (h.hash() as B256, h.header().clone())).collect();

let bundle_state_data = BundleStateDataRef {
execution_outcome: self.execution_outcome(),
sidechain_block_hashes: &side_chain_block_hashes,
Expand All @@ -295,6 +302,7 @@ impl AppendableChain {
let (block_state, _) = Self::validate_and_execute(
block.clone(),
parent_block,
Some(&ancestor_blocks),
bundle_state_data,
externals,
block_attachment,
Expand Down
2 changes: 1 addition & 1 deletion crates/bsc/engine/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(crate) struct ParliaEngineTask<
> {
/// The configured chain spec
chain_spec: Arc<ChainSpec>,
/// The coneensus instance
/// The consensus instance
consensus: Parlia,
/// The provider used to read the block and header from the inserted chain
provider: Provider,
Expand Down
71 changes: 46 additions & 25 deletions crates/bsc/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,15 @@ where
&mut self,
block: &BlockWithSenders,
total_difficulty: U256,
ancestor: Option<&HashMap<B256, Header>>,
) -> Result<BscExecuteOutput, BlockExecutionError> {
// 1. get parent header and snapshot
let parent = &(self.get_header_by_hash(block.parent_hash)?);
let parent = &(self.get_header_by_hash(block.parent_hash, ancestor)?);
let snapshot_reader = SnapshotReader::new(self.provider.clone(), self.parlia.clone());
let snap = &(snapshot_reader.snapshot(parent, None)?);
let snap = &(snapshot_reader.snapshot(parent, ancestor)?);

// 2. prepare state on new block
self.on_new_block(&block.header, parent, snap)?;
self.on_new_block(&block.header, parent, ancestor, snap)?;

// 3. get data from contracts before execute transactions
let post_execution_input =
Expand All @@ -353,6 +354,7 @@ where
self.post_execution(
block,
parent,
ancestor,
snap,
post_execution_input,
&mut system_txs,
Expand All @@ -370,6 +372,7 @@ where

pub(crate) fn get_justified_header(
&self,
ancestor: Option<&HashMap<B256, Header>>,
snap: &Snapshot,
) -> Result<Header, BlockExecutionError> {
if snap.vote_data.source_hash == B256::ZERO && snap.vote_data.target_hash == B256::ZERO {
Expand All @@ -382,16 +385,23 @@ where
});
}

self.get_header_by_hash(snap.vote_data.target_hash)
self.get_header_by_hash(snap.vote_data.target_hash, ancestor)
}

pub(crate) fn get_header_by_hash(
&self,
block_hash: B256,
ancestor: Option<&HashMap<B256, Header>>,
) -> Result<Header, BlockExecutionError> {
self.provider
.header(&block_hash)
.map_err(|err| BscBlockExecutionError::ProviderInnerError { error: err.into() })?
ancestor
.and_then(|m| m.get(&block_hash).cloned())
.or_else(|| {
self.provider
.header(&block_hash)
.map_err(|err| BscBlockExecutionError::ProviderInnerError { error: err.into() })
.ok()
.flatten()
})
.ok_or_else(|| BscBlockExecutionError::UnknownHeader { block_hash }.into())
}

Expand Down Expand Up @@ -670,7 +680,7 @@ where
DB: Database<Error: Into<ProviderError> + std::fmt::Display>,
P: ParliaProvider,
{
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>;
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders, Header>;
type Output = BlockExecutionOutput<Receipt>;
type Error = BlockExecutionError;

Expand All @@ -682,9 +692,9 @@ where
///
/// State changes are committed to the database.
fn execute(mut self, input: Self::Input<'_>) -> Result<Self::Output, Self::Error> {
let BlockExecutionInput { block, total_difficulty } = input;
let BlockExecutionInput { block, total_difficulty, ancestor_headers } = input;
let BscExecuteOutput { receipts, gas_used, snapshot } =
self.execute_and_verify(block, total_difficulty)?;
self.execute_and_verify(block, total_difficulty, ancestor_headers)?;

// NOTE: we need to merge keep the reverts for the bundle retention
self.state.merge_transitions(BundleRetention::Reverts);
Expand Down Expand Up @@ -726,15 +736,15 @@ where
DB: Database<Error: Into<ProviderError> + std::fmt::Display>,
P: ParliaProvider,
{
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>;
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders, Header>;
type Output = ExecutionOutcome;
type Error = BlockExecutionError;

fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error> {
let BlockExecutionInput { block, total_difficulty } = input;
let BlockExecutionInput { block, total_difficulty, .. } = input;
let execute_start = Instant::now();
let BscExecuteOutput { receipts, gas_used: _, snapshot } =
self.executor.execute_and_verify(block, total_difficulty)?;
self.executor.execute_and_verify(block, total_difficulty, None)?;
self.stats.execution_duration += execute_start.elapsed();

validate_block_post_execution(block, self.executor.chain_spec(), &receipts)?;
Expand Down Expand Up @@ -806,7 +816,7 @@ where
pub fn snapshot(
&self,
header: &Header,
parent: Option<&Header>,
ancestor: Option<&HashMap<B256, Header>>,
) -> Result<Snapshot, BlockExecutionError> {
let mut cache = RECENT_SNAPS.write();

Expand Down Expand Up @@ -853,14 +863,14 @@ where

// No snapshot for this header, gather the header and move backward
skip_headers.push(header.clone());
if let Some(parent) = parent {
block_number = parent.number;
block_hash = header.parent_hash;
header = parent.clone();
} else if let Ok(h) = self.get_header_by_hash(header.parent_hash) {
if let Ok(h) = self.get_header_by_hash(header.parent_hash, ancestor) {
block_number = h.number;
block_hash = header.parent_hash;
header = h;
} else {
return Err(
BscBlockExecutionError::UnknownHeader { block_hash: header.parent_hash }.into()
)
}
}

Expand All @@ -880,7 +890,7 @@ where
{
// change validator set
let checkpoint_header =
self.find_ancient_header(header, snap.miner_history_check_len())?;
self.find_ancient_header(header, ancestor, snap.miner_history_check_len())?;

let validators_info = self
.parlia
Expand Down Expand Up @@ -924,21 +934,32 @@ where
Ok(snap)
}

fn get_header_by_hash(&self, block_hash: B256) -> Result<Header, BlockExecutionError> {
self.provider
.header(&block_hash)
.map_err(|err| BscBlockExecutionError::ProviderInnerError { error: err.into() })?
fn get_header_by_hash(
&self,
block_hash: B256,
ancestor: Option<&HashMap<B256, Header>>,
) -> Result<Header, BlockExecutionError> {
ancestor
.and_then(|m| m.get(&block_hash).cloned())
.or_else(|| {
self.provider
.header(&block_hash)
.map_err(|err| BscBlockExecutionError::ProviderInnerError { error: err.into() })
.ok()
.flatten()
})
.ok_or_else(|| BscBlockExecutionError::UnknownHeader { block_hash }.into())
}

fn find_ancient_header(
&self,
header: &Header,
ancestor: Option<&HashMap<B256, Header>>,
count: u64,
) -> Result<Header, BlockExecutionError> {
let mut result = header.clone();
for _ in 0..count {
result = self.get_header_by_hash(result.parent_hash)?;
result = self.get_header_by_hash(result.parent_hash, ancestor)?;
}
Ok(result)
}
Expand Down
14 changes: 9 additions & 5 deletions crates/bsc/evm/src/post_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_primitives::{
hex,
parlia::{Snapshot, VoteAddress, VoteAttestation},
system_contracts::SYSTEM_REWARD_CONTRACT,
Address, BlockWithSenders, GotExpected, Header, Receipt, TransactionSigned, U256,
Address, BlockWithSenders, GotExpected, Header, Receipt, TransactionSigned, B256, U256,
};
use reth_provider::ParliaProvider;
use reth_revm::bsc::SYSTEM_ADDRESS;
Expand Down Expand Up @@ -41,6 +41,7 @@ where
&mut self,
block: &BlockWithSenders,
parent: &Header,
ancestor: Option<&HashMap<B256, Header>>,
snap: &Snapshot,
post_execution_input: PostExecutionInput,
system_txs: &mut Vec<TransactionSigned>,
Expand Down Expand Up @@ -107,6 +108,7 @@ where
if self.chain_spec().is_plato_active_at_block(number) {
self.distribute_finality_reward(
header,
ancestor,
system_txs,
receipts,
cumulative_gas_used,
Expand Down Expand Up @@ -369,6 +371,7 @@ where
fn distribute_finality_reward(
&mut self,
header: &Header,
ancestor: Option<&HashMap<B256, Header>>,
system_txs: &mut Vec<TransactionSigned>,
receipts: &mut Vec<Receipt>,
cumulative_gas_used: &mut u64,
Expand All @@ -385,14 +388,14 @@ where
let end = header.number;
let mut target_hash = header.parent_hash;
for _ in (start..end).rev() {
let header = &(self.get_header_by_hash(target_hash)?);
let header = &(self.get_header_by_hash(target_hash, ancestor)?);

if let Some(attestation) =
self.parlia().get_vote_attestation_from_header(header).map_err(|err| {
BscBlockExecutionError::ParliaConsensusInnerError { error: err.into() }
})?
{
self.process_attestation(&attestation, header, &mut accumulated_weights)?;
self.process_attestation(&attestation, header, ancestor, &mut accumulated_weights)?;
}

target_hash = header.parent_hash;
Expand Down Expand Up @@ -445,10 +448,11 @@ where
&self,
attestation: &VoteAttestation,
parent_header: &Header,
ancestor: Option<&HashMap<B256, Header>>,
accumulated_weights: &mut HashMap<Address, U256>,
) -> Result<(), BlockExecutionError> {
let justified_header = self.get_header_by_hash(attestation.data.target_hash)?;
let parent = self.get_header_by_hash(justified_header.parent_hash)?;
let justified_header = self.get_header_by_hash(attestation.data.target_hash, ancestor)?;
let parent = self.get_header_by_hash(justified_header.parent_hash, ancestor)?;
let snapshot_reader = SnapshotReader::new(self.provider.clone(), self.parlia.clone());
let snapshot = &(snapshot_reader.snapshot(&parent, None)?);
let validators = &snapshot.validators;
Expand Down
Loading

0 comments on commit cdd7cea

Please sign in to comment.