implement proper poll ready methods for headers and bodies, fix tests, add header sync gap provider
rkrasiuk committed Nov 15, 2023
1 parent 5836352 commit 661876f
Showing 28 changed files with 498 additions and 475 deletions.
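
The recurring change across these files is that the dump/debug tools stop driving a stage with a manual `poll_fn(|cx| stage.poll_ready(cx, input))` before `execute`, and instead call the now-synchronous `execute` directly, with readiness handled elsewhere (e.g. by the pipeline). Below is a minimal, self-contained sketch of that split, assuming a trait shaped roughly like the one these call sites imply; the names `MiniStage`, `NoopStage`, `run_stage` and the simplified input/output types are illustrative stand-ins, not reth's actual `Stage` API.

```rust
use std::future::poll_fn;
use std::task::{Context, Poll};

// Hypothetical stand-ins for illustration; reth's real ExecInput/ExecOutput are richer.
pub struct ExecInput {
    pub target: Option<u64>,
}
pub struct ExecOutput {
    pub done: bool,
}

pub trait MiniStage {
    /// Async readiness hook: polled until the stage is ready to run (e.g. data downloaded).
    fn poll_ready(&mut self, cx: &mut Context<'_>, input: &ExecInput) -> Poll<Result<(), String>>;
    /// Synchronous execution step; in reth this also takes an open database provider.
    fn execute(&mut self, input: &ExecInput) -> Result<ExecOutput, String>;
}

pub struct NoopStage;

impl MiniStage for NoopStage {
    fn poll_ready(&mut self, _cx: &mut Context<'_>, _input: &ExecInput) -> Poll<Result<(), String>> {
        Poll::Ready(Ok(()))
    }
    fn execute(&mut self, input: &ExecInput) -> Result<ExecOutput, String> {
        Ok(ExecOutput { done: input.target.is_some() })
    }
}

/// Pipeline-side driver: await readiness, then run the synchronous `execute`.
/// The dump/debug tools in this commit skip the poll and call `execute` directly.
pub async fn run_stage<S: MiniStage>(
    stage: &mut S,
    input: ExecInput,
) -> Result<ExecOutput, String> {
    poll_fn(|cx| stage.poll_ready(cx, &input)).await?;
    stage.execute(&input)
}
```
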
17 changes: 7 additions & 10 deletions bin/reth/src/chain/import.rs
@@ -1,4 +1,8 @@
use crate::{
args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
},
dirs::{DataDirPath, MaybePlatformPath},
init::init_genesis,
node::events::{handle_events, NodeEvent},
@@ -8,12 +12,6 @@ use clap::Parser;
use eyre::Context;
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_provider::{ProviderFactory, StageCheckpointReader};

use crate::args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
};
use reth_config::Config;
use reth_db::{database::Database, init_db};
use reth_downloaders::{
@@ -22,12 +20,10 @@
};
use reth_interfaces::consensus::Consensus;
use reth_primitives::{stage::StageId, ChainSpec, B256};
use reth_provider::{HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_stages::{
prelude::*,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
@@ -164,6 +160,7 @@ impl ImportCommand {
.with_max_block(max_block)
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
HeaderSyncMode::Tip(tip_rx),
consensus.clone(),
header_downloader,
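
The new first argument to `DefaultStages::new` above, a `ProviderFactory`, corresponds to the "header sync gap provider" in the commit message: presumably the headers stage now reads its local head from the database to work out how far it still has to download toward the tip, instead of being told externally. A rough sketch of that idea follows; the `HeaderGapProvider`, `SyncGap`, and `CheckpointProvider` names and signatures are hypothetical, not reth's actual trait.

```rust
/// Illustrative only: hypothetical types, not reth's actual sync-gap API.
#[derive(Clone, Copy, Debug)]
pub struct SyncGap {
    /// Highest block number already stored locally.
    pub local_head: u64,
    /// Tip the node is currently syncing toward.
    pub target: u64,
}

impl SyncGap {
    pub fn is_closed(&self) -> bool {
        self.local_head >= self.target
    }
}

pub trait HeaderGapProvider {
    /// Compute the header range that still needs to be downloaded.
    fn sync_gap(&self, tip: u64) -> SyncGap;
}

/// A toy provider backed by an in-memory checkpoint instead of a database factory.
pub struct CheckpointProvider {
    pub checkpoint: u64,
}

impl HeaderGapProvider for CheckpointProvider {
    fn sync_gap(&self, tip: u64) -> SyncGap {
        SyncGap { local_head: self.checkpoint, target: tip }
    }
}

fn main() {
    let provider = CheckpointProvider { checkpoint: 1_000 };
    let gap = provider.sync_gap(1_250);
    println!(
        "headers still needed: {}..={}, gap closed: {}",
        gap.local_head + 1,
        gap.target,
        gap.is_closed()
    );
}
```
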
8 changes: 3 additions & 5 deletions bin/reth/src/debug_cmd/execution.rs
@@ -27,13 +27,10 @@ use reth_interfaces::{
use reth_network::{NetworkEvents, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, B256};
use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader};
use reth_provider::{BlockExecutionWriter, HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_stages::{
sets::DefaultStages,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
Pipeline, PipelineError, StageSet,
};
use reth_tasks::TaskExecutor;
@@ -118,6 +115,7 @@ impl Command {
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()), // TODO:
header_mode,
Arc::clone(&consensus),
header_downloader,
57 changes: 23 additions & 34 deletions bin/reth/src/debug_cmd/merkle.rs
@@ -222,53 +222,42 @@ impl Command {
None
};

execution_stage
.execute(
execution_stage.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
},
)?;

let mut account_hashing_done = false;
while !account_hashing_done {
let output = account_hashing_stage.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;

let mut account_hashing_done = false;
while !account_hashing_done {
let output = account_hashing_stage
.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
)?;
account_hashing_done = output.done;
}

let mut storage_hashing_done = false;
while !storage_hashing_done {
let output = storage_hashing_stage
.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
storage_hashing_done = output.done;
}

let incremental_result = merkle_stage
.execute(
let output = storage_hashing_stage.execute(
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await;
)?;
storage_hashing_done = output.done;
}

let incremental_result = merkle_stage.execute(
&provider_rw,
ExecInput { target: Some(block), checkpoint: progress.map(StageCheckpoint::new) },
);

if incremental_result.is_err() {
tracing::warn!(target: "reth::cli", block, "Incremental calculation failed, retrying from scratch");
@@ -285,7 +274,7 @@ impl Command {

let clean_input = ExecInput { target: Some(block), checkpoint: None };
loop {
let clean_result = merkle_stage.execute(&provider_rw, clean_input).await;
let clean_result = merkle_stage.execute(&provider_rw, clean_input);
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done {
break
9 changes: 5 additions & 4 deletions bin/reth/src/node/mod.rs
@@ -61,7 +61,7 @@ use reth_primitives::{
};
use reth_provider::{
providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions,
HeaderProvider, ProviderFactory, StageCheckpointReader,
HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader,
};
use reth_prune::{segments::SegmentSet, Pruner};
use reth_revm::Factory;
@@ -71,9 +71,9 @@ use reth_snapshot::HighestSnapshotsTracker;
use reth_stages::{
prelude::*,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, HeaderSyncMode,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TotalDifficultyStage, TransactionLookupStage,
},
};
use reth_tasks::TaskExecutor;
@@ -892,6 +892,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
.with_metrics_tx(metrics_tx.clone())
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
header_mode,
Arc::clone(&consensus),
header_downloader,
30 changes: 13 additions & 17 deletions bin/reth/src/stage/dump/execution.rs
@@ -9,7 +9,7 @@ use reth_primitives::{stage::StageCheckpoint, ChainSpec};
use reth_provider::ProviderFactory;
use reth_revm::Factory;
use reth_stages::{stages::ExecutionStage, Stage, UnwindInput};
use std::{future::poll_fn, path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_execution_stage<DB: Database>(
@@ -100,16 +100,14 @@ async fn unwind_and_copy<DB: Database>(

let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(db_tool.chain.clone()));

exec_stage
.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
exec_stage.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)?;

let unwind_inner_tx = provider.into_tx();

@@ -131,16 +129,14 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage. [dry-run]");

let factory = ProviderFactory::new(&output_db, chain.clone());
let provider = factory.provider_rw()?;
let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(chain.clone()));
let mut exec_stage: Box<dyn Stage<DB>> =
Box::new(ExecutionStage::new_with_factory(Factory::new(chain.clone())));

let input =
reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
poll_fn(|cx| exec_stage.poll_ready(cx, input))
.await
.and_then(|_| exec_stage.execute(&provider, input))?;
exec_stage.execute(&factory.provider_rw()?, input)?;

info!(target: "reth::cli", "Success.");
info!(target: "reth::cli", "Success");

Ok(())
}
11 changes: 4 additions & 7 deletions bin/reth/src/stage/dump/hashing_account.rs
@@ -5,7 +5,7 @@ use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv};
use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec};
use reth_provider::ProviderFactory;
use reth_stages::{stages::AccountHashingStage, Stage, UnwindInput};
use std::{future::poll_fn, path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_hashing_account_stage<DB: Database>(
@@ -68,21 +68,18 @@ async fn dry_run<DB: Database>(

let factory = ProviderFactory::new(&output_db, chain);
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage {
let mut exec_stage: Box<dyn Stage<DB>> = Box::new(AccountHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
};
});

let mut done = false;
while !done {
let input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
done = poll_fn(|cx| exec_stage.poll_ready(cx, input))
.await
.and_then(|_| exec_stage.execute(&provider, input))?
.done;
done = exec_stage.execute(&provider, input)?.done;
}

info!(target: "reth::cli", "Success.");
11 changes: 4 additions & 7 deletions bin/reth/src/stage/dump/hashing_storage.rs
@@ -5,7 +5,7 @@ use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv};
use reth_primitives::{stage::StageCheckpoint, ChainSpec};
use reth_provider::ProviderFactory;
use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput};
use std::{future::poll_fn, path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
@@ -67,21 +67,18 @@ async fn dry_run<DB: Database>(

let factory = ProviderFactory::new(&output_db, chain);
let provider = factory.provider_rw()?;
let mut exec_stage = StorageHashingStage {
let mut exec_stage: Box<dyn Stage<DB>> = Box::new(StorageHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
};
});

let mut done = false;
while !done {
let input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
done = poll_fn(|cx| exec_stage.poll_ready(cx, input))
.await
.and_then(|_| exec_stage.execute(&provider, input))?
.done;
done = exec_stage.execute(&provider, input)?.done;
}

info!(target: "reth::cli", "Success.");
53 changes: 17 additions & 36 deletions bin/reth/src/stage/dump/merkle.rs
@@ -3,15 +3,15 @@ use crate::utils::DbTool;
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv};
use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec, PruneModes};
use reth_provider::{DatabaseProviderRW, ProviderFactory};
use reth_provider::ProviderFactory;
use reth_stages::{
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
StorageHashingStage, MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
},
ExecInput, ExecOutput, Stage, StageError, UnwindInput,
Stage, UnwindInput,
};
use std::{future::poll_fn, path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};
use tracing::info;

pub(crate) async fn dump_merkle_stage<DB: Database>(
@@ -88,21 +88,12 @@ async fn unwind_and_copy<DB: Database>(
)?;

// Bring hashes to TO

poll_and_execute(
&mut AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX },
&provider,
execute_input,
)
.await
.unwrap();
poll_and_execute(
&mut StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX },
&provider,
execute_input,
)
.await
.unwrap();
AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.unwrap();
StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.unwrap();

let unwind_inner_tx = provider.into_tx();

Expand All @@ -117,15 +108,6 @@ async fn unwind_and_copy<DB: Database>(
Ok(())
}

// todo: move to test_utils in reth_stages and use it where we currently manually poll
async fn poll_and_execute<DB: Database, S: Stage<DB>>(
stage: &mut S,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
poll_fn(|cx| stage.poll_ready(cx, input)).await.and_then(|_| stage.execute(&provider, input))
}

/// Try to re-execute the stage straightaway
async fn dry_run<DB: Database>(
chain: Arc<ChainSpec>,
@@ -136,20 +118,19 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage.");
let factory = ProviderFactory::new(&output_db, chain);
let provider = factory.provider_rw()?;
let mut exec_output = false;
while !exec_output {
exec_output = poll_and_execute(
&mut MerkleStage::Execution {
// Forces updating the root instead of calculating from scratch
clean_threshold: u64::MAX,
},
let mut done = false;
while !done {
done = MerkleStage::Execution {
// Forces updating the root instead of calculating from scratch
clean_threshold: u64::MAX,
}
.execute(
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
)?
.done;
}
