Skip to content

Commit

Permalink
chore: Box StageError variant in PipelineError
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Dec 9, 2024
1 parent 552c623 commit f13cc9f
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/invalid_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<H: Debug> InvalidHeaderCache<H> {
struct HeaderEntry<H> {
/// Keeps track how many times this header has been hit.
hit_count: u8,
/// The actually header entry
/// The actual header entry
header: Arc<H>,
}

Expand Down
6 changes: 3 additions & 3 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,7 @@ mod tests {
.await;
assert_matches!(
res.await,
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. ))
);
}

Expand Down Expand Up @@ -2089,7 +2089,7 @@ mod tests {
Ok(result) => {
assert_matches!(
result,
Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage( .. ))
);
break
}
Expand Down Expand Up @@ -2141,7 +2141,7 @@ mod tests {

assert_matches!(
rx.await,
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage( .. ))
);
}

Expand Down
2 changes: 2 additions & 0 deletions crates/stages/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ reth-errors.workspace = true
reth-stages-types.workspace = true
reth-static-file-types.workspace = true

# alloy
alloy-primitives.workspace = true
alloy-eips.workspace = true

# metrics
reth-metrics.workspace = true
Expand Down
21 changes: 11 additions & 10 deletions crates/stages/api/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::PipelineEvent;
use alloy_eips::eip1898::BlockWithParent;
use reth_consensus::ConsensusError;
use reth_errors::{BlockExecutionError, DatabaseError, RethError};
use reth_network_p2p::error::DownloadError;
Expand Down Expand Up @@ -34,10 +35,10 @@ impl BlockErrorKind {
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered an error related to a block.
#[error("stage encountered an error in block #{number}: {error}", number = block.number)]
#[error("stage encountered an error in block #{number}: {error}", number = block.block.number)]
Block {
/// The block that caused the error.
block: Box<SealedHeader>,
block: Box<BlockWithParent>,
/// The specific error type, either consensus or execution error.
#[source]
error: BlockErrorKind,
Expand All @@ -48,16 +49,16 @@ pub enum StageError {
"stage encountered inconsistent chain: \
downloaded header #{header_number} ({header_hash}) is detached from \
local head #{head_number} ({head_hash}): {error}",
header_number = header.number,
header_hash = header.hash(),
head_number = local_head.number,
head_hash = local_head.hash(),
header_number = header.block.number,
header_hash = header.block.hash,
head_number = local_head.block.number,
head_hash = local_head.block.hash,
)]
DetachedHead {
/// The local head we attempted to attach to.
local_head: Box<SealedHeader>,
local_head: BlockWithParent,
/// The header we attempted to attach.
header: Box<SealedHeader>,
header: BlockWithParent,
/// The error that occurred when attempting to attach the header.
#[source]
error: Box<ConsensusError>,
Expand Down Expand Up @@ -92,10 +93,10 @@ pub enum StageError {
#[error("invalid download response: {0}")]
Download(#[from] DownloadError),
/// Database is ahead of static file data.
#[error("missing static file data for block number: {number}", number = block.number)]
#[error("missing static file data for block number: {number}", number = block.block.number)]
MissingStaticFileData {
/// Starting block with missing data.
block: Box<SealedHeader>,
block: Box<BlockWithParent>,
/// Static File segment
segment: StaticFileSegment,
},
Expand Down
24 changes: 10 additions & 14 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
Err(err) => {
self.event_sender.notify(PipelineEvent::Error { stage_id });

return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
return Err(PipelineError::Stage(Box::new(StageError::Fatal(Box::new(err)))))
}
}
}
Expand Down Expand Up @@ -505,15 +505,15 @@ fn on_stage_error<N: ProviderNodeTypes>(

// We unwind because of a detached head.
let unwind_to =
local_head.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
} else if let StageError::Block { block, error } = err {
match error {
BlockErrorKind::Validation(validation_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
bad_block = %block.block.number,
"Stage encountered a validation error: {validation_error}"
);

Expand Down Expand Up @@ -542,7 +542,7 @@ fn on_stage_error<N: ProviderNodeTypes>(
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
bad_block = %block.block.number,
"Stage encountered an execution error: {execution_error}"
);

Expand All @@ -560,12 +560,12 @@ fn on_stage_error<N: ProviderNodeTypes>(
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
bad_block = %block.block.number,
segment = %segment,
"Stage is missing static file data."
);

Ok(Some(ControlFlow::Unwind { target: block.number - 1, bad_block: block }))
Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
} else if err.is_fatal() {
error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
Err(err.into())
Expand Down Expand Up @@ -598,12 +598,13 @@ mod tests {

use super::*;
use crate::{test_utils::TestStage, UnwindOutput};
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use assert_matches::assert_matches;
use reth_consensus::ConsensusError;
use reth_errors::ProviderError;
use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
use reth_prune::PruneModes;
use reth_testing_utils::{generators, generators::random_header};
use reth_testing_utils::generators::{self, random_block_with_parent, random_header};
use tokio_stream::StreamExt;

#[test]
Expand Down Expand Up @@ -975,7 +976,7 @@ mod tests {
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Err(StageError::Block {
block: Box::new(random_header(
block: Box::new(random_block_with_parent(
&mut generators::rng(),
5,
Default::default(),
Expand Down Expand Up @@ -1111,11 +1112,6 @@ mod tests {
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
);
let result = pipeline.run().await;
assert_matches!(
result,
Err(PipelineError::Stage(StageError::DatabaseIntegrity(
ProviderError::BlockBodyIndicesNotFound(5)
)))
);
assert_matches!(result, Err(PipelineError::Stage(..)));
}
}
15 changes: 14 additions & 1 deletion testing/testing-utils/src/generators.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Generators for different data structures like block headers, block bodies and ranges of those.
use alloy_consensus::{Header, Transaction as _, TxLegacy};
use alloy_eips::eip4895::{Withdrawal, Withdrawals};
use alloy_eips::{
eip1898::BlockWithParent,
eip4895::{Withdrawal, Withdrawals},
NumHash,
};
use alloy_primitives::{Address, BlockNumber, Bytes, TxKind, B256, U256};
pub use rand::Rng;
use rand::{
Expand Down Expand Up @@ -95,6 +99,15 @@ pub fn random_header_range<R: Rng>(
headers
}

/// Generate a random [`BlockWithParent`].
pub fn random_block_with_parent<R: Rng>(
rng: &mut R,
number: u64,
parent: Option<B256>,
) -> BlockWithParent {
BlockWithParent { parent: parent.unwrap_or_default(), block: NumHash::new(number, rng.gen()) }
}

/// Generate a random [`SealedHeader`].
///
/// The header is assumed to not be correct if validated.
Expand Down

0 comments on commit f13cc9f

Please sign in to comment.