From 21ff6331b075a70d6f90d8584e19d2b959130762 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 22 Feb 2021 16:41:17 -0500 Subject: [PATCH 01/28] Refactor to add HTTPS client Signed-off-by: Thane Thomson --- rpc/Cargo.toml | 2 + rpc/src/client.rs | 2 +- rpc/src/client/transport/http.rs | 89 ++++++++++++++++++++++++-------- rpc/src/error.rs | 7 +++ rpc/src/lib.rs | 2 +- 5 files changed, 78 insertions(+), 24 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 4cfa8d4d5..796791862 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -30,6 +30,7 @@ http-client = [ "futures", "http", "hyper", + "hyper-tls", "tokio/fs", "tokio/macros", "tracing" @@ -66,6 +67,7 @@ async-tungstenite = { version = "0.12", features = ["tokio-runtime"], optional = futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } +hyper-tls = { version = "0.5", optional = true } tokio = { version = "1.0", optional = true } tracing = { version = "0.1", optional = true } pin-project = "1.0.1" diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 5ad9a1cf4..88fac541e 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -8,7 +8,7 @@ mod transport; pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher}; #[cfg(feature = "http-client")] -pub use transport::http::HttpClient; +pub use transport::http::{HttpClient, HttpsClient, HyperClient}; #[cfg(feature = "websocket-client")] pub use transport::websocket::{WebSocketClient, WebSocketClientDriver}; diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index d773dff5c..5e48ae5eb 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -2,12 +2,16 @@ use async_trait::async_trait; use hyper::body::Buf; -use hyper::header; +use hyper::{header, Uri}; use tendermint::net; use crate::client::transport::utils::get_tcp_host_port; use crate::{Client, Error, Response, Result, SimpleRequest}; +use hyper::client::connect::Connect; +use hyper::client::HttpConnector; +use hyper_tls::HttpsConnector; +use std::convert::TryInto; use std::io::Read; /// A JSON-RPC/HTTP Tendermint RPC client (implements [`Client`]). @@ -36,23 +40,71 @@ use std::io::Read; /// [`Client`]: trait.Client.html /// [`Event`]: ./event/struct.Event.html /// [`WebSocketClient`]: struct.WebSocketClient.html +pub type HttpClient = HyperClient; + +/// A JSON-RPC/HTTPS (i.e. HTTP/TLS) Tendermint RPC client. +/// +/// Similar to [`HttpClient`], but allows for connection to the RPC endpoint +/// via HTTPS. +pub type HttpsClient = HyperClient>; + +/// A [`hyper`]-based Tendermint RPC client. +/// +/// Generic over the connector type used for the client. +/// +/// [`hyper`]: https://hyper.rs/ #[derive(Debug, Clone)] -pub struct HttpClient { - host: String, - port: u16, +pub struct HyperClient { + uri: Uri, + inner: hyper::Client, } #[async_trait] -impl Client for HttpClient { +impl Client for HyperClient +where + C: Connect + Clone + Send + Sync + 'static, +{ async fn perform(&self, request: R) -> Result where R: SimpleRequest, { + let request = self.build_request(request)?; + let response = self.inner.request(request).await?; + let response_body = response_to_string(response).await?; + tracing::debug!("Incoming response: {}", response_body); + R::Response::from_string(&response_body) + } +} + +impl HyperClient { + /// Create a new JSON-RPC/HTTP Tendermint RPC client. + pub fn new(address: net::Address) -> Result { + let (host, port) = get_tcp_host_port(address)?; + Ok(Self { + uri: format!("http://{}:{}/", host, port).try_into()?, + inner: hyper::Client::new(), + }) + } +} + +impl HyperClient> { + /// Create a new JSON-RPC/HTTPS (i.e. HTTP/TLS) Tendermint RPC client. + pub fn new(address: net::Address) -> Result { + let (host, port) = get_tcp_host_port(address)?; + Ok(Self { + uri: format!("https://{}:{}/", host, port).try_into()?, + inner: hyper::Client::builder().build(HttpsConnector::new()), + }) + } +} + +impl HyperClient { + fn build_request(&self, request: R) -> Result> { let request_body = request.into_json(); let mut request = hyper::Request::builder() .method("POST") - .uri(&format!("http://{}:{}/", self.host, self.port)) + .uri(&self.uri) .body(hyper::Body::from(request_body.into_bytes()))?; { @@ -66,23 +118,16 @@ impl Client for HttpClient { ); } - let http_client = hyper::Client::new(); - let response = http_client.request(request).await?; - let mut response_body = String::new(); - hyper::body::aggregate(response.into_body()) - .await? - .reader() - .read_to_string(&mut response_body) - .map_err(|_| Error::client_internal_error("failed to read response body to string"))?; - tracing::debug!("Incoming response: {}", response_body); - R::Response::from_string(&response_body) + Ok(request) } } -impl HttpClient { - /// Create a new JSON-RPC/HTTP Tendermint RPC client. - pub fn new(address: net::Address) -> Result { - let (host, port) = get_tcp_host_port(address)?; - Ok(HttpClient { host, port }) - } +async fn response_to_string(response: hyper::Response) -> Result { + let mut response_body = String::new(); + hyper::body::aggregate(response.into_body()) + .await? + .reader() + .read_to_string(&mut response_body) + .map_err(|_| Error::client_internal_error("failed to read response body to string"))?; + Ok(response_body) } diff --git a/rpc/src/error.rs b/rpc/src/error.rs index 18d8e8725..c23d07498 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -123,6 +123,13 @@ impl From for Error { } } +#[cfg(feature = "http-client")] +impl From for Error { + fn from(e: http::uri::InvalidUri) -> Self { + Error::http_error(e.to_string()) + } +} + #[cfg(feature = "websocket-client")] impl From for Error { fn from(websocket_error: WSError) -> Error { diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index eff517e69..fa63babd5 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -42,7 +42,7 @@ pub use client::{ }; #[cfg(feature = "http-client")] -pub use client::HttpClient; +pub use client::{HttpClient, HttpsClient, HyperClient}; #[cfg(feature = "websocket-client")] pub use client::{WebSocketClient, WebSocketClientDriver}; From dbc14f6d372dc01276a2a46c45347af3cca84a4b Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 22 Feb 2021 17:05:59 -0500 Subject: [PATCH 02/28] Switch to Rustls Signed-off-by: Thane Thomson --- rpc/Cargo.toml | 4 ++-- rpc/src/client/transport/http.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 796791862..833a82c96 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -30,7 +30,7 @@ http-client = [ "futures", "http", "hyper", - "hyper-tls", + "hyper-rustls", "tokio/fs", "tokio/macros", "tracing" @@ -67,7 +67,7 @@ async-tungstenite = { version = "0.12", features = ["tokio-runtime"], optional = futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } -hyper-tls = { version = "0.5", optional = true } +hyper-rustls = { version = "0.22.1", optional = true } tokio = { version = "1.0", optional = true } tracing = { version = "0.1", optional = true } pin-project = "1.0.1" diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index 5e48ae5eb..6d150d6d9 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -10,7 +10,7 @@ use crate::client::transport::utils::get_tcp_host_port; use crate::{Client, Error, Response, Result, SimpleRequest}; use hyper::client::connect::Connect; use hyper::client::HttpConnector; -use hyper_tls::HttpsConnector; +use hyper_rustls::HttpsConnector; use std::convert::TryInto; use std::io::Read; @@ -93,7 +93,7 @@ impl HyperClient> { let (host, port) = get_tcp_host_port(address)?; Ok(Self { uri: format!("https://{}:{}/", host, port).try_into()?, - inner: hyper::Client::builder().build(HttpsConnector::new()), + inner: hyper::Client::builder().build(HttpsConnector::with_native_roots()), }) } } From a932ad71b35d2ba2beb3b51a998e15edc72f3e8f Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 23 Feb 2021 11:48:29 -0500 Subject: [PATCH 03/28] Add TLS support for WebSocket connections Signed-off-by: Thane Thomson --- rpc/Cargo.toml | 2 +- rpc/src/client/transport/websocket.rs | 30 ++++++++++++++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 833a82c96..b3d36e3be 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -63,7 +63,7 @@ subtle-encoding = { version = "0.5", features = ["bech32-preview"] } walkdir = "2.3" async-trait = { version = "0.1", optional = true } -async-tungstenite = { version = "0.12", features = ["tokio-runtime"], optional = true } +async-tungstenite = { version = "0.12", features = ["tokio-runtime", "tokio-rustls"], optional = true } futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 750aba6e5..ec931caef 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -14,7 +14,7 @@ use crate::{ SubscriptionClient, }; use async_trait::async_trait; -use async_tungstenite::tokio::{connect_async, TokioAdapter}; +use async_tungstenite::tokio::{connect_async, connect_async_with_tls_connector, ConnectStream}; use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use async_tungstenite::tungstenite::protocol::CloseFrame; use async_tungstenite::tungstenite::Message; @@ -25,7 +25,6 @@ use std::borrow::Cow; use std::collections::HashMap; use std::ops::Add; use tendermint::net; -use tokio::net::TcpStream; use tokio::time::{Duration, Instant}; use tracing::{debug, error}; @@ -149,6 +148,21 @@ impl WebSocketClient { Ok((Self { cmd_tx }, driver)) } + /// Construct a WebSocket client, but over a secure connection. + /// + /// Works similarly to [`WebSocketClient::new`]. + pub async fn new_with_tls(address: net::Address) -> Result<(Self, WebSocketClientDriver)> { + let (host, port) = get_tcp_host_port(address)?; + // Not supplying a connector means async_tungstenite will create the + // connector for us. + let (stream, _response) = + connect_async_with_tls_connector(&format!("wss://{}:{}/websocket", &host, port), None) + .await?; + let (cmd_tx, cmd_rx) = unbounded(); + let driver = WebSocketClientDriver::new(stream, cmd_rx); + Ok((Self { cmd_tx }, driver)) + } + fn send_cmd(&self, cmd: DriverCommand) -> Result<()> { self.cmd_tx.send(cmd).map_err(|e| { Error::client_internal_error(format!("failed to send command to client driver: {}", e)) @@ -271,10 +285,9 @@ impl Response for GenericJsonResponse {} /// /// This is the primary component responsible for transport-level interaction /// with the remote WebSocket endpoint. -#[derive(Debug)] pub struct WebSocketClientDriver { // The underlying WebSocket network connection. - stream: WebSocketStream>, + stream: WebSocketStream, // Facilitates routing of events to their respective subscriptions. router: SubscriptionRouter, // How we receive incoming commands from the WebSocketClient. @@ -285,10 +298,7 @@ pub struct WebSocketClientDriver { } impl WebSocketClientDriver { - fn new( - stream: WebSocketStream>, - cmd_rx: ChannelRx, - ) -> Self { + fn new(stream: WebSocketStream, cmd_rx: ChannelRx) -> Self { Self { stream, router: SubscriptionRouter::default(), @@ -516,13 +526,13 @@ mod test { use super::*; use crate::query::EventType; use crate::{request, Id, Method}; - use async_tungstenite::tokio::accept_async; + use async_tungstenite::tokio::{accept_async, TokioAdapter}; use futures::StreamExt; use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; use tokio::fs; - use tokio::net::TcpListener; + use tokio::net::{TcpListener, TcpStream}; use tokio::task::JoinHandle; // Interface to a driver that manages all incoming WebSocket connections. From 75edc5b6f2e5b4d1ace5e5505e4eb6086de09cb1 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 23 Feb 2021 15:39:08 -0500 Subject: [PATCH 04/28] Add support for secure WebSocket connections Signed-off-by: Thane Thomson --- rpc/src/client.rs | 4 +- rpc/src/client/transport/websocket.rs | 66 +++++++++++++++++++++++---- rpc/src/lib.rs | 6 ++- 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 88fac541e..0a497e0ae 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -10,7 +10,9 @@ pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatch #[cfg(feature = "http-client")] pub use transport::http::{HttpClient, HttpsClient, HyperClient}; #[cfg(feature = "websocket-client")] -pub use transport::websocket::{WebSocketClient, WebSocketClientDriver}; +pub use transport::websocket::{ + AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, +}; use crate::endpoint::*; use crate::query::Query; diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index ec931caef..ea1710369 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -125,12 +125,32 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// [`run`]: struct.WebSocketClientDriver.html#method.run /// [`Subscription`]: struct.Subscription.html /// [tendermint-websocket-ping]: https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28 +pub type WebSocketClient = AsyncTungsteniteClient; + +/// Similar to [`WebSocketClient`], but facilitates connectivity over TLS. +pub type SecureWebSocketClient = AsyncTungsteniteClient; + +/// Marker for the [`AsyncTungsteniteClient`] for clients operating over +/// unsecure connections. +pub struct Unsecure; + +/// Marker for the [`AsyncTungsteniteClient`] for clients operating over +/// secure connections. +pub struct Secure; + +/// An [`async-tungstenite`]-based WebSocket client. +/// +/// Different modes of operation (secure and unsecure) are facilitated by +/// different variants of this type. +/// +/// [`async-tungstenite`]: https://crates.io/crates/async-tungstenite #[derive(Debug, Clone)] -pub struct WebSocketClient { +pub struct AsyncTungsteniteClient { cmd_tx: ChannelTx, + _client_type: std::marker::PhantomData, } -impl WebSocketClient { +impl AsyncTungsteniteClient { /// Construct a WebSocket client. Immediately attempts to open a WebSocket /// connection to the node with the given address. /// @@ -145,13 +165,27 @@ impl WebSocketClient { connect_async(&format!("ws://{}:{}/websocket", &host, port)).await?; let (cmd_tx, cmd_rx) = unbounded(); let driver = WebSocketClientDriver::new(stream, cmd_rx); - Ok((Self { cmd_tx }, driver)) + Ok(( + Self { + cmd_tx, + _client_type: Default::default(), + }, + driver, + )) } +} - /// Construct a WebSocket client, but over a secure connection. +impl AsyncTungsteniteClient { + /// Construct a WebSocket client. Immediately attempts to open a WebSocket + /// connection to the node with the given address, but over a secure + /// connection. /// - /// Works similarly to [`WebSocketClient::new`]. - pub async fn new_with_tls(address: net::Address) -> Result<(Self, WebSocketClientDriver)> { + /// On success, this returns both a client handle (a `WebSocketClient` + /// instance) as well as the WebSocket connection driver. The execution of + /// this driver becomes the responsibility of the client owner, and must be + /// executed in a separate asynchronous context to the client to ensure it + /// doesn't block the client. + pub async fn new(address: net::Address) -> Result<(Self, WebSocketClientDriver)> { let (host, port) = get_tcp_host_port(address)?; // Not supplying a connector means async_tungstenite will create the // connector for us. @@ -160,9 +194,17 @@ impl WebSocketClient { .await?; let (cmd_tx, cmd_rx) = unbounded(); let driver = WebSocketClientDriver::new(stream, cmd_rx); - Ok((Self { cmd_tx }, driver)) + Ok(( + Self { + cmd_tx, + _client_type: Default::default(), + }, + driver, + )) } +} +impl AsyncTungsteniteClient { fn send_cmd(&self, cmd: DriverCommand) -> Result<()> { self.cmd_tx.send(cmd).map_err(|e| { Error::client_internal_error(format!("failed to send command to client driver: {}", e)) @@ -176,7 +218,10 @@ impl WebSocketClient { } #[async_trait] -impl Client for WebSocketClient { +impl Client for AsyncTungsteniteClient +where + C: Send + Sync, +{ async fn perform(&self, request: R) -> Result where R: SimpleRequest, @@ -199,7 +244,10 @@ impl Client for WebSocketClient { } #[async_trait] -impl SubscriptionClient for WebSocketClient { +impl SubscriptionClient for AsyncTungsteniteClient +where + C: Send + Sync, +{ async fn subscribe(&self, query: Query) -> Result { let (subscription_tx, subscription_rx) = unbounded(); let (response_tx, mut response_rx) = unbounded(); diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index fa63babd5..61d5b64dd 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -41,10 +41,12 @@ pub use client::{ SubscriptionClient, }; +#[cfg(feature = "websocket-client")] +pub use client::{ + AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, +}; #[cfg(feature = "http-client")] pub use client::{HttpClient, HttpsClient, HyperClient}; -#[cfg(feature = "websocket-client")] -pub use client::{WebSocketClient, WebSocketClientDriver}; pub mod endpoint; pub mod error; From 4287c712578affaa119058a96bca4ade77b870b9 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 23 Feb 2021 15:49:58 -0500 Subject: [PATCH 05/28] Update docs and exports Signed-off-by: Thane Thomson --- rpc/src/client.rs | 4 +--- rpc/src/lib.rs | 29 +++++++++++------------------ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 0a497e0ae..78f7db671 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -10,9 +10,7 @@ pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatch #[cfg(feature = "http-client")] pub use transport::http::{HttpClient, HttpsClient, HyperClient}; #[cfg(feature = "websocket-client")] -pub use transport::websocket::{ - AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, -}; +pub use transport::websocket::{SecureWebSocketClient, WebSocketClient, WebSocketClientDriver}; use crate::endpoint::*; use crate::query::Query; diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 61d5b64dd..159000e7b 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -8,14 +8,15 @@ //! //! Two features are provided at present: //! -//! * `http-client` - Provides [`HttpClient`], which is a basic RPC client that -//! interacts with remote Tendermint nodes via **JSON-RPC over HTTP**. This -//! client does not provide [`Event`] subscription functionality. See the -//! [Tendermint RPC] for more details. -//! * `websocket-client` - Provides [`WebSocketClient`], which provides full -//! client functionality, including general RPC functionality (such as that -//! provided by `HttpClient`) as well as [`Event`] subscription -//! functionality. +//! * `http-client` - Provides [`HttpClient`] and [`HttpsClient`], which are +//! basic RPC clients that interact with remote Tendermint nodes via +//! **JSON-RPC over HTTP or HTTPS**. This client does not provide +//! [`event::Event`] subscription functionality. See the [Tendermint RPC] for +//! more details. +//! * `websocket-client` - Provides [`WebSocketClient`] and +//! [`SecureWebSocketClient`], which provide full client functionality, +//! including general RPC functionality as well as [`event::Event`] +//! subscription functionality. //! //! ### Mock Clients //! @@ -24,14 +25,8 @@ //! [`MockClient`], which implements both [`Client`] and [`SubscriptionClient`] //! traits. //! -//! [`Client`]: trait.Client.html -//! [`SubscriptionClient`]: trait.SubscriptionClient.html -//! [`HttpClient`]: struct.HttpClient.html -//! [`Event`]: event/struct.Event.html -//! [`WebSocketClient`]: struct.WebSocketClient.html //! [Tendermint RPC]: https://docs.tendermint.com/master/rpc/ //! [`/subscribe` endpoint]: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe -//! [`MockClient`]: struct.MockClient.html #[cfg(any(feature = "http-client", feature = "websocket-client"))] mod client; @@ -41,12 +36,10 @@ pub use client::{ SubscriptionClient, }; -#[cfg(feature = "websocket-client")] -pub use client::{ - AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, -}; #[cfg(feature = "http-client")] pub use client::{HttpClient, HttpsClient, HyperClient}; +#[cfg(feature = "websocket-client")] +pub use client::{SecureWebSocketClient, WebSocketClient, WebSocketClientDriver}; pub mod endpoint; pub mod error; From e41964504ea3f7573f6d1d19ba0111d695c1b6db Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 23 Feb 2021 16:32:02 -0500 Subject: [PATCH 06/28] Update docs to use new link format Signed-off-by: Thane Thomson --- rpc/src/client.rs | 4 +++- rpc/src/client/subscription.rs | 6 ------ rpc/src/client/transport/http.rs | 11 +++++------ rpc/src/client/transport/websocket.rs | 17 ++++++----------- rpc/src/lib.rs | 6 ++++-- 5 files changed, 18 insertions(+), 26 deletions(-) diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 78f7db671..0a497e0ae 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -10,7 +10,9 @@ pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatch #[cfg(feature = "http-client")] pub use transport::http::{HttpClient, HttpsClient, HyperClient}; #[cfg(feature = "websocket-client")] -pub use transport::websocket::{SecureWebSocketClient, WebSocketClient, WebSocketClientDriver}; +pub use transport::websocket::{ + AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, +}; use crate::endpoint::*; use crate::query::Query; diff --git a/rpc/src/client/subscription.rs b/rpc/src/client/subscription.rs index 07fb2805c..5bf158b5a 100644 --- a/rpc/src/client/subscription.rs +++ b/rpc/src/client/subscription.rs @@ -12,8 +12,6 @@ use std::pin::Pin; /// A client that exclusively provides [`Event`] subscription capabilities, /// without any other RPC method support. -/// -/// [`Event`]: event/struct.Event.html #[async_trait] pub trait SubscriptionClient { /// `/subscribe`: subscribe to receive events produced by the given query. @@ -27,8 +25,6 @@ pub trait SubscriptionClient { /// no longer have access to the individual `Subscription` instances to /// terminate them separately. /// - /// [`Subscription`]: struct.Subscription.html - /// [`Query`]: struct.Query.html /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html async fn unsubscribe(&self, query: Query) -> Result<()>; } @@ -61,8 +57,6 @@ pub(crate) type SubscriptionRx = ChannelRx>; /// } /// } /// ``` -/// -/// [`Event`]: ./event/struct.Event.html #[pin_project] #[derive(Debug)] pub struct Subscription { diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index 6d150d6d9..82c7dc39e 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -16,8 +16,11 @@ use std::io::Read; /// A JSON-RPC/HTTP Tendermint RPC client (implements [`Client`]). /// -/// Does not provide [`Event`] subscription facilities (see [`WebSocketClient`] -/// for a client that does provide [`Event`] subscription facilities). +/// Does not provide [`crate::event::Event`] subscription facilities (see +/// [`crate::WebSocketClient`] for a client that does). +/// +/// Does not provide any security. For a JSON-RPC/HTTPS client, see +/// [`HttpsClient`]. /// /// ## Examples /// @@ -36,10 +39,6 @@ use std::io::Read; /// println!("Got ABCI info: {:?}", abci_info); /// } /// ``` -/// -/// [`Client`]: trait.Client.html -/// [`Event`]: ./event/struct.Event.html -/// [`WebSocketClient`]: struct.WebSocketClient.html pub type HttpClient = HyperClient; /// A JSON-RPC/HTTPS (i.e. HTTP/TLS) Tendermint RPC client. diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index ea1710369..2e17eef24 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -45,13 +45,13 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// (including [`Event`] subscription) over a WebSocket connection. /// /// The `WebSocketClient` itself is effectively just a handle to its driver -/// (see the [`new`] method). The driver is the component of the client that -/// actually interacts with the remote RPC over the WebSocket connection. -/// The `WebSocketClient` can therefore be cloned into different asynchronous -/// contexts, effectively allowing for asynchronous access to the driver. +/// The driver is the component of the client that actually interacts with the +/// remote RPC over the WebSocket connection. The `WebSocketClient` can +/// therefore be cloned into different asynchronous contexts, effectively +/// allowing for asynchronous access to the driver. /// /// It is the caller's responsibility to spawn an asynchronous task in which to -/// execute the driver's [`run`] method. See the example below. +/// execute the [`WebSocketClientDriver::run`] method. See the example below. /// /// Dropping [`Subscription`]s will automatically terminate them (the /// `WebSocketClientDriver` detects a disconnected channel and removes the @@ -71,7 +71,7 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// /// The WebSocket client implements a keep-alive mechanism whereby it sends a /// PING message to the server every 27 seconds, matching the PING cadence of -/// the Tendermint server (see [this code](tendermint-websocket-ping) for +/// the Tendermint server (see [this code][tendermint-websocket-ping] for /// details). /// /// This is not configurable at present. @@ -119,11 +119,6 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// } /// ``` /// -/// [`Event`]: ./event/struct.Event.html -/// [`close`]: struct.WebSocketClient.html#method.close -/// [`new`]: struct.WebSocketClient.html#method.new -/// [`run`]: struct.WebSocketClientDriver.html#method.run -/// [`Subscription`]: struct.Subscription.html /// [tendermint-websocket-ping]: https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28 pub type WebSocketClient = AsyncTungsteniteClient; diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 159000e7b..f4e08ea67 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -36,10 +36,12 @@ pub use client::{ SubscriptionClient, }; +#[cfg(feature = "websocket-client")] +pub use client::{ + AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, +}; #[cfg(feature = "http-client")] pub use client::{HttpClient, HttpsClient, HyperClient}; -#[cfg(feature = "websocket-client")] -pub use client::{SecureWebSocketClient, WebSocketClient, WebSocketClientDriver}; pub mod endpoint; pub mod error; From 0c22bfe51ae70c3824145ec6aa5d362ddb88fdff Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 23 Feb 2021 16:32:34 -0500 Subject: [PATCH 07/28] Remove unused file Signed-off-by: Thane Thomson --- rpc/src/serialization.rs | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 rpc/src/serialization.rs diff --git a/rpc/src/serialization.rs b/rpc/src/serialization.rs deleted file mode 100644 index 90d5e8b91..000000000 --- a/rpc/src/serialization.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Custom serialization/deserialization functionality for the RPC. - -pub mod timestamp; From fb35558b5d8e17fa8634a1f51f0b59fd7849254c Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 23 Feb 2021 17:18:49 -0500 Subject: [PATCH 08/28] Add required attributes to WebSocket client markers Signed-off-by: Thane Thomson --- rpc/src/client/transport/websocket.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 2e17eef24..106a16c0f 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -127,10 +127,12 @@ pub type SecureWebSocketClient = AsyncTungsteniteClient; /// Marker for the [`AsyncTungsteniteClient`] for clients operating over /// unsecure connections. +#[derive(Debug, Clone)] pub struct Unsecure; /// Marker for the [`AsyncTungsteniteClient`] for clients operating over /// secure connections. +#[derive(Debug, Clone)] pub struct Secure; /// An [`async-tungstenite`]-based WebSocket client. From b058a700155317e1bbfecfb5e18474609b72b10b Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 23 Feb 2021 18:40:27 -0500 Subject: [PATCH 09/28] Refactor out constructor common bits Signed-off-by: Thane Thomson --- rpc/src/client/transport/http.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index 82c7dc39e..800b963aa 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -78,26 +78,34 @@ where impl HyperClient { /// Create a new JSON-RPC/HTTP Tendermint RPC client. pub fn new(address: net::Address) -> Result { - let (host, port) = get_tcp_host_port(address)?; - Ok(Self { - uri: format!("http://{}:{}/", host, port).try_into()?, - inner: hyper::Client::new(), - }) + Self::new_with_scheme(address, "http", hyper::Client::new()) } } impl HyperClient> { /// Create a new JSON-RPC/HTTPS (i.e. HTTP/TLS) Tendermint RPC client. pub fn new(address: net::Address) -> Result { + Self::new_with_scheme( + address, + "https", + hyper::Client::builder().build(HttpsConnector::with_native_roots()), + ) + } +} + +impl HyperClient { + fn new_with_scheme( + address: net::Address, + scheme: &str, + inner: hyper::Client, + ) -> Result { let (host, port) = get_tcp_host_port(address)?; Ok(Self { - uri: format!("https://{}:{}/", host, port).try_into()?, - inner: hyper::Client::builder().build(HttpsConnector::with_native_roots()), + uri: format!("{}://{}:{}/", scheme, host, port).try_into()?, + inner, }) } -} -impl HyperClient { fn build_request(&self, request: R) -> Result> { let request_body = request.into_json(); From 7bf2603813f87d6eb7213f87f01ee5d773d7dbbe Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 26 Feb 2021 17:19:51 -0500 Subject: [PATCH 10/28] Add tendermint-rpc binary This is unfortunately a big commit. It primarily adds an optional binary application (behind the "cli" feature flag) that allows you to easily perform queries against a Tendermint endpoint via several protocols (http, https, ws, wss). It only contains support at present for Client queries, and SubscriptionClient queries (i.e. subscriptions) are planned for subsequent commits. Some of the bulk of this commit is due to the fact that we need to be able to parse queries (e.g. from tx_search), so I added a PEG-based parser for queries along with some tests. Signed-off-by: Thane Thomson --- rpc/Cargo.toml | 20 +- rpc/src/client.rs | 11 +- rpc/src/client/bin/main.rs | 311 ++++++++++++++++++++ rpc/src/client/transport/websocket.rs | 7 + rpc/src/error.rs | 14 + rpc/src/lib.rs | 2 +- rpc/src/order.rs | 17 ++ rpc/src/query.rs | 389 +++++++++++++++++++++++++- 8 files changed, 760 insertions(+), 11 deletions(-) create mode 100644 rpc/src/client/bin/main.rs diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index b3d36e3be..28504dc9f 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -23,8 +23,20 @@ description = """ [package.metadata.docs.rs] all-features = true +[[bin]] +name = "tendermint-rpc" +path = "src/client/bin/main.rs" +required-features = [ "cli" ] + [features] default = [] +cli = [ + "http-client", + "structopt", + "tracing-subscriber", + "url", + "websocket-client" +] http-client = [ "async-trait", "futures", @@ -52,6 +64,10 @@ websocket-client = [ bytes = "1.0" chrono = "0.4" getrandom = "0.1" +# TODO(thane): Use a released version once support for inverted patterns is released. +# See https://github.com/kevinmehall/rust-peg/pull/245 +peg = { git = "https://github.com/kevinmehall/rust-peg.git", rev = "ba6019539b2cf80289190cbb9537c94113b6b7d1" } +pin-project = "1.0.1" serde = { version = "1", features = [ "derive" ] } serde_bytes = "0.11" serde_json = "1" @@ -68,6 +84,8 @@ futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } hyper-rustls = { version = "0.22.1", optional = true } +structopt = { version = "0.3", optional = true } tokio = { version = "1.0", optional = true } tracing = { version = "0.1", optional = true } -pin-project = "1.0.1" +tracing-subscriber = { version = "0.2", optional = true } +url = { version = "2.2", optional = true } diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 0a497e0ae..1b9d748f4 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -106,8 +106,8 @@ pub trait Client { self.perform(broadcast::tx_sync::Request::new(tx)).await } - /// `/broadcast_tx_sync`: broadcast a transaction, returning the response - /// from `CheckTx`. + /// `/broadcast_tx_commit`: broadcast a transaction, returning the response + /// from `DeliverTx`. async fn broadcast_tx_commit(&self, tx: Transaction) -> Result { self.perform(broadcast::tx_commit::Request::new(tx)).await } @@ -185,3 +185,10 @@ pub trait Client { where R: SimpleRequest; } + +/// Applicable to transports whose connection can/must be terminated. +// TODO(thane): Replace with a Closeable trait once we can make breaking API changes. +pub trait Terminate { + /// Signal to the underlying transport to terminate. + fn terminate(self) -> Result<()>; +} diff --git a/rpc/src/client/bin/main.rs b/rpc/src/client/bin/main.rs new file mode 100644 index 000000000..6af75710e --- /dev/null +++ b/rpc/src/client/bin/main.rs @@ -0,0 +1,311 @@ +//! CLI for performing simple interactions against a Tendermint node's RPC. + +use std::str::FromStr; +use structopt::StructOpt; +use tendermint::abci::{Path, Transaction}; +use tendermint::net::Address; +use tendermint_rpc::query::Query; +use tendermint_rpc::{ + Client, Error, HttpClient, HttpsClient, Order, Result, SecureWebSocketClient, + SubscriptionClient, Terminate, WebSocketClient, WebSocketClientDriver, +}; +use tracing::level_filters::LevelFilter; +use tracing::{error, info, warn}; +use url::Url; + +/// CLI for performing simple interactions against a Tendermint node's RPC. +/// +/// Supports HTTP, HTTPS, WebSocket and secure WebSocket (wss://) URLs. +#[derive(Debug, StructOpt)] +struct Opt { + /// The URL of the Tendermint node's RPC endpoint. + #[structopt( + short, + long, + default_value = "http://127.0.0.1:26657", + env = "TENDERMINT_RPC_URL" + )] + url: Url, + + /// Increase output logging verbosity to DEBUG level. + #[structopt(short, long)] + verbose: bool, + + #[structopt(subcommand)] + req: Request, +} + +#[derive(Debug, StructOpt)] +enum Request { + #[structopt(flatten)] + ClientRequest(ClientRequest), + /// Subscribe to receive events produced by a specific query. + Subscribe, +} + +#[derive(Debug, StructOpt)] +enum ClientRequest { + /// Request information about the ABCI application. + AbciInfo, + /// Query the ABCI application. + AbciQuery { + /// The path for which you want to query, if any. + #[structopt(long)] + path: Option, + /// The data for which you want to query. + data: String, + /// The block height at which to query. + #[structopt(long)] + height: Option, + #[structopt(long)] + prove: bool, + }, + /// Get a block at a given height. + Block { height: u32 }, + /// Get block headers between two heights (min <= height <= max). + Blockchain { + /// The minimum height + min: u32, + /// The maximum height. + max: u32, + }, + /// Request the block results at a given height. + BlockResults { + /// The height of the block you want. + height: u32, + }, + // TODO(thane): Implement evidence broadcast + /// Broadcast a transaction asynchronously (without waiting for the ABCI + /// app to check it or for it to be committed). + BroadcastTxAsync { + /// The transaction to broadcast. + tx: String, + }, + /// Broadcast a transaction, waiting for it to be fully committed before + /// returning. + BroadcastTxCommit { + /// The transaction to broadcast. + tx: String, + }, + /// Broadcast a transaction synchronously (waiting for the ABCI app to + /// check it, but not for it to be committed). + BroadcastTxSync { + /// The transaction to broadcast. + tx: String, + }, + /// Get the commit for the given height. + Commit { height: u32 }, + /// Get the current consensus state. + ConsensusState, + /// Get the node's genesis data. + Genesis, + /// Get the node's health. + Health, + /// Request the latest block. + LatestBlock, + /// Request the results for the latest block. + LatestBlockResults, + /// Request the latest commit. + LatestCommit, + /// Obtain information about the P2P stack and other network connections. + NetInfo, + /// Get Tendermint status (node info, public key, latest block hash, etc.). + Status, + /// Search for transactions with their results. + TxSearch { + /// The query against which transactions should be matched. + query: Query, + #[structopt(long, default_value = "1")] + page: u32, + #[structopt(long, default_value = "10")] + per_page: u8, + #[structopt(long, default_value = "asc")] + order: Order, + #[structopt(long)] + prove: bool, + }, + /// Get the validators at the given height. + Validators { height: u32 }, +} + +#[tokio::main] +async fn main() { + let opt: Opt = Opt::from_args(); + let log_level = if opt.verbose { + LevelFilter::DEBUG + } else { + LevelFilter::INFO + }; + // All our logging goes to stderr, so our output can go to stdout + tracing_subscriber::fmt() + .with_max_level(log_level) + .with_writer(std::io::stderr) + .init(); + + let host = match opt.url.host_str() { + Some(h) => h, + None => { + error!("Missing host in URL: {}", opt.url); + std::process::exit(-1); + } + }; + let port = opt.url.port_or_known_default().unwrap_or(26657); + if opt.url.path().len() > 1 { + warn!("URL paths are ignored at present: {}", opt.url.path()); + } + let address = Address::Tcp { + peer_id: None, + host: host.to_owned(), + port, + }; + let result = match opt.url.scheme() { + "http" => http_request(address, opt.req).await, + "https" => https_request(address, opt.req).await, + "ws" => websocket_request(address, opt.req).await, + "wss" => secure_websocket_request(address, opt.req).await, + scheme => Err(Error::invalid_params(&format!( + "unsupported RPC endpoint scheme: {}", + scheme + ))), + }; + if let Err(e) = result { + error!("Failed: {}", e); + std::process::exit(-1); + } +} + +async fn http_request(address: Address, req: Request) -> Result<()> { + info!("Using HTTP client to submit request to: {}", address); + let client = HttpClient::new(address)?; + match req { + Request::ClientRequest(r) => client_request(&client, r).await, + _ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)")) + } +} + +async fn https_request(address: Address, req: Request) -> Result<()> { + info!("Using HTTPS client to submit request to: {}", address); + let client = HttpsClient::new(address)?; + match req { + Request::ClientRequest(r) => client_request(&client, r).await, + _ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)")) + } +} + +async fn websocket_request(address: Address, req: Request) -> Result<()> { + info!("Using WebSocket client to submit request to: {}", address); + let (client, driver) = WebSocketClient::new(address).await?; + run_websocket_request(client, driver, req).await +} + +async fn secure_websocket_request(address: Address, req: Request) -> Result<()> { + info!( + "Using secure WebSocket client to submit request to: {}", + address + ); + let (client, driver) = SecureWebSocketClient::new(address).await?; + run_websocket_request(client, driver, req).await +} + +async fn run_websocket_request( + client: C, + driver: WebSocketClientDriver, + req: Request, +) -> Result<()> +where + C: Client + SubscriptionClient + Terminate + Sync, +{ + let driver_hdl = tokio::spawn(async move { driver.run().await }); + + let result = match req { + Request::ClientRequest(r) => client_request(&client, r).await, + Request::Subscribe => unimplemented!(), + }; + + client.terminate()?; + driver_hdl + .await + .map_err(|e| Error::client_internal_error(e.to_string()))??; + result +} + +async fn client_request(client: &C, req: ClientRequest) -> Result<()> +where + C: Client + Sync, +{ + let result = match req { + ClientRequest::AbciInfo => serde_json::to_string_pretty(&client.abci_info().await?)?, + ClientRequest::AbciQuery { + path, + data, + height, + prove, + } => serde_json::to_string_pretty( + &client + .abci_query( + path.map(|s| Path::from_str(&s)).transpose()?, + data, + height.map(Into::into), + prove, + ) + .await?, + )?, + ClientRequest::Block { height } => { + serde_json::to_string_pretty(&client.block(height).await?)? + } + ClientRequest::Blockchain { min, max } => { + serde_json::to_string_pretty(&client.blockchain(min, max).await?)? + } + ClientRequest::BlockResults { height } => { + serde_json::to_string_pretty(&client.block_results(height).await?)? + } + ClientRequest::BroadcastTxAsync { tx } => serde_json::to_string_pretty( + &client + .broadcast_tx_async(Transaction::from(tx.into_bytes())) + .await?, + )?, + ClientRequest::BroadcastTxCommit { tx } => serde_json::to_string_pretty( + &client + .broadcast_tx_commit(Transaction::from(tx.into_bytes())) + .await?, + )?, + ClientRequest::BroadcastTxSync { tx } => serde_json::to_string_pretty( + &client + .broadcast_tx_sync(Transaction::from(tx.into_bytes())) + .await?, + )?, + ClientRequest::Commit { height } => { + serde_json::to_string_pretty(&client.commit(height).await?)? + } + ClientRequest::LatestBlock => serde_json::to_string_pretty(&client.latest_block().await?)?, + ClientRequest::LatestBlockResults => { + serde_json::to_string_pretty(&client.latest_block_results().await?)? + } + ClientRequest::LatestCommit => { + serde_json::to_string_pretty(&client.latest_commit().await?)? + } + ClientRequest::ConsensusState => { + serde_json::to_string_pretty(&client.consensus_state().await?)? + } + ClientRequest::Genesis => serde_json::to_string_pretty(&client.genesis().await?)?, + ClientRequest::Health => serde_json::to_string_pretty(&client.health().await?)?, + ClientRequest::NetInfo => serde_json::to_string_pretty(&client.net_info().await?)?, + ClientRequest::Status => serde_json::to_string_pretty(&client.status().await?)?, + ClientRequest::TxSearch { + query, + page, + per_page, + order, + prove, + } => serde_json::to_string_pretty( + &client + .tx_search(query, prove, page, per_page, order) + .await?, + )?, + ClientRequest::Validators { height } => { + serde_json::to_string_pretty(&client.validators(height).await?)? + } + }; + println!("{}", result); + Ok(()) +} diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 106a16c0f..ff2434e96 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -4,6 +4,7 @@ use crate::client::subscription::SubscriptionTx; use crate::client::sync::{unbounded, ChannelRx, ChannelTx}; use crate::client::transport::router::{PublishResult, SubscriptionRouter}; use crate::client::transport::utils::get_tcp_host_port; +use crate::client::Terminate; use crate::endpoint::{subscribe, unsubscribe}; use crate::event::Event; use crate::query::Query; @@ -240,6 +241,12 @@ where } } +impl Terminate for AsyncTungsteniteClient { + fn terminate(self) -> Result<()> { + self.close() + } +} + #[async_trait] impl SubscriptionClient for AsyncTungsteniteClient where diff --git a/rpc/src/error.rs b/rpc/src/error.rs index c23d07498..cdc280a93 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -137,6 +137,20 @@ impl From for Error { } } +#[cfg(feature = "cli")] +impl From for Error { + fn from(e: serde_json::Error) -> Self { + Error::client_internal_error(e.to_string()) + } +} + +#[cfg(feature = "cli")] +impl From for Error { + fn from(e: tendermint::Error) -> Self { + Error::client_internal_error(e.to_string()) + } +} + /// Tendermint RPC error codes. /// /// See `func RPC*Error()` definitions in: diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index f4e08ea67..8f619162e 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -33,7 +33,7 @@ mod client; #[cfg(any(feature = "http-client", feature = "websocket-client"))] pub use client::{ Client, MockClient, MockRequestMatcher, MockRequestMethodMatcher, Subscription, - SubscriptionClient, + SubscriptionClient, Terminate, }; #[cfg(feature = "websocket-client")] diff --git a/rpc/src/order.rs b/rpc/src/order.rs index b9294464e..79ff77142 100644 --- a/rpc/src/order.rs +++ b/rpc/src/order.rs @@ -1,6 +1,8 @@ //! Ordering of paginated RPC responses. +use crate::Error; use serde::{Deserialize, Serialize}; +use std::str::FromStr; /// Ordering of paginated RPC responses. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -13,3 +15,18 @@ pub enum Order { #[serde(rename = "desc")] Descending, } + +impl FromStr for Order { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "asc" => Ok(Self::Ascending), + "desc" => Ok(Self::Descending), + _ => Err(Error::invalid_params(&format!( + "invalid order type: {} (must be \"asc\" or \"desc\")", + s + ))), + } + } +} diff --git a/rpc/src/query.rs b/rpc/src/query.rs index 8afcd94ba..4c94fe208 100644 --- a/rpc/src/query.rs +++ b/rpc/src/query.rs @@ -4,9 +4,13 @@ //! //! [`Query`]: struct.Query.html -use std::fmt; +// TODO(thane): These warnings are generated by the PEG for some reason. Try to fix and remove. +#![allow(clippy::redundant_closure_call, clippy::unit_arg)] -use chrono::{Date, DateTime, FixedOffset, Utc}; +use crate::{Error, Result}; +use chrono::{Date, DateTime, FixedOffset, NaiveDate, Utc}; +use std::fmt; +use std::str::FromStr; /// A structured query for use in interacting with the Tendermint RPC event /// subscription system. @@ -185,6 +189,163 @@ impl fmt::Display for Query { } } +peg::parser! { + grammar query_parser() for str { + // Some or no whitespace. + rule _() = quiet!{[' ']*} + + // At least some whitespace. + rule __() = quiet!{[' ']+} + + rule string() -> &'input str + = "'" s:$([^'\'']*) "'" { s } + + rule unsigned() -> u64 + = s:$(['0'..='9']+) {? + u64::from_str(s) + .map_err(|_| "failed to parse as an unsigned integer") + } + + rule signed() -> i64 + = s:$("-" ['1'..='9'] ['0'..='9']*) {? + i64::from_str(s) + .map_err(|_| "failed to parse as a signed integer") + } + + rule year() -> &'input str + = $(['0'..='9']*<4>) + + rule month() -> &'input str + = $(['0' | '1'] ['0'..='9']) + + rule day() -> &'input str + = $(['0'..='3'] ['0'..='9']) + + rule date() -> &'input str + = $(year() "-" month() "-" day()) + + rule hour() -> &'input str + = $(['0'..='2'] ['0'..='9']) + + rule min_sec() -> &'input str + = $(['0'..='5'] ['0'..='9']) + + rule nanosec() -> &'input str + = $("." ['0'..='9']+) + + rule time() -> &'input str + = $(hour() ":" min_sec() ":" min_sec() nanosec()? "Z") + + rule datetime() -> &'input str + = dt:$(date() "T" time()) { dt } + + rule string_op() -> Operand + = s:string() { Operand::String(s.to_owned()) } + + rule unsigned_op() -> Operand + = u:unsigned() { Operand::Unsigned(u) } + + rule signed_op() -> Operand + = s:signed() { Operand::Signed(s) } + + rule datetime_op() -> Operand + = "TIME" __ dt:datetime() {? + DateTime::parse_from_rfc3339(dt) + .map(|dt| Operand::DateTime(dt.with_timezone(&Utc))) + .map_err(|_| "failed to parse as RFC3339-compatible date/time") + } + + rule date_op() -> Operand + = "DATE" __ dt:date() {? + let naive_date = NaiveDate::parse_from_str(dt, "%Y-%m-%d") + .map_err(|_| "failed to parse as RFC3339-compatible date")?; + Ok(Operand::Date(Date::from_utc(naive_date, Utc))) + } + + rule tag() -> &'input str + = $(['a'..='z' | 'A'..='Z'] ['a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.']*) + + rule operand() -> Operand + = datetime_op() / date_op() / string_op() / signed_op() / unsigned_op() + + rule eq() -> Condition + = t:tag() _ "=" _ op:operand() { Condition::Eq(t.to_owned(), op) } + + rule lte() -> Condition + = t:tag() _ "<=" _ op:operand() { Condition::Lte(t.to_owned(), op) } + + rule lt() -> Condition + = t:tag() _ "<" _ op:operand() { Condition::Lt(t.to_owned(), op) } + + rule gte() -> Condition + = t:tag() _ ">=" _ op:operand() { Condition::Gte(t.to_owned(), op) } + + rule gt() -> Condition + = t:tag() _ ">" _ op:operand() { Condition::Gt(t.to_owned(), op) } + + rule contains() -> Condition + = t:tag() __ "CONTAINS" __ op:string() { Condition::Contains(t.to_owned(), op.to_owned()) } + + rule exists() -> Condition + = t:tag() __ "EXISTS" { Condition::Exists(t.to_owned()) } + + rule event_type() -> Term + = "tm.event" _ "=" _ "'" et:$("NewBlock" / "Tx") "'" { + Term::EventType(EventType::from_str(et).unwrap()) + } + + rule condition() -> Term + = c:(eq() / lte() / lt() / gte() / gt() / contains() / exists()) { Term::Condition(c) } + + rule term() -> Term + = event_type() / condition() + + pub rule query() -> Vec + = t:term() ** ( __ "AND" __ ) { t } + } +} + +/// A term in a query is either an event type or a general condition. +/// Exclusively used for query parsing. +#[derive(Debug)] +pub enum Term { + EventType(EventType), + Condition(Condition), +} + +// Separate a list of terms into lists of each type of term. +fn separate_terms(terms: Vec) -> (Vec, Vec) { + terms + .into_iter() + .fold((Vec::new(), Vec::new()), |mut v, t| { + match t { + Term::EventType(et) => v.0.push(et), + Term::Condition(c) => v.1.push(c), + } + v + }) +} + +impl FromStr for Query { + type Err = Error; + + fn from_str(s: &str) -> Result { + let (event_types, conditions) = separate_terms( + query_parser::query(s) + .map_err(|e| Error::invalid_params(&format!("failed to parse query: {}", e)))?, + ); + if event_types.len() > 1 { + return Err(Error::invalid_params( + "tm.event can only be used once in a query", + )); + } + Ok(Query { + event_type: event_types.first().cloned(), + conditions, + }) + } +} + fn join(f: &mut fmt::Formatter<'_>, separator: S, iterable: I) -> fmt::Result where S: fmt::Display, @@ -219,6 +380,21 @@ impl fmt::Display for EventType { } } +impl FromStr for EventType { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "NewBlock" => Ok(Self::NewBlock), + "Tx" => Ok(Self::Tx), + invalid => Err(Error::invalid_params(&format!( + "unrecognized event type: {}", + invalid + ))), + } + } +} + /// The different types of conditions supported by a [`Query`]. /// /// [`Query`]: struct.Query.html @@ -279,8 +455,8 @@ impl fmt::Display for Operand { Operand::Signed(i) => write!(f, "{}", i), Operand::Unsigned(u) => write!(f, "{}", u), Operand::Float(h) => write!(f, "{}", h), - Operand::Date(d) => write!(f, "{}", escape(&d.format("%Y-%m-%d").to_string())), - Operand::DateTime(dt) => write!(f, "{}", escape(&dt.to_rfc3339())), + Operand::Date(d) => write!(f, "DATE {}", d.format("%Y-%m-%d").to_string()), + Operand::DateTime(dt) => write!(f, "TIME {}", dt.to_rfc3339()), } } } @@ -402,7 +578,7 @@ fn escape(s: &str) -> String { #[cfg(test)] mod test { use super::*; - use chrono::NaiveDate; + use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; #[test] fn empty_query() { @@ -464,7 +640,7 @@ mod test { "some_date", Date::from_utc(NaiveDate::from_ymd(2020, 9, 24), Utc), ); - assert_eq!("some_date = '2020-09-24'", query.to_string()); + assert_eq!("some_date = DATE 2020-09-24", query.to_string()); } #[test] @@ -474,7 +650,7 @@ mod test { DateTime::parse_from_rfc3339("2020-09-24T10:17:23-04:00").unwrap(), ); assert_eq!( - "some_date_time = '2020-09-24T14:17:23+00:00'", + "some_date_time = TIME 2020-09-24T14:17:23+00:00", query.to_string() ); } @@ -500,4 +676,203 @@ mod test { query.to_string() ); } + + #[test] + fn query_event_type_parsing() { + // Test the empty query (that matches all possible events) + let query = Query::from_str("").unwrap(); + assert_eq!(query, Query::default()); + + // With just one event type + let query = Query::from_str("tm.event='Tx'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert!(query.conditions.is_empty()); + let query = Query::from_str("tm.event='NewBlock'").unwrap(); + assert_eq!(query.event_type, Some(EventType::NewBlock)); + assert!(query.conditions.is_empty()); + + // One event type, with whitespace + let query = Query::from_str("tm.event = 'NewBlock'").unwrap(); + assert_eq!(query.event_type, Some(EventType::NewBlock)); + assert!(query.conditions.is_empty()); + + // Two event types are not allowed + assert!(Query::from_str("tm.event='Tx' AND tm.event='NewBlock'").is_err()); + } + + #[test] + fn query_string_term_parsing() { + // Query with string term + let query = Query::from_str("tm.event='Tx' AND transfer.sender='AddrA'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "transfer.sender".to_owned(), + Operand::String("AddrA".to_owned()), + )] + ); + // Query with string term, with extra whitespace + let query = Query::from_str("tm.event = 'Tx' AND transfer.sender = 'AddrA'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "transfer.sender".to_owned(), + Operand::String("AddrA".to_owned()), + )] + ); + } + + #[test] + fn query_unsigned_term_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND tx.height = 10").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq("tx.height".to_owned(), Operand::Unsigned(10))] + ); + + let query = Query::from_str("tm.event = 'Tx' AND tx.height <= 100").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "tx.height".to_owned(), + Operand::Unsigned(100) + )] + ); + } + + #[test] + fn query_signed_term_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND some.value = -1").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq("some.value".to_owned(), Operand::Signed(-1))] + ); + + let query = Query::from_str("tm.event = 'Tx' AND some.value <= -100").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "some.value".to_owned(), + Operand::Signed(-100) + )] + ); + } + + #[test] + fn query_date_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND some.date <= DATE 2022-02-03").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "some.date".to_owned(), + Operand::Date(Date::from_utc(NaiveDate::from_ymd(2022, 2, 3), Utc)) + )] + ); + } + + #[test] + fn query_datetime_parsing() { + let query = + Query::from_str("tm.event = 'Tx' AND some.datetime = TIME 2021-02-26T17:05:02.1495Z") + .unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "some.datetime".to_owned(), + Operand::DateTime(DateTime::from_utc( + NaiveDateTime::new( + NaiveDate::from_ymd(2021, 2, 26), + NaiveTime::from_hms_nano(17, 5, 2, 149500000) + ), + Utc + )) + )] + ) + } + + #[test] + fn query_conditions() { + let query = Query::from_str("some.field = 'string'").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Eq( + "some.field".to_owned(), + Operand::String("string".to_owned()) + )] + } + ); + + let query = Query::from_str("some.field < 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Lt("some.field".to_owned(), Operand::Unsigned(5),)] + } + ); + + let query = Query::from_str("some.field <= 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Lte( + "some.field".to_owned(), + Operand::Unsigned(5), + )] + } + ); + + let query = Query::from_str("some.field > 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Gt("some.field".to_owned(), Operand::Unsigned(5),)] + } + ); + + let query = Query::from_str("some.field >= 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Gte( + "some.field".to_owned(), + Operand::Unsigned(5), + )] + } + ); + + let query = Query::from_str("some.field CONTAINS 'inner'").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Contains( + "some.field".to_owned(), + "inner".to_owned() + )] + } + ); + + let query = Query::from_str("some.field EXISTS").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Exists("some.field".to_owned())] + } + ); + } } From 514d73cd8dfcdd66368df8b2e651b4234deaac63 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 26 Feb 2021 17:42:42 -0500 Subject: [PATCH 11/28] Add support for subscription to RPC CLI Signed-off-by: Thane Thomson --- rpc/src/client/bin/main.rs | 85 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 3 deletions(-) diff --git a/rpc/src/client/bin/main.rs b/rpc/src/client/bin/main.rs index 6af75710e..37f82c070 100644 --- a/rpc/src/client/bin/main.rs +++ b/rpc/src/client/bin/main.rs @@ -1,12 +1,14 @@ //! CLI for performing simple interactions against a Tendermint node's RPC. +use futures::StreamExt; use std::str::FromStr; +use std::time::Duration; use structopt::StructOpt; use tendermint::abci::{Path, Transaction}; use tendermint::net::Address; use tendermint_rpc::query::Query; use tendermint_rpc::{ - Client, Error, HttpClient, HttpsClient, Order, Result, SecureWebSocketClient, + Client, Error, HttpClient, HttpsClient, Order, Result, SecureWebSocketClient, Subscription, SubscriptionClient, Terminate, WebSocketClient, WebSocketClientDriver, }; use tracing::level_filters::LevelFilter; @@ -40,7 +42,17 @@ enum Request { #[structopt(flatten)] ClientRequest(ClientRequest), /// Subscribe to receive events produced by a specific query. - Subscribe, + Subscribe { + /// The query against which events will be matched. + query: Query, + /// The maximum number of events to receive before terminating. + #[structopt(long)] + max_events: Option, + /// The maximum amount of time (in seconds) to listen for events before + /// terminating. + #[structopt(long)] + max_time: Option, + }, } #[derive(Debug, StructOpt)] @@ -219,7 +231,11 @@ where let result = match req { Request::ClientRequest(r) => client_request(&client, r).await, - Request::Subscribe => unimplemented!(), + Request::Subscribe { + query, + max_events, + max_time, + } => subscription_client_request(&client, query, max_events, max_time).await, }; client.terminate()?; @@ -309,3 +325,66 @@ where println!("{}", result); Ok(()) } + +async fn subscription_client_request( + client: &C, + query: Query, + max_events: Option, + max_time: Option, +) -> Result<()> +where + C: SubscriptionClient, +{ + info!("Creating subcription for query: {}", query); + let subs = client.subscribe(query).await?; + match max_time { + Some(secs) => recv_events_with_timeout(subs, max_events, secs).await, + None => recv_events(subs, max_events).await, + } +} + +async fn recv_events_with_timeout( + mut subs: Subscription, + max_events: Option, + timeout_secs: u32, +) -> Result<()> { + let timeout = tokio::time::sleep(Duration::from_secs(timeout_secs as u64)); + let mut event_count = 0u64; + tokio::pin!(timeout); + loop { + tokio::select! { + Some(result) = subs.next() => { + let event = result?; + println!("{}", serde_json::to_string_pretty(&event)?); + event_count += 1; + if let Some(me) = max_events { + if event_count >= (me as u64) { + info!("Reached maximum number of events: {}", me); + return Ok(()); + } + } + } + _ = &mut timeout => { + info!("Reached event receive timeout of {} seconds", timeout_secs); + return Ok(()) + } + } + } +} + +async fn recv_events(mut subs: Subscription, max_events: Option) -> Result<()> { + let mut event_count = 0u64; + while let Some(result) = subs.next().await { + let event = result?; + println!("{}", serde_json::to_string_pretty(&event)?); + event_count += 1; + if let Some(me) = max_events { + if event_count >= (me as u64) { + info!("Reached maximum number of events: {}", me); + return Ok(()); + } + } + } + info!("The server terminated the subscription"); + Ok(()) +} From a046ba1682b592393560b70b748b81a1c9855c39 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 26 Feb 2021 17:58:59 -0500 Subject: [PATCH 12/28] Add support for parsing floating point numbers in RPC queries Signed-off-by: Thane Thomson --- rpc/src/query.rs | 54 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/rpc/src/query.rs b/rpc/src/query.rs index 4c94fe208..10b743693 100644 --- a/rpc/src/query.rs +++ b/rpc/src/query.rs @@ -239,6 +239,12 @@ peg::parser! { rule datetime() -> &'input str = dt:$(date() "T" time()) { dt } + rule float() -> f64 + = s:$("-"? ['0'..='9']+ "." ['0'..='9']+) {? + f64::from_str(s) + .map_err(|_| "failed to parse as a 64-bit floating point number") + } + rule string_op() -> Operand = s:string() { Operand::String(s.to_owned()) } @@ -262,11 +268,14 @@ peg::parser! { Ok(Operand::Date(Date::from_utc(naive_date, Utc))) } + rule float_op() -> Operand + = f:float() { Operand::Float(f) } + rule tag() -> &'input str = $(['a'..='z' | 'A'..='Z'] ['a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.']*) rule operand() -> Operand - = datetime_op() / date_op() / string_op() / signed_op() / unsigned_op() + = datetime_op() / date_op() / string_op() / float_op() / signed_op() / unsigned_op() rule eq() -> Condition = t:tag() _ "=" _ op:operand() { Condition::Eq(t.to_owned(), op) } @@ -798,6 +807,49 @@ mod test { ) } + #[test] + fn query_float_parsing() { + // Positive floating point number + let query = Query::from_str("short.pi = 3.14159").unwrap(); + assert_eq!(query.conditions.len(), 1); + match &query.conditions[0] { + Condition::Eq(tag, op) => { + assert_eq!(tag, "short.pi"); + match op { + Operand::Float(f) => { + assert!(floats_eq(*f, std::f64::consts::PI, 5)); + } + _ => panic!("unexpected operand: {:?}", op), + } + } + c => panic!("unexpected condition: {:?}", c), + } + + // Negative floating point number + let query = Query::from_str("short.pi = -3.14159").unwrap(); + assert_eq!(query.conditions.len(), 1); + match &query.conditions[0] { + Condition::Eq(tag, op) => { + assert_eq!(tag, "short.pi"); + match op { + Operand::Float(f) => { + assert!(floats_eq(*f, -std::f64::consts::PI, 5)); + } + _ => panic!("unexpected operand: {:?}", op), + } + } + c => panic!("unexpected condition: {:?}", c), + } + } + + // From https://stackoverflow.com/a/41447964/1156132 + fn floats_eq(a: f64, b: f64, precision: u8) -> bool { + let factor = 10.0f64.powi(precision as i32); + let a = (a * factor).trunc(); + let b = (b * factor).trunc(); + a == b + } + #[test] fn query_conditions() { let query = Query::from_str("some.field = 'string'").unwrap(); From 38c381ffd7c1eea4622df1e0643f000c70695f61 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Sat, 27 Feb 2021 07:05:16 -0500 Subject: [PATCH 13/28] Add termination option for subscriptions in CLI app Signed-off-by: Thane Thomson --- rpc/src/client/bin/main.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rpc/src/client/bin/main.rs b/rpc/src/client/bin/main.rs index 37f82c070..f23509527 100644 --- a/rpc/src/client/bin/main.rs +++ b/rpc/src/client/bin/main.rs @@ -353,7 +353,14 @@ async fn recv_events_with_timeout( tokio::pin!(timeout); loop { tokio::select! { - Some(result) = subs.next() => { + result_opt = subs.next() => { + let result = match result_opt { + Some(r) => r, + None => { + info!("The server terminated the subscription"); + return Ok(()); + } + }; let event = result?; println!("{}", serde_json::to_string_pretty(&event)?); event_count += 1; From 2a02d207fce6a3685fd368cb680e4f2b95037b14 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 10:07:29 -0500 Subject: [PATCH 14/28] Update docs to describe CLI tool Signed-off-by: Thane Thomson --- rpc/README.md | 68 ++++++++++++++++++++++++++++++++++++++++++-------- rpc/src/lib.rs | 2 +- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/rpc/README.md b/rpc/README.md index f35f68f77..273e865fd 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -21,16 +21,64 @@ This crate optionally provides access to different types of RPC client functionality and different client transports based on which features you select when using it. -Two features are provided at present. - -* `http-client` - Provides `HttpClient`, which is a basic RPC client that - interacts with remote Tendermint nodes via **JSON-RPC over HTTP**. This - client does not provide `Event` subscription functionality. See the - [Tendermint RPC] for more details. -* `websocket-client` - Provides `WebSocketClient`, which provides full - client functionality, including general RPC functionality (such as that - provided by `HttpClient`) as well as `Event` subscription - functionality. +Several client-related features are provided at present: + +* `http-client` - Provides `HttpClient` and `HttpsClient`, which are + basic RPC clients that interact with remote Tendermint nodes via + **JSON-RPC over HTTP or HTTPS**. This client does not provide + `Event` subscription functionality. See the [Tendermint RPC] for + more details. +* `websocket-client` - Provides `WebSocketClient` and + `SecureWebSocketClient`, which provide full client functionality, + including general RPC functionality as well as `Event` + subscription functionality. + +### CLI + +A `tendermint-rpc` console application is provided for testing/experimentation +purposes. To build this application, from the `rpc` crate's directory: + +```bash +cargo build --bin tendermint-rpc --features cli + +# To run directly and show usage information +cargo run --bin tendermint-rpc --features cli -- --help + +# To install the binary to your Cargo binaries path +# (should be globally accessible) +cargo install --bin tendermint-rpc --features cli --path . +``` + +The application sends its logs to **stderr** and its output to **stdout**, so +it's relatively easy to capture RPC output. + +**Usage examples:** (assuming you've installed the binary) + +```bash +# Check which RPC commands/endpoints are supported. +tendermint-rpc --help + +# Query the status of the Tendermint node bound to tcp://127.0.0.1:26657 +tendermint-rpc status + +# Submit a transaction to the key/value store ABCI app via a Tendermint node +# bound to tcp://127.0.0.1:26657 +tendermint-rpc broadcast-tx-async somekey=somevalue + +# Query the value associated with key "somekey" (still assuming a key/value +# store ABCI app) +tendermint-rpc abci-query somekey + +# Subscribe to receive new blocks (must use the WebSocket endpoint) +# Prints out all incoming events +tendermint-rpc -u ws://127.0.0.1:26657 subscribe "tm.event='NewBlock'" + +# If you want to execute a number of queries against a specific endpoint and +# don't feel like re-typing the URL over and over again, just set the +# TENDERMINT_RPC_URL environment variable +export TENDERMINT_RPC_URL=ws://127.0.0.1:26657 +tendermint-rpc subscribe "tm.event='Tx'" +``` ### Mock Clients diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 8f619162e..d96867a30 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -6,7 +6,7 @@ //! functionality and different client transports based on which features you //! select when using it. //! -//! Two features are provided at present: +//! Several client-related features are provided at present: //! //! * `http-client` - Provides [`HttpClient`] and [`HttpsClient`], which are //! basic RPC clients that interact with remote Tendermint nodes via From 918a9285010e2979fac48b439f4c44a353c231da Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 17:43:25 -0500 Subject: [PATCH 15/28] Refactor RPC to allow for generic URLs This commit refactors the RPC interface, while trying to reduce the amount of downstream impact as far as possible, with the aim of allowing the user to specify a generic URL as input to the HTTP and WebSocket clients. This allows the client to infer whether or not to use a secure connection based on the supplied URL. Signed-off-by: Thane Thomson --- rpc/Cargo.toml | 9 +- rpc/src/client.rs | 13 +- rpc/src/client/bin/main.rs | 98 ++---- rpc/src/client/subscription.rs | 4 + rpc/src/client/transport.rs | 1 - rpc/src/client/transport/http.rs | 314 +++++++++++++----- rpc/src/client/transport/mock.rs | 4 + rpc/src/client/transport/utils.rs | 16 - rpc/src/client/transport/websocket.rs | 440 +++++++++++++++++--------- rpc/src/error.rs | 13 + rpc/src/lib.rs | 30 +- rpc/src/url.rs | 132 ++++++++ 12 files changed, 733 insertions(+), 341 deletions(-) delete mode 100644 rpc/src/client/transport/utils.rs create mode 100644 rpc/src/url.rs diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 28504dc9f..6228b5984 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -34,7 +34,6 @@ cli = [ "http-client", "structopt", "tracing-subscriber", - "url", "websocket-client" ] http-client = [ @@ -42,10 +41,12 @@ http-client = [ "futures", "http", "hyper", + "hyper-proxy", "hyper-rustls", "tokio/fs", "tokio/macros", - "tracing" + "tracing", + "url" ] secp256k1 = [ "tendermint/secp256k1" ] websocket-client = [ @@ -57,7 +58,8 @@ websocket-client = [ "tokio/macros", "tokio/sync", "tokio/time", - "tracing" + "tracing", + "url" ] [dependencies] @@ -83,6 +85,7 @@ async-tungstenite = { version = "0.12", features = ["tokio-runtime", "tokio-rust futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } +hyper-proxy = { version = "0.9", optional = true } hyper-rustls = { version = "0.22.1", optional = true } structopt = { version = "0.3", optional = true } tokio = { version = "1.0", optional = true } diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 1b9d748f4..b6708575d 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -8,11 +8,9 @@ mod transport; pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher}; #[cfg(feature = "http-client")] -pub use transport::http::{HttpClient, HttpsClient, HyperClient}; +pub use transport::http::HttpClient; #[cfg(feature = "websocket-client")] -pub use transport::websocket::{ - AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, -}; +pub use transport::websocket::{WebSocketClient, WebSocketClientDriver}; use crate::endpoint::*; use crate::query::Query; @@ -185,10 +183,3 @@ pub trait Client { where R: SimpleRequest; } - -/// Applicable to transports whose connection can/must be terminated. -// TODO(thane): Replace with a Closeable trait once we can make breaking API changes. -pub trait Terminate { - /// Signal to the underlying transport to terminate. - fn terminate(self) -> Result<()>; -} diff --git a/rpc/src/client/bin/main.rs b/rpc/src/client/bin/main.rs index f23509527..57ddb442f 100644 --- a/rpc/src/client/bin/main.rs +++ b/rpc/src/client/bin/main.rs @@ -5,15 +5,13 @@ use std::str::FromStr; use std::time::Duration; use structopt::StructOpt; use tendermint::abci::{Path, Transaction}; -use tendermint::net::Address; use tendermint_rpc::query::Query; use tendermint_rpc::{ - Client, Error, HttpClient, HttpsClient, Order, Result, SecureWebSocketClient, Subscription, - SubscriptionClient, Terminate, WebSocketClient, WebSocketClientDriver, + Client, Error, HttpClient, Order, Result, Scheme, Subscription, SubscriptionClient, Url, + WebSocketClient, }; use tracing::level_filters::LevelFilter; -use tracing::{error, info, warn}; -use url::Url; +use tracing::{error, info}; /// CLI for performing simple interactions against a Tendermint node's RPC. /// @@ -29,6 +27,12 @@ struct Opt { )] url: Url, + /// An optional HTTP/S proxy through which to submit requests to the + /// Tendermint node's RPC endpoint. Only available for HTTP/HTTPS endpoints + /// (i.e. WebSocket proxies are not supported). + #[structopt(long, env = "HTTP_PROXY")] + proxy_url: Option, + /// Increase output logging verbosity to DEBUG level. #[structopt(short, long)] verbose: bool, @@ -154,31 +158,14 @@ async fn main() { .with_writer(std::io::stderr) .init(); - let host = match opt.url.host_str() { - Some(h) => h, - None => { - error!("Missing host in URL: {}", opt.url); - std::process::exit(-1); - } - }; - let port = opt.url.port_or_known_default().unwrap_or(26657); - if opt.url.path().len() > 1 { - warn!("URL paths are ignored at present: {}", opt.url.path()); - } - let address = Address::Tcp { - peer_id: None, - host: host.to_owned(), - port, - }; let result = match opt.url.scheme() { - "http" => http_request(address, opt.req).await, - "https" => https_request(address, opt.req).await, - "ws" => websocket_request(address, opt.req).await, - "wss" => secure_websocket_request(address, opt.req).await, - scheme => Err(Error::invalid_params(&format!( - "unsupported RPC endpoint scheme: {}", - scheme - ))), + Scheme::Http | Scheme::Https => http_request(opt.url, opt.proxy_url, opt.req).await, + Scheme::WebSocket | Scheme::SecureWebSocket => match opt.proxy_url { + Some(_) => Err(Error::invalid_params( + "proxies are only supported for use with HTTP clients at present", + )), + None => websocket_request(opt.url, opt.req).await, + }, }; if let Err(e) = result { error!("Failed: {}", e); @@ -186,47 +173,30 @@ async fn main() { } } -async fn http_request(address: Address, req: Request) -> Result<()> { - info!("Using HTTP client to submit request to: {}", address); - let client = HttpClient::new(address)?; - match req { - Request::ClientRequest(r) => client_request(&client, r).await, - _ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)")) - } -} +async fn http_request(url: Url, proxy_url: Option, req: Request) -> Result<()> { + let client = match proxy_url { + Some(proxy_url) => { + info!( + "Using HTTP client with proxy {} to submit request to {}", + proxy_url, url + ); + HttpClient::new_with_proxy(url, proxy_url) + } + None => { + info!("Using HTTP client to submit request to: {}", url); + HttpClient::new(url) + } + }?; -async fn https_request(address: Address, req: Request) -> Result<()> { - info!("Using HTTPS client to submit request to: {}", address); - let client = HttpsClient::new(address)?; match req { Request::ClientRequest(r) => client_request(&client, r).await, _ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)")) } } -async fn websocket_request(address: Address, req: Request) -> Result<()> { - info!("Using WebSocket client to submit request to: {}", address); - let (client, driver) = WebSocketClient::new(address).await?; - run_websocket_request(client, driver, req).await -} - -async fn secure_websocket_request(address: Address, req: Request) -> Result<()> { - info!( - "Using secure WebSocket client to submit request to: {}", - address - ); - let (client, driver) = SecureWebSocketClient::new(address).await?; - run_websocket_request(client, driver, req).await -} - -async fn run_websocket_request( - client: C, - driver: WebSocketClientDriver, - req: Request, -) -> Result<()> -where - C: Client + SubscriptionClient + Terminate + Sync, -{ +async fn websocket_request(url: Url, req: Request) -> Result<()> { + info!("Using WebSocket client to submit request to: {}", url); + let (client, driver) = WebSocketClient::new(url).await?; let driver_hdl = tokio::spawn(async move { driver.run().await }); let result = match req { @@ -238,7 +208,7 @@ where } => subscription_client_request(&client, query, max_events, max_time).await, }; - client.terminate()?; + client.close()?; driver_hdl .await .map_err(|e| Error::client_internal_error(e.to_string()))??; diff --git a/rpc/src/client/subscription.rs b/rpc/src/client/subscription.rs index 5bf158b5a..4d69d601d 100644 --- a/rpc/src/client/subscription.rs +++ b/rpc/src/client/subscription.rs @@ -27,6 +27,10 @@ pub trait SubscriptionClient { /// /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html async fn unsubscribe(&self, query: Query) -> Result<()>; + + /// Subscription clients will usually have long-running underlying + /// transports that will need to be closed at some point. + fn close(self) -> Result<()>; } pub(crate) type SubscriptionTx = ChannelTx>; diff --git a/rpc/src/client/transport.rs b/rpc/src/client/transport.rs index 3aa8c649b..aa00e6241 100644 --- a/rpc/src/client/transport.rs +++ b/rpc/src/client/transport.rs @@ -2,7 +2,6 @@ pub mod mock; mod router; -mod utils; #[cfg(feature = "http-client")] pub mod http; diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index 800b963aa..5eeaf108e 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -1,27 +1,19 @@ //! HTTP-based transport for Tendermint RPC Client. +use crate::client::Client; +use crate::{Error, Result, Scheme, SimpleRequest, Url}; use async_trait::async_trait; -use hyper::body::Buf; -use hyper::{header, Uri}; - +use std::convert::{TryFrom, TryInto}; +use std::str::FromStr; use tendermint::net; -use crate::client::transport::utils::get_tcp_host_port; -use crate::{Client, Error, Response, Result, SimpleRequest}; -use hyper::client::connect::Connect; -use hyper::client::HttpConnector; -use hyper_rustls::HttpsConnector; -use std::convert::TryInto; -use std::io::Read; - -/// A JSON-RPC/HTTP Tendermint RPC client (implements [`Client`]). +/// A JSON-RPC/HTTP Tendermint RPC client (implements [`crate::Client`]). +/// +/// Supports both HTTP and HTTPS connections to Tendermint RPC endpoints. /// /// Does not provide [`crate::event::Event`] subscription facilities (see /// [`crate::WebSocketClient`] for a client that does). /// -/// Does not provide any security. For a JSON-RPC/HTTPS client, see -/// [`HttpsClient`]. -/// /// ## Examples /// /// ```rust,ignore @@ -29,7 +21,7 @@ use std::io::Read; /// /// #[tokio::main] /// async fn main() { -/// let client = HttpClient::new("tcp://127.0.0.1:26657".parse().unwrap()) +/// let client = HttpClient::new("http://127.0.0.1:26657") /// .unwrap(); /// /// let abci_info = client.abci_info() @@ -39,102 +31,250 @@ use std::io::Read; /// println!("Got ABCI info: {:?}", abci_info); /// } /// ``` -pub type HttpClient = HyperClient; +#[derive(Debug, Clone)] +pub struct HttpClient { + inner: sealed::HttpClient, +} -/// A JSON-RPC/HTTPS (i.e. HTTP/TLS) Tendermint RPC client. -/// -/// Similar to [`HttpClient`], but allows for connection to the RPC endpoint -/// via HTTPS. -pub type HttpsClient = HyperClient>; +impl HttpClient { + /// Construct a new Tendermint RPC HTTP/S client connecting to the given + /// URL. + pub fn new(url: U) -> Result + where + U: TryInto, + { + let url = url.try_into()?; + Ok(Self { + inner: if url.0.is_secure() { + sealed::HttpClient::new_https(url.try_into()?) + } else { + sealed::HttpClient::new_http(url.try_into()?) + }, + }) + } -/// A [`hyper`]-based Tendermint RPC client. -/// -/// Generic over the connector type used for the client. -/// -/// [`hyper`]: https://hyper.rs/ -#[derive(Debug, Clone)] -pub struct HyperClient { - uri: Uri, - inner: hyper::Client, + /// Construct a new Tendermint RPC HTTP/S client connecting to the given + /// URL, but via the specified proxy's URL. + pub fn new_with_proxy(url: U, proxy_url: P) -> Result + where + U: TryInto, + P: TryInto, + { + let url = url.try_into()?; + let proxy_url = proxy_url.try_into()?; + Ok(Self { + inner: if proxy_url.0.is_secure() { + sealed::HttpClient::new_https_proxy(url.try_into()?, proxy_url.try_into()?)? + } else { + sealed::HttpClient::new_http_proxy(url.try_into()?, proxy_url.try_into()?)? + }, + }) + } } #[async_trait] -impl Client for HyperClient -where - C: Connect + Clone + Send + Sync + 'static, -{ +impl Client for HttpClient { async fn perform(&self, request: R) -> Result where R: SimpleRequest, { - let request = self.build_request(request)?; - let response = self.inner.request(request).await?; - let response_body = response_to_string(response).await?; - tracing::debug!("Incoming response: {}", response_body); - R::Response::from_string(&response_body) + self.inner.perform(request).await + } +} + +/// A URL limited to use with HTTP clients. +/// +/// Facilitates useful type conversions and inferences. +#[derive(Debug, Clone)] +pub struct HttpClientUrl(Url); + +impl TryFrom for HttpClientUrl { + type Error = Error; + + fn try_from(value: Url) -> Result { + match value.scheme() { + Scheme::Http | Scheme::Https => Ok(Self(value)), + _ => Err(Error::invalid_params(&format!( + "cannot use URL {} with HTTP clients", + value + ))), + } } } -impl HyperClient { - /// Create a new JSON-RPC/HTTP Tendermint RPC client. - pub fn new(address: net::Address) -> Result { - Self::new_with_scheme(address, "http", hyper::Client::new()) +impl FromStr for HttpClientUrl { + type Err = Error; + + fn from_str(s: &str) -> Result { + let url: Url = s.parse()?; + url.try_into() } } -impl HyperClient> { - /// Create a new JSON-RPC/HTTPS (i.e. HTTP/TLS) Tendermint RPC client. - pub fn new(address: net::Address) -> Result { - Self::new_with_scheme( - address, - "https", - hyper::Client::builder().build(HttpsConnector::with_native_roots()), - ) +impl TryFrom<&str> for HttpClientUrl { + type Error = Error; + + fn try_from(value: &str) -> Result { + value.parse() } } -impl HyperClient { - fn new_with_scheme( - address: net::Address, - scheme: &str, - inner: hyper::Client, - ) -> Result { - let (host, port) = get_tcp_host_port(address)?; - Ok(Self { - uri: format!("{}://{}:{}/", scheme, host, port).try_into()?, - inner, - }) +impl TryFrom for HttpClientUrl { + type Error = Error; + + fn try_from(value: net::Address) -> Result { + match value { + net::Address::Tcp { + peer_id: _, + host, + port, + } => format!("http://{}:{}", host, port).parse(), + net::Address::Unix { .. } => Err(Error::invalid_params( + "only TCP-based node addresses are supported", + )), + } } +} + +impl TryFrom for hyper::Uri { + type Error = Error; - fn build_request(&self, request: R) -> Result> { - let request_body = request.into_json(); + fn try_from(value: HttpClientUrl) -> Result { + Ok(value.0.to_string().parse()?) + } +} - let mut request = hyper::Request::builder() - .method("POST") - .uri(&self.uri) - .body(hyper::Body::from(request_body.into_bytes()))?; +mod sealed { + use crate::{Error, Response, Result, SimpleRequest}; + use hyper::body::Buf; + use hyper::client::connect::Connect; + use hyper::client::HttpConnector; + use hyper::{header, Uri}; + use hyper_proxy::{Intercept, Proxy, ProxyConnector}; + use hyper_rustls::HttpsConnector; + use std::io::Read; + /// A wrapper for a `hyper`-based client, generic over the connector type. + #[derive(Debug, Clone)] + pub struct HyperClient { + uri: Uri, + inner: hyper::Client, + } + + impl HyperClient { + pub fn new(uri: Uri, inner: hyper::Client) -> Self { + Self { uri, inner } + } + } + + impl HyperClient + where + C: Connect + Clone + Send + Sync + 'static, + { + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, { - let headers = request.headers_mut(); - headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); - headers.insert( - header::USER_AGENT, - format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION")) - .parse() - .unwrap(), - ); + let request = self.build_request(request)?; + let response = self.inner.request(request).await?; + let response_body = response_to_string(response).await?; + tracing::debug!("Incoming response: {}", response_body); + R::Response::from_string(&response_body) } + } - Ok(request) + impl HyperClient { + /// Build a request using the given Tendermint RPC request. + pub fn build_request( + &self, + request: R, + ) -> Result> { + let request_body = request.into_json(); + + let mut request = hyper::Request::builder() + .method("POST") + .uri(&self.uri) + .body(hyper::Body::from(request_body.into_bytes()))?; + + { + let headers = request.headers_mut(); + headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); + headers.insert( + header::USER_AGENT, + format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION")) + .parse() + .unwrap(), + ); + } + + Ok(request) + } + } + + /// We offer several variations of `hyper`-based client. + /// + /// Here we erase the type signature of the underlying `hyper`-based + /// client, allowing the higher-level HTTP client to operate via HTTP or + /// HTTPS, and with or without a proxy. + #[derive(Debug, Clone)] + pub enum HttpClient { + Http(HyperClient), + Https(HyperClient>), + HttpProxy(HyperClient>), + HttpsProxy(HyperClient>>), + } + + impl HttpClient { + pub fn new_http(uri: Uri) -> Self { + Self::Http(HyperClient::new(uri, hyper::Client::new())) + } + + pub fn new_https(uri: Uri) -> Self { + Self::Https(HyperClient::new( + uri, + hyper::Client::builder().build(HttpsConnector::with_native_roots()), + )) + } + + pub fn new_http_proxy(uri: Uri, proxy_uri: Uri) -> Result { + let proxy = Proxy::new(Intercept::All, proxy_uri); + let proxy_connector = ProxyConnector::from_proxy(HttpConnector::new(), proxy)?; + Ok(Self::HttpProxy(HyperClient::new( + uri, + hyper::Client::builder().build(proxy_connector), + ))) + } + + pub fn new_https_proxy(uri: Uri, proxy_uri: Uri) -> Result { + let proxy = Proxy::new(Intercept::All, proxy_uri); + let proxy_connector = + ProxyConnector::from_proxy(HttpsConnector::with_native_roots(), proxy)?; + Ok(Self::HttpsProxy(HyperClient::new( + uri, + hyper::Client::builder().build(proxy_connector), + ))) + } + + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + match self { + HttpClient::Http(c) => c.perform(request).await, + HttpClient::Https(c) => c.perform(request).await, + HttpClient::HttpProxy(c) => c.perform(request).await, + HttpClient::HttpsProxy(c) => c.perform(request).await, + } + } } -} -async fn response_to_string(response: hyper::Response) -> Result { - let mut response_body = String::new(); - hyper::body::aggregate(response.into_body()) - .await? - .reader() - .read_to_string(&mut response_body) - .map_err(|_| Error::client_internal_error("failed to read response body to string"))?; - Ok(response_body) + async fn response_to_string(response: hyper::Response) -> Result { + let mut response_body = String::new(); + hyper::body::aggregate(response.into_body()) + .await? + .reader() + .read_to_string(&mut response_body) + .map_err(|_| Error::client_internal_error("failed to read response body to string"))?; + Ok(response_body) + } } diff --git a/rpc/src/client/transport/mock.rs b/rpc/src/client/transport/mock.rs index 04bad190e..9bd46fc6c 100644 --- a/rpc/src/client/transport/mock.rs +++ b/rpc/src/client/transport/mock.rs @@ -110,6 +110,10 @@ impl SubscriptionClient for MockClient { .send(DriverCommand::Unsubscribe { query, result_tx })?; result_rx.recv().await.unwrap() } + + fn close(self) -> Result<()> { + Ok(()) + } } #[derive(Debug)] diff --git a/rpc/src/client/transport/utils.rs b/rpc/src/client/transport/utils.rs deleted file mode 100644 index 16419b1e0..000000000 --- a/rpc/src/client/transport/utils.rs +++ /dev/null @@ -1,16 +0,0 @@ -//! Client transport-related utilities. - -use crate::{Error, Result}; -use tendermint::net; - -/// Convenience method to extract the host and port associated with the given -/// address, but only if it's a TCP address (it fails otherwise). -pub(crate) fn get_tcp_host_port(address: net::Address) -> Result<(String, u16)> { - match address { - net::Address::Tcp { host, port, .. } => Ok((host, port)), - other => Err(Error::invalid_params(&format!( - "invalid RPC address: {:?}", - other - ))), - } -} diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index ff2434e96..7e728ef35 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -1,21 +1,18 @@ //! WebSocket-based clients for accessing Tendermint RPC functionality. use crate::client::subscription::SubscriptionTx; -use crate::client::sync::{unbounded, ChannelRx, ChannelTx}; +use crate::client::sync::{ChannelRx, ChannelTx}; use crate::client::transport::router::{PublishResult, SubscriptionRouter}; -use crate::client::transport::utils::get_tcp_host_port; -use crate::client::Terminate; use crate::endpoint::{subscribe, unsubscribe}; use crate::event::Event; use crate::query::Query; use crate::request::Wrapper; -use crate::utils::uuid_str; use crate::{ - response, Client, Error, Id, Request, Response, Result, SimpleRequest, Subscription, - SubscriptionClient, + response, Client, Error, Id, Request, Response, Result, Scheme, SimpleRequest, Subscription, + SubscriptionClient, Url, }; use async_trait::async_trait; -use async_tungstenite::tokio::{connect_async, connect_async_with_tls_connector, ConnectStream}; +use async_tungstenite::tokio::ConnectStream; use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use async_tungstenite::tungstenite::protocol::CloseFrame; use async_tungstenite::tungstenite::Message; @@ -24,7 +21,9 @@ use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::HashMap; +use std::convert::{TryFrom, TryInto}; use std::ops::Add; +use std::str::FromStr; use tendermint::net; use tokio::time::{Duration, Instant}; use tracing::{debug, error}; @@ -87,7 +86,7 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// /// #[tokio::main] /// async fn main() { -/// let (client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) +/// let (client, driver) = WebSocketClient::new("ws://127.0.0.1:26657/websocket") /// .await /// .unwrap(); /// let driver_handle = tokio::spawn(async move { driver.run().await }); @@ -121,165 +120,318 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// ``` /// /// [tendermint-websocket-ping]: https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28 -pub type WebSocketClient = AsyncTungsteniteClient; +#[derive(Debug, Clone)] +pub struct WebSocketClient { + inner: sealed::WebSocketClient, +} -/// Similar to [`WebSocketClient`], but facilitates connectivity over TLS. -pub type SecureWebSocketClient = AsyncTungsteniteClient; +impl WebSocketClient { + /// Construct a new WebSocket-based client connecting to the given + /// Tendermint node's RPC endpoint. + /// + /// Supports both `ws://` and `wss://` protocols. + pub async fn new(url: U) -> Result<(Self, WebSocketClientDriver)> + where + U: TryInto, + { + let url = url.try_into()?; + let (inner, driver) = if url.0.is_secure() { + sealed::WebSocketClient::new_secure(url.0).await? + } else { + sealed::WebSocketClient::new_unsecure(url.0).await? + }; + Ok((Self { inner }, driver)) + } +} -/// Marker for the [`AsyncTungsteniteClient`] for clients operating over -/// unsecure connections. -#[derive(Debug, Clone)] -pub struct Unsecure; +#[async_trait] +impl Client for WebSocketClient { + async fn perform(&self, request: R) -> Result<::Response> + where + R: SimpleRequest, + { + self.inner.perform(request).await + } +} -/// Marker for the [`AsyncTungsteniteClient`] for clients operating over -/// secure connections. -#[derive(Debug, Clone)] -pub struct Secure; +#[async_trait] +impl SubscriptionClient for WebSocketClient { + async fn subscribe(&self, query: Query) -> Result { + self.inner.subscribe(query).await + } -/// An [`async-tungstenite`]-based WebSocket client. -/// -/// Different modes of operation (secure and unsecure) are facilitated by -/// different variants of this type. + async fn unsubscribe(&self, query: Query) -> Result<()> { + self.inner.unsubscribe(query).await + } + + fn close(self) -> Result<()> { + self.inner.close() + } +} + +/// A URL limited to use with WebSocket clients. /// -/// [`async-tungstenite`]: https://crates.io/crates/async-tungstenite +/// Facilitates useful type conversions and inferences. #[derive(Debug, Clone)] -pub struct AsyncTungsteniteClient { - cmd_tx: ChannelTx, - _client_type: std::marker::PhantomData, +pub struct WebSocketClientUrl(Url); + +impl TryFrom for WebSocketClientUrl { + type Error = Error; + + fn try_from(value: Url) -> Result { + match value.scheme() { + Scheme::WebSocket | Scheme::SecureWebSocket => Ok(Self(value)), + _ => Err(Error::invalid_params(&format!( + "cannot use URL {} with WebSocket clients", + value + ))), + } + } } -impl AsyncTungsteniteClient { - /// Construct a WebSocket client. Immediately attempts to open a WebSocket - /// connection to the node with the given address. - /// - /// On success, this returns both a client handle (a `WebSocketClient` - /// instance) as well as the WebSocket connection driver. The execution of - /// this driver becomes the responsibility of the client owner, and must be - /// executed in a separate asynchronous context to the client to ensure it - /// doesn't block the client. - pub async fn new(address: net::Address) -> Result<(Self, WebSocketClientDriver)> { - let (host, port) = get_tcp_host_port(address)?; - let (stream, _response) = - connect_async(&format!("ws://{}:{}/websocket", &host, port)).await?; - let (cmd_tx, cmd_rx) = unbounded(); - let driver = WebSocketClientDriver::new(stream, cmd_rx); - Ok(( - Self { - cmd_tx, - _client_type: Default::default(), - }, - driver, - )) +impl FromStr for WebSocketClientUrl { + type Err = Error; + + fn from_str(s: &str) -> Result { + let url: Url = s.parse()?; + url.try_into() } } -impl AsyncTungsteniteClient { - /// Construct a WebSocket client. Immediately attempts to open a WebSocket - /// connection to the node with the given address, but over a secure - /// connection. - /// - /// On success, this returns both a client handle (a `WebSocketClient` - /// instance) as well as the WebSocket connection driver. The execution of - /// this driver becomes the responsibility of the client owner, and must be - /// executed in a separate asynchronous context to the client to ensure it - /// doesn't block the client. - pub async fn new(address: net::Address) -> Result<(Self, WebSocketClientDriver)> { - let (host, port) = get_tcp_host_port(address)?; - // Not supplying a connector means async_tungstenite will create the - // connector for us. - let (stream, _response) = - connect_async_with_tls_connector(&format!("wss://{}:{}/websocket", &host, port), None) - .await?; - let (cmd_tx, cmd_rx) = unbounded(); - let driver = WebSocketClientDriver::new(stream, cmd_rx); - Ok(( - Self { - cmd_tx, - _client_type: Default::default(), - }, - driver, - )) +impl TryFrom<&str> for WebSocketClientUrl { + type Error = Error; + + fn try_from(value: &str) -> Result { + value.parse() } } -impl AsyncTungsteniteClient { - fn send_cmd(&self, cmd: DriverCommand) -> Result<()> { - self.cmd_tx.send(cmd).map_err(|e| { - Error::client_internal_error(format!("failed to send command to client driver: {}", e)) - }) +impl TryFrom for WebSocketClientUrl { + type Error = Error; + + fn try_from(value: net::Address) -> Result { + match value { + net::Address::Tcp { + peer_id: _, + host, + port, + } => format!("ws://{}:{}/websocket", host, port).parse(), + net::Address::Unix { .. } => Err(Error::invalid_params( + "only TCP-based node addresses are supported", + )), + } } +} - /// Signals to the driver that it must terminate. - pub fn close(self) -> Result<()> { - self.send_cmd(DriverCommand::Terminate) +mod sealed { + use super::{ + DriverCommand, SimpleRequestCommand, SubscribeCommand, UnsubscribeCommand, + WebSocketClientDriver, + }; + use crate::client::sync::{unbounded, ChannelTx}; + use crate::query::Query; + use crate::request::Wrapper; + use crate::utils::uuid_str; + use crate::{Error, Response, Result, SimpleRequest, Subscription, Url}; + use async_tungstenite::tokio::{connect_async, connect_async_with_tls_connector}; + use tracing::debug; + + /// Marker for the [`AsyncTungsteniteClient`] for clients operating over + /// unsecure connections. + #[derive(Debug, Clone)] + pub struct Unsecure; + + /// Marker for the [`AsyncTungsteniteClient`] for clients operating over + /// secure connections. + #[derive(Debug, Clone)] + pub struct Secure; + + /// An [`async-tungstenite`]-based WebSocket client. + /// + /// Different modes of operation (secure and unsecure) are facilitated by + /// different variants of this type. + /// + /// [`async-tungstenite`]: https://crates.io/crates/async-tungstenite + #[derive(Debug, Clone)] + pub struct AsyncTungsteniteClient { + cmd_tx: ChannelTx, + _client_type: std::marker::PhantomData, } -} -#[async_trait] -impl Client for AsyncTungsteniteClient -where - C: Send + Sync, -{ - async fn perform(&self, request: R) -> Result - where - R: SimpleRequest, - { - let wrapper = Wrapper::new(request); - let id = wrapper.id().clone().to_string(); - let wrapped_request = wrapper.into_json(); - let (response_tx, mut response_rx) = unbounded(); - self.send_cmd(DriverCommand::SimpleRequest(SimpleRequestCommand { - id, - wrapped_request, - response_tx, - }))?; - let response = response_rx.recv().await.ok_or_else(|| { - Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) - })??; - tracing::debug!("Incoming response: {}", response); - R::Response::from_string(response) + impl AsyncTungsteniteClient { + /// Construct a WebSocket client. Immediately attempts to open a WebSocket + /// connection to the node with the given address. + /// + /// On success, this returns both a client handle (a `WebSocketClient` + /// instance) as well as the WebSocket connection driver. The execution of + /// this driver becomes the responsibility of the client owner, and must be + /// executed in a separate asynchronous context to the client to ensure it + /// doesn't block the client. + pub async fn new(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let url = url.to_string(); + debug!("Connecting to unsecure WebSocket endpoint: {}", url); + let (stream, _response) = connect_async(url).await?; + let (cmd_tx, cmd_rx) = unbounded(); + let driver = WebSocketClientDriver::new(stream, cmd_rx); + Ok(( + Self { + cmd_tx, + _client_type: Default::default(), + }, + driver, + )) + } } -} -impl Terminate for AsyncTungsteniteClient { - fn terminate(self) -> Result<()> { - self.close() + impl AsyncTungsteniteClient { + /// Construct a WebSocket client. Immediately attempts to open a WebSocket + /// connection to the node with the given address, but over a secure + /// connection. + /// + /// On success, this returns both a client handle (a `WebSocketClient` + /// instance) as well as the WebSocket connection driver. The execution of + /// this driver becomes the responsibility of the client owner, and must be + /// executed in a separate asynchronous context to the client to ensure it + /// doesn't block the client. + pub async fn new(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let url = url.to_string(); + debug!("Connecting to secure WebSocket endpoint: {}", url); + // Not supplying a connector means async_tungstenite will create the + // connector for us. + let (stream, _response) = connect_async_with_tls_connector(url, None).await?; + let (cmd_tx, cmd_rx) = unbounded(); + let driver = WebSocketClientDriver::new(stream, cmd_rx); + Ok(( + Self { + cmd_tx, + _client_type: Default::default(), + }, + driver, + )) + } } -} -#[async_trait] -impl SubscriptionClient for AsyncTungsteniteClient -where - C: Send + Sync, -{ - async fn subscribe(&self, query: Query) -> Result { - let (subscription_tx, subscription_rx) = unbounded(); - let (response_tx, mut response_rx) = unbounded(); - // By default we use UUIDs to differentiate subscriptions - let id = uuid_str(); - self.send_cmd(DriverCommand::Subscribe(SubscribeCommand { - id: id.to_string(), - query: query.to_string(), - subscription_tx, - response_tx, - }))?; - // Make sure our subscription request went through successfully. - let _ = response_rx.recv().await.ok_or_else(|| { - Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) - })??; - Ok(Subscription::new(id, query, subscription_rx)) + impl AsyncTungsteniteClient { + fn send_cmd(&self, cmd: DriverCommand) -> Result<()> { + self.cmd_tx.send(cmd).map_err(|e| { + Error::client_internal_error(format!( + "failed to send command to client driver: {}", + e + )) + }) + } + + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + let wrapper = Wrapper::new(request); + let id = wrapper.id().clone().to_string(); + let wrapped_request = wrapper.into_json(); + let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::SimpleRequest(SimpleRequestCommand { + id, + wrapped_request, + response_tx, + }))?; + let response = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error( + "failed to hear back from WebSocket driver".to_string(), + ) + })??; + tracing::debug!("Incoming response: {}", response); + R::Response::from_string(response) + } + + pub async fn subscribe(&self, query: Query) -> Result { + let (subscription_tx, subscription_rx) = unbounded(); + let (response_tx, mut response_rx) = unbounded(); + // By default we use UUIDs to differentiate subscriptions + let id = uuid_str(); + self.send_cmd(DriverCommand::Subscribe(SubscribeCommand { + id: id.to_string(), + query: query.to_string(), + subscription_tx, + response_tx, + }))?; + // Make sure our subscription request went through successfully. + let _ = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error( + "failed to hear back from WebSocket driver".to_string(), + ) + })??; + Ok(Subscription::new(id, query, subscription_rx)) + } + + pub async fn unsubscribe(&self, query: Query) -> Result<()> { + let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand { + query: query.to_string(), + response_tx, + }))?; + let _ = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error( + "failed to hear back from WebSocket driver".to_string(), + ) + })??; + Ok(()) + } + + /// Signals to the driver that it must terminate. + pub fn close(self) -> Result<()> { + self.send_cmd(DriverCommand::Terminate) + } } - async fn unsubscribe(&self, query: Query) -> Result<()> { - let (response_tx, mut response_rx) = unbounded(); - self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand { - query: query.to_string(), - response_tx, - }))?; - let _ = response_rx.recv().await.ok_or_else(|| { - Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) - })??; - Ok(()) + /// Allows us to erase the type signatures associated with the different + /// WebSocket client variants. + #[derive(Debug, Clone)] + pub enum WebSocketClient { + Unsecure(AsyncTungsteniteClient), + Secure(AsyncTungsteniteClient), + } + + impl WebSocketClient { + pub async fn new_unsecure(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let (client, driver) = AsyncTungsteniteClient::::new(url).await?; + Ok((Self::Unsecure(client), driver)) + } + + pub async fn new_secure(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let (client, driver) = AsyncTungsteniteClient::::new(url).await?; + Ok((Self::Secure(client), driver)) + } + + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + match self { + WebSocketClient::Unsecure(c) => c.perform(request).await, + WebSocketClient::Secure(c) => c.perform(request).await, + } + } + + pub async fn subscribe(&self, query: Query) -> Result { + match self { + WebSocketClient::Unsecure(c) => c.subscribe(query).await, + WebSocketClient::Secure(c) => c.subscribe(query).await, + } + } + + pub async fn unsubscribe(&self, query: Query) -> Result<()> { + match self { + WebSocketClient::Unsecure(c) => c.unsubscribe(query).await, + WebSocketClient::Secure(c) => c.unsubscribe(query).await, + } + } + + pub fn close(self) -> Result<()> { + match self { + WebSocketClient::Unsecure(c) => c.close(), + WebSocketClient::Secure(c) => c.close(), + } + } } } @@ -576,6 +728,7 @@ impl WebSocketClientDriver { #[cfg(test)] mod test { use super::*; + use crate::client::sync::unbounded; use crate::query::EventType; use crate::{request, Id, Method}; use async_tungstenite::tokio::{accept_async, TokioAdapter}; @@ -583,6 +736,7 @@ mod test { use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; + use tendermint::net; use tokio::fs; use tokio::net::{TcpListener, TcpStream}; use tokio::task::JoinHandle; diff --git a/rpc/src/error.rs b/rpc/src/error.rs index cdc280a93..83d23a9db 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -109,6 +109,19 @@ impl Display for Error { } } +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::client_internal_error(e.to_string()) + } +} + +#[cfg(any(feature = "http-client", feature = "websocket-client"))] +impl From for Error { + fn from(e: url::ParseError) -> Self { + Error::invalid_params(&e.to_string()) + } +} + #[cfg(feature = "http-client")] impl From for Error { fn from(http_error: http::Error) -> Error { diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index d96867a30..24bb06d0b 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -8,15 +8,14 @@ //! //! Several client-related features are provided at present: //! -//! * `http-client` - Provides [`HttpClient`] and [`HttpsClient`], which are -//! basic RPC clients that interact with remote Tendermint nodes via -//! **JSON-RPC over HTTP or HTTPS**. This client does not provide -//! [`event::Event`] subscription functionality. See the [Tendermint RPC] for -//! more details. -//! * `websocket-client` - Provides [`WebSocketClient`] and -//! [`SecureWebSocketClient`], which provide full client functionality, -//! including general RPC functionality as well as [`event::Event`] -//! subscription functionality. +//! * `http-client` - Provides [`HttpClient`], which is a basic RPC client that +//! interacts with remote Tendermint nodes via **JSON-RPC over HTTP or +//! HTTPS**. This client does not provide [`event::Event`] subscription +//! functionality. See the [Tendermint RPC] for more details. +//! * `websocket-client` - Provides [`WebSocketClient`], which provides full +//! client functionality, including general RPC functionality as well as +//! [`event::Event`] subscription functionality. Can be used over secure +//! (`wss://`) and unsecure (`ws://`) connections. //! //! ### Mock Clients //! @@ -33,15 +32,13 @@ mod client; #[cfg(any(feature = "http-client", feature = "websocket-client"))] pub use client::{ Client, MockClient, MockRequestMatcher, MockRequestMethodMatcher, Subscription, - SubscriptionClient, Terminate, + SubscriptionClient, }; -#[cfg(feature = "websocket-client")] -pub use client::{ - AsyncTungsteniteClient, SecureWebSocketClient, WebSocketClient, WebSocketClientDriver, -}; #[cfg(feature = "http-client")] -pub use client::{HttpClient, HttpsClient, HyperClient}; +pub use client::HttpClient; +#[cfg(feature = "websocket-client")] +pub use client::{WebSocketClient, WebSocketClientDriver}; pub mod endpoint; pub mod error; @@ -53,10 +50,11 @@ pub mod query; pub mod request; pub mod response; mod result; +mod url; mod utils; mod version; pub use self::{ error::Error, id::Id, method::Method, order::Order, request::Request, request::SimpleRequest, - response::Response, result::Result, version::Version, + response::Response, result::Result, url::Scheme, url::Url, version::Version, }; diff --git a/rpc/src/url.rs b/rpc/src/url.rs new file mode 100644 index 000000000..bee720c7b --- /dev/null +++ b/rpc/src/url.rs @@ -0,0 +1,132 @@ +//! URL representation for clients. + +use crate::{Error, Result}; +use std::convert::TryFrom; +use std::fmt; +use std::str::FromStr; + +/// The various schemes supported by Tendermint RPC clients. +#[derive(Debug, Clone, Copy)] +pub enum Scheme { + Http, + Https, + WebSocket, + SecureWebSocket, +} + +impl fmt::Display for Scheme { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Scheme::Http => write!(f, "http"), + Scheme::Https => write!(f, "https"), + Scheme::WebSocket => write!(f, "ws"), + Scheme::SecureWebSocket => write!(f, "wss"), + } + } +} + +impl FromStr for Scheme { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(match s { + "http" => Scheme::Http, + "https" => Scheme::Https, + "ws" => Scheme::WebSocket, + "wss" => Scheme::SecureWebSocket, + _ => return Err(Error::invalid_params(&format!("unsupported scheme: {}", s))), + }) + } +} + +/// A uniform resource locator (URL), with support for only those +/// schemes/protocols supported by Tendermint RPC clients. +/// +/// Re-implements relevant parts of [`url::Url`]'s interface with convenience +/// mechanisms for transformation to/from other types. +#[derive(Debug, Clone)] +pub struct Url { + inner: url::Url, + scheme: Scheme, + host: String, + port: u16, +} + +impl FromStr for Url { + type Err = Error; + + fn from_str(s: &str) -> Result { + let inner: url::Url = s.parse()?; + let scheme: Scheme = inner.scheme().parse()?; + let host = inner + .host_str() + .ok_or_else(|| Error::invalid_params(&format!("URL is missing its host: {}", s)))? + .to_owned(); + let port = inner.port_or_known_default().ok_or_else(|| { + Error::invalid_params(&format!("cannot determine appropriate port for URL: {}", s)) + })?; + Ok(Self { + inner, + scheme, + host, + port, + }) + } +} + +impl Url { + /// Returns whether or not this URL represents a connection to a secure + /// endpoint. + pub fn is_secure(&self) -> bool { + match self.scheme { + Scheme::Http => false, + Scheme::Https => true, + Scheme::WebSocket => false, + Scheme::SecureWebSocket => true, + } + } + + /// Get the scheme associated with this URL. + pub fn scheme(&self) -> Scheme { + self.scheme + } + + /// Get the username associated with this URL, if any. + pub fn username(&self) -> &str { + self.inner.username() + } + + /// Get the password associated with this URL, if any. + pub fn password(&self) -> Option<&str> { + self.inner.password() + } + + /// Get the host associated with this URL. + pub fn host(&self) -> &str { + &self.host + } + + /// Get the port associated with this URL. + pub fn port(&self) -> u16 { + self.port + } + + /// Get this URL's path. + pub fn path(&self) -> &str { + self.inner.path() + } +} + +impl fmt::Display for Url { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.inner) + } +} + +impl TryFrom for Url { + type Error = Error; + + fn try_from(value: url::Url) -> Result { + value.to_string().parse() + } +} From 6dbddb6cbc20b266fedde2591ed4dfe0fc5392af Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 17:45:30 -0500 Subject: [PATCH 16/28] Fix version requirement of tendermint-proto Signed-off-by: Thane Thomson --- abci/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 4318ad2c4..f930db86a 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -25,7 +25,7 @@ binary = [ "structopt", "tracing-subscriber" ] bytes = "1.0" eyre = "0.6" prost = "0.7" -tendermint-proto = { version = "0.18.0", path = "../proto" } +tendermint-proto = { version = "0.18.1", path = "../proto" } thiserror = "1.0" tracing = "0.1" From 9ef5f8dc9973b3262d0eabdbc32319daf6dfc67c Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 17:46:32 -0500 Subject: [PATCH 17/28] Update client usage to show simplified parameters Signed-off-by: Thane Thomson --- tools/kvstore-test/tests/tendermint.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/kvstore-test/tests/tendermint.rs b/tools/kvstore-test/tests/tendermint.rs index 074bb18de..5f2161d05 100644 --- a/tools/kvstore-test/tests/tendermint.rs +++ b/tools/kvstore-test/tests/tendermint.rs @@ -45,12 +45,12 @@ mod rpc { pub fn localhost_http_client() -> HttpClient { init_logging(); - HttpClient::new("tcp://127.0.0.1:26657".parse().unwrap()).unwrap() + HttpClient::new("http://127.0.0.1:26657").unwrap() } pub async fn localhost_websocket_client() -> (WebSocketClient, WebSocketClientDriver) { init_logging(); - WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) + WebSocketClient::new("ws://127.0.0.1:26657/websocket") .await .unwrap() } From e6c9ff0fba0a288c919935932d4c83b26d43a0fb Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 17:47:00 -0500 Subject: [PATCH 18/28] Use tracing instead of log to show comprehensive debug information during testing Signed-off-by: Thane Thomson --- tools/abci-test/Cargo.toml | 10 +++++----- tools/abci-test/src/main.rs | 17 ++++++++--------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/tools/abci-test/Cargo.toml b/tools/abci-test/Cargo.toml index f9e5dca5c..1907ad648 100644 --- a/tools/abci-test/Cargo.toml +++ b/tools/abci-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "abci-test" -version = "0.18.0" +version = "0.18.1" authors = ["Thane Thomson "] edition = "2018" description = """ @@ -12,9 +12,9 @@ description = """ [dependencies] futures = "0.3" -log = "0.4" -simple_logger = "1.11" structopt = "0.3" -tendermint = { version = "0.18.0", path = "../../tendermint" } -tendermint-rpc = { version = "0.18.0", path = "../../rpc", features = [ "websocket-client" ] } +tendermint = { version = "0.18.1", path = "../../tendermint" } +tendermint-rpc = { version = "0.18.1", path = "../../rpc", features = [ "websocket-client" ] } +tracing = "0.1" +tracing-subscriber = "0.2" tokio = { version = "1", features = ["full"] } diff --git a/tools/abci-test/src/main.rs b/tools/abci-test/src/main.rs index 35f225583..6dfd072e1 100644 --- a/tools/abci-test/src/main.rs +++ b/tools/abci-test/src/main.rs @@ -1,8 +1,6 @@ //! ABCI key/value store integration test application. use futures::StreamExt; -use log::{debug, error, info, LevelFilter}; -use simple_logger::SimpleLogger; use structopt::StructOpt; use tendermint::abci::Transaction; use tendermint::net::Address; @@ -10,6 +8,8 @@ use tendermint_rpc::event::EventData; use tendermint_rpc::query::EventType; use tendermint_rpc::{Client, SubscriptionClient, WebSocketClient}; use tokio::time::Duration; +use tracing::level_filters::LevelFilter; +use tracing::{debug, error, info}; #[derive(Debug, StructOpt)] /// A harness for testing tendermint-abci through a full Tendermint node @@ -30,14 +30,13 @@ struct Opt { #[tokio::main] async fn main() -> Result<(), Box> { let opt: Opt = Opt::from_args(); - SimpleLogger::new() - .with_level(if opt.verbose { - LevelFilter::Debug + tracing_subscriber::fmt() + .with_max_level(if opt.verbose { + LevelFilter::DEBUG } else { - LevelFilter::Info + LevelFilter::INFO }) - .init() - .unwrap(); + .init(); info!("Connecting to Tendermint node at {}:{}", opt.host, opt.port); let (mut client, driver) = WebSocketClient::new(Address::Tcp { @@ -78,7 +77,7 @@ async fn run_tests(client: &mut WebSocketClient) -> Result<(), Box Date: Mon, 1 Mar 2021 17:52:32 -0500 Subject: [PATCH 19/28] Update docs Signed-off-by: Thane Thomson --- rpc/README.md | 17 ++++++++--------- rpc/src/client/transport/http.rs | 9 ++++++++- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/rpc/README.md b/rpc/README.md index 273e865fd..c209d5c1c 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -23,15 +23,14 @@ select when using it. Several client-related features are provided at present: -* `http-client` - Provides `HttpClient` and `HttpsClient`, which are - basic RPC clients that interact with remote Tendermint nodes via - **JSON-RPC over HTTP or HTTPS**. This client does not provide - `Event` subscription functionality. See the [Tendermint RPC] for - more details. -* `websocket-client` - Provides `WebSocketClient` and - `SecureWebSocketClient`, which provide full client functionality, - including general RPC functionality as well as `Event` - subscription functionality. +* `http-client` - Provides `HttpClient`, which is a basic RPC client that + interacts with remote Tendermint nodes via **JSON-RPC over HTTP or + HTTPS**. This client does not provide `Event` subscription + functionality. See the [Tendermint RPC] for more details. +* `websocket-client` - Provides `WebSocketClient`, which provides full + client functionality, including general RPC functionality as well as + `Event`] subscription functionality. Can be used over secure + (`wss://`) and unsecure (`ws://`) connections. ### CLI diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index 5eeaf108e..29e4b9604 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -9,7 +9,9 @@ use tendermint::net; /// A JSON-RPC/HTTP Tendermint RPC client (implements [`crate::Client`]). /// -/// Supports both HTTP and HTTPS connections to Tendermint RPC endpoints. +/// Supports both HTTP and HTTPS connections to Tendermint RPC endpoints, and +/// allows for the use of HTTP proxies (see [`HttpClient::new_with_proxy`] for +/// details). /// /// Does not provide [`crate::event::Event`] subscription facilities (see /// [`crate::WebSocketClient`] for a client that does). @@ -55,6 +57,11 @@ impl HttpClient { /// Construct a new Tendermint RPC HTTP/S client connecting to the given /// URL, but via the specified proxy's URL. + /// + /// If the RPC endpoint is secured (HTTPS), the proxy will automatically + /// attempt to connect using the [HTTP CONNECT] method. + /// + /// [HTTP CONNECT]: https://en.wikipedia.org/wiki/HTTP_tunnel pub fn new_with_proxy(url: U, proxy_url: P) -> Result where U: TryInto, From 4e1a0cf2c51aba452d7630aede1f462ca776bdac Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 17:59:51 -0500 Subject: [PATCH 20/28] Add query parsing examples to crate docs Signed-off-by: Thane Thomson --- rpc/src/query.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/rpc/src/query.rs b/rpc/src/query.rs index 10b743693..61d34b0bc 100644 --- a/rpc/src/query.rs +++ b/rpc/src/query.rs @@ -21,6 +21,8 @@ use std::str::FromStr; /// /// ## Examples /// +/// ### Direct construction of queries +/// /// ```rust /// use tendermint_rpc::query::{Query, EventType}; /// @@ -30,10 +32,25 @@ use std::str::FromStr; /// let query = Query::from(EventType::Tx).and_eq("tx.hash", "XYZ"); /// assert_eq!("tm.event = 'Tx' AND tx.hash = 'XYZ'", query.to_string()); /// -/// let query = Query::from(EventType::Tx).and_gte("tx.height", 100_i64); +/// let query = Query::from(EventType::Tx).and_gte("tx.height", 100_u64); /// assert_eq!("tm.event = 'Tx' AND tx.height >= 100", query.to_string()); /// ``` /// +/// ### Query parsing +/// +/// ```rust +/// use tendermint_rpc::query::{Query, EventType}; +/// +/// let query: Query = "tm.event = 'NewBlock'".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::NewBlock)); +/// +/// let query: Query = "tm.event = 'Tx' AND tx.hash = 'XYZ'".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::Tx).and_eq("tx.hash", "XYZ")); +/// +/// let query: Query = "tm.event = 'Tx' AND tx.height >= 100".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::Tx).and_gte("tx.height", 100_u64)); +/// ``` +/// /// [subscribe endpoint documentation]: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe #[derive(Debug, Clone, PartialEq)] pub struct Query { From a585b12ffaf10a79605f7f000eaf5025e1dc29f6 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 18:09:02 -0500 Subject: [PATCH 21/28] Rename ambiguous module url to rpc_url and ensure it is only exported when a client is enabled Signed-off-by: Thane Thomson --- rpc/src/lib.rs | 7 +++++-- rpc/src/{url.rs => rpc_url.rs} | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) rename rpc/src/{url.rs => rpc_url.rs} (98%) diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 24bb06d0b..7ad3c5795 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -30,10 +30,14 @@ #[cfg(any(feature = "http-client", feature = "websocket-client"))] mod client; #[cfg(any(feature = "http-client", feature = "websocket-client"))] +mod rpc_url; +#[cfg(any(feature = "http-client", feature = "websocket-client"))] pub use client::{ Client, MockClient, MockRequestMatcher, MockRequestMethodMatcher, Subscription, SubscriptionClient, }; +#[cfg(any(feature = "http-client", feature = "websocket-client"))] +pub use rpc_url::{Scheme, Url}; #[cfg(feature = "http-client")] pub use client::HttpClient; @@ -50,11 +54,10 @@ pub mod query; pub mod request; pub mod response; mod result; -mod url; mod utils; mod version; pub use self::{ error::Error, id::Id, method::Method, order::Order, request::Request, request::SimpleRequest, - response::Response, result::Result, url::Scheme, url::Url, version::Version, + response::Response, result::Result, version::Version, }; diff --git a/rpc/src/url.rs b/rpc/src/rpc_url.rs similarity index 98% rename from rpc/src/url.rs rename to rpc/src/rpc_url.rs index bee720c7b..f727a5fb6 100644 --- a/rpc/src/url.rs +++ b/rpc/src/rpc_url.rs @@ -1,4 +1,4 @@ -//! URL representation for clients. +//! URL representation for RPC clients. use crate::{Error, Result}; use std::convert::TryFrom; From c4753ffb06301de1e581e833aadd43f2f49a32ef Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 18:28:57 -0500 Subject: [PATCH 22/28] Improve environment variable handling for proxies Signed-off-by: Thane Thomson --- rpc/src/client/bin/main.rs | 39 +++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/rpc/src/client/bin/main.rs b/rpc/src/client/bin/main.rs index 57ddb442f..ed90c5d00 100644 --- a/rpc/src/client/bin/main.rs +++ b/rpc/src/client/bin/main.rs @@ -11,7 +11,7 @@ use tendermint_rpc::{ WebSocketClient, }; use tracing::level_filters::LevelFilter; -use tracing::{error, info}; +use tracing::{error, info, warn}; /// CLI for performing simple interactions against a Tendermint node's RPC. /// @@ -30,7 +30,7 @@ struct Opt { /// An optional HTTP/S proxy through which to submit requests to the /// Tendermint node's RPC endpoint. Only available for HTTP/HTTPS endpoints /// (i.e. WebSocket proxies are not supported). - #[structopt(long, env = "HTTP_PROXY")] + #[structopt(long)] proxy_url: Option, /// Increase output logging verbosity to DEBUG level. @@ -158,8 +158,15 @@ async fn main() { .with_writer(std::io::stderr) .init(); + let proxy_url = match get_http_proxy_url(opt.url.scheme(), opt.proxy_url.clone()) { + Ok(u) => u, + Err(e) => { + error!("Failed to obtain proxy URL: {}", e); + std::process::exit(-1); + } + }; let result = match opt.url.scheme() { - Scheme::Http | Scheme::Https => http_request(opt.url, opt.proxy_url, opt.req).await, + Scheme::Http | Scheme::Https => http_request(opt.url, proxy_url, opt.req).await, Scheme::WebSocket | Scheme::SecureWebSocket => match opt.proxy_url { Some(_) => Err(Error::invalid_params( "proxies are only supported for use with HTTP clients at present", @@ -173,6 +180,32 @@ async fn main() { } } +// Retrieve the proxy URL with precedence: +// 1. If supplied, that's the proxy URL used. +// 2. If not supplied, but environment variable HTTP_PROXY or HTTPS_PROXY are +// supplied, then use the appropriate variable for the URL in question. +fn get_http_proxy_url(url_scheme: Scheme, proxy_url: Option) -> Result> { + match proxy_url { + Some(u) => Ok(Some(u)), + None => match url_scheme { + Scheme::Http => std::env::var("HTTP_PROXY").ok(), + Scheme::Https => std::env::var("HTTPS_PROXY") + .ok() + .or_else(|| std::env::var("HTTP_PROXY").ok()), + _ => { + if std::env::var("HTTP_PROXY").is_ok() || std::env::var("HTTPS_PROXY").is_ok() { + warn!( + "Ignoring HTTP proxy environment variables for non-HTTP client connection" + ); + } + None + } + } + .map(|u| u.parse()) + .transpose(), + } +} + async fn http_request(url: Url, proxy_url: Option, req: Request) -> Result<()> { let client = match proxy_url { Some(proxy_url) => { From a12b2c63e55da03f8441dd12a3e532ec6070848f Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 18:29:21 -0500 Subject: [PATCH 23/28] Add examples for using proxy to CLI usage docs Signed-off-by: Thane Thomson --- rpc/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rpc/README.md b/rpc/README.md index c209d5c1c..ffc9895af 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -68,6 +68,13 @@ tendermint-rpc broadcast-tx-async somekey=somevalue # store ABCI app) tendermint-rpc abci-query somekey +# To use an HTTP/S proxy to access your RPC endpoint +tendermint-rpc --proxy-url http://yourproxy:8080 abci-query somekey + +# To set your HTTP/S proxy for multiple subsequent queries +export HTTP_PROXY=http://yourproxy:8080 +tendermint-rpc abci-query somekey + # Subscribe to receive new blocks (must use the WebSocket endpoint) # Prints out all incoming events tendermint-rpc -u ws://127.0.0.1:26657 subscribe "tm.event='NewBlock'" From 06d7a6505f33bf19f2fcb524e044a85c349197fa Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Mon, 1 Mar 2021 18:35:14 -0500 Subject: [PATCH 24/28] Fix some README typos Signed-off-by: Thane Thomson --- rpc/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rpc/README.md b/rpc/README.md index ffc9895af..7351a6a2b 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -35,7 +35,7 @@ Several client-related features are provided at present: ### CLI A `tendermint-rpc` console application is provided for testing/experimentation -purposes. To build this application, from the `rpc` crate's directory: +purposes. To build this application, from the `tendermint-rpc` crate's directory: ```bash cargo build --bin tendermint-rpc --features cli @@ -77,12 +77,12 @@ tendermint-rpc abci-query somekey # Subscribe to receive new blocks (must use the WebSocket endpoint) # Prints out all incoming events -tendermint-rpc -u ws://127.0.0.1:26657 subscribe "tm.event='NewBlock'" +tendermint-rpc -u ws://127.0.0.1:26657/websocket subscribe "tm.event='NewBlock'" # If you want to execute a number of queries against a specific endpoint and # don't feel like re-typing the URL over and over again, just set the # TENDERMINT_RPC_URL environment variable -export TENDERMINT_RPC_URL=ws://127.0.0.1:26657 +export TENDERMINT_RPC_URL=ws://127.0.0.1:26657/websocket tendermint-rpc subscribe "tm.event='Tx'" ``` From a533de46af2b4dbaa60902e6dc53a50d4795b5b6 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 2 Mar 2021 21:36:18 -0500 Subject: [PATCH 25/28] Export HTTP and WebSocket URL types to surface documentation Signed-off-by: Thane Thomson --- rpc/src/client.rs | 4 ++-- rpc/src/lib.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rpc/src/client.rs b/rpc/src/client.rs index b6708575d..39dab71b5 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -8,9 +8,9 @@ mod transport; pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher}; #[cfg(feature = "http-client")] -pub use transport::http::HttpClient; +pub use transport::http::{HttpClient, HttpClientUrl}; #[cfg(feature = "websocket-client")] -pub use transport::websocket::{WebSocketClient, WebSocketClientDriver}; +pub use transport::websocket::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl}; use crate::endpoint::*; use crate::query::Query; diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 7ad3c5795..c53ae8df6 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -40,9 +40,9 @@ pub use client::{ pub use rpc_url::{Scheme, Url}; #[cfg(feature = "http-client")] -pub use client::HttpClient; +pub use client::{HttpClient, HttpClientUrl}; #[cfg(feature = "websocket-client")] -pub use client::{WebSocketClient, WebSocketClientDriver}; +pub use client::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl}; pub mod endpoint; pub mod error; From ec58b1ef577b330452e84b3de6f0b165c8235c26 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 2 Mar 2021 21:36:30 -0500 Subject: [PATCH 26/28] Add CHANGELOG entries Signed-off-by: Thane Thomson --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a620ec1bc..e3d58c370 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,29 @@ ## Unreleased +### BREAKING CHANGES + +* `[tendermint-rpc]` The `SubscriptionClient` trait now requires a `close` + method, since it assumes that subscription clients will, in general, use + long-running connections. This should not, however, break any downstream + usage of the clients ([#820]) +* `[tendermint-rpc]` The `HttpClient` and `WebSocketClient` constructors now + take any input that can be converted to a `tendermint_rpc::Url`. This should + hopefully have minimal impact on projects using the code, but it might + require some minor code changes in some cases - see the crate docs for more + details ([#820]) + ### FEATURES * `[tendermint-abci]` Release minimal framework for building ABCI applications in Rust ([#794]) +* `[tendermint-rpc]` Support for secure connections (`https://` and `wss://`) + has been added to the Tendermint RPC clients, as well as support for HTTP + proxies for HTTP clients ([#820]) +* `[tendermint-rpc]` A `tendermint-rpc` CLI has been added to simplify + interaction with RPC endpoints from the command line ([#820]) [#794]: https://github.com/informalsystems/tendermint-rs/pull/794 +[#820]: https://github.com/informalsystems/tendermint-rs/pull/820 ## v0.18.1 From 4155597f32ceadab747bd2e5298732b23f239629 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Wed, 3 Mar 2021 09:26:58 -0500 Subject: [PATCH 27/28] Clarify directory for tendermint-rpc CLI Signed-off-by: Thane Thomson --- rpc/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rpc/README.md b/rpc/README.md index 7351a6a2b..50ba0584b 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -35,9 +35,11 @@ Several client-related features are provided at present: ### CLI A `tendermint-rpc` console application is provided for testing/experimentation -purposes. To build this application, from the `tendermint-rpc` crate's directory: +purposes. To build this application: ```bash +# From the tendermint-rpc crate's directory +cd rpc cargo build --bin tendermint-rpc --features cli # To run directly and show usage information From 69816a676aa1ba7c280e3bd272d2508231530ac0 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Wed, 3 Mar 2021 10:43:04 -0500 Subject: [PATCH 28/28] Add HTTP/2 support for HTTP client Signed-off-by: Thane Thomson --- rpc/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 6228b5984..2320291e4 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -84,7 +84,7 @@ async-trait = { version = "0.1", optional = true } async-tungstenite = { version = "0.12", features = ["tokio-runtime", "tokio-rustls"], optional = true } futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } -hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } +hyper = { version = "0.14", optional = true, features = ["client", "http1", "http2", "tcp"] } hyper-proxy = { version = "0.9", optional = true } hyper-rustls = { version = "0.22.1", optional = true } structopt = { version = "0.3", optional = true }