Skip to content

Commit

Permalink
chain/ethereum, node: Move subgraph creation into NetworkIndexer
Browse files Browse the repository at this point in the history
  • Loading branch information
Jannis committed Dec 12, 2019
1 parent d762fc7 commit b095209
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 90 deletions.
44 changes: 2 additions & 42 deletions chain/ethereum/src/network_indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,46 +40,6 @@ impl fmt::Display for BlockWithOmmers {
}
}

pub fn create<S>(
subgraph_name: String,
logger: &Logger,
adapter: Arc<dyn EthereumAdapter>,
store: Arc<S>,
metrics_registry: Arc<dyn MetricsRegistry>,
start_block: Option<EthereumBlockPointer>,
) -> impl Future<Item = NetworkIndexer, Error = ()>
where
S: Store + ChainStore,
{
// Create a subgraph name and ID
let id_str = format!(
"{}_v{}",
subgraph_name.replace("/", "_"),
NETWORK_INDEXER_VERSION
);
let subgraph_id = SubgraphDeploymentId::new(id_str).expect("valid network subgraph ID");
let subgraph_name = SubgraphName::new(subgraph_name).expect("valid network subgraph name");
pub trait NetworkStore: Store + ChainStore {}

let logger = logger.new(o!(
"subgraph_name" => subgraph_name.to_string(),
"subgraph_id" => subgraph_id.to_string(),
));

// Ensure subgraph, the wire up the tracer and indexer
subgraph::ensure_subgraph_exists(
subgraph_name,
subgraph_id.clone(),
logger.clone(),
store.clone(),
start_block,
)
.and_then(move |_| {
future::ok(NetworkIndexer::new(
subgraph_id.clone(),
&logger,
adapter.clone(),
store.clone(),
metrics_registry.clone(),
))
})
}
impl<S: Store + ChainStore> NetworkStore for S {}
97 changes: 83 additions & 14 deletions chain/ethereum/src/network_indexer/network_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use web3::types::H256;

use super::block_writer::BlockWriter;
use super::metrics::NetworkIndexerMetrics;
use super::subgraph;
use super::*;

/// Terminology used in this component:
Expand Down Expand Up @@ -66,6 +67,7 @@ struct ReorgData {
common_ancestor: EthereumBlockPointer,
}

type EnsureSubgraphFuture = Box<dyn Future<Item = (), Error = Error> + Send>;
type LocalHeadFuture = Box<dyn Future<Item = Option<EthereumBlockPointer>, Error = Error> + Send>;
type ChainHeadFuture = Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;
type BlockPointerFuture = Box<dyn Future<Item = EthereumBlockPointer, Error = Error> + Send>;
Expand Down Expand Up @@ -97,6 +99,22 @@ macro_rules! track_future {
}};
}

fn ensure_subgraph(
logger: Logger,
store: Arc<dyn NetworkStore>,
subgraph_name: SubgraphName,
subgraph_id: SubgraphDeploymentId,
start_block: Option<EthereumBlockPointer>,
) -> EnsureSubgraphFuture {
Box::new(subgraph::ensure_subgraph_exists(
subgraph_name,
subgraph_id,
logger,
store,
start_block,
))
}

