Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-151 and 149: Support Subxt wss connections #153

Merged
merged 11 commits into from
Aug 7, 2023
321 changes: 304 additions & 17 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion chains/astar/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ default = ["astar-metadata"]
astar-metadata = ["subxt"]

[dependencies]
anyhow = "1.0.69"
anyhow = "1.0"
rosetta-core = { version = "0.4.0", path = "../../../rosetta-core" }
subxt = { version = "0.30", default-features = false, features = ["substrate-compat", "native"], optional = true }
6 changes: 4 additions & 2 deletions chains/astar/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ anyhow = "1.0.69"
async-std = { version = "1.12.0", features = ["tokio1"] }
async-trait = "0.1.66"
ethers = "2.0.7"
futures = { version = "0.3", default-features = false, features = ["std"] }
hex = "0.4.3"
log = "0.4.17"
parity-scale-codec = "3.4.0"
rosetta-config-astar = { version = "0.4.0", path = "../config" }
rosetta-config-ethereum = { version = "0.4.0", path = "../../ethereum/config" }
rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" }
rosetta-server-ethereum = { version = "0.4.0", path = "../../ethereum/server" }
serde_json = "1.0.94"
serde_json = "1.0"
Copy link
Collaborator Author

@Lohann Lohann Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the minor tag from a few crates because it allows cargo to solve and get the latest minor version, so we don't have to update it manually every time.

sp-core = { version = "21.0.0", default-features = false, features = ["blake2", "std"] }
sp-keyring = "24.0.0"
subxt = { version = "0.30", features = ["substrate-compat"] }
subxt = { version = "0.30", default-features = false, features = ["substrate-compat"] }
tokio = { version = "1.26.0", features = ["rt-multi-thread", "macros"] }

