This repository has been archived by the owner on Nov 6, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Handle removed logs in filter changes and add geth compatibility field #8796
Merged
Merged
Changes from 10 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
7a4fbd7
Add removed geth compatibility field in log
sorpaas e92d69c
Fix mocked tests
sorpaas 0c029f1
Add field block hash in PollFilter
sorpaas b5002bf
Store last block hash info for log filters
sorpaas 92ed0e8
Implement canon route
sorpaas e7ac8e4
Use canon logs for fetching reorg logs
sorpaas d751314
Merge branch 'sorpaas/filter-removed' into sorpaas/geth-removed
sorpaas cacb283
Make sure removed flag is set
sorpaas b31a419
Merge branch 'master' of https://github.com/paritytech/parity into so…
sorpaas 9affc9c
Merge branch 'master' of https://github.com/paritytech/parity into so…
sorpaas 9c6d1c6
Address grumbles
sorpaas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,7 +39,7 @@ pub trait Filterable { | |
fn best_block_number(&self) -> u64; | ||
|
||
/// Get a block hash by block id. | ||
fn block_hash(&self, id: BlockId) -> Option<RpcH256>; | ||
fn block_hash(&self, id: BlockId) -> Option<H256>; | ||
|
||
/// pending transaction hashes at the given block. | ||
fn pending_transactions_hashes(&self) -> Vec<H256>; | ||
|
@@ -52,6 +52,9 @@ pub trait Filterable { | |
|
||
/// Get a reference to the poll manager. | ||
fn polls(&self) -> &Mutex<PollManager<PollFilter>>; | ||
|
||
/// Get removed logs within route from the given block to the nearest canon block, not including the canon block. Also returns how many logs have been traversed. | ||
fn canon_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we rename this to something like |
||
} | ||
|
||
/// Eth filter rpc implementation for a full node. | ||
|
@@ -80,8 +83,8 @@ impl<C, M> Filterable for EthFilterClient<C, M> where | |
self.client.chain_info().best_block_number | ||
} | ||
|
||
fn block_hash(&self, id: BlockId) -> Option<RpcH256> { | ||
self.client.block_hash(id).map(Into::into) | ||
fn block_hash(&self, id: BlockId) -> Option<H256> { | ||
self.client.block_hash(id) | ||
} | ||
|
||
fn pending_transactions_hashes(&self) -> Vec<H256> { | ||
|
@@ -100,13 +103,47 @@ impl<C, M> Filterable for EthFilterClient<C, M> where | |
} | ||
|
||
fn polls(&self) -> &Mutex<PollManager<PollFilter>> { &self.polls } | ||
|
||
fn canon_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64) { | ||
let inner = || -> Option<Vec<H256>> { | ||
let mut route = Vec::new(); | ||
|
||
let mut current_block_hash = block_hash; | ||
let mut current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?; | ||
|
||
while current_block_hash != self.client.block_hash(BlockId::Number(current_block_header.number()))? { | ||
route.push(current_block_hash); | ||
|
||
current_block_hash = current_block_header.parent_hash(); | ||
current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?; | ||
} | ||
|
||
Some(route) | ||
}; | ||
|
||
let route = inner().unwrap_or_default(); | ||
let route_len = route.len() as u64; | ||
(route.into_iter().flat_map(|block_hash| { | ||
let mut filter = filter.clone(); | ||
filter.from_block = BlockId::Hash(block_hash); | ||
filter.to_block = filter.from_block; | ||
|
||
self.client.logs(filter).into_iter().map(|log| { | ||
let mut log: Log = log.into(); | ||
log.log_type = "removed".into(); | ||
log.removed = true; | ||
|
||
log | ||
}) | ||
}).collect(), route_len) | ||
} | ||
} | ||
|
||
impl<T: Filterable + Send + Sync + 'static> EthFilter for T { | ||
fn new_filter(&self, filter: Filter) -> Result<RpcU256> { | ||
let mut polls = self.polls().lock(); | ||
let block_number = self.best_block_number(); | ||
let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter)); | ||
let id = polls.create_poll(PollFilter::Logs(block_number, None, Default::default(), filter)); | ||
Ok(id.into()) | ||
} | ||
|
||
|
@@ -134,7 +171,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T { | |
let current_number = self.best_block_number() + 1; | ||
let hashes = (*block_number..current_number).into_iter() | ||
.map(BlockId::Number) | ||
.filter_map(|id| self.block_hash(id)) | ||
.filter_map(|id| self.block_hash(id).map(Into::into)) | ||
.collect::<Vec<RpcH256>>(); | ||
|
||
*block_number = current_number; | ||
|
@@ -164,7 +201,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T { | |
// return new hashes | ||
Either::A(future::ok(FilterChanges::Hashes(new_hashes))) | ||
}, | ||
PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => { | ||
PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => { | ||
// retrive the current block number | ||
let current_number = self.best_block_number(); | ||
|
||
|
@@ -176,6 +213,10 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T { | |
filter.from_block = BlockId::Number(*block_number); | ||
filter.to_block = BlockId::Latest; | ||
|
||
// retrieve reorg logs | ||
let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.canon_logs(h, &filter)); | ||
*block_number -= reorg_len as u64; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't affect the filter that's used below when getting pending logs. And it's overridden afterwards with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My mistake. Filter |
||
|
||
// retrieve pending logs | ||
let pending = if include_pending { | ||
let pending_logs = self.pending_logs(current_number, &filter); | ||
|
@@ -198,9 +239,14 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T { | |
// we want to get logs | ||
*block_number = current_number + 1; | ||
|
||
// save the current block hash, which we used to get back to the | ||
// canon chain in case of reorg. | ||
*last_block_hash = self.block_hash(BlockId::Number(current_number)); | ||
|
||
// retrieve logs in range from_block..min(BlockId::Latest..to_block) | ||
let limit = filter.limit; | ||
Either::B(self.logs(filter) | ||
.map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front | ||
.map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs | ||
.map(move |logs| limit_logs(logs, limit)) // limit the logs | ||
.map(FilterChanges::Logs)) | ||
|
@@ -214,7 +260,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T { | |
let mut polls = self.polls().lock(); | ||
|
||
match polls.poll(&index.value()) { | ||
Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => filter.clone(), | ||
Some(&PollFilter::Logs(ref _block_number, ref _last_block_hash, ref _previous_log, ref filter)) => filter.clone(), | ||
// just empty array | ||
Some(_) => return Box::new(future::ok(Vec::new())), | ||
None => return Box::new(future::err(errors::filter_not_found())), | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add documentation for this new parameter: "last seen block hash"?