Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve websockets reconnects #170

Merged
merged 43 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
99feda2
Implement Ethereum RPC client for JsonRpsee
Lohann Aug 29, 2023
2f7e07a
Fix error type
Lohann Aug 29, 2023
d65e91b
Support PubSub for Jsonrpsee
Lohann Aug 29, 2023
e75c598
Cleanup code
Lohann Aug 29, 2023
a1a5ebc
Move ethereum jsonrpsee client to another repo
Lohann Aug 29, 2023
d8450bc
split client and subscriptions
Lohann Aug 29, 2023
25a47ff
Implement PubSub adapter
Lohann Aug 30, 2023
cdebf2e
Implement ClientT and SubscriptionT traits for EthAdapter
Lohann Aug 30, 2023
d49ec0c
Rename adapter structs
Lohann Aug 30, 2023
952e45c
update docs
Lohann Aug 30, 2023
23d93a6
Remove duplicated code
Lohann Aug 30, 2023
a551214
Support unsubscribe
Lohann Aug 30, 2023
c993a62
Remove unused crates
Lohann Aug 30, 2023
777c864
Implement auto-reconnect trait
Lohann Aug 30, 2023
39e71ee
Implement ClientT and SubscriptionT extensions
Lohann Aug 31, 2023
9780ea2
Implement cloneable version of JsonRpseeError
Lohann Aug 31, 2023
43a665e
Fix compilation errors
Lohann Aug 31, 2023
1f29e73
Fix reconnect concurrency
Lohann Aug 31, 2023
9924bb4
Implement restart_needed hook
Lohann Aug 31, 2023
ff25312
Implement reconnect for subscriptions
Lohann Aug 31, 2023
8968e41
Rename reconnect client
Lohann Aug 31, 2023
0e421f7
Remove unucessary constraits
Lohann Aug 31, 2023
3e730f8
Refactor reconnect strategy
Lohann Aug 31, 2023
d53ec8b
cargo fmt
Lohann Aug 31, 2023
2825f99
Rename files
Lohann Aug 31, 2023
50c6819
improve documentation
Lohann Aug 31, 2023
11e6faf
Rename ethereum event stream adapter
Lohann Sep 1, 2023
ade1e9f
Rename ethereum RPC params
Lohann Sep 1, 2023
fba0675
Remove unused crates
Lohann Sep 1, 2023
4fdef96
Refactor rosetta-ethereum-rpc-client
Lohann Sep 1, 2023
bc683c2
Impl client and subscription macros
Lohann Sep 1, 2023
99ad276
Remove unused code
Lohann Sep 1, 2023
944e9bb
cargo fmt
Lohann Sep 1, 2023
8ef64d4
Refactor error
Lohann Sep 3, 2023
3a3c8c3
Simplified reconnect logic
Lohann Sep 3, 2023
70c2799
Impl reconnect for all clients
Lohann Sep 3, 2023
1d13d02
Fix stackoverflow on ethereum adapter
Lohann Sep 3, 2023
21d60b8
Refactor ethers adapter
Lohann Sep 3, 2023
c8d6f58
Fix reconnects on ethereum connector
Lohann Sep 3, 2023
f273772
Fix unit tests
Lohann Sep 4, 2023
a452dcf
Make the delay between reconnects consistent
Lohann Sep 4, 2023
f6696af
Implement fibonnaci retry strategy
Lohann Sep 4, 2023
4148755
Update docs
Lohann Sep 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 151 additions & 126 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"chains/bitcoin/config",
"chains/bitcoin/server",
"chains/ethereum/config",
"chains/ethereum/rpc-client",
"chains/ethereum/server",
"chains/ethereum/tx",
"chains/polkadot/config",
Expand Down
19 changes: 19 additions & 0 deletions chains/ethereum/rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "rosetta-ethereum-rpc-client"
version = "0.1.0"
edition = "2021"
license = "MIT"
repository = "https://github.com/analog-labs/chain-connectors"
description = "Adapters for JsonRpsee RPC client and ethers-rs."

[dependencies]
async-trait = "0.1"
dashmap = "5.5"
ethers = { version = "2.0", default-features = true }
futures-util = "0.3"
jsonrpsee = { version = "0.20", default-features = false, features = ["macros"] }
pin-project = "1.1"
serde = "1.0"
serde_json = "1.0"
thiserror = "1.0"
tracing = "0.1"
92 changes: 92 additions & 0 deletions chains/ethereum/rpc-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use crate::{error::EthError, extension::impl_client_trait, params::EthRpcParams};
use async_trait::async_trait;
use ethers::providers::JsonRpcClient;
use jsonrpsee::core::client::ClientT;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};

