From 3d6eacb95cf54cc4b6754328cbeb86e4df0e5255 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Thu, 12 Dec 2019 14:57:38 +0100 Subject: [PATCH] chain/ethereum, node: Move subgraph creation into NetworkIndexer --- chain/ethereum/src/network_indexer/mod.rs | 44 +-------- .../src/network_indexer/network_indexer.rs | 97 ++++++++++++++++--- .../ethereum/src/network_indexer/subgraph.rs | 36 +++---- node/src/main.rs | 14 ++- 4 files changed, 102 insertions(+), 89 deletions(-) diff --git a/chain/ethereum/src/network_indexer/mod.rs b/chain/ethereum/src/network_indexer/mod.rs index a4591debf30..5fae7c2cd56 100644 --- a/chain/ethereum/src/network_indexer/mod.rs +++ b/chain/ethereum/src/network_indexer/mod.rs @@ -40,46 +40,6 @@ impl fmt::Display for BlockWithOmmers { } } -pub fn create( - subgraph_name: String, - logger: &Logger, - adapter: Arc, - store: Arc, - metrics_registry: Arc, - start_block: Option, -) -> impl Future -where - S: Store + ChainStore, -{ - // Create a subgraph name and ID - let id_str = format!( - "{}_v{}", - subgraph_name.replace("/", "_"), - NETWORK_INDEXER_VERSION - ); - let subgraph_id = SubgraphDeploymentId::new(id_str).expect("valid network subgraph ID"); - let subgraph_name = SubgraphName::new(subgraph_name).expect("valid network subgraph name"); +pub trait NetworkStore: Store + ChainStore {} - let logger = logger.new(o!( - "subgraph_name" => subgraph_name.to_string(), - "subgraph_id" => subgraph_id.to_string(), - )); - - // Ensure subgraph, the wire up the tracer and indexer - subgraph::ensure_subgraph_exists( - subgraph_name, - subgraph_id.clone(), - logger.clone(), - store.clone(), - start_block, - ) - .and_then(move |_| { - future::ok(NetworkIndexer::new( - subgraph_id.clone(), - &logger, - adapter.clone(), - store.clone(), - metrics_registry.clone(), - )) - }) -} +impl NetworkStore for S {} diff --git a/chain/ethereum/src/network_indexer/network_indexer.rs b/chain/ethereum/src/network_indexer/network_indexer.rs index 0a46ab57737..b509feda8ea 100644 --- a/chain/ethereum/src/network_indexer/network_indexer.rs +++ b/chain/ethereum/src/network_indexer/network_indexer.rs @@ -14,6 +14,7 @@ use web3::types::H256; use super::block_writer::BlockWriter; use super::metrics::NetworkIndexerMetrics; +use super::subgraph; use super::*; /// Terminology used in this component: @@ -66,6 +67,7 @@ struct ReorgData { common_ancestor: EthereumBlockPointer, } +type EnsureSubgraphFuture = Box + Send>; type LocalHeadFuture = Box, Error = Error> + Send>; type ChainHeadFuture = Box + Send>; type BlockPointerFuture = Box + Send>; @@ -97,6 +99,22 @@ macro_rules! track_future { }}; } +fn ensure_subgraph( + logger: Logger, + store: Arc, + subgraph_name: SubgraphName, + subgraph_id: SubgraphDeploymentId, + start_block: Option, +) -> EnsureSubgraphFuture { + Box::new(subgraph::ensure_subgraph_exists( + subgraph_name, + subgraph_id, + logger, + store, + start_block, + )) +} + fn load_local_head(context: &Context) -> LocalHeadFuture { Box::new(track_future!( context.metrics, @@ -109,7 +127,7 @@ fn load_local_head(context: &Context) -> LocalHeadFuture { fn load_parent_block_from_store( subgraph_id: SubgraphDeploymentId, logger: Logger, - store: Arc, + store: Arc, metrics: Arc, block_ptr: EthereumBlockPointer, ) -> BlockPointerFuture { @@ -567,7 +585,7 @@ fn write_block(block_writer: Arc, block: BlockWithOmmers) -> AddBlo fn revert_blocks( subgraph_id: SubgraphDeploymentId, logger: Logger, - store: Arc, + store: Arc, metrics: Arc, event_sink: Sender, blocks: Vec, @@ -710,13 +728,15 @@ fn update_chain_and_local_head_metrics( /// Context for the network tracer. pub struct Context { - subgraph_id: SubgraphDeploymentId, logger: Logger, adapter: Arc, - store: Arc, - event_sink: Sender, - block_writer: Arc, + store: Arc, metrics: Arc, + block_writer: Arc, + event_sink: Sender, + subgraph_name: SubgraphName, + subgraph_id: SubgraphDeploymentId, + start_block: Option, } /// Events emitted by the network tracer. @@ -744,11 +764,18 @@ impl fmt::Display for NetworkIndexerEvent { #[derive(StateMachineFuture)] #[state_machine_future(context = "Context")] enum StateMachine { - /// The indexer start in an empty state and immediately moves on - /// to loading the local head block from the store. - #[state_machine_future(start, transitions(LoadLocalHead))] + /// The indexer start in an empty state and immediately moves on to + /// ensuring that the network subgraph exists. + #[state_machine_future(start, transitions(EnsureSubgraph))] Start, + /// This state ensures that the network subgraph that stores the + /// indexed data exists, and creates it if necessary. + #[state_machine_future(transitions(LoadLocalHead))] + EnsureSubgraph { + ensure_subgraph: EnsureSubgraphFuture, + }, + /// This state waits until the local head block has been loaded from the /// store. It then moves on to polling the chain head block. #[state_machine_future(transitions(PollChainHead, Failed))] @@ -921,6 +948,31 @@ impl PollStateMachine for StateMachine { // down. try_ready!(context.event_sink.poll_ready()); + info!(context.logger, "Ensure that the network subgraph exists"); + + transition!(EnsureSubgraph { + ensure_subgraph: ensure_subgraph( + context.logger.clone(), + context.store.clone(), + context.subgraph_name.clone(), + context.subgraph_id.clone(), + context.start_block.clone(), + ) + }) + } + + fn poll_ensure_subgraph<'a, 'c>( + state: &'a mut RentToOwn<'a, EnsureSubgraph>, + context: &'c mut RentToOwn<'c, Context>, + ) -> Poll { + // Abort if the output stream has been closed. Depending on how the + // network indexer is wired up, this could mean that the system shutting + // down. + try_ready!(context.event_sink.poll_ready()); + + // Ensure the subgraph exists; if creating it fails, fail the indexer + try_ready!(state.ensure_subgraph.poll()); + info!(context.logger, "Start indexing network data"); // Start by loading the local head from the store. This is the most @@ -1505,16 +1557,31 @@ pub struct NetworkIndexer { impl NetworkIndexer { pub fn new( - subgraph_id: SubgraphDeploymentId, logger: &Logger, adapter: Arc, store: Arc, metrics_registry: Arc, + subgraph_name: String, + start_block: Option, ) -> Self where S: Store + ChainStore, { - let logger = logger.new(o!("component" => "NetworkIndexer")); + // Create a subgraph name and ID + let id_str = format!( + "{}_v{}", + subgraph_name.replace("/", "_"), + NETWORK_INDEXER_VERSION + ); + let subgraph_id = SubgraphDeploymentId::new(id_str).expect("valid network subgraph ID"); + let subgraph_name = SubgraphName::new(subgraph_name).expect("valid network subgraph name"); + + let logger = logger.new(o!( + "component" => "NetworkIndexer", + "subgraph_name" => subgraph_name.to_string(), + "subgraph_id" => subgraph_id.to_string(), + )); + let logger_for_err = logger.clone(); let stopwatch = StopwatchMetrics::new( @@ -1542,13 +1609,15 @@ impl NetworkIndexer { // Create state machine that emits block and revert events for the network let state_machine = StateMachine::start(Context { - subgraph_id, logger, adapter, store, - event_sink, - block_writer, metrics, + block_writer, + event_sink, + subgraph_name, + subgraph_id, + start_block, }); // Launch state machine diff --git a/chain/ethereum/src/network_indexer/subgraph.rs b/chain/ethereum/src/network_indexer/subgraph.rs index c595900d78e..947d81be939 100644 --- a/chain/ethereum/src/network_indexer/subgraph.rs +++ b/chain/ethereum/src/network_indexer/subgraph.rs @@ -1,16 +1,13 @@ use futures::future::FutureResult; use std::time::{SystemTime, UNIX_EPOCH}; +use super::*; use graph::data::subgraph::schema::*; -use graph::prelude::*; -fn check_subgraph_exists( - store: Arc, +fn check_subgraph_exists( + store: Arc, subgraph_id: SubgraphDeploymentId, -) -> impl Future -where - S: Store, -{ +) -> impl Future { future::result( store .get(SubgraphDeploymentEntity::key(subgraph_id)) @@ -19,15 +16,12 @@ where ) } -fn create_subgraph( - store: Arc, +fn create_subgraph( + store: Arc, subgraph_name: SubgraphName, subgraph_id: SubgraphDeploymentId, start_block: Option, -) -> FutureResult<(), Error> -where - S: Store + ChainStore, -{ +) -> FutureResult<(), Error> { let mut ops = vec![]; // Ensure the subgraph itself doesn't already exist @@ -125,16 +119,13 @@ where ) } -pub fn ensure_subgraph_exists( +pub fn ensure_subgraph_exists( subgraph_name: SubgraphName, subgraph_id: SubgraphDeploymentId, logger: Logger, - store: Arc, + store: Arc, start_block: Option, -) -> impl Future -where - S: Store + ChainStore, -{ +) -> impl Future { debug!(logger, "Ensure that the network subgraph exists"); let logger_for_created = logger.clone(); @@ -161,10 +152,5 @@ where ) } }) - .map_err(move |e| { - error!( - logger_for_err, - "Failed to ensure Ethereum network subgraph exists: {}", e - ); - }) + .map_err(move |e| format_err!("Failed to ensure Ethereum network subgraph exists: {}", e)) } diff --git a/node/src/main.rs b/node/src/main.rs index 30fa5c03c51..9d7cd98dcfe 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -577,8 +577,7 @@ fn async_main() -> impl Future + Send + 'static { .filter(|network_subgraph| network_subgraph.starts_with("ethereum/")) .for_each(|network_subgraph| { let network_name = network_subgraph.replace("ethereum/", ""); - let network_indexer = network_indexer::create( - network_subgraph.into(), + let mut indexer = network_indexer::NetworkIndexer::new( &logger, eth_adapters .get(&network_name) @@ -589,14 +588,13 @@ fn async_main() -> impl Future + Send + 'static { .expect("store for network") .clone(), metrics_registry.clone(), + network_subgraph.into(), None, ); - tokio::spawn(network_indexer.and_then(|mut indexer| { - indexer.take_event_stream().unwrap().for_each(|_| { - // For now we simply ignore these events; we may later use them - // to drive subgraph indexing - Ok(()) - }) + tokio::spawn(indexer.take_event_stream().unwrap().for_each(|_| { + // For now we simply ignore these events; we may later use them + // to drive subgraph indexing + Ok(()) })); }) };