From dfcb2e13814b35d848aa663e004c2f396b8c6f3b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 27 Aug 2024 18:09:07 +0100 Subject: [PATCH 01/29] feat(exex): subscribe to notifications explicitly --- crates/exex/exex/src/context.rs | 6 +- crates/exex/exex/src/manager.rs | 101 ++++++++++++++++++++++++++++---- crates/exex/types/src/lib.rs | 2 + 3 files changed, 96 insertions(+), 13 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index c159b90bd5e4..131b081ca14a 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -4,9 +4,9 @@ use reth_node_api::FullNodeComponents; use reth_node_core::node_config::NodeConfig; use reth_primitives::Head; use reth_tasks::TaskExecutor; -use tokio::sync::mpsc::{Receiver, UnboundedSender}; +use tokio::sync::mpsc::UnboundedSender; -use crate::{ExExEvent, ExExNotification}; +use crate::{ExExEvent, ExExNotificationsSubscriber}; /// Captures the context that an `ExEx` has access to. pub struct ExExContext { @@ -30,7 +30,7 @@ pub struct ExExContext { /// /// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the /// node. - pub notifications: Receiver, + pub notifications: ExExNotificationsSubscriber, /// node components pub components: Node, diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 0f222e0ecaa9..ab9e652be6e2 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,11 +1,13 @@ use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; use metrics::Gauge; +use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; use reth_primitives::BlockNumber; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, future::{poll_fn, Future}, + ops::{Deref, DerefMut}, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, @@ -45,6 +47,7 @@ pub struct ExExHandle { sender: PollSender, /// Channel to receive [`ExExEvent`]s from the `ExEx`. receiver: UnboundedReceiver, + handle_rx: watch::Receiver, /// The ID of the next notification to send to this `ExEx`. next_notification_id: usize, @@ -59,9 +62,12 @@ impl ExExHandle { /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new(id: String) -> (Self, UnboundedSender, Receiver) { + pub fn new(id: String) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); + let (handle_tx, handle_rx) = watch::channel(ExExHandleState::Inactive); + + let notifications = ExExNotificationsSubscriber::new(notification_rx, handle_tx); ( Self { @@ -69,11 +75,12 @@ impl ExExHandle { metrics: ExExMetrics::new_with_labels(&[("exex", id)]), sender: PollSender::new(notification_tx), receiver: event_rx, + handle_rx, next_notification_id: 0, finished_height: None, }, event_tx, - notification_rx, + notifications, ) } @@ -86,6 +93,10 @@ impl ExExHandle { cx: &mut Context<'_>, (notification_id, notification): &(usize, ExExNotification), ) -> Poll>> { + if !self.handle_rx.borrow().is_active() { + return Poll::Ready(Ok(())) + } + if let Some(finished_height) = self.finished_height { match notification { ExExNotification::ChainCommitted { new } => { @@ -139,6 +150,72 @@ impl ExExHandle { } } +#[derive(Debug)] +pub struct ExExNotificationsSubscriber { + notifications: ExExNotifications, + handle_tx: watch::Sender, +} + +impl ExExNotificationsSubscriber { + pub(crate) fn new( + receiver: Receiver, + handle_tx: watch::Sender, + ) -> Self { + Self { + notifications: ExExNotifications { receiver, handle_tx: handle_tx.clone() }, + handle_tx, + } + } + + pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications { + self.handle_tx.send(ExExHandleState::Active(Some(head))).unwrap(); + &mut self.notifications + } + + pub fn subscribe(&mut self) -> &mut ExExNotifications { + self.handle_tx.send(ExExHandleState::Active(None)).unwrap(); + &mut self.notifications + } +} + +#[derive(Debug)] +pub enum ExExHandleState { + Active(Option), + Inactive, +} + +impl ExExHandleState { + pub(crate) const fn is_active(&self) -> bool { + matches!(self, Self::Active(_)) + } +} + +#[derive(Debug)] +pub struct ExExNotifications { + receiver: Receiver, + handle_tx: watch::Sender, +} + +impl Deref for ExExNotifications { + type Target = Receiver; + + fn deref(&self) -> &Self::Target { + &self.receiver + } +} + +impl DerefMut for ExExNotifications { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } +} + +impl Drop for ExExNotifications { + fn drop(&mut self) { + let _ = self.handle_tx.send(ExExHandleState::Inactive); + } +} + /// Metrics for the `ExEx` manager. #[derive(Metrics)] #[metrics(scope = "exex_manager")] @@ -730,7 +807,8 @@ mod tests { #[tokio::test] async fn exex_handle_new() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let notifications_rx = notifications.subscribe(); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -759,7 +837,7 @@ mod tests { // Send a notification and ensure it's received correctly match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notification_rx.recv().await.unwrap(); + let received_notification = notifications_rx.recv().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending => panic!("Notification send is pending"), @@ -772,7 +850,8 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let notifications_rx = notifications.subscribe(); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(15); @@ -792,7 +871,7 @@ mod tests { Poll::Ready(Ok(())) => { // The notification should be skipped, so nothing should be sent. // Check that the receiver channel is indeed empty - assert!(notification_rx.try_recv().is_err(), "Receiver channel should be empty"); + assert!(notifications_rx.try_recv().is_err(), "Receiver channel should be empty"); } Poll::Pending | Poll::Ready(Err(_)) => { panic!("Notification should not be pending or fail"); @@ -805,7 +884,8 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -821,7 +901,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notification_rx.recv().await.unwrap(); + let received_notification = notifications_rx.recv().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -835,7 +915,8 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { - let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -848,7 +929,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notification_rx.recv().await.unwrap(); + let received_notification = notifications_rx.recv().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { diff --git a/crates/exex/types/src/lib.rs b/crates/exex/types/src/lib.rs index 3c0ca731f216..8e71fbc619b3 100644 --- a/crates/exex/types/src/lib.rs +++ b/crates/exex/types/src/lib.rs @@ -9,7 +9,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod finished_height; +mod head; mod notification; pub use finished_height::FinishedExExHeight; +pub use head::ExExHead; pub use notification::ExExNotification; From eae2985e7bcc2fa6bf980272000787a09bf0bfb8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Aug 2024 12:31:21 +0100 Subject: [PATCH 02/29] add head --- Cargo.lock | 1 + crates/exex/exex/src/manager.rs | 45 ++++++++++++++++++++------------- crates/exex/types/Cargo.toml | 1 + crates/exex/types/src/head.rs | 11 ++++++++ 4 files changed, 40 insertions(+), 18 deletions(-) create mode 100644 crates/exex/types/src/head.rs diff --git a/Cargo.lock b/Cargo.lock index 7a854560032c..d75886ac2263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7451,6 +7451,7 @@ name = "reth-exex-types" version = "1.0.5" dependencies = [ "alloy-primitives", + "reth-primitives", "reth-provider", "serde", ] diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index ab9e652be6e2..201cfd8bdaba 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -47,7 +47,7 @@ pub struct ExExHandle { sender: PollSender, /// Channel to receive [`ExExEvent`]s from the `ExEx`. receiver: UnboundedReceiver, - handle_rx: watch::Receiver, + notifications_state_rx: watch::Receiver, /// The ID of the next notification to send to this `ExEx`. next_notification_id: usize, @@ -65,9 +65,11 @@ impl ExExHandle { pub fn new(id: String) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let (handle_tx, handle_rx) = watch::channel(ExExHandleState::Inactive); + let (notifications_state_tx, notifications_state_rx) = + watch::channel(ExExNotificationsState::Inactive); - let notifications = ExExNotificationsSubscriber::new(notification_rx, handle_tx); + let notifications = + ExExNotificationsSubscriber::new(notification_rx, notifications_state_tx); ( Self { @@ -75,7 +77,7 @@ impl ExExHandle { metrics: ExExMetrics::new_with_labels(&[("exex", id)]), sender: PollSender::new(notification_tx), receiver: event_rx, - handle_rx, + notifications_state_rx, next_notification_id: 0, finished_height: None, }, @@ -93,7 +95,7 @@ impl ExExHandle { cx: &mut Context<'_>, (notification_id, notification): &(usize, ExExNotification), ) -> Poll>> { - if !self.handle_rx.borrow().is_active() { + if !self.notifications_state_rx.borrow().is_active() { return Poll::Ready(Ok(())) } @@ -150,50 +152,57 @@ impl ExExHandle { } } +/// A subscriber for [`ExExNotifications`]. #[derive(Debug)] pub struct ExExNotificationsSubscriber { notifications: ExExNotifications, - handle_tx: watch::Sender, + state_tx: watch::Sender, } impl ExExNotificationsSubscriber { - pub(crate) fn new( + /// Creates a new [`ExExNotificationsSubscriber`]. + pub fn new( receiver: Receiver, - handle_tx: watch::Sender, + state_tx: watch::Sender, ) -> Self { - Self { - notifications: ExExNotifications { receiver, handle_tx: handle_tx.clone() }, - handle_tx, - } + Self { notifications: ExExNotifications { receiver, state_tx: state_tx.clone() }, state_tx } } + /// Subscribe to notifications with the given head. pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications { - self.handle_tx.send(ExExHandleState::Active(Some(head))).unwrap(); + self.state_tx.send(ExExNotificationsState::Active(Some(head))).unwrap(); &mut self.notifications } + /// Subscribe to notifications. pub fn subscribe(&mut self) -> &mut ExExNotifications { - self.handle_tx.send(ExExHandleState::Active(None)).unwrap(); + self.state_tx.send(ExExNotificationsState::Active(None)).unwrap(); &mut self.notifications } } +#[allow(clippy::doc_markdown)] +/// A state of the ExEx notifications subscription. #[derive(Debug)] -pub enum ExExHandleState { +pub enum ExExNotificationsState { + /// The subscription is active and will receive notifications according to the given head, if + /// provided. Active(Option), + /// The subscription is inactive. Inactive, } -impl ExExHandleState { +impl ExExNotificationsState { pub(crate) const fn is_active(&self) -> bool { matches!(self, Self::Active(_)) } } +/// An active subscription to [`ExExNotification`]s. #[derive(Debug)] pub struct ExExNotifications { receiver: Receiver, - handle_tx: watch::Sender, + state_tx: watch::Sender, } impl Deref for ExExNotifications { @@ -212,7 +221,7 @@ impl DerefMut for ExExNotifications { impl Drop for ExExNotifications { fn drop(&mut self) { - let _ = self.handle_tx.send(ExExHandleState::Inactive); + let _ = self.state_tx.send(ExExNotificationsState::Inactive); } } diff --git a/crates/exex/types/Cargo.toml b/crates/exex/types/Cargo.toml index a70bcc1dd43c..75cd498cd173 100644 --- a/crates/exex/types/Cargo.toml +++ b/crates/exex/types/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth +reth-primitives.workspace = true reth-provider.workspace = true # reth diff --git a/crates/exex/types/src/head.rs b/crates/exex/types/src/head.rs new file mode 100644 index 000000000000..c2b3c3a352e4 --- /dev/null +++ b/crates/exex/types/src/head.rs @@ -0,0 +1,11 @@ +use reth_primitives::{BlockHash, BlockNumber}; + +#[allow(clippy::doc_markdown)] +/// A head of the ExEx. It should determine the highest block committed to the internal ExEx state. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ExExHead { + /// The number of the ExEx head block. + pub number: BlockNumber, + /// The hash of the ExEx head block. + pub hash: BlockHash, +} From cf22e41c2a9ddbcb00284ac4d8ae51623af30ac5 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Aug 2024 14:53:27 +0100 Subject: [PATCH 03/29] doc comment --- crates/exex/exex/src/manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 201cfd8bdaba..8bb35901e97b 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -47,6 +47,7 @@ pub struct ExExHandle { sender: PollSender, /// Channel to receive [`ExExEvent`]s from the `ExEx`. receiver: UnboundedReceiver, + /// The state of the notifications channel. notifications_state_rx: watch::Receiver, /// The ID of the next notification to send to this `ExEx`. next_notification_id: usize, From aff910e2caa15293b2ddfcbb82e6ec7024646ca8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Aug 2024 14:54:47 +0100 Subject: [PATCH 04/29] more comments --- crates/exex/exex/src/context.rs | 2 +- crates/exex/exex/src/manager.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index 131b081ca14a..1307a685d238 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -28,7 +28,7 @@ pub struct ExExContext { /// /// # Important /// - /// Once a an [`ExExNotification`] is sent over the channel, it is considered delivered by the + /// Once an [`ExExNotification`] is sent over the channel, it is considered delivered by the /// node. pub notifications: ExExNotificationsSubscriber, diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 8bb35901e97b..83a8f30d3d7c 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -96,6 +96,7 @@ impl ExExHandle { cx: &mut Context<'_>, (notification_id, notification): &(usize, ExExNotification), ) -> Poll>> { + // If the notifications channel is not active, we don't need to send any notifications. if !self.notifications_state_rx.borrow().is_active() { return Poll::Ready(Ok(())) } @@ -169,7 +170,8 @@ impl ExExNotificationsSubscriber { Self { notifications: ExExNotifications { receiver, state_tx: state_tx.clone() }, state_tx } } - /// Subscribe to notifications with the given head. + /// Subscribe to notifications with the given head. Notifications will be sent starting from the + /// head, not inclusive. pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications { self.state_tx.send(ExExNotificationsState::Active(Some(head))).unwrap(); &mut self.notifications From 5725e0bb0105db003b9bc4127ce22b68c1eeb170 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Aug 2024 16:03:28 +0100 Subject: [PATCH 05/29] use derive_more for deref, deref mut --- Cargo.lock | 1 + crates/exex/exex/Cargo.toml | 5 +++-- crates/exex/exex/src/manager.rs | 18 +++--------------- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d75886ac2263..f5cdd0d0a544 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7387,6 +7387,7 @@ dependencies = [ name = "reth-exex" version = "1.0.5" dependencies = [ + "derive_more 1.0.0", "eyre", "futures", "metrics", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 1ad906e6f010..a455392c1865 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -20,8 +20,8 @@ reth-metrics.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true reth-payload-builder.workspace = true -reth-primitives-traits.workspace = true reth-primitives = { workspace = true, features = ["secp256k1"] } +reth-primitives-traits.workspace = true reth-provider.workspace = true reth-prune-types.workspace = true reth-revm.workspace = true @@ -35,6 +35,7 @@ tokio-util.workspace = true tokio.workspace = true ## misc +derive_more.workspace = true eyre.workspace = true metrics.workspace = true @@ -45,9 +46,9 @@ reth-db-api.workspace = true reth-db-common.workspace = true reth-evm-ethereum.workspace = true reth-node-api.workspace = true +reth-primitives-traits = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } reth-testing-utils.workspace = true -reth-primitives-traits = { workspace = true, features = ["test-utils"] } secp256k1.workspace = true diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 83a8f30d3d7c..72c26b224f9f 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -202,26 +202,14 @@ impl ExExNotificationsState { } /// An active subscription to [`ExExNotification`]s. -#[derive(Debug)] +#[derive(Debug, derive_more::Deref, derive_more::DerefMut)] pub struct ExExNotifications { + #[deref] + #[deref_mut] receiver: Receiver, state_tx: watch::Sender, } -impl Deref for ExExNotifications { - type Target = Receiver; - - fn deref(&self) -> &Self::Target { - &self.receiver - } -} - -impl DerefMut for ExExNotifications { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.receiver - } -} - impl Drop for ExExNotifications { fn drop(&mut self) { let _ = self.state_tx.send(ExExNotificationsState::Inactive); From 342b15faa1bf5dd1342abb5441f590ad180c6935 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 28 Aug 2024 16:05:56 +0100 Subject: [PATCH 06/29] more comments --- crates/exex/exex/src/manager.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 72c26b224f9f..0f17d7b39862 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -172,12 +172,19 @@ impl ExExNotificationsSubscriber { /// Subscribe to notifications with the given head. Notifications will be sent starting from the /// head, not inclusive. + /// + /// When the return value is dropped, the subscription is cancelled. pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications { self.state_tx.send(ExExNotificationsState::Active(Some(head))).unwrap(); &mut self.notifications } /// Subscribe to notifications. + /// + /// When the return value is dropped, the subscription is cancelled. + /// It means that you will miss some of the notifications that could have arrived while the + /// subscription is inactive. To prevent this, you can use [`Self::subscribe_with_head`] to + /// explicitly pass the head where you stopped and continue from there. pub fn subscribe(&mut self) -> &mut ExExNotifications { self.state_tx.send(ExExNotificationsState::Active(None)).unwrap(); &mut self.notifications From 1dd3a8bba7039a92170610e3b2acb2176aafb4f4 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Aug 2024 09:32:02 +0100 Subject: [PATCH 07/29] comment subscribe with head for now --- crates/exex/exex/src/manager.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 0f17d7b39862..bde10a73af97 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -170,21 +170,24 @@ impl ExExNotificationsSubscriber { Self { notifications: ExExNotifications { receiver, state_tx: state_tx.clone() }, state_tx } } - /// Subscribe to notifications with the given head. Notifications will be sent starting from the - /// head, not inclusive. - /// - /// When the return value is dropped, the subscription is cancelled. - pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications { - self.state_tx.send(ExExNotificationsState::Active(Some(head))).unwrap(); - &mut self.notifications - } + // TODO(alexey): uncomment when we have a way to subscribe to notifications with a head + // /// Subscribe to notifications with the given head. Notifications will be sent starting from + // /// the head, not inclusive. For example, if `head.number == 10`, then the first + // /// notification will be with `block.number == 11`. + // /// + // /// When the return value is dropped, the subscription is cancelled. + // pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications { + // self.state_tx.send(ExExNotificationsState::Active(Some(head))).unwrap(); + // &mut self.notifications + // } /// Subscribe to notifications. /// /// When the return value is dropped, the subscription is cancelled. /// It means that you will miss some of the notifications that could have arrived while the - /// subscription is inactive. To prevent this, you can use [`Self::subscribe_with_head`] to - /// explicitly pass the head where you stopped and continue from there. + /// subscription is inactive. + // /// To prevent this, you can use [`Self::subscribe_with_head`] to + // /// explicitly pass the head where you stopped and continue from there. pub fn subscribe(&mut self) -> &mut ExExNotifications { self.state_tx.send(ExExNotificationsState::Active(None)).unwrap(); &mut self.notifications From 6edafa7d790ecba3de531882ffea3782baa4c6d8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Aug 2024 09:34:50 +0100 Subject: [PATCH 08/29] fix test utils --- crates/exex/exex/src/manager.rs | 1 - crates/exex/test-utils/src/lib.rs | 15 ++++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index bde10a73af97..4b6e8a9c4ec2 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -7,7 +7,6 @@ use reth_tracing::tracing::debug; use std::{ collections::VecDeque, future::{poll_fn, Future}, - ops::{Deref, DerefMut}, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index b363823eab04..7d50508fdffd 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -16,7 +16,9 @@ use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_db_common::init::init_genesis; use reth_evm::test_utils::MockExecutorProvider; use reth_execution_types::Chain; -use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_exex::{ + ExExContext, ExExEvent, ExExNotification, ExExNotificationsState, ExExNotificationsSubscriber, +}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{FullNodeTypes, FullNodeTypesAdapter, NodeTypes}; use reth_node_builder::{ @@ -46,7 +48,10 @@ use std::{ task::Poll, }; use thiserror::Error; -use tokio::sync::mpsc::{Sender, UnboundedReceiver}; +use tokio::sync::{ + mpsc::{Sender, UnboundedReceiver}, + watch, +}; /// A test [`PoolBuilder`] that builds a [`TestPool`]. #[derive(Debug, Default, Clone, Copy)] @@ -278,13 +283,17 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); + let notifications = ExExNotificationsSubscriber::new( + notifications_rx, + watch::channel(ExExNotificationsState::Inactive).0, + ); let ctx = ExExContext { head, config: NodeConfig::test(), reth_config: reth_config::Config::default(), events: events_tx, - notifications: notifications_rx, + notifications, components, }; From d33281e6b58daad21db3ca6e203c2c5e251161b4 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Aug 2024 09:38:09 +0100 Subject: [PATCH 09/29] fix doc links --- crates/exex/exex/src/context.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index 1307a685d238..775acc865614 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -24,12 +24,12 @@ pub struct ExExContext { /// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what /// blocks to receive notifications for. pub events: UnboundedSender, - /// Channel to receive [`ExExNotification`]s. + /// Channel to receive [`ExExNotification`](crate::ExExNotification)s. /// /// # Important /// - /// Once an [`ExExNotification`] is sent over the channel, it is considered delivered by the - /// node. + /// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is + /// considered delivered by the node. pub notifications: ExExNotificationsSubscriber, /// node components From 5c54005425c9fb7f3a95bbe83ca58fcbe6544d64 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Aug 2024 09:59:14 +0100 Subject: [PATCH 10/29] test inactive functionality --- crates/exex/exex/src/manager.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 4b6e8a9c4ec2..9067cf02ccaa 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -817,7 +817,6 @@ mod tests { #[tokio::test] async fn exex_handle_new() { let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); - let notifications_rx = notifications.subscribe(); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -843,7 +842,20 @@ mod tests { let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - // Send a notification and ensure it's received correctly + // Send a notification and ensure it's not received because the subscription is inactive + match exex_handle.send(&mut cx, &(22, notification.clone())) { + Poll::Ready(Ok(())) => { + assert!( + notifications.notifications.receiver.is_empty(), + "Receiver channel should be empty" + ); + } + Poll::Pending => panic!("Notification send is pending"), + Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e), + } + + // Send a notification and ensure it's received correctly because the subscription is active + let notifications_rx = notifications.subscribe(); match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { let received_notification = notifications_rx.recv().await.unwrap(); From a07345dab951fcfd7021f369752b69fbe74d5ec4 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 29 Aug 2024 10:02:20 +0100 Subject: [PATCH 11/29] update book --- book/developers/exex/hello-world.md | 3 ++- book/developers/exex/remote.md | 6 ++++-- book/developers/exex/tracking-state.md | 12 +++++++++++- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/book/developers/exex/hello-world.md b/book/developers/exex/hello-world.md index b15a203cddbb..9f609bb3c683 100644 --- a/book/developers/exex/hello-world.md +++ b/book/developers/exex/hello-world.md @@ -107,7 +107,8 @@ use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; async fn my_exex(mut ctx: ExExContext) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.recv().await { + let notifications = ctx.notifications.subscribe(); + while let Some(notification) = notifications.recv().await { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); diff --git a/book/developers/exex/remote.md b/book/developers/exex/remote.md index 2e7e7dad10b0..a49070c6ef22 100644 --- a/book/developers/exex/remote.md +++ b/book/developers/exex/remote.md @@ -274,7 +274,8 @@ async fn remote_exex( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.recv().await { + let notifications = ctx.notifications.subscribe(); + while let Some(notification) = notifications.recv().await { if let Some(committed_chain) = notification.committed_chain() { ctx.events .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; @@ -381,7 +382,8 @@ async fn remote_exex( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { - while let Some(notification) = ctx.notifications.recv().await { + let notifications = ctx.notifications.subscribe(); + while let Some(notification) = notifications.recv().await { if let Some(committed_chain) = notification.committed_chain() { ctx.events .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; diff --git a/book/developers/exex/tracking-state.md b/book/developers/exex/tracking-state.md index 5fe8b1c9ef83..fac7b0aab79c 100644 --- a/book/developers/exex/tracking-state.md +++ b/book/developers/exex/tracking-state.md @@ -32,6 +32,16 @@ use reth_tracing::tracing::info; struct MyExEx { ctx: ExExContext, + notifications: ExExNotifications, +} + +impl MyExEx { + fn new(ctx: ExExContext) -> Self { + Self { + ctx, + notifications: ctx.notifications.subscribe(), + } + } } impl Future for MyExEx { @@ -40,7 +50,7 @@ impl Future for MyExEx { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) { + while let Some(notification) = ready!(this.notifications.poll_recv(cx)) { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); From acc8310bf90660a4d85cc6b579207c13870a817e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 13:30:25 +0100 Subject: [PATCH 12/29] use two streams and no state --- book/developers/exex/tracking-state.md | 2 +- crates/exex/exex/src/manager.rs | 118 +++++++++++-------------- 2 files changed, 53 insertions(+), 67 deletions(-) diff --git a/book/developers/exex/tracking-state.md b/book/developers/exex/tracking-state.md index fac7b0aab79c..5747b9ca9428 100644 --- a/book/developers/exex/tracking-state.md +++ b/book/developers/exex/tracking-state.md @@ -26,7 +26,7 @@ use std::{ }; use reth::api::FullNodeComponents; -use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 9067cf02ccaa..ee88f09d3e4c 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,4 +1,5 @@ use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; +use futures::{channel::oneshot, Stream, StreamExt}; use metrics::Gauge; use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; @@ -18,7 +19,10 @@ use tokio::sync::{ mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender}, watch, }; -use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; +use tokio_util::{ + either::Either, + sync::{PollSendError, PollSender, ReusableBoxFuture}, +}; /// Metrics for an `ExEx`. #[derive(Metrics)] @@ -41,16 +45,12 @@ pub struct ExExHandle { id: String, /// Metrics for an `ExEx`. metrics: ExExMetrics, - /// Channel to send [`ExExNotification`]s to the `ExEx`. sender: PollSender, /// Channel to receive [`ExExEvent`]s from the `ExEx`. receiver: UnboundedReceiver, - /// The state of the notifications channel. - notifications_state_rx: watch::Receiver, /// The ID of the next notification to send to this `ExEx`. next_notification_id: usize, - /// The finished block number of the `ExEx`. /// /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event. @@ -65,11 +65,7 @@ impl ExExHandle { pub fn new(id: String) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let (notifications_state_tx, notifications_state_rx) = - watch::channel(ExExNotificationsState::Inactive); - - let notifications = - ExExNotificationsSubscriber::new(notification_rx, notifications_state_tx); + let notifications = ExExNotificationsSubscriber::new(notification_rx); ( Self { @@ -77,7 +73,6 @@ impl ExExHandle { metrics: ExExMetrics::new_with_labels(&[("exex", id)]), sender: PollSender::new(notification_tx), receiver: event_rx, - notifications_state_rx, next_notification_id: 0, finished_height: None, }, @@ -95,11 +90,6 @@ impl ExExHandle { cx: &mut Context<'_>, (notification_id, notification): &(usize, ExExNotification), ) -> Poll>> { - // If the notifications channel is not active, we don't need to send any notifications. - if !self.notifications_state_rx.borrow().is_active() { - return Poll::Ready(Ok(())) - } - if let Some(finished_height) = self.finished_height { match notification { ExExNotification::ChainCommitted { new } => { @@ -153,75 +143,71 @@ impl ExExHandle { } } +/// A stream of [`ExExNotification`]s returned by [`ExExNotificationsSubscriber::subscribe`] or +/// [`ExExNotificationsSubscriber::subscribe_with_head`]. +pub type ExExNotifications<'a> = + Either, ExExNotificationsWithHead<'a>>; + /// A subscriber for [`ExExNotifications`]. #[derive(Debug)] pub struct ExExNotificationsSubscriber { - notifications: ExExNotifications, - state_tx: watch::Sender, + notifications: Receiver, } impl ExExNotificationsSubscriber { /// Creates a new [`ExExNotificationsSubscriber`]. - pub fn new( - receiver: Receiver, - state_tx: watch::Sender, - ) -> Self { - Self { notifications: ExExNotifications { receiver, state_tx: state_tx.clone() }, state_tx } - } - - // TODO(alexey): uncomment when we have a way to subscribe to notifications with a head - // /// Subscribe to notifications with the given head. Notifications will be sent starting from - // /// the head, not inclusive. For example, if `head.number == 10`, then the first - // /// notification will be with `block.number == 11`. - // /// - // /// When the return value is dropped, the subscription is cancelled. - // pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications { - // self.state_tx.send(ExExNotificationsState::Active(Some(head))).unwrap(); - // &mut self.notifications - // } + pub const fn new(notifications: Receiver) -> Self { + Self { notifications } + } /// Subscribe to notifications. + pub fn subscribe(&mut self) -> ExExNotifications<'_> { + ExExNotifications::Left(ExExNotificationsWithoutHead(&mut self.notifications)) + } + + /// Subscribe to notifications with the given head. /// - /// When the return value is dropped, the subscription is cancelled. - /// It means that you will miss some of the notifications that could have arrived while the - /// subscription is inactive. - // /// To prevent this, you can use [`Self::subscribe_with_head`] to - // /// explicitly pass the head where you stopped and continue from there. - pub fn subscribe(&mut self) -> &mut ExExNotifications { - self.state_tx.send(ExExNotificationsState::Active(None)).unwrap(); - &mut self.notifications + /// Notifications will be sent starting from the head, not inclusive. For example, if + /// `head.number == 10`, then the first notification will be with `block.number == 11`. + pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { + ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) } } -#[allow(clippy::doc_markdown)] -/// A state of the ExEx notifications subscription. +/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. #[derive(Debug)] -pub enum ExExNotificationsState { - /// The subscription is active and will receive notifications according to the given head, if - /// provided. - Active(Option), - /// The subscription is inactive. - Inactive, -} +pub struct ExExNotificationsWithoutHead<'a>(&'a mut Receiver); + +impl Stream for ExExNotificationsWithoutHead<'_> { + type Item = ExExNotification; -impl ExExNotificationsState { - pub(crate) const fn is_active(&self) -> bool { - matches!(self, Self::Active(_)) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().0.poll_recv(cx) } } -/// An active subscription to [`ExExNotification`]s. -#[derive(Debug, derive_more::Deref, derive_more::DerefMut)] -pub struct ExExNotifications { - #[deref] - #[deref_mut] - receiver: Receiver, - state_tx: watch::Sender, -} +/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are +/// committed or reverted after the given head. +#[derive(Debug)] +pub struct ExExNotificationsWithHead<'a>(&'a mut Receiver, ExExHead); + +impl Stream for ExExNotificationsWithHead<'_> { + type Item = ExExNotification; -impl Drop for ExExNotifications { - fn drop(&mut self) { - let _ = self.state_tx.send(ExExNotificationsState::Inactive); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + let Some(notification) = ready!(this.0.poll_recv(cx)) else { return Poll::Ready(None) }; + + if notification + .committed_chain() + .or_else(|| notification.reverted_chain()) + .map_or(false, |chain| chain.first().number > this.1.number) + { + return Poll::Ready(Some(notification)) + } + } } } From c8c89f4c681b739dd4cddedd8a303995271afe0c Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 14:58:56 +0100 Subject: [PATCH 13/29] remove unused imports and deps --- Cargo.lock | 1 - crates/exex/exex/Cargo.toml | 1 - crates/exex/exex/src/manager.rs | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0dffd6f96861..e064a4ddaf95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7173,7 +7173,6 @@ dependencies = [ name = "reth-exex" version = "1.0.6" dependencies = [ - "derive_more 1.0.0", "eyre", "futures", "metrics", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index a455392c1865..f3decd337417 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -35,7 +35,6 @@ tokio-util.workspace = true tokio.workspace = true ## misc -derive_more.workspace = true eyre.workspace = true metrics.workspace = true diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index ee88f09d3e4c..2633cb771221 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,5 +1,5 @@ use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; -use futures::{channel::oneshot, Stream, StreamExt}; +use futures::Stream; use metrics::Gauge; use reth_exex_types::ExExHead; use reth_metrics::{metrics::Counter, Metrics}; From 83cfcc1cd7508d8b0c523383271235cecfa91826 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 15:06:49 +0100 Subject: [PATCH 14/29] fix tests --- crates/exex/exex/src/manager.rs | 43 ++++++++++++++----------------- crates/exex/test-utils/src/lib.rs | 14 +++------- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 2633cb771221..7feb8f0db756 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -545,6 +545,7 @@ impl Clone for ExExManagerHandle { #[cfg(test)] mod tests { use super::*; + use futures::{FutureExt, StreamExt}; use reth_primitives::{SealedBlockWithSenders, B256}; use reth_provider::Chain; @@ -828,23 +829,11 @@ mod tests { let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - // Send a notification and ensure it's not received because the subscription is inactive + // Send a notification and ensure it's received correctly + let mut notifications_rx = notifications.subscribe(); match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - assert!( - notifications.notifications.receiver.is_empty(), - "Receiver channel should be empty" - ); - } - Poll::Pending => panic!("Notification send is pending"), - Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e), - } - - // Send a notification and ensure it's received correctly because the subscription is active - let notifications_rx = notifications.subscribe(); - match exex_handle.send(&mut cx, &(22, notification.clone())) { - Poll::Ready(Ok(())) => { - let received_notification = notifications_rx.recv().await.unwrap(); + let received_notification = notifications_rx.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending => panic!("Notification send is pending"), @@ -858,7 +847,7 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); - let notifications_rx = notifications.subscribe(); + let mut notifications_rx = notifications.subscribe(); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(15); @@ -876,9 +865,17 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification)) { Poll::Ready(Ok(())) => { - // The notification should be skipped, so nothing should be sent. - // Check that the receiver channel is indeed empty - assert!(notifications_rx.try_recv().is_err(), "Receiver channel should be empty"); + poll_fn(|cx| { + // The notification should be skipped, so nothing should be sent. + // Check that the receiver channel is indeed empty + assert_eq!( + notifications_rx.next().poll_unpin(cx), + Poll::Pending, + "Receiver channel should be empty" + ); + Poll::Ready(()) + }) + .await; } Poll::Pending | Poll::Ready(Err(_)) => { panic!("Notification should not be pending or fail"); @@ -892,7 +889,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); - let notifications_rx = notifications.subscribe(); + let mut notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -908,7 +905,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications_rx.recv().await.unwrap(); + let received_notification = notifications_rx.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -923,7 +920,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); - let notifications_rx = notifications.subscribe(); + let mut notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -936,7 +933,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications_rx.recv().await.unwrap(); + let received_notification = notifications_rx.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 7d50508fdffd..493d67032101 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -16,9 +16,7 @@ use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_db_common::init::init_genesis; use reth_evm::test_utils::MockExecutorProvider; use reth_execution_types::Chain; -use reth_exex::{ - ExExContext, ExExEvent, ExExNotification, ExExNotificationsState, ExExNotificationsSubscriber, -}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotificationsSubscriber}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{FullNodeTypes, FullNodeTypesAdapter, NodeTypes}; use reth_node_builder::{ @@ -48,10 +46,7 @@ use std::{ task::Poll, }; use thiserror::Error; -use tokio::sync::{ - mpsc::{Sender, UnboundedReceiver}, - watch, -}; +use tokio::sync::mpsc::{Sender, UnboundedReceiver}; /// A test [`PoolBuilder`] that builds a [`TestPool`]. #[derive(Debug, Default, Clone, Copy)] @@ -283,10 +278,7 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); - let notifications = ExExNotificationsSubscriber::new( - notifications_rx, - watch::channel(ExExNotificationsState::Inactive).0, - ); + let notifications = ExExNotificationsSubscriber::new(notifications_rx); let ctx = ExExContext { head, From 689e5381650da5d8ac40cc997e120773f24d5de3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 15:08:19 +0100 Subject: [PATCH 15/29] update book --- book/developers/exex/hello-world.md | 4 ++-- book/developers/exex/tracking-state.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/book/developers/exex/hello-world.md b/book/developers/exex/hello-world.md index 9f609bb3c683..d3c94f9019be 100644 --- a/book/developers/exex/hello-world.md +++ b/book/developers/exex/hello-world.md @@ -107,8 +107,8 @@ use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; async fn my_exex(mut ctx: ExExContext) -> eyre::Result<()> { - let notifications = ctx.notifications.subscribe(); - while let Some(notification) = notifications.recv().await { + let mut notifications = ctx.notifications.subscribe(); + while let Some(notification) = notifications.next().await { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); diff --git a/book/developers/exex/tracking-state.md b/book/developers/exex/tracking-state.md index 5747b9ca9428..f78791d0c100 100644 --- a/book/developers/exex/tracking-state.md +++ b/book/developers/exex/tracking-state.md @@ -50,7 +50,7 @@ impl Future for MyExEx { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some(notification) = ready!(this.notifications.poll_recv(cx)) { + while let Some(notification) = ready!(this.notifications.poll_next_unpin(cx)) { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); From 6075bdbe5d2caa022eecd95768033e1689b7985e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 15:39:21 +0100 Subject: [PATCH 16/29] add a todo comment about backfill --- crates/exex/exex/src/manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 7feb8f0db756..7e241e5c7474 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -197,6 +197,7 @@ impl Stream for ExExNotificationsWithHead<'_> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); + // TODO(alexey): backfill according to the head loop { let Some(notification) = ready!(this.0.poll_recv(cx)) else { return Poll::Ready(None) }; From 7beb8d9e7884185322584a4dbcbfed490413a434 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 16:43:36 +0100 Subject: [PATCH 17/29] comment subscribe_with_head --- crates/exex/exex/src/manager.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 7e241e5c7474..6ff5290a74e4 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -165,13 +165,14 @@ impl ExExNotificationsSubscriber { ExExNotifications::Left(ExExNotificationsWithoutHead(&mut self.notifications)) } - /// Subscribe to notifications with the given head. - /// - /// Notifications will be sent starting from the head, not inclusive. For example, if - /// `head.number == 10`, then the first notification will be with `block.number == 11`. - pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { - ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) - } + // TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead` + // /// Subscribe to notifications with the given head. + // /// + // /// Notifications will be sent starting from the head, not inclusive. For example, if + // /// `head.number == 10`, then the first notification will be with `block.number == 11`. + // pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { + // ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) + // } } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. From 49feb0ef704be631387e3e1b38186f0300c054ab Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 16:43:58 +0100 Subject: [PATCH 18/29] Revert "comment subscribe_with_head" This reverts commit 7beb8d9e7884185322584a4dbcbfed490413a434. --- crates/exex/exex/src/manager.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 6ff5290a74e4..7e241e5c7474 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -165,14 +165,13 @@ impl ExExNotificationsSubscriber { ExExNotifications::Left(ExExNotificationsWithoutHead(&mut self.notifications)) } - // TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead` - // /// Subscribe to notifications with the given head. - // /// - // /// Notifications will be sent starting from the head, not inclusive. For example, if - // /// `head.number == 10`, then the first notification will be with `block.number == 11`. - // pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { - // ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) - // } + /// Subscribe to notifications with the given head. + /// + /// Notifications will be sent starting from the head, not inclusive. For example, if + /// `head.number == 10`, then the first notification will be with `block.number == 11`. + pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { + ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) + } } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. From 47a0f324d28461abd79a0707d7e269c86cb63837 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 16:54:39 +0100 Subject: [PATCH 19/29] Revert "Revert "comment subscribe_with_head"" This reverts commit 49feb0ef704be631387e3e1b38186f0300c054ab. --- crates/exex/exex/src/manager.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 7e241e5c7474..6ff5290a74e4 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -165,13 +165,14 @@ impl ExExNotificationsSubscriber { ExExNotifications::Left(ExExNotificationsWithoutHead(&mut self.notifications)) } - /// Subscribe to notifications with the given head. - /// - /// Notifications will be sent starting from the head, not inclusive. For example, if - /// `head.number == 10`, then the first notification will be with `block.number == 11`. - pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { - ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) - } + // TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead` + // /// Subscribe to notifications with the given head. + // /// + // /// Notifications will be sent starting from the head, not inclusive. For example, if + // /// `head.number == 10`, then the first notification will be with `block.number == 11`. + // pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { + // ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) + // } } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. From 1e5f6c366aa4d008fac6f68dcad5c653ac9a49b6 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 3 Sep 2024 17:06:25 +0100 Subject: [PATCH 20/29] fix docs --- crates/exex/exex/src/manager.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 6ff5290a74e4..a0ee83a41ee2 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -143,8 +143,7 @@ impl ExExHandle { } } -/// A stream of [`ExExNotification`]s returned by [`ExExNotificationsSubscriber::subscribe`] or -/// [`ExExNotificationsSubscriber::subscribe_with_head`]. +/// A stream of [`ExExNotification`]s returned by [`ExExNotificationsSubscriber::subscribe`]. pub type ExExNotifications<'a> = Either, ExExNotificationsWithHead<'a>>; From 0e32dc027cf7c94fb96e716aec99cbdbbbe6d51d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 4 Sep 2024 10:25:10 +0100 Subject: [PATCH 21/29] use blocknumhash --- crates/exex/exex/src/manager.rs | 2 +- crates/exex/types/src/head.rs | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index a0ee83a41ee2..e410a52aab9c 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -204,7 +204,7 @@ impl Stream for ExExNotificationsWithHead<'_> { if notification .committed_chain() .or_else(|| notification.reverted_chain()) - .map_or(false, |chain| chain.first().number > this.1.number) + .map_or(false, |chain| chain.first().number > this.1.block.number) { return Poll::Ready(Some(notification)) } diff --git a/crates/exex/types/src/head.rs b/crates/exex/types/src/head.rs index c2b3c3a352e4..1552c2380fdb 100644 --- a/crates/exex/types/src/head.rs +++ b/crates/exex/types/src/head.rs @@ -1,11 +1,9 @@ -use reth_primitives::{BlockHash, BlockNumber}; +use reth_primitives::BlockNumHash; #[allow(clippy::doc_markdown)] -/// A head of the ExEx. It should determine the highest block committed to the internal ExEx state. +/// A head of the ExEx. It determines the highest block committed to the internal ExEx state. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ExExHead { - /// The number of the ExEx head block. - pub number: BlockNumber, - /// The hash of the ExEx head block. - pub hash: BlockHash, + /// The head block. + pub block: BlockNumHash, } From d4eb1bb0feeb7f35efdc1df0204b4dae0daf07f5 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 4 Sep 2024 14:32:15 +0100 Subject: [PATCH 22/29] consume subscriber on subscribe --- crates/exex/exex/src/manager.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index e410a52aab9c..93a2a7700781 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -144,8 +144,7 @@ impl ExExHandle { } /// A stream of [`ExExNotification`]s returned by [`ExExNotificationsSubscriber::subscribe`]. -pub type ExExNotifications<'a> = - Either, ExExNotificationsWithHead<'a>>; +pub type ExExNotifications = Either; /// A subscriber for [`ExExNotifications`]. #[derive(Debug)] @@ -160,8 +159,8 @@ impl ExExNotificationsSubscriber { } /// Subscribe to notifications. - pub fn subscribe(&mut self) -> ExExNotifications<'_> { - ExExNotifications::Left(ExExNotificationsWithoutHead(&mut self.notifications)) + pub fn subscribe(self) -> ExExNotifications { + ExExNotifications::Left(ExExNotificationsWithoutHead(self.notifications)) } // TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead` @@ -169,16 +168,16 @@ impl ExExNotificationsSubscriber { // /// // /// Notifications will be sent starting from the head, not inclusive. For example, if // /// `head.number == 10`, then the first notification will be with `block.number == 11`. - // pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { - // ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) + // pub fn subscribe_with_head(head: ExExHead) -> ExExNotifications { + // ExExNotifications::Right(ExExNotificationsWithHead(self.notifications, head)) // } } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. #[derive(Debug)] -pub struct ExExNotificationsWithoutHead<'a>(&'a mut Receiver); +pub struct ExExNotificationsWithoutHead(Receiver); -impl Stream for ExExNotificationsWithoutHead<'_> { +impl Stream for ExExNotificationsWithoutHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -189,9 +188,9 @@ impl Stream for ExExNotificationsWithoutHead<'_> { /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead<'a>(&'a mut Receiver, ExExHead); +pub struct ExExNotificationsWithHead(Receiver, ExExHead); -impl Stream for ExExNotificationsWithHead<'_> { +impl Stream for ExExNotificationsWithHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -804,7 +803,7 @@ mod tests { #[tokio::test] async fn exex_handle_new() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -847,7 +846,7 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); let mut notifications_rx = notifications.subscribe(); // Set finished_height to a value higher than the block tip @@ -889,7 +888,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); let mut notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReorged { @@ -920,7 +919,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); let mut notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; From 5d0385a5c5f87e4edcf5bf0b88408523db405cf9 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 5 Sep 2024 17:20:40 +0100 Subject: [PATCH 23/29] no either type, just return separate stream types --- crates/exex/exex/src/manager.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 93a2a7700781..664387736573 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -19,10 +19,7 @@ use tokio::sync::{ mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender}, watch, }; -use tokio_util::{ - either::Either, - sync::{PollSendError, PollSender, ReusableBoxFuture}, -}; +use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; /// Metrics for an `ExEx`. #[derive(Metrics)] @@ -143,10 +140,7 @@ impl ExExHandle { } } -/// A stream of [`ExExNotification`]s returned by [`ExExNotificationsSubscriber::subscribe`]. -pub type ExExNotifications = Either; - -/// A subscriber for [`ExExNotifications`]. +/// A subscriber for a stream of [`ExExNotification`]s. #[derive(Debug)] pub struct ExExNotificationsSubscriber { notifications: Receiver, @@ -160,7 +154,7 @@ impl ExExNotificationsSubscriber { /// Subscribe to notifications. pub fn subscribe(self) -> ExExNotifications { - ExExNotifications::Left(ExExNotificationsWithoutHead(self.notifications)) + ExExNotifications(self.notifications) } // TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead` @@ -168,16 +162,16 @@ impl ExExNotificationsSubscriber { // /// // /// Notifications will be sent starting from the head, not inclusive. For example, if // /// `head.number == 10`, then the first notification will be with `block.number == 11`. - // pub fn subscribe_with_head(head: ExExHead) -> ExExNotifications { - // ExExNotifications::Right(ExExNotificationsWithHead(self.notifications, head)) + // pub fn subscribe_with_head(head: ExExHead) -> ExExNotificationsWithHead { + // ExExNotificationsWithHead(self.notifications, head) // } } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. #[derive(Debug)] -pub struct ExExNotificationsWithoutHead(Receiver); +pub struct ExExNotifications(Receiver); -impl Stream for ExExNotificationsWithoutHead { +impl Stream for ExExNotifications { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { From e3ddac49592f3e29638e7a00ba4cf3025d56f7fb Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 5 Sep 2024 17:35:52 +0100 Subject: [PATCH 24/29] revertme: init backfill job factory --- crates/exex/exex/src/manager.rs | 29 ++++++++++++++++++-------- crates/node/builder/src/launch/exex.rs | 10 +++++++-- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 664387736573..4bdc34af93d5 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,4 +1,4 @@ -use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; +use crate::{BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight}; use futures::Stream; use metrics::Gauge; use reth_exex_types::ExExHead; @@ -59,10 +59,13 @@ impl ExExHandle { /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new(id: String) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { + pub fn new( + id: String, + backfill_job_factory: BackfillJobFactory, + ) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = ExExNotificationsSubscriber::new(notification_rx); + let notifications = ExExNotificationsSubscriber::new(notification_rx, backfill_job_factory); ( Self { @@ -142,14 +145,18 @@ impl ExExHandle { /// A subscriber for a stream of [`ExExNotification`]s. #[derive(Debug)] -pub struct ExExNotificationsSubscriber { +pub struct ExExNotificationsSubscriber { notifications: Receiver, + backfill_job_factory: BackfillJobFactory, } -impl ExExNotificationsSubscriber { +impl ExExNotificationsSubscriber { /// Creates a new [`ExExNotificationsSubscriber`]. - pub const fn new(notifications: Receiver) -> Self { - Self { notifications } + pub const fn new( + notifications: Receiver, + backfill_job_factory: BackfillJobFactory, + ) -> Self { + Self { notifications, backfill_job_factory } } /// Subscribe to notifications. @@ -182,9 +189,13 @@ impl Stream for ExExNotifications { /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead(Receiver, ExExHead); +pub struct ExExNotificationsWithHead( + Receiver, + ExExHead, + BackfillJobFactory, +); -impl Stream for ExExNotificationsWithHead { +impl Stream for ExExNotificationsWithHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 86bb14ecf551..b50b2af7d32b 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -3,7 +3,7 @@ use std::{fmt, fmt::Debug}; use futures::future; -use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; +use reth_exex::{BackfillJobFactory, ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; use reth_node_api::FullNodeComponents; use reth_primitives::Head; use reth_provider::CanonStateSubscriptions; @@ -46,9 +46,15 @@ impl ExExLauncher { let mut exex_handles = Vec::with_capacity(extensions.len()); let mut exexes = Vec::with_capacity(extensions.len()); + let backfill_job_factory = BackfillJobFactory::new( + components.block_executor().clone(), + components.provider().clone(), + ); + for (id, exex) in extensions { // create a new exex handle - let (handle, events, notifications) = ExExHandle::new(id.clone()); + let (handle, events, notifications) = + ExExHandle::new(id.clone(), backfill_job_factory.clone()); exex_handles.push(handle); // create the launch context for the exex From d6e700376324db498bd240dd9bbb6eeb860a3abe Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 5 Sep 2024 17:40:13 +0100 Subject: [PATCH 25/29] Revert "revertme: init backfill job factory" This reverts commit e3ddac49592f3e29638e7a00ba4cf3025d56f7fb. --- crates/exex/exex/src/manager.rs | 29 ++++++++------------------ crates/node/builder/src/launch/exex.rs | 10 ++------- 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 4bdc34af93d5..664387736573 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,4 +1,4 @@ -use crate::{BackfillJobFactory, ExExEvent, ExExNotification, FinishedExExHeight}; +use crate::{ExExEvent, ExExNotification, FinishedExExHeight}; use futures::Stream; use metrics::Gauge; use reth_exex_types::ExExHead; @@ -59,13 +59,10 @@ impl ExExHandle { /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new( - id: String, - backfill_job_factory: BackfillJobFactory, - ) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { + pub fn new(id: String) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = ExExNotificationsSubscriber::new(notification_rx, backfill_job_factory); + let notifications = ExExNotificationsSubscriber::new(notification_rx); ( Self { @@ -145,18 +142,14 @@ impl ExExHandle { /// A subscriber for a stream of [`ExExNotification`]s. #[derive(Debug)] -pub struct ExExNotificationsSubscriber { +pub struct ExExNotificationsSubscriber { notifications: Receiver, - backfill_job_factory: BackfillJobFactory, } -impl ExExNotificationsSubscriber { +impl ExExNotificationsSubscriber { /// Creates a new [`ExExNotificationsSubscriber`]. - pub const fn new( - notifications: Receiver, - backfill_job_factory: BackfillJobFactory, - ) -> Self { - Self { notifications, backfill_job_factory } + pub const fn new(notifications: Receiver) -> Self { + Self { notifications } } /// Subscribe to notifications. @@ -189,13 +182,9 @@ impl Stream for ExExNotifications { /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead( - Receiver, - ExExHead, - BackfillJobFactory, -); +pub struct ExExNotificationsWithHead(Receiver, ExExHead); -impl Stream for ExExNotificationsWithHead { +impl Stream for ExExNotificationsWithHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index b50b2af7d32b..86bb14ecf551 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -3,7 +3,7 @@ use std::{fmt, fmt::Debug}; use futures::future; -use reth_exex::{BackfillJobFactory, ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; +use reth_exex::{ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; use reth_node_api::FullNodeComponents; use reth_primitives::Head; use reth_provider::CanonStateSubscriptions; @@ -46,15 +46,9 @@ impl ExExLauncher { let mut exex_handles = Vec::with_capacity(extensions.len()); let mut exexes = Vec::with_capacity(extensions.len()); - let backfill_job_factory = BackfillJobFactory::new( - components.block_executor().clone(), - components.provider().clone(), - ); - for (id, exex) in extensions { // create a new exex handle - let (handle, events, notifications) = - ExExHandle::new(id.clone(), backfill_job_factory.clone()); + let (handle, events, notifications) = ExExHandle::new(id.clone()); exex_handles.push(handle); // create the launch context for the exex From 44e4094ea86787aafd48d5a43bc41fa38bc16906 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 6 Sep 2024 09:33:22 +0100 Subject: [PATCH 26/29] include components into notifications structs --- crates/exex/exex/src/context.rs | 4 +- crates/exex/exex/src/manager.rs | 118 ++++++++++++++----------- crates/exex/test-utils/src/lib.rs | 4 +- crates/node/builder/src/launch/exex.rs | 2 +- 4 files changed, 70 insertions(+), 58 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index 775acc865614..476b6341559d 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -6,7 +6,7 @@ use reth_primitives::Head; use reth_tasks::TaskExecutor; use tokio::sync::mpsc::UnboundedSender; -use crate::{ExExEvent, ExExNotificationsSubscriber}; +use crate::{ExExEvent, ExExNotifications}; /// Captures the context that an `ExEx` has access to. pub struct ExExContext { @@ -30,7 +30,7 @@ pub struct ExExContext { /// /// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is /// considered delivered by the node. - pub notifications: ExExNotificationsSubscriber, + pub notifications: ExExNotifications, /// node components pub components: Node, diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 664387736573..f24f4b3219aa 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -7,6 +7,7 @@ use reth_primitives::BlockNumber; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, + fmt::Debug, future::{poll_fn, Future}, pin::Pin, sync::{ @@ -59,10 +60,13 @@ impl ExExHandle { /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new(id: String) -> (Self, UnboundedSender, ExExNotificationsSubscriber) { + pub fn new( + id: String, + components: Node, + ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = ExExNotificationsSubscriber::new(notification_rx); + let notifications = ExExNotifications { components, notifications: notification_rx }; ( Self { @@ -140,51 +144,61 @@ impl ExExHandle { } } -/// A subscriber for a stream of [`ExExNotification`]s. -#[derive(Debug)] -pub struct ExExNotificationsSubscriber { +/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. +pub struct ExExNotifications { + components: Node, notifications: Receiver, } -impl ExExNotificationsSubscriber { - /// Creates a new [`ExExNotificationsSubscriber`]. - pub const fn new(notifications: Receiver) -> Self { - Self { notifications } +impl Debug for ExExNotifications { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExExNotifications") + .field("components", &"...") + .field("notifications", &self.notifications) + .finish() } +} - /// Subscribe to notifications. - pub fn subscribe(self) -> ExExNotifications { - ExExNotifications(self.notifications) +impl ExExNotifications { + /// Creates a new instance of [`ExExNotifications`]. + pub const fn new(components: Node, notifications: Receiver) -> Self { + Self { components, notifications } } - // TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead` - // /// Subscribe to notifications with the given head. - // /// - // /// Notifications will be sent starting from the head, not inclusive. For example, if - // /// `head.number == 10`, then the first notification will be with `block.number == 11`. - // pub fn subscribe_with_head(head: ExExHead) -> ExExNotificationsWithHead { - // ExExNotificationsWithHead(self.notifications, head) - // } + // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] + /// Subscribe to notifications with the given head. + /// + /// Notifications will be sent starting from the head, not inclusive. For example, if + /// `head.number == 10`, then the first notification will be with `block.number == 11`. + #[allow(dead_code)] + fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + ExExNotificationsWithHead { + components: self.components, + notifications: self.notifications, + head, + } + } } -/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. -#[derive(Debug)] -pub struct ExExNotifications(Receiver); - -impl Stream for ExExNotifications { +impl Stream for ExExNotifications { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().0.poll_recv(cx) + self.get_mut().notifications.poll_recv(cx) } } /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead(Receiver, ExExHead); +pub struct ExExNotificationsWithHead { + #[allow(dead_code)] + components: Node, + notifications: Receiver, + head: ExExHead, +} -impl Stream for ExExNotificationsWithHead { +impl Stream for ExExNotificationsWithHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -192,12 +206,14 @@ impl Stream for ExExNotificationsWithHead { // TODO(alexey): backfill according to the head loop { - let Some(notification) = ready!(this.0.poll_recv(cx)) else { return Poll::Ready(None) }; + let Some(notification) = ready!(this.notifications.poll_recv(cx)) else { + return Poll::Ready(None) + }; if notification .committed_chain() .or_else(|| notification.reverted_chain()) - .map_or(false, |chain| chain.first().number > this.1.block.number) + .map_or(false, |chain| chain.first().number > this.head.block.number) { return Poll::Ready(Some(notification)) } @@ -546,7 +562,7 @@ mod tests { #[tokio::test] async fn test_delivers_events() { let (mut exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string()); + ExExHandle::new("test_exex".to_string(), ()); // Send an event and check that it's delivered correctly event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -556,7 +572,7 @@ mod tests { #[tokio::test] async fn test_has_exexs() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); assert!(!ExExManager::new(vec![], 0).handle.has_exexs()); @@ -565,7 +581,7 @@ mod tests { #[tokio::test] async fn test_has_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); assert!(!ExExManager::new(vec![], 0).handle.has_capacity()); @@ -574,7 +590,7 @@ mod tests { #[test] fn test_push_notification() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); // Create a mock ExExManager and add the exex_handle to it let mut exex_manager = ExExManager::new(vec![exex_handle], 10); @@ -619,7 +635,7 @@ mod tests { #[test] fn test_update_capacity() { - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), ()); // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; @@ -654,7 +670,7 @@ mod tests { #[tokio::test] async fn test_updates_block_height() { let (exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string()); + ExExHandle::new("test_exex".to_string(), ()); // Check initial block height assert!(exex_handle.finished_height.is_none()); @@ -691,8 +707,8 @@ mod tests { #[tokio::test] async fn test_updates_block_height_lower() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string()); + let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); + let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); // Send events to update the block heights of the two handles, with the second being lower event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -722,8 +738,8 @@ mod tests { #[tokio::test] async fn test_updates_block_height_greater() { // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string()); - let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string()); + let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), ()); + let (exex_handle2, event_tx2, _) = ExExHandle::new("test_exex2".to_string(), ()); // Assert that the initial block height is `None` for the first `ExExHandle`. assert!(exex_handle1.finished_height.is_none()); @@ -759,7 +775,7 @@ mod tests { #[tokio::test] async fn test_exex_manager_capacity() { - let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), ()); // Create an ExExManager with a small max capacity let max_capacity = 2; @@ -797,7 +813,7 @@ mod tests { #[tokio::test] async fn exex_handle_new() { - let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -824,10 +840,9 @@ mod tests { let mut cx = Context::from_waker(futures::task::noop_waker_ref()); // Send a notification and ensure it's received correctly - let mut notifications_rx = notifications.subscribe(); match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications_rx.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending => panic!("Notification send is pending"), @@ -840,8 +855,7 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { - let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); - let mut notifications_rx = notifications.subscribe(); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(15); @@ -863,7 +877,7 @@ mod tests { // The notification should be skipped, so nothing should be sent. // Check that the receiver channel is indeed empty assert_eq!( - notifications_rx.next().poll_unpin(cx), + notifications.next().poll_unpin(cx), Poll::Pending, "Receiver channel should be empty" ); @@ -882,8 +896,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { - let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); - let mut notifications_rx = notifications.subscribe(); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -899,7 +912,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications_rx.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -913,8 +926,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { - let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); - let mut notifications_rx = notifications.subscribe(); + let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string(), ()); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -927,7 +939,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications_rx.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 493d67032101..d2932f65c12c 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -16,7 +16,7 @@ use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_db_common::init::init_genesis; use reth_evm::test_utils::MockExecutorProvider; use reth_execution_types::Chain; -use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotificationsSubscriber}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{FullNodeTypes, FullNodeTypesAdapter, NodeTypes}; use reth_node_builder::{ @@ -278,7 +278,7 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); - let notifications = ExExNotificationsSubscriber::new(notifications_rx); + let notifications = ExExNotifications::new(components.clone(), notifications_rx); let ctx = ExExContext { head, diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 86bb14ecf551..798ddea2d829 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -48,7 +48,7 @@ impl ExExLauncher { for (id, exex) in extensions { // create a new exex handle - let (handle, events, notifications) = ExExHandle::new(id.clone()); + let (handle, events, notifications) = ExExHandle::new(id.clone(), components.clone()); exex_handles.push(handle); // create the launch context for the exex From 2f4ac1eb8c0deac09ed44031c865a7daae9d8de2 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 6 Sep 2024 09:45:40 +0100 Subject: [PATCH 27/29] update book --- book/developers/exex/hello-world.md | 6 ++++-- book/developers/exex/remote.md | 12 +++++++----- book/developers/exex/tracking-state.md | 18 +++++------------- book/installation/source.md | 6 +++--- crates/exex/exex/src/manager.rs | 4 ++-- 5 files changed, 21 insertions(+), 25 deletions(-) diff --git a/book/developers/exex/hello-world.md b/book/developers/exex/hello-world.md index d3c94f9019be..3c90e5a693d0 100644 --- a/book/developers/exex/hello-world.md +++ b/book/developers/exex/hello-world.md @@ -24,7 +24,9 @@ reth = { git = "https://github.com/paradigmxyz/reth.git" } # Reth reth-exex = { git = "https://github.com/paradigmxyz/reth.git" } # Execution Extensions reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth.git" } # Ethereum Node implementation reth-tracing = { git = "https://github.com/paradigmxyz/reth.git" } # Logging + eyre = "0.6" # Easy error handling +futures-util = "0.3" # Stream utilities for consuming notifications ``` ### Default Reth node @@ -101,14 +103,14 @@ If you try running a node with an ExEx that exits, the node will exit as well. Now, let's extend our simplest ExEx and start actually listening to new notifications, log them, and send events back to the main node ```rust,norun,noplayground,ignore +use futures_util::StreamExt; use reth::api::FullNodeComponents; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; async fn my_exex(mut ctx: ExExContext) -> eyre::Result<()> { - let mut notifications = ctx.notifications.subscribe(); - while let Some(notification) = notifications.next().await { + while let Some(notification) = ctx.notifications.next().await { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); diff --git a/book/developers/exex/remote.md b/book/developers/exex/remote.md index a49070c6ef22..2db5074e1df7 100644 --- a/book/developers/exex/remote.md +++ b/book/developers/exex/remote.md @@ -268,14 +268,15 @@ Don't forget to emit `ExExEvent::FinishedHeight` ```rust,norun,noplayground,ignore // ... + +use futures_util::StreamExt; use reth_exex::{ExExContext, ExExEvent}; async fn remote_exex( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { - let notifications = ctx.notifications.subscribe(); - while let Some(notification) = notifications.recv().await { + while let Some(notification) = ctx.notifications.next().await { if let Some(committed_chain) = notification.committed_chain() { ctx.events .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; @@ -333,6 +334,9 @@ fn main() -> eyre::Result<()> { Click to expand ```rust,norun,noplayground,ignore +use std::sync::Arc; + +use futures_util::StreamExt; use remote_exex::proto::{ self, remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, @@ -341,7 +345,6 @@ use reth::api::FullNodeComponents; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; -use std::sync::Arc; use tokio::sync::{broadcast, mpsc}; use tokio_stream::wrappers::ReceiverStream; use tonic::{transport::Server, Request, Response, Status}; @@ -382,8 +385,7 @@ async fn remote_exex( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { - let notifications = ctx.notifications.subscribe(); - while let Some(notification) = notifications.recv().await { + while let Some(notification) = ctx.notifications.next().await { if let Some(committed_chain) = notification.committed_chain() { ctx.events .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; diff --git a/book/developers/exex/tracking-state.md b/book/developers/exex/tracking-state.md index f78791d0c100..4d3bbd0a35ae 100644 --- a/book/developers/exex/tracking-state.md +++ b/book/developers/exex/tracking-state.md @@ -25,23 +25,14 @@ use std::{ task::{ready, Context, Poll}, }; +use futures_util::StreamExt; use reth::api::FullNodeComponents; -use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; struct MyExEx { ctx: ExExContext, - notifications: ExExNotifications, -} - -impl MyExEx { - fn new(ctx: ExExContext) -> Self { - Self { - ctx, - notifications: ctx.notifications.subscribe(), - } - } } impl Future for MyExEx { @@ -50,7 +41,7 @@ impl Future for MyExEx { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some(notification) = ready!(this.notifications.poll_next_unpin(cx)) { + while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) { match ¬ification { ExExNotification::ChainCommitted { new } => { info!(committed_chain = ?new.range(), "Received commit"); @@ -111,6 +102,7 @@ use std::{ task::{ready, Context, Poll}, }; +use futures_util::StreamExt; use reth::{api::FullNodeComponents, primitives::BlockNumber}; use reth_exex::{ExExContext, ExExEvent}; use reth_node_ethereum::EthereumNode; @@ -140,7 +132,7 @@ impl Future for MyExEx { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) { + while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) { if let Some(reverted_chain) = notification.reverted_chain() { this.transactions = this.transactions.saturating_sub( reverted_chain diff --git a/book/installation/source.md b/book/installation/source.md index 14ae22e0fda4..d9642c4bc48d 100644 --- a/book/installation/source.md +++ b/book/installation/source.md @@ -8,7 +8,7 @@ You can build Reth on Linux, macOS, Windows, and Windows WSL2. ## Dependencies -First, **install Rust** using [rustup](https://rustup.rs/): +First, **install Rust** using [rustup](https://rustup.rs/): ```bash curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh @@ -32,7 +32,7 @@ operating system: These are needed to build bindings for Reth's database. -The Minimum Supported Rust Version (MSRV) of this project is 1.81.0. If you already have a version of Rust installed, +The Minimum Supported Rust Version (MSRV) of this project is 1.80.0. If you already have a version of Rust installed, you can check your version by running `rustc --version`. To update your version of Rust, run `rustup update`. ## Build Reth @@ -147,7 +147,7 @@ _(Thanks to Sigma Prime for this section from [their Lighthouse book](https://li ### Bus error (WSL2) -In WSL 2 on Windows, the default virtual disk size is set to 1TB. +In WSL 2 on Windows, the default virtual disk size is set to 1TB. You must increase the allocated disk size for your WSL2 instance before syncing reth. diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index f24f4b3219aa..8f105aa98300 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -555,7 +555,7 @@ impl Clone for ExExManagerHandle { #[cfg(test)] mod tests { use super::*; - use futures::{FutureExt, StreamExt}; + use futures::StreamExt; use reth_primitives::{SealedBlockWithSenders, B256}; use reth_provider::Chain; @@ -877,7 +877,7 @@ mod tests { // The notification should be skipped, so nothing should be sent. // Check that the receiver channel is indeed empty assert_eq!( - notifications.next().poll_unpin(cx), + notifications.poll_next_unpin(cx), Poll::Pending, "Receiver channel should be empty" ); From d357e2d124e0e8b9b15b603a62f6b8af00c03912 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 6 Sep 2024 09:49:58 +0100 Subject: [PATCH 28/29] add backward compatibility method --- crates/exex/exex/src/manager.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 8f105aa98300..60cdf3855d4c 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -165,6 +165,21 @@ impl ExExNotifications { Self { components, notifications } } + /// Receives the next value for this receiver. + /// + /// This method returns `None` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. This indicates that no + /// further values can ever be received from this `Receiver`. The channel is + /// closed when all senders have been dropped, or when [`Receiver::close`] is called. + /// + /// For full documentation, see [`Receiver::recv`]. + #[deprecated( + note = "use `ExExNotifications::poll_next` and its `Stream` implementation instead" + )] + pub async fn recv(&mut self) -> Option { + self.notifications.recv().await + } + // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`] /// Subscribe to notifications with the given head. /// From 291929c019b9275ecea252a7401dbe3f006084c6 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 6 Sep 2024 11:55:03 +0100 Subject: [PATCH 29/29] another backward compat method --- crates/exex/exex/src/manager.rs | 39 +++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 60cdf3855d4c..48a77d786275 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -172,12 +172,47 @@ impl ExExNotifications { /// further values can ever be received from this `Receiver`. The channel is /// closed when all senders have been dropped, or when [`Receiver::close`] is called. /// + /// # Cancel safety + /// + /// This method is cancel safe. If `recv` is used as the event in a + /// [`tokio::select!`] statement and some other branch + /// completes first, it is guaranteed that no messages were received on this + /// channel. + /// /// For full documentation, see [`Receiver::recv`]. + #[deprecated(note = "use `ExExNotifications::next` and its `Stream` implementation instead")] + pub async fn recv(&mut self) -> Option { + self.notifications.recv().await + } + + /// Polls to receive the next message on this channel. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a + /// spurious failure happens. + /// * `Poll::Ready(Some(message))` if a message is available. + /// * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was + /// closed have been received. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` + /// passed to the most recent call is scheduled to receive a wakeup. + /// + /// If this method returns `Poll::Pending` due to a spurious failure, then + /// the `Waker` will be notified when the situation causing the spurious + /// failure has been resolved. Note that receiving such a wakeup does not + /// guarantee that the next call will succeed — it could fail with another + /// spurious failure. + /// + /// For full documentation, see [`Receiver::poll_recv`]. #[deprecated( note = "use `ExExNotifications::poll_next` and its `Stream` implementation instead" )] - pub async fn recv(&mut self) -> Option { - self.notifications.recv().await + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.notifications.poll_recv(cx) } // TODO(alexey): make it public when backfill is implemented in [`ExExNotificationsWithHead`]