feat(snapshots): write to snapshots during BodyStage (#5733)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
4 people authored Jan 26, 2024
1 parent 9acc7d4 commit 3dcb835
Showing 39 changed files with 1,476 additions and 923 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


7 changes: 1 addition & 6 deletions bin/reth/src/commands/db/snapshots/headers.rs
@@ -11,7 +11,6 @@ use reth_primitives::{
};
use reth_provider::{
providers::SnapshotProvider, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
TransactionsProviderExt,
};
use std::{
path::{Path, PathBuf},
@@ -45,12 +44,8 @@ impl Command {
let mut row_indexes = block_range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();

let tx_range = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone())
.provider()?
.transaction_range_by_block_range(block_range.clone())?;

let path: PathBuf = SnapshotSegment::Headers
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.filename_with_configuration(filters, compression, &block_range)
.into();
let provider = SnapshotProvider::new(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
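For reference: after this change the benchmark no longer opens the database just to resolve a transaction range for the file name; the segment file name is derived from the block range alone. A minimal sketch of the new call shape (not code from the commit; module paths are assumed from the imports in this file, and the `Filters`/`Compression` values are taken as given):

use reth_primitives::{
    snapshot::{Compression, Filters},
    BlockNumber, SnapshotSegment,
};
use std::{ops::RangeInclusive, path::PathBuf};

/// Builds the on-disk path of a headers segment covering `block_range`.
fn headers_snapshot_path(
    filters: Filters,
    compression: Compression,
    block_range: &RangeInclusive<BlockNumber>,
) -> PathBuf {
    SnapshotSegment::Headers
        .filename_with_configuration(filters, compression, block_range)
        .into()
}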
55 changes: 32 additions & 23 deletions bin/reth/src/commands/db/snapshots/mod.rs
@@ -10,10 +10,12 @@ use reth_db::{
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
snapshot::{
Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader,
},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::{BlockNumReader, ProviderFactory, TransactionsProviderExt};
use reth_provider::{BlockNumReader, ProviderFactory};
use reth_snapshot::{segments as snap_segments, segments::Segment};
use std::{
ops::RangeInclusive,
@@ -83,14 +85,15 @@ impl Command {
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
) -> eyre::Result<()> {
let all_combinations =
self.segments.iter().cartesian_product(self.compression.iter()).cartesian_product(
if self.phf.is_empty() {
vec![None]
} else {
self.phf.iter().copied().map(Some).collect::<Vec<_>>()
},
);
let all_combinations = self
.segments
.iter()
.cartesian_product(self.compression.iter().copied())
.cartesian_product(if self.phf.is_empty() {
vec![None]
} else {
self.phf.iter().copied().map(Some).collect::<Vec<_>>()
});

{
let db = open_db_read_only(
@@ -111,45 +114,48 @@
match mode {
SnapshotSegment::Headers => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Headers::new(*compression, filters),
snap_segments::Headers,
SegmentConfig { filters, compression },
)?,
SnapshotSegment::Transactions => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Transactions::new(*compression, filters),
snap_segments::Transactions,
SegmentConfig { filters, compression },
)?,
SnapshotSegment::Receipts => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Receipts::new(*compression, filters),
snap_segments::Receipts,
SegmentConfig { filters, compression },
)?,
}
}
}
}

if self.only_bench || self.bench {
for ((mode, compression), phf) in all_combinations.clone() {
for ((mode, compression), phf) in all_combinations {
match mode {
SnapshotSegment::Headers => self.bench_headers_snapshot(
db_path,
log_level,
chain.clone(),
*compression,
compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Transactions => self.bench_transactions_snapshot(
db_path,
log_level,
chain.clone(),
*compression,
compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Receipts => self.bench_receipts_snapshot(
db_path,
log_level,
chain.clone(),
*compression,
compression,
InclusionFilter::Cuckoo,
phf,
)?,
@@ -179,7 +185,8 @@ impl Command {
fn generate_snapshot<DB: Database>(
&self,
factory: Arc<ProviderFactory<DB>>,
segment: impl Segment + Send + Sync,
segment: impl Segment<DB> + Send + Sync,
config: SegmentConfig,
) -> eyre::Result<()> {
let dir = PathBuf::default();
let ranges = self.block_ranges(factory.best_block_number()?);
@@ -194,13 +201,15 @@
let provider = factory.provider()?;

if !self.only_stats {
segment.snapshot::<DB>(&provider, &dir, block_range.clone())?;
segment.create_snapshot_file(
&provider,
dir.as_path(),
config,
block_range.clone(),
)?;
}

let tx_range =
provider.transaction_range_by_block_range(block_range.clone())?;

Ok(segment.segment().filename(block_range, &tx_range))
Ok(segment.segment().filename(block_range))
})
.collect::<Result<Vec<_>, eyre::Report>>()?;

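For reference: segments are now unit structs, and the filter/compression choice travels in a `SegmentConfig` passed to `create_snapshot_file` instead of being baked into the segment at construction time. A minimal sketch of the new flow (not from the commit; module paths are assumed from the imports above, and error handling is condensed):

use reth_db::database::Database;
use reth_primitives::{snapshot::SegmentConfig, BlockNumber};
use reth_provider::ProviderFactory;
use reth_snapshot::segments::{self as snap_segments, Segment};
use std::{ops::RangeInclusive, path::Path};

/// Writes a headers snapshot file covering `block_range` into `dir`.
fn snapshot_headers<DB: Database>(
    factory: &ProviderFactory<DB>,
    config: SegmentConfig,
    block_range: RangeInclusive<BlockNumber>,
    dir: &Path,
) -> eyre::Result<()> {
    let provider = factory.provider()?;
    // The segment itself is stateless; filters and compression come from `config`.
    snap_segments::Headers.create_snapshot_file(&provider, dir, config, block_range)?;
    Ok(())
}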
2 changes: 1 addition & 1 deletion bin/reth/src/commands/db/snapshots/receipts.rs
@@ -53,7 +53,7 @@ impl Command {
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

let path: PathBuf = SnapshotSegment::Receipts
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.filename_with_configuration(filters, compression, &block_range)
.into();

let provider = SnapshotProvider::new(PathBuf::default())?;
2 changes: 1 addition & 1 deletion bin/reth/src/commands/db/snapshots/transactions.rs
@@ -51,7 +51,7 @@ impl Command {
let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

let path: PathBuf = SnapshotSegment::Transactions
.filename_with_configuration(filters, compression, &block_range, &tx_range)
.filename_with_configuration(filters, compression, &block_range)
.into();
let provider = SnapshotProvider::new(PathBuf::default())?;
let jar_provider = provider.get_segment_provider_from_block(
1 change: 0 additions & 1 deletion crates/consensus/beacon/src/engine/hooks/controller.rs
@@ -10,7 +10,6 @@ use tracing::debug;

#[derive(Debug)]
pub(crate) struct PolledHook {
#[allow(dead_code)]
pub(crate) name: &'static str,
pub(crate) event: EngineHookEvent,
pub(crate) db_access_level: EngineHookDBAccessLevel,
6 changes: 2 additions & 4 deletions crates/consensus/beacon/src/engine/hooks/snapshot.rs
@@ -75,7 +75,7 @@ impl<DB: Database + 'static> SnapshotHook<DB> {
) -> RethResult<Option<EngineHookEvent>> {
Ok(match &mut self.state {
SnapshotterState::Idle(snapshotter) => {
let Some(mut snapshotter) = snapshotter.take() else { return Ok(None) };
let Some(snapshotter) = snapshotter.take() else { return Ok(None) };

let targets = snapshotter.get_snapshot_targets(finalized_block_number)?;

@@ -112,9 +112,7 @@ impl<DB: Database + 'static> EngineHook for SnapshotHook<DB> {
cx: &mut Context<'_>,
ctx: EngineContext,
) -> Poll<RethResult<EngineHookEvent>> {
let Some(finalized_block_number) = ctx.finalized_block_number else {
return Poll::Ready(Ok(EngineHookEvent::NotReady))
};
let Some(finalized_block_number) = ctx.finalized_block_number else { return Poll::Pending };

// Try to spawn a snapshotter
match self.try_spawn_snapshotter(finalized_block_number)? {
21 changes: 17 additions & 4 deletions crates/consensus/beacon/src/engine/mod.rs
@@ -1689,9 +1689,18 @@ where
None
}

fn on_hook_result(&self, result: PolledHook) -> Result<(), BeaconConsensusEngineError> {
if result.db_access_level.is_read_write() {
match result.event {
fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> {
if let EngineHookEvent::Finished(Err(error)) = &polled_hook.event {
error!(
target: "consensus::engine",
name = %polled_hook.name,
?error,
"Hook finished with error"
)
}

if polled_hook.db_access_level.is_read_write() {
match polled_hook.event {
EngineHookEvent::NotReady => {}
EngineHookEvent::Started => {
// If the hook has read-write access to the database, it means that the engine
@@ -1709,7 +1718,11 @@
if let Err(error) =
self.blockchain.connect_buffered_blocks_to_canonical_hashes()
{
error!(target: "consensus::engine", ?error, "Error connecting buffered blocks to canonical hashes on hook result");
error!(
target: "consensus::engine",
?error,
"Error connecting buffered blocks to canonical hashes on hook result"
);
return Err(error.into())
}
}
1 change: 0 additions & 1 deletion crates/consensus/beacon/src/engine/test_utils.rs
@@ -525,7 +525,6 @@ where
5,
self.base_config.chain_spec.prune_delete_limit,
config.max_reorg_depth() as usize,
watch::channel(None).1,
);

let mut hooks = EngineHooks::new();
5 changes: 4 additions & 1 deletion crates/interfaces/src/provider.rs
@@ -120,8 +120,11 @@ pub enum ProviderError {
#[error("not able to find {0} snapshot file for block number {1}")]
MissingSnapshotBlock(SnapshotSegment, BlockNumber),
/// Snapshot file is not found for requested transaction.
#[error("not able to find {0} snapshot file for transaction id {1}")]
#[error("unable to find {0} snapshot file for transaction id {1}")]
MissingSnapshotTx(SnapshotSegment, TxNumber),
/// Snapshot is finalized and cannot be written to.
#[error("unable to write block #{1} to finalized snapshot {0}")]
FinalizedSnapshot(SnapshotSegment, BlockNumber),
/// Error encountered when the block number conversion from U256 to u64 causes an overflow.
#[error("failed to convert block number U256 to u64: {0}")]
BlockNumberOverflow(U256),
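The new `FinalizedSnapshot` variant gives snapshot writers a dedicated error for append attempts against a jar that has already been finalized. An illustrative use (not from the commit; the `reth_interfaces::provider` path is assumed from this file's location):

use reth_interfaces::provider::ProviderError;
use reth_primitives::{BlockNumber, SnapshotSegment};

/// Rejects writes into a finalized snapshot with the new error variant.
fn check_writable(
    segment: SnapshotSegment,
    block: BlockNumber,
    is_finalized: bool,
) -> Result<(), ProviderError> {
    if is_finalized {
        // A finalized snapshot jar is immutable, so the write is refused.
        return Err(ProviderError::FinalizedSnapshot(segment, block));
    }
    Ok(())
}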
46 changes: 19 additions & 27 deletions crates/node-core/src/node_config.rs
@@ -28,7 +28,7 @@ use metrics_exporter_prometheus::PrometheusHandle;
use once_cell::sync::Lazy;
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode};
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook},
hooks::{EngineHooks, PruneHook, SnapshotHook},
BeaconConsensus, BeaconConsensusEngine, BeaconConsensusEngineError,
MIN_BLOCKS_FOR_PIPELINE_RUN,
};
@@ -1015,20 +1015,9 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
let prometheus_handle = self.config.install_prometheus_recorder()?;
info!(target: "reth::cli", "Database opened");

let mut provider_factory =
ProviderFactory::new(Arc::clone(&self.db), Arc::clone(&self.config.chain));

// configure snapshotter
let snapshotter = reth_snapshot::Snapshotter::new(
provider_factory.clone(),
self.data_dir.snapshots_path(),
self.config.chain.snapshot_block_interval,
)?;

provider_factory = provider_factory.with_snapshots(
self.data_dir.snapshots_path(),
snapshotter.highest_snapshot_receiver(),
)?;
let provider_factory =
ProviderFactory::new(Arc::clone(&self.db), Arc::clone(&self.config.chain))
.with_snapshots(self.data_dir.snapshots_path())?;

self.config.start_metrics_endpoint(prometheus_handle, Arc::clone(&self.db)).await?;

@@ -1207,20 +1196,23 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
let initial_target = self.config.initial_pipeline_target(genesis_hash);
let mut hooks = EngineHooks::new();

let pruner_events = if let Some(prune_config) = prune_config {
let mut pruner = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(self.config.chain.prune_delete_limit)
.build(provider_factory, snapshotter.highest_snapshot_receiver());
let mut pruner = PrunerBuilder::new(prune_config.clone().unwrap_or_default())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(self.config.chain.prune_delete_limit)
.build(provider_factory.clone());

let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));
let pruner_events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));
info!(target: "reth::cli", ?prune_config, "Pruner initialized");

info!(target: "reth::cli", ?prune_config, "Pruner initialized");
Either::Left(events)
} else {
Either::Right(stream::empty())
};
let snapshotter = reth_snapshot::Snapshotter::new(
provider_factory.clone(),
provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory"),
);
hooks.add(SnapshotHook::new(snapshotter, Box::new(executor.clone())));
info!(target: "reth::cli", "Snapshotter initialized");

// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
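Taken together, the wiring above now reads: the provider factory owns the snapshot provider, the snapshotter is built from both, and snapshotting runs as an engine hook alongside pruning. The snapshotter no longer takes a block interval, and the pruner no longer consumes a highest-snapshot watch channel. A condensed, non-runnable fragment of just that wiring (variables such as `db`, `chain`, `snapshots_path`, `prune_config`, `executor` and `hooks` are assumed to be in scope, and builder options shown in the hunk are elided here):

// The factory is created with snapshot support up front.
let provider_factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&chain))
    .with_snapshots(snapshots_path.clone())?;

// The pruner is built without a highest-snapshot receiver.
let pruner = PrunerBuilder::new(prune_config.clone().unwrap_or_default())
    .build(provider_factory.clone());
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));

// The snapshotter is driven by the factory's own snapshot provider and runs as a hook.
let snapshotter = reth_snapshot::Snapshotter::new(
    provider_factory.clone(),
    provider_factory
        .snapshot_provider()
        .expect("snapshot provider initialized via provider factory"),
);
hooks.add(SnapshotHook::new(snapshotter, Box::new(executor.clone())));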
7 changes: 7 additions & 0 deletions crates/primitives/src/prune/mode.rs
@@ -15,6 +15,13 @@ pub enum PruneMode {
}

impl PruneMode {
/// Prune blocks up to the specified block number. The specified block number is also pruned.
///
/// This acts as `PruneMode::Before(block_number + 1)`.
pub fn before_inclusive(block_number: BlockNumber) -> Self {
Self::Before(block_number + 1)
}

/// Returns block up to which variant pruning needs to be done, inclusive, according to the
/// provided tip.
pub fn prune_target_block(
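A standalone illustration of what `before_inclusive` computes; the enum is re-declared here with a single variant purely so the example compiles outside the reth workspace (the real `PruneMode` lives in `reth_primitives` and has more variants):

type BlockNumber = u64;

#[derive(Debug, PartialEq, Eq)]
enum PruneMode {
    Before(BlockNumber),
}

impl PruneMode {
    /// Prune everything up to and including `block_number`.
    fn before_inclusive(block_number: BlockNumber) -> Self {
        Self::Before(block_number + 1)
    }
}

fn main() {
    // Pruning through block 100 is the same as pruning everything before block 101.
    assert_eq!(PruneMode::before_inclusive(100), PruneMode::Before(101));
}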
8 changes: 8 additions & 0 deletions crates/primitives/src/snapshot/mod.rs
@@ -8,6 +8,7 @@ use alloy_primitives::BlockNumber;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentConfig, SegmentHeader, SnapshotSegment};
use std::ops::RangeInclusive;

/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;
@@ -45,3 +46,10 @@ impl HighestSnapshots {
}
}
}

/// Each snapshot has a fixed number of blocks. This returns the fixed block range that
/// contains the requested block. Used for the segment filename.
pub fn find_fixed_range(block: BlockNumber) -> RangeInclusive<BlockNumber> {
let start = (block / BLOCKS_PER_SNAPSHOT) * BLOCKS_PER_SNAPSHOT;
start..=start + BLOCKS_PER_SNAPSHOT - 1
}
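A standalone check of the range arithmetic in `find_fixed_range`, with the function body and the 500,000-block constant copied from the hunk above:

const BLOCKS_PER_SNAPSHOT: u64 = 500_000;

fn find_fixed_range(block: u64) -> std::ops::RangeInclusive<u64> {
    let start = (block / BLOCKS_PER_SNAPSHOT) * BLOCKS_PER_SNAPSHOT;
    start..=start + BLOCKS_PER_SNAPSHOT - 1
}

fn main() {
    // Every block maps to the 500k-block window that contains it.
    assert_eq!(find_fixed_range(0), 0..=499_999);
    assert_eq!(find_fixed_range(499_999), 0..=499_999);
    assert_eq!(find_fixed_range(500_000), 500_000..=999_999);
    assert_eq!(find_fixed_range(1_250_000), 1_000_000..=1_499_999);
}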