Skip to content

Commit

Permalink
add head
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Aug 28, 2024
1 parent dfcb2e1 commit c4d7798
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
44 changes: 28 additions & 16 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct ExExHandle {
sender: PollSender<ExExNotification>,
/// Channel to receive [`ExExEvent`]s from the `ExEx`.
receiver: UnboundedReceiver<ExExEvent>,
handle_rx: watch::Receiver<ExExHandleState>,
notifications_state_rx: watch::Receiver<ExExNotificationsState>,
/// The ID of the next notification to send to this `ExEx`.
next_notification_id: usize,

Expand All @@ -65,17 +65,19 @@ impl ExExHandle {
pub fn new(id: String) -> (Self, UnboundedSender<ExExEvent>, ExExNotificationsSubscriber) {
let (notification_tx, notification_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (handle_tx, handle_rx) = watch::channel(ExExHandleState::Inactive);
let (notifications_state_tx, notifications_state_rx) =
watch::channel(ExExNotificationsState::Inactive);

let notifications = ExExNotificationsSubscriber::new(notification_rx, handle_tx);
let notifications =
ExExNotificationsSubscriber::new(notification_rx, notifications_state_tx);

(
Self {
id: id.clone(),
metrics: ExExMetrics::new_with_labels(&[("exex", id)]),
sender: PollSender::new(notification_tx),
receiver: event_rx,
handle_rx,
notifications_state_rx,
next_notification_id: 0,
finished_height: None,
},
Expand All @@ -93,7 +95,7 @@ impl ExExHandle {
cx: &mut Context<'_>,
(notification_id, notification): &(usize, ExExNotification),
) -> Poll<Result<(), PollSendError<ExExNotification>>> {
if !self.handle_rx.borrow().is_active() {
if !self.notifications_state_rx.borrow().is_active() {
return Poll::Ready(Ok(()))
}

Expand Down Expand Up @@ -150,50 +152,60 @@ impl ExExHandle {
}
}

/// A subscriber for [`ExExNotifications`].
#[derive(Debug)]
pub struct ExExNotificationsSubscriber {
notifications: ExExNotifications,
handle_tx: watch::Sender<ExExHandleState>,
state_tx: watch::Sender<ExExNotificationsState>,
}

impl ExExNotificationsSubscriber {
pub(crate) fn new(
/// Creates a new [`ExExNotificationsSubscriber`].
pub fn new(
receiver: Receiver<ExExNotification>,
handle_tx: watch::Sender<ExExHandleState>,
state_tx: watch::Sender<ExExNotificationsState>,
) -> Self {
Self {
notifications: ExExNotifications { receiver, handle_tx: handle_tx.clone() },
handle_tx,
notifications: ExExNotifications { receiver, handle_tx: state_tx.clone() },
state_tx,
}
}

/// Subscribe to notifications with the given head.
pub fn subscribe_with_head(&mut self, head: ExExHead) -> &mut ExExNotifications {
self.handle_tx.send(ExExHandleState::Active(Some(head))).unwrap();
self.state_tx.send(ExExNotificationsState::Active(Some(head))).unwrap();
&mut self.notifications
}

/// Subscribe to notifications.
pub fn subscribe(&mut self) -> &mut ExExNotifications {
self.handle_tx.send(ExExHandleState::Active(None)).unwrap();
self.state_tx.send(ExExNotificationsState::Active(None)).unwrap();
&mut self.notifications
}
}

#[allow(clippy::doc_markdown)]
/// A state of the ExEx notifications subscription.
#[derive(Debug)]
pub enum ExExHandleState {
pub enum ExExNotificationsState {
/// The subscription is active and will receive notifications according to the given head, if
/// provided.
Active(Option<ExExHead>),
/// The subscription is inactive.
Inactive,
}

impl ExExHandleState {
impl ExExNotificationsState {
pub(crate) const fn is_active(&self) -> bool {
matches!(self, Self::Active(_))
}
}

/// An active subscription to [`ExExNotification`]s.
#[derive(Debug)]
pub struct ExExNotifications {
receiver: Receiver<ExExNotification>,
handle_tx: watch::Sender<ExExHandleState>,
handle_tx: watch::Sender<ExExNotificationsState>,
}

impl Deref for ExExNotifications {
Expand All @@ -212,7 +224,7 @@ impl DerefMut for ExExNotifications {

impl Drop for ExExNotifications {
fn drop(&mut self) {
let _ = self.handle_tx.send(ExExHandleState::Inactive);
let _ = self.handle_tx.send(ExExNotificationsState::Inactive);
}
}

Expand Down
9 changes: 9 additions & 0 deletions crates/exex/types/src/head.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#[allow(clippy::doc_markdown)]
/// A head of the ExEx. It should determine the highest block committed to the internal ExEx state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExExHead {
/// The number of the ExEx head block.
pub number: u64,
/// The hash of the ExEx head block.
pub hash: String,
}

0 comments on commit c4d7798

Please sign in to comment.