feat: don't stop the pipeline on internal stage errs #453

Merged · 7 commits · Dec 15, 2022
22 changes: 20 additions & 2 deletions crates/stages/src/error.rs
@@ -21,6 +21,7 @@ pub enum StageError {
Database(#[from] DbError),
#[error("Stage encountered a execution error in block {block}: {error}.")]
/// The stage encountered a execution error
// TODO: Probably redundant, should be rolled into `Validation`
Review comment (Member): agreed

ExecutionError {
/// The block that failed execution.
block: BlockNumber,
@@ -35,9 +36,26 @@ pub enum StageError {
/// rely on external downloaders
#[error("Invalid download response: {0}")]
Download(String),
/// The stage encountered an internal error.
/// The stage encountered a recoverable error.
///
/// These types of errors are caught by the [Pipeline] and trigger a restart of the stage.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Recoverable(Box<dyn std::error::Error + Send + Sync>),
/// The stage encountered a fatal error.
///
/// These types of errors stop the pipeline.
#[error(transparent)]
Fatal(Box<dyn std::error::Error + Send + Sync>),
}

impl StageError {
/// If the error is fatal the pipeline will stop.
pub fn is_fatal(&self) -> bool {
matches!(
self,
StageError::Database(_) | StageError::DatabaseIntegrity(_) | StageError::Fatal(_)
)
}
}

/// A database integrity error.
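
As a rough, self-contained sketch of how the new split is meant to be used (a simplified `StageError` and a hypothetical `fetch_headers` helper, not the actual reth types), a stage would wrap transient failures in `Recoverable` and reserve `Fatal` for errors that should stop the pipeline:

```rust
use std::error::Error;

// Simplified stand-in for the real `StageError` in crates/stages/src/error.rs.
#[allow(dead_code)]
#[derive(Debug)]
enum StageError {
    /// Caught by the pipeline and triggers a re-run of the stage.
    Recoverable(Box<dyn Error + Send + Sync>),
    /// Stops the pipeline.
    Fatal(Box<dyn Error + Send + Sync>),
}

impl StageError {
    /// Mirrors the helper added in this PR.
    fn is_fatal(&self) -> bool {
        matches!(self, StageError::Fatal(_))
    }
}

/// Hypothetical downloader call that fails with a transient I/O error.
fn fetch_headers() -> Result<Vec<u64>, std::io::Error> {
    Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "peer timed out"))
}

fn execute_stage() -> Result<(), StageError> {
    // A timeout is transient, so surface it as `Recoverable`; the pipeline
    // discards the transaction and runs the stage again.
    let _headers = fetch_headers().map_err(|e| StageError::Recoverable(Box::new(e)))?;
    Ok(())
}

fn main() {
    match execute_stage() {
        Err(err) if err.is_fatal() => println!("stopping pipeline: {err:?}"),
        Err(err) => println!("retrying stage after: {err:?}"),
        Ok(()) => println!("stage done"),
    }
}
```
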
55 changes: 49 additions & 6 deletions crates/stages/src/pipeline.rs
@@ -255,7 +255,7 @@ impl<DB: Database> Pipeline<DB> {
}
Err(err) => {
self.events_sender.send(PipelineEvent::Error { stage_id }).await?;
return Err(PipelineError::Stage(StageError::Internal(err)))
return Err(PipelineError::Stage(StageError::Fatal(err)))
}
}
}
Expand Down Expand Up @@ -287,7 +287,7 @@ impl<DB: Database> QueuedStage<DB> {
) -> Result<ControlFlow, PipelineError> {
let stage_id = self.stage.id();
if self.require_tip && !state.reached_tip() {
info!("Tip not reached, skipping.");
warn!(stage = %stage_id, "Tip not reached as required by stage, skipping.");
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;

// Stage requires us to reach the tip of the chain first, but we have
Expand All @@ -304,7 +304,7 @@ impl<DB: Database> QueuedStage<DB> {
.zip(state.max_block)
.map_or(false, |(prev_progress, target)| prev_progress >= target);
if stage_reached_max_block {
info!("Stage reached maximum block, skipping.");
warn!(stage = %stage_id, "Stage reached maximum block, skipping.");
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;

// We reached the maximum block, so we skip the stage
Expand All @@ -323,7 +323,7 @@ impl<DB: Database> QueuedStage<DB> {
.await
{
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
info!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
stage_id.save_progress(db.deref(), stage_progress)?;

state
Expand All @@ -345,7 +345,7 @@ impl<DB: Database> QueuedStage<DB> {
state.events_sender.send(PipelineEvent::Error { stage_id }).await?;

return if let StageError::Validation { block, error } = err {
debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}");
warn!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}");

// We unwind because of a validation error. If the unwind itself fails,
// we bail entirely, otherwise we restart the execution loop from the
@@ -354,8 +354,14 @@ impl<DB: Database> QueuedStage<DB> {
target: prev_progress.unwrap_or_default(),
bad_block: Some(block),
})
} else {
} else if err.is_fatal() {
error!(stage = %stage_id, "Stage encountered a fatal error: {err}.");
Err(err.into())
} else {
// On other errors we assume they are recoverable if we discard the
// transaction and run the stage again.
warn!(stage = %stage_id, "Stage encountered a non-fatal error: {err}. Retrying");
continue
}
}
}
@@ -367,6 +373,7 @@ impl<DB: Database> QueuedStage<DB> {
mod tests {
use super::*;
use crate::{StageId, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, Env, EnvKind, WriteMap};
use reth_interfaces::consensus;
use tokio::sync::mpsc::channel;
@@ -724,6 +731,42 @@ mod tests {
);
}

/// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
#[tokio::test]
async fn pipeline_error_handling() {
// Non-fatal
let db = test_utils::create_test_db(EnvKind::RW);
let result = Pipeline::<Env<WriteMap>>::new()
.push(
TestStage::new(StageId("NonFatal"))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true, reached_tip: true })),
false,
)
.set_max_block(Some(10))
.run(db)
.await;
assert_matches!(result, Ok(()));

// Fatal
let db = test_utils::create_test_db(EnvKind::RW);
let result = Pipeline::<Env<WriteMap>>::new()
.push(
TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity(
DatabaseIntegrityError::BlockBody { number: 5 },
))),
false,
)
.run(db)
.await;
assert_matches!(
result,
Err(PipelineError::Stage(StageError::DatabaseIntegrity(
DatabaseIntegrityError::BlockBody { number: 5 }
)))
);
}

mod utils {
use super::*;
use async_trait::async_trait;
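
To make the new control flow easier to see in isolation, here is a minimal, self-contained sketch of the retry behaviour (assumed structure with stand-in types, not the actual `Pipeline`/`QueuedStage` code): non-fatal errors re-run the stage, fatal ones abort.

```rust
// Stand-in error type; the real one also has Validation, Database, etc.
#[allow(dead_code)]
#[derive(Debug)]
enum StageError {
    Recoverable(String),
    Fatal(String),
}

impl StageError {
    fn is_fatal(&self) -> bool {
        matches!(self, StageError::Fatal(_))
    }
}

/// Runs a stage closure until it reports completion.
/// `Ok(true)` means the stage is done, `Ok(false)` means it made progress
/// and should be executed again.
fn run_stage_until_done<F>(mut exec: F) -> Result<(), StageError>
where
    F: FnMut() -> Result<bool, StageError>,
{
    loop {
        match exec() {
            Ok(true) => return Ok(()),
            Ok(false) => continue,
            // Fatal errors stop the pipeline.
            Err(err) if err.is_fatal() => return Err(err),
            // Anything else is assumed recoverable: discard the transaction
            // and run the stage again.
            Err(_) => continue,
        }
    }
}

fn main() {
    let mut attempts = 0;
    let result = run_stage_until_done(|| {
        attempts += 1;
        if attempts < 3 {
            Err(StageError::Recoverable("transient network hiccup".into()))
        } else {
            Ok(true)
        }
    });
    assert!(result.is_ok());
    println!("stage finished after {attempts} attempts");
}
```
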
29 changes: 12 additions & 17 deletions crates/stages/src/stages/bodies.rs
@@ -16,7 +16,7 @@ use reth_primitives::{
BlockNumber, SealedHeader,
};
use std::{fmt::Debug, sync::Arc};
use tracing::{error, warn};
use tracing::warn;

const BODIES: StageId = StageId("Bodies");

@@ -111,23 +111,18 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = previous_block;
while let Some(result) = bodies_stream.next().await {
let block = match result {
Ok(block) => block,
Err(err) => {
error!(
"Encountered error downloading block {}. Details: {:?}",
highest_block + 1,
err
);
// Exit the stage early
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
reached_tip: false,
})
}
let Ok(block) = result else {
warn!(
"Encountered an error downloading block {}: {:?}",
highest_block + 1,
result.unwrap_err()
);
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
reached_tip: false,
})
};

let block_number = block.number;
// Write block
let key = (block_number, block.hash()).into();
15 changes: 5 additions & 10 deletions crates/stages/src/stages/headers.rs
@@ -113,15 +113,15 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
Err(e) => match e {
DownloadError::Timeout => {
warn!("No response for header request");
return Ok(ExecOutput { stage_progress, reached_tip: false, done: false })
return Err(StageError::Recoverable(DownloadError::Timeout.into()))
}
DownloadError::HeaderValidation { hash, error } => {
error!("Validation error for header {hash}: {error}");
return Err(StageError::Validation { block: stage_progress, error })
}
error => {
error!(?error, "An unexpected error occurred");
return Ok(ExecOutput { stage_progress, reached_tip: false, done: false })
return Err(StageError::Recoverable(error.into()))
}
},
}
@@ -279,10 +279,8 @@ mod tests {
let rx = runner.execute(input);
runner.consensus.update_tip(H256::from_low_u64_be(1));
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: false, reached_tip: false, stage_progress: 100 })
);
// TODO: Downcast the internal error and actually check it
assert_matches!(result, Err(StageError::Recoverable(_)));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}

@@ -324,10 +322,7 @@

// These errors are not fatal but hand back control to the pipeline
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { stage_progress: 1000, done: false, reached_tip: false })
);
assert_matches!(result, Err(StageError::Recoverable(_)));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}

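
The TODO in the first test above asks to downcast the boxed error and assert on it. One possible shape for that assertion, using a stand-in `DownloadError` (the real type lives elsewhere in reth and may differ), is:

```rust
use std::error::Error;
use std::fmt;

// Stand-in for the downloader error that ends up boxed inside
// `StageError::Recoverable`.
#[derive(Debug)]
enum DownloadError {
    Timeout,
}

impl fmt::Display for DownloadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "download error: {self:?}")
    }
}

impl Error for DownloadError {}

fn main() {
    // What a test would pull out of `StageError::Recoverable(boxed)`.
    let boxed: Box<dyn Error + Send + Sync> = Box::new(DownloadError::Timeout);

    // `downcast_ref` recovers the concrete type behind the trait object, so
    // the test can assert on the exact variant instead of just
    // `Recoverable(_)`.
    assert!(matches!(
        boxed.downcast_ref::<DownloadError>(),
        Some(DownloadError::Timeout)
    ));
}
```
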
3 changes: 2 additions & 1 deletion crates/stages/src/stages/senders.rs
@@ -28,6 +28,7 @@ pub struct SendersStage {
pub commit_threshold: u64,
}

// TODO(onbjerg): Should unwind
Review comment (Member): In which cases?
Reply (Collaborator, Author): All cases I think. If we can't recover the sender then the transaction is invalid (assuming our recover algo is not broken)

#[derive(Error, Debug)]
enum SendersStageError {
#[error("Sender recovery failed for transaction {tx}.")]
@@ -36,7 +37,7 @@ enum SendersStageError {

impl From<SendersStageError> for StageError {
fn from(error: SendersStageError) -> Self {
StageError::Internal(Box::new(error))
StageError::Fatal(Box::new(error))
}
}

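
The review thread above suggests that a failed sender recovery should unwind rather than stop the pipeline. A rough sketch of what that direction could look like (simplified stand-in types and a hypothetical mapping, not part of this PR) is to report it as a validation failure for the containing block:

```rust
// Simplified stand-ins for the reth types.
type BlockNumber = u64;
type TxNumber = u64;

#[allow(dead_code)]
#[derive(Debug)]
enum StageError {
    /// Makes the pipeline unwind to before the bad block.
    Validation { block: BlockNumber, error: String },
    /// Stops the pipeline.
    Fatal(String),
}

/// Hypothetical mapping: if the sender cannot be recovered, the transaction
/// (and therefore the block containing it) is invalid, so surface it as a
/// validation error instead of a fatal one.
fn sender_recovery_failed(block: BlockNumber, tx: TxNumber) -> StageError {
    StageError::Validation {
        block,
        error: format!("sender recovery failed for transaction {tx}"),
    }
}

fn main() {
    let err = sender_recovery_failed(100, 7);
    println!("{err:?}");
}
```
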