Skip to content

Commit

Permalink
fix: Fix panic on first startup when mixing firehose and rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Jun 7, 2023
1 parent d564061 commit 9161255
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 101 deletions.
10 changes: 10 additions & 0 deletions graph/src/blockchain/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,13 @@ pub struct ChainIdentifier {
pub net_version: String,
pub genesis_block_hash: BlockHash,
}

impl fmt::Display for ChainIdentifier {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"net_version: {}, genesis_block_hash: {}",
self.net_version, self.genesis_block_hash
)
}
}
3 changes: 2 additions & 1 deletion node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use graph_store_postgres::{
SubscriptionManager, PRIMARY_SHARD,
};
use lazy_static::lazy_static;
use std::collections::BTreeMap;
use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration};
const VERSION_LABEL_KEY: &str = "version";

Expand Down Expand Up @@ -892,7 +893,7 @@ impl Context {
pools.clone(),
subgraph_store,
HashMap::default(),
vec![],
BTreeMap::new(),
self.registry,
);

Expand Down
62 changes: 45 additions & 17 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use graph::slog::{debug, error, info, o, Logger};
use graph::url::Url;
use graph::util::security::SafeDisplay;
use graph_chain_ethereum::{self as ethereum, EthereumAdapterTrait, Transport};
use std::collections::{BTreeMap, HashMap};
use std::collections::{btree_map, BTreeMap};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -220,7 +220,7 @@ pub fn create_firehose_networks(
pub async fn connect_ethereum_networks(
logger: &Logger,
mut eth_networks: EthereumNetworks,
) -> (EthereumNetworks, Vec<(String, Vec<ChainIdentifier>)>) {
) -> Result<(EthereumNetworks, BTreeMap<String, ChainIdentifier>), anyhow::Error> {
// This has one entry for each provider, and therefore multiple entries
// for each network
let statuses = join_all(
Expand Down Expand Up @@ -268,10 +268,10 @@ pub async fn connect_ethereum_networks(
.await;

// Group identifiers by network name
let idents: HashMap<String, Vec<ChainIdentifier>> =
let idents: BTreeMap<String, ChainIdentifier> =
statuses
.into_iter()
.fold(HashMap::new(), |mut networks, status| {
.try_fold(BTreeMap::new(), |mut networks, status| {
match status {
ProviderNetworkStatus::Broken {
chain_id: network,
Expand All @@ -280,12 +280,25 @@ pub async fn connect_ethereum_networks(
ProviderNetworkStatus::Version {
chain_id: network,
ident,
} => networks.entry(network).or_default().push(ident),
} => match networks.entry(network.clone()) {
btree_map::Entry::Vacant(entry) => {
entry.insert(ident);
}
btree_map::Entry::Occupied(entry) => {
if &ident != entry.get() {
return Err(anyhow!(
"conflicting network identifiers for chain {}: `{}` != `{}`",
network,
ident,
entry.get()
));
}
}
},
}
networks
});
let idents: Vec<_> = idents.into_iter().collect();
(eth_networks, idents)
Ok(networks)
})?;
Ok((eth_networks, idents))
}

/// Try to connect to all the providers in `firehose_networks` and get their net
Expand All @@ -299,7 +312,7 @@ pub async fn connect_ethereum_networks(
pub async fn connect_firehose_networks<M>(
logger: &Logger,
mut firehose_networks: FirehoseNetworks,
) -> (FirehoseNetworks, Vec<(String, Vec<ChainIdentifier>)>)
) -> Result<(FirehoseNetworks, BTreeMap<String, ChainIdentifier>), Error>
where
M: prost::Message + BlockchainBlock + Default + 'static,
{
Expand Down Expand Up @@ -341,6 +354,8 @@ where
"genesis_block" => format_args!("{}", &ptr),
);

// BUG: Firehose doesn't provide the net_version.
// See also: firehose-no-net-version
let ident = ChainIdentifier {
net_version: "0".to_string(),
genesis_block_hash: ptr.hash,
Expand All @@ -354,20 +369,34 @@ where
.await;

// Group identifiers by chain id
let idents: HashMap<String, Vec<ChainIdentifier>> =
let idents: BTreeMap<String, ChainIdentifier> =
statuses
.into_iter()
.fold(HashMap::new(), |mut networks, status| {
.try_fold(BTreeMap::new(), |mut networks, status| {
match status {
ProviderNetworkStatus::Broken { chain_id, provider } => {
firehose_networks.remove(&chain_id, &provider)
}
ProviderNetworkStatus::Version { chain_id, ident } => {
networks.entry(chain_id).or_default().push(ident)
match networks.entry(chain_id.clone()) {
btree_map::Entry::Vacant(entry) => {
entry.insert(ident);
}
btree_map::Entry::Occupied(entry) => {
if &ident != entry.get() {
return Err(anyhow!(
"conflicting network identifiers for chain {}: `{}` != `{}`",
chain_id,
ident,
entry.get()
));
}
}
}
}
}
networks
});
Ok(networks)
})?;

// Clean-up chains with 0 provider
firehose_networks.networks.retain(|chain_id, endpoints| {
Expand All @@ -381,8 +410,7 @@ where
endpoints.len() > 0
});

let idents: Vec<_> = idents.into_iter().collect();
(firehose_networks, idents)
Ok((firehose_networks, idents))
}

/// Parses all Ethereum connection strings and returns their network names and
Expand Down
38 changes: 24 additions & 14 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,22 @@ async fn main() {
// `blockchain_map`.
let mut blockchain_map = BlockchainMap::new();

// Unwraps: `connect_ethereum_networks` and `connect_firehose_networks` only fail if
// mismatching chain identifiers are returned for a same network, which indicates a serious
// inconsistency between providers.
let (arweave_networks, arweave_idents) = connect_firehose_networks::<ArweaveBlock>(
&logger,
firehose_networks_by_kind
.remove(&BlockchainKind::Arweave)
.unwrap_or_else(FirehoseNetworks::new),
)
.await;
.await
.unwrap();

// This only has idents for chains with rpc adapters.
let (eth_networks, ethereum_idents) =
connect_ethereum_networks(&logger, eth_networks).await;
let (eth_networks, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks)
.await
.unwrap();

let (eth_firehose_only_networks, eth_firehose_only_idents) =
connect_firehose_networks::<HeaderOnlyBlock>(
Expand All @@ -324,7 +329,8 @@ async fn main() {
.remove(&BlockchainKind::Ethereum)
.unwrap_or_else(FirehoseNetworks::new),
)
.await;
.await
.unwrap();

let (near_networks, near_idents) =
connect_firehose_networks::<NearFirehoseHeaderOnlyBlock>(
Expand All @@ -333,23 +339,27 @@ async fn main() {
.remove(&BlockchainKind::Near)
.unwrap_or_else(FirehoseNetworks::new),
)
.await;
.await
.unwrap();

let (cosmos_networks, cosmos_idents) = connect_firehose_networks::<CosmosFirehoseBlock>(
&logger,
firehose_networks_by_kind
.remove(&BlockchainKind::Cosmos)
.unwrap_or_else(FirehoseNetworks::new),
)
.await;

let network_identifiers = ethereum_idents
.into_iter()
.chain(eth_firehose_only_idents)
.chain(arweave_idents)
.chain(near_idents)
.chain(cosmos_idents)
.collect();
.await
.unwrap();

// Note that both `eth_firehose_only_idents` and `ethereum_idents` contain Ethereum
// networks. If the same network is configured in both RPC and Firehose, the RPC ident takes
// precedence. This is necessary because Firehose endpoints currently have no `net_version`.
// See also: firehose-no-net-version.
let mut network_identifiers = eth_firehose_only_idents;
network_identifiers.extend(ethereum_idents);
network_identifiers.extend(arweave_idents);
network_identifiers.extend(near_idents);
network_identifiers.extend(cosmos_idents);

let network_store = store_builder.network_store(network_identifiers);

Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub async fn run(

let eth_adapters2 = eth_adapters.clone();

let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await;
let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await?;
// let (near_networks, near_idents) = connect_firehose_networks::<NearFirehoseHeaderOnlyBlock>(
// &logger,
// firehose_networks_by_kind
Expand Down
5 changes: 3 additions & 2 deletions node/src/store_builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::iter::FromIterator;
use std::{collections::HashMap, sync::Arc};

Expand Down Expand Up @@ -166,7 +167,7 @@ impl StoreBuilder {
pools: HashMap<ShardName, ConnectionPool>,
subgraph_store: Arc<SubgraphStore>,
chains: HashMap<String, ShardName>,
networks: Vec<(String, Vec<ChainIdentifier>)>,
networks: BTreeMap<String, ChainIdentifier>,
registry: Arc<MetricsRegistry>,
) -> Arc<DieselStore> {
let networks = networks
Expand Down Expand Up @@ -280,7 +281,7 @@ impl StoreBuilder {

/// Return a store that combines both a `Store` for subgraph data
/// and a `BlockStore` for all chain related data
pub fn network_store(self, networks: Vec<(String, Vec<ChainIdentifier>)>) -> Arc<DieselStore> {
pub fn network_store(self, networks: BTreeMap<String, ChainIdentifier>) -> Arc<DieselStore> {
Self::make_store(
&self.logger,
self.pools,
Expand Down
Loading

0 comments on commit 9161255

Please sign in to comment.