Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Fixpending #1074

Merged
merged 17 commits into from
May 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ pub use transaction_queue::{TransactionQueue, AccountDetails, TransactionImportR
pub use miner::{Miner};
pub use external::{ExternalMiner, ExternalMinerService};

use std::collections::BTreeMap;
use util::{H256, U256, Address, Bytes};
use ethcore::client::{BlockChainClient, Executed};
use ethcore::block::{ClosedBlock};
use ethcore::receipt::{Receipt};
use ethcore::error::{Error, ExecutionError};
use ethcore::transaction::SignedTransaction;

Expand Down Expand Up @@ -134,9 +136,15 @@ pub trait MinerService : Send + Sync {
/// Query pending transactions for hash.
fn transaction(&self, hash: &H256) -> Option<SignedTransaction>;

/// Get a list of all transactions.
fn all_transactions(&self) -> Vec<SignedTransaction>;

/// Get a list of all pending transactions.
fn pending_transactions(&self) -> Vec<SignedTransaction>;

/// Get a list of all pending receipts.
fn pending_receipts(&self) -> BTreeMap<H256, Receipt>;

/// Returns highest transaction nonce for given address.
fn last_nonce(&self, address: &Address) -> Option<U256>;

Expand Down
47 changes: 42 additions & 5 deletions miner/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::error::*;
use ethcore::client::{Executive, Executed, EnvInfo, TransactOptions};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::{Receipt};
use ethcore::spec::Spec;
use ethcore::engine::Engine;
use super::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
Expand Down Expand Up @@ -407,20 +408,56 @@ impl MinerService for Miner {
}

fn pending_transactions_hashes(&self) -> Vec<H256> {
let transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.pending_hashes()
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used 4 times here. Perhaps could be extracted?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the idea here? Return only transactions from the pending block if sealing is enabled, return all transactions from the queue otherwise? Where is this required?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are maintaining a pending block, then we return the transactions within that block. if we are not maintaining a pending block, then we replicate the old behaviour of just using all submitted (in queue) transactions.

really, there should be another tag aside from pending, something like submitted, which does the original behaviour, and pending should mean only those transactions which have been executed locally (and have according receipts).

(true, Some(pending)) => pending.transactions().iter().map(|t| t.hash()).collect(),
_ => {
let queue = self.transaction_queue.lock().unwrap();
queue.pending_hashes()
}
}
}

fn transaction(&self, hash: &H256) -> Option<SignedTransaction> {
let queue = self.transaction_queue.lock().unwrap();
queue.find(hash)
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
(true, Some(pending)) => pending.transactions().iter().find(|t| &t.hash() == hash).map(|t| t.clone()),
_ => {
let queue = self.transaction_queue.lock().unwrap();
queue.find(hash)
}
}
}

fn pending_transactions(&self) -> Vec<SignedTransaction> {
fn all_transactions(&self) -> Vec<SignedTransaction> {
let queue = self.transaction_queue.lock().unwrap();
queue.top_transactions()
}

fn pending_transactions(&self) -> Vec<SignedTransaction> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending transactions is used also when propagating transactions - does it mean that we don't want to propagate everything from queue?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All valid transactions should be propogated

// TODO: should only use the sealing_work when it's current (it could be an old block)
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
(true, Some(pending)) => pending.transactions().clone(),
_ => {
let queue = self.transaction_queue.lock().unwrap();
queue.top_transactions()
}
}
}

fn pending_receipts(&self) -> BTreeMap<H256, Receipt> {
match (self.sealing_enabled.load(atomic::Ordering::Relaxed), self.sealing_work.lock().unwrap().peek_last_ref()) {
(true, Some(pending)) => {
let hashes = pending.transactions()
.iter()
.map(|t| t.hash());

let receipts = pending.receipts().clone().into_iter();

hashes.zip(receipts).collect()
},
_ => BTreeMap::new()
}
}

fn last_nonce(&self, address: &Address) -> Option<U256> {
self.transaction_queue.lock().unwrap().last_nonce(address)
}
Expand Down
11 changes: 8 additions & 3 deletions rpc/src/v1/helpers/poll_filter.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
//! Helper type with all filter possibilities.
//! Helper type with all filter state data.

