Skip to content

Commit

Permalink
consume subscriber on subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 4, 2024
1 parent 0e32dc0 commit d4eb1bb
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ impl ExExHandle {
}

/// A stream of [`ExExNotification`]s returned by [`ExExNotificationsSubscriber::subscribe`].
pub type ExExNotifications<'a> =
Either<ExExNotificationsWithoutHead<'a>, ExExNotificationsWithHead<'a>>;
pub type ExExNotifications = Either<ExExNotificationsWithoutHead, ExExNotificationsWithHead>;

/// A subscriber for [`ExExNotifications`].
#[derive(Debug)]
Expand All @@ -160,25 +159,25 @@ impl ExExNotificationsSubscriber {
}

/// Subscribe to notifications.
pub fn subscribe(&mut self) -> ExExNotifications<'_> {
ExExNotifications::Left(ExExNotificationsWithoutHead(&mut self.notifications))
pub fn subscribe(self) -> ExExNotifications {
ExExNotifications::Left(ExExNotificationsWithoutHead(self.notifications))
}

// TODO(alexey): uncomment when backfill is implemented in `ExExNotificationsWithHead`
// /// Subscribe to notifications with the given head.
// ///
// /// Notifications will be sent starting from the head, not inclusive. For example, if
// /// `head.number == 10`, then the first notification will be with `block.number == 11`.
// pub fn subscribe_with_head(&mut self, head: ExExHead) -> ExExNotifications<'_> {
// ExExNotifications::Right(ExExNotificationsWithHead(&mut self.notifications, head))
// pub fn subscribe_with_head(head: ExExHead) -> ExExNotifications {
// ExExNotifications::Right(ExExNotificationsWithHead(self.notifications, head))
// }
}

/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks.
#[derive(Debug)]
pub struct ExExNotificationsWithoutHead<'a>(&'a mut Receiver<ExExNotification>);
pub struct ExExNotificationsWithoutHead(Receiver<ExExNotification>);

impl Stream for ExExNotificationsWithoutHead<'_> {
impl Stream for ExExNotificationsWithoutHead {
type Item = ExExNotification;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -189,9 +188,9 @@ impl Stream for ExExNotificationsWithoutHead<'_> {
/// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are
/// committed or reverted after the given head.
#[derive(Debug)]
pub struct ExExNotificationsWithHead<'a>(&'a mut Receiver<ExExNotification>, ExExHead);
pub struct ExExNotificationsWithHead(Receiver<ExExNotification>, ExExHead);

impl Stream for ExExNotificationsWithHead<'_> {
impl Stream for ExExNotificationsWithHead {
type Item = ExExNotification;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -804,7 +803,7 @@ mod tests {

#[tokio::test]
async fn exex_handle_new() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string());
let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string());

// Check initial state
assert_eq!(exex_handle.id, "test_exex");
Expand Down Expand Up @@ -847,7 +846,7 @@ mod tests {

#[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string());
let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string());
let mut notifications_rx = notifications.subscribe();

// Set finished_height to a value higher than the block tip
Expand Down Expand Up @@ -889,7 +888,7 @@ mod tests {

#[tokio::test]
async fn test_sends_chain_reorged_notification() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string());
let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string());
let mut notifications_rx = notifications.subscribe();

let notification = ExExNotification::ChainReorged {
Expand Down Expand Up @@ -920,7 +919,7 @@ mod tests {

#[tokio::test]
async fn test_sends_chain_reverted_notification() {
let (mut exex_handle, _, mut notifications) = ExExHandle::new("test_exex".to_string());
let (mut exex_handle, _, notifications) = ExExHandle::new("test_exex".to_string());
let mut notifications_rx = notifications.subscribe();

let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };
Expand Down

0 comments on commit d4eb1bb

Please sign in to comment.