Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: remove send + sync from db tx #4215

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,25 +160,28 @@ where
pub fn run_as_fut(mut self, tip: Option<H256>) -> PipelineFut<DB> {
// TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for
// updating metrics.
// TODO(onbjerg): Do we need this if we make all of this stuff sync, or should it just be a
// thread? If we still need it, why? Do we need more granular control over how the pipeline
// is run?
let _ = self.register_metrics(); // ignore error
Box::pin(async move {
// NOTE: the tip should only be None if we are in continuous sync mode.
if let Some(tip) = tip {
self.set_tip(tip);
}
let result = self.run_loop().await;
let result = self.run_loop();
trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished");
(self, result)
})
}

/// Run the pipeline in an infinite loop. Will terminate early if the user has specified
/// a `max_block` in the pipeline.
pub async fn run(&mut self) -> Result<(), PipelineError> {
pub fn run(&mut self) -> Result<(), PipelineError> {
let _ = self.register_metrics(); // ignore error

loop {
let next_action = self.run_loop().await?;
let next_action = self.run_loop()?;

// Terminate the loop early if it's reached the maximum user
// configured block.
Expand Down Expand Up @@ -211,17 +214,15 @@ where
/// This will be [ControlFlow::Continue] or [ControlFlow::NoProgress] of the _last_ stage in the
/// pipeline (for example the `Finish` stage). Or [ControlFlow::Unwind] of the stage that caused
/// the unwind.
pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
pub fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
let mut previous_stage = None;
for stage_index in 0..self.stages.len() {
let stage = &self.stages[stage_index];
let stage_id = stage.id();

trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
let next = self
.execute_stage_to_completion(previous_stage, stage_index)
.instrument(info_span!("execute", stage = %stage_id))
.await?;
let next = self.execute_stage_to_completion(previous_stage, stage_index)?;
//.instrument(info_span!("execute", stage = %stage_id))?;

trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");

Expand All @@ -233,7 +234,7 @@ where
}
ControlFlow::Continue { block_number } => self.progress.update(block_number),
ControlFlow::Unwind { target, bad_block } => {
self.unwind(target, Some(bad_block.number)).await?;
self.unwind(target, Some(bad_block.number))?;
return Ok(ControlFlow::Unwind { target, bad_block })
}
}
Expand All @@ -255,7 +256,7 @@ where
/// Unwind the stages to the target block.
///
/// If the unwind is due to a bad block the number of that block should be specified.
pub async fn unwind(
pub fn unwind(
&mut self,
to: BlockNumber,
bad_block: Option<BlockNumber>,
Expand Down Expand Up @@ -283,7 +284,7 @@ where
let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });

let output = stage.unwind(&provider_rw, input).await;
let output = stage.unwind(&provider_rw, input);
match output {
Ok(unwind_output) => {
checkpoint = unwind_output.checkpoint;
Expand Down Expand Up @@ -323,7 +324,7 @@ where
Ok(())
}

async fn execute_stage_to_completion(
fn execute_stage_to_completion(
&mut self,
previous_stage: Option<BlockNumber>,
stage_index: usize,
Expand Down Expand Up @@ -367,10 +368,7 @@ where
checkpoint: prev_checkpoint,
});

match stage
.execute(&provider_rw, ExecInput { target, checkpoint: prev_checkpoint })
.await
{
match stage.execute(&provider_rw, ExecInput { target, checkpoint: prev_checkpoint }) {
Ok(out @ ExecOutput { checkpoint, done }) => {
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/src/sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ where
StageSetBuilder::default()
.add_stage(headers)
.add_stage(TotalDifficultyStage::new(consensus.clone()))
.add_stage(BodyStage { downloader: body_downloader, consensus })
.add_stage(BodyStage::new(body_downloader, consensus))
}

/// Create a new builder using the given bodies stage.
Expand All @@ -195,7 +195,7 @@ where
StageSetBuilder::default()
.add_stage(HeaderStage::new(self.header_downloader, self.header_mode))
.add_stage(TotalDifficultyStage::new(self.consensus.clone()))
.add_stage(BodyStage { downloader: self.body_downloader, consensus: self.consensus })
.add_stage(BodyStage::new(self.body_downloader, self.consensus))
}
}

Expand Down
34 changes: 31 additions & 3 deletions crates/stages/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError, Transactions
use std::{
cmp::{max, min},
ops::{Range, RangeInclusive},
task::{Context, Poll},
};

/// Stage execution input, see [Stage::execute].
Expand Down Expand Up @@ -189,24 +190,51 @@ pub struct UnwindOutput {
/// Stages are executed as part of a pipeline where they are executed serially.
///
/// Stages receive [`DatabaseProviderRW`].
#[async_trait]
pub trait Stage<DB: Database>: Send + Sync {
/// Get the ID of the stage.
///
/// Stage IDs must be unique.
fn id(&self) -> StageId;

/// Execute the stage.
async fn execute(
fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError>;

/// Unwind the stage.
async fn unwind(
fn unwind(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError>;

/// Returns `Poll::Ready(Ok(()))` when the stage is ready to execute the given range.
///
/// This method is heavily inspired by [tower]'s `Service` trait. Any asynchronous tasks or
/// communication should be handled in `poll_ready`, e.g. moving downloaded items from
/// downloaders to an internal buffer in the stage.
///
/// If the stage has any pending external state, then `Poll::Pending` is returned.
///
/// If `Poll::Ready(Err(_))` is returned, the stage may not be able to execute anymore
/// depending on the specific error. In that case, an unwind must be issued instead.
///
/// Once `Poll::Ready(Ok(()))` is returned, the stage may be executed once using `execute`.
/// Until the stage has been executed, repeated calls to `poll_ready` must return either
/// `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`.
///
/// Note that `poll_ready` may reserve shared resources that are consumed in a subsequent call
/// of `execute`, e.g. internal buffers. It is crucial for implementations to not assume that
/// `execute` will always be invoked and to ensure that those resources are appropriately
/// released if the stage is dropped before `execute` is called.
///
/// For the same reason, it is also important that any shared resources do not exhibit
/// unbounded growth on repeated calls to `poll_ready`.
///
/// Unwinds may happen without consulting `poll_ready` first.
/// Default implementation: the stage performs no asynchronous work and is therefore
/// always ready to execute.
///
/// The context parameter is unused by this default body (prefixed with `_` to avoid an
/// unused-variable warning); implementations that poll external state (downloaders,
/// channels, …) override this method and use the context to register wakeups.
fn poll_ready(&mut self, _cx: &mut Context<'_>, _: ExecInput) -> Poll<Result<(), StageError>> {
    Poll::Ready(Ok(()))
}
}
56 changes: 42 additions & 14 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_interfaces::{
};
use reth_primitives::stage::{EntitiesCheckpoint, StageCheckpoint, StageId};
use reth_provider::DatabaseProviderRW;
use std::sync::Arc;
use std::{sync::Arc, task::Poll};
use tracing::*;

// TODO(onbjerg): Metrics and events (gradual status for e.g. CLI)
Expand Down Expand Up @@ -50,22 +50,51 @@ use tracing::*;
/// - The [`Transactions`][reth_db::tables::Transactions] table
#[derive(Debug)]
pub struct BodyStage<D: BodyDownloader> {
buffer: Vec<BlockResponse>,
/// The body downloader.
pub downloader: D,
downloader: D,
/// The consensus engine.
pub consensus: Arc<dyn Consensus>,
consensus: Arc<dyn Consensus>,
}

impl<D: BodyDownloader> BodyStage<D> {
    /// Create a new body stage from a body downloader and a consensus engine.
    ///
    /// The internal body buffer starts out empty; it is filled by `poll_ready`
    /// and drained by `execute`.
    pub fn new(downloader: D, consensus: Arc<dyn Consensus>) -> Self {
        Self { buffer: Vec::new(), downloader, consensus }
    }
}

#[async_trait::async_trait]
impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::Bodies
}

/// Poll the body downloader for the next batch of block bodies.
///
/// Returns `Poll::Ready(Ok(()))` once at least one batch is buffered for `execute`,
/// `Poll::Ready(Err(_))` on a download error or a closed response channel, and
/// `Poll::Pending` while the downloader is still working.
fn poll_ready(
    &mut self,
    cx: &mut std::task::Context<'_>,
    // Unused: readiness depends only on the downloader, not on the exec input.
    _input: ExecInput,
) -> Poll<Result<(), StageError>> {
    // Bodies buffered by an earlier `poll_ready` that `execute` has not yet consumed
    // make the stage immediately ready; don't poll the downloader again.
    if !self.buffer.is_empty() {
        return Poll::Ready(Ok(()))
    }

    // NOTE(review): this buffers one downloader batch per readiness cycle —
    // presumably intentional, but confirm it matches the downloader's batching.
    match self.downloader.try_poll_next_unpin(cx) {
        // A batch of block bodies arrived: stash it for the next `execute` call.
        Poll::Ready(Some(Ok(downloaded))) => {
            self.buffer.extend(downloaded);
            Poll::Ready(Ok(()))
        }
        Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err.into())),
        // The downloader's response channel was closed. This is fatal: otherwise the
        // pipeline would wait forever for bodies that can never arrive.
        Poll::Ready(None) => Poll::Ready(Err(StageError::ChannelClosed)),
        Poll::Pending => Poll::Pending,
    }
}

/// Download block bodies from the last checkpoint for this stage up until the latest synced
/// header, limited by the stage's batch size.
async fn execute(
fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
Expand Down Expand Up @@ -94,13 +123,12 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {

// Task downloader can return `None` only if the response relaying channel was closed. This
// is a fatal error to prevent the pipeline from running forever.
let downloaded_bodies =
self.downloader.try_next().await?.ok_or(StageError::ChannelClosed)?;

trace!(target: "sync::stages::bodies", bodies_len = downloaded_bodies.len(), "Writing blocks");
// todo: should be in a dedicated poll rdy fn
trace!(target: "sync::stages::bodies", bodies_len = self.buffer.len(), "Writing blocks");

let mut highest_block = from_block;
for response in downloaded_bodies {
for response in self.buffer.drain(..) {
// Write block
let block_number = response.block_number();

Expand Down Expand Up @@ -161,7 +189,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}

/// Unwind the stage.
async fn unwind(
fn unwind(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
Expand Down Expand Up @@ -539,14 +567,14 @@ mod tests {
}

fn stage(&self) -> Self::S {
BodyStage {
downloader: TestBodyDownloader::new(
BodyStage::new(
TestBodyDownloader::new(
self.tx.inner_raw(),
self.responses.clone(),
self.batch_size,
),
consensus: self.consensus.clone(),
}
self.consensus.clone(),
)
}
}

Expand Down
12 changes: 7 additions & 5 deletions crates/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,14 @@ fn calculate_gas_used_from_headers<DB: Database>(
/// Currently 64 megabytes.
const BIG_STACK_SIZE: usize = 64 * 1024 * 1024;

#[async_trait::async_trait]
impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
/// Return the id of the stage
fn id(&self) -> StageId {
StageId::Execution
}

/// Execute the stage
async fn execute(
fn execute(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
Expand All @@ -327,7 +326,9 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
// to optimize revm or move data to the heap.
//
// See https://github.com/bluealloy/revm/issues/305
std::thread::scope(|scope| {

// todo: figure out what to do here
/*std::thread::scope(|scope| {
let handle = std::thread::Builder::new()
.stack_size(BIG_STACK_SIZE)
.spawn_scoped(scope, || {
Expand All @@ -336,11 +337,12 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
})
.expect("Expects that thread name is not null");
handle.join().expect("Expects for thread to not panic")
})
})*/
self.execute_inner(provider, input)
}

/// Unwind the stage.
async fn unwind(
fn unwind(
&mut self,
provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
Expand Down
6 changes: 3 additions & 3 deletions crates/stages/src/stages/finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ use reth_provider::DatabaseProviderRW;
#[derive(Default, Debug, Clone)]
pub struct FinishStage;

#[async_trait::async_trait]
// todo: re-eval if we need this stage
impl<DB: Database> Stage<DB> for FinishStage {
/// Return the unique ID of this stage (`StageId::Finish`).
fn id(&self) -> StageId {
    StageId::Finish
}

async fn execute(
fn execute(
&mut self,
_provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
}

async fn unwind(
fn unwind(
&mut self,
_provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
Expand Down
Loading