Skip to content

Commit

Permalink
Improve websockets reconnects (#170)
Browse files Browse the repository at this point in the history
* Implement Ethereum RPC client for JsonRpsee

* Fix error type

* Support PubSub for Jsonrpsee

* Cleanup code

* Move ethereum jsonrpsee client to another repo

* split client and subscriptions

* Implement PubSub adapter

* Implement ClientT and SubscriptionT traits for EthAdapter

* Rename adapter structs

* update docs

* Remove duplicated code

* Support unsubscribe

* Remove unused crates

* Implement auto-reconnect trait

* Implement ClientT and SubscriptionT extensions

* Implement cloneable version of JsonRpseeError

* Fix compilation errors

* Fix reconnect concurrency

* Implement restart_needed hook

* Implement reconnect for subscriptions

* Rename reconnect client

* Remove unucessary constraits

* Refactor reconnect strategy

* cargo fmt

* Rename files

* improve documentation

* Rename ethereum event stream adapter

* Rename ethereum RPC params

* Remove unused crates

* Refactor rosetta-ethereum-rpc-client

* Impl client and subscription macros

* Remove unused code

* cargo fmt

* Refactor error

* Simplified reconnect logic

* Impl reconnect for all clients

* Fix stackoverflow on ethereum adapter

* Refactor ethers adapter

* Fix reconnects on ethereum connector

* Fix unit tests

* Make the delay between reconnects consistent

* Implement fibonnaci retry strategy

* Update docs
  • Loading branch information
Lohann authored Sep 4, 2023
1 parent d3a5d0d commit 8dc29a5
Show file tree
Hide file tree
Showing 23 changed files with 2,119 additions and 329 deletions.
277 changes: 151 additions & 126 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"chains/bitcoin/config",
"chains/bitcoin/server",
"chains/ethereum/config",
"chains/ethereum/rpc-client",
"chains/ethereum/server",
"chains/ethereum/tx",
"chains/polkadot/config",
Expand Down
19 changes: 19 additions & 0 deletions chains/ethereum/rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "rosetta-ethereum-rpc-client"
version = "0.1.0"
edition = "2021"
license = "MIT"
repository = "https://github.com/analog-labs/chain-connectors"
description = "Adapters for JsonRpsee RPC client and ethers-rs."

[dependencies]
async-trait = "0.1"
dashmap = "5.5"
ethers = { version = "2.0", default-features = true }
futures-util = "0.3"
jsonrpsee = { version = "0.20", default-features = false, features = ["macros"] }
pin-project = "1.1"
serde = "1.0"
serde_json = "1.0"
thiserror = "1.0"
tracing = "0.1"
92 changes: 92 additions & 0 deletions chains/ethereum/rpc-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use crate::{error::EthError, extension::impl_client_trait, params::EthRpcParams};
use async_trait::async_trait;
use ethers::providers::JsonRpcClient;
use jsonrpsee::core::client::ClientT;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};

/// Adapter for [`jsonrpsee::core::client::ClientT`] to [`ethers::providers::JsonRpcClient`].
#[repr(transparent)]
pub struct EthClientAdapter<C> {
pub(crate) client: C,
}

impl<C> EthClientAdapter<C>
where
C: ClientT + Debug + Send + Sync,
{
pub fn new(client: C) -> Self {
Self { client }
}
}

impl<C> AsRef<C> for EthClientAdapter<C> {
fn as_ref(&self) -> &C {
&self.client
}
}

impl_client_trait!(EthClientAdapter<C> where C: ClientT + Debug + Send + Sync);

impl<C> Debug for EthClientAdapter<C>
where
C: Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClientAdapter")
.field("client", &self.client)
.finish()
}
}

impl<C> Clone for EthClientAdapter<C>
where
C: Clone,
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
}
}
}

impl<C> AsMut<C> for EthClientAdapter<C> {
fn as_mut(&mut self) -> &mut C {
&mut self.client
}
}

impl<C> Deref for EthClientAdapter<C> {
type Target = C;

fn deref(&self) -> &Self::Target {
&self.client
}
}

impl<C> DerefMut for EthClientAdapter<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}

#[async_trait]
impl<C> JsonRpcClient for EthClientAdapter<C>
where
C: ClientT + Debug + Send + Sync,
{
type Error = EthError;

async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send,
{
let params = EthRpcParams::from_serializable(&params)?;
ClientT::request::<R, EthRpcParams>(&self.client, method, params)
.await
.map_err(EthError::from)
}
}
100 changes: 100 additions & 0 deletions chains/ethereum/rpc-client/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use ethers::providers::{
JsonRpcError as EthJsonRpcError, ProviderError as EthProviderError,
RpcError as EthRpcErrorTrait,
};
use jsonrpsee::{client_transport::ws::WsHandshakeError, core::Error as JsonRpseeError};

/// Adapter for [`jsonrpsee::core::Error`] to [`ethers::providers::RpcError`].
#[derive(Debug, thiserror::Error)]
pub enum EthError {
/// Thrown if the response could not be parsed
#[error("{original}")]
JsonRpsee {
original: JsonRpseeError,
message: Option<EthJsonRpcError>,
},

/// Failed to parse the data.
#[allow(clippy::enum_variant_names)]
#[error(transparent)]
ParseError(#[from] serde_json::Error),

/// Error that can happen during the WebSocket handshake.
#[error("WS Handshake failed: {0}")]
HandshakeFailed(WsHandshakeError),

/// The background task has been terminated.
#[error("The background task been terminated because: {0}; restart required")]
RestartNeeded(String),

/// The client is reconnecting
#[error("The client is restarting the background task")]
Reconnecting,
}

impl From<JsonRpseeError> for EthError {
fn from(error: JsonRpseeError) -> Self {
match error {
JsonRpseeError::Call(call) => {
let code = call.code() as i64;
let data = call
.data()
.and_then(|raw_value| serde_json::value::to_value(raw_value).ok());
let message = call.message().to_string();
Self::JsonRpsee {
original: JsonRpseeError::Call(call),
message: Some(EthJsonRpcError {
code,
message,
data,
}),
}
}
JsonRpseeError::ParseError(serde_error) => Self::ParseError(serde_error),
JsonRpseeError::RestartNeeded(reason) => Self::RestartNeeded(reason),
error => {
let message = format!("{}", &error);
Self::JsonRpsee {
original: error,
message: Some(EthJsonRpcError {
code: 9999,
message,
data: None,
}),
}
}
}
}
}

impl From<WsHandshakeError> for EthError {
fn from(error: WsHandshakeError) -> Self {
Self::HandshakeFailed(error)
}
}

impl From<EthError> for EthProviderError {
fn from(error: EthError) -> Self {
match error {
EthError::ParseError(error) => Self::SerdeJson(error),
EthError::HandshakeFailed(error) => Self::CustomError(error.to_string()),
error => Self::JsonRpcClientError(Box::new(error)),
}
}
}

impl EthRpcErrorTrait for EthError {
fn as_error_response(&self) -> Option<&EthJsonRpcError> {
match self {
Self::JsonRpsee { message, .. } => message.as_ref(),
_ => None,
}
}

fn as_serde_error(&self) -> Option<&serde_json::Error> {
match self {
Self::ParseError(error) => Some(error),
_ => None,
}
}
}
Loading

0 comments on commit 8dc29a5

Please sign in to comment.