Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KakarotTransactions trait to upstream send_raw_transaction #1352

Merged
merged 8 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 100 additions & 9 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
use crate::{
models::transaction::validate_transaction,
pool::{
mempool::{KakarotPool, TransactionOrdering},
validate::KakarotTransactionValidatorBuilder,
},
providers::eth_provider::{
database::{state::EthDatabase, Database},
error::KakarotError,
provider::EthDataProvider,
chain::ChainProvider,
database::{
ethereum::{EthereumBlockStore, EthereumTransactionStore},
state::EthDatabase,
Database,
},
error::{EthApiError, EthereumDataFormatError, KakarotError, SignatureError, TransactionError},
provider::{EthApiResult, EthDataProvider},
starknet::kakarot_core::to_starknet_transaction,
},
};
use alloy_rlp::Decodable;
use async_trait::async_trait;
use num_traits::ToPrimitive;
use reth_chainspec::ChainSpec;
use reth_transaction_pool::{blobstore::NoopBlobStore, EthPooledTransaction, PoolConfig};
use reth_primitives::{Bytes, TransactionSigned, TransactionSignedEcRecovered, B256};
use reth_rpc_types_compat::transaction::from_recovered;
use reth_transaction_pool::{
blobstore::NoopBlobStore, EthPooledTransaction, PoolConfig, TransactionOrigin, TransactionPool,
};
use starknet::{core::types::Felt, providers::Provider};
use std::sync::Arc;
use tracing::Instrument;

#[async_trait]
pub trait KakarotTransactions {
/// Send a raw transaction to the network and returns the transactions hash.
async fn send_raw_transaction(&self, transaction: Bytes) -> EthApiResult<B256>;
}

/// Provides a wrapper structure around the Ethereum Provider
/// and the Mempool.
Expand All @@ -27,16 +47,15 @@ impl<SP> EthClient<SP>
where
SP: Provider + Clone + Sync + Send,
{
/// Tries to start a [`EthClient`] by fetching the current chain id, initializing a [`EthDataProvider`] and
/// a `Pool`.
/// Tries to start a [`EthClient`] by fetching the current chain id, initializing a [`EthDataProvider`] and a `Pool`.
tcoratger marked this conversation as resolved.
Show resolved Hide resolved
pub async fn try_new(starknet_provider: SP, database: Database) -> eyre::Result<Self> {
let chain = (starknet_provider.chain_id().await.map_err(KakarotError::from)?.to_bigint()
& Felt::from(u32::MAX).to_bigint())
.to_u64()
.unwrap();

// Create a new EthDataProvider instance with the initialized database and Starknet provider.
let mut eth_provider = EthDataProvider::try_new(database, starknet_provider).await?;
let eth_provider = EthDataProvider::try_new(database, starknet_provider).await?;

let validator =
KakarotTransactionValidatorBuilder::new(Arc::new(ChainSpec { chain: chain.into(), ..Default::default() }))
Expand All @@ -49,8 +68,6 @@ where
PoolConfig::default(),
));

eth_provider.set_mempool(pool.clone());

Ok(Self { eth_provider, pool })
}

Expand All @@ -64,3 +81,77 @@ where
self.pool.clone()
}
}

#[async_trait]
impl<SP> KakarotTransactions for EthClient<SP>
where
SP: Provider + Clone + Sync + Send,
{
async fn send_raw_transaction(&self, transaction: Bytes) -> EthApiResult<B256> {
// Decode the transaction data
let transaction_signed = TransactionSigned::decode(&mut transaction.0.as_ref())
.map_err(|_| EthApiError::EthereumDataFormat(EthereumDataFormatError::TransactionConversion))?;

let chain_id: u64 = self
.eth_provider
.chain_id()
.await?
.unwrap_or_default()
.try_into()
.map_err(|_| TransactionError::InvalidChainId)?;

// Validate the transaction
let latest_block_header =
self.eth_provider.database().latest_header().await?.ok_or(EthApiError::UnknownBlockNumber(None))?;
validate_transaction(&transaction_signed, chain_id, &latest_block_header)?;

// Recover the signer from the transaction
let signer = transaction_signed.recover_signer().ok_or(SignatureError::Recovery)?;

// Get the number of retries for the transaction
let retries = self.eth_provider.database().pending_transaction_retries(&transaction_signed.hash).await?;

let transaction_signed_ec_recovered =
TransactionSignedEcRecovered::from_signed_transaction(transaction_signed.clone(), signer);

let encoded_length = transaction_signed_ec_recovered.clone().length_without_header();

// Upsert the transaction as pending in the database
let transaction = from_recovered(transaction_signed_ec_recovered.clone());
self.eth_provider.database().upsert_pending_transaction(transaction, retries).await?;

// Convert the Ethereum transaction to a Starknet transaction
let starknet_transaction = to_starknet_transaction(&transaction_signed, signer, retries)?;

// Deploy EVM transaction signer if Hive feature is enabled
#[cfg(feature = "hive")]
self.eth_provider.deploy_evm_transaction_signer(signer).await?;

// Add the transaction to the Starknet provider
let span = tracing::span!(tracing::Level::INFO, "sn::add_invoke_transaction");
let res = self
.eth_provider
.starknet_provider()
.add_invoke_transaction(starknet_transaction)
.instrument(span)
.await
.map_err(KakarotError::from)?;

let pool_transaction = EthPooledTransaction::new(transaction_signed_ec_recovered, encoded_length);

// Don't handle the result in case we are adding multiple times the same transaction due to the retry.
let _ = self.pool.as_ref().add_transaction(TransactionOrigin::Local, pool_transaction).await;

// Return transaction hash if testing feature is enabled, otherwise log and return Ethereum hash
if cfg!(feature = "testing") {
return Ok(B256::from_slice(&res.transaction_hash.to_bytes_be()[..]));
}
let hash = transaction_signed.hash();
tracing::info!(
ethereum_hash = ?hash,
starknet_hash = ?B256::from_slice(&res.transaction_hash.to_bytes_be()[..]),
);

Ok(hash)
}
}
4 changes: 2 additions & 2 deletions src/eth_rpc/servers/eth_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(clippy::blocks_in_conditions)]

