diff --git a/crates/contract/Cargo.toml b/crates/contract/Cargo.toml index a53ff5ef633..629952b06f4 100644 --- a/crates/contract/Cargo.toml +++ b/crates/contract/Cargo.toml @@ -26,10 +26,15 @@ futures-util.workspace = true futures.workspace = true thiserror.workspace = true +alloy-pubsub = { workspace = true, optional = true } + [dev-dependencies] -alloy-rpc-client.workspace = true +alloy-rpc-client = { workspace = true, features = ["pubsub", "ws"] } alloy-transport-http.workspace = true alloy-node-bindings.workspace = true test-utils.workspace = true reqwest.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } + +[features] +pubsub = ["alloy-provider/pubsub", "dep:alloy-pubsub"] diff --git a/crates/contract/src/event.rs b/crates/contract/src/event.rs index 6be9e7b93ff..68afd34fd37 100644 --- a/crates/contract/src/event.rs +++ b/crates/contract/src/event.rs @@ -63,7 +63,16 @@ impl, E: SolEvent> Event TransportResult> { let poller = self.provider.watch_logs(&self.filter).await?; - Ok(EventPoller::new(poller)) + Ok(poller.into()) + } + + /// Subscribes to the stream of events that match the filter. + /// + /// Returns a stream of decoded events and raw logs. + #[cfg(feature = "pubsub")] + pub async fn subscribe(&self) -> TransportResult> { + let sub = self.provider.subscribe_logs(&self.filter).await?; + Ok(sub.into()) } } @@ -83,18 +92,16 @@ pub struct EventPoller { _phantom: PhantomData, } -impl std::ops::Deref for EventPoller { - type Target = FilterPollerBuilder; - +impl AsRef> for EventPoller { #[inline] - fn deref(&self) -> &Self::Target { + fn as_ref(&self) -> &FilterPollerBuilder { &self.poller } } -impl std::ops::DerefMut for EventPoller { +impl AsMut> for EventPoller { #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { + fn as_mut(&mut self) -> &mut FilterPollerBuilder { &mut self.poller } } @@ -108,14 +115,13 @@ impl fmt::Debug for EventPoller { } } -impl EventPoller { - /// Creates a new event poller with the provided filter poller. - #[allow(clippy::missing_const_for_fn)] - #[inline] - pub fn new(poller: FilterPollerBuilder) -> Self { +impl From> for EventPoller { + fn from(poller: FilterPollerBuilder) -> Self { Self { poller, _phantom: PhantomData } } +} +impl EventPoller { /// Starts the poller and returns a stream that yields the decoded event and the raw log. /// /// Note that this stream will not return `None` until the provider is dropped. @@ -131,10 +137,64 @@ fn decode_log(log: &Log) -> alloy_sol_types::Result { E::decode_raw_log(log.topics.iter().copied(), &log.data, false) } +#[cfg(feature = "pubsub")] +pub(crate) mod subscription { + use super::*; + use alloy_pubsub::Subscription; + + /// An event subscription. + /// + /// Underlying subscription is available through the [`sub`](Self::sub) field. + pub struct EventSubscription { + /// The inner poller. + pub sub: Subscription, + _phantom: PhantomData, + } + + impl AsRef> for EventSubscription { + #[inline] + fn as_ref(&self) -> &Subscription { + &self.sub + } + } + + impl AsMut> for EventSubscription { + #[inline] + fn as_mut(&mut self) -> &mut Subscription { + &mut self.sub + } + } + + impl fmt::Debug for EventSubscription { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EventSubscription") + .field("sub", &self.sub) + .field("event_type", &format_args!("{}", std::any::type_name::())) + .finish() + } + } + + impl From> for EventSubscription { + fn from(sub: Subscription) -> Self { + Self { sub, _phantom: PhantomData } + } + } + + impl EventSubscription { + /// Converts the subscription into a stream. + pub fn into_stream(self) -> impl Stream> + Unpin { + self.sub.into_stream().map(|log| decode_log(&log).map(|e| (e, log))) + } + } +} + #[cfg(test)] mod tests { use super::*; + use alloy_network::Ethereum; use alloy_primitives::U256; + use alloy_provider::RootProvider; + use alloy_rpc_client::RpcClient; use alloy_sol_types::sol; use test_utils::{init_tracing, spawn_anvil}; @@ -154,6 +214,11 @@ mod tests { #[tokio::test] async fn event_filters() { init_tracing(); + + #[cfg(feature = "pubsub")] + let (provider, anvil) = spawn_anvil(); + + #[cfg(not(feature = "pubsub"))] let (provider, _anvil) = spawn_anvil(); let contract = MyContract::deploy(&provider).await.unwrap(); @@ -189,5 +254,26 @@ mod tests { assert_eq!(all.len(), 1); assert_eq!(all[0].0, expected_event); assert_eq!(all[0].1, stream_log); + + #[cfg(feature = "pubsub")] + { + let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint()); + let client = RpcClient::connect_pubsub(ws).await.unwrap(); + let provider = RootProvider::::new(client); + + let contract = MyContract::new(*contract.address(), provider); + let event = contract.MyEvent_filter(); + + let sub = event.subscribe().await.unwrap(); + + contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt"); + + let mut stream = sub.into_stream(); + + let (stream_event, stream_log) = stream.next().await.unwrap().unwrap(); + assert_eq!(stream_event, expected_event); + assert_eq!(stream_log.address, *contract.address()); + assert_eq!(stream_log.block_number, Some(U256::from(3))); + } } } diff --git a/crates/contract/src/lib.rs b/crates/contract/src/lib.rs index 400d541ab44..151a75232be 100644 --- a/crates/contract/src/lib.rs +++ b/crates/contract/src/lib.rs @@ -24,6 +24,9 @@ pub use error::*; mod event; pub use event::{Event, EventPoller}; +#[cfg(feature = "pubsub")] +pub use event::subscription::EventSubscription; + mod interface; pub use interface::*; diff --git a/crates/provider/src/provider.rs b/crates/provider/src/provider.rs index 0cdd5805d20..289a224b7b9 100644 --- a/crates/provider/src/provider.rs +++ b/crates/provider/src/provider.rs @@ -310,6 +310,43 @@ pub trait Provider: Send + Sync self.root().get_subscription(id).await } + /// Subscribe to a stream of logs matching given filter. + /// + /// # Errors + /// + /// This method is only available on `pubsub` clients, such as Websockets or IPC, and will + /// return a [`PubsubUnavailable`](TransportErrorKind::PubsubUnavailable) transport error if the + /// client does not support it. + /// + /// For a polling alternative available over HTTP, use + /// [`Provider::watch_logs`]. However, be aware that polling increases + /// RPC usage drastically. + /// + /// # Examples + /// + /// ```no_run + /// # async fn example(provider: impl alloy_provider::Provider) -> Result<(), Box> { + /// use futures::StreamExt; + /// use alloy_primitives::keccak256; + /// use alloy_rpc_types::Filter; + /// + /// let signature = keccak256("Transfer(address,address,uint256)".as_bytes()); + /// + /// let sub = provider.subscribe_logs(&Filter::new().event_signature(signature)).await?; + /// let mut stream = sub.into_stream().take(5); + /// while let Some(tx) = stream.next().await { + /// println!("{tx:#?}"); + /// } + /// # Ok(()) + /// # } + /// ``` + #[cfg(feature = "pubsub")] + async fn subscribe_logs(&self, filter: &Filter) -> TransportResult> { + self.root().pubsub_frontend()?; + let id = self.client().request("eth_subscribe", ("logs", filter)).await?; + self.root().get_subscription(id).await + } + /// Subscribe to an RPC event. #[cfg(feature = "pubsub")] #[auto_impl(keep_default_for(&, &mut, Rc, Arc, Box))]