Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use BlockWithParent for StageError #13198

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 22 additions & 19 deletions crates/consensus/beacon/src/engine/invalid_headers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use alloy_consensus::Header;
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::B256;
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
use reth_primitives::SealedHeader;
use schnellru::{ByLength, LruMap};
use std::{fmt::Debug, sync::Arc};
use std::fmt::Debug;
use tracing::warn;

/// The max hit counter for invalid headers in the cache before it is forcefully evicted.
Expand All @@ -17,28 +16,28 @@ const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;

/// Keeps track of invalid headers.
#[derive(Debug)]
pub struct InvalidHeaderCache<H = Header> {
pub struct InvalidHeaderCache {
/// This maps a header hash to a reference to its invalid ancestor.
headers: LruMap<B256, HeaderEntry<H>>,
headers: LruMap<B256, HeaderEntry>,
/// Metrics for the cache.
metrics: InvalidHeaderCacheMetrics,
}

impl<H: Debug> InvalidHeaderCache<H> {
impl InvalidHeaderCache {
/// Invalid header cache constructor.
pub fn new(max_length: u32) -> Self {
Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() }
}

fn insert_entry(&mut self, hash: B256, header: Arc<H>) {
fn insert_entry(&mut self, hash: B256, header: BlockWithParent) {
self.headers.insert(hash, HeaderEntry { header, hit_count: 0 });
}

/// Returns the invalid ancestor's header if it exists in the cache.
///
/// If this is called, the hit count for the entry is incremented.
/// If the hit count exceeds the threshold, the entry is evicted and `None` is returned.
pub fn get(&mut self, hash: &B256) -> Option<Arc<H>> {
pub fn get(&mut self, hash: &B256) -> Option<BlockWithParent> {
{
let entry = self.headers.get(hash)?;
entry.hit_count += 1;
Expand All @@ -53,7 +52,11 @@ impl<H: Debug> InvalidHeaderCache<H> {
}

/// Inserts an invalid block into the cache, with a given invalid ancestor.
pub fn insert_with_invalid_ancestor(&mut self, header_hash: B256, invalid_ancestor: Arc<H>) {
pub fn insert_with_invalid_ancestor(
&mut self,
header_hash: B256,
invalid_ancestor: BlockWithParent,
) {
if self.get(&header_hash).is_none() {
warn!(target: "consensus::engine", hash=?header_hash, ?invalid_ancestor, "Bad block with existing invalid ancestor");
self.insert_entry(header_hash, invalid_ancestor);
Expand All @@ -65,12 +68,10 @@ impl<H: Debug> InvalidHeaderCache<H> {
}

/// Inserts an invalid ancestor into the map.
pub fn insert(&mut self, invalid_ancestor: SealedHeader<H>) {
if self.get(&invalid_ancestor.hash()).is_none() {
let hash = invalid_ancestor.hash();
let header = invalid_ancestor.unseal();
warn!(target: "consensus::engine", ?hash, ?header, "Bad block with hash");
self.insert_entry(hash, Arc::new(header));
pub fn insert(&mut self, invalid_ancestor: BlockWithParent) {
if self.get(&invalid_ancestor.block.hash).is_none() {
warn!(target: "consensus::engine", ?invalid_ancestor, "Bad block with hash");
self.insert_entry(invalid_ancestor.block.hash, invalid_ancestor);

// update metrics
self.metrics.unique_inserts.increment(1);
Expand All @@ -79,11 +80,11 @@ impl<H: Debug> InvalidHeaderCache<H> {
}
}

struct HeaderEntry<H> {
struct HeaderEntry {
/// Keeps track how many times this header has been hit.
hit_count: u8,
/// The actually header entry
header: Arc<H>,
/// The actual header entry
header: BlockWithParent,
}

/// Metrics for the invalid headers cache.
Expand All @@ -103,13 +104,15 @@ struct InvalidHeaderCacheMetrics {
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::Header;
use reth_primitives::SealedHeader;

#[test]
fn test_hit_eviction() {
let mut cache = InvalidHeaderCache::new(10);
let header = Header::default();
let header = SealedHeader::seal(header);
cache.insert(header.clone());
cache.insert(header.block_with_parent());
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, 0);

for hit in 1..INVALID_HEADER_HIT_EVICTION_THRESHOLD {
Expand Down
30 changes: 15 additions & 15 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,14 +760,14 @@ where
// iterate over ancestors in the invalid cache
// until we encounter the first valid ancestor
let mut current_hash = parent_hash;
let mut current_header = self.invalid_headers.get(&current_hash);
while let Some(header) = current_header {
current_hash = header.parent_hash;
current_header = self.invalid_headers.get(&current_hash);
let mut current_block = self.invalid_headers.get(&current_hash);
while let Some(block) = current_block {
current_hash = block.parent;
current_block = self.invalid_headers.get(&current_hash);

// If current_header is None, then the current_hash does not have an invalid
// ancestor in the cache, check its presence in blockchain tree
if current_header.is_none() &&
if current_block.is_none() &&
self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some()
{
return Ok(Some(current_hash))
Expand Down Expand Up @@ -806,13 +806,13 @@ where
head: B256,
) -> ProviderResult<Option<PayloadStatus>> {
// check if the check hash was previously marked as invalid
let Some(header) = self.invalid_headers.get(&check) else { return Ok(None) };
let Some(block) = self.invalid_headers.get(&check) else { return Ok(None) };

// populate the latest valid hash field
let status = self.prepare_invalid_response(header.parent_hash)?;
let status = self.prepare_invalid_response(block.parent)?;

// insert the head block into the invalid header cache
self.invalid_headers.insert_with_invalid_ancestor(head, header);
self.invalid_headers.insert_with_invalid_ancestor(head, block);

Ok(Some(status))
}
Expand All @@ -821,10 +821,10 @@ where
/// to a forkchoice update.
fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
// check if the head was previously marked as invalid
let Some(header) = self.invalid_headers.get(&head) else { return Ok(None) };
let Some(block) = self.invalid_headers.get(&head) else { return Ok(None) };

// populate the latest valid hash field
Ok(Some(self.prepare_invalid_response(header.parent_hash)?))
Ok(Some(self.prepare_invalid_response(block.parent)?))
}

/// Record latency metrics for one call to make a block canonical
Expand Down Expand Up @@ -1454,7 +1454,7 @@ where
fn on_pipeline_outcome(&mut self, ctrl: ControlFlow) -> RethResult<()> {
// Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
warn!(target: "consensus::engine", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind");
warn!(target: "consensus::engine", invalid_num_hash=?bad_block.block, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid header
self.invalid_headers.insert(*bad_block);
return Ok(())
Expand Down Expand Up @@ -1673,7 +1673,7 @@ where
self.latest_valid_hash_for_invalid_payload(block.parent_hash)?
};
// keep track of the invalid header
self.invalid_headers.insert(block.header);
self.invalid_headers.insert(block.header.block_with_parent());
PayloadStatus::new(
PayloadStatusEnum::Invalid { validation_error: error.to_string() },
latest_valid_hash,
Expand Down Expand Up @@ -1782,7 +1782,7 @@ where
let (block, err) = err.split();
warn!(target: "consensus::engine", invalid_number=?block.number, invalid_hash=?block.hash(), %err, "Marking block as invalid");

self.invalid_headers.insert(block.header);
self.invalid_headers.insert(block.header.block_with_parent());
}
}
}
Expand Down Expand Up @@ -2035,7 +2035,7 @@ mod tests {
.await;
assert_matches!(
res.await,
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
);
}

Expand Down Expand Up @@ -2141,7 +2141,7 @@ mod tests {

assert_matches!(
rx.await,
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
);
}

Expand Down
18 changes: 9 additions & 9 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,7 @@ where

// Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
warn!(target: "engine::tree", invalid_hash=?bad_block.hash(), invalid_number=?bad_block.number, "Bad block detected in unwind");
warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid header
self.state.invalid_headers.insert(*bad_block);
return Ok(())
Expand Down Expand Up @@ -1678,14 +1678,14 @@ where
// iterate over ancestors in the invalid cache
// until we encounter the first valid ancestor
let mut current_hash = parent_hash;
let mut current_header = self.state.invalid_headers.get(&current_hash);
while let Some(header) = current_header {
current_hash = header.parent_hash;
current_header = self.state.invalid_headers.get(&current_hash);
let mut current_block = self.state.invalid_headers.get(&current_hash);
while let Some(block_with_parent) = current_block {
current_hash = block_with_parent.parent;
current_block = self.state.invalid_headers.get(&current_hash);

// If current_header is None, then the current_hash does not have an invalid
// ancestor in the cache, check its presence in blockchain tree
if current_header.is_none() && self.block_by_hash(current_hash)?.is_some() {
if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
return Ok(Some(current_hash))
}
}
Expand Down Expand Up @@ -1735,7 +1735,7 @@ where
let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };

// populate the latest valid hash field
let status = self.prepare_invalid_response(header.parent_hash)?;
let status = self.prepare_invalid_response(header.parent)?;

// insert the head block into the invalid header cache
self.state.invalid_headers.insert_with_invalid_ancestor(head, header);
Expand All @@ -1749,7 +1749,7 @@ where
// check if the head was previously marked as invalid
let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
// populate the latest valid hash field
Ok(Some(self.prepare_invalid_response(header.parent_hash)?))
Ok(Some(self.prepare_invalid_response(header.parent)?))
}

/// Validate if block is correct and satisfies all the consensus rules that concern the header
Expand Down Expand Up @@ -2395,7 +2395,7 @@ where
};

// keep track of the invalid header
self.state.invalid_headers.insert(block.header);
self.state.invalid_headers.insert(block.header.block_with_parent());
Ok(PayloadStatus::new(
PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
latest_valid_hash,
Expand Down
7 changes: 6 additions & 1 deletion crates/primitives-traits/src/header/sealed.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::InMemorySize;
pub use alloy_consensus::Header;
use alloy_consensus::Sealed;
use alloy_eips::BlockNumHash;
use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
use alloy_primitives::{keccak256, BlockHash, Sealable};
use alloy_rlp::{Decodable, Encodable};
use bytes::BufMut;
Expand Down Expand Up @@ -65,6 +65,11 @@ impl<H: alloy_consensus::BlockHeader> SealedHeader<H> {
pub fn num_hash(&self) -> BlockNumHash {
BlockNumHash::new(self.number(), self.hash)
}

/// Return a [`BlockWithParent`] for this header.
pub fn block_with_parent(&self) -> BlockWithParent {
BlockWithParent { parent: self.parent_hash(), block: self.num_hash() }
mattsse marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<H: InMemorySize> InMemorySize for SealedHeader<H> {
Expand Down
2 changes: 2 additions & 0 deletions crates/stages/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ reth-errors.workspace = true
reth-stages-types.workspace = true
reth-static-file-types.workspace = true

# alloy
alloy-primitives.workspace = true
alloy-eips.workspace = true

# metrics
reth-metrics.workspace = true
Expand Down
22 changes: 11 additions & 11 deletions crates/stages/api/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::PipelineEvent;
use alloy_eips::eip1898::BlockWithParent;
use reth_consensus::ConsensusError;
use reth_errors::{BlockExecutionError, DatabaseError, RethError};
use reth_network_p2p::error::DownloadError;
use reth_primitives_traits::SealedHeader;
use reth_provider::ProviderError;
use reth_prune::{PruneSegment, PruneSegmentError, PrunerError};
use reth_static_file_types::StaticFileSegment;
Expand Down Expand Up @@ -34,10 +34,10 @@ impl BlockErrorKind {
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered an error related to a block.
#[error("stage encountered an error in block #{number}: {error}", number = block.number)]
#[error("stage encountered an error in block #{number}: {error}", number = block.block.number)]
Block {
/// The block that caused the error.
block: Box<SealedHeader>,
block: Box<BlockWithParent>,
/// The specific error type, either consensus or execution error.
#[source]
error: BlockErrorKind,
Expand All @@ -48,16 +48,16 @@ pub enum StageError {
"stage encountered inconsistent chain: \
downloaded header #{header_number} ({header_hash}) is detached from \
local head #{head_number} ({head_hash}): {error}",
header_number = header.number,
header_hash = header.hash(),
head_number = local_head.number,
head_hash = local_head.hash(),
header_number = header.block.number,
header_hash = header.block.hash,
head_number = local_head.block.number,
head_hash = local_head.block.hash,
)]
DetachedHead {
/// The local head we attempted to attach to.
local_head: Box<SealedHeader>,
local_head: Box<BlockWithParent>,
/// The header we attempted to attach.
header: Box<SealedHeader>,
header: Box<BlockWithParent>,
/// The error that occurred when attempting to attach the header.
#[source]
error: Box<ConsensusError>,
Expand Down Expand Up @@ -92,10 +92,10 @@ pub enum StageError {
#[error("invalid download response: {0}")]
Download(#[from] DownloadError),
/// Database is ahead of static file data.
#[error("missing static file data for block number: {number}", number = block.number)]
#[error("missing static file data for block number: {number}", number = block.block.number)]
MissingStaticFileData {
/// Starting block with missing data.
block: Box<SealedHeader>,
block: Box<BlockWithParent>,
/// Static File segment
segment: StaticFileSegment,
},
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/api/src/pipeline/ctrl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::BlockNumber;
use reth_primitives_traits::SealedHeader;

/// Determines the control flow during pipeline execution.
///
Expand All @@ -11,7 +11,7 @@ pub enum ControlFlow {
/// The block to unwind to.
target: BlockNumber,
/// The block that caused the unwind.
bad_block: Box<SealedHeader>,
bad_block: Box<BlockWithParent>,
},
/// The pipeline made progress.
Continue {
Expand Down
Loading
Loading