use crate::{
client::EthClient,
client::{EthClient, KakarotTransactions},
eth_rpc::api::eth_api::EthApiServer,
providers::eth_provider::{
constant::MAX_PRIORITY_FEE_PER_GAS, error::EthApiError, BlockProvider, ChainProvider, GasProvider, LogProvider,
Expand Down Expand Up @@ -238,7 +238,7 @@ where
#[tracing::instrument(skip_all, ret, err)]
async fn send_raw_transaction(&self, bytes: Bytes) -> Result<B256> {
tracing::info!("Serving eth_sendRawTransaction");
Ok(self.eth_client.eth_provider().send_raw_transaction(bytes).await?)
Ok(self.eth_client.send_raw_transaction(bytes).await?)
}

async fn sign(&self, _address: Address, _message: Bytes) -> Result<Bytes> {
Expand Down
3 changes: 1 addition & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ async fn main() -> Result<()> {
let starknet_provider = Arc::new(starknet_provider);

let eth_client = EthClient::try_new(starknet_provider, db.clone()).await.expect("failed to start ethereum client");
let eth_provider = eth_client.eth_provider().clone();

// Setup the retry handler
let retry_handler = RetryHandler::new(eth_provider, db);
let retry_handler = RetryHandler::new(eth_client.clone(), db);
retry_handler.start(&tokio::runtime::Handle::current());

// Setup the RPC module
Expand Down
17 changes: 8 additions & 9 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::validate::KakarotTransactionValidator;
use crate::providers::eth_provider::provider::EthDataProvider;
use crate::pool::EthClient;
use reth_transaction_pool::{
blobstore::NoopBlobStore, CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionPool,
};
Expand Down Expand Up @@ -58,35 +58,34 @@ impl AccountManager {
Self { accounts }
}

pub fn start<SP>(&self, rt_handle: &Handle, eth_provider: &'static EthDataProvider<SP>)
pub fn start<SP>(&self, rt_handle: &Handle, eth_client: &'static EthClient<SP>)
where
SP: starknet::providers::Provider + Send + Sync + 'static,
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
greged93 marked this conversation as resolved.
Show resolved Hide resolved
{
let accounts = self.accounts.clone();

rt_handle.spawn(async move {
loop {
for address in &accounts {
Self::process_transaction(address, eth_provider);
Self::process_transaction(address, eth_client);
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

fn process_transaction<SP>(address: &Felt, eth_provider: &EthDataProvider<SP>)
fn process_transaction<SP>(address: &Felt, eth_client: &EthClient<SP>)
where
SP: starknet::providers::Provider + Send + Sync + 'static,
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
greged93 marked this conversation as resolved.
Show resolved Hide resolved
{
let balance = Self::check_balance(address);

if balance > Felt::ONE {
let best_hashes =
eth_provider.mempool.as_ref().unwrap().best_transactions().map(|x| *x.hash()).collect::<Vec<_>>();
let best_hashes = eth_client.mempool().as_ref().best_transactions().map(|x| *x.hash()).collect::<Vec<_>>();

if let Some(best_hash) = best_hashes.first() {
eth_provider.mempool.as_ref().unwrap().remove_transactions(vec![*best_hash]);
eth_client.mempool().as_ref().remove_transactions(vec![*best_hash]);
}
}
}
Expand Down
30 changes: 16 additions & 14 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#![allow(clippy::used_underscore_binding)]
use crate::providers::eth_provider::{
database::{
use crate::{
client::{EthClient, KakarotTransactions},
providers::eth_provider::database::{
ethereum::EthereumTransactionStore,
filter::{self, EthDatabaseFilterBuilder},
types::transaction::StoredPendingTransaction,
Database,
},
provider::EthereumProvider,
};
use eyre::Result;
use opentelemetry::metrics::{Gauge, Unit};
use reth_primitives::{TransactionSignedEcRecovered, B256};
use starknet::providers::Provider;
use std::{
fmt,
str::FromStr,
Expand All @@ -37,9 +38,9 @@ pub fn get_transaction_max_retries() -> u8 {
}

/// The [`RetryHandler`] is responsible for retrying transactions that have failed.
pub struct RetryHandler<P: EthereumProvider> {
/// The Ethereum provider.
eth_provider: P,
pub struct RetryHandler<SP: Provider + Clone + Send + Sync> {
/// The Ethereum client.
eth_client: EthClient<SP>,
/// The database.
database: Database,
/// The time to retry transactions recorded in an Observable.
Expand All @@ -49,29 +50,29 @@ pub struct RetryHandler<P: EthereumProvider> {
retried: Arc<Mutex<Vec<B256>>>,
}

impl<P: EthereumProvider> fmt::Debug for RetryHandler<P> {
impl<SP: Provider + Clone + Send + Sync> fmt::Debug for RetryHandler<SP> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RetryHandler")
.field("eth_provider", &"...")
.field("eth_client", &"...")
.field("database", &self.database)
.finish_non_exhaustive()
}
}

impl<P> RetryHandler<P>
impl<SP> RetryHandler<SP>
where
P: EthereumProvider + Send + Sync + 'static,
SP: Provider + Clone + Send + Sync + 'static,
{
/// Creates a new [`RetryHandler`] with the given Ethereum provider, database.
#[allow(clippy::missing_const_for_fn)]
pub fn new(eth_provider: P, database: Database) -> Self {
pub fn new(eth_client: EthClient<SP>, database: Database) -> Self {
let retry_time = opentelemetry::global::meter("retry_service")
.u64_gauge("retry_time")
.with_description("The time taken to process pending transactions")
.with_unit(Unit::new("microseconds"))
.init();
Self {
eth_provider,
eth_client,
database,
retry_time,
#[cfg(test)]
Expand Down Expand Up @@ -126,7 +127,7 @@ where
}
};

let _hash = self.eth_provider.send_raw_transaction(transaction.into_signed().envelope_encoded()).await?;
let _hash = self.eth_client.send_raw_transaction(transaction.into_signed().envelope_encoded()).await?;
#[cfg(test)]
self.retried.lock().await.push(_hash);
Ok(())
Expand Down Expand Up @@ -170,8 +171,9 @@ mod tests {
async fn test_retry_handler(#[future] katana: Katana, _setup: ()) {
// Given
let eth_provider = katana.eth_provider();
let eth_client = katana.eth_client();
let db = eth_provider.database().clone();
let retry_handler = RetryHandler::new(eth_provider.clone(), db.clone());
let retry_handler = RetryHandler::new(eth_client, db.clone());

// Insert the first transaction into the pending transactions collection with 0 pool
let transaction1 = katana.eoa().mock_transaction_with_nonce(0).await.expect("Failed to get mock transaction");
Expand Down
15 changes: 1 addition & 14 deletions src/providers/eth_provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use itertools::Itertools;
use mongodb::bson::doc;
use num_traits::cast::ToPrimitive;
use reth_primitives::{BlockId, BlockNumberOrTag, TxKind, U256};
use std::sync::Arc;

use crate::pool::mempool::KakarotPool;
use reth_rpc_types::{BlockHashOrNumber, TransactionRequest};
use starknet::core::types::Felt;
use tracing::{instrument, Instrument};
Expand Down Expand Up @@ -66,7 +63,6 @@ pub struct EthDataProvider<SP: starknet::providers::Provider + Send + Sync> {
database: Database,
starknet_provider: SP,
pub(crate) chain_id: u64,
pub mempool: Option<Arc<KakarotPool<Self>>>,
}

impl<SP> EthDataProvider<SP>
Expand All @@ -82,11 +78,6 @@ where
pub const fn starknet_provider(&self) -> &SP {
&self.starknet_provider
}

/// Returns a reference to the pool.
pub fn mempool(&self) -> Option<Arc<KakarotPool<Self>>> {
self.mempool.clone()
}
}

impl<SP> EthDataProvider<SP>
Expand All @@ -100,11 +91,7 @@ where
let chain_id =
(Felt::from(u32::MAX).to_biguint() & starknet_provider.chain_id().await?.to_biguint()).try_into().unwrap(); // safe unwrap

Ok(Self { database, starknet_provider, chain_id, mempool: None })
}

pub fn set_mempool(&mut self, mempool: Arc<KakarotPool<Self>>) {
self.mempool = Some(mempool);
Ok(Self { database, starknet_provider, chain_id })
}

/// Prepare the call input for an estimate gas or call from a transaction request.
Expand Down
Loading
Loading