From d4eb1bb0feeb7f35efdc1df0204b4dae0daf07f5 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 4 Sep 2024 14:32:15 +0100 Subject: [PATCH] consume subscriber on subscribe --- crates/exex/exex/src/manager.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index e410a52aab9c..93a2a7700781 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -144,8 +144,7 @@ impl ExExHandle { } /// A stream of [`ExExNotification`]s returned by [`ExExNotificationsSubscriber::subscribe`]. -pub type ExExNotifications<'a> = - Either, ExExNotificationsWithHead<'a>>; +pub type ExExNotifications = Either; /// A subscriber for [`ExExNotifications`]. #[derive(Debug)] @@ -160,8 +159,8 @@ impl ExExNotificationsSubscriber { } /// Subscribe to notifications. - pub fn subscribe(&mut self) -> ExExNotifications<'_> { - ExExNotifications::Left(ExExNotificationsWithoutHead(&mut self.notifications)) + pub fn subscribe(self) -> ExExNotifications { + ExExNotifications::Left(ExExNotificationsWithoutHead(self.notifications)) } // TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead` @@ -169,16 +168,16 @@ impl ExExNotificationsSubscriber { // /// // /// Notifications will be sent starting from the head, not inclusive. For example, if // /// `head.number == 10`, then the first notification will be with `block.number == 11`. - // pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> { - // ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head)) + // pub fn subscribe_with_head(head: ExExHead) -> ExExNotifications { + // ExExNotifications::Right(ExExNotificationsWithHead(self.notifications, head)) // } } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. #[derive(Debug)] -pub struct ExExNotificationsWithoutHead<'a>(&'a mut Receiver); +pub struct ExExNotificationsWithoutHead(Receiver); -impl Stream for ExExNotificationsWithoutHead<'_> { +impl Stream for ExExNotificationsWithoutHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -189,9 +188,9 @@ impl Stream for ExExNotificationsWithoutHead<'_> { /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are /// committed or reverted after the given head. #[derive(Debug)] -pub struct ExExNotificationsWithHead<'a>(&'a mut Receiver, ExExHead); +pub struct ExExNotificationsWithHead(Receiver, ExExHead); -impl Stream for ExExNotificationsWithHead<'_> { +impl Stream for ExExNotificationsWithHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -804,7 +803,7 @@ mod tests { #[tokio::test] async fn exex_handle_new() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -847,7 +846,7 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); let mut notifications_rx = notifications.subscribe(); // Set finished_height to a value higher than the block tip @@ -889,7 +888,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); let mut notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReorged { @@ -920,7 +919,7 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { - let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string()); + let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string()); let mut notifications_rx = notifications.subscribe(); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };