Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hubble): add prometheus metrics #711

Merged
merged 3 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 188 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions hubble/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ url = { version = "2.4.1", features = ["serde"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
futures = "0.3.28"
prometheus = { version = "0.13.3", features = ["process"] }
lazy_static = "1.4.0"
axum = { version = "0.6.20", features = ["macros"] }
9 changes: 8 additions & 1 deletion hubble/hubble.nix
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
type = types.str;
default = "https://graphql.union.build";
};
metrics-addr = mkOption {
type = types.str;
default = "0.0.0.0:9090";
};
hasura-admin-secret = mkOption {
type = types.str;
default = "";
Expand All @@ -66,7 +70,10 @@
indexersJson = builtins.toJSON cfg.indexers;
in
''
${pkgs.lib.getExe cfg.package} --url ${cfg.url} ${secretArg} --indexers '${indexersJson}'
${pkgs.lib.getExe cfg.package} \
--metrics-addr ${cfg.metrics-addr} \
--url ${cfg.url} ${secretArg} \
--indexers '${indexersJson}'
'';
};
in
Expand Down
6 changes: 5 additions & 1 deletion hubble/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::str::FromStr;
use std::{net::SocketAddr, str::FromStr};

use clap::Parser;
use url::Url;
Expand All @@ -18,6 +18,10 @@ pub struct Args {
/// Indexer configurations to start.
#[arg(short, long, env = "HUBBLE_INDEXERS")]
pub indexers: Indexers,

/// Indexer configurations to start.
#[arg(short, long, env = "HUBBLE_METRICS_PORT")]
pub metrics_addr: Option<SocketAddr>,
}

#[derive(Clone, Debug, serde::Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion hubble/src/hasura.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub struct InsertChain;
#[graphql(
schema_path = "src/graphql/schema.graphql",
query_path = "src/graphql/operations.graphql",
response_derives = "Debug, Default",
response_derives = "Clone, Debug, Default",
normalization = "rust",
skip_serializing_none
)]
Expand Down
13 changes: 13 additions & 0 deletions hubble/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![feature(return_position_impl_trait_in_trait)]
#![feature(result_option_inspect)]

use axum::{routing::get, Router};
use clap::Parser;
use hasura::HasuraDataStore;
use reqwest::Client;
Expand All @@ -9,6 +10,7 @@ use tracing::{error, info, warn};

mod cli;
mod hasura;
mod metrics;
mod tm;