fn load_local_head(context: &Context) -> LocalHeadFuture {
Box::new(track_future!(
context.metrics,
Expand All @@ -109,7 +127,7 @@ fn load_local_head(context: &Context) -> LocalHeadFuture {
fn load_parent_block_from_store(
subgraph_id: SubgraphDeploymentId,
logger: Logger,
store: Arc<dyn Store>,
store: Arc<dyn NetworkStore>,
metrics: Arc<NetworkIndexerMetrics>,
block_ptr: EthereumBlockPointer,
) -> BlockPointerFuture {
Expand Down Expand Up @@ -567,7 +585,7 @@ fn write_block(block_writer: Arc<BlockWriter>, block: BlockWithOmmers) -> AddBlo
fn revert_blocks(
subgraph_id: SubgraphDeploymentId,
logger: Logger,
store: Arc<dyn Store>,
store: Arc<dyn NetworkStore>,
metrics: Arc<NetworkIndexerMetrics>,
event_sink: Sender<NetworkIndexerEvent>,
blocks: Vec<EthereumBlockPointer>,
Expand Down Expand Up @@ -710,13 +728,15 @@ fn update_chain_and_local_head_metrics(

/// Context for the network tracer.
pub struct Context {
subgraph_id: SubgraphDeploymentId,
logger: Logger,
adapter: Arc<dyn EthereumAdapter>,
store: Arc<dyn Store>,
event_sink: Sender<NetworkIndexerEvent>,
block_writer: Arc<BlockWriter>,
store: Arc<dyn NetworkStore>,
metrics: Arc<NetworkIndexerMetrics>,
block_writer: Arc<BlockWriter>,
event_sink: Sender<NetworkIndexerEvent>,
subgraph_name: SubgraphName,
subgraph_id: SubgraphDeploymentId,
start_block: Option<EthereumBlockPointer>,
}

/// Events emitted by the network tracer.
Expand Down Expand Up @@ -744,11 +764,18 @@ impl fmt::Display for NetworkIndexerEvent {
#[derive(StateMachineFuture)]
#[state_machine_future(context = "Context")]
enum StateMachine {
/// The indexer start in an empty state and immediately moves on
/// to loading the local head block from the store.
#[state_machine_future(start, transitions(LoadLocalHead))]
/// The indexer start in an empty state and immediately moves on to
/// ensuring that the network subgraph exists.
#[state_machine_future(start, transitions(EnsureSubgraph))]
Start,

/// This state ensures that the network subgraph that stores the
/// indexed data exists, and creates it if necessary.
#[state_machine_future(transitions(LoadLocalHead))]
EnsureSubgraph {
ensure_subgraph: EnsureSubgraphFuture,
},

/// This state waits until the local head block has been loaded from the
/// store. It then moves on to polling the chain head block.
#[state_machine_future(transitions(PollChainHead, Failed))]
Expand Down Expand Up @@ -921,6 +948,31 @@ impl PollStateMachine for StateMachine {
// down.
try_ready!(context.event_sink.poll_ready());

info!(context.logger, "Ensure that the network subgraph exists");

transition!(EnsureSubgraph {
ensure_subgraph: ensure_subgraph(
context.logger.clone(),
context.store.clone(),
context.subgraph_name.clone(),
context.subgraph_id.clone(),
context.start_block.clone(),
)
})
}

fn poll_ensure_subgraph<'a, 'c>(
state: &'a mut RentToOwn<'a, EnsureSubgraph>,
context: &'c mut RentToOwn<'c, Context>,
) -> Poll<AfterEnsureSubgraph, Error> {
// Abort if the output stream has been closed. Depending on how the
// network indexer is wired up, this could mean that the system shutting
// down.
try_ready!(context.event_sink.poll_ready());

// Ensure the subgraph exists; if creating it fails, fail the indexer
try_ready!(state.ensure_subgraph.poll());

info!(context.logger, "Start indexing network data");

// Start by loading the local head from the store. This is the most
Expand Down Expand Up @@ -1505,16 +1557,31 @@ pub struct NetworkIndexer {

impl NetworkIndexer {
pub fn new<S>(
subgraph_id: SubgraphDeploymentId,
logger: &Logger,
adapter: Arc<dyn EthereumAdapter>,
store: Arc<S>,
metrics_registry: Arc<dyn MetricsRegistry>,
subgraph_name: String,
start_block: Option<EthereumBlockPointer>,
) -> Self
where
S: Store + ChainStore,
{
let logger = logger.new(o!("component" => "NetworkIndexer"));
// Create a subgraph name and ID
let id_str = format!(
"{}_v{}",
subgraph_name.replace("/", "_"),
NETWORK_INDEXER_VERSION
);
let subgraph_id = SubgraphDeploymentId::new(id_str).expect("valid network subgraph ID");
let subgraph_name = SubgraphName::new(subgraph_name).expect("valid network subgraph name");

let logger = logger.new(o!(
"component" => "NetworkIndexer",
"subgraph_name" => subgraph_name.to_string(),
"subgraph_id" => subgraph_id.to_string(),
));

let logger_for_err = logger.clone();

let stopwatch = StopwatchMetrics::new(
Expand Down Expand Up @@ -1542,13 +1609,15 @@ impl NetworkIndexer {

// Create state machine that emits block and revert events for the network
let state_machine = StateMachine::start(Context {
subgraph_id,
logger,
adapter,
store,
event_sink,
block_writer,
metrics,
block_writer,
event_sink,
subgraph_name,
subgraph_id,
start_block,
});

// Launch state machine
Expand Down
37 changes: 11 additions & 26 deletions chain/ethereum/src/network_indexer/subgraph.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use futures::future::FutureResult;
use std::time::{SystemTime, UNIX_EPOCH};

use super::*;
use graph::data::subgraph::schema::*;
use graph::prelude::*;

fn check_subgraph_exists<S>(
store: Arc<S>,
fn check_subgraph_exists(
store: Arc<dyn NetworkStore>,
subgraph_id: SubgraphDeploymentId,
) -> impl Future<Item = bool, Error = Error>
where
S: Store,
{
) -> impl Future<Item = bool, Error = Error> {
future::result(
store
.get(SubgraphDeploymentEntity::key(subgraph_id))
Expand All @@ -19,15 +16,12 @@ where
)
}

fn create_subgraph<S>(
store: Arc<S>,
fn create_subgraph(
store: Arc<dyn NetworkStore>,
subgraph_name: SubgraphName,
subgraph_id: SubgraphDeploymentId,
start_block: Option<EthereumBlockPointer>,
) -> FutureResult<(), Error>
where
S: Store + ChainStore,
{
) -> FutureResult<(), Error> {
let mut ops = vec![];

// Ensure the subgraph itself doesn't already exist
Expand Down Expand Up @@ -125,20 +119,16 @@ where
)
}

pub fn ensure_subgraph_exists<S>(
pub fn ensure_subgraph_exists(
subgraph_name: SubgraphName,
subgraph_id: SubgraphDeploymentId,
logger: Logger,
store: Arc<S>,
store: Arc<dyn NetworkStore>,
start_block: Option<EthereumBlockPointer>,
) -> impl Future<Item = (), Error = ()>
where
S: Store + ChainStore,
{
) -> impl Future<Item = (), Error = Error> {
debug!(logger, "Ensure that the network subgraph exists");

let logger_for_created = logger.clone();
let logger_for_err = logger.clone();

check_subgraph_exists(store.clone(), subgraph_id.clone())
.from_err()
Expand All @@ -161,10 +151,5 @@ where
)
}
})
.map_err(move |e| {
error!(
logger_for_err,
"Failed to ensure Ethereum network subgraph exists: {}", e
);
})
.map_err(move |e| format_err!("Failed to ensure Ethereum network subgraph exists: {}", e))
}
14 changes: 6 additions & 8 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,7 @@ fn async_main() -> impl Future<Item = (), Error = ()> + Send + 'static {
.filter(|network_subgraph| network_subgraph.starts_with("ethereum/"))
.for_each(|network_subgraph| {
let network_name = network_subgraph.replace("ethereum/", "");
let network_indexer = network_indexer::create(
network_subgraph.into(),
let mut indexer = network_indexer::NetworkIndexer::new(
&logger,
eth_adapters
.get(&network_name)
Expand All @@ -589,14 +588,13 @@ fn async_main() -> impl Future<Item = (), Error = ()> + Send + 'static {
.expect("store for network")
.clone(),
metrics_registry.clone(),
network_subgraph.into(),
None,
);
tokio::spawn(network_indexer.and_then(|mut indexer| {
indexer.take_event_stream().unwrap().for_each(|_| {
// For now we simply ignore these events; we may later use them
// to drive subgraph indexing
Ok(())
})
tokio::spawn(indexer.take_event_stream().unwrap().for_each(|_| {
// For now we simply ignore these events; we may later use them
// to drive subgraph indexing
Ok(())
}));
})
};
Expand Down

0 comments on commit b095209

Please sign in to comment.