Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
add pending tx type to wait for tx confirmations (#11)
Browse files Browse the repository at this point in the history
* feat: add pending tx type

* feat(pending-txs): implement the full state machine

* tests(ethers): fix transfer eth example

* feat: use the pending transaction struct when deploying a contract

* ci: skip the pending tx test

* chore: fix doctests
  • Loading branch information
gakonst authored Jun 15, 2020
1 parent 20da946 commit 79b21b9
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- run:
name: tests
# skip these temporarily until we get ganache-cli and solc on CI
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events --skip test_pending_tx
- run:
name: Check style
command: |
Expand Down
1 change: 0 additions & 1 deletion ethers-contract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ serde = { version = "1.0.110", default-features = false }
rustc-hex = { version = "2.1.0", default-features = false }
thiserror = { version = "1.0.19", default-features = false }
once_cell = { version = "1.4.0", default-features = false }
tokio = { version = "0.2.21", default-features = false }
futures = "0.3.5"

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions ethers-contract/src/call.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use ethers_core::{
abi::{Detokenize, Error as AbiError, Function, InvalidOutputType},
types::{Address, BlockNumber, TransactionRequest, H256, U256},
types::{Address, BlockNumber, TransactionRequest, U256},
};
use ethers_providers::{JsonRpcClient, ProviderError};
use ethers_providers::{JsonRpcClient, PendingTransaction, ProviderError};
use ethers_signers::{Client, ClientError, Signer};

use std::{fmt::Debug, marker::PhantomData};
Expand Down Expand Up @@ -110,7 +110,7 @@ where
}

/// Signs and broadcasts the provided transaction
pub async fn send(self) -> Result<H256, ContractError> {
pub async fn send(self) -> Result<PendingTransaction<'a, P>, ContractError> {
Ok(self.client.send_transaction(self.tx, self.block).await?)
}
}
33 changes: 5 additions & 28 deletions ethers-contract/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,20 @@ use ethers_core::{
use ethers_providers::JsonRpcClient;
use ethers_signers::{Client, Signer};

use std::time::Duration;
use tokio::time;

/// Poll for tx confirmation once every 7 seconds.
// TODO: Can this be improved by replacing polling with an "on new block" subscription?
const POLL_INTERVAL: u64 = 7000;

#[derive(Debug, Clone)]
/// Helper which manages the deployment transaction of a smart contract
pub struct Deployer<'a, P, S> {
abi: Abi,
client: &'a Client<P, S>,
tx: TransactionRequest,
confs: usize,
poll_interval: Duration,
}

impl<'a, P, S> Deployer<'a, P, S>
where
S: Signer,
P: JsonRpcClient,
{
/// Sets the poll frequency for checking the number of confirmations for
/// the contract deployment transaction
pub fn poll_interval<T: Into<Duration>>(mut self, interval: T) -> Self {
self.poll_interval = interval.into();
self
}

/// Sets the number of confirmations to wait for the contract deployment transaction
pub fn confirmations<T: Into<usize>>(mut self, confirmations: T) -> Self {
self.confs = confirmations.into();
Expand All @@ -46,20 +31,13 @@ where
/// be sufficiently confirmed (default: 1), it returns a [`Contract`](./struct.Contract.html)
/// struct at the deployed contract's address.
pub async fn send(self) -> Result<Contract<'a, P, S>, ContractError> {
let tx_hash = self.client.send_transaction(self.tx, None).await?;
let pending_tx = self.client.send_transaction(self.tx, None).await?;

// poll for the receipt
let address;
loop {
if let Ok(receipt) = self.client.get_transaction_receipt(tx_hash).await {
address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;
break;
}
let receipt = pending_tx.confirmations(self.confs).await?;

time::delay_for(Duration::from_millis(POLL_INTERVAL)).await;
}
let address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;

let contract = Contract::new(address, self.abi.clone(), self.client);
Ok(contract)
Expand Down Expand Up @@ -177,7 +155,6 @@ where
abi: self.abi,
tx,
confs: 1,
poll_interval: Duration::from_millis(POLL_INTERVAL),
})
}
}
5 changes: 4 additions & 1 deletion ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ mod provider;
// ENS support
mod ens;

mod pending_transaction;
pub use pending_transaction::PendingTransaction;

mod stream;
pub use stream::FilterStream;
// re-export `StreamExt` so that consumers can call `next()` on the `FilterStream`
Expand All @@ -28,6 +31,6 @@ pub trait JsonRpcClient: Debug + Clone {
/// Sends a request with the provided JSON-RPC and parameters serialized as JSON
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Serialize + Send + Sync,
T: Debug + Serialize + Send + Sync,
R: for<'a> Deserialize<'a>;
}
191 changes: 191 additions & 0 deletions ethers-providers/src/pending_transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use crate::{JsonRpcClient, Provider, ProviderError};
use ethers_core::types::{TransactionReceipt, TxHash, U64};
use pin_project::pin_project;
use std::{
fmt,
future::Future,
ops::Deref,
pin::Pin,
task::{Context, Poll},
};

/// A pending transaction is a transaction which has been submitted but is not yet mined.
/// `await`'ing on a pending transaction will resolve to a transaction receipt
/// once the transaction has enough `confirmations`. The default number of confirmations
/// is 1, but may be adjusted with the `confirmations` method. If the transaction does not
/// have enough confirmations or is not mined, the future will stay in the pending state.
#[pin_project]
pub struct PendingTransaction<'a, P> {
tx_hash: TxHash,
confirmations: usize,
provider: &'a Provider<P>,
state: PendingTxState<'a>,
}

impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
/// Creates a new pending transaction poller from a hash and a provider
pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
let fut = Box::pin(provider.get_transaction_receipt(tx_hash));
Self {
tx_hash,
confirmations: 1,
provider,
state: PendingTxState::GettingReceipt(fut),
}
}

/// Sets the number of confirmations for the pending transaction to resolve
/// to a receipt
pub fn confirmations(mut self, confs: usize) -> Self {
self.confirmations = confs;
self
}
}

impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
type Output = Result<TransactionReceipt, ProviderError>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();

match this.state {
PendingTxState::GettingReceipt(fut) => {
let receipt = futures_util::ready!(fut.as_mut().poll(ctx))?;
*this.state = PendingTxState::CheckingReceipt(Box::new(receipt))
}
PendingTxState::CheckingReceipt(receipt) => {
// If we requested more than 1 confirmation, we need to compare the receipt's
// block number and the current block
if *this.confirmations > 1 {
let fut = Box::pin(this.provider.get_block_number());
*this.state =
PendingTxState::GettingBlockNumber(fut, Box::new(*receipt.clone()))
} else {
let receipt = *receipt.clone();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
}
}
PendingTxState::GettingBlockNumber(fut, receipt) => {
let inclusion_block = receipt
.block_number
.expect("Receipt did not have a block number. This should never happen");

let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;

// if the transaction has at least K confirmations, return the receipt
// (subtract 1 since the tx already has 1 conf when it's mined)
if current_block >= inclusion_block + *this.confirmations - 1 {
let receipt = *receipt.clone();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
} else {
// we need to re-instantiate the get_block_number future so that
// we poll again
let fut = Box::pin(this.provider.get_block_number());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.clone());
return Poll::Pending;
}
}
PendingTxState::Completed => {
panic!("polled pending transaction future after completion")
}
};

Poll::Pending
}
}

impl<'a, P> fmt::Debug for PendingTransaction<'a, P> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PendingTransaction")
.field("tx_hash", &self.tx_hash)
.field("confirmations", &self.confirmations)
.field("state", &self.state)
.finish()
}
}

impl<'a, P> PartialEq for PendingTransaction<'a, P> {
fn eq(&self, other: &Self) -> bool {
self.tx_hash == other.tx_hash
}
}

impl<'a, P> PartialEq<TxHash> for PendingTransaction<'a, P> {
fn eq(&self, other: &TxHash) -> bool {
&self.tx_hash == other
}
}

impl<'a, P> Eq for PendingTransaction<'a, P> {}

impl<'a, P> Deref for PendingTransaction<'a, P> {
type Target = TxHash;

fn deref(&self) -> &Self::Target {
&self.tx_hash
}
}

// Helper type alias
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;

// We box the TransactionReceipts to keep the enum small.
enum PendingTxState<'a> {
/// Polling the blockchain for the receipt
GettingReceipt(PinBoxFut<'a, TransactionReceipt>),

/// Polling the blockchain for the current block number
GettingBlockNumber(PinBoxFut<'a, U64>, Box<TransactionReceipt>),

/// If the pending tx required only 1 conf, it will return early. Otherwise it will
/// proceed to the next state which will poll the block number until there have been
/// enough confirmations
CheckingReceipt(Box<TransactionReceipt>),

/// Future has completed and should panic if polled again
Completed,
}

impl<'a> fmt::Debug for PendingTxState<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = match self {
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
PendingTxState::CheckingReceipt(_) => "CheckingReceipt",
PendingTxState::Completed => "Completed",
};

f.debug_struct("PendingTxState")
.field("state", &state)
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::Http;
use ethers_core::{types::TransactionRequest, utils::Ganache};
use std::convert::TryFrom;

#[tokio::test]
async fn test_pending_tx() {
let _ganache = Ganache::new().spawn();
let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
let accounts = provider.get_accounts().await.unwrap();
let tx = TransactionRequest::pay(accounts[0], 1000).from(accounts[0]);

let pending_tx = provider.send_transaction(tx).await.unwrap();

let receipt = provider
.get_transaction_receipt(pending_tx.tx_hash)
.await
.unwrap();

// the pending tx resolves to the same receipt
let tx_receipt = pending_tx.confirmations(1).await.unwrap();
assert_eq!(receipt, tx_receipt);
}
}
19 changes: 12 additions & 7 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
ens,
http::Provider as HttpProvider,
stream::{FilterStream, FilterWatcher},
JsonRpcClient,
JsonRpcClient, PendingTransaction,
};

use ethers_core::{
Expand Down Expand Up @@ -266,7 +266,7 @@ impl<P: JsonRpcClient> Provider<P> {
pub async fn send_transaction(
&self,
mut tx: TransactionRequest,
) -> Result<TxHash, ProviderError> {
) -> Result<PendingTransaction<'_, P>, ProviderError> {
if let Some(ref to) = tx.to {
if let NameOrAddress::Name(ens_name) = to {
// resolve to an address
Expand All @@ -277,22 +277,27 @@ impl<P: JsonRpcClient> Provider<P> {
}
}

Ok(self
let tx_hash = self
.0
.request("eth_sendTransaction", [tx])
.await
.map_err(Into::into)?)
.map_err(Into::into)?;
Ok(PendingTransaction::new(tx_hash, self))
}

/// Send the raw RLP encoded transaction to the entire Ethereum network and returns the transaction's hash
/// This will consume gas from the account that signed the transaction.
pub async fn send_raw_transaction(&self, tx: &Transaction) -> Result<TxHash, ProviderError> {
pub async fn send_raw_transaction(
&self,
tx: &Transaction,
) -> Result<PendingTransaction<'_, P>, ProviderError> {
let rlp = utils::serialize(&tx.rlp());
Ok(self
let tx_hash = self
.0
.request("eth_sendRawTransaction", [rlp])
.await
.map_err(Into::into)?)
.map_err(Into::into)?;
Ok(PendingTransaction::new(tx_hash, self))
}

/// Signs data using a specific account. This account needs to be unlocked.
Expand Down
Loading

0 comments on commit 79b21b9

Please sign in to comment.