Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change how subgraph_deployment is organized #2267

Closed
wants to merge 39 commits into from
Closed
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
926ca8a
server: Fix flaky explorer cache test
lutter Mar 12, 2021
47779c0
docker: Fix error in Dockerfile
lutter Mar 19, 2021
918e98f
build(deps): bump derive_more from 0.99.11 to 0.99.13
dependabot-preview[bot] Mar 22, 2021
7118d5d
build(deps): bump config from 0.10.1 to 0.11.0
dependabot-preview[bot] Mar 22, 2021
28bfee8
build(deps): bump anyhow from 1.0.38 to 1.0.39
dependabot-preview[bot] Mar 22, 2021
748f38a
build(deps): bump async-trait from 0.1.41 to 0.1.48
dependabot-preview[bot] Mar 22, 2021
e510c50
build(deps): bump tokio-stream from 0.1.3 to 0.1.5
dependabot-preview[bot] Mar 22, 2021
84d1aa7
store: run multiple EntityModification operations with less queries
tilacog Mar 13, 2021
b089e6e
store: update tests
tilacog Mar 15, 2021
bb37ce4
store: implement LoadQuery for InsertQuery
tilacog Mar 17, 2021
71fcc21
store: rename helper struct RevertEntityData to ReturnedEntityData
tilacog Mar 17, 2021
11ea359
store[tests]: implement LoadQuery for ClampRangeQuery
tilacog Mar 17, 2021
60f6723
store[tests]: add `insert_many_and_delete_many` test
tilacog Mar 17, 2021
9848e1a
store: remove unecessary &
tilacog Mar 17, 2021
b50ddbc
store: handle entity counts
tilacog Mar 17, 2021
642f8f9
store: get rid of unused parameter
tilacog Mar 17, 2021
c6b6746
store[tests]: add update_many test
tilacog Mar 18, 2021
432acba
store: build unique column name list outside of walk_ast function
tilacog Mar 18, 2021
f367d55
store: Use a HashMap for building unique columns
tilacog Mar 18, 2021
9ba1c40
store: add stopwatch metrics
tilacog Mar 18, 2021
9b49868
store: relational_bytes test
tilacog Mar 18, 2021
2ae7f4d
store: avoid extra vector creation
tilacog Mar 19, 2021
463ea3e
store: revert from returning (unused) values from queries.
tilacog Mar 20, 2021
c4a6a8b
runtime: Update comments
leoyvens Mar 23, 2021
1d46c6d
runtime: Deprecate calling `ethereum.call` in globals
leoyvens Mar 23, 2021
286dc85
log: Increase log buffer
leoyvens Mar 23, 2021
1c66b5d
node: Add 'graphman listen'
lutter Mar 24, 2021
63ae38e
store: Batch notifications to at most 64 events per batch
lutter Mar 24, 2021
fce3d99
runtime-wasm: new stopwatch sections
tilacog Mar 24, 2021
b3bdea1
build(deps): bump anyhow from 1.0.39 to 1.0.40
dependabot-preview[bot] Mar 29, 2021
bf6c789
node, graph, chain: Make sure we log network details when getting net…
lutter Mar 26, 2021
64a5122
node, graph, chain: Label block ingestor logs with provider name
lutter Mar 26, 2021
92c5899
node, graph, chain: Asyncify service startup
lutter Mar 26, 2021
10081f2
node, store: Do not block on eth adapters during startup
lutter Mar 26, 2021
ac3bf0e
graph, node, store: Only create one ChainStore per chain
lutter Mar 26, 2021
de23cec
store: Make sure we check net identifiers correctly
lutter Mar 26, 2021
141a909
store: Store the Site for a Layout in the layout
lutter Mar 2, 2021
7406a84
store: Add deployment_schemas.id to Site
lutter Mar 6, 2021
af76792
store: Change how the subgraph_deployment table is organized
lutter Mar 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion chain/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ graph = { path = "../../graph" }
lazy_static = "1.2.0"
state_machine_future = "0.2"
serde = "1.0"
config = { version = "0.10", features = ["toml"], default-features = false }
config = { version = "0.11", features = ["toml"], default-features = false }
dirs = "3.0"
anyhow = "1.0"
fail = "0.4"
2 changes: 1 addition & 1 deletion chain/ethereum/src/block_ingestor.rs
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ where
}),
);

let logger = logger.new(o!("network_name" => network_name.clone()));
let logger = logger.new(o!("provider" => eth_adapter.provider().to_string()));

