diff --git a/chain/ethereum/src/network_indexer/mod.rs b/chain/ethereum/src/network_indexer/mod.rs index 5fae7c2cd56..f3eca8712aa 100644 --- a/chain/ethereum/src/network_indexer/mod.rs +++ b/chain/ethereum/src/network_indexer/mod.rs @@ -1,5 +1,6 @@ use graph::prelude::*; use std::fmt; +use std::ops::Deref; use web3::types::{Block, H256}; mod block_writer; @@ -21,6 +22,48 @@ const NETWORK_INDEXER_VERSION: u32 = 0; #[derive(Clone, Debug, Default, PartialEq)] pub struct Ommer(Block); +impl From> for Ommer { + fn from(block: Block) -> Self { + Self(block) + } +} + +impl From for Ommer { + fn from(block: LightEthereumBlock) -> Self { + Self(Block { + hash: block.hash, + parent_hash: block.parent_hash, + uncles_hash: block.uncles_hash, + author: block.author, + state_root: block.state_root, + transactions_root: block.transactions_root, + receipts_root: block.receipts_root, + number: block.number, + gas_used: block.gas_used, + gas_limit: block.gas_limit, + extra_data: block.extra_data, + logs_bloom: block.logs_bloom, + timestamp: block.timestamp, + difficulty: block.difficulty, + total_difficulty: block.total_difficulty, + seal_fields: block.seal_fields, + uncles: block.uncles, + transactions: block.transactions.into_iter().map(|tx| tx.hash).collect(), + size: block.size, + mix_hash: block.mix_hash, + nonce: block.nonce, + }) + } +} + +impl Deref for Ommer { + type Target = Block; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// Helper type to bundle blocks and their ommers together. #[derive(Clone, Debug, Default, PartialEq)] pub struct BlockWithOmmers { diff --git a/chain/ethereum/tests/network_indexer.rs b/chain/ethereum/tests/network_indexer.rs index b1d3edb2182..f42d8d4702f 100644 --- a/chain/ethereum/tests/network_indexer.rs +++ b/chain/ethereum/tests/network_indexer.rs @@ -10,11 +10,11 @@ use std::time::Duration; use graph::mock::*; use graph::prelude::*; use graph_chain_ethereum::network_indexer::{ - self as network_indexer, BlockWithUncles, NetworkIndexerEvent, + self as network_indexer, BlockWithOmmers, NetworkIndexerEvent, }; use graph_core::MetricsRegistry; use graph_store_postgres::Store as DieselStore; -use web3::types::{Block, H256, H64}; +use web3::types::{H256, H64}; use test_store::*; @@ -45,9 +45,9 @@ fn remove_test_data(store: Arc) { pub fn run_network_indexer( store: Arc, start_block: Option, - chains: Vec>, + chains: Vec>, timeout: Duration, -) -> impl Future, Error = Error> { +) -> impl Future, Error = ()> { // Simulate an Ethereum network using a mock adapter let adapter = create_mock_ethereum_adapter(chains); @@ -57,31 +57,29 @@ pub fn run_network_indexer( let metrics_registry = Arc::new(MetricsRegistry::new(logger.clone(), prometheus_registry)); // Create the network indexer - network_indexer::create( - subgraph_name.to_string(), + let mut indexer = network_indexer::NetworkIndexer::new( &logger, adapter, store.clone(), metrics_registry, + subgraph_name.to_string(), start_block, - ) - .and_then(move |mut indexer| { - let (event_sink, event_stream) = futures::sync::mpsc::channel(100); - - // Run network indexer and forward its events to the channel - tokio::spawn( - indexer - .take_event_stream() - .expect("failed to take stream from indexer") - .timeout(timeout) - .map_err(|_| ()) - .forward(event_sink.sink_map_err(|_| ())) - .map(|_| ()), - ); - - event_stream.collect() - }) - .map_err(|_| format_err!("failed to start network indexer")) + ); + + let (event_sink, event_stream) = futures::sync::mpsc::channel(100); + + // Run network indexer and forward its events to the channel + tokio::spawn( + indexer + .take_event_stream() + .expect("failed to take stream from indexer") + .timeout(timeout) + .map_err(|_| ()) + .forward(event_sink.sink_map_err(|_| ())) + .map(|_| ()), + ); + + event_stream.collect() } // Helper to run tests against a clean store. @@ -112,11 +110,11 @@ where } // Helper to create a sequence of linked blocks. -fn create_chain(n: u64, parent: Option<&BlockWithUncles>) -> Vec { +fn create_chain(n: u64, parent: Option<&BlockWithOmmers>) -> Vec { let start = parent.map_or(0, |block| block.inner().number.unwrap().as_u64() + 1); (start..start + n).fold(vec![], |mut blocks, number| { - let mut block = BlockWithUncles::default(); + let mut block = BlockWithOmmers::default(); // Set required fields block.block.block.nonce = Some(H64::random()); @@ -147,10 +145,10 @@ fn create_chain(n: u64, parent: Option<&BlockWithUncles>) -> Vec, + original_blocks: Vec, base: u64, total: u64, -) -> Vec { +) -> Vec { let mut blocks = original_blocks[0..(base as usize) + 1].to_vec(); let new_blocks = create_chain((total - base - 1).try_into().unwrap(), blocks.last()); blocks.extend(new_blocks); @@ -159,11 +157,11 @@ fn create_fork( struct Chains { chain_index: Option, - chains: Vec>, + chains: Vec>, } impl Chains { - pub fn new(chains: Vec>) -> Self { + pub fn new(chains: Vec>) -> Self { Self { chain_index: None, chains, @@ -174,18 +172,18 @@ impl Chains { self.chain_index.clone() } - pub fn next_chain(&mut self) -> Option<&Vec> { + pub fn next_chain(&mut self) -> Option<&Vec> { let next_index = self.chain_index.map_or(0, |index| index + 1); self.chain_index.replace(next_index); self.chains.get(next_index) } - pub fn current_chain(&self) -> Option<&Vec> { + pub fn current_chain(&self) -> Option<&Vec> { self.chain_index.and_then(|index| self.chains.get(index)) } } -fn create_mock_ethereum_adapter(chains: Vec>) -> Arc { +fn create_mock_ethereum_adapter(chains: Vec>) -> Arc { let chains = Arc::new(Mutex::new(Chains::new(chains))); // Create the mock Ethereum adapter. @@ -272,12 +270,12 @@ fn create_mock_ethereum_adapter(chains: Vec>) -> Arc>) -> Arc>() }), )) }); @@ -610,36 +611,6 @@ fn indexing_handles_reorg_back_and_forth() { }); } -struct Ommer(LightEthereumBlock); - -impl Into> for Ommer { - fn into(self) -> Block { - Block { - hash: self.0.hash, - parent_hash: self.0.parent_hash, - uncles_hash: self.0.uncles_hash, - author: self.0.author, - state_root: self.0.state_root, - transactions_root: self.0.transactions_root, - receipts_root: self.0.receipts_root, - number: self.0.number, - gas_used: self.0.gas_used, - gas_limit: self.0.gas_limit, - extra_data: self.0.extra_data, - logs_bloom: self.0.logs_bloom, - timestamp: self.0.timestamp, - difficulty: self.0.difficulty, - total_difficulty: self.0.total_difficulty, - seal_fields: self.0.seal_fields, - uncles: self.0.uncles, - transactions: self.0.transactions.into_iter().map(|tx| tx.hash).collect(), - size: self.0.size, - mix_hash: self.0.mix_hash, - nonce: self.0.nonce, - } - } -} - // Test that ommer blocks are not confused with reguar blocks when finding // common ancestors for reorgs. There was a bug initially where that would // happen, because any block that was in the store was considered to be on the @@ -672,13 +643,13 @@ fn indexing_identifies_common_ancestor_correctly_despite_ommers() { // Make it so that #5' has #4 as an uncle fork1[5].block.block.uncles = vec![initial_chain[4].inner().hash.clone().unwrap()]; - fork1[5].uncles = vec![Some(Ommer(initial_chain[4].block.block.clone()).into())]; + fork1[5].ommers = vec![initial_chain[4].block.block.clone().into()]; // Create fork 2 (blocks #0 - #4, #5'', #6''); this fork includes the // original #4 again, which at this point should no longer be part of // the indexed chain in the store and therefor not be considered as the // common ancestor of the fork (that should be #3). It is still in the - // store as an uncle (of #5', from fork1) but that uncle should not be + // store as an ommer (of #5', from fork1) but that ommer should not be // picked as the common ancestor either. let fork2 = create_fork(initial_chain.clone(), 4, 7);