use std::collections::HashSet;
use util::hash::H256;
use ethcore::filter::Filter;
use v1::types::{Filter, Log};

pub type BlockNumber = u64;

/// Filter state.
#[derive(Clone)]
pub enum PollFilter {
/// Number of last block which client was notified about.
Block(BlockNumber),
/// Hashes of all transactions which client was notified about.
PendingTransaction(Vec<H256>),
Logs(BlockNumber, Filter)
/// Number of From block number, pending logs and log filter iself.
Logs(BlockNumber, HashSet<Log>, Filter)
}
80 changes: 71 additions & 9 deletions rpc/src/v1/impls/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use ethcore::block::IsBlock;
use ethcore::views::*;
use ethcore::ethereum::Ethash;
use ethcore::transaction::{Transaction as EthTransaction, SignedTransaction, Action};
use ethcore::log_entry::LogEntry;
use ethcore::filter::Filter as EthcoreFilter;
use self::ethash::SeedHashCompute;
use v1::traits::{Eth, EthFilter};
use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, TransactionRequest, CallRequest, OptionalValue, Index, Filter, Log, Receipt};
Expand All @@ -47,7 +49,7 @@ pub struct EthClient<C, S, A, M, EM> where
A: AccountProvider,
M: MinerService,
EM: ExternalMinerService {

client: Weak<C>,
sync: Weak<S>,
accounts: Weak<A>,
Expand Down Expand Up @@ -215,6 +217,25 @@ fn from_params_default_third<F1, F2>(params: Params) -> Result<(F1, F2, BlockNum
}
}

fn pending_logs<M>(miner: &M, filter: &EthcoreFilter) -> Vec<Log> where M: MinerService {
let receipts = miner.pending_receipts();

let pending_logs = receipts.into_iter()
.flat_map(|(hash, r)| r.logs.into_iter().map(|l| (hash.clone(), l)).collect::<Vec<(H256, LogEntry)>>())
.collect::<Vec<(H256, LogEntry)>>();

let result = pending_logs.into_iter()
.filter(|pair| filter.matches(&pair.1))
.map(|pair| {
let mut log = Log::from(pair.1);
log.transaction_hash = Some(pair.0);
log
})
.collect();

result
}

impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> where
C: BlockChainClient + 'static,
S: SyncProvider + 'static,
Expand Down Expand Up @@ -426,10 +447,18 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> where
fn logs(&self, params: Params) -> Result<Value, Error> {
from_params::<(Filter,)>(params)
.and_then(|(filter,)| {
let logs = take_weak!(self.client).logs(filter.into())
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = take_weak!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();

if include_pending {
let pending = pending_logs(take_weak!(self.miner).deref(), &filter);
logs.extend(pending);
}

to_value(&logs)
})
}
Expand Down Expand Up @@ -580,7 +609,7 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where
.and_then(|(filter,)| {
let mut polls = self.polls.lock().unwrap();
let block_number = take_weak!(self.client).chain_info().best_block_number;
let id = polls.create_poll(PollFilter::Logs(block_number, filter.into()));
let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter));
to_value(&U256::from(id))
})
}
Expand Down Expand Up @@ -643,18 +672,44 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where

