From d3a5d0d92af37e7f2d13a8bced79aabcc153edfd Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Mon, 28 Aug 2023 15:51:26 +0100 Subject: [PATCH] Implement WS reconnect (#169) --- Cargo.lock | 1 + chains/ethereum/server/Cargo.toml | 5 +- chains/ethereum/server/src/client.rs | 8 +- chains/ethereum/server/src/event_stream.rs | 4 +- chains/ethereum/server/src/lib.rs | 11 +- chains/ethereum/server/src/ws_provider.rs | 140 +++++++++++++++++++++ rosetta-core/Cargo.toml | 2 +- 7 files changed, 161 insertions(+), 10 deletions(-) create mode 100644 chains/ethereum/server/src/ws_provider.rs diff --git a/Cargo.lock b/Cargo.lock index 3db02586..27ca04a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5053,6 +5053,7 @@ dependencies = [ "ethabi", "ethers", "ethers-solc", + "futures-timer", "hex 0.4.3", "log", "pin-project", diff --git a/chains/ethereum/server/Cargo.toml b/chains/ethereum/server/Cargo.toml index 438335e7..a4992dcd 100644 --- a/chains/ethereum/server/Cargo.toml +++ b/chains/ethereum/server/Cargo.toml @@ -13,11 +13,12 @@ async-std = { version = "1.12", features = ["tokio1"] } async-trait = "0.1" ethabi = "18.0.0" ethers = { version = "2.0", default-features = true, features = ["ws", "abigen", "rustls"] } +futures-timer = "3.0" hex = "0.4.3" -rosetta-config-ethereum = { version = "0.4.0", path = "../config" } -rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" } log = "0.4" pin-project = "1.1" +rosetta-config-ethereum = { version = "0.4.0", path = "../config" } +rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" } serde = "1.0" serde_json = "1.0" tokio = { version = "1.32", features = ["rt-multi-thread", "macros"] } diff --git a/chains/ethereum/server/src/client.rs b/chains/ethereum/server/src/client.rs index 91af206f..cbc4614e 100644 --- a/chains/ethereum/server/src/client.rs +++ b/chains/ethereum/server/src/client.rs @@ -184,7 +184,13 @@ where let block = self .client .get_block_with_txs(block_id) - .await? + .await + .map_err(|error| { + anyhow::anyhow!( + "Failed to get block with transactions: {}", + error.to_string() + ) + })? .context("block not found")?; let block_number = block.number.context("Unable to fetch block number")?; let block_hash = block.hash.context("Unable to fetch block hash")?; diff --git a/chains/ethereum/server/src/event_stream.rs b/chains/ethereum/server/src/event_stream.rs index adf349aa..bfe44b7e 100644 --- a/chains/ethereum/server/src/event_stream.rs +++ b/chains/ethereum/server/src/event_stream.rs @@ -48,13 +48,13 @@ where let Some(number) = block.number else { log::error!("block number is missing"); self.failures += 1; - continue + continue; }; let Some(hash) = block.hash else { log::error!("block hash is missing"); self.failures += 1; - continue + continue; }; self.failures = 0; diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index 28c01e0b..b2d348e0 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -1,6 +1,6 @@ use anyhow::Result; use client::EthereumClient; -use ethers::providers::{Http, Ws}; +use ethers::providers::Http; use rosetta_config_ethereum::{EthereumMetadata, EthereumMetadataParams}; use rosetta_server::crypto::address::Address; use rosetta_server::crypto::PublicKey; @@ -17,19 +17,22 @@ mod eth_types; mod event_stream; mod proof; mod utils; +mod ws_provider; + +use ws_provider::ExtendedWs; pub use event_stream::EthereumEventStream; pub enum MaybeWsEthereumClient { Http(EthereumClient), - Ws(EthereumClient), + Ws(EthereumClient), } impl MaybeWsEthereumClient { pub async fn new>(config: BlockchainConfig, addr: S) -> Result { let addr = addr.as_ref(); if addr.starts_with("ws://") || addr.starts_with("wss://") { - let ws_connection = Ws::connect(addr).await?; + let ws_connection = ExtendedWs::connect(addr).await?; let client = EthereumClient::new(config, ws_connection).await?; Ok(Self::Ws(client)) } else { @@ -44,7 +47,7 @@ impl MaybeWsEthereumClient { impl BlockchainClient for MaybeWsEthereumClient { type MetadataParams = EthereumMetadataParams; type Metadata = EthereumMetadata; - type EventStream<'a> = EthereumEventStream<'a, Ws>; + type EventStream<'a> = EthereumEventStream<'a, ExtendedWs>; fn create_config(network: &str) -> Result { rosetta_config_ethereum::config(network) diff --git a/chains/ethereum/server/src/ws_provider.rs b/chains/ethereum/server/src/ws_provider.rs new file mode 100644 index 00000000..7a9c297a --- /dev/null +++ b/chains/ethereum/server/src/ws_provider.rs @@ -0,0 +1,140 @@ +use arc_swap::ArcSwap; +use async_trait::async_trait; +use ethers::prelude::*; +use ethers::providers::{ConnectionDetails, JsonRpcClient, PubsubClient, WsClientError}; +use futures_timer::Delay; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +/// Websocket Provider that supports reconnecting +#[derive(Debug)] +pub struct ExtendedWs { + conn_details: ConnectionDetails, + client: ArcSwap, + is_reconnecting: Arc, +} + +impl ExtendedWs { + pub async fn connect(conn: impl Into) -> Result { + let conn_details = conn.into(); + let client = Ws::connect(conn_details.clone()).await?; + Ok(Self { + conn_details, + client: ArcSwap::new(Arc::from(client)), + is_reconnecting: Arc::new(AtomicBool::new(false)), + }) + } + + // TODO: reconnect in a different thread, otherwise the request will block + pub async fn reconnect(&self) -> Result<(), WsClientError> { + // Guarantee that only one thread is attempting to reconnect + if self.is_reconnecting.swap(true, Ordering::SeqCst) { + return Err(WsClientError::UnexpectedClose); + } + + // Get the current client + let client = self.client.load(); + + // Check if is already connected + if is_connected(&client).await { + return Ok(()); + } + + // TODO: close the client after a given number of attempts + let mut attempts = 0; + loop { + attempts += 1; + + log::info!("Retrying to connect... attempt {attempts}"); + let client = Ws::connect(self.conn_details.clone()).await; + + match client { + Ok(client) => { + log::info!("Client reconnected successfully: {}", self.conn_details.url); + self.client.store(Arc::from(client)); + break; + } + Err(e) => { + log::error!("Failed to reconnect: {:?}", e); + } + }; + + Delay::new(Duration::from_secs(5)).await; + } + self.is_reconnecting.store(false, Ordering::SeqCst); + Ok(()) + } +} + +#[async_trait] +impl JsonRpcClient for ExtendedWs { + type Error = WsClientError; + + async fn request(&self, method: &str, params: T) -> Result + where + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send, + { + if self.is_reconnecting.load(Ordering::Relaxed) { + log::error!("Cannot process request, client is reconnecting: {method}"); + return Err(WsClientError::UnexpectedClose); + } + + let provider = self.client.load().clone(); + let result = JsonRpcClient::request(&provider, method, params).await; + + // Attempt to reconnect Connection unexpectedly closed + // TODO: execute this in a different thread/task, this will block the request + match result { + Err(WsClientError::UnexpectedClose) => { + log::error!("Websocket closed unexpectedly, reconnecting..."); + self.reconnect().await?; + Err(WsClientError::UnexpectedClose) + } + Err(err) => { + // Log the error + log::error!("{err:?}"); + Err(err) + } + _ => result, + } + } +} + +impl PubsubClient for ExtendedWs { + type NotificationStream = ::NotificationStream; + + /// Add a subscription to this transport + fn subscribe>(&self, id: T) -> Result { + if self.is_reconnecting.load(Ordering::Relaxed) { + log::error!("subscription {} failed, client is reconnecting", id.into()); + return Err(WsClientError::UnexpectedClose); + } + + let client = self.client.load().clone(); + PubsubClient::subscribe(client.as_ref(), id) + } + + /// Remove a subscription from this transport + fn unsubscribe>(&self, id: T) -> Result<(), Self::Error> { + if self.is_reconnecting.load(Ordering::Relaxed) { + log::error!("unsubscribe {} failed, client is reconnecting", id.into()); + return Err(WsClientError::UnexpectedClose); + } + + let client = self.client.load().clone(); + PubsubClient::unsubscribe(client.as_ref(), id) + } +} + +pub async fn is_connected(provider: &Ws) -> bool { + let result = JsonRpcClient::request::<_, U64>(provider, "eth_blockNumber", ()).await; + !matches!( + result, + Err(WsClientError::UnexpectedClose) | Err(WsClientError::InternalError(_)) + ) +} diff --git a/rosetta-core/Cargo.toml b/rosetta-core/Cargo.toml index 1b1e07cb..866a66a5 100644 --- a/rosetta-core/Cargo.toml +++ b/rosetta-core/Cargo.toml @@ -10,9 +10,9 @@ description = "Provides traits and definitions shared by the server and client c anyhow = "1.0" async-trait = "0.1" fluent-uri = "0.1.4" +futures-util = "0.3" rosetta-crypto = { version = "0.4.0", path = "../rosetta-crypto" } rosetta-types = { version = "0.4.0", path = "../rosetta-types" } serde = "1.0" serde_json = "1.0" thiserror = "1.0" -futures-util = "0.3"