Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Provider::subscribe_logs #339

Merged
merged 13 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/contract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ futures-util.workspace = true
futures.workspace = true
thiserror.workspace = true

alloy-pubsub = { workspace = true, optional = true }

[dev-dependencies]
alloy-rpc-client.workspace = true
alloy-rpc-client = { workspace = true, features = ["pubsub", "ws"] }
alloy-transport-http.workspace = true
alloy-node-bindings.workspace = true
test-utils.workspace = true
reqwest.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

[features]
pubsub = ["alloy-provider/pubsub", "dep:alloy-pubsub"]
110 changes: 98 additions & 12 deletions crates/contract/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,16 @@ impl<N: Network, T: Transport + Clone, P: Provider<N, T>, E: SolEvent> Event<N,
#[doc(alias = "stream_with_meta")]
pub async fn watch(&self) -> TransportResult<EventPoller<T, E>> {
let poller = self.provider.watch_logs(&self.filter).await?;
Ok(EventPoller::new(poller))
Ok(poller.into())
}

/// Subscribes to the stream of events that match the filter.
///
/// Returns a stream of decoded events and raw logs.
#[cfg(feature = "pubsub")]
pub async fn subscribe(&self) -> TransportResult<subscription::EventSubscription<E>> {
let sub = self.provider.subscribe_logs(&self.filter).await?;
Ok(sub.into())
}
}

Expand All @@ -83,18 +92,16 @@ pub struct EventPoller<T, E> {
_phantom: PhantomData<E>,
}

