diff --git a/Cargo.lock b/Cargo.lock index 9332ad878e4..7052145e3e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -472,7 +472,7 @@ dependencies = [ "either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hash 0.1.2 (git+https://github.com/paritytech/parity-common)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "primal 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2138,6 +2138,7 @@ dependencies = [ "parity-version 2.0.3", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "patricia-trie 0.2.1 (git+https://github.com/paritytech/parity-common)", + "plain_hasher 0.1.0 (git+https://github.com/paritytech/parity-common)", "pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.2.1 (git+https://github.com/paritytech/parity-common)", diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 24c95cfdeb7..3e80d4cd517 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -43,7 +43,7 @@ pub mod provider; mod types; pub use self::cache::Cache; -pub use self::provider::Provider; +pub use self::provider::{Provider, MAX_HEADERS_PER_REQUEST}; pub use self::transaction_queue::TransactionQueue; pub use types::request as request; diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index c7cc5ef5e7d..8b8f1034ac9 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -204,6 +204,8 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities { caps.serve_headers = true, CheckedRequest::HeaderByHash(_, _) => caps.serve_headers = true, + CheckedRequest::HeaderWithAncestors(_, _) => + caps.serve_headers = true, CheckedRequest::TransactionIndex(_, _) => {} // hashes yield no info. CheckedRequest::Signal(_, _) => caps.serve_headers = true, diff --git a/ethcore/light/src/on_demand/request.rs b/ethcore/light/src/on_demand/request.rs index 9feb0a670ff..8e0f76dec32 100644 --- a/ethcore/light/src/on_demand/request.rs +++ b/ethcore/light/src/on_demand/request.rs @@ -16,6 +16,7 @@ //! Request types, verification, and verification errors. +use std::cmp; use std::sync::Arc; use bytes::Bytes; @@ -47,6 +48,8 @@ pub enum Request { HeaderProof(HeaderProof), /// A request for a header by hash. HeaderByHash(HeaderByHash), + /// A request for a header by hash with a range of its ancestors. + HeaderWithAncestors(HeaderWithAncestors), /// A request for the index of a transaction. TransactionIndex(TransactionIndex), /// A request for block receipts. @@ -136,6 +139,7 @@ macro_rules! impl_single { // implement traits for each kind of request. impl_single!(HeaderProof, HeaderProof, (H256, U256)); impl_single!(HeaderByHash, HeaderByHash, encoded::Header); +impl_single!(HeaderWithAncestors, HeaderWithAncestors, Vec); impl_single!(TransactionIndex, TransactionIndex, net_request::TransactionIndexResponse); impl_single!(Receipts, BlockReceipts, Vec); impl_single!(Body, Body, encoded::Block); @@ -246,6 +250,7 @@ impl From for HeaderRef { pub enum CheckedRequest { HeaderProof(HeaderProof, net_request::IncompleteHeaderProofRequest), HeaderByHash(HeaderByHash, net_request::IncompleteHeadersRequest), + HeaderWithAncestors(HeaderWithAncestors, net_request::IncompleteHeadersRequest), TransactionIndex(TransactionIndex, net_request::IncompleteTransactionIndexRequest), Receipts(BlockReceipts, net_request::IncompleteReceiptsRequest), Body(Body, net_request::IncompleteBodyRequest), @@ -267,6 +272,16 @@ impl From for CheckedRequest { }; CheckedRequest::HeaderByHash(req, net_req) } + Request::HeaderWithAncestors(req) => { + let net_req = net_request::IncompleteHeadersRequest { + start: req.block_hash.map(Into::into), + skip: 0, + max: req.ancestor_count + 1, + reverse: true, + }; + trace!(target: "on_demand", "HeaderWithAncestors Request, {:?}", net_req); + CheckedRequest::HeaderWithAncestors(req, net_req) + } Request::HeaderProof(req) => { let net_req = net_request::IncompleteHeaderProofRequest { num: req.num().into(), @@ -335,6 +350,7 @@ impl CheckedRequest { match self { CheckedRequest::HeaderProof(_, req) => NetRequest::HeaderProof(req), CheckedRequest::HeaderByHash(_, req) => NetRequest::Headers(req), + CheckedRequest::HeaderWithAncestors(_, req) => NetRequest::Headers(req), CheckedRequest::TransactionIndex(_, req) => NetRequest::TransactionIndex(req), CheckedRequest::Receipts(_, req) => NetRequest::Receipts(req), CheckedRequest::Body(_, req) => NetRequest::Body(req), @@ -390,6 +406,27 @@ impl CheckedRequest { None } + CheckedRequest::HeaderWithAncestors(_, ref req) => { + if req.skip != 1 || !req.reverse { + return None; + } + + if let Some(&net_request::HashOrNumber::Hash(start)) = req.start.as_ref() { + let mut result = Vec::with_capacity(req.max as usize); + let mut hash = start; + let mut cache = cache.lock(); + for _ in 0..req.max { + match cache.block_header(&hash) { + Some(header) => { + hash = header.parent_hash(); + result.push(header); + } + None => return None, + } + } + Some(Response::HeaderWithAncestors(result)) + } else { None } + } CheckedRequest::Receipts(ref check, ref req) => { // empty transactions -> no receipts if check.0.as_ref().ok().map_or(false, |hdr| hdr.receipts_root() == KECCAK_NULL_RLP) { @@ -458,6 +495,7 @@ macro_rules! match_me { match $me { CheckedRequest::HeaderProof($check, $req) => $e, CheckedRequest::HeaderByHash($check, $req) => $e, + CheckedRequest::HeaderWithAncestors($check, $req) => $e, CheckedRequest::TransactionIndex($check, $req) => $e, CheckedRequest::Receipts($check, $req) => $e, CheckedRequest::Body($check, $req) => $e, @@ -487,6 +525,15 @@ impl IncompleteRequest for CheckedRequest { _ => Ok(()), } } + CheckedRequest::HeaderWithAncestors(ref check, ref req) => { + req.check_outputs(&mut f)?; + + // make sure the output given is definitively a hash. + match check.block_hash { + Field::BackReference(r, idx) => f(r, idx, OutputKind::Hash), + _ => Ok(()), + } + } CheckedRequest::TransactionIndex(_, ref req) => req.check_outputs(f), CheckedRequest::Receipts(_, ref req) => req.check_outputs(f), CheckedRequest::Body(_, ref req) => req.check_outputs(f), @@ -507,15 +554,46 @@ impl IncompleteRequest for CheckedRequest { fn complete(self) -> Result { match self { - CheckedRequest::HeaderProof(_, req) => req.complete().map(CompleteRequest::HeaderProof), - CheckedRequest::HeaderByHash(_, req) => req.complete().map(CompleteRequest::Headers), - CheckedRequest::TransactionIndex(_, req) => req.complete().map(CompleteRequest::TransactionIndex), - CheckedRequest::Receipts(_, req) => req.complete().map(CompleteRequest::Receipts), - CheckedRequest::Body(_, req) => req.complete().map(CompleteRequest::Body), - CheckedRequest::Account(_, req) => req.complete().map(CompleteRequest::Account), - CheckedRequest::Code(_, req) => req.complete().map(CompleteRequest::Code), - CheckedRequest::Execution(_, req) => req.complete().map(CompleteRequest::Execution), - CheckedRequest::Signal(_, req) => req.complete().map(CompleteRequest::Signal), + CheckedRequest::HeaderProof(_, req) => { + trace!(target: "on_demand", "HeaderProof request completed {:?}", req); + req.complete().map(CompleteRequest::HeaderProof) + } + CheckedRequest::HeaderByHash(_, req) => { + trace!(target: "on_demand", "HeaderByHash request completed {:?}", req); + req.complete().map(CompleteRequest::Headers) + } + CheckedRequest::HeaderWithAncestors(_, req) => { + trace!(target: "on_demand", "HeaderWithAncestors request completed {:?}", req); + req.complete().map(CompleteRequest::Headers) + } + CheckedRequest::TransactionIndex(_, req) => { + trace!(target: "on_demand", "TransactionIndex request completed {:?}", req); + req.complete().map(CompleteRequest::TransactionIndex) + } + CheckedRequest::Receipts(_, req) => { + trace!(target: "on_demand", "Receipt request completed {:?}", req); + req.complete().map(CompleteRequest::Receipts) + } + CheckedRequest::Body(_, req) => { + trace!(target: "on_demand", "Block request completed {:?}", req); + req.complete().map(CompleteRequest::Body) + } + CheckedRequest::Account(_, req) => { + trace!(target: "on_demand", "Account request completed {:?}", req); + req.complete().map(CompleteRequest::Account) + } + CheckedRequest::Code(_, req) => { + trace!(target: "on_demand", "Code request completed {:?}", req); + req.complete().map(CompleteRequest::Code) + } + CheckedRequest::Execution(_, req) => { + trace!(target: "on_demand", "Execution request completed {:?}", req); + req.complete().map(CompleteRequest::Execution) + } + CheckedRequest::Signal(_, req) => { + trace!(target: "on_demand", "Signal request completed {:?}", req); + req.complete().map(CompleteRequest::Signal) + } } } @@ -551,6 +629,9 @@ impl net_request::CheckedRequest for CheckedRequest { CheckedRequest::HeaderByHash(ref prover, _) => expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) => prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderByHash)), + CheckedRequest::HeaderWithAncestors(ref prover, _) => + expect!((&NetResponse::Headers(ref res), &CompleteRequest::Headers(ref req)) => + prover.check_response(cache, &req.start, &res.headers).map(Response::HeaderWithAncestors)), CheckedRequest::TransactionIndex(ref prover, _) => expect!((&NetResponse::TransactionIndex(ref res), _) => prover.check_response(cache, res).map(Response::TransactionIndex)), @@ -584,6 +665,8 @@ pub enum Response { HeaderProof((H256, U256)), /// Response to a header-by-hash request. HeaderByHash(encoded::Header), + /// Response to a header-by-hash with ancestors request. + HeaderWithAncestors(Vec), /// Response to a transaction-index request. TransactionIndex(net_request::TransactionIndexResponse), /// Response to a receipts request. @@ -625,6 +708,10 @@ pub enum Error { Decoder(::rlp::DecoderError), /// Empty response. Empty, + /// Response data length exceeds request max. + TooManyResults(u64, u64), + /// Response data is incomplete. + TooFewResults(u64, u64), /// Trie lookup error (result of bad proof) Trie(TrieError), /// Bad inclusion proof @@ -641,6 +728,8 @@ pub enum Error { WrongTrieRoot(H256, H256), /// Wrong response kind. WrongKind, + /// Wrong sequence of headers. + WrongHeaderSequence, } impl From<::rlp::DecoderError> for Error { @@ -701,6 +790,65 @@ impl HeaderProof { } } +/// Request for a header by hash with a range of ancestors. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HeaderWithAncestors { + /// Hash of the last block in the range to fetch. + pub block_hash: Field, + /// Number of headers before the last block to fetch in addition. + pub ancestor_count: u64, +} + +impl HeaderWithAncestors { + /// Check a response for the headers. + pub fn check_response( + &self, + cache: &Mutex<::cache::Cache>, + start: &net_request::HashOrNumber, + headers: &[encoded::Header] + ) -> Result, Error> { + let expected_hash = match (self.block_hash, start) { + (Field::Scalar(ref h), &net_request::HashOrNumber::Hash(ref h2)) => { + if h != h2 { return Err(Error::WrongHash(*h, *h2)) } + *h + } + (_, &net_request::HashOrNumber::Hash(h2)) => h2, + _ => return Err(Error::HeaderByNumber), + }; + + let start_header = headers.first().ok_or(Error::Empty)?; + let start_hash = start_header.hash(); + if start_hash != expected_hash { + return Err(Error::WrongHash(expected_hash, start_hash)); + } + + let expected_len = 1 + cmp::min(self.ancestor_count, start_header.number()); + let actual_len = headers.len() as u64; + match actual_len.cmp(&expected_len) { + cmp::Ordering::Less => + return Err(Error::TooFewResults(expected_len, actual_len)), + cmp::Ordering::Greater => + return Err(Error::TooManyResults(expected_len, actual_len)), + cmp::Ordering::Equal => (), + }; + + for (header, prev_header) in headers.iter().zip(headers[1..].iter()) { + if header.number() != prev_header.number() + 1 || + header.parent_hash() != prev_header.hash() + { + return Err(Error::WrongHeaderSequence) + } + } + + let mut cache = cache.lock(); + for header in headers { + cache.insert_block_header(header.hash(), header.clone()); + } + + Ok(headers.to_vec()) + } +} + /// Request for a header by hash. #[derive(Debug, Clone, PartialEq, Eq)] pub struct HeaderByHash(pub Field); @@ -993,6 +1141,83 @@ mod tests { assert!(HeaderByHash(hash.into()).check_response(&cache, &hash.into(), &[raw_header]).is_ok()) } + #[test] + fn check_header_with_ancestors() { + let mut last_header_hash = H256::default(); + let mut headers = (0..11).map(|num| { + let mut header = Header::new(); + header.set_number(num); + header.set_parent_hash(last_header_hash); + + last_header_hash = header.hash(); + header + }).collect::>(); + + headers.reverse(); // because responses are in reverse order + + let raw_headers = headers.iter() + .map(|hdr| encoded::Header::new(::rlp::encode(hdr).into_vec())) + .collect::>(); + + let mut invalid_successor = Header::new(); + invalid_successor.set_number(11); + invalid_successor.set_parent_hash(headers[1].hash()); + + let raw_invalid_successor = encoded::Header::new(::rlp::encode(&invalid_successor).into_vec()); + + let cache = Mutex::new(make_cache()); + + let header_with_ancestors = |hash, count| { + HeaderWithAncestors { + block_hash: hash, + ancestor_count: count + } + }; + + // Correct responses + assert!(header_with_ancestors(headers[0].hash().into(), 0) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..1]).is_ok()); + assert!(header_with_ancestors(headers[0].hash().into(), 2) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..3]).is_ok()); + assert!(header_with_ancestors(headers[0].hash().into(), 10) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..11]).is_ok()); + assert!(header_with_ancestors(headers[2].hash().into(), 2) + .check_response(&cache, &headers[2].hash().into(), &raw_headers[2..5]).is_ok()); + assert!(header_with_ancestors(headers[2].hash().into(), 10) + .check_response(&cache, &headers[2].hash().into(), &raw_headers[2..11]).is_ok()); + assert!(header_with_ancestors(invalid_successor.hash().into(), 0) + .check_response(&cache, &invalid_successor.hash().into(), &[raw_invalid_successor.clone()]).is_ok()); + + // Incorrect responses + assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 0) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..1]), + Err(Error::WrongHash(invalid_successor.hash(), headers[0].hash()))); + assert_eq!(header_with_ancestors(headers[0].hash().into(), 0) + .check_response(&cache, &headers[0].hash().into(), &[]), + Err(Error::Empty)); + assert_eq!(header_with_ancestors(headers[0].hash().into(), 10) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..10]), + Err(Error::TooFewResults(11, 10))); + assert_eq!(header_with_ancestors(headers[0].hash().into(), 9) + .check_response(&cache, &headers[0].hash().into(), &raw_headers[0..11]), + Err(Error::TooManyResults(10, 11))); + + let response = &[raw_headers[0].clone(), raw_headers[2].clone()]; + assert_eq!(header_with_ancestors(headers[0].hash().into(), 1) + .check_response(&cache, &headers[0].hash().into(), response), + Err(Error::WrongHeaderSequence)); + + let response = &[raw_invalid_successor.clone(), raw_headers[0].clone()]; + assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 1) + .check_response(&cache, &invalid_successor.hash().into(), response), + Err(Error::WrongHeaderSequence)); + + let response = &[raw_invalid_successor.clone(), raw_headers[1].clone()]; + assert_eq!(header_with_ancestors(invalid_successor.hash().into(), 1) + .check_response(&cache, &invalid_successor.hash().into(), response), + Err(Error::WrongHeaderSequence)); + } + #[test] fn check_body() { use rlp::RlpStream; diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index a066cefb529..90cbe95b63c 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -33,6 +33,9 @@ use transaction_queue::TransactionQueue; use request; +/// Maximum allowed size of a headers request. +pub const MAX_HEADERS_PER_REQUEST: u64 = 512; + /// Defines the operations that a provider for the light subprotocol must fulfill. pub trait Provider: Send + Sync { /// Provide current blockchain info. @@ -54,7 +57,6 @@ pub trait Provider: Send + Sync { /// results within must adhere to the `skip` and `reverse` parameters. fn block_headers(&self, req: request::CompleteHeadersRequest) -> Option { use request::HashOrNumber; - const MAX_HEADERS_TO_SEND: u64 = 512; if req.max == 0 { return None } @@ -83,7 +85,7 @@ pub trait Provider: Send + Sync { } }; - let max = ::std::cmp::min(MAX_HEADERS_TO_SEND, req.max); + let max = ::std::cmp::min(MAX_HEADERS_PER_REQUEST, req.max); let headers: Vec<_> = (0u64..max) .map(|x: u64| x.saturating_mul(req.skip.saturating_add(1))) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 7e25871a350..5880822432d 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -58,6 +58,7 @@ parity-reactor = { path = "../util/reactor" } parity-updater = { path = "../updater" } parity-version = { path = "../util/version" } patricia-trie = { git = "https://github.com/paritytech/parity-common" } +plain_hasher = { git = "https://github.com/paritytech/parity-common" } rlp = { git = "https://github.com/paritytech/parity-common" } stats = { path = "../util/stats" } vm = { path = "../ethcore/vm" } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 96926700c06..e6fdc37110c 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -64,6 +64,7 @@ extern crate parity_reactor; extern crate parity_updater as updater; extern crate parity_version as version; extern crate patricia_trie as trie; +extern crate plain_hasher; extern crate rlp; extern crate stats; extern crate vm; diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index 710f7d7497a..366eacf0573 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -19,6 +19,7 @@ use std::fmt; use ethcore::account_provider::{SignError as AccountError}; +use ethcore::client::BlockId; use ethcore::error::{Error as EthcoreError, ErrorKind, CallError}; use jsonrpc_core::{futures, Error, ErrorCode, Value}; use rlp::DecoderError; @@ -100,6 +101,14 @@ pub fn request_rejected_limit() -> Error { } } +pub fn request_rejected_param_limit(limit: u64, items_desc: &str) -> Error { + Error { + code: ErrorCode::ServerError(codes::REQUEST_REJECTED_LIMIT), + message: format!("Requested data size exceeds limit of {} {}.", limit, items_desc), + data: None, + } +} + pub fn account(error: &str, details: T) -> Error { Error { code: ErrorCode::ServerError(codes::ACCOUNT_ERROR), @@ -422,6 +431,19 @@ pub fn filter_not_found() -> Error { } } +pub fn filter_block_not_found(id: BlockId) -> Error { + Error { + code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), // Specified in EIP-234. + message: "One of the blocks specified in filter (fromBlock, toBlock or blockHash) cannot be found".into(), + data: Some(Value::String(match id { + BlockId::Hash(hash) => format!("0x{:x}", hash), + BlockId::Number(number) => format!("0x{:x}", number), + BlockId::Earliest => "earliest".to_string(), + BlockId::Latest => "latest".to_string(), + })), + } +} + // on-demand sender cancelled. pub fn on_demand_cancel(_cancel: futures::sync::oneshot::Canceled) -> Error { internal("on-demand sender cancelled", "") diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 2536024eb72..edceda693a8 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -16,6 +16,7 @@ //! Helpers for fetching blockchain data either from the light client or the network. +use std::cmp; use std::sync::Arc; use ethcore::basic_account::BasicAccount; @@ -31,7 +32,7 @@ use jsonrpc_macros::Trailing; use light::cache::Cache; use light::client::LightChainClient; -use light::cht; +use light::{cht, MAX_HEADERS_PER_REQUEST}; use light::on_demand::{ request, OnDemand, HeaderRef, Request as OnDemandRequest, Response as OnDemandResponse, ExecutionResult, @@ -42,6 +43,7 @@ use sync::LightSync; use ethereum_types::{U256, Address}; use hash::H256; use parking_lot::Mutex; +use plain_hasher::H256FastMap; use transaction::{Action, Transaction as EthTransaction, SignedTransaction, LocalizedTransaction}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; @@ -299,55 +301,67 @@ impl LightFetch { use std::collections::BTreeMap; use jsonrpc_core::futures::stream::{self, Stream}; - // early exit for "to" block before "from" block. - let best_number = self.client.chain_info().best_block_number; - let block_number = |id| match id { - BlockId::Earliest => Some(0), - BlockId::Latest => Some(best_number), - BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()), - BlockId::Number(x) => Some(x), - }; - - match (block_number(filter.to_block), block_number(filter.from_block)) { - (Some(to), Some(from)) if to < from => return Either::A(future::ok(Vec::new())), - (Some(_), Some(_)) => {}, - _ => return Either::A(future::err(errors::unknown_block())), - } - - let maybe_future = self.sync.with_context(move |ctx| { - // find all headers which match the filter, and fetch the receipts for each one. - // match them with their numbers for easy sorting later. - let bit_combos = filter.bloom_possibilities(); - let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block) - .take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block) - .take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block) - .filter(|ref hdr| { - let hdr_bloom = hdr.log_bloom(); - bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some() - }) - .map(|hdr| (hdr.number(), request::BlockReceipts(hdr.into()))) - .map(|(num, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, x))) - .collect(); + const MAX_BLOCK_RANGE: u64 = 1000; - // as the receipts come in, find logs within them which match the filter. - // insert them into a BTreeMap to maintain order by number and block index. - stream::futures_unordered(receipts_futures) - .fold(BTreeMap::new(), move |mut matches, (num, receipts)| { - for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() { - if filter.matches(&log) { - matches.insert((num, block_index), log.into()); - } - } - future::ok(matches) - }) // and then collect them into a vector. - .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) - .map_err(errors::on_demand_cancel) - }); + let fetcher = self.clone(); + self.headers_range_by_block_id(filter.from_block, filter.to_block, MAX_BLOCK_RANGE) + .and_then(move |mut headers| { + if headers.is_empty() { + return Either::A(future::ok(Vec::new())); + } - match maybe_future { - Some(fut) => Either::B(Either::A(fut)), - None => Either::B(Either::B(future::err(errors::network_disabled()))), - } + let on_demand = &fetcher.on_demand; + + let maybe_future = fetcher.sync.with_context(move |ctx| { + // find all headers which match the filter, and fetch the receipts for each one. + // match them with their numbers for easy sorting later. + let bit_combos = filter.bloom_possibilities(); + let receipts_futures: Vec<_> = headers.drain(..) + .filter(|ref hdr| { + let hdr_bloom = hdr.log_bloom(); + bit_combos.iter().any(|bloom| hdr_bloom.contains_bloom(bloom)) + }) + .map(|hdr| (hdr.number(), hdr.hash(), request::BlockReceipts(hdr.into()))) + .map(|(num, hash, req)| on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, hash, x))) + .collect(); + + // as the receipts come in, find logs within them which match the filter. + // insert them into a BTreeMap to maintain order by number and block index. + stream::futures_unordered(receipts_futures) + .fold(BTreeMap::new(), move |mut matches, (num, hash, receipts)| { + let mut block_index = 0; + for (transaction_index, receipt) in receipts.into_iter().enumerate() { + for (transaction_log_index, log) in receipt.logs.into_iter().enumerate() { + if filter.matches(&log) { + matches.insert((num, block_index), Log { + address: log.address.into(), + topics: log.topics.into_iter().map(Into::into).collect(), + data: log.data.into(), + block_hash: Some(hash.into()), + block_number: Some(num.into()), + // No way to easily retrieve transaction hash, so let's just skip it. + transaction_hash: None, + transaction_index: Some(transaction_index.into()), + log_index: Some(block_index.into()), + transaction_log_index: Some(transaction_log_index.into()), + log_type: "mined".into(), + removed: false, + }); + } + block_index += 1; + } + } + future::ok(matches) + }) // and then collect them into a vector. + .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) + .map_err(errors::on_demand_cancel) + }); + + match maybe_future { + Some(fut) => Either::B(Either::A(fut)), + None => Either::B(Either::B(future::err(errors::network_disabled()))), + } + }) } // Get a transaction by hash. also returns the index in the block. @@ -425,6 +439,150 @@ impl LightFetch { None => Box::new(future::err(errors::network_disabled())) as Box + Send> } } + + fn headers_range_by_block_id( + &self, + from_block: BlockId, + to_block: BlockId, + max: u64 + ) -> impl Future, Error = Error> { + let fetch_hashes = [from_block, to_block].iter() + .filter_map(|block_id| match block_id { + BlockId::Hash(hash) => Some(hash.clone()), + _ => None, + }) + .collect::>(); + + let best_number = self.client.chain_info().best_block_number; + + let fetcher = self.clone(); + self.headers_by_hash(&fetch_hashes[..]).and_then(move |mut header_map| { + let (from_block_num, to_block_num) = { + let block_number = |id| match id { + &BlockId::Earliest => 0, + &BlockId::Latest => best_number, + &BlockId::Hash(ref h) => + header_map.get(h).map(|hdr| hdr.number()) + .expect("from_block and to_block headers are fetched by hash; this closure is only called on from_block and to_block; qed"), + &BlockId::Number(x) => x, + }; + (block_number(&from_block), block_number(&to_block)) + }; + + if to_block_num < from_block_num { + // early exit for "to" block before "from" block. + return Either::A(future::err(errors::filter_block_not_found(to_block))); + } else if to_block_num - from_block_num >= max { + return Either::A(future::err(errors::request_rejected_param_limit(max, "blocks"))); + } + + let to_header_hint = match to_block { + BlockId::Hash(ref h) => header_map.remove(h), + _ => None, + }; + let headers_fut = fetcher.headers_range(from_block_num, to_block_num, to_header_hint); + Either::B(headers_fut.map(move |headers| { + // Validate from_block if it's a hash + let last_hash = headers.last().map(|hdr| hdr.hash()); + match (last_hash, from_block) { + (Some(h1), BlockId::Hash(h2)) if h1 != h2 => Vec::new(), + _ => headers, + } + })) + }) + } + + fn headers_by_hash(&self, hashes: &[H256]) -> impl Future, Error = Error> { + let mut refs = H256FastMap::with_capacity_and_hasher(hashes.len(), Default::default()); + let mut reqs = Vec::with_capacity(hashes.len()); + + for hash in hashes { + refs.entry(*hash).or_insert_with(|| { + self.make_header_requests(BlockId::Hash(*hash), &mut reqs) + .expect("make_header_requests never fails for BlockId::Hash; qed") + }); + } + + self.send_requests(reqs, move |res| { + let headers = refs.drain() + .map(|(hash, header_ref)| { + let hdr = extract_header(&res, header_ref) + .expect("these responses correspond to requests that header_ref belongs to; \ + qed"); + (hash, hdr) + }) + .collect(); + headers + }) + } + + fn headers_range( + &self, + from_number: u64, + to_number: u64, + to_header_hint: Option + ) -> impl Future, Error = Error> { + let range_length = (to_number - from_number + 1) as usize; + let mut headers: Vec = Vec::with_capacity(range_length); + + let iter_start = match to_header_hint { + Some(hdr) => { + let block_id = BlockId::Hash(hdr.parent_hash()); + headers.push(hdr); + block_id + } + None => BlockId::Number(to_number), + }; + headers.extend(self.client.ancestry_iter(iter_start) + .take_while(|hdr| hdr.number() >= from_number)); + + let fetcher = self.clone(); + future::loop_fn(headers, move |mut headers| { + let remaining = range_length - headers.len(); + if remaining == 0 { + return Either::A(future::ok(future::Loop::Break(headers))); + } + + let mut reqs: Vec = Vec::with_capacity(2); + + let start_hash = if let Some(hdr) = headers.last() { + hdr.parent_hash().into() + } else { + let cht_root = cht::block_to_cht_number(to_number) + .and_then(|cht_num| fetcher.client.cht_root(cht_num as usize)); + + let cht_root = match cht_root { + Some(cht_root) => cht_root, + None => return Either::A(future::err(errors::unknown_block())), + }; + + let header_proof = request::HeaderProof::new(to_number, cht_root) + .expect("HeaderProof::new is Some(_) if cht::block_to_cht_number() is Some(_); \ + this would return above if block_to_cht_number returned None; qed"); + + let idx = reqs.len(); + let hash_ref = Field::back_ref(idx, 0); + reqs.push(header_proof.into()); + + hash_ref + }; + + let max = cmp::min(remaining as u64, MAX_HEADERS_PER_REQUEST); + reqs.push(request::HeaderWithAncestors { + block_hash: start_hash, + ancestor_count: max - 1, + }.into()); + + Either::B(fetcher.send_requests(reqs, |mut res| { + match res.last_mut() { + Some(&mut OnDemandResponse::HeaderWithAncestors(ref mut res_headers)) => + headers.extend(res_headers.drain(..)), + _ => panic!("reqs has at least one entry; each request maps to a response; qed"), + }; + future::Loop::Continue(headers) + })) + }) + } } #[derive(Clone)]