Ok(BlockIngestor {
chain_store,
52 changes: 31 additions & 21 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use std::time::Instant;

use ethabi::ParamType;
use graph::prelude::{
anyhow, debug, error, ethabi,
anyhow, async_trait, debug, error, ethabi,
futures03::{self, compat::Future01CompatExt, FutureExt, StreamExt, TryStreamExt},
hex, retry, stream, tiny_keccak, trace, warn,
web3::{
@@ -32,7 +32,9 @@ use web3::types::Filter;

#[derive(Clone)]
pub struct EthereumAdapter<T: web3::Transport> {
logger: Logger,
url_hostname: Arc<String>,
provider: String,
web3: Arc<Web3<T>>,
metrics: Arc<ProviderEthRpcMetrics>,
is_ganache: bool,
@@ -85,6 +87,8 @@ lazy_static! {
impl<T: web3::Transport> CheapClone for EthereumAdapter<T> {
fn cheap_clone(&self) -> Self {
Self {
logger: self.logger.clone(),
provider: self.provider.clone(),
url_hostname: self.url_hostname.cheap_clone(),
web3: self.web3.cheap_clone(),
metrics: self.metrics.cheap_clone(),
@@ -100,6 +104,8 @@ where
T::Out: Send,
{
pub async fn new(
logger: Logger,
provider: String,
url: &str,
transport: T,
provider_metrics: Arc<ProviderEthRpcMetrics>,
@@ -124,6 +130,8 @@ where
.unwrap_or(false);

EthereumAdapter {
logger,
provider,
url_hostname: Arc::new(hostname),
web3,
metrics: provider_metrics,
@@ -615,6 +623,7 @@ where
}
}

#[async_trait]
impl<T> EthereumAdapterTrait for EthereumAdapter<T>
where
T: web3::BatchTransport + Send + Sync + 'static,
@@ -625,11 +634,12 @@ where
&self.url_hostname
}

fn net_identifiers(
&self,
logger: &Logger,
) -> Box<dyn Future<Item = EthereumNetworkIdentifier, Error = Error> + Send> {
let logger = logger.clone();
fn provider(&self) -> &str {
&self.provider
}

async fn net_identifiers(&self) -> Result<EthereumNetworkIdentifier, Error> {
let logger = self.logger.clone();

let web3 = self.web3.clone();
let net_version_future = retry("net_version RPC call", &logger)
@@ -656,21 +666,21 @@ where
})
});

Box::new(
net_version_future
.join(gen_block_hash_future)
.map(
|(net_version, genesis_block_hash)| EthereumNetworkIdentifier {
net_version,
genesis_block_hash,
},
)
.map_err(|e| {
e.into_inner().unwrap_or_else(|| {
anyhow!("Ethereum node took too long to read network identifiers")
})
}),
)
net_version_future
.join(gen_block_hash_future)
.compat()
.await
.map(
|(net_version, genesis_block_hash)| EthereumNetworkIdentifier {
net_version,
genesis_block_hash,
},
)
.map_err(|e| {
e.into_inner().unwrap_or_else(|| {
anyhow!("Ethereum node took too long to read network identifiers")
})
})
}

fn latest_block_header(
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ version = "0.22.0"
edition = "2018"

[dependencies]
async-trait = "0.1.41"
async-trait = "0.1.48"
atomic_refcell = "0.1.6"
bytes = "0.5"
futures01 = { package="futures", version="0.1.29" }
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ RUN apt-get update \
&& apt-get install -y libpq-dev ca-certificates netcat

ADD docker/wait_for docker/start /usr/local/bin/
COPY --from=graph-node-build /usr/local/cargo/bin/graph-node /usr/local/cargo/bin/graphman /usr/local/bin
COPY --from=graph-node-build /usr/local/cargo/bin/graph-node /usr/local/cargo/bin/graphman /usr/local/bin/
COPY --from=graph-node-build /etc/image-info /etc/image-info
COPY docker/Dockerfile /Dockerfile
CMD start
2 changes: 1 addition & 1 deletion graph/Cargo.toml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ edition = "2018"

[dependencies]
anyhow = "1.0"
async-trait = "0.1.41"
async-trait = "0.1.48"
atomic_refcell = "0.1.6"
bigdecimal = { version = "0.1.0", features = ["serde"] }
bytes = "0.5"
11 changes: 6 additions & 5 deletions graph/src/components/ethereum/adapter.rs
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ use crate::prelude::*;

pub type EventSignature = H256;

#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// A collection of attributes that (kind of) uniquely identify an Ethereum blockchain.
pub struct EthereumNetworkIdentifier {
pub net_version: String,
@@ -593,15 +593,16 @@ impl BlockStreamMetrics {
/// Implementations may be implemented against an in-process Ethereum node
/// or a remote node over RPC.
#[automock]
#[async_trait]
pub trait EthereumAdapter: Send + Sync + 'static {
fn url_hostname(&self) -> &str;

/// The `provider.label` from the adapter's configuration
fn provider(&self) -> &str;

/// Ask the Ethereum node for some identifying information about the Ethereum network it is
/// connected to.
fn net_identifiers(
&self,
logger: &Logger,
) -> Box<dyn Future<Item = EthereumNetworkIdentifier, Error = Error> + Send>;
async fn net_identifiers(&self) -> Result<EthereumNetworkIdentifier, Error>;

/// Get the latest block, including full transactions.
fn latest_block(
4 changes: 2 additions & 2 deletions graph/src/log/elastic.rs
Original file line number Diff line number Diff line change
@@ -208,7 +208,7 @@ impl ElasticDrain {
return;
}

trace!(
debug!(
flush_logger,
"Flushing {} logs to Elasticsearch",
logs_to_send.len()
@@ -371,7 +371,7 @@ impl Drain for ElasticDrain {
pub fn elastic_logger(config: ElasticDrainConfig, error_logger: Logger) -> Logger {
let elastic_drain = ElasticDrain::new(config, error_logger).fuse();
let async_drain = slog_async::Async::new(elastic_drain)
.chan_size(10000)
.chan_size(20000)
.build()
.fuse();
Logger::root(async_drain, o!())
2 changes: 1 addition & 1 deletion graph/src/log/mod.rs
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ pub fn logger(show_debug: bool) -> Logger {
)
.build();
let drain = slog_async::Async::new(drain)
.chan_size(10000)
.chan_size(20000)
.build()
.fuse();
Logger::root(drain, o!())
2 changes: 1 addition & 1 deletion graph/src/log/split.rs
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ where
{
let split_drain = SplitDrain::new(drain1.fuse(), drain2.fuse()).fuse();
let async_drain = slog_async::Async::new(split_drain)
.chan_size(10000)
.chan_size(20000)
.build()
.fuse();
Logger::root(async_drain, o!())
39 changes: 38 additions & 1 deletion node/src/bin/manager.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,9 @@ use graph::{
};
use graph_node::config;
use graph_node::store_builder::StoreBuilder;
use graph_store_postgres::{connection_pool::ConnectionPool, SubgraphStore, PRIMARY_SHARD};
use graph_store_postgres::{
connection_pool::ConnectionPool, SubgraphStore, SubscriptionManager, PRIMARY_SHARD,
};

use crate::config::Config as Cfg;
use graph_node::manager::commands;
@@ -106,6 +108,8 @@ pub enum Command {
/// Print information about a configuration file without
/// actually connecting to databases or network clients
Config(ConfigCommand),
/// Listen for store events and print them
Listen(ListenCommand),
}

#[derive(Clone, Debug, StructOpt)]
@@ -157,6 +161,15 @@ pub enum ConfigCommand {
},
}

#[derive(Clone, Debug, StructOpt)]
pub enum ListenCommand {
Assignments,
Entities {
deployment: String,
entity_types: Vec<String>,
},
}

impl From<Opt> for config::Opt {
fn from(opt: Opt) -> Self {
let mut config_opt = config::Opt::default();
@@ -189,6 +202,14 @@ fn make_store(logger: &Logger, node_id: &NodeId, config: &Cfg) -> Arc<SubgraphSt
StoreBuilder::make_sharded_store(logger, node_id, config, make_registry(logger))
}

fn make_subscription_manager(logger: &Logger, config: &Cfg) -> Arc<SubscriptionManager> {
let primary = config.primary_store();
Arc::new(SubscriptionManager::new(
logger.clone(),
primary.connection.to_owned(),
))
}

#[tokio::main]
async fn main() {
let opt = Opt::from_args();
@@ -274,6 +295,22 @@ async fn main() {
let store = make_store();
commands::assign::reassign(store, id, node)
}
Listen(cmd) => {
use ListenCommand::*;
match cmd {
Assignments => {
let subscription_manager = make_subscription_manager(&logger, &config);
commands::listen::assignments(subscription_manager).await
}
Entities {
deployment,
entity_types,
} => {
let subscription_manager = make_subscription_manager(&logger, &config);
commands::listen::entities(subscription_manager, deployment, entity_types).await
}
}
}
};
if let Err(e) = result {
die!("error: {}", e)
Loading