diff --git a/crates/consensus/beacon/src/engine/invalid_headers.rs b/crates/consensus/beacon/src/engine/invalid_headers.rs index a4868bcf098b..aaf92e9c3501 100644 --- a/crates/consensus/beacon/src/engine/invalid_headers.rs +++ b/crates/consensus/beacon/src/engine/invalid_headers.rs @@ -1,10 +1,9 @@ -use alloy_consensus::Header; +use alloy_eips::eip1898::BlockWithParent; use alloy_primitives::B256; use reth_metrics::{ metrics::{Counter, Gauge}, Metrics, }; -use reth_primitives::SealedHeader; use schnellru::{ByLength, LruMap}; use std::{fmt::Debug, sync::Arc}; use tracing::warn; @@ -17,20 +16,20 @@ const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128; /// Keeps track of invalid headers. #[derive(Debug)] -pub struct InvalidHeaderCache { +pub struct InvalidHeaderCache { /// This maps a header hash to a reference to its invalid ancestor. - headers: LruMap>, + headers: LruMap>, /// Metrics for the cache. metrics: InvalidHeaderCacheMetrics, } -impl InvalidHeaderCache { +impl InvalidHeaderCache { /// Invalid header cache constructor. pub fn new(max_length: u32) -> Self { Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() } } - fn insert_entry(&mut self, hash: B256, header: Arc) { + fn insert_entry(&mut self, hash: B256, header: Arc) { self.headers.insert(hash, HeaderEntry { header, hit_count: 0 }); } @@ -38,7 +37,7 @@ impl InvalidHeaderCache { /// /// If this is called, the hit count for the entry is incremented. /// If the hit count exceeds the threshold, the entry is evicted and `None` is returned. - pub fn get(&mut self, hash: &B256) -> Option> { + pub fn get(&mut self, hash: &B256) -> Option> { { let entry = self.headers.get(hash)?; entry.hit_count += 1; @@ -53,7 +52,11 @@ impl InvalidHeaderCache { } /// Inserts an invalid block into the cache, with a given invalid ancestor. 
- pub fn insert_with_invalid_ancestor(&mut self, header_hash: B256, invalid_ancestor: Arc) { + pub fn insert_with_invalid_ancestor( + &mut self, + header_hash: B256, + invalid_ancestor: Arc, + ) { if self.get(&header_hash).is_none() { warn!(target: "consensus::engine", hash=?header_hash, ?invalid_ancestor, "Bad block with existing invalid ancestor"); self.insert_entry(header_hash, invalid_ancestor); @@ -65,12 +68,10 @@ impl InvalidHeaderCache { } /// Inserts an invalid ancestor into the map. - pub fn insert(&mut self, invalid_ancestor: SealedHeader) { - if self.get(&invalid_ancestor.hash()).is_none() { - let hash = invalid_ancestor.hash(); - let header = invalid_ancestor.unseal(); - warn!(target: "consensus::engine", ?hash, ?header, "Bad block with hash"); - self.insert_entry(hash, Arc::new(header)); + pub fn insert(&mut self, invalid_ancestor: BlockWithParent) { + if self.get(&invalid_ancestor.block.hash).is_none() { + warn!(target: "consensus::engine", ?invalid_ancestor, "Bad block with hash"); + self.insert_entry(invalid_ancestor.block.hash, Arc::new(invalid_ancestor)); // update metrics self.metrics.unique_inserts.increment(1); @@ -103,13 +104,15 @@ struct InvalidHeaderCacheMetrics { #[cfg(test)] mod tests { use super::*; + use alloy_consensus::Header; + use reth_primitives::SealedHeader; #[test] fn test_hit_eviction() { let mut cache = InvalidHeaderCache::new(10); let header = Header::default(); let header = SealedHeader::seal(header); - cache.insert(header.clone()); + cache.insert(header.block_with_parent()); assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, 0); for hit in 1..INVALID_HEADER_HIT_EVICTION_THRESHOLD { diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 63afd2926533..7d7eb6ba6dee 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -760,14 +760,14 @@ where // iterate over ancestors in the invalid cache // until we encounter the 
first valid ancestor let mut current_hash = parent_hash; - let mut current_header = self.invalid_headers.get(&current_hash); - while let Some(header) = current_header { - current_hash = header.parent_hash; - current_header = self.invalid_headers.get(&current_hash); + let mut current_block = self.invalid_headers.get(&current_hash); + while let Some(block_with_parent) = current_block { + current_hash = block_with_parent.parent; + current_block = self.invalid_headers.get(&current_hash); // If current_header is None, then the current_hash does not have an invalid // ancestor in the cache, check its presence in blockchain tree - if current_header.is_none() && + if current_block.is_none() && self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some() { return Ok(Some(current_hash)) @@ -806,13 +806,13 @@ where head: B256, ) -> ProviderResult<Option<PayloadStatus>> { // check if the check hash was previously marked as invalid - let Some(header) = self.invalid_headers.get(&check) else { return Ok(None) }; + let Some(block_with_parent) = self.invalid_headers.get(&check) else { return Ok(None) }; // populate the latest valid hash field - let status = self.prepare_invalid_response(header.parent_hash)?; + let status = self.prepare_invalid_response(block_with_parent.parent)?; // insert the head block into the invalid header cache - self.invalid_headers.insert_with_invalid_ancestor(head, header); + self.invalid_headers.insert_with_invalid_ancestor(head, block_with_parent); Ok(Some(status)) } @@ -821,10 +821,10 @@ where /// to a forkchoice update. 
fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult> { // check if the head was previously marked as invalid - let Some(header) = self.invalid_headers.get(&head) else { return Ok(None) }; + let Some(block_with_parent) = self.invalid_headers.get(&head) else { return Ok(None) }; // populate the latest valid hash field - Ok(Some(self.prepare_invalid_response(header.parent_hash)?)) + Ok(Some(self.prepare_invalid_response(block_with_parent.parent)?)) } /// Record latency metrics for one call to make a block canonical @@ -1454,7 +1454,7 @@ where fn on_pipeline_outcome(&mut self, ctrl: ControlFlow) -> RethResult<()> { // Pipeline unwound, memorize the invalid block and wait for CL for next sync target. if let ControlFlow::Unwind { bad_block, .. } = ctrl { - warn!(target: "consensus::engine", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind"); + warn!(target: "consensus::engine", invalid_num_hash=?bad_block.block, "Bad block detected in unwind"); // update the `invalid_headers` cache with the new invalid header self.invalid_headers.insert(*bad_block); return Ok(()) @@ -1673,7 +1673,7 @@ where self.latest_valid_hash_for_invalid_payload(block.parent_hash)? }; // keep track of the invalid header - self.invalid_headers.insert(block.header); + self.invalid_headers.insert(block.header.block_with_parent()); PayloadStatus::new( PayloadStatusEnum::Invalid { validation_error: error.to_string() }, latest_valid_hash, @@ -1782,7 +1782,7 @@ where let (block, err) = err.split(); warn!(target: "consensus::engine", invalid_number=?block.number, invalid_hash=?block.hash(), %err, "Marking block as invalid"); - self.invalid_headers.insert(block.header); + self.invalid_headers.insert(block.header.block_with_parent()); } } } @@ -2035,7 +2035,7 @@ mod tests { .await; assert_matches!( res.await, - Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. 
)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -2089,7 +2089,7 @@ mod tests { Ok(result) => { assert_matches!( result, - Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage( .. )) + Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed)) ); break } @@ -2141,7 +2141,7 @@ mod tests { assert_matches!( rx.await, - Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. )) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed)) ); } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 763d5d990c5d..6c1dd711ff2c 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1328,7 +1328,7 @@ where // Pipeline unwound, memorize the invalid block and wait for CL for next sync target. if let ControlFlow::Unwind { bad_block, .. 
} = ctrl { - warn!(target: "engine::tree", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind"); + warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind"); // update the `invalid_headers` cache with the new invalid header self.state.invalid_headers.insert(*bad_block); return Ok(()) @@ -1678,14 +1678,14 @@ where // iterate over ancestors in the invalid cache // until we encounter the first valid ancestor let mut current_hash = parent_hash; - let mut current_header = self.state.invalid_headers.get(&current_hash); - while let Some(header) = current_header { - current_hash = header.parent_hash; - current_header = self.state.invalid_headers.get(&current_hash); + let mut current_block = self.state.invalid_headers.get(&current_hash); + while let Some(block_with_parent) = current_block { + current_hash = block_with_parent.parent; + current_block = self.state.invalid_headers.get(&current_hash); // If current_header is None, then the current_hash does not have an invalid // ancestor in the cache, check its presence in blockchain tree - if current_header.is_none() && self.block_by_hash(current_hash)?.is_some() { + if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() { return Ok(Some(current_hash)) } } @@ -1735,7 +1735,7 @@ where let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) }; // populate the latest valid hash field - let status = self.prepare_invalid_response(header.parent_hash)?; + let status = self.prepare_invalid_response(header.parent)?; // insert the head block into the invalid header cache self.state.invalid_headers.insert_with_invalid_ancestor(head, header); @@ -1749,7 +1749,7 @@ where // check if the head was previously marked as invalid let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) }; // populate the latest valid hash field - Ok(Some(self.prepare_invalid_response(header.parent_hash)?)) + 
Ok(Some(self.prepare_invalid_response(header.parent)?)) } /// Validate if block is correct and satisfies all the consensus rules that concern the header @@ -2395,7 +2395,7 @@ where }; // keep track of the invalid header - self.state.invalid_headers.insert(block.header); + self.state.invalid_headers.insert(block.header.block_with_parent()); Ok(PayloadStatus::new( PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() }, latest_valid_hash, diff --git a/crates/primitives-traits/src/header/sealed.rs b/crates/primitives-traits/src/header/sealed.rs index e99b0e1c17ff..61b021a0879b 100644 --- a/crates/primitives-traits/src/header/sealed.rs +++ b/crates/primitives-traits/src/header/sealed.rs @@ -1,7 +1,7 @@ use crate::InMemorySize; pub use alloy_consensus::Header; use alloy_consensus::Sealed; -use alloy_eips::BlockNumHash; +use alloy_eips::{eip1898::BlockWithParent, BlockNumHash}; use alloy_primitives::{keccak256, BlockHash, Sealable}; use alloy_rlp::{Decodable, Encodable}; use bytes::BufMut; @@ -65,6 +65,11 @@ impl SealedHeader { pub fn num_hash(&self) -> BlockNumHash { BlockNumHash::new(self.number(), self.hash) } + + /// Return a [`BlockWithParent`] for this header. 
+ pub fn block_with_parent(&self) -> BlockWithParent { + BlockWithParent { parent: self.parent_hash(), block: self.num_hash() } + } } impl InMemorySize for SealedHeader { diff --git a/crates/stages/api/src/error.rs b/crates/stages/api/src/error.rs index ef6d8f6a0e89..b63dd20f77c1 100644 --- a/crates/stages/api/src/error.rs +++ b/crates/stages/api/src/error.rs @@ -3,7 +3,6 @@ use alloy_eips::eip1898::BlockWithParent; use reth_consensus::ConsensusError; use reth_errors::{BlockExecutionError, DatabaseError, RethError}; use reth_network_p2p::error::DownloadError; -use reth_primitives_traits::SealedHeader; use reth_provider::ProviderError; use reth_prune::{PruneSegment, PruneSegmentError, PrunerError}; use reth_static_file_types::StaticFileSegment; @@ -56,9 +55,9 @@ pub enum StageError { )] DetachedHead { /// The local head we attempted to attach to. - local_head: BlockWithParent, + local_head: Box, /// The header we attempted to attach. - header: BlockWithParent, + header: Box, /// The error that occurred when attempting to attach the header. #[source] error: Box, diff --git a/crates/stages/api/src/pipeline/ctrl.rs b/crates/stages/api/src/pipeline/ctrl.rs index 161857552451..378385e97b73 100644 --- a/crates/stages/api/src/pipeline/ctrl.rs +++ b/crates/stages/api/src/pipeline/ctrl.rs @@ -1,5 +1,5 @@ +use alloy_eips::eip1898::BlockWithParent; use alloy_primitives::BlockNumber; -use reth_primitives_traits::SealedHeader; /// Determines the control flow during pipeline execution. /// @@ -11,7 +11,7 @@ pub enum ControlFlow { /// The block to unwind to. target: BlockNumber, /// The block that caused the unwind. - bad_block: Box, + bad_block: Box, }, /// The pipeline made progress. 
Continue { diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 273fe1fa6dbd..c02708e3b8f7 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -223,7 +223,7 @@ impl Pipeline { } ControlFlow::Continue { block_number } => self.progress.update(block_number), ControlFlow::Unwind { target, bad_block } => { - self.unwind(target, Some(bad_block.number))?; + self.unwind(target, Some(bad_block.block.number))?; return Ok(ControlFlow::Unwind { target, bad_block }) } } @@ -367,7 +367,7 @@ impl Pipeline { Err(err) => { self.event_sender.notify(PipelineEvent::Error { stage_id }); - return Err(PipelineError::Stage(Box::new(StageError::Fatal(Box::new(err))))) + return Err(PipelineError::Stage(StageError::Fatal(Box::new(err)))) } } } @@ -598,13 +598,12 @@ mod tests { use super::*; use crate::{test_utils::TestStage, UnwindOutput}; - use alloy_eips::{eip1898::BlockWithParent, NumHash}; use assert_matches::assert_matches; use reth_consensus::ConsensusError; use reth_errors::ProviderError; use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB}; use reth_prune::PruneModes; - use reth_testing_utils::generators::{self, random_block_with_parent, random_header}; + use reth_testing_utils::generators::{self, random_block_with_parent}; use tokio_stream::StreamExt; #[test] @@ -1112,6 +1111,11 @@ mod tests { StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), ); let result = pipeline.run().await; - assert_matches!(result, Err(PipelineError::Stage(..))); + assert_matches!( + result, + Err(PipelineError::Stage(StageError::DatabaseIntegrity( + ProviderError::BlockBodyIndicesNotFound(5) + ))) + ); } } diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index c8cc89080867..91afc33efaa0 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -1,5 +1,6 @@ 
use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD; use alloy_consensus::{BlockHeader, Header}; +use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_primitives::BlockNumber; use num_traits::Zero; use reth_config::config::ExecutionConfig; @@ -11,7 +12,7 @@ use reth_evm::{ }; use reth_execution_types::Chain; use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource}; -use reth_primitives::{SealedHeader, StaticFileSegment}; +use reth_primitives::StaticFileSegment; use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimitives}; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, @@ -359,9 +360,15 @@ where let execute_start = Instant::now(); self.metrics.metered_one((&block, td).into(), |input| { - executor.execute_and_verify_one(input).map_err(|error| StageError::Block { - block: Box::new(SealedHeader::seal(block.header().clone())), - error: BlockErrorKind::Execution(error), + executor.execute_and_verify_one(input).map_err(|error| { + let header = block.header(); + StageError::Block { + block: Box::new(BlockWithParent::new( + header.parent_hash(), + NumHash::new(header.number(), header.hash_slow()), + )), + error: BlockErrorKind::Execution(error), + } }) })?; diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs index 7b9b394b5615..7ca9cae590b6 100644 --- a/crates/stages/stages/src/stages/headers.rs +++ b/crates/stages/stages/src/stages/headers.rs @@ -1,4 +1,5 @@ use alloy_consensus::BlockHeader; +use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256}; use futures_util::StreamExt; use reth_config::config::EtlConfig; @@ -143,7 +144,10 @@ where // Header validation self.consensus.validate_header_with_total_difficulty(&header, td).map_err(|error| { StageError::Block { - block: Box::new(SealedHeader::new(header.clone(), header_hash)), + block: Box::new(BlockWithParent::new( + 
header.parent_hash, + NumHash::new(header.number, header_hash), + )), error: BlockErrorKind::Validation(error), } })?; @@ -272,7 +276,11 @@ where } Some(Err(HeadersDownloaderError::DetachedHead { local_head, header, error })) => { error!(target: "sync::stages::headers", %error, "Cannot attach header to head"); - return Poll::Ready(Err(StageError::DetachedHead { local_head, header, error })) + return Poll::Ready(Err(StageError::DetachedHead { + local_head: Box::new(local_head.block_with_parent()), + header: Box::new(header.block_with_parent()), + error, + })) } None => return Poll::Ready(Err(StageError::ChannelClosed)), } diff --git a/crates/stages/stages/src/stages/merkle.rs b/crates/stages/stages/src/stages/merkle.rs index 8095dfed9048..ff4d37cf3f61 100644 --- a/crates/stages/stages/src/stages/merkle.rs +++ b/crates/stages/stages/src/stages/merkle.rs @@ -357,7 +357,7 @@ fn validate_state_root( error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff( GotExpected { got, expected: expected.state_root }.into(), )), - block: Box::new(expected), + block: Box::new(expected.block_with_parent()), }) } } diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index d34a4b07921a..b5506068f481 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -192,7 +192,7 @@ where })?; Err(StageError::Block { - block: Box::new(sealed_header), + block: Box::new(sealed_header.block_with_parent()), error: BlockErrorKind::Validation( ConsensusError::TransactionSignerRecoveryError, ), diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 34aaeee44beb..c2a7c6ede02f 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -279,5 +279,8 @@ where let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); - 
Ok(StageError::MissingStaticFileData { block: missing_block, segment }) + Ok(StageError::MissingStaticFileData { + block: Box::new(missing_block.block_with_parent()), + segment, + }) }