resolve missed conflicts
rkrasiuk committed Nov 17, 2023
1 parent 7e0351f commit 0a456c8
Showing 4 changed files with 4 additions and 97 deletions.
bin/reth/src/debug_cmd/execution.rs (2 changes: 1 addition & 1 deletion)
@@ -31,7 +31,7 @@ use reth_provider::{BlockExecutionWriter, HeaderSyncMode, ProviderFactory, Stage
use reth_stages::{
sets::DefaultStages,
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
Pipeline, PipelineError, StageSet,
Pipeline, StageSet,
};
use reth_tasks::TaskExecutor;
use std::{
bin/reth/src/stage/run.rs (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,7 @@ use reth_stages::{
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
},
ExecInput, PipelineError, Stage, UnwindInput,
ExecInput, Stage, UnwindInput,
};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc};
use tracing::*;
crates/stages/src/pipeline/mod.rs (86 changes: 1 addition & 85 deletions)
@@ -444,93 +444,9 @@ where
}
Err(err) => {
self.listeners.notify(PipelineEvent::Error { stage_id });
<<<<<<< HEAD

let out = if let StageError::DetachedHead { local_head, header, error } = err {
warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, ?error, "Stage encountered detached head");

// We unwind because of a detached head.
let unwind_to = local_head
.number
.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH)
.max(1);
Ok(ControlFlow::Unwind { target: unwind_to, bad_block: local_head })
} else if let StageError::Block { block, error } = err {
match error {
BlockErrorKind::Validation(validation_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered a validation error: {validation_error}"
);

// FIXME: When handling errors, we do not commit the database
// transaction. This leads to the Merkle
// stage not clearing its checkpoint, and
// restarting from an invalid place.
drop(provider_rw);
let provider_rw = factory.provider_rw()?;
provider_rw.save_stage_checkpoint_progress(
StageId::MerkleExecute,
vec![],
)?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
provider_rw.commit()?;

// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
}
BlockErrorKind::Execution(execution_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered an execution error: {execution_error}"
);

// We unwind because of an execution error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart
// the execution loop from the beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
}
}
} else if err.is_fatal() {
error!(
target: "sync::pipeline",
stage = %stage_id,
"Stage encountered a fatal error: {err}."
);
Err(err.into())
} else {
// On other errors we assume they are recoverable if we discard the
// transaction and run the stage again.
warn!(
target: "sync::pipeline",
stage = %stage_id,
"Stage encountered a non-fatal error: {err}. Retrying..."
);
continue
};
return out
=======
if let Some(ctrl) = on_stage_error(&factory, stage_id, prev_checkpoint, err)? {
return Ok(ctrl)
}
>>>>>>> 55f1ec5e0 (fix poll error handling & docs)
}
}
}
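
The deleted HEAD block above spelled out the per-error recovery policy inline; the resolved code hands it to the `on_stage_error` helper invoked just before the closing conflict marker. As a rough, self-contained sketch of that policy (not part of the diff; the types, the constant, and the checkpoint argument below are simplified stand-ins for the reth definitions, and the Merkle checkpoint reset performed on validation errors is omitted):

// Simplified model of the stage-error policy: detached heads unwind a fixed
// reorg depth below the local head, block errors unwind to the previous
// checkpoint, fatal errors bubble up, and anything else is retried.
const REORG_UNWIND_DEPTH: u64 = 3; // stand-in for BEACON_CONSENSUS_REORG_UNWIND_DEPTH

#[derive(Debug)]
enum StageError {
    DetachedHead { local_head: u64 },
    Block { block: u64 },
    Fatal(String),
    Recoverable(String),
}

#[derive(Debug)]
enum ControlFlow {
    Unwind { target: u64, bad_block: u64 },
    Retry,
}

fn on_stage_error(prev_checkpoint: u64, err: StageError) -> Result<ControlFlow, String> {
    match err {
        // Unwind a bounded distance below the detached local head (never past genesis).
        StageError::DetachedHead { local_head } => Ok(ControlFlow::Unwind {
            target: local_head.saturating_sub(REORG_UNWIND_DEPTH).max(1),
            bad_block: local_head,
        }),
        // Validation and execution failures both unwind to the previous checkpoint.
        StageError::Block { block } => Ok(ControlFlow::Unwind {
            target: prev_checkpoint,
            bad_block: block,
        }),
        // Fatal errors stop the pipeline.
        StageError::Fatal(msg) => Err(msg),
        // Other errors are assumed recoverable: discard the transaction and rerun the stage.
        StageError::Recoverable(_) => Ok(ControlFlow::Retry),
    }
}

fn main() {
    let ctrl = on_stage_error(100, StageError::DetachedHead { local_head: 120 });
    println!("{ctrl:?}"); // Ok(Unwind { target: 117, bad_block: 120 })
}
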
@@ -563,7 +479,7 @@ fn on_stage_error<DB: Database>(
// FIXME: When handling errors, we do not commit the database transaction. This
// leads to the Merkle stage not clearing its checkpoint, and restarting from an
// invalid place.
let provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;
let provider_rw = factory.provider_rw()?;
provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
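
The second hunk of this file drops the explicit `.map_err(PipelineError::Interface)` in favor of a bare `?`. A minimal sketch of why that compiles, assuming the pipeline error type provides a `From` impl for the provider error (the type names below are stand-ins, not the actual reth definitions):

// `?` converts the error through `From`, so the manual map_err becomes redundant.
#[derive(Debug)]
struct ProviderError(String);

#[derive(Debug)]
enum PipelineError {
    Provider(ProviderError),
}

impl From<ProviderError> for PipelineError {
    fn from(err: ProviderError) -> Self {
        PipelineError::Provider(err)
    }
}

fn provider_rw() -> Result<(), ProviderError> {
    Err(ProviderError("db busy".into()))
}

fn run() -> Result<(), PipelineError> {
    provider_rw()?; // the `From` impl lets `?` replace the old explicit map_err
    Ok(())
}

fn main() {
    println!("{:?}", run()); // Err(Provider(ProviderError("db busy")))
}
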
crates/storage/provider/src/providers/database/provider.rs (11 changes: 1 addition & 10 deletions)
@@ -24,16 +24,11 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
};
<<<<<<< HEAD
use reth_interfaces::provider::{ProviderResult, RootMismatch};
=======
use reth_interfaces::{
executor::{BlockExecutionError, BlockValidationError},
p2p::headers::downloader::SyncTarget,
provider::RootMismatch,
provider::{ProviderResult, RootMismatch},
RethError, RethResult,
};
>>>>>>> 661876f8b (implement proper poll ready methods for headers and bodies, fix tests, add header sync gap provider)
use reth_primitives::{
keccak256,
revm::{
@@ -1115,11 +1110,7 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
<<<<<<< HEAD
) -> ProviderResult<Option<BlockWithSenders>> {
=======
) -> RethResult<Option<BlockWithSenders>> {
>>>>>>> 661876f8b (implement proper poll ready methods for headers and bodies, fix tests, add header sync gap provider)
let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };

let Some(header) = self.header_by_number(block_number)? else { return Ok(None) };
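
The two sides of this conflict differ only in the result alias (`ProviderResult` vs `RethResult`); either way the body uses `let ... else` to return `Ok(None)` early whenever a lookup misses. A small self-contained sketch of that pattern, with stand-in functions rather than the real provider methods:

// Early-return pattern: propagate real errors with `?`, short-circuit to Ok(None)
// when an optional lookup comes back empty.
fn convert_hash_or_number(id: u64) -> Result<Option<u64>, String> {
    if id == 0 { Ok(None) } else { Ok(Some(id)) } // stand-in lookup
}

fn block_with_senders(id: u64) -> Result<Option<String>, String> {
    let Some(number) = convert_hash_or_number(id)? else { return Ok(None) };
    Ok(Some(format!("block #{number}")))
}

fn main() {
    println!("{:?}", block_with_senders(0)); // Ok(None)
    println!("{:?}", block_with_senders(7)); // Ok(Some("block #7"))
}
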