Skip to content

Commit

Permalink
feat: make EngineApiRequestHandler generic over request (paradigmxyz#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored and martinezjorge committed Aug 7, 2024
1 parent dba7efa commit 5a8d981
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
30 changes: 16 additions & 14 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use crate::{
download::{BlockDownloader, DownloadAction, DownloadOutcome},
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_engine_primitives::EngineTypes;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -54,12 +53,18 @@ impl<T, S, D> EngineHandler<T, S, D> {
{
Self { handler, incoming_requests, downloader }
}

/// Returns a mutable reference to the request handler.
pub fn handler_mut(&mut self) -> &mut T {
&mut self.handler
}
}

impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
where
T: EngineRequestHandler,
S: Stream<Item = T::Request> + Send + Sync + Unpin + 'static,
S: Stream + Send + Sync + Unpin + 'static,
<S as Stream>::Item: Into<T::Request>,
D: BlockDownloader,
{
type Event = T::Event;
Expand Down Expand Up @@ -98,7 +103,7 @@ where
// pop the next incoming request
if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
// and delegate the request to the handler
self.handler.on_event(FromEngine::Request(req));
self.handler.on_event(FromEngine::Request(req.into()));
// skip downloading in this iteration to allow the handler to process the request
continue
}
Expand Down Expand Up @@ -156,32 +161,29 @@ pub trait EngineRequestHandler: Send + Sync {
/// In case required blocks are missing, the handler will request them from the network, by emitting
/// a download request upstream.
#[derive(Debug)]
pub struct EngineApiRequestHandler<T: EngineTypes> {
pub struct EngineApiRequestHandler<Request> {
/// channel to send messages to the tree to execute the payload.
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>,
to_tree: Sender<FromEngine<Request>>,
/// channel to receive messages from the tree.
from_tree: UnboundedReceiver<EngineApiEvent>,
}

impl<T> EngineApiRequestHandler<T>
where
T: EngineTypes,
{
impl<Request> EngineApiRequestHandler<Request> {
/// Creates a new `EngineApiRequestHandler`.
pub const fn new(
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>,
to_tree: Sender<FromEngine<Request>>,
from_tree: UnboundedReceiver<EngineApiEvent>,
) -> Self {
Self { to_tree, from_tree }
}
}

impl<T> EngineRequestHandler for EngineApiRequestHandler<T>
impl<Request> EngineRequestHandler for EngineApiRequestHandler<Request>
where
T: EngineTypes,
Request: Send,
{
type Event = BeaconConsensusEngineEvent;
type Request = BeaconEngineMessage<T>;
type Request = Request;

fn on_event(&mut self, event: FromEngine<Self::Request>) {
// delegate to the tree
Expand Down
2 changes: 1 addition & 1 deletion crates/ethereum/engine/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// Alias for Ethereum chain orchestrator.
type EthServiceType<DB, Client> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<EthEngineTypes>,
EngineApiRequestHandler<BeaconEngineMessage<EthEngineTypes>>,
UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>,
BasicBlockDownloader<Client>,
>,
Expand Down

0 comments on commit 5a8d981

Please sign in to comment.