to_value(&diff)
},
PollFilter::Logs(ref mut block_number, ref filter) => {
let mut filter = filter.clone();
PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => {
// retrive the current block number
let current_number = client.chain_info().best_block_number;

// check if we need to check pending hashes
let include_pending = filter.to_block == Some(BlockNumber::Pending);

// build appropriate filter
let mut filter: EthcoreFilter = filter.clone().into();
filter.from_block = BlockID::Number(*block_number);
filter.to_block = BlockID::Latest;
let logs = client.logs(filter)

// retrieve logs in range from_block..min(BlockID::Latest..to_block)
let mut logs = client.logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();

let current_number = client.chain_info().best_block_number;
// additionally retrieve pending logs
if include_pending {
let pending_logs = pending_logs(take_weak!(self.miner).deref(), &filter);

// remove logs about which client was already notified about
let new_pending_logs: Vec<_> = pending_logs.iter()
.filter(|p| !previous_logs.contains(p))
.cloned()
.collect();

// save all logs retrieved by client
*previous_logs = pending_logs.into_iter().collect();

// append logs array with new pending logs
logs.extend(new_pending_logs);
}

// save current block number as next from block number
*block_number = current_number;

to_value(&logs)
}
}
Expand All @@ -667,11 +722,18 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where
.and_then(|(index,)| {
let mut polls = self.polls.lock().unwrap();
match polls.poll(&index.value()) {
Some(&PollFilter::Logs(ref _block_number, ref filter)) => {
let logs = take_weak!(self.client).logs(filter.clone())
Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.clone().into();
let mut logs = take_weak!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();

if include_pending {
logs.extend(pending_logs(take_weak!(self.miner).deref(), &filter));
}

to_value(&logs)
},
// just empty array
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/tests/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ fn rpc_eth_transaction_receipt() {
"params": ["0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"],
"id": 1
}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","data":"0x","logIndex":"0x01","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"},"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","data":"0x","logIndex":"0x01","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","type":"mined"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00"},"id":1}"#;

assert_eq!(tester.io.handle_request(request), Some(response.to_owned()));
}
Expand Down
12 changes: 12 additions & 0 deletions rpc/src/v1/tests/helpers/miner_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use ethcore::error::{Error, ExecutionError};
use ethcore::client::{BlockChainClient, Executed};
use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt;
use ethminer::{MinerService, MinerStatus, AccountDetails, TransactionImportResult};

/// Test miner service.
Expand All @@ -32,6 +33,8 @@ pub struct TestMinerService {
pub latest_closed_block: Mutex<Option<ClosedBlock>>,
/// Pre-existed pending transactions
pub pending_transactions: Mutex<HashMap<H256, SignedTransaction>>,
/// Pre-existed pending receipts
pub pending_receipts: Mutex<BTreeMap<H256, Receipt>>,
/// Last nonces.
pub last_nonces: RwLock<HashMap<Address, U256>>,

Expand All @@ -48,6 +51,7 @@ impl Default for TestMinerService {
imported_transactions: Mutex::new(Vec::new()),
latest_closed_block: Mutex::new(None),
pending_transactions: Mutex::new(HashMap::new()),
pending_receipts: Mutex::new(BTreeMap::new()),
last_nonces: RwLock::new(HashMap::new()),
min_gas_price: RwLock::new(U256::from(20_000_000)),
gas_floor_target: RwLock::new(U256::from(12345)),
Expand Down Expand Up @@ -161,10 +165,18 @@ impl MinerService for TestMinerService {
self.pending_transactions.lock().unwrap().get(hash).cloned()
}

fn all_transactions(&self) -> Vec<SignedTransaction> {
self.pending_transactions.lock().unwrap().values().cloned().collect()
}

fn pending_transactions(&self) -> Vec<SignedTransaction> {
self.pending_transactions.lock().unwrap().values().cloned().collect()
}

fn pending_receipts(&self) -> BTreeMap<H256, Receipt> {
self.pending_receipts.lock().unwrap().clone()
}

fn last_nonce(&self, address: &Address) -> Option<U256> {
self.last_nonces.read().unwrap().get(address).cloned()
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/types/block_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use serde::de::Visitor;
use ethcore::client::BlockID;

/// Represents rpc api block number param.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum BlockNumber {
Num(u64),
Latest,
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/types/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde::de::Visitor;
use util::common::FromHex;

/// Wrapper structure around vector of bytes.
#[derive(Debug, PartialEq, Default)]
#[derive(Debug, PartialEq, Eq, Default, Hash, Clone)]
pub struct Bytes(pub Vec<u8>);

impl Bytes {
Expand Down
4 changes: 2 additions & 2 deletions rpc/src/v1/types/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use v1::types::BlockNumber;
use ethcore::filter::Filter as EthFilter;
use ethcore::client::BlockID;

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum VariadicValue<T> where T: Deserialize {
Single(T),
Multiple(Vec<T>),
Expand All @@ -47,7 +47,7 @@ impl<T> Deserialize for VariadicValue<T> where T: Deserialize {
pub type FilterAddress = VariadicValue<Address>;
pub type Topic = VariadicValue<H256>;

#[derive(Debug, PartialEq, Deserialize)]
#[derive(Debug, PartialEq, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Filter {
#[serde(rename="fromBlock")]
Expand Down
Loading