Skip to content

Commit

Permalink
chain/ethereum: Update network indexer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Dec 12, 2019
1 parent b095209 commit 2f3437e
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 70 deletions.
43 changes: 43 additions & 0 deletions chain/ethereum/src/network_indexer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use graph::prelude::*;
use std::fmt;
use std::ops::Deref;
use web3::types::{Block, H256};

mod block_writer;
Expand All @@ -21,6 +22,48 @@ const NETWORK_INDEXER_VERSION: u32 = 0;
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Ommer(Block<H256>);

impl From<Block<H256>> for Ommer {
fn from(block: Block<H256>) -> Self {
Self(block)
}
}

impl From<LightEthereumBlock> for Ommer {
fn from(block: LightEthereumBlock) -> Self {
Self(Block {
hash: block.hash,
parent_hash: block.parent_hash,
uncles_hash: block.uncles_hash,
author: block.author,
state_root: block.state_root,
transactions_root: block.transactions_root,
receipts_root: block.receipts_root,
number: block.number,
gas_used: block.gas_used,
gas_limit: block.gas_limit,
extra_data: block.extra_data,
logs_bloom: block.logs_bloom,
timestamp: block.timestamp,
difficulty: block.difficulty,
total_difficulty: block.total_difficulty,
seal_fields: block.seal_fields,
uncles: block.uncles,
transactions: block.transactions.into_iter().map(|tx| tx.hash).collect(),
size: block.size,
mix_hash: block.mix_hash,
nonce: block.nonce,
})
}
}

