Skip to content

Commit

Permalink
Implement WS reconnect (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lohann authored Aug 28, 2023
1 parent 4acb0bb commit d3a5d0d
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions chains/ethereum/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ async-std = { version = "1.12", features = ["tokio1"] }
async-trait = "0.1"
ethabi = "18.0.0"
ethers = { version = "2.0", default-features = true, features = ["ws", "abigen", "rustls"] }
futures-timer = "3.0"
hex = "0.4.3"
rosetta-config-ethereum = { version = "0.4.0", path = "../config" }
rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" }
log = "0.4"
pin-project = "1.1"
rosetta-config-ethereum = { version = "0.4.0", path = "../config" }
rosetta-server = { version = "0.4.0", path = "../../../rosetta-server" }
serde = "1.0"
serde_json = "1.0"
tokio = { version = "1.32", features = ["rt-multi-thread", "macros"] }
Expand Down
8 changes: 7 additions & 1 deletion chains/ethereum/server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,13 @@ where
let block = self
.client
.get_block_with_txs(block_id)
.await?
.await
.map_err(|error| {
anyhow::anyhow!(
"Failed to get block with transactions: {}",
error.to_string()
)
})?
.context("block not found")?;
let block_number = block.number.context("Unable to fetch block number")?;
let block_hash = block.hash.context("Unable to fetch block hash")?;
Expand Down
4 changes: 2 additions & 2 deletions chains/ethereum/server/src/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ where
let Some(number) = block.number else {
log::error!("block number is missing");
self.failures += 1;
continue
continue;
};

let Some(hash) = block.hash else {
log::error!("block hash is missing");
self.failures += 1;
continue
continue;
};

self.failures = 0;
Expand Down
11 changes: 7 additions & 4 deletions chains/ethereum/server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use client::EthereumClient;
use ethers::providers::{Http, Ws};
use ethers::providers::Http;
use rosetta_config_ethereum::{EthereumMetadata, EthereumMetadataParams};
use rosetta_server::crypto::address::Address;
use rosetta_server::crypto::PublicKey;
Expand All @@ -17,19 +17,22 @@ mod eth_types;
mod event_stream;
mod proof;
mod utils;
mod ws_provider;

use ws_provider::ExtendedWs;

pub use event_stream::EthereumEventStream;

pub enum MaybeWsEthereumClient {
Http(EthereumClient<Http>),
Ws(EthereumClient<Ws>),
Ws(EthereumClient<ExtendedWs>),
}

impl MaybeWsEthereumClient {
pub async fn new<S: AsRef<str>>(config: BlockchainConfig, addr: S) -> Result<Self> {
let addr = addr.as_ref();
if addr.starts_with("ws://") || addr.starts_with("wss://") {
let ws_connection = Ws::connect(addr).await?;
let ws_connection = ExtendedWs::connect(addr).await?;
let client = EthereumClient::new(config, ws_connection).await?;
Ok(Self::Ws(client))
} else {
Expand All @@ -44,7 +47,7 @@ impl MaybeWsEthereumClient {
impl BlockchainClient for MaybeWsEthereumClient {
type MetadataParams = EthereumMetadataParams;
type Metadata = EthereumMetadata;
type EventStream<'a> = EthereumEventStream<'a, Ws>;
type EventStream<'a> = EthereumEventStream<'a, ExtendedWs>;

fn create_config(network: &str) -> Result<BlockchainConfig> {
rosetta_config_ethereum::config(network)
Expand Down
140 changes: 140 additions & 0 deletions chains/ethereum/server/src/ws_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use arc_swap::ArcSwap;
use async_trait::async_trait;
use ethers::prelude::*;
use ethers::providers::{ConnectionDetails, JsonRpcClient, PubsubClient, WsClientError};
use futures_timer::Delay;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Websocket Provider that supports reconnecting
#[derive(Debug)]
pub struct ExtendedWs {
conn_details: ConnectionDetails,
client: ArcSwap<Ws>,
is_reconnecting: Arc<AtomicBool>,
}

impl ExtendedWs {
pub async fn connect(conn: impl Into<ConnectionDetails>) -> Result<Self, WsClientError> {
let conn_details = conn.into();
let client = Ws::connect(conn_details.clone()).await?;
Ok(Self {
conn_details,
client: ArcSwap::new(Arc::from(client)),
is_reconnecting: Arc::new(AtomicBool::new(false)),
})
}

// TODO: reconnect in a different thread, otherwise the request will block
pub async fn reconnect(&self) -> Result<(), WsClientError> {
// Guarantee that only one thread is attempting to reconnect
if self.is_reconnecting.swap(true, Ordering::SeqCst) {
return Err(WsClientError::UnexpectedClose);
}

// Get the current client
let client = self.client.load();

// Check if is already connected
if is_connected(&client).await {
return Ok(());
}

// TODO: close the client after a given number of attempts
let mut attempts = 0;
loop {
attempts += 1;

log::info!("Retrying to connect... attempt {attempts}");
let client = Ws::connect(self.conn_details.clone()).await;

match client {
Ok(client) => {
log::info!("Client reconnected successfully: {}", self.conn_details.url);
self.client.store(Arc::from(client));
break;
}
Err(e) => {
log::error!("Failed to reconnect: {:?}", e);
}
};

Delay::new(Duration::from_secs(5)).await;
}
self.is_reconnecting.store(false, Ordering::SeqCst);
Ok(())
}
}

#[async_trait]
impl JsonRpcClient for ExtendedWs {
type Error = WsClientError;

async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send,
{
if self.is_reconnecting.load(Ordering::Relaxed) {
log::error!("Cannot process request, client is reconnecting: {method}");
return Err(WsClientError::UnexpectedClose);
}

let provider = self.client.load().clone();
let result = JsonRpcClient::request(&provider, method, params).await;

// Attempt to reconnect Connection unexpectedly closed
// TODO: execute this in a different thread/task, this will block the request
match result {
Err(WsClientError::UnexpectedClose) => {
log::error!("Websocket closed unexpectedly, reconnecting...");
self.reconnect().await?;
Err(WsClientError::UnexpectedClose)
}
Err(err) => {
// Log the error
log::error!("{err:?}");
Err(err)
}
_ => result,
}
}
}

impl PubsubClient for ExtendedWs {
type NotificationStream = <Ws as PubsubClient>::NotificationStream;

/// Add a subscription to this transport
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, WsClientError> {
if self.is_reconnecting.load(Ordering::Relaxed) {
log::error!("subscription {} failed, client is reconnecting", id.into());
return Err(WsClientError::UnexpectedClose);
}

let client = self.client.load().clone();
PubsubClient::subscribe(client.as_ref(), id)
}

/// Remove a subscription from this transport
fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), Self::Error> {
if self.is_reconnecting.load(Ordering::Relaxed) {
log::error!("unsubscribe {} failed, client is reconnecting", id.into());
return Err(WsClientError::UnexpectedClose);
}

let client = self.client.load().clone();
PubsubClient::unsubscribe(client.as_ref(), id)
}
}

pub async fn is_connected(provider: &Ws) -> bool {
let result = JsonRpcClient::request::<_, U64>(provider, "eth_blockNumber", ()).await;
!matches!(
result,
Err(WsClientError::UnexpectedClose) | Err(WsClientError::InternalError(_))
)
}
2 changes: 1 addition & 1 deletion rosetta-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ description = "Provides traits and definitions shared by the server and client c
anyhow = "1.0"
async-trait = "0.1"
fluent-uri = "0.1.4"
futures-util = "0.3"
rosetta-crypto = { version = "0.4.0", path = "../rosetta-crypto" }
rosetta-types = { version = "0.4.0", path = "../rosetta-types" }
serde = "1.0"
serde_json = "1.0"
thiserror = "1.0"
futures-util = "0.3"

0 comments on commit d3a5d0d

Please sign in to comment.