[dev-dependencies]
Expand Down
9 changes: 7 additions & 2 deletions chains/astar/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use rosetta_server::types::{
Block, BlockIdentifier, CallRequest, Coin, PartialBlockIdentifier, Transaction,
TransactionIdentifier,
};
use rosetta_server::{BlockchainClient, BlockchainConfig};
use rosetta_server::{ws::default_client, BlockchainClient, BlockchainConfig};
use rosetta_server_ethereum::EthereumClient;
use serde_json::Value;
use sp_core::crypto::Ss58AddressFormat;
use std::sync::Arc;
use subxt::{
dynamic::Value as SubtxValue, rpc::types::BlockNumber, tx::PairSigner, utils::AccountId32,
OnlineClient, PolkadotConfig,
Expand Down Expand Up @@ -86,8 +87,12 @@ impl BlockchainClient for AstarClient {
} else {
(format!("http://{addr}"), format!("ws://{addr}"))
};
let substrate_client = {
let client = default_client(ws_uri.as_str(), None).await?;
log::info!("Connected to {}", ws_uri.as_str());
OnlineClient::<PolkadotConfig>::from_rpc_client(Arc::new(client)).await?
};
let ethereum_client = EthereumClient::new(config, http_uri.as_str()).await?;
let substrate_client = OnlineClient::<PolkadotConfig>::from_url(ws_uri.as_str()).await?;
Ok(Self {
client: ethereum_client,
ws_client: substrate_client,
Expand Down
2 changes: 1 addition & 1 deletion chains/bitcoin/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ repository = "https://github.com/analog-labs/chain-connectors"
description = "Bitcoin configuration."

[dependencies]
anyhow = "1.0.69"
anyhow = "1.0"
rosetta-core = { version = "0.4.0", path = "../../../rosetta-core" }
4 changes: 2 additions & 2 deletions chains/bitcoin/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ repository = "https://github.com/analog-labs/chain-connectors"
description = "Bitcoin rosetta server."

[dependencies]
anyhow = "1.0.69"
anyhow = "1.0"
async-std = { version = "1.12.0", features = ["tokio1"] }
async-trait = "0.1.66"
bitcoincore-rpc-async = "3.0.1"
hex = "0.4.3"
rosetta-config-bitcoin = { version = "0.4.0", path = "../config" }
rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" }
serde_json = "1.0.94"
serde_json = "1.0"
tokio = { version = "1.26.0", features = ["rt-multi-thread", "macros"] }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion chains/ethereum/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ hex = "0.4.3"
rosetta-config-ethereum = { version = "0.4.0", path = "../config" }
rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" }
serde = "1.0.153"
serde_json = "1.0.94"
serde_json = "1.0"
tokio = { version = "1.26.0", features = ["rt-multi-thread", "macros"] }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion chains/ethereum/tx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ ethabi = "18.0.0"
ethers-core = "2.0.0"
rosetta-config-ethereum = { version = "0.4.0", path = "../config" }
rosetta-core = { version = "0.4.0", path = "../../../rosetta-core" }
serde_json = "1.0.94"
serde_json = "1.0"
sha3 = "0.10.6"
6 changes: 3 additions & 3 deletions chains/polkadot/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repository = "https://github.com/analog-labs/chain-connectors"
description = "Polkadot configuration."

[dependencies]
anyhow = "1.0.69"
anyhow = "1.0"
rosetta-core = { version = "0.4.0", path = "../../../rosetta-core" }
serde = { version = "1.0.153", features = ["derive"] }
subxt = "0.30"
serde = { version = "1.0", features = ["derive"] }
subxt = { version = "0.30", default-features = false, features = ["substrate-compat", "native"] }
6 changes: 3 additions & 3 deletions chains/polkadot/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ async-trait = "0.1.66"
hex = "0.4.3"
parity-scale-codec = "3.4.0"
rosetta-config-polkadot = { version = "0.4.0", path = "../config" }
rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" }
rosetta-server = { version = "0.4.0", path = "../../../rosetta-server", features = ["ws"] }
scale-info = "2.3.1"
serde = { version = "1.0.153", features = ["derive"] }
serde_json = "1.0.94"
serde_json = "1.0"
sp-keyring = "24.0"
subxt = { version = "0.30", features = ["substrate-compat"] }
subxt = { version = "0.30", default-features = false, features = ["substrate-compat", "native"] }
tokio = { version = "1.26.0", features = ["rt-multi-thread", "macros"] }

[dev-dependencies]
Expand Down
7 changes: 5 additions & 2 deletions chains/polkadot/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rosetta_server::types::{
Block, BlockIdentifier, CallRequest, Coin, PartialBlockIdentifier, Transaction,
TransactionIdentifier,
};
use rosetta_server::{BlockchainClient, BlockchainConfig};
use rosetta_server::{ws::default_client, BlockchainClient, BlockchainConfig};
use serde_json::Value;
use sp_keyring::AccountKeyring;
use std::time::Duration;
Expand Down Expand Up @@ -79,7 +79,10 @@ impl BlockchainClient for PolkadotClient {
}

async fn new(config: BlockchainConfig, addr: &str) -> Result<Self> {
let client = OnlineClient::<PolkadotConfig>::from_url(addr).await?;
let client = {
let ws_client = default_client(addr, None).await?;
OnlineClient::<PolkadotConfig>::from_rpc_client(std::sync::Arc::new(ws_client)).await?
};
let genesis = client.genesis_hash();
let genesis_block = BlockIdentifier {
index: 0,
Expand Down
4 changes: 2 additions & 2 deletions chains/polkadot/tx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ repository = "https://github.com/analog-labs/chain-connectors"
description = "Polkadot transaction builder."

[dependencies]
anyhow = "1.0.69"
anyhow = "1.0"
blake2-rfc = "0.2.18"
bs58 = "0.4.0"
parity-scale-codec = { version = "3.4.0", features = ["derive"] }
rosetta-config-polkadot = { version = "0.4.0", path = "../config" }
rosetta-core = { version = "0.4.0", path = "../../../rosetta-core" }
serde_json = "1.0.94"
serde_json = "1.0"
4 changes: 2 additions & 2 deletions rosetta-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ rosetta-config-polkadot = { version = "0.4.0", path = "../chains/polkadot/config
rosetta-core = { version = "0.4.0", path = "../rosetta-core" }
rosetta-tx-ethereum = { version = "0.4.0", path = "../chains/ethereum/tx" }
rosetta-tx-polkadot = { version = "0.4.0", path = "../chains/polkadot/tx" }
serde = "1.0.153"
serde_json = "1.0.94"
serde = "1.0"
serde_json = "1.0"
surf = { version = "2.3.2", default-features = false }

[target.'cfg(target_family = "wasm")'.dependencies]
Expand Down
8 changes: 4 additions & 4 deletions rosetta-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ repository = "https://github.com/analog-labs/chain-connectors"
description = "Provides traits and definitions shared by the server and client crates."

[dependencies]
anyhow = "1.0.69"
async-trait = "0.1.66"
anyhow = "1.0"
async-trait = "0.1"
fluent-uri = "0.1.4"
rosetta-crypto = { version = "0.4.0", path = "../rosetta-crypto" }
rosetta-types = { version = "0.4.0", path = "../rosetta-types" }
serde = "1.0.153"
serde_json = "1.0.94"
thiserror = "1.0.30"
serde_json = "1.0"
thiserror = "1.0"
9 changes: 7 additions & 2 deletions rosetta-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repository = "https://github.com/analog-labs/chain-connectors"
description = "Generic rosetta server implementation."

[features]
ws = ["jsonrpsee", "subxt", "futures", "futures/std"]
tests = ["futures", "rosetta-docker", "nanoid"]

[dependencies]
Expand All @@ -16,15 +17,19 @@ clap = { version = "4.1.8", features = ["derive"] }
femme = "2.2.1"
futures = { version = "0.3.26", optional = true }
hex = "0.4.3"
# jsonrpsee = { version = "0.19.0", default-features = false, features = ["ws-client", "client-ws-transport-webpki-tls"], optional = true }
Lohann marked this conversation as resolved.
Show resolved Hide resolved
jsonrpsee = { version = "0.19.0", default-features = false, features = ["full"], optional = true }
log = "0.4.17"
nanoid = { version = "0.4.0", optional = true }
rosetta-core = { version = "0.4.0", path = "../rosetta-core" }
rosetta-docker = { version = "0.4.0", path = "../rosetta-docker", optional = true }
serde = "1.0.153"
serde_json = "1.0.94"
serde = "1.0"
serde_json = "1.0"
subxt = { version = "0.30", default-features = false, features = ["substrate-compat"], optional = true }
tide = { version = "0.16.0", default-features = false, features = ["h1-server", "logger"] }
tokio = { version = "1.26.0", features = ["full"] }
tokio-retry = "0.3"
tokio-tungstenite = { version = "0.20.0", features = ["rustls-tls-webpki-roots"] }

[build-dependencies]
anyhow = "1.0.69"
Expand Down
3 changes: 3 additions & 0 deletions rosetta-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use tokio_retry::{

pub use rosetta_core::*;

#[cfg(feature = "ws")]
pub mod ws;

#[derive(Parser)]
struct Opts {
#[clap(long)]
Expand Down
101 changes: 101 additions & 0 deletions rosetta-server/src/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
mod config;
mod jsonrpsee_client;
mod tungstenite_jsonrpsee;

pub use config::{RpcClientConfig, WsTransportClient};
pub use jsonrpsee::client_transport::ws::WsHandshakeError;
use jsonrpsee::core::client::{Client, ClientBuilder};
pub use jsonrpsee_client::RpcClient;
pub use tungstenite_jsonrpsee::{TungsteniteClient, WsError};

pub async fn default_client(
url: &str,
config: Option<RpcClientConfig>,
) -> Result<RpcClient, WsHandshakeError> {
let config = config.unwrap_or_default();

let client = match config.client {
WsTransportClient::Auto => {
log::info!("Connecting using Socketto...");
match build_socketto_client(url).await {
Ok(client) => client,
Err(error) => {
log::warn!("Socketto failed: {}", error);
log::info!("Connecting using Tungstenite...");
build_tungstenite_client(url, RpcClientConfig::default()).await?
}
}
}
WsTransportClient::Socketto => {
let client = build_socketto_client(url).await?;
log::info!("Connected to {} using Socketto", url);
client
}
Lohann marked this conversation as resolved.
Show resolved Hide resolved
WsTransportClient::Tungstenite => {
let client = build_tungstenite_client(url, RpcClientConfig::default()).await?;
log::info!("Connected to {} using Tungstenite", url);
client
}
};
Ok(RpcClient(client))
}

async fn build_socketto_client(uri: &str) -> Result<Client, WsHandshakeError> {
use jsonrpsee::client_transport::ws::WsTransportClientBuilder;
use tokio_tungstenite::tungstenite::http::uri::{Authority, Uri};

let mut uri = uri
.parse::<Uri>()
.map_err(|e| WsHandshakeError::Url(e.to_string().into()))?;
let default_port = match uri.scheme().map(|s| s.as_str()) {
Some("ws" | "http") => Some(80),
Some("wss" | "https") => Some(443),
_ => None,
};

// Set default port, workaround because jsonrpsee doesn't infer the url port:
// https://github.com/paritytech/jsonrpsee/blob/v0.19.0/client/transport/src/ws/mod.rs#L509-L511
if let (None, Some(port), Some(authority)) =
(uri.port(), default_port, uri.authority().cloned())
{
let new_authority =
Authority::try_from(format!("{}:{}", authority, port)).unwrap_or(authority);
uri = {
let mut parts = uri.clone().into_parts();
parts.authority = Some(new_authority);
Uri::from_parts(parts).unwrap_or(uri)
};
}

let (sender, receiver) = WsTransportClientBuilder::default()
.use_webpki_rustls()
.build(uri)
.await?;
let client = ClientBuilder::default()
.max_buffer_capacity_per_subscription(4096)
.build_with_tokio(sender, receiver);
Ok(client)
}

async fn build_tungstenite_client(
url: &str,
config: RpcClientConfig,
) -> Result<Client, WsHandshakeError> {
use tide::http::url::Url;

let url = url
.parse::<Url>()
.map_err(|e| WsHandshakeError::Url(e.to_string().into()))?;
let client = TungsteniteClient::new(url, config)
.await
.map_err(|e| match e {
WsError::Url(error) => WsHandshakeError::Url(error.to_string().into()),
WsError::Io(error) => WsHandshakeError::Io(error),
_ => WsHandshakeError::Url(e.to_string().into()),
})?;
let (sender, receiver) = client.split();
let client = ClientBuilder::default()
.max_buffer_capacity_per_subscription(4096)
.build_with_tokio(sender, receiver);
Ok(client)
}
72 changes: 72 additions & 0 deletions rosetta-server/src/ws/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/// Ten megabytes.
pub const TEN_MB_SIZE_BYTES: usize = 10 * 1024 * 1024;

/// Supported WebSocket transport clients.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WsTransportClient {
/// Auto will try to use Socketto first, if it fails, it will fallback to Tungstenite.
#[default]
Auto,

/// Socketto is the default WebSocket client for Substrate and Subxt.
/// Whoever have an issue when connecting to some RPC nodes using TLS.
/// https://github.com/paritytech/jsonrpsee/issues/1142
Socketto,

/// Tungstenite is the most used stream-based WebSocket Client
/// Use this if you have issues with Socketto.
Tungstenite,
}

/// Common configuration for Socketto and Tungstenite clients.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RpcClientConfig {
/// Supported WebSocket transport clients.
pub client: WsTransportClient,

/// The target minimum size of the write buffer to reach before writing the data
/// to the underlying stream.
/// The default value is 128 KiB.
pub write_buffer_size: usize,

/// The max size of the write buffer in bytes. Setting this can provide backpressure
/// in the case the write buffer is filling up due to write errors.
/// The default value is unlimited.
///
/// Note: Should always be at least [`write_buffer_size + 1 message`](Self::write_buffer_size)
/// and probably a little more depending on error handling strategy.
pub max_write_buffer_size: usize,

/// The maximum size of a message. `None` means no size limit. The default value is 10 MiB
/// which should be reasonably big for all normal use-cases but small enough to prevent
/// memory eating by a malicious user.
pub max_message_size: Option<usize>,

/// The maximum size of a single message frame. `None` means no size limit. The limit is for
/// frame payload NOT including the frame header. The default value is 16 MiB which should
/// be reasonably big for all normal use-cases but small enough to prevent memory eating
/// by a malicious user.
pub max_frame_size: Option<usize>,

/// Whether to accept unmasked frames from the peer. The default value is `false`.
/// from the client. According to the RFC 6455, the server must close the
/// connection to the client in such cases, however it seems like there are
/// some popular libraries that are sending unmasked frames, ignoring the RFC.
/// By default this option is set to `false`, i.e. according to RFC 6455.
///
/// OBS: not supported for Socketto client.
pub accept_unmasked_frames: bool,
}

impl Default for RpcClientConfig {
fn default() -> Self {
Self {
client: WsTransportClient::Auto,
write_buffer_size: 128 * 1024,
max_write_buffer_size: usize::MAX,
max_message_size: Some(TEN_MB_SIZE_BYTES),
max_frame_size: Some(16 << 20),
accept_unmasked_frames: false,
}
}
}
Loading