diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index de7bbce0ed7..9e51155a5f2 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -280,3 +280,13 @@ pub struct ChainIdentifier { pub net_version: String, pub genesis_block_hash: BlockHash, } + +impl fmt::Display for ChainIdentifier { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "net_version: {}, genesis_block_hash: {}", + self.net_version, self.genesis_block_hash + ) + } +} diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index f1071a5fdf9..0a9bb651ddc 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -31,6 +31,7 @@ use graph_store_postgres::{ SubscriptionManager, PRIMARY_SHARD, }; use lazy_static::lazy_static; +use std::collections::BTreeMap; use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration}; const VERSION_LABEL_KEY: &str = "version"; @@ -892,7 +893,7 @@ impl Context { pools.clone(), subgraph_store, HashMap::default(), - vec![], + BTreeMap::new(), self.registry, ); diff --git a/node/src/chain.rs b/node/src/chain.rs index 961a929bfb3..563f78a0fae 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -14,7 +14,7 @@ use graph::slog::{debug, error, info, o, Logger}; use graph::url::Url; use graph::util::security::SafeDisplay; use graph_chain_ethereum::{self as ethereum, EthereumAdapterTrait, Transport}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{btree_map, BTreeMap}; use std::sync::Arc; use std::time::Duration; @@ -220,7 +220,7 @@ pub fn create_firehose_networks( pub async fn connect_ethereum_networks( logger: &Logger, mut eth_networks: EthereumNetworks, -) -> (EthereumNetworks, Vec<(String, Vec)>) { +) -> Result<(EthereumNetworks, BTreeMap), anyhow::Error> { // This has one entry for each provider, and therefore multiple entries // for each network let statuses = join_all( @@ -268,10 +268,10 @@ pub async fn connect_ethereum_networks( .await; // Group identifiers by network name - let idents: HashMap> = + let idents: BTreeMap = statuses .into_iter() - .fold(HashMap::new(), |mut networks, status| { + .try_fold(BTreeMap::new(), |mut networks, status| { match status { ProviderNetworkStatus::Broken { chain_id: network, @@ -280,12 +280,25 @@ pub async fn connect_ethereum_networks( ProviderNetworkStatus::Version { chain_id: network, ident, - } => networks.entry(network).or_default().push(ident), + } => match networks.entry(network.clone()) { + btree_map::Entry::Vacant(entry) => { + entry.insert(ident); + } + btree_map::Entry::Occupied(entry) => { + if &ident != entry.get() { + return Err(anyhow!( + "conflicting network identifiers for chain {}: `{}` != `{}`", + network, + ident, + entry.get() + )); + } + } + }, } - networks - }); - let idents: Vec<_> = idents.into_iter().collect(); - (eth_networks, idents) + Ok(networks) + })?; + Ok((eth_networks, idents)) } /// Try to connect to all the providers in `firehose_networks` and get their net @@ -299,7 +312,7 @@ pub async fn connect_ethereum_networks( pub async fn connect_firehose_networks( logger: &Logger, mut firehose_networks: FirehoseNetworks, -) -> (FirehoseNetworks, Vec<(String, Vec)>) +) -> Result<(FirehoseNetworks, BTreeMap), Error> where M: prost::Message + BlockchainBlock + Default + 'static, { @@ -341,6 +354,8 @@ where "genesis_block" => format_args!("{}", &ptr), ); + // BUG: Firehose doesn't provide the net_version. + // See also: firehose-no-net-version let ident = ChainIdentifier { net_version: "0".to_string(), genesis_block_hash: ptr.hash, @@ -354,20 +369,34 @@ where .await; // Group identifiers by chain id - let idents: HashMap> = + let idents: BTreeMap = statuses .into_iter() - .fold(HashMap::new(), |mut networks, status| { + .try_fold(BTreeMap::new(), |mut networks, status| { match status { ProviderNetworkStatus::Broken { chain_id, provider } => { firehose_networks.remove(&chain_id, &provider) } ProviderNetworkStatus::Version { chain_id, ident } => { - networks.entry(chain_id).or_default().push(ident) + match networks.entry(chain_id.clone()) { + btree_map::Entry::Vacant(entry) => { + entry.insert(ident); + } + btree_map::Entry::Occupied(entry) => { + if &ident != entry.get() { + return Err(anyhow!( + "conflicting network identifiers for chain {}: `{}` != `{}`", + chain_id, + ident, + entry.get() + )); + } + } + } } } - networks - }); + Ok(networks) + })?; // Clean-up chains with 0 provider firehose_networks.networks.retain(|chain_id, endpoints| { @@ -381,8 +410,7 @@ where endpoints.len() > 0 }); - let idents: Vec<_> = idents.into_iter().collect(); - (firehose_networks, idents) + Ok((firehose_networks, idents)) } /// Parses all Ethereum connection strings and returns their network names and diff --git a/node/src/main.rs b/node/src/main.rs index 58b3699754e..206c6c676a1 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -305,17 +305,22 @@ async fn main() { // `blockchain_map`. let mut blockchain_map = BlockchainMap::new(); + // Unwraps: `connect_ethereum_networks` and `connect_firehose_networks` only fail if + // mismatching chain identifiers are returned for a same network, which indicates a serious + // inconsistency between providers. let (arweave_networks, arweave_idents) = connect_firehose_networks::( &logger, firehose_networks_by_kind .remove(&BlockchainKind::Arweave) .unwrap_or_else(FirehoseNetworks::new), ) - .await; + .await + .unwrap(); // This only has idents for chains with rpc adapters. - let (eth_networks, ethereum_idents) = - connect_ethereum_networks(&logger, eth_networks).await; + let (eth_networks, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks) + .await + .unwrap(); let (eth_firehose_only_networks, eth_firehose_only_idents) = connect_firehose_networks::( @@ -324,7 +329,8 @@ async fn main() { .remove(&BlockchainKind::Ethereum) .unwrap_or_else(FirehoseNetworks::new), ) - .await; + .await + .unwrap(); let (near_networks, near_idents) = connect_firehose_networks::( @@ -333,7 +339,8 @@ async fn main() { .remove(&BlockchainKind::Near) .unwrap_or_else(FirehoseNetworks::new), ) - .await; + .await + .unwrap(); let (cosmos_networks, cosmos_idents) = connect_firehose_networks::( &logger, @@ -341,15 +348,18 @@ async fn main() { .remove(&BlockchainKind::Cosmos) .unwrap_or_else(FirehoseNetworks::new), ) - .await; - - let network_identifiers = ethereum_idents - .into_iter() - .chain(eth_firehose_only_idents) - .chain(arweave_idents) - .chain(near_idents) - .chain(cosmos_idents) - .collect(); + .await + .unwrap(); + + // Note that both `eth_firehose_only_idents` and `ethereum_idents` contain Ethereum + // networks. If the same network is configured in both RPC and Firehose, the RPC ident takes + // precedence. This is necessary because Firehose endpoints currently have no `net_version`. + // See also: firehose-no-net-version. + let mut network_identifiers = eth_firehose_only_idents; + network_identifiers.extend(ethereum_idents); + network_identifiers.extend(arweave_idents); + network_identifiers.extend(near_idents); + network_identifiers.extend(cosmos_idents); let network_store = store_builder.network_store(network_identifiers); diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index f94b09119c4..cba34adc4fe 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -111,7 +111,7 @@ pub async fn run( let eth_adapters2 = eth_adapters.clone(); - let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await; + let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await?; // let (near_networks, near_idents) = connect_firehose_networks::( // &logger, // firehose_networks_by_kind diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 3675155eefe..8ececd9d851 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::iter::FromIterator; use std::{collections::HashMap, sync::Arc}; @@ -166,7 +167,7 @@ impl StoreBuilder { pools: HashMap, subgraph_store: Arc, chains: HashMap, - networks: Vec<(String, Vec)>, + networks: BTreeMap, registry: Arc, ) -> Arc { let networks = networks @@ -280,7 +281,7 @@ impl StoreBuilder { /// Return a store that combines both a `Store` for subgraph data /// and a `BlockStore` for all chain related data - pub fn network_store(self, networks: Vec<(String, Vec)>) -> Arc { + pub fn network_store(self, networks: BTreeMap) -> Arc { Self::make_store( &self.logger, self.pools, diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 17962a91095..7345428ae12 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -1,6 +1,5 @@ use std::{ - collections::{HashMap, HashSet}, - iter::FromIterator, + collections::HashMap, sync::{Arc, RwLock}, time::Duration, }; @@ -8,12 +7,9 @@ use std::{ use graph::{ blockchain::ChainIdentifier, components::store::BlockStore as BlockStoreTrait, - prelude::{error, warn, BlockNumber, BlockPtr, Logger, ENV_VARS}, -}; -use graph::{ - constraint_violation, - prelude::{anyhow, CheapClone}, + prelude::{error, BlockNumber, BlockPtr, Logger, ENV_VARS}, }; +use graph::{constraint_violation, prelude::CheapClone}; use graph::{ prelude::{tokio, StoreError}, util::timed_cache::TimedCache, @@ -202,7 +198,7 @@ impl BlockStore { pub fn new( logger: Logger, // (network, ident, shard) - chains: Vec<(String, Vec, Shard)>, + chains: Vec<(String, ChainIdentifier, Shard)>, // shard -> pool pools: HashMap, sender: Arc, @@ -226,30 +222,13 @@ impl BlockStore { chain_store_metrics, }; - fn reduce_idents( - chain_name: &str, - idents: Vec, - ) -> Result, StoreError> { - let mut idents: HashSet = HashSet::from_iter(idents.into_iter()); - match idents.len() { - 0 => Ok(None), - 1 => Ok(idents.drain().next()), - _ => Err(anyhow!( - "conflicting network identifiers for chain {}: {:?}", - chain_name, - idents - ) - .into()), - } - } - /// Check that the configuration for `chain` hasn't changed so that /// it is ok to ingest from it fn chain_ingestible( logger: &Logger, chain: &primary::Chain, shard: &Shard, - ident: &Option, + ident: &ChainIdentifier, ) -> bool { if &chain.shard != shard { error!( @@ -261,45 +240,34 @@ impl BlockStore { ); return false; } - match ident { - Some(ident) => { - if chain.net_version != ident.net_version { - error!(logger, + if chain.net_version != ident.net_version { + error!(logger, "the net version for chain {} has changed from {} to {} since the last time we ran", chain.name, chain.net_version, ident.net_version ); - return false; - } - if chain.genesis_block != ident.genesis_block_hash.hash_hex() { - error!(logger, + return false; + } + if chain.genesis_block != ident.genesis_block_hash.hash_hex() { + error!(logger, "the genesis block hash for chain {} has changed from {} to {} since the last time we ran", chain.name, chain.genesis_block, ident.genesis_block_hash ); - return false; - } - true - } - None => { - warn!(logger, "Failed to get net version and genesis hash from provider. Assuming it has not changed"); - true - } + return false; } + true } // For each configured chain, add a chain store - for (chain_name, idents, shard) in chains { - let ident = reduce_idents(&chain_name, idents)?; - match ( - existing_chains - .iter() - .find(|chain| chain.name == chain_name), - ident, - ) { - (Some(chain), ident) => { + for (chain_name, ident, shard) in chains { + match existing_chains + .iter() + .find(|chain| chain.name == chain_name) + { + Some(chain) => { let status = if chain_ingestible(&block_store.logger, chain, &shard, &ident) { ChainStatus::Ingestible } else { @@ -307,7 +275,7 @@ impl BlockStore { }; block_store.add_chain_store(chain, status, false)?; } - (None, Some(ident)) => { + None => { let chain = primary::add_chain( block_store.mirror.primary(), &chain_name, @@ -316,13 +284,6 @@ impl BlockStore { )?; block_store.add_chain_store(&chain, ChainStatus::Ingestible, true)?; } - (None, None) => { - error!( - &block_store.logger, - " the chain {} is new but we could not get a network identifier for it", - chain_name - ); - } }; } diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index ffe716fd8be..76220fd363f 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -562,10 +562,14 @@ fn build_store() -> (Arc, ConnectionPool, Config, Arc Stores { let chain_head_listener = store_builder.chain_head_update_listener(); let network_identifiers = vec![( network_name.clone(), - (vec![ChainIdentifier { + ChainIdentifier { net_version: "".into(), genesis_block_hash: test_ptr(0).hash, - }]), - )]; + }, + )] + .into_iter() + .collect(); let network_store = store_builder.network_store(network_identifiers); let chain_store = network_store .block_store()