Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graphman fix block command #3270

Merged
merged 11 commits into from
May 3, 2022
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ diesel = "1.4.8"
fail = "0.5"
http = "0.2.5" # must be compatible with the version rust-web3 uses
prometheus = { version ="0.13.0", features = ["push"] }
json-structural-diff = {version = "0.1", features = ["colorize"] }
tilacog marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
assert_cli = "0.6"
142 changes: 110 additions & 32 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,38 @@
use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration};

use config::PoolSize;
use git_testament::{git_testament, render_testament};
use graph::{data::graphql::effort::LoadManager, prelude::chrono, prometheus::Registry};
use graph_core::MetricsRegistry;
use graph_graphql::prelude::GraphQlRunner;
use lazy_static::lazy_static;
use structopt::StructOpt;

use graph::{
log::logger,
prelude::{info, o, slog, tokio, Logger, NodeId, ENV_VARS},
prelude::{
anyhow::{self, Context as AnyhowContextTrait},
info, o, slog, tokio, Logger, NodeId, ENV_VARS,
},
url::Url,
};
use graph_chain_ethereum::{EthereumAdapter, EthereumNetworks};
use graph_core::MetricsRegistry;
use graph_graphql::prelude::GraphQlRunner;
use graph_node::config::{self, Config as Cfg};
use graph_node::manager::commands;
use graph_node::{
chain::create_ethereum_networks,
manager::{deployment::DeploymentSearch, PanicSubscriptionManager},
store_builder::StoreBuilder,
MetricsContext,
};
use graph_store_postgres::ChainStore;
use graph_store_postgres::{
connection_pool::ConnectionPool, BlockStore, Shard, Store, SubgraphStore, SubscriptionManager,
PRIMARY_SHARD,
};

use graph_node::config::{self, Config as Cfg};
use graph_node::manager::commands;
use lazy_static::lazy_static;
use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration};
use structopt::StructOpt;

const VERSION_LABEL_KEY: &str = "version";

git_testament!(TESTAMENT);

macro_rules! die {
($fmt:expr, $($arg:tt)*) => {{
use std::io::Write;
writeln!(&mut ::std::io::stderr(), $fmt, $($arg)*).unwrap();
::std::process::exit(1)
}}
}

tilacog marked this conversation as resolved.
Show resolved Hide resolved
lazy_static! {
static ref RENDERED_TESTAMENT: String = render_testament!(TESTAMENT);
}
Expand Down Expand Up @@ -346,6 +341,25 @@ pub enum ChainCommand {
/// There must be no deployments using that chain. If there are, the
/// subgraphs and/or deployments using the chain must first be removed
Remove { name: String },

/// Compares cached blocks with fresh ones and clears the block cache when they differ.
CheckBlocks {
#[structopt(subcommand)] // Note that we mark a field as a subcommand
method: CheckBlockMethod,

/// Chain name (must be an existing chain, see 'chain list')
#[structopt(empty_values = false)]
chain_name: String,
},
/// Truncates the whole block cache for the given chain.
Truncate {
/// Chain name (must be an existing chain, see 'chain list')
#[structopt(empty_values = false)]
chain_name: String,
/// Skips confirmation prompt
#[structopt(long, short)]
force: bool,
},
}

#[derive(Clone, Debug, StructOpt)]
Expand Down Expand Up @@ -445,6 +459,23 @@ pub enum IndexCommand {
},
}

#[derive(Clone, Debug, StructOpt)]
pub enum CheckBlockMethod {
/// The number of the target block
ByHash { hash: String },

/// The hash of the target block
ByNumber { number: i32 },

/// A block number range, inclusive on both ends.
ByRange {
#[structopt(long, short)]
from: Option<i32>,
#[structopt(long, short)]
to: Option<i32>,
},
}

impl From<Opt> for config::Opt {
fn from(opt: Opt) -> Self {
let mut config_opt = config::Opt::default();
Expand Down Expand Up @@ -521,7 +552,7 @@ impl Context {
&self.node_id,
PRIMARY_SHARD.as_str(),
primary,
self.registry,
self.metrics_registry(),
Arc::new(vec![]),
);
pool.skip_setup();
Expand Down Expand Up @@ -624,10 +655,42 @@ impl Context {
registry,
))
}

async fn ethereum_networks(&self) -> anyhow::Result<EthereumNetworks> {
let logger = self.logger.clone();
let registry = self.metrics_registry();
create_ethereum_networks(logger, registry, &self.config).await
}

fn chain_store(self, chain_name: &str) -> anyhow::Result<Arc<ChainStore>> {
use graph::components::store::BlockStore;
self.store()
.block_store()
.chain_store(&chain_name)
.ok_or_else(|| anyhow::anyhow!("Could not find a network named '{}'", chain_name))
}

async fn chain_store_and_adapter(
self,
chain_name: &str,
) -> anyhow::Result<(Arc<ChainStore>, Arc<EthereumAdapter>)> {
let ethereum_networks = self.ethereum_networks().await?;
let chain_store = self.chain_store(chain_name)?;
let ethereum_adapter = ethereum_networks
.networks
.get(chain_name)
.map(|adapters| adapters.cheapest())
.flatten()
.ok_or(anyhow::anyhow!(
"Failed to obtain an Ethereum adapter for chain '{}'",
chain_name
))?;
Ok((chain_store, ethereum_adapter))
}
}

#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
tilacog marked this conversation as resolved.
Show resolved Hide resolved
let opt = Opt::from_args();

let version_label = opt.version_label.clone();
Expand All @@ -644,13 +707,8 @@ async fn main() {
render_testament!(TESTAMENT)
);

let mut config = match Cfg::load(&logger, &opt.clone().into()) {
Err(e) => {
eprintln!("configuration error: {}", e);
std::process::exit(1);
}
Ok(config) => config,
};
let mut config = Cfg::load(&logger, &opt.clone().into()).context("Configuration error")?;

if opt.pool_size > 0 && !opt.cmd.use_configured_pool_size() {
// Override pool size from configuration
for shard in config.stores.values_mut() {
Expand Down Expand Up @@ -701,7 +759,7 @@ async fn main() {
);

use Command::*;
let result = match opt.cmd {
match opt.cmd {
TxnSpeed { delay } => commands::txn_speed::run(ctx.primary_pool(), delay),
Info {
deployment,
Expand Down Expand Up @@ -862,6 +920,29 @@ async fn main() {
let (block_store, primary) = ctx.block_store_and_primary_pool();
commands::chain::remove(primary, block_store, name)
}
CheckBlocks { method, chain_name } => {
use commands::check_blocks::{by_hash, by_number, by_range};
use CheckBlockMethod::*;
let logger = ctx.logger.clone();
let (chain_store, ethereum_adapter) =
ctx.chain_store_and_adapter(&chain_name).await?;
match method {
ByHash { hash } => {
by_hash(&hash, chain_store, &ethereum_adapter, &logger).await
}
ByNumber { number } => {
by_number(number, chain_store, &ethereum_adapter, &logger).await
}
ByRange { from, to } => {
by_range(chain_store, &ethereum_adapter, from, to, &logger).await
}
}
}
Truncate { chain_name, force } => {
use commands::check_blocks::truncate;
let chain_store = ctx.chain_store(&chain_name)?;
truncate(chain_store, force)
}
}
}
Stats(cmd) => {
Expand Down Expand Up @@ -916,9 +997,6 @@ async fn main() {
}
}
}
};
if let Err(e) = result {
die!("error: {}", e)
}
}

Expand Down
Loading