#[tokio::main]
Expand All @@ -17,13 +19,24 @@ async fn main() -> color_eyre::eyre::Result<()> {

let args = crate::cli::Args::parse();
tracing_subscriber::fmt::init();
metrics::register_custom_metrics();

let url = args.url.clone();
let secret = args.secret.clone();
let client = Client::new();
let db = HasuraDataStore::new(client, url, secret);
let mut set = JoinSet::new();

if let Some(addr) = args.metrics_addr {
set.spawn(async move {
let app = Router::new().route("/metrics", get(metrics::handler));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.map_err(Into::into)
});
}

args.indexers.into_iter().for_each(|indexer| {
let db = db.clone();
set.spawn(async move {
Expand Down
47 changes: 47 additions & 0 deletions hubble/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use lazy_static::lazy_static;
use prometheus::{IntCounterVec, Opts, Registry};
use reqwest::StatusCode;

lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
pub static ref EVENT_COLLECTOR: IntCounterVec =
IntCounterVec::new(Opts::new("events", "Events"), &["chain_id", "block_hash"])
.expect("register EVENT_COLLECTOR");
pub static ref BLOCK_COLLECTOR: IntCounterVec =
IntCounterVec::new(Opts::new("blocks", "Blocks"), &["chain_id"])
.expect("register BLOCK_COLLECTOR");
pub static ref POST_COLLECTOR: IntCounterVec =
IntCounterVec::new(Opts::new("posts", "Posts to Hasura"), &["chain_id"])
.expect("register POSTS");
}

pub fn register_custom_metrics() {
REGISTRY
.register(Box::new(EVENT_COLLECTOR.clone()))
.expect("EVENT_COLLECTOR can be registered");
REGISTRY
.register(Box::new(BLOCK_COLLECTOR.clone()))
.expect("BLOCK_COLLECTOR can be registered");
REGISTRY
.register(Box::new(POST_COLLECTOR.clone()))
.expect("BLOCK_COLLECTOR can be registered");
}

#[axum::debug_handler]
pub async fn handler() -> Result<String, StatusCode> {
let encoder = prometheus::TextEncoder::new();
let mut response = encoder
.encode_to_string(&REGISTRY.gather())
.map_err(|err| {
tracing::error!("could not gather metrics: {}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?;

encoder
.encode_utf8(&prometheus::gather(), &mut response)
.map_err(|err| {
tracing::error!("could not gather metrics: {}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(response)
}
120 changes: 77 additions & 43 deletions hubble/src/tm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration};
use tracing::{debug, info};
use url::Url;

use crate::hasura::*;
use crate::{hasura::*, metrics};

#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -37,14 +37,22 @@ impl Config {
}
};

let (height, chain_db_id) = get_current_data(&db, chain_id).await?;
let (height, chain_db_id) = get_current_data(&db, &chain_id).await?;
let mut height: Height = (height + 1).into();

// Fast sync protocol. We sync up to latest.height - batch-size + 1
while let Some(up_to) = should_fast_sync_up_to(&client, Self::BATCH_SIZE, height).await? {
info!("starting fast sync protocol up to: {}", up_to);
loop {
height = batch_sync(&client, &db, chain_db_id, Self::BATCH_SIZE, height).await?;
height = batch_sync(
&client,
&db,
&chain_id,
chain_db_id,
Self::BATCH_SIZE,
height,
)
.await?;
if height >= up_to {
info!("re-evaluating fast sync protocol");
break; // go back to the should_fast_sync_up_to. If this returns None, we continue to slow sync.
Expand All @@ -57,7 +65,7 @@ impl Config {
debug!("starting regular sync protocol");
// Regular sync protocol. This fetches blocks one-by-one.
retry_count += 1;
match sync_next(&client, &db, chain_db_id, height).await? {
match sync_next(&client, &db, &chain_id, chain_db_id, height).await? {
Some(h) => {
height = h;
retry_count = 0
Expand Down Expand Up @@ -93,13 +101,13 @@ pub fn is_height_exceeded_error(err: &Error) -> bool {
/// Obtains the current height and chain_db_id for the chain_id. If the chain_id is not stored yet, an entry is created.
async fn get_current_data<D: Datastore>(
db: &D,
chain_id: String,
chain_id: &str,
) -> Result<(u32, i64), color_eyre::eyre::Report> {
// We query for the last indexed block to not waste resources re-indexing.
debug!("fetching latest stored block");
let latest_stored = db
.do_post::<GetLatestBlock>(get_latest_block::Variables {
chain_id: chain_id.clone(),
chain_id: chain_id.to_string(),
})
.await?;

Expand All @@ -118,7 +126,9 @@ async fn get_current_data<D: Datastore>(
chains.id
} else {
let created = db
.do_post::<InsertChain>(insert_chain::Variables { chain_id })
.do_post::<InsertChain>(insert_chain::Variables {
chain_id: chain_id.to_string(),
})
.await?;
created.data.unwrap().insert_chains_one.unwrap().id
};
Expand Down Expand Up @@ -150,12 +160,13 @@ async fn should_fast_sync_up_to(
async fn batch_sync<D: Datastore>(
client: &HttpClient,
db: &D,
chain_id: &str,
chain_db_id: i64,
batch_size: u32,
from: Height,
) -> Result<Height, Report> {
if from.value() == 1 {
sync_next(client, db, chain_db_id, from).await?;
sync_next(client, db, chain_id, chain_db_id, from).await?;
}

let min = from.value() as u32;
Expand All @@ -169,49 +180,63 @@ async fn batch_sync<D: Datastore>(
)
.await?;

let objects: Result<Vec<_>, Report> =
join_all(headers.block_metas.iter().rev().map(|header| async {
debug!("fetching block results for height {}", header.header.height);
let block = client.block_results(header.header.height).await?;
let events: Vec<_> = block
.events()
.enumerate()
.map(|event| event.into())
.collect();
debug!(
"found {} events for block {}",
events.len(),
header.header.height
);
Ok(insert_blocks_many::BlocksInsertInput {
chain_id: Some(chain_db_id),
chain: None,
events: Some(insert_blocks_many::EventsArrRelInsertInput {
data: events,
on_conflict: None,
}),
hash: Some(header.header.hash().to_string()),
height: Some(header.header.height.into()),
id: None,
created_at: None,
updated_at: None,
is_finalized: Some(true),
extra_data: Some(serde_json::to_value(header.clone())?),
})
}))
.await
.into_iter()
.collect();

let variables = insert_blocks_many::Variables { objects: objects? };
let objects: Vec<_> = join_all(headers.block_metas.iter().rev().map(|header| async {
debug!("fetching block results for height {}", header.header.height);
let block = client.block_results(header.header.height).await?;
let events: Vec<_> = block
.events()
.enumerate()
.map(|event| event.into())
.collect();
debug!(
"found {} events for block {}",
events.len(),
header.header.height
);
Ok(insert_blocks_many::BlocksInsertInput {
chain_id: Some(chain_db_id),
chain: None,
events: Some(insert_blocks_many::EventsArrRelInsertInput {
data: events,
on_conflict: None,
}),
hash: Some(header.header.hash().to_string()),
height: Some(header.header.height.into()),
id: None,
created_at: None,
updated_at: None,
is_finalized: Some(true),
extra_data: Some(serde_json::to_value(header.clone())?),
})
}))
.await
.into_iter()
.collect::<Result<Vec<_>, Report>>()?;

objects.iter().for_each(|block| {
let num_events = block
.events
.as_ref()
.map(|input| input.data.len())
.unwrap_or_default();
metrics::EVENT_COLLECTOR
.with_label_values(&[chain_id, block.hash.as_ref().unwrap()])
.inc_by(num_events as u64);
});
metrics::BLOCK_COLLECTOR
.with_label_values(&[chain_id])
.inc_by(objects.len() as u64);
let variables = insert_blocks_many::Variables { objects };
debug!("inserting batch of blocks");
db.do_post::<InsertBlocksMany>(variables).await?;
metrics::POST_COLLECTOR.with_label_values(&[chain_id]).inc();
Ok((from.value() as u32 + headers.block_metas.len() as u32).into())
}

async fn sync_next<D: Datastore>(
client: &HttpClient,
db: &D,
chain_id: &str,
chain_db_id: i64,
height: Height,
) -> Result<Option<Height>, Report> {
Expand All @@ -236,6 +261,14 @@ async fn sync_next<D: Datastore>(
info!("found {} events for block {}", &events.len(), &height);

debug!("storing events for block {}", &height);

metrics::EVENT_COLLECTOR
.with_label_values(&[chain_id, &header.hash().to_string()])
.inc_by(events.len() as u64);

metrics::BLOCK_COLLECTOR
.with_label_values(&[chain_id])
.inc();
let v = insert_block::Variables {
object: insert_block::BlocksInsertInput {
chain: None,
Expand All @@ -255,6 +288,7 @@ async fn sync_next<D: Datastore>(
};

db.do_post::<InsertBlock>(v).await?;
metrics::POST_COLLECTOR.with_label_values(&[chain_id]).inc();
Ok(Some(height.increment()))
}

Expand Down