Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
Websockets + TLS for Async-Std / Tokio (#30)
Browse files Browse the repository at this point in the history
* fix: relax trait bounds on JsonRpcClient

* refactor(provider): move http client to separate dir

* feat(provider): add initial Websocket support over Stream/Sink + Tungstenite

* test(provider): add websocket test

* feat(provider): add convenience method using tokio/async-std behind a feature flag

* test(provider): add websocket ssl test

* feat(provider): add TLS websockets for tokio/async-std

* docs(provider): add websocket docs / examples

* fix(provider): make tokio an optional dep
  • Loading branch information
gakonst authored Jun 21, 2020
1 parent ded8f50 commit 0cfeada
Show file tree
Hide file tree
Showing 14 changed files with 877 additions and 130 deletions.
391 changes: 391 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions ethers-contract/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ where
/// Returns a new contract instance at `address`.
///
/// Clones `self` internally
pub fn at<T: Into<Address>>(&self, address: T) -> Self {
pub fn at<T: Into<Address>>(&self, address: T) -> Self
where
P: Clone,
{
let mut this = self.clone();
this.address = address.into();
this
Expand All @@ -269,7 +272,10 @@ where
/// Returns a new contract instance using the provided client
///
/// Clones `self` internally
pub fn connect(&self, client: &'a Client<P, S>) -> Self {
pub fn connect(&self, client: &'a Client<P, S>) -> Self
where
P: Clone,
{
let mut this = self.clone();
this.client = client;
this
Expand Down
40 changes: 36 additions & 4 deletions ethers-providers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ homepage = "https://docs.rs/ethers"
repository = "https://github.com/gakonst/ethers-rs"
keywords = ["ethereum", "web3", "celo", "ethers"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
ethers-core = { version = "0.1.3", path = "../ethers-core" }

Expand All @@ -23,17 +27,45 @@ url = { version = "2.1.1", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false }
pin-project = { version = "0.4.20", default-features = false }
tokio = { version = "0.2.21", default-features = false, features = ["time"] }
async-tungstenite = { version = "0.6.0", default-features = false }

# ws support async-std and tokio runtimes for the convenience methods
async-std = { version = "1.6.2", default-features = false, optional = true }
tokio = { version = "0.2.21", default-features = false, optional = true }

# needed for tls
real-tokio-native-tls = { package = "tokio-native-tls", version = "0.1.0", optional = true }
async-tls = { version = "0.7.0", optional = true }
futures-timer = "3.0.2"

[dev-dependencies]
ethers = { version = "0.1.3", path = "../ethers" }

rustc-hex = "2.1.0"
tokio = { version = "0.2.21", default-features = false, features = ["rt-core", "macros"] }
async-std = { version = "1.6.2", default-features = false, features = ["attributes"] }
async-tungstenite = { version = "0.6.0", features = ["tokio-runtime"] }
serial_test = "0.4.0"

[features]
celo = ["ethers-core/celo"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
tokio-runtime = [
"tokio",
"async-tungstenite/tokio-runtime"
]
tokio-tls = [
"tokio-runtime",
"async-tungstenite/tokio-native-tls",
"real-tokio-native-tls"
]

async-std-runtime = [
"async-std",
"async-tungstenite/async-std-runtime"
]
async-std-tls = [
"async-std-runtime",
"async-tungstenite/async-tls",
"async-tls"
]
55 changes: 52 additions & 3 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,55 @@
//! # }
//! ```
//!
//! # Websockets
//!
//! The crate has support for WebSockets. If none of the provided async runtime
//! features are enabled, you must manually instantiate the WS connection and wrap
//! it with with a [`Ws::new`](method@crate::Ws::new) call.
//!
//! ```ignore
//! use ethers::providers::Ws;
//!
//! let ws = Ws::new(...);
//! ```
//!
//! If you have compiled the library with any of the following features, you may
//! instantiate the websocket instance with the `connect` call and your URL:
//! - `tokio-runtime`: Uses `tokio` as the runtime
//! - `async-std-runtime`: Uses `async-std-runtime`
//!
//! ```no_run
//! # #[cfg(any(
//! # feature = "tokio-runtime",
//! # feature = "tokio-tls",
//! # feature = "async-std-runtime",
//! # feature = "async-std-tls",
//! # ))]
//! # async fn foo() -> Result<(), Box<dyn std::error::Error>> {
//! # use ethers::providers::Ws;
//! let ws = Ws::connect("ws://localhost:8545").await?;
//! # Ok(())
//! # }
//! ```
//!
//! TLS support is also provided via the following feature flags:
//! - `tokio-tls`
//! - `async-tls`
//!
//! ```no_run
//! # #[cfg(any(
//! # feature = "tokio-runtime",
//! # feature = "tokio-tls",
//! # feature = "async-std-runtime",
//! # feature = "async-std-tls",
//! # ))]
//! # async fn foo() -> Result<(), Box<dyn std::error::Error>> {
//! # use ethers::providers::Ws;
//! let ws = Ws::connect("wss://localhost:8545").await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Ethereum Name Service
//!
//! The provider may also be used to resolve [Ethereum Name Service](https://ens.domains) (ENS) names
Expand All @@ -50,8 +99,8 @@
//! # Ok(())
//! # }
//! ```
mod http;
pub use http::Provider as Http;
mod transports;
pub use transports::{Http, Ws};

mod provider;

Expand All @@ -76,7 +125,7 @@ pub use provider::{Provider, ProviderError};
#[async_trait]
/// Trait which must be implemented by data transports to be used with the Ethereum
/// JSON-RPC provider.
pub trait JsonRpcClient: Debug + Clone + Send + Sync {
pub trait JsonRpcClient: Send + Sync {
/// A JSON-RPC Error
type Error: Error + Into<ProviderError>;

Expand Down
3 changes: 1 addition & 2 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::{
ens,
http::Provider as HttpProvider,
stream::{FilterStream, FilterWatcher},
JsonRpcClient, PendingTransaction,
Http as HttpProvider, JsonRpcClient, PendingTransaction,
};

use ethers_core::{
Expand Down
39 changes: 23 additions & 16 deletions ethers-providers/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::ProviderError;
use ethers_core::types::U256;

use futures_core::{stream::Stream, TryFuture};
use futures_util::StreamExt;
use futures_timer::Delay;
use futures_util::{stream, FutureExt, StreamExt};
use pin_project::pin_project;
use serde::Deserialize;
use std::{
Expand All @@ -13,7 +14,11 @@ use std::{
time::Duration,
vec::IntoIter,
};
use tokio::time::{interval, Interval};

// https://github.com/tomusdrw/rust-web3/blob/befcb2fb8f3ca0a43e3081f68886fa327e64c8e6/src/api/eth_filter.rs#L20
fn interval(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
}

const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000);

Expand Down Expand Up @@ -54,7 +59,7 @@ pub(crate) struct FilterWatcher<F: FutureFactory, R> {
factory: F,

// The polling interval
interval: Interval,
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,

state: FilterWatcherState<F::FutureItem, R>,
}
Expand All @@ -68,7 +73,7 @@ where
pub fn new<T: Into<U256>>(id: T, factory: F) -> Self {
Self {
id: id.into(),
interval: interval(DEFAULT_POLL_DURATION),
interval: Box::new(interval(DEFAULT_POLL_DURATION)),
state: FilterWatcherState::WaitForInterval,
factory,
}
Expand All @@ -86,7 +91,7 @@ where
}

fn interval<T: Into<u64>>(mut self, duration: T) -> Self {
self.interval = interval(Duration::from_millis(duration.into()));
self.interval = Box::new(interval(Duration::from_millis(duration.into())));
self
}
}
Expand All @@ -107,21 +112,18 @@ where
*this.state = match this.state {
FilterWatcherState::WaitForInterval => {
// Wait the polling period
let mut interval = Box::pin(this.interval.tick());
let _ready = futures_util::ready!(interval.as_mut().poll(cx));
let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));

// create a new instance of the future
cx.waker().wake_by_ref();
FilterWatcherState::GetFilterChanges(this.factory.as_mut().new())
}
FilterWatcherState::GetFilterChanges(fut) => {
// wait for the future to be ready
let mut fut = Box::pin(fut);

// NOTE: If the provider returns an error, this will return an empty
// vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error).
let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
let items: Vec<R> = futures_util::ready!(fut.poll_unpin(cx)).unwrap_or_default();
FilterWatcherState::NextItem(items.into_iter())
}
// Consume 1 element from the vector. If more elements are in the vector,
Expand All @@ -130,7 +132,10 @@ where
// for new logs
FilterWatcherState::NextItem(iter) => match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
None => {
cx.waker().wake_by_ref();
FilterWatcherState::WaitForInterval
}
},
};

Expand Down Expand Up @@ -184,14 +189,16 @@ mod watch {
async fn stream() {
let factory = || Box::pin(async { Ok::<Vec<u64>, ProviderError>(vec![1, 2, 3]) });
let filter = FilterWatcher::<_, u64>::new(1, factory);
let mut stream = filter.interval(1u64).stream();
assert_eq!(stream.next().await.unwrap(), 1);
// stream combinator calls are still doable since FilterStream extends
// Stream and StreamExt
let mut stream = filter.interval(100u64).stream().map(|x| 2 * x);
assert_eq!(stream.next().await.unwrap(), 2);
assert_eq!(stream.next().await.unwrap(), 3);
assert_eq!(stream.next().await.unwrap(), 4);
assert_eq!(stream.next().await.unwrap(), 6);
// this will poll the factory function again since it consumed the entire
// vector, so it'll wrap around. Realistically, we'd then sleep for a few seconds
// until new blocks are mined, until the call to the factory returns a non-empty
// vector of logs
assert_eq!(stream.next().await.unwrap(), 1);
assert_eq!(stream.next().await.unwrap(), 2);
}
}
85 changes: 85 additions & 0 deletions ethers-providers/src/transports/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;
use thiserror::Error;

#[derive(Serialize, Deserialize, Debug, Clone, Error)]
/// A JSON-RPC 2.0 error
pub struct JsonRpcError {
/// The error code
pub code: i64,
/// The error message
pub message: String,
/// Additional data
pub data: Option<Value>,
}

impl fmt::Display for JsonRpcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"(code: {}, message: {}, data: {:?})",
self.code, self.message, self.data
)
}
}

#[derive(Serialize, Deserialize, Debug)]
/// A JSON-RPC request
pub struct Request<'a, T> {
id: u64,
jsonrpc: &'a str,
method: &'a str,
params: T,
}

impl<'a, T> Request<'a, T> {
/// Creates a new JSON RPC request
pub fn new(id: u64, method: &'a str, params: T) -> Self {
Self {
id,
jsonrpc: "2.0",
method,
params,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Response<T> {
id: u64,
jsonrpc: String,
#[serde(flatten)]
pub data: ResponseData<T>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum ResponseData<R> {
Error { error: JsonRpcError },
Success { result: R },
}

impl<R> ResponseData<R> {
/// Consume response and return value
pub fn into_result(self) -> Result<R, JsonRpcError> {
match self {
ResponseData::Success { result } => Ok(result),
ResponseData::Error { error } => Err(error),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn response() {
let response: Response<u64> =
serde_json::from_str(r#"{"jsonrpc": "2.0", "result": 19, "id": 1}"#).unwrap();
assert_eq!(response.id, 1);
assert_eq!(response.data.into_result().unwrap(), 19);
}
}
Loading

0 comments on commit 0cfeada

Please sign in to comment.