impl Deref for Ommer {
type Target = Block<H256>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Helper type to bundle blocks and their ommers together.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BlockWithOmmers {
Expand Down
111 changes: 41 additions & 70 deletions chain/ethereum/tests/network_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use std::time::Duration;
use graph::mock::*;
use graph::prelude::*;
use graph_chain_ethereum::network_indexer::{
self as network_indexer, BlockWithUncles, NetworkIndexerEvent,
self as network_indexer, BlockWithOmmers, NetworkIndexerEvent,
};
use graph_core::MetricsRegistry;
use graph_store_postgres::Store as DieselStore;
use web3::types::{Block, H256, H64};
use web3::types::{H256, H64};

use test_store::*;

Expand Down Expand Up @@ -45,9 +45,9 @@ fn remove_test_data(store: Arc<DieselStore>) {
pub fn run_network_indexer(
store: Arc<DieselStore>,
start_block: Option<EthereumBlockPointer>,
chains: Vec<Vec<BlockWithUncles>>,
chains: Vec<Vec<BlockWithOmmers>>,
timeout: Duration,
) -> impl Future<Item = Vec<NetworkIndexerEvent>, Error = Error> {
) -> impl Future<Item = Vec<NetworkIndexerEvent>, Error = ()> {
// Simulate an Ethereum network using a mock adapter
let adapter = create_mock_ethereum_adapter(chains);

Expand All @@ -57,31 +57,29 @@ pub fn run_network_indexer(
let metrics_registry = Arc::new(MetricsRegistry::new(logger.clone(), prometheus_registry));

// Create the network indexer
network_indexer::create(
subgraph_name.to_string(),
let mut indexer = network_indexer::NetworkIndexer::new(
&logger,
adapter,
store.clone(),
metrics_registry,
subgraph_name.to_string(),
start_block,
)
.and_then(move |mut indexer| {
let (event_sink, event_stream) = futures::sync::mpsc::channel(100);

// Run network indexer and forward its events to the channel
tokio::spawn(
indexer
.take_event_stream()
.expect("failed to take stream from indexer")
.timeout(timeout)
.map_err(|_| ())
.forward(event_sink.sink_map_err(|_| ()))
.map(|_| ()),
);

event_stream.collect()
})
.map_err(|_| format_err!("failed to start network indexer"))
);

let (event_sink, event_stream) = futures::sync::mpsc::channel(100);

// Run network indexer and forward its events to the channel
tokio::spawn(
indexer
.take_event_stream()
.expect("failed to take stream from indexer")
.timeout(timeout)
.map_err(|_| ())
.forward(event_sink.sink_map_err(|_| ()))
.map(|_| ()),
);

event_stream.collect()
}

// Helper to run tests against a clean store.
Expand Down Expand Up @@ -112,11 +110,11 @@ where
}

// Helper to create a sequence of linked blocks.
fn create_chain(n: u64, parent: Option<&BlockWithUncles>) -> Vec<BlockWithUncles> {
fn create_chain(n: u64, parent: Option<&BlockWithOmmers>) -> Vec<BlockWithOmmers> {
let start = parent.map_or(0, |block| block.inner().number.unwrap().as_u64() + 1);

(start..start + n).fold(vec![], |mut blocks, number| {
let mut block = BlockWithUncles::default();
let mut block = BlockWithOmmers::default();

// Set required fields
block.block.block.nonce = Some(H64::random());
Expand Down Expand Up @@ -147,10 +145,10 @@ fn create_chain(n: u64, parent: Option<&BlockWithUncles>) -> Vec<BlockWithUncles
}

fn create_fork(
original_blocks: Vec<BlockWithUncles>,
original_blocks: Vec<BlockWithOmmers>,
base: u64,
total: u64,
) -> Vec<BlockWithUncles> {
) -> Vec<BlockWithOmmers> {
let mut blocks = original_blocks[0..(base as usize) + 1].to_vec();
let new_blocks = create_chain((total - base - 1).try_into().unwrap(), blocks.last());
blocks.extend(new_blocks);
Expand All @@ -159,11 +157,11 @@ fn create_fork(

struct Chains {
chain_index: Option<usize>,
chains: Vec<Vec<BlockWithUncles>>,
chains: Vec<Vec<BlockWithOmmers>>,
}

impl Chains {
pub fn new(chains: Vec<Vec<BlockWithUncles>>) -> Self {
pub fn new(chains: Vec<Vec<BlockWithOmmers>>) -> Self {
Self {
chain_index: None,
chains,
Expand All @@ -174,18 +172,18 @@ impl Chains {
self.chain_index.clone()
}

pub fn next_chain(&mut self) -> Option<&Vec<BlockWithUncles>> {
pub fn next_chain(&mut self) -> Option<&Vec<BlockWithOmmers>> {
let next_index = self.chain_index.map_or(0, |index| index + 1);
self.chain_index.replace(next_index);
self.chains.get(next_index)
}

pub fn current_chain(&self) -> Option<&Vec<BlockWithUncles>> {
pub fn current_chain(&self) -> Option<&Vec<BlockWithOmmers>> {
self.chain_index.and_then(|index| self.chains.get(index))
}
}

fn create_mock_ethereum_adapter(chains: Vec<Vec<BlockWithUncles>>) -> Arc<MockEthereumAdapter> {
fn create_mock_ethereum_adapter(chains: Vec<Vec<BlockWithOmmers>>) -> Arc<MockEthereumAdapter> {
let chains = Arc::new(Mutex::new(Chains::new(chains)));

// Create the mock Ethereum adapter.
Expand Down Expand Up @@ -272,12 +270,12 @@ fn create_mock_ethereum_adapter(chains: Vec<Vec<BlockWithUncles>>) -> Arc<MockEt
))
});

// For now return no uncles
let chains_for_uncles = chains.clone();
// For now return no ommers
let chains_for_ommers = chains.clone();
adapter
.expect_uncles()
.returning(move |_, block: &LightEthereumBlock| {
let chains = chains_for_uncles.lock().unwrap();
let chains = chains_for_ommers.lock().unwrap();
Box::new(future::result(
chains
.current_chain()
Expand All @@ -289,14 +287,17 @@ fn create_mock_ethereum_adapter(chains: Vec<Vec<BlockWithUncles>>) -> Arc<MockEt
.find(|b| b.inner().hash.unwrap() == block.hash.unwrap())
.expect(
format!(
"block {} ({:x}) not found",
"block #{} ({:x}) not found",
block.number.unwrap(),
block.hash.unwrap()
)
.as_str(),
)
.clone()
.uncles
.ommers
.into_iter()
.map(|ommer| Some((*ommer).clone()))
.collect::<Vec<_>>()
}),
))
});
Expand Down Expand Up @@ -610,36 +611,6 @@ fn indexing_handles_reorg_back_and_forth() {
});
}

struct Ommer(LightEthereumBlock);

impl Into<Block<H256>> for Ommer {
fn into(self) -> Block<H256> {
Block {
hash: self.0.hash,
parent_hash: self.0.parent_hash,
uncles_hash: self.0.uncles_hash,
author: self.0.author,
state_root: self.0.state_root,
transactions_root: self.0.transactions_root,
receipts_root: self.0.receipts_root,
number: self.0.number,
gas_used: self.0.gas_used,
gas_limit: self.0.gas_limit,
extra_data: self.0.extra_data,
logs_bloom: self.0.logs_bloom,
timestamp: self.0.timestamp,
difficulty: self.0.difficulty,
total_difficulty: self.0.total_difficulty,
seal_fields: self.0.seal_fields,
uncles: self.0.uncles,
transactions: self.0.transactions.into_iter().map(|tx| tx.hash).collect(),
size: self.0.size,
mix_hash: self.0.mix_hash,
nonce: self.0.nonce,
}
}
}

// Test that ommer blocks are not confused with reguar blocks when finding
// common ancestors for reorgs. There was a bug initially where that would
// happen, because any block that was in the store was considered to be on the
Expand Down Expand Up @@ -672,13 +643,13 @@ fn indexing_identifies_common_ancestor_correctly_despite_ommers() {

// Make it so that #5' has #4 as an uncle
fork1[5].block.block.uncles = vec![initial_chain[4].inner().hash.clone().unwrap()];
fork1[5].uncles = vec![Some(Ommer(initial_chain[4].block.block.clone()).into())];
fork1[5].ommers = vec![initial_chain[4].block.block.clone().into()];

// Create fork 2 (blocks #0 - #4, #5'', #6''); this fork includes the
// original #4 again, which at this point should no longer be part of
// the indexed chain in the store and therefor not be considered as the
// common ancestor of the fork (that should be #3). It is still in the
// store as an uncle (of #5', from fork1) but that uncle should not be
// store as an ommer (of #5', from fork1) but that ommer should not be
// picked as the common ancestor either.
let fork2 = create_fork(initial_chain.clone(), 4, 7);

Expand Down

0 comments on commit 2f3437e

Please sign in to comment.