feat: add ETL and use it on HeaderStage #6154

Merged

merged 39 commits on Jan 26, 2024

Changes from 33 commits

Commits (39)
6eb8d89
feat: etl crate
onbjerg Dec 18, 2023
c248dca
add some docs and helper functions to etl collector
joshieDo Jan 22, 2024
3da24b9
cargo lock
joshieDo Jan 22, 2024
deb9d5f
add from_vec to RawKey and RawValue for already processed data
joshieDo Jan 22, 2024
0d12bb4
add Compact to SealedHeader
joshieDo Jan 22, 2024
f454c9f
use etl in HeaderStage + include HeaderTD insertion
joshieDo Jan 22, 2024
96bf23e
only exit download loop once all have been downloaded
joshieDo Jan 22, 2024
83d3af7
make sure to include last header on the collectors
joshieDo Jan 22, 2024
0370473
fix on deleting genesis block hash
joshieDo Jan 22, 2024
fea4c48
remove TotalDifficulty stage
joshieDo Jan 22, 2024
66a136a
fmt
joshieDo Jan 22, 2024
26125df
port total difficulty unwind/tests to header stage
joshieDo Jan 23, 2024
f6a7fc0
Update crates/stages/src/stages/headers.rs
joshieDo Jan 24, 2024
4bc8b27
Update crates/stages/src/stages/headers.rs
joshieDo Jan 24, 2024
d417f3a
add additional doc to is_etl_ready usage
joshieDo Jan 24, 2024
85e66d0
Merge branch 'feat/etl-headers' of github.com:paradigmxyz/reth into f…
joshieDo Jan 24, 2024
919ea9e
add BlockHash to Compact known types
joshieDo Jan 24, 2024
a8890d7
add more info when moving from etl to db
joshieDo Jan 24, 2024
d527fd9
handle etl iter errors
joshieDo Jan 24, 2024
e218e52
HeaderStage::new with Result (propagates all stages)
joshieDo Jan 24, 2024
0081a4d
fix doc tests
joshieDo Jan 24, 2024
0a389ed
Update crates/stages/src/stages/headers.rs
joshieDo Jan 26, 2024
2d16afc
Update crates/stages/src/stages/headers.rs
joshieDo Jan 26, 2024
a0bf5f5
Update crates/stages/src/stages/headers.rs
joshieDo Jan 26, 2024
821ac5b
Update crates/stages/src/stages/headers.rs
joshieDo Jan 26, 2024
09fecdf
Update crates/stages/src/stages/headers.rs
joshieDo Jan 26, 2024
91c987c
Revert "HeaderStage::new with Result (propagates all stages)"
joshieDo Jan 26, 2024
bbd7d9c
fmt
joshieDo Jan 26, 2024
418c3c7
make OnlineStages & DefaultStages::new fallible
joshieDo Jan 26, 2024
3251f6f
Merge branch 'feat/etl-headers' of github.com:paradigmxyz/reth into f…
joshieDo Jan 26, 2024
bebe555
cargo lock update
joshieDo Jan 26, 2024
1c987fa
Merge remote-tracking branch 'origin/feat/static-files' into feat/etl…
joshieDo Jan 26, 2024
2b46710
remove left over unwrap
joshieDo Jan 26, 2024
0a5f8b1
add meaning of etl
joshieDo Jan 26, 2024
986c817
add doc on etl read_next error
joshieDo Jan 26, 2024
ed9432f
update EtlIter doc
joshieDo Jan 26, 2024
a0f9334
cargo lock update
joshieDo Jan 26, 2024
21d2c54
Merge remote-tracking branch 'origin/feat/static-files' into feat/etl…
joshieDo Jan 26, 2024
603d936
remove dup from cargo toml
joshieDo Jan 26, 2024
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -8,6 +8,7 @@ members = [
     "crates/consensus/beacon-core/",
     "crates/consensus/common/",
     "crates/ethereum-forks/",
+    "crates/etl",
     "crates/interfaces/",
     "crates/metrics/",
     "crates/metrics/metrics-derive/",
@@ -129,6 +130,7 @@ reth-ecies = { path = "crates/net/ecies" }
 reth-eth-wire = { path = "crates/net/eth-wire" }
 reth-ethereum-forks = { path = "crates/ethereum-forks" }
 reth-ethereum-payload-builder = { path = "crates/payload/ethereum" }
+reth-etl = { path = "crates/etl" }
 reth-optimism-payload-builder = { path = "crates/payload/optimism" }
 reth-interfaces = { path = "crates/interfaces" }
 reth-ipc = { path = "crates/rpc/ipc" }
8 changes: 2 additions & 6 deletions bin/reth/src/commands/debug_cmd/execution.rs
@@ -32,7 +32,7 @@ use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainS
 use reth_provider::{BlockExecutionWriter, HeaderSyncMode, ProviderFactory, StageCheckpointReader};
 use reth_stages::{
     sets::DefaultStages,
-    stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
+    stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage},
     Pipeline, StageSet,
 };
 use reth_tasks::TaskExecutor;
@@ -123,11 +123,7 @@ impl Command {
                 header_downloader,
                 body_downloader,
                 factory.clone(),
-            )
-            .set(
-                TotalDifficultyStage::new(consensus)
-                    .with_commit_threshold(stage_conf.total_difficulty.commit_threshold),
-            )
+            )?
             .set(SenderRecoveryStage {
                 commit_threshold: stage_conf.sender_recovery.commit_threshold,
             })
8 changes: 2 additions & 6 deletions bin/reth/src/commands/import.rs
@@ -20,7 +20,7 @@ use reth_primitives::{stage::StageId, ChainSpec, B256};
 use reth_provider::{HeaderSyncMode, ProviderFactory, StageCheckpointReader};
 use reth_stages::{
     prelude::*,
-    stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
+    stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage},
 };
 use std::{path::PathBuf, sync::Arc};
 use tokio::sync::watch;
@@ -173,11 +173,7 @@ impl ImportCommand {
                 header_downloader,
                 body_downloader,
                 factory.clone(),
-            )
-            .set(
-                TotalDifficultyStage::new(consensus.clone())
-                    .with_commit_threshold(config.stages.total_difficulty.commit_threshold),
-            )
+            )?
             .set(SenderRecoveryStage {
                 commit_threshold: config.stages.sender_recovery.commit_threshold,
             })
19 changes: 11 additions & 8 deletions crates/consensus/beacon/src/engine/test_utils.rs
@@ -492,14 +492,17 @@ where
         .build(client.clone(), consensus.clone(), provider_factory.clone())
         .into_task();

-        Pipeline::builder().add_stages(DefaultStages::new(
-            ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()),
-            HeaderSyncMode::Tip(tip_rx.clone()),
-            Arc::clone(&consensus),
-            header_downloader,
-            body_downloader,
-            executor_factory.clone(),
-        ))
+        Pipeline::builder().add_stages(
+            DefaultStages::new(
+                ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()),
+                HeaderSyncMode::Tip(tip_rx.clone()),
+                Arc::clone(&consensus),
+                header_downloader,
+                body_downloader,
+                executor_factory.clone(),
+            )
+            .expect("should build"),
+        )
     }
 };
16 changes: 16 additions & 0 deletions crates/etl/Cargo.toml
@@ -0,0 +1,16 @@
[package]
name = "reth-etl"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
exclude.workspace = true

[dependencies]
tempfile.workspace = true
reth-db.workspace = true

[dev-dependencies]
reth-primitives.workspace = true
256 changes: 256 additions & 0 deletions crates/etl/src/lib.rs
@@ -0,0 +1,256 @@
//! ETL (extract, transform, load) data collector.
//!
//! This crate is useful for dumping unsorted data into temporary files and iterating over their
//! sorted representation later on.
//!
//! This has multiple uses, such as optimizing database inserts (for B-tree-based databases) and
//! memory management (as it moves the buffer to disk instead of keeping it in memory).
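//!
//! # Example
//!
//! A minimal usage sketch, mirroring the test at the bottom of this file (it assumes the
//! `TxHash`/`TxNumber` types from `reth-primitives`, which implement the required key and value
//! traits):
//!
//! ```ignore
//! use reth_etl::Collector;
//! use reth_primitives::{TxHash, TxNumber};
//! use std::sync::Arc;
//! use tempfile::TempDir;
//!
//! // Sorted batches are flushed to a temporary file once the buffer exceeds 1024 bytes.
//! let mut collector: Collector<TxHash, TxNumber> =
//!     Collector::new(Arc::new(TempDir::new().unwrap()), 1024);
//! collector.insert(TxHash::random(), 0);
//!
//! // Entries come back sorted by encoded key, already encoded and compressed.
//! for entry in collector.iter().unwrap() {
//!     let (encoded_key, compressed_value) = entry.unwrap();
//! }
//! ```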

#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![warn(missing_debug_implementations, missing_docs, unreachable_pub, rustdoc::all)]
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use std::{
cmp::Reverse,
collections::BinaryHeap,
io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
path::Path,
sync::Arc,
};

use reth_db::table::{Compress, Encode, Key, Value};
use tempfile::{NamedTempFile, TempDir};

/// An ETL data collector.
///
/// Data is pushed to the collector, which buffers it in memory and flushes it to disk in sorted
/// batches once the buffer reaches the configured capacity.
///
/// The data can later be iterated over in a sorted manner.
#[derive(Debug)]
pub struct Collector<K, V>
where
K: Encode + Ord,
V: Compress,
<K as Encode>::Encoded: std::fmt::Debug,
<V as Compress>::Compressed: std::fmt::Debug,
{
/// Directory for temporary file storage
dir: Arc<TempDir>,
/// Collection of temporary ETL files
files: Vec<EtlFile>,
/// Current buffer size in bytes
buffer_size_bytes: usize,
/// Maximum buffer capacity in bytes, triggers flush when reached
buffer_capacity_bytes: usize,
/// In-memory buffer storing encoded and compressed key-value pairs
buffer: Vec<(<K as Encode>::Encoded, <V as Compress>::Compressed)>,
/// Total number of elements in the collector, including all files
len: usize,
}

impl<K, V> Collector<K, V>
where
K: Key,
V: Value,
<K as Encode>::Encoded: Ord + std::fmt::Debug,
<V as Compress>::Compressed: Ord + std::fmt::Debug,
{
/// Create a new collector in a specific temporary directory with some capacity.
///
/// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk.
pub fn new(dir: Arc<TempDir>, buffer_capacity_bytes: usize) -> Self {
Self {
dir,
buffer_size_bytes: 0,
files: Vec::new(),
buffer_capacity_bytes,
buffer: Vec::new(),
len: 0,
}
}

/// Returns the number of elements currently in the collector.
pub fn len(&self) -> usize {
self.len
}

/// Returns `true` if there are currently no elements in the collector.
pub fn is_empty(&self) -> bool {
self.len == 0
}

/// Insert an entry into the collector.
pub fn insert(&mut self, key: K, value: V) {
let key = key.encode();
let value = value.compress();
self.buffer_size_bytes += key.as_ref().len() + value.as_ref().len();
self.buffer.push((key, value));
if self.buffer_size_bytes > self.buffer_capacity_bytes {
self.flush();
}
self.len += 1;
}

fn flush(&mut self) {
self.buffer_size_bytes = 0;
self.buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
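// Move the full buffer out, leaving a fresh empty one in its place, so the sorted
// batch can be handed to the new temporary file by value.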
let mut buf = Vec::with_capacity(self.buffer.len());
std::mem::swap(&mut buf, &mut self.buffer);
self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk"))
}

/// Returns an iterator over the collector data.
///
/// The items of the iterator are sorted across all underlying files.
///
/// # Note
///
/// The keys and values have been pre-encoded, meaning they *SHOULD NOT* be encoded or
/// compressed again.
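///
/// Downstream consumers can hand these raw bytes straight to the database layer (e.g. via the
/// `RawKey::from_vec` / `RawValue::from_vec` helpers added in this PR) rather than decoding and
/// re-encoding them.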
pub fn iter(&mut self) -> std::io::Result<EtlIter<'_>> {
// Flush the remaining items to disk
if self.buffer_size_bytes > 0 {
self.flush();
}

let mut heap = BinaryHeap::new();
for (current_id, file) in self.files.iter_mut().enumerate() {
if let Some((current_key, current_value)) = file.read_next()? {
heap.push((Reverse((current_key, current_value)), current_id));
}
}

Ok(EtlIter { heap, files: &mut self.files })
}
}

/// An iterator over sorted data in a collection of ETL files.
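///
/// Implemented as a k-way merge: a min-heap (a `BinaryHeap` of `Reverse`d entries) holds at most
/// one entry per file; popping yields the globally smallest key, after which the file that
/// produced it is polled for its next entry.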
#[derive(Debug)]
pub struct EtlIter<'a> {
/// Heap managing the next items to be iterated.
#[allow(clippy::type_complexity)]
heap: BinaryHeap<(Reverse<(Vec<u8>, Vec<u8>)>, usize)>,
/// Reference to the vector of ETL files being iterated over.
files: &'a mut Vec<EtlFile>,
}

impl<'a> EtlIter<'a> {
/// Peeks into the next element
pub fn peek(&self) -> Option<&(Vec<u8>, Vec<u8>)> {
self.heap.peek().map(|(Reverse(entry), _)| entry)
}
}

impl<'a> Iterator for EtlIter<'a> {
type Item = std::io::Result<(Vec<u8>, Vec<u8>)>;

fn next(&mut self) -> Option<Self::Item> {
// Get the next sorted entry from the heap
let (Reverse(entry), id) = self.heap.pop()?;

// Populate the heap with the next entry from the same file
match self.files[id].read_next() {
Ok(Some((key, value))) => {
self.heap.push((Reverse((key, value)), id));
Some(Ok(entry))
}
Ok(None) => Some(Ok(entry)),
err => err.transpose(),
}
}
}

/// A temporary ETL file.
#[derive(Debug)]
struct EtlFile {
file: BufReader<NamedTempFile>,
len: usize,
}

impl EtlFile {
/// Create a new file with the given data (which should be pre-sorted) at the given path.
///
/// The file will be a temporary file.
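///
/// On-disk layout per entry: an 8-byte big-endian key length, an 8-byte big-endian value length,
/// then the raw key and value bytes (the lengths come from `usize::to_be_bytes`, i.e. 8 bytes on
/// 64-bit targets).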
pub(crate) fn new<K, V>(dir: &Path, buffer: Vec<(K, V)>) -> std::io::Result<Self>
where
Self: Sized,
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let file = NamedTempFile::new_in(dir)?;
let mut w = BufWriter::new(file);
for entry in &buffer {
let k = entry.0.as_ref();
let v = entry.1.as_ref();

w.write_all(&k.len().to_be_bytes())?;
w.write_all(&v.len().to_be_bytes())?;
w.write_all(k)?;
w.write_all(v)?;
}

let mut file = BufReader::new(w.into_inner()?);
file.seek(SeekFrom::Start(0))?;
let len = buffer.len();
Ok(Self { file, len })
}

/// Read the next entry in the file.
pub(crate) fn read_next(&mut self) -> std::io::Result<Option<(Vec<u8>, Vec<u8>)>> {
if self.len == 0 {
return Ok(None);
}

let mut buffer_key_length = [0; 8];
let mut buffer_value_length = [0; 8];

self.file.read_exact(&mut buffer_key_length)?;
self.file.read_exact(&mut buffer_value_length)?;

let key_length = usize::from_be_bytes(buffer_key_length);
let value_length = usize::from_be_bytes(buffer_value_length);
let mut key = vec![0; key_length];
let mut value = vec![0; value_length];

self.file.read_exact(&mut key)?;
self.file.read_exact(&mut value)?;

self.len -= 1;

Ok(Some((key, value)))
}
}

#[cfg(test)]
mod tests {
use reth_primitives::{TxHash, TxNumber};
use tempfile::TempDir;

use super::*;

#[test]
fn etl_hashes() {
let mut entries: Vec<_> =
(0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();

let mut collector = Collector::new(Arc::new(TempDir::new().unwrap()), 1024);
for (k, v) in entries.clone() {
collector.insert(k, v);
}
entries.sort_unstable_by_key(|entry| entry.0);

for (id, entry) in collector.iter().unwrap().enumerate() {
let expected = entries[id];
assert_eq!(
entry.unwrap(),
(expected.0.encode().to_vec(), expected.1.compress().to_vec())
);
}
}
}
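
For intuition, the merge that `EtlIter` performs over sorted temporary files is a classic k-way merge. The following is a minimal, self-contained sketch of the same technique over pre-sorted vectors (illustrative only; `kway_merge` is a hypothetical helper, not part of this diff):

use std::{cmp::Reverse, collections::BinaryHeap};

// Merge pre-sorted sources the way `EtlIter` merges sorted ETL files: keep one
// candidate per source in a min-heap, pop the smallest, then refill the heap
// from the source that produced it.
fn kway_merge(sources: Vec<Vec<u64>>) -> Vec<u64> {
    let mut iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    for (id, iter) in iters.iter_mut().enumerate() {
        if let Some(value) = iter.next() {
            heap.push((Reverse(value), id));
        }
    }
    let mut merged = Vec::new();
    while let Some((Reverse(value), id)) = heap.pop() {
        merged.push(value);
        if let Some(next) = iters[id].next() {
            heap.push((Reverse(next), id));
        }
    }
    merged
}

fn main() {
    let merged = kway_merge(vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]]);
    assert_eq!(merged, (1..=9).collect::<Vec<u64>>());
}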