Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ethereum engine chain orchestrator #9241

Merged
merged 9 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ members = [
"crates/errors/",
"crates/ethereum-forks/",
"crates/ethereum/consensus/",
"crates/ethereum/engine/",
"crates/ethereum/engine-primitives/",
"crates/ethereum/evm",
"crates/ethereum/node",
Expand Down
16 changes: 15 additions & 1 deletion crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ reth-payload-validator.workspace = true
reth-primitives.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-prune-types.workspace = true
reth-revm.workspace = true
reth-rpc-types.workspace = true
reth-stages-api.workspace = true
Expand All @@ -54,11 +55,24 @@ aquamarine.workspace = true
parking_lot.workspace = true
tracing.workspace = true

# optional deps for test-utils
reth-stages = { workspace = true, optional = true }
reth-tracing = { workspace = true, optional = true }

[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-prune-types.workspace = true
reth-stages = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true

assert_matches.workspace = true
assert_matches.workspace = true

[features]
test-utils = [
"reth-db/test-utils",
"reth-network-p2p/test-utils",
"reth-stages/test-utils",
"reth-tracing"
]
58 changes: 3 additions & 55 deletions crates/engine/tree/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,23 +206,17 @@ impl<DB: Database> PipelineState<DB> {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::insert_headers_into_client;
use crate::test_utils::{insert_headers_into_client, TestPipelineBuilder};
use assert_matches::assert_matches;
use futures::poll;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase};
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, BlockNumber, Header, B256};
use reth_provider::{
test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome,
};
use reth_prune_types::PruneModes;
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_stages::ExecOutput;
use reth_stages_api::StageCheckpoint;
use reth_static_file::StaticFileProducer;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, sync::Arc};
use tokio::sync::watch;

struct TestHarness {
pipeline_sync: PipelineSync<Arc<TempDatabase<DatabaseEnv>>>,
Expand Down Expand Up @@ -263,52 +257,6 @@ mod tests {
}
}

struct TestPipelineBuilder {
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<ExecutionOutcome>,
}

impl TestPipelineBuilder {
/// Create a new [`TestPipelineBuilder`].
const fn new() -> Self {
Self { pipeline_exec_outputs: VecDeque::new(), executor_results: Vec::new() }
}

/// Set the pipeline execution outputs to use for the test consensus engine.
fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self
}

/// Set the executor results to use for the test consensus engine.
#[allow(dead_code)]
fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
self.executor_results = executor_results;
self
}

/// Builds the pipeline.
fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<Arc<TempDatabase<DatabaseEnv>>> {
reth_tracing::init_test_tracing();

// Setup pipeline
let (tip_tx, _tip_rx) = watch::channel(B256::default());
let pipeline = Pipeline::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);

let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);

let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());

pipeline.build(provider_factory, static_file_producer)
}
}

#[tokio::test]
async fn pipeline_started_and_finished() {
const TOTAL_BLOCKS: usize = 10;
Expand Down
19 changes: 15 additions & 4 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use reth_engine_primitives::EngineTypes;
use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
collections::HashSet,
sync::mpsc::Sender,
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;

/// Advances the chain based on incoming requests.
///
Expand Down Expand Up @@ -146,13 +147,23 @@ pub trait EngineRequestHandler: Send + Sync {
#[derive(Debug)]
pub struct EngineApiRequestHandler<T: EngineTypes> {
/// channel to send messages to the tree to execute the payload.
to_tree: std::sync::mpsc::Sender<FromEngine<BeaconEngineMessage<T>>>,
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>,
/// channel to receive messages from the tree.
from_tree: mpsc::UnboundedReceiver<EngineApiEvent>,
from_tree: UnboundedReceiver<EngineApiEvent>,
// TODO add db controller
}

impl<T> EngineApiRequestHandler<T> where T: EngineTypes {}
impl<T> EngineApiRequestHandler<T>
where
T: EngineTypes,
{
pub const fn new(
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>,
from_tree: UnboundedReceiver<EngineApiEvent>,
) -> Self {
Self { to_tree, from_tree }
}
}

impl<T> EngineRequestHandler for EngineApiRequestHandler<T>
where
Expand Down
8 changes: 6 additions & 2 deletions crates/engine/tree/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
//! This crate includes the core components for advancing a reth chain.
//!
//! ## Feature Flags
//!
//! - `test-utils`: Export utilities for testing

#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
Expand Down Expand Up @@ -27,5 +31,5 @@ pub mod persistence;
/// Support for interacting with the blockchain tree.
pub mod tree;

#[cfg(test)]
mod test_utils;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
60 changes: 58 additions & 2 deletions crates/engine/tree/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,62 @@
use reth_chainspec::ChainSpec;
use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase};
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::{BlockBody, SealedHeader};
use std::ops::Range;
use reth_primitives::{BlockBody, SealedHeader, B256};
use reth_provider::{test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome};
use reth_prune_types::PruneModes;
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_stages_api::Pipeline;
use reth_static_file::StaticFileProducer;
use std::{collections::VecDeque, ops::Range, sync::Arc};
use tokio::sync::watch;

/// Test pipeline builder.
#[derive(Default)]
pub struct TestPipelineBuilder {
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<ExecutionOutcome>,
}

impl TestPipelineBuilder {
/// Create a new [`TestPipelineBuilder`].
pub const fn new() -> Self {
Self { pipeline_exec_outputs: VecDeque::new(), executor_results: Vec::new() }
}

/// Set the pipeline execution outputs to use for the test consensus engine.
pub fn with_pipeline_exec_outputs(
mut self,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
) -> Self {
self.pipeline_exec_outputs = pipeline_exec_outputs;
self
}

/// Set the executor results to use for the test consensus engine.
#[allow(dead_code)]
pub fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
self.executor_results = executor_results;
self
}

/// Builds the pipeline.
pub fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<Arc<TempDatabase<DatabaseEnv>>> {
reth_tracing::init_test_tracing();

// Setup pipeline
let (tip_tx, _tip_rx) = watch::channel(B256::default());
let pipeline = Pipeline::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);

let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);

let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());

pipeline.build(provider_factory, static_file_producer)
}
}

pub(crate) fn insert_headers_into_client(
client: &TestFullBlockClient,
Expand Down
31 changes: 31 additions & 0 deletions crates/ethereum/engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "reth-ethereum-engine"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[lints]
workspace = true

[dependencies]
# reth
reth-beacon-consensus.workspace = true
reth-chainspec.workspace = true
reth-db-api.workspace = true
reth-engine-tree.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-network-p2p.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true

# async
futures.workspace = true
pin-project.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

[dev-dependencies]
reth-engine-tree = { workspace = true, features = ["test-utils"] }
12 changes: 12 additions & 0 deletions crates/ethereum/engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//! Ethereum engine implementation.

#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

/// Ethereum engine orchestrator.
pub mod orchestrator;
Loading
Loading