impl<T, E> std::ops::Deref for EventPoller<T, E> {
type Target = FilterPollerBuilder<T, Log>;

impl<T, E> AsRef<FilterPollerBuilder<T, Log>> for EventPoller<T, E> {
#[inline]
fn deref(&self) -> &Self::Target {
fn as_ref(&self) -> &FilterPollerBuilder<T, Log> {
&self.poller
}
}

impl<T, E> std::ops::DerefMut for EventPoller<T, E> {
impl<T, E> AsMut<FilterPollerBuilder<T, Log>> for EventPoller<T, E> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
fn as_mut(&mut self) -> &mut FilterPollerBuilder<T, Log> {
&mut self.poller
}
}
Expand All @@ -108,14 +115,13 @@ impl<T: fmt::Debug, E> fmt::Debug for EventPoller<T, E> {
}
}

impl<T: Transport + Clone, E: SolEvent> EventPoller<T, E> {
/// Creates a new event poller with the provided filter poller.
#[allow(clippy::missing_const_for_fn)]
#[inline]
pub fn new(poller: FilterPollerBuilder<T, Log>) -> Self {
impl<T, E> From<FilterPollerBuilder<T, Log>> for EventPoller<T, E> {
fn from(poller: FilterPollerBuilder<T, Log>) -> Self {
Self { poller, _phantom: PhantomData }
}
}

impl<T: Transport + Clone, E: SolEvent> EventPoller<T, E> {
/// Starts the poller and returns a stream that yields the decoded event and the raw log.
///
/// Note that this stream will not return `None` until the provider is dropped.
Expand All @@ -131,10 +137,64 @@ fn decode_log<E: SolEvent>(log: &Log) -> alloy_sol_types::Result<E> {
E::decode_raw_log(log.topics.iter().copied(), &log.data, false)
}

#[cfg(feature = "pubsub")]
pub(crate) mod subscription {
use super::*;
use alloy_pubsub::Subscription;

/// An event subscription.
///
/// Underlying subscription is available through the [`sub`](Self::sub) field.
pub struct EventSubscription<E> {
/// The inner poller.
pub sub: Subscription<Log>,
_phantom: PhantomData<E>,
}

impl<E> AsRef<Subscription<Log>> for EventSubscription<E> {
#[inline]
fn as_ref(&self) -> &Subscription<Log> {
&self.sub
}
}

impl<E> AsMut<Subscription<Log>> for EventSubscription<E> {
#[inline]
fn as_mut(&mut self) -> &mut Subscription<Log> {
&mut self.sub
}
}

impl<E> fmt::Debug for EventSubscription<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventSubscription")
.field("sub", &self.sub)
.field("event_type", &format_args!("{}", std::any::type_name::<E>()))
.finish()
}
}

impl<E> From<Subscription<Log>> for EventSubscription<E> {
fn from(sub: Subscription<Log>) -> Self {
Self { sub, _phantom: PhantomData }
}
}

impl<E: SolEvent> EventSubscription<E> {
/// Converts the subscription into a stream.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This results in an unnameable type. Is there any situation in which a user would want to name the resulting stream?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we want a similar thing as SubscriptionStream here with the type we decode on poll_next?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we have these all work the same way. can convert this to issue?

pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
self.sub.into_stream().map(|log| decode_log(&log).map(|e| (e, log)))
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_network::Ethereum;
use alloy_primitives::U256;
use alloy_provider::RootProvider;
use alloy_rpc_client::RpcClient;
use alloy_sol_types::sol;
use test_utils::{init_tracing, spawn_anvil};

Expand All @@ -154,6 +214,11 @@ mod tests {
#[tokio::test]
async fn event_filters() {
init_tracing();

#[cfg(feature = "pubsub")]
let (provider, anvil) = spawn_anvil();
klkvr marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(not(feature = "pubsub"))]
let (provider, _anvil) = spawn_anvil();

let contract = MyContract::deploy(&provider).await.unwrap();
Expand Down Expand Up @@ -189,5 +254,26 @@ mod tests {
assert_eq!(all.len(), 1);
assert_eq!(all[0].0, expected_event);
assert_eq!(all[0].1, stream_log);

#[cfg(feature = "pubsub")]
{
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let client = RpcClient::connect_pubsub(ws).await.unwrap();
let provider = RootProvider::<Ethereum, _>::new(client);

let contract = MyContract::new(*contract.address(), provider);
let event = contract.MyEvent_filter();

let sub = event.subscribe().await.unwrap();

contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");

let mut stream = sub.into_stream();

let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
assert_eq!(stream_event, expected_event);
assert_eq!(stream_log.address, *contract.address());
assert_eq!(stream_log.block_number, Some(U256::from(3)));
}
}
}
3 changes: 3 additions & 0 deletions crates/contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub use error::*;
mod event;
pub use event::{Event, EventPoller};

#[cfg(feature = "pubsub")]
pub use event::subscription::EventSubscription;

mod interface;
pub use interface::*;

Expand Down
37 changes: 37 additions & 0 deletions crates/provider/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,43 @@ pub trait Provider<N: Network, T: Transport + Clone = BoxTransport>: Send + Sync
self.root().get_subscription(id).await
}

/// Subscribe to a stream of logs matching given filter.
///
/// # Errors
///
/// This method is only available on `pubsub` clients, such as Websockets or IPC, and will
/// return a [`PubsubUnavailable`](TransportErrorKind::PubsubUnavailable) transport error if the
/// client does not support it.
///
/// For a polling alternative available over HTTP, use
/// [`Provider::watch_logs`]. However, be aware that polling increases
/// RPC usage drastically.
///
/// # Examples
///
/// ```no_run
/// # async fn example(provider: impl alloy_provider::Provider<alloy_network::Ethereum>) -> Result<(), Box<dyn std::error::Error>> {
/// use futures::StreamExt;
/// use alloy_primitives::keccak256;
/// use alloy_rpc_types::Filter;
///
/// let signature = keccak256("Transfer(address,address,uint256)".as_bytes());
///
/// let sub = provider.subscribe_logs(&Filter::new().event_signature(signature)).await?;
/// let mut stream = sub.into_stream().take(5);
/// while let Some(tx) = stream.next().await {
/// println!("{tx:#?}");
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "pubsub")]
async fn subscribe_logs(&self, filter: &Filter) -> TransportResult<Subscription<Log>> {
self.root().pubsub_frontend()?;
let id = self.client().request("eth_subscribe", ("logs", filter)).await?;
self.root().get_subscription(id).await
}

/// Subscribe to an RPC event.
#[cfg(feature = "pubsub")]
#[auto_impl(keep_default_for(&, &mut, Rc, Arc, Box))]
Expand Down
Loading