/// Adapter for [`jsonrpsee::core::client::ClientT`] to [`ethers::providers::JsonRpcClient`].
#[repr(transparent)]
pub struct EthClientAdapter<C> {
pub(crate) client: C,
}

impl<C> EthClientAdapter<C>
where
C: ClientT + Debug + Send + Sync,
{
pub fn new(client: C) -> Self {
Self { client }
}
}

impl<C> AsRef<C> for EthClientAdapter<C> {
fn as_ref(&self) -> &C {
&self.client
}
}

impl_client_trait!(EthClientAdapter<C> where C: ClientT + Debug + Send + Sync);

impl<C> Debug for EthClientAdapter<C>
where
C: Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClientAdapter")
.field("client", &self.client)
.finish()
}
}

impl<C> Clone for EthClientAdapter<C>
where
C: Clone,
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
}
}
}

impl<C> AsMut<C> for EthClientAdapter<C> {
fn as_mut(&mut self) -> &mut C {
&mut self.client
}
}

impl<C> Deref for EthClientAdapter<C> {
type Target = C;

fn deref(&self) -> &Self::Target {
&self.client
}
}

impl<C> DerefMut for EthClientAdapter<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}

#[async_trait]
impl<C> JsonRpcClient for EthClientAdapter<C>
where
C: ClientT + Debug + Send + Sync,
{
type Error = EthError;

async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send,
{
let params = EthRpcParams::from_serializable(&params)?;
ClientT::request::<R, EthRpcParams>(&self.client, method, params)
.await
.map_err(EthError::from)
}
}
100 changes: 100 additions & 0 deletions chains/ethereum/rpc-client/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use ethers::providers::{
JsonRpcError as EthJsonRpcError, ProviderError as EthProviderError,
RpcError as EthRpcErrorTrait,
};
use jsonrpsee::{client_transport::ws::WsHandshakeError, core::Error as JsonRpseeError};

/// Adapter for [`jsonrpsee::core::Error`] to [`ethers::providers::RpcError`].
#[derive(Debug, thiserror::Error)]
pub enum EthError {
/// Thrown if the response could not be parsed
#[error("{original}")]
JsonRpsee {
original: JsonRpseeError,
message: Option<EthJsonRpcError>,
},

/// Failed to parse the data.
#[allow(clippy::enum_variant_names)]
#[error(transparent)]
ParseError(#[from] serde_json::Error),

/// Error that can happen during the WebSocket handshake.
#[error("WS Handshake failed: {0}")]
HandshakeFailed(WsHandshakeError),

/// The background task has been terminated.
#[error("The background task been terminated because: {0}; restart required")]
RestartNeeded(String),

/// The client is reconnecting
#[error("The client is restarting the background task")]
Reconnecting,
}

impl From<JsonRpseeError> for EthError {
fn from(error: JsonRpseeError) -> Self {
match error {
JsonRpseeError::Call(call) => {
let code = call.code() as i64;
let data = call
.data()
.and_then(|raw_value| serde_json::value::to_value(raw_value).ok());
let message = call.message().to_string();
Self::JsonRpsee {
original: JsonRpseeError::Call(call),
message: Some(EthJsonRpcError {
code,
message,
data,
}),
}
}
JsonRpseeError::ParseError(serde_error) => Self::ParseError(serde_error),
JsonRpseeError::RestartNeeded(reason) => Self::RestartNeeded(reason),
error => {
let message = format!("{}", &error);
Self::JsonRpsee {
original: error,
message: Some(EthJsonRpcError {
code: 9999,
message,
data: None,
}),
}
}
}
}
}

impl From<WsHandshakeError> for EthError {
fn from(error: WsHandshakeError) -> Self {
Self::HandshakeFailed(error)
}
}

impl From<EthError> for EthProviderError {
fn from(error: EthError) -> Self {
match error {
EthError::ParseError(error) => Self::SerdeJson(error),
EthError::HandshakeFailed(error) => Self::CustomError(error.to_string()),
error => Self::JsonRpcClientError(Box::new(error)),
}
}
}

impl EthRpcErrorTrait for EthError {
fn as_error_response(&self) -> Option<&EthJsonRpcError> {
match self {
Self::JsonRpsee { message, .. } => message.as_ref(),
_ => None,
}
}

fn as_serde_error(&self) -> Option<&serde_json::Error> {
match self {
Self::ParseError(error) => Some(error),
_ => None,
}
}
}
Loading