Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

add pending tx type to wait for tx confirmations #11

Merged
merged 6 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- run:
name: tests
# skip these temporarily until we get ganache-cli and solc on CI
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events --skip test_pending_tx
- run:
name: Check style
command: |
Expand Down
1 change: 0 additions & 1 deletion ethers-contract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ serde = { version = "1.0.110", default-features = false }
rustc-hex = { version = "2.1.0", default-features = false }
thiserror = { version = "1.0.19", default-features = false }
once_cell = { version = "1.4.0", default-features = false }
tokio = { version = "0.2.21", default-features = false }
futures = "0.3.5"

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions ethers-contract/src/call.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use ethers_core::{
abi::{Detokenize, Error as AbiError, Function, InvalidOutputType},
types::{Address, BlockNumber, TransactionRequest, H256, U256},
types::{Address, BlockNumber, TransactionRequest, U256},
};
use ethers_providers::{JsonRpcClient, ProviderError};
use ethers_providers::{JsonRpcClient, PendingTransaction, ProviderError};
use ethers_signers::{Client, ClientError, Signer};

use std::{fmt::Debug, marker::PhantomData};
Expand Down Expand Up @@ -110,7 +110,7 @@ where
}

/// Signs and broadcasts the provided transaction
pub async fn send(self) -> Result<H256, ContractError> {
pub async fn send(self) -> Result<PendingTransaction<'a, P>, ContractError> {
Ok(self.client.send_transaction(self.tx, self.block).await?)
}
}
33 changes: 5 additions & 28 deletions ethers-contract/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,20 @@ use ethers_core::{
use ethers_providers::JsonRpcClient;
use ethers_signers::{Client, Signer};

use std::time::Duration;
use tokio::time;

/// Poll for tx confirmation once every 7 seconds.
// TODO: Can this be improved by replacing polling with an "on new block" subscription?
const POLL_INTERVAL: u64 = 7000;

#[derive(Debug, Clone)]
/// Helper which manages the deployment transaction of a smart contract
pub struct Deployer<'a, P, S> {
abi: Abi,
client: &'a Client<P, S>,
tx: TransactionRequest,
confs: usize,
poll_interval: Duration,
}

impl<'a, P, S> Deployer<'a, P, S>
where
S: Signer,
P: JsonRpcClient,
{
/// Sets the poll frequency for checking the number of confirmations for
/// the contract deployment transaction
pub fn poll_interval<T: Into<Duration>>(mut self, interval: T) -> Self {
self.poll_interval = interval.into();
self
}

/// Sets the number of confirmations to wait for the contract deployment transaction
pub fn confirmations<T: Into<usize>>(mut self, confirmations: T) -> Self {
self.confs = confirmations.into();
Expand All @@ -46,20 +31,13 @@ where
/// be sufficiently confirmed (default: 1), it returns a [`Contract`](./struct.Contract.html)
/// struct at the deployed contract's address.
pub async fn send(self) -> Result<Contract<'a, P, S>, ContractError> {
let tx_hash = self.client.send_transaction(self.tx, None).await?;
let pending_tx = self.client.send_transaction(self.tx, None).await?;

// poll for the receipt
let address;
loop {
if let Ok(receipt) = self.client.get_transaction_receipt(tx_hash).await {
address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;
break;
}
let receipt = pending_tx.confirmations(self.confs).await?;

time::delay_for(Duration::from_millis(POLL_INTERVAL)).await;
}
let address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;

let contract = Contract::new(address, self.abi.clone(), self.client);
Ok(contract)
Expand Down Expand Up @@ -177,7 +155,6 @@ where
abi: self.abi,
tx,
confs: 1,
poll_interval: Duration::from_millis(POLL_INTERVAL),
})
}
}
5 changes: 4 additions & 1 deletion ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ mod provider;
// ENS support
mod ens;

mod pending_transaction;
pub use pending_transaction::PendingTransaction;

mod stream;
pub use stream::FilterStream;
// re-export `StreamExt` so that consumers can call `next()` on the `FilterStream`
Expand All @@ -28,6 +31,6 @@ pub trait JsonRpcClient: Debug + Clone {
/// Sends a request with the provided JSON-RPC and parameters serialized as JSON
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Serialize + Send + Sync,
T: Debug + Serialize + Send + Sync,
R: for<'a> Deserialize<'a>;
}
191 changes: 191 additions & 0 deletions ethers-providers/src/pending_transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use crate::{JsonRpcClient, Provider, ProviderError};
use ethers_core::types::{TransactionReceipt, TxHash, U64};
use pin_project::pin_project;
use std::{
fmt,
future::Future,
ops::Deref,
pin::Pin,
task::{Context, Poll},
};

/// A pending transaction is a transaction which has been submitted but is not yet mined.
/// `await`'ing on a pending transaction will resolve to a transaction receipt
/// once the transaction has enough `confirmations`. The default number of confirmations
/// is 1, but may be adjusted with the `confirmations` method. If the transaction does not
/// have enough confirmations or is not mined, the future will stay in the pending state.
#[pin_project]
pub struct PendingTransaction<'a, P> {
tx_hash: TxHash,
confirmations: usize,
provider: &'a Provider<P>,
state: PendingTxState<'a>,
}

impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
/// Creates a new pending transaction poller from a hash and a provider
pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
let fut = Box::pin(provider.get_transaction_receipt(tx_hash));
Self {
tx_hash,
confirmations: 1,
provider,
state: PendingTxState::GettingReceipt(fut),
}
}

/// Sets the number of confirmations for the pending transaction to resolve
/// to a receipt
pub fn confirmations(mut self, confs: usize) -> Self {
self.confirmations = confs;
self
}
}

impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
type Output = Result<TransactionReceipt, ProviderError>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();

match this.state {
PendingTxState::GettingReceipt(fut) => {
let receipt = futures_util::ready!(fut.as_mut().poll(ctx))?;
*this.state = PendingTxState::CheckingReceipt(Box::new(receipt))
}
PendingTxState::CheckingReceipt(receipt) => {
// If we requested more than 1 confirmation, we need to compare the receipt's
// block number and the current block
if *this.confirmations > 1 {
let fut = Box::pin(this.provider.get_block_number());
*this.state =
PendingTxState::GettingBlockNumber(fut, Box::new(*receipt.clone()))
} else {
let receipt = *receipt.clone();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
}
}
PendingTxState::GettingBlockNumber(fut, receipt) => {
let inclusion_block = receipt
.block_number
.expect("Receipt did not have a block number. This should never happen");

let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;

// if the transaction has at least K confirmations, return the receipt
// (subtract 1 since the tx already has 1 conf when it's mined)
if current_block >= inclusion_block + *this.confirmations - 1 {
let receipt = *receipt.clone();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
} else {
// we need to re-instantiate the get_block_number future so that
// we poll again
let fut = Box::pin(this.provider.get_block_number());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.clone());
return Poll::Pending;
}
}
PendingTxState::Completed => {
panic!("polled pending transaction future after completion")
}
};

Poll::Pending
}
}

impl<'a, P> fmt::Debug for PendingTransaction<'a, P> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PendingTransaction")
.field("tx_hash", &self.tx_hash)
.field("confirmations", &self.confirmations)
.field("state", &self.state)
.finish()
}
}

impl<'a, P> PartialEq for PendingTransaction<'a, P> {
fn eq(&self, other: &Self) -> bool {
self.tx_hash == other.tx_hash
}
}

impl<'a, P> PartialEq<TxHash> for PendingTransaction<'a, P> {
fn eq(&self, other: &TxHash) -> bool {
&self.tx_hash == other
}
}

impl<'a, P> Eq for PendingTransaction<'a, P> {}

impl<'a, P> Deref for PendingTransaction<'a, P> {
type Target = TxHash;

fn deref(&self) -> &Self::Target {
&self.tx_hash
}
}

// Helper type alias
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;

// We box the TransactionReceipts to keep the enum small.
enum PendingTxState<'a> {
/// Polling the blockchain for the receipt
GettingReceipt(PinBoxFut<'a, TransactionReceipt>),

/// Polling the blockchain for the current block number
GettingBlockNumber(PinBoxFut<'a, U64>, Box<TransactionReceipt>),

/// If the pending tx required only 1 conf, it will return early. Otherwise it will
/// proceed to the next state which will poll the block number until there have been
/// enough confirmations
CheckingReceipt(Box<TransactionReceipt>),

/// Future has completed and should panic if polled again
Completed,
}

impl<'a> fmt::Debug for PendingTxState<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = match self {
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
PendingTxState::CheckingReceipt(_) => "CheckingReceipt",
PendingTxState::Completed => "Completed",
};

f.debug_struct("PendingTxState")
.field("state", &state)
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::Http;
use ethers_core::{types::TransactionRequest, utils::Ganache};
use std::convert::TryFrom;

#[tokio::test]
async fn test_pending_tx() {
let _ganache = Ganache::new().spawn();
let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
let accounts = provider.get_accounts().await.unwrap();
let tx = TransactionRequest::pay(accounts[0], 1000).from(accounts[0]);

let pending_tx = provider.send_transaction(tx).await.unwrap();

let receipt = provider
.get_transaction_receipt(pending_tx.tx_hash)
.await
.unwrap();

// the pending tx resolves to the same receipt
let tx_receipt = pending_tx.confirmations(1).await.unwrap();
assert_eq!(receipt, tx_receipt);
}
}
19 changes: 12 additions & 7 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
ens,
http::Provider as HttpProvider,
stream::{FilterStream, FilterWatcher},
JsonRpcClient,
JsonRpcClient, PendingTransaction,
};

use ethers_core::{
Expand Down Expand Up @@ -266,7 +266,7 @@ impl<P: JsonRpcClient> Provider<P> {
pub async fn send_transaction(
&self,
mut tx: TransactionRequest,
) -> Result<TxHash, ProviderError> {
) -> Result<PendingTransaction<'_, P>, ProviderError> {
if let Some(ref to) = tx.to {
if let NameOrAddress::Name(ens_name) = to {
// resolve to an address
Expand All @@ -277,22 +277,27 @@ impl<P: JsonRpcClient> Provider<P> {
}
}

Ok(self
let tx_hash = self
.0
.request("eth_sendTransaction", [tx])
.await
.map_err(Into::into)?)
.map_err(Into::into)?;
Ok(PendingTransaction::new(tx_hash, self))
}

/// Send the raw RLP encoded transaction to the entire Ethereum network and returns the transaction's hash
/// This will consume gas from the account that signed the transaction.
pub async fn send_raw_transaction(&self, tx: &Transaction) -> Result<TxHash, ProviderError> {
pub async fn send_raw_transaction(
&self,
tx: &Transaction,
) -> Result<PendingTransaction<'_, P>, ProviderError> {
let rlp = utils::serialize(&tx.rlp());
Ok(self
let tx_hash = self
.0
.request("eth_sendRawTransaction", [rlp])
.await
.map_err(Into::into)?)
.map_err(Into::into)?;
Ok(PendingTransaction::new(tx_hash, self))
}

/// Signs data using a specific account. This account needs to be unlocked.
Expand Down
Loading