From f13cc9f6f0b39828c77b0339821ed2d89021f57d Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Fri, 6 Dec 2024 17:15:03 -0500 Subject: [PATCH 1/5] chore: Box `StageError` variant in `PipelineError` --- Cargo.lock | 1 + .../beacon/src/engine/invalid_headers.rs | 2 +- crates/consensus/beacon/src/engine/mod.rs | 6 ++--- crates/stages/api/Cargo.toml | 2 ++ crates/stages/api/src/error.rs | 21 ++++++++-------- crates/stages/api/src/pipeline/mod.rs | 24 ++++++++----------- testing/testing-utils/src/generators.rs | 15 +++++++++++- 7 files changed, 42 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb3910d40e57..23798b50a898 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9237,6 +9237,7 @@ dependencies = [ name = "reth-stages-api" version = "1.1.2" dependencies = [ + "alloy-eips", "alloy-primitives", "aquamarine", "assert_matches", diff --git a/crates/consensus/beacon/src/engine/invalid_headers.rs b/crates/consensus/beacon/src/engine/invalid_headers.rs index 0a72129a6274..a4868bcf098b 100644 --- a/crates/consensus/beacon/src/engine/invalid_headers.rs +++ b/crates/consensus/beacon/src/engine/invalid_headers.rs @@ -82,7 +82,7 @@ impl InvalidHeaderCache { struct HeaderEntry { /// Keeps track how many times this header has been hit. hit_count: u8, - /// The actually header entry + /// The actual header entry header: Arc, } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index f188e495be4e..63afd2926533 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -2035,7 +2035,7 @@ mod tests { .await; assert_matches!( res.await, - Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. )) ); } @@ -2089,7 +2089,7 @@ mod tests { Ok(result) => { assert_matches!( result, - Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed)) + Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage( .. )) ); break } @@ -2141,7 +2141,7 @@ mod tests { assert_matches!( rx.await, - Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. )) ); } diff --git a/crates/stages/api/Cargo.toml b/crates/stages/api/Cargo.toml index 88a8e3b96d13..ffa34afa71e7 100644 --- a/crates/stages/api/Cargo.toml +++ b/crates/stages/api/Cargo.toml @@ -23,7 +23,9 @@ reth-errors.workspace = true reth-stages-types.workspace = true reth-static-file-types.workspace = true +# alloy alloy-primitives.workspace = true +alloy-eips.workspace = true # metrics reth-metrics.workspace = true diff --git a/crates/stages/api/src/error.rs b/crates/stages/api/src/error.rs index 9a4ef35aaf25..ef6d8f6a0e89 100644 --- a/crates/stages/api/src/error.rs +++ b/crates/stages/api/src/error.rs @@ -1,4 +1,5 @@ use crate::PipelineEvent; +use alloy_eips::eip1898::BlockWithParent; use reth_consensus::ConsensusError; use reth_errors::{BlockExecutionError, DatabaseError, RethError}; use reth_network_p2p::error::DownloadError; @@ -34,10 +35,10 @@ impl BlockErrorKind { #[derive(Error, Debug)] pub enum StageError { /// The stage encountered an error related to a block. 
- #[error("stage encountered an error in block #{number}: {error}", number = block.number)] + #[error("stage encountered an error in block #{number}: {error}", number = block.block.number)] Block { /// The block that caused the error. - block: Box, + block: Box, /// The specific error type, either consensus or execution error. #[source] error: BlockErrorKind, @@ -48,16 +49,16 @@ pub enum StageError { "stage encountered inconsistent chain: \ downloaded header #{header_number} ({header_hash}) is detached from \ local head #{head_number} ({head_hash}): {error}", - header_number = header.number, - header_hash = header.hash(), - head_number = local_head.number, - head_hash = local_head.hash(), + header_number = header.block.number, + header_hash = header.block.hash, + head_number = local_head.block.number, + head_hash = local_head.block.hash, )] DetachedHead { /// The local head we attempted to attach to. - local_head: Box, + local_head: BlockWithParent, /// The header we attempted to attach. - header: Box, + header: BlockWithParent, /// The error that occurred when attempting to attach the header. #[source] error: Box, @@ -92,10 +93,10 @@ pub enum StageError { #[error("invalid download response: {0}")] Download(#[from] DownloadError), /// Database is ahead of static file data. - #[error("missing static file data for block number: {number}", number = block.number)] + #[error("missing static file data for block number: {number}", number = block.block.number)] MissingStaticFileData { /// Starting block with missing data. - block: Box, + block: Box, /// Static File segment segment: StaticFileSegment, }, diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 39d26cd88082..273fe1fa6dbd 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -367,7 +367,7 @@ impl Pipeline { Err(err) => { self.event_sender.notify(PipelineEvent::Error { stage_id }); - return Err(PipelineError::Stage(StageError::Fatal(Box::new(err)))) + return Err(PipelineError::Stage(Box::new(StageError::Fatal(Box::new(err))))) } } } @@ -505,7 +505,7 @@ fn on_stage_error( // We unwind because of a detached head. let unwind_to = - local_head.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1); + local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1); Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head })) } else if let StageError::Block { block, error } = err { match error { @@ -513,7 +513,7 @@ fn on_stage_error( error!( target: "sync::pipeline", stage = %stage_id, - bad_block = %block.number, + bad_block = %block.block.number, "Stage encountered a validation error: {validation_error}" ); @@ -542,7 +542,7 @@ fn on_stage_error( error!( target: "sync::pipeline", stage = %stage_id, - bad_block = %block.number, + bad_block = %block.block.number, "Stage encountered an execution error: {execution_error}" ); @@ -560,12 +560,12 @@ fn on_stage_error( error!( target: "sync::pipeline", stage = %stage_id, - bad_block = %block.number, + bad_block = %block.block.number, segment = %segment, "Stage is missing static file data." 
); - Ok(Some(ControlFlow::Unwind { target: block.number - 1, bad_block: block })) + Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block })) } else if err.is_fatal() { error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}"); Err(err.into()) @@ -598,12 +598,13 @@ mod tests { use super::*; use crate::{test_utils::TestStage, UnwindOutput}; + use alloy_eips::{eip1898::BlockWithParent, NumHash}; use assert_matches::assert_matches; use reth_consensus::ConsensusError; use reth_errors::ProviderError; use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB}; use reth_prune::PruneModes; - use reth_testing_utils::{generators, generators::random_header}; + use reth_testing_utils::generators::{self, random_block_with_parent, random_header}; use tokio_stream::StreamExt; #[test] @@ -975,7 +976,7 @@ mod tests { .add_stage( TestStage::new(StageId::Other("B")) .add_exec(Err(StageError::Block { - block: Box::new(random_header( + block: Box::new(random_block_with_parent( &mut generators::rng(), 5, Default::default(), @@ -1111,11 +1112,6 @@ mod tests { StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), ); let result = pipeline.run().await; - assert_matches!( - result, - Err(PipelineError::Stage(StageError::DatabaseIntegrity( - ProviderError::BlockBodyIndicesNotFound(5) - ))) - ); + assert_matches!(result, Err(PipelineError::Stage(..))); } } diff --git a/testing/testing-utils/src/generators.rs b/testing/testing-utils/src/generators.rs index 9963b447e96d..28ba171bdb37 100644 --- a/testing/testing-utils/src/generators.rs +++ b/testing/testing-utils/src/generators.rs @@ -1,7 +1,11 @@ //! Generators for different data structures like block headers, block bodies and ranges of those. use alloy_consensus::{Header, Transaction as _, TxLegacy}; -use alloy_eips::eip4895::{Withdrawal, Withdrawals}; +use alloy_eips::{ + eip1898::BlockWithParent, + eip4895::{Withdrawal, Withdrawals}, + NumHash, +}; use alloy_primitives::{Address, BlockNumber, Bytes, TxKind, B256, U256}; pub use rand::Rng; use rand::{ @@ -95,6 +99,15 @@ pub fn random_header_range( headers } +/// Generate a random [`BlockWithParent`]. +pub fn random_block_with_parent( + rng: &mut R, + number: u64, + parent: Option, +) -> BlockWithParent { + BlockWithParent { parent: parent.unwrap_or_default(), block: NumHash::new(number, rng.gen()) } +} + /// Generate a random [`SealedHeader`]. /// /// The header is assumed to not be correct if validated. 
From 661e06c3913ff080b03abc23b5ca5674b4472ba1 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 9 Dec 2024 11:55:45 -0500 Subject: [PATCH 2/5] chore: use BlockWithParent in InvalidHeadersCache --- .../beacon/src/engine/invalid_headers.rs | 33 ++++++++++--------- crates/consensus/beacon/src/engine/mod.rs | 32 +++++++++--------- crates/engine/tree/src/tree/mod.rs | 18 +++++----- crates/primitives-traits/src/header/sealed.rs | 7 +++- crates/stages/api/src/error.rs | 5 ++- crates/stages/api/src/pipeline/ctrl.rs | 4 +-- crates/stages/api/src/pipeline/mod.rs | 14 +++++--- crates/stages/stages/src/stages/execution.rs | 15 ++++++--- crates/stages/stages/src/stages/headers.rs | 12 +++++-- crates/stages/stages/src/stages/merkle.rs | 2 +- .../stages/src/stages/sender_recovery.rs | 2 +- crates/stages/stages/src/stages/utils.rs | 5 ++- 12 files changed, 89 insertions(+), 60 deletions(-) diff --git a/crates/consensus/beacon/src/engine/invalid_headers.rs b/crates/consensus/beacon/src/engine/invalid_headers.rs index a4868bcf098b..aaf92e9c3501 100644 --- a/crates/consensus/beacon/src/engine/invalid_headers.rs +++ b/crates/consensus/beacon/src/engine/invalid_headers.rs @@ -1,10 +1,9 @@ -use alloy_consensus::Header; +use alloy_eips::eip1898::BlockWithParent; use alloy_primitives::B256; use reth_metrics::{ metrics::{Counter, Gauge}, Metrics, }; -use reth_primitives::SealedHeader; use schnellru::{ByLength, LruMap}; use std::{fmt::Debug, sync::Arc}; use tracing::warn; @@ -17,20 +16,20 @@ const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128; /// Keeps track of invalid headers. #[derive(Debug)] -pub struct InvalidHeaderCache { +pub struct InvalidHeaderCache { /// This maps a header hash to a reference to its invalid ancestor. - headers: LruMap>, + headers: LruMap>, /// Metrics for the cache. metrics: InvalidHeaderCacheMetrics, } -impl InvalidHeaderCache { +impl InvalidHeaderCache { /// Invalid header cache constructor. pub fn new(max_length: u32) -> Self { Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() } } - fn insert_entry(&mut self, hash: B256, header: Arc) { + fn insert_entry(&mut self, hash: B256, header: Arc) { self.headers.insert(hash, HeaderEntry { header, hit_count: 0 }); } @@ -38,7 +37,7 @@ impl InvalidHeaderCache { /// /// If this is called, the hit count for the entry is incremented. /// If the hit count exceeds the threshold, the entry is evicted and `None` is returned. - pub fn get(&mut self, hash: &B256) -> Option> { + pub fn get(&mut self, hash: &B256) -> Option> { { let entry = self.headers.get(hash)?; entry.hit_count += 1; @@ -53,7 +52,11 @@ impl InvalidHeaderCache { } /// Inserts an invalid block into the cache, with a given invalid ancestor. - pub fn insert_with_invalid_ancestor(&mut self, header_hash: B256, invalid_ancestor: Arc) { + pub fn insert_with_invalid_ancestor( + &mut self, + header_hash: B256, + invalid_ancestor: Arc, + ) { if self.get(&header_hash).is_none() { warn!(target: "consensus::engine", hash=?header_hash, ?invalid_ancestor, "Bad block with existing invalid ancestor"); self.insert_entry(header_hash, invalid_ancestor); @@ -65,12 +68,10 @@ impl InvalidHeaderCache { } /// Inserts an invalid ancestor into the map. 
- pub fn insert(&mut self, invalid_ancestor: SealedHeader) { - if self.get(&invalid_ancestor.hash()).is_none() { - let hash = invalid_ancestor.hash(); - let header = invalid_ancestor.unseal(); - warn!(target: "consensus::engine", ?hash, ?header, "Bad block with hash"); - self.insert_entry(hash, Arc::new(header)); + pub fn insert(&mut self, invalid_ancestor: BlockWithParent) { + if self.get(&invalid_ancestor.block.hash).is_none() { + warn!(target: "consensus::engine", ?invalid_ancestor, "Bad block with hash"); + self.insert_entry(invalid_ancestor.block.hash, Arc::new(invalid_ancestor)); // update metrics self.metrics.unique_inserts.increment(1); @@ -103,13 +104,15 @@ struct InvalidHeaderCacheMetrics { #[cfg(test)] mod tests { use super::*; + use alloy_consensus::Header; + use reth_primitives::SealedHeader; #[test] fn test_hit_eviction() { let mut cache = InvalidHeaderCache::new(10); let header = Header::default(); let header = SealedHeader::seal(header); - cache.insert(header.clone()); + cache.insert(header.block_with_parent()); assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, 0); for hit in 1..INVALID_HEADER_HIT_EVICTION_THRESHOLD { diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 63afd2926533..7d7eb6ba6dee 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -760,14 +760,14 @@ where // iterate over ancestors in the invalid cache // until we encounter the first valid ancestor let mut current_hash = parent_hash; - let mut current_header = self.invalid_headers.get(¤t_hash); - while let Some(header) = current_header { - current_hash = header.parent_hash; - current_header = self.invalid_headers.get(¤t_hash); + let mut current_block = self.invalid_headers.get(¤t_hash); + while let Some(block_with_parent) = current_block { + current_hash = block_with_parent.parent; + current_block = self.invalid_headers.get(¤t_hash); // If current_header is None, then the current_hash does not have an invalid // ancestor in the cache, check its presence in blockchain tree - if current_header.is_none() && + if current_block.is_none() && self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some() { return Ok(Some(current_hash)) @@ -806,13 +806,13 @@ where head: B256, ) -> ProviderResult> { // check if the check hash was previously marked as invalid - let Some(header) = self.invalid_headers.get(&check) else { return Ok(None) }; + let Some(block_with_parent) = self.invalid_headers.get(&check) else { return Ok(None) }; // populate the latest valid hash field - let status = self.prepare_invalid_response(header.parent_hash)?; + let status = self.prepare_invalid_response(block_with_parent.parent)?; // insert the head block into the invalid header cache - self.invalid_headers.insert_with_invalid_ancestor(head, header); + self.invalid_headers.insert_with_invalid_ancestor(head, block_with_parent); Ok(Some(status)) } @@ -821,10 +821,10 @@ where /// to a forkchoice update. 
fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult> { // check if the head was previously marked as invalid - let Some(header) = self.invalid_headers.get(&head) else { return Ok(None) }; + let Some(block_with_parent) = self.invalid_headers.get(&head) else { return Ok(None) }; // populate the latest valid hash field - Ok(Some(self.prepare_invalid_response(header.parent_hash)?)) + Ok(Some(self.prepare_invalid_response(block_with_parent.parent)?)) } /// Record latency metrics for one call to make a block canonical @@ -1454,7 +1454,7 @@ where fn on_pipeline_outcome(&mut self, ctrl: ControlFlow) -> RethResult<()> { // Pipeline unwound, memorize the invalid block and wait for CL for next sync target. if let ControlFlow::Unwind { bad_block, .. } = ctrl { - warn!(target: "consensus::engine", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind"); + warn!(target: "consensus::engine", invalid_num_hash=?bad_block.block, "Bad block detected in unwind"); // update the `invalid_headers` cache with the new invalid header self.invalid_headers.insert(*bad_block); return Ok(()) @@ -1673,7 +1673,7 @@ where self.latest_valid_hash_for_invalid_payload(block.parent_hash)? }; // keep track of the invalid header - self.invalid_headers.insert(block.header); + self.invalid_headers.insert(block.header.block_with_parent()); PayloadStatus::new( PayloadStatusEnum::Invalid { validation_error: error.to_string() }, latest_valid_hash, @@ -1782,7 +1782,7 @@ where let (block, err) = err.split(); warn!(target: "consensus::engine", invalid_number=?block.number, invalid_hash=?block.hash(), %err, "Marking block as invalid"); - self.invalid_headers.insert(block.header); + self.invalid_headers.insert(block.header.block_with_parent()); } } } @@ -2035,7 +2035,7 @@ mod tests { .await; assert_matches!( res.await, - Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. )) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -2089,7 +2089,7 @@ mod tests { Ok(result) => { assert_matches!( result, - Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage( .. )) + Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed)) ); break } @@ -2141,7 +2141,7 @@ mod tests { assert_matches!( rx.await, - Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. )) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed)) ); } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 763d5d990c5d..6c1dd711ff2c 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1328,7 +1328,7 @@ where // Pipeline unwound, memorize the invalid block and wait for CL for next sync target. if let ControlFlow::Unwind { bad_block, .. 
} = ctrl { - warn!(target: "engine::tree", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind"); + warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind"); // update the `invalid_headers` cache with the new invalid header self.state.invalid_headers.insert(*bad_block); return Ok(()) @@ -1678,14 +1678,14 @@ where // iterate over ancestors in the invalid cache // until we encounter the first valid ancestor let mut current_hash = parent_hash; - let mut current_header = self.state.invalid_headers.get(¤t_hash); - while let Some(header) = current_header { - current_hash = header.parent_hash; - current_header = self.state.invalid_headers.get(¤t_hash); + let mut current_block = self.state.invalid_headers.get(¤t_hash); + while let Some(block_with_parent) = current_block { + current_hash = block_with_parent.parent; + current_block = self.state.invalid_headers.get(¤t_hash); // If current_header is None, then the current_hash does not have an invalid // ancestor in the cache, check its presence in blockchain tree - if current_header.is_none() && self.block_by_hash(current_hash)?.is_some() { + if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() { return Ok(Some(current_hash)) } } @@ -1735,7 +1735,7 @@ where let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) }; // populate the latest valid hash field - let status = self.prepare_invalid_response(header.parent_hash)?; + let status = self.prepare_invalid_response(header.parent)?; // insert the head block into the invalid header cache self.state.invalid_headers.insert_with_invalid_ancestor(head, header); @@ -1749,7 +1749,7 @@ where // check if the head was previously marked as invalid let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) }; // populate the latest valid hash field - Ok(Some(self.prepare_invalid_response(header.parent_hash)?)) + Ok(Some(self.prepare_invalid_response(header.parent)?)) } /// Validate if block is correct and satisfies all the consensus rules that concern the header @@ -2395,7 +2395,7 @@ where }; // keep track of the invalid header - self.state.invalid_headers.insert(block.header); + self.state.invalid_headers.insert(block.header.block_with_parent()); Ok(PayloadStatus::new( PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() }, latest_valid_hash, diff --git a/crates/primitives-traits/src/header/sealed.rs b/crates/primitives-traits/src/header/sealed.rs index e99b0e1c17ff..61b021a0879b 100644 --- a/crates/primitives-traits/src/header/sealed.rs +++ b/crates/primitives-traits/src/header/sealed.rs @@ -1,7 +1,7 @@ use crate::InMemorySize; pub use alloy_consensus::Header; use alloy_consensus::Sealed; -use alloy_eips::BlockNumHash; +use alloy_eips::{eip1898::BlockWithParent, BlockNumHash}; use alloy_primitives::{keccak256, BlockHash, Sealable}; use alloy_rlp::{Decodable, Encodable}; use bytes::BufMut; @@ -65,6 +65,11 @@ impl SealedHeader { pub fn num_hash(&self) -> BlockNumHash { BlockNumHash::new(self.number(), self.hash) } + + /// Return a [`BlockWithParent`] for this header. 
+ pub fn block_with_parent(&self) -> BlockWithParent { + BlockWithParent { parent: self.parent_hash(), block: self.num_hash() } + } } impl InMemorySize for SealedHeader { diff --git a/crates/stages/api/src/error.rs b/crates/stages/api/src/error.rs index ef6d8f6a0e89..b63dd20f77c1 100644 --- a/crates/stages/api/src/error.rs +++ b/crates/stages/api/src/error.rs @@ -3,7 +3,6 @@ use alloy_eips::eip1898::BlockWithParent; use reth_consensus::ConsensusError; use reth_errors::{BlockExecutionError, DatabaseError, RethError}; use reth_network_p2p::error::DownloadError; -use reth_primitives_traits::SealedHeader; use reth_provider::ProviderError; use reth_prune::{PruneSegment, PruneSegmentError, PrunerError}; use reth_static_file_types::StaticFileSegment; @@ -56,9 +55,9 @@ pub enum StageError { )] DetachedHead { /// The local head we attempted to attach to. - local_head: BlockWithParent, + local_head: Box, /// The header we attempted to attach. - header: BlockWithParent, + header: Box, /// The error that occurred when attempting to attach the header. #[source] error: Box, diff --git a/crates/stages/api/src/pipeline/ctrl.rs b/crates/stages/api/src/pipeline/ctrl.rs index 161857552451..378385e97b73 100644 --- a/crates/stages/api/src/pipeline/ctrl.rs +++ b/crates/stages/api/src/pipeline/ctrl.rs @@ -1,5 +1,5 @@ +use alloy_eips::eip1898::BlockWithParent; use alloy_primitives::BlockNumber; -use reth_primitives_traits::SealedHeader; /// Determines the control flow during pipeline execution. /// @@ -11,7 +11,7 @@ pub enum ControlFlow { /// The block to unwind to. target: BlockNumber, /// The block that caused the unwind. - bad_block: Box, + bad_block: Box, }, /// The pipeline made progress. Continue { diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 273fe1fa6dbd..c02708e3b8f7 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -223,7 +223,7 @@ impl Pipeline { } ControlFlow::Continue { block_number } => self.progress.update(block_number), ControlFlow::Unwind { target, bad_block } => { - self.unwind(target, Some(bad_block.number))?; + self.unwind(target, Some(bad_block.block.number))?; return Ok(ControlFlow::Unwind { target, bad_block }) } } @@ -367,7 +367,7 @@ impl Pipeline { Err(err) => { self.event_sender.notify(PipelineEvent::Error { stage_id }); - return Err(PipelineError::Stage(Box::new(StageError::Fatal(Box::new(err))))) + return Err(PipelineError::Stage(StageError::Fatal(Box::new(err)))) } } } @@ -598,13 +598,12 @@ mod tests { use super::*; use crate::{test_utils::TestStage, UnwindOutput}; - use alloy_eips::{eip1898::BlockWithParent, NumHash}; use assert_matches::assert_matches; use reth_consensus::ConsensusError; use reth_errors::ProviderError; use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB}; use reth_prune::PruneModes; - use reth_testing_utils::generators::{self, random_block_with_parent, random_header}; + use reth_testing_utils::generators::{self, random_block_with_parent}; use tokio_stream::StreamExt; #[test] @@ -1112,6 +1111,11 @@ mod tests { StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), ); let result = pipeline.run().await; - assert_matches!(result, Err(PipelineError::Stage(..))); + assert_matches!( + result, + Err(PipelineError::Stage(StageError::DatabaseIntegrity( + ProviderError::BlockBodyIndicesNotFound(5) + ))) + ); } } diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 
c8cc89080867..91afc33efaa0 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -1,5 +1,6 @@ use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD; use alloy_consensus::{BlockHeader, Header}; +use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_primitives::BlockNumber; use num_traits::Zero; use reth_config::config::ExecutionConfig; @@ -11,7 +12,7 @@ use reth_evm::{ }; use reth_execution_types::Chain; use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource}; -use reth_primitives::{SealedHeader, StaticFileSegment}; +use reth_primitives::StaticFileSegment; use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimitives}; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, @@ -359,9 +360,15 @@ where let execute_start = Instant::now(); self.metrics.metered_one((&block, td).into(), |input| { - executor.execute_and_verify_one(input).map_err(|error| StageError::Block { - block: Box::new(SealedHeader::seal(block.header().clone())), - error: BlockErrorKind::Execution(error), + executor.execute_and_verify_one(input).map_err(|error| { + let header = block.header(); + StageError::Block { + block: Box::new(BlockWithParent::new( + header.parent_hash(), + NumHash::new(header.number(), header.hash_slow()), + )), + error: BlockErrorKind::Execution(error), + } }) })?; diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs index 7b9b394b5615..7ca9cae590b6 100644 --- a/crates/stages/stages/src/stages/headers.rs +++ b/crates/stages/stages/src/stages/headers.rs @@ -1,4 +1,5 @@ use alloy_consensus::BlockHeader; +use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256}; use futures_util::StreamExt; use reth_config::config::EtlConfig; @@ -143,7 +144,10 @@ where // Header validation self.consensus.validate_header_with_total_difficulty(&header, td).map_err(|error| { StageError::Block { - block: Box::new(SealedHeader::new(header.clone(), header_hash)), + block: Box::new(BlockWithParent::new( + header.parent_hash, + NumHash::new(header.number, header_hash), + )), error: BlockErrorKind::Validation(error), } })?; @@ -272,7 +276,11 @@ where } Some(Err(HeadersDownloaderError::DetachedHead { local_head, header, error })) => { error!(target: "sync::stages::headers", %error, "Cannot attach header to head"); - return Poll::Ready(Err(StageError::DetachedHead { local_head, header, error })) + return Poll::Ready(Err(StageError::DetachedHead { + local_head: Box::new(local_head.block_with_parent()), + header: Box::new(header.block_with_parent()), + error, + })) } None => return Poll::Ready(Err(StageError::ChannelClosed)), } diff --git a/crates/stages/stages/src/stages/merkle.rs b/crates/stages/stages/src/stages/merkle.rs index 8095dfed9048..ff4d37cf3f61 100644 --- a/crates/stages/stages/src/stages/merkle.rs +++ b/crates/stages/stages/src/stages/merkle.rs @@ -357,7 +357,7 @@ fn validate_state_root( error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff( GotExpected { got, expected: expected.state_root }.into(), )), - block: Box::new(expected), + block: Box::new(expected.block_with_parent()), }) } } diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index d34a4b07921a..b5506068f481 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -192,7 +192,7 @@ where 
})?; Err(StageError::Block { - block: Box::new(sealed_header), + block: Box::new(sealed_header.block_with_parent()), error: BlockErrorKind::Validation( ConsensusError::TransactionSignerRecoveryError, ), diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 34aaeee44beb..c2a7c6ede02f 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -279,5 +279,8 @@ where let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); - Ok(StageError::MissingStaticFileData { block: missing_block, segment }) + Ok(StageError::MissingStaticFileData { + block: Box::new(missing_block.block_with_parent()), + segment, + }) } From 06d2c39ed3a4f42fe2de41e6320f3c90d036398d Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:43:08 -0500 Subject: [PATCH 3/5] chore: remove generic from HeaderEntry --- crates/consensus/beacon/src/engine/invalid_headers.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/consensus/beacon/src/engine/invalid_headers.rs b/crates/consensus/beacon/src/engine/invalid_headers.rs index aaf92e9c3501..3460e692294c 100644 --- a/crates/consensus/beacon/src/engine/invalid_headers.rs +++ b/crates/consensus/beacon/src/engine/invalid_headers.rs @@ -18,7 +18,7 @@ const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128; #[derive(Debug)] pub struct InvalidHeaderCache { /// This maps a header hash to a reference to its invalid ancestor. - headers: LruMap>, + headers: LruMap, /// Metrics for the cache. metrics: InvalidHeaderCacheMetrics, } @@ -80,11 +80,11 @@ impl InvalidHeaderCache { } } -struct HeaderEntry { +struct HeaderEntry { /// Keeps track how many times this header has been hit. hit_count: u8, /// The actual header entry - header: Arc, + header: Arc, } /// Metrics for the invalid headers cache. From 54eeb5fa83a28ec301b195e039fae6dcc76a5d55 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 9 Dec 2024 19:01:01 -0500 Subject: [PATCH 4/5] chore: remove Arc from HeaderEntry --- .../consensus/beacon/src/engine/invalid_headers.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/consensus/beacon/src/engine/invalid_headers.rs b/crates/consensus/beacon/src/engine/invalid_headers.rs index 3460e692294c..2e2bc37a27ee 100644 --- a/crates/consensus/beacon/src/engine/invalid_headers.rs +++ b/crates/consensus/beacon/src/engine/invalid_headers.rs @@ -5,7 +5,7 @@ use reth_metrics::{ Metrics, }; use schnellru::{ByLength, LruMap}; -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; use tracing::warn; /// The max hit counter for invalid headers in the cache before it is forcefully evicted. @@ -29,7 +29,7 @@ impl InvalidHeaderCache { Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() } } - fn insert_entry(&mut self, hash: B256, header: Arc) { + fn insert_entry(&mut self, hash: B256, header: BlockWithParent) { self.headers.insert(hash, HeaderEntry { header, hit_count: 0 }); } @@ -37,7 +37,7 @@ impl InvalidHeaderCache { /// /// If this is called, the hit count for the entry is incremented. /// If the hit count exceeds the threshold, the entry is evicted and `None` is returned. 
- pub fn get(&mut self, hash: &B256) -> Option> { + pub fn get(&mut self, hash: &B256) -> Option { { let entry = self.headers.get(hash)?; entry.hit_count += 1; @@ -55,7 +55,7 @@ impl InvalidHeaderCache { pub fn insert_with_invalid_ancestor( &mut self, header_hash: B256, - invalid_ancestor: Arc, + invalid_ancestor: BlockWithParent, ) { if self.get(&header_hash).is_none() { warn!(target: "consensus::engine", hash=?header_hash, ?invalid_ancestor, "Bad block with existing invalid ancestor"); @@ -71,7 +71,7 @@ impl InvalidHeaderCache { pub fn insert(&mut self, invalid_ancestor: BlockWithParent) { if self.get(&invalid_ancestor.block.hash).is_none() { warn!(target: "consensus::engine", ?invalid_ancestor, "Bad block with hash"); - self.insert_entry(invalid_ancestor.block.hash, Arc::new(invalid_ancestor)); + self.insert_entry(invalid_ancestor.block.hash, invalid_ancestor); // update metrics self.metrics.unique_inserts.increment(1); @@ -84,7 +84,7 @@ struct HeaderEntry { /// Keeps track how many times this header has been hit. hit_count: u8, /// The actual header entry - header: Arc, + header: BlockWithParent, } /// Metrics for the invalid headers cache. From 5214c99995c450af99ea57b3a469c4f869840e93 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 9 Dec 2024 19:09:00 -0500 Subject: [PATCH 5/5] rename to block --- crates/consensus/beacon/src/engine/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 7d7eb6ba6dee..c41f9283db85 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -761,8 +761,8 @@ where // until we encounter the first valid ancestor let mut current_hash = parent_hash; let mut current_block = self.invalid_headers.get(¤t_hash); - while let Some(block_with_parent) = current_block { - current_hash = block_with_parent.parent; + while let Some(block) = current_block { + current_hash = block.parent; current_block = self.invalid_headers.get(¤t_hash); // If current_header is None, then the current_hash does not have an invalid @@ -806,13 +806,13 @@ where head: B256, ) -> ProviderResult> { // check if the check hash was previously marked as invalid - let Some(block_with_parent) = self.invalid_headers.get(&check) else { return Ok(None) }; + let Some(block) = self.invalid_headers.get(&check) else { return Ok(None) }; // populate the latest valid hash field - let status = self.prepare_invalid_response(block_with_parent.parent)?; + let status = self.prepare_invalid_response(block.parent)?; // insert the head block into the invalid header cache - self.invalid_headers.insert_with_invalid_ancestor(head, block_with_parent); + self.invalid_headers.insert_with_invalid_ancestor(head, block); Ok(Some(status)) } @@ -821,10 +821,10 @@ where /// to a forkchoice update. fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult> { // check if the head was previously marked as invalid - let Some(block_with_parent) = self.invalid_headers.get(&head) else { return Ok(None) }; + let Some(block) = self.invalid_headers.get(&head) else { return Ok(None) }; // populate the latest valid hash field - Ok(Some(self.prepare_invalid_response(block_with_parent.parent)?)) + Ok(Some(self.prepare_invalid_response(block.parent)?)) } /// Record latency metrics for one call to make a block canonical