Skip to content

Commit

Permalink
refactor: stage error fatal/recoverable variants
Browse files Browse the repository at this point in the history
  • Loading branch information
onbjerg committed Dec 15, 2022
1 parent 4df40f3 commit 5a8ad6c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 23 deletions.
16 changes: 13 additions & 3 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,25 @@ pub enum StageError {
/// rely on external downloaders
#[error("Invalid download response: {0}")]
Download(String),
/// The stage encountered an internal error.
/// The stage encountered a recoverable error.
///
/// These types of errors are caught by the [Pipeline] and trigger a restart of the stage.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Recoverable(Box<dyn std::error::Error + Send + Sync>),
/// The stage encountered a fatal error.
///
/// These types of errors stop the pipeline.
#[error(transparent)]
Fatal(Box<dyn std::error::Error + Send + Sync>),
}

impl StageError {
/// If the error is fatal the pipeline will stop.
pub fn is_fatal(&self) -> bool {
matches!(self, StageError::Database(_) | StageError::DatabaseIntegrity(_))
matches!(
self,
StageError::Database(_) | StageError::DatabaseIntegrity(_) | StageError::Fatal(_)
)
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/stages/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl<DB: Database> Pipeline<DB> {
}
Err(err) => {
self.events_sender.send(PipelineEvent::Error { stage_id }).await?;
return Err(PipelineError::Stage(StageError::Internal(err)))
return Err(PipelineError::Stage(StageError::Fatal(err)))
}
}
}
Expand Down Expand Up @@ -739,7 +739,7 @@ mod tests {
let result = Pipeline::<Env<WriteMap>>::new()
.push(
TestStage::new(StageId("NonFatal"))
.add_exec(Err(StageError::Internal(Box::new(std::fmt::Error))))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true, reached_tip: true })),
false,
)
Expand Down
25 changes: 8 additions & 17 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use reth_primitives::{
BlockNumber, SealedHeader,
};
use std::{fmt::Debug, sync::Arc};
use tracing::{error, warn};
use tracing::warn;

const BODIES: StageId = StageId("Bodies");

Expand Down Expand Up @@ -111,23 +111,14 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = previous_block;
while let Some(result) = bodies_stream.next().await {
let block = match result {
Ok(block) => block,
Err(err) => {
error!(
"Encountered error downloading block {}. Details: {:?}",
highest_block + 1,
err
);
// Exit the stage early
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
reached_tip: false,
})
}
let Ok(block) = result else {
warn!(
"Encountered an error downloading block {}: {:?}",
highest_block + 1,
result.unwrap_err()
);
break
};

let block_number = block.number;
// Write block
let key = (block_number, block.hash()).into();
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/senders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum SendersStageError {

impl From<SendersStageError> for StageError {
fn from(error: SendersStageError) -> Self {
StageError::Internal(Box::new(error))
StageError::Fatal(Box::new(error))
}
}

Expand Down

0 comments on commit 5a8ad6c

Please sign in to comment.