feat(hubble): add prometheus metrics (#711)
* feat(hubble): add prometheus metrics

* feat(hubble): support metrics in nixosmodule

* feat(hubble): use a single buffer for metrics collection
KaiserKarel authored Sep 15, 2023
1 parent b3502b5 commit 676a2f5
Showing 8 changed files with 342 additions and 71 deletions.
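The "single buffer" mentioned in the third commit message bullet refers to encoding both the custom registry and the default (process) registry into one String instead of allocating two. A minimal, self-contained sketch of that pattern, reusing the same prometheus 0.13 calls the handler in hubble/src/metrics.rs relies on — the registry, counter, and "example-chain" label value here are illustrative only:

use prometheus::{IntCounterVec, Opts, Registry, TextEncoder};

fn main() -> Result<(), prometheus::Error> {
    // A custom registry with one counter, mirroring the shape of hubble's metrics.
    let registry = Registry::new();
    let blocks = IntCounterVec::new(Opts::new("blocks", "Blocks"), &["chain_id"])?;
    registry.register(Box::new(blocks.clone()))?;
    blocks.with_label_values(&["example-chain"]).inc();

    // Encode the custom registry into a String, then append the default
    // registry's process metrics into the same buffer.
    let encoder = TextEncoder::new();
    let mut buffer = encoder.encode_to_string(&registry.gather())?;
    encoder.encode_utf8(&prometheus::gather(), &mut buffer)?;
    println!("{buffer}");
    Ok(())
}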
213 changes: 188 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions hubble/Cargo.toml
@@ -18,3 +18,6 @@ url = { version = "2.4.1", features = ["serde"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
futures = "0.3.28"
prometheus = { version = "0.13.3", features = ["process"] }
lazy_static = "1.4.0"
axum = { version = "0.6.20", features = ["macros"] }
9 changes: 8 additions & 1 deletion hubble/hubble.nix
@@ -40,6 +40,10 @@
type = types.str;
default = "https://graphql.union.build";
};
metrics-addr = mkOption {
type = types.str;
default = "0.0.0.0:9090";
};
hasura-admin-secret = mkOption {
type = types.str;
default = "";
@@ -66,7 +70,10 @@
indexersJson = builtins.toJSON cfg.indexers;
in
''
${pkgs.lib.getExe cfg.package} --url ${cfg.url} ${secretArg} --indexers '${indexersJson}'
${pkgs.lib.getExe cfg.package} \
--metrics-addr ${cfg.metrics-addr} \
--url ${cfg.url} ${secretArg} \
--indexers '${indexersJson}'
'';
};
in
6 changes: 5 additions & 1 deletion hubble/src/cli.rs
@@ -1,4 +1,4 @@
use std::str::FromStr;
use std::{net::SocketAddr, str::FromStr};

use clap::Parser;
use url::Url;
@@ -18,6 +18,10 @@ pub struct Args {
/// Indexer configurations to start.
#[arg(short, long, env = "HUBBLE_INDEXERS")]
pub indexers: Indexers,

/// Address to serve Prometheus metrics on.
#[arg(short, long, env = "HUBBLE_METRICS_PORT")]
pub metrics_addr: Option<SocketAddr>,
}

#[derive(Clone, Debug, serde::Deserialize)]
2 changes: 1 addition & 1 deletion hubble/src/hasura.rs
@@ -90,7 +90,7 @@ pub struct InsertChain;
#[graphql(
schema_path = "src/graphql/schema.graphql",
query_path = "src/graphql/operations.graphql",
response_derives = "Debug, Default",
response_derives = "Clone, Debug, Default",
normalization = "rust",
skip_serializing_none
)]
13 changes: 13 additions & 0 deletions hubble/src/main.rs
@@ -1,6 +1,7 @@
#![feature(return_position_impl_trait_in_trait)]
#![feature(result_option_inspect)]

use axum::{routing::get, Router};
use clap::Parser;
use hasura::HasuraDataStore;
use reqwest::Client;
@@ -9,6 +10,7 @@ use tracing::{error, info, warn};

mod cli;
mod hasura;
mod metrics;
mod tm;

#[tokio::main]
@@ -17,13 +19,24 @@ async fn main() -> color_eyre::eyre::Result<()> {

let args = crate::cli::Args::parse();
tracing_subscriber::fmt::init();
metrics::register_custom_metrics();

let url = args.url.clone();
let secret = args.secret.clone();
let client = Client::new();
let db = HasuraDataStore::new(client, url, secret);
let mut set = JoinSet::new();

if let Some(addr) = args.metrics_addr {
set.spawn(async move {
let app = Router::new().route("/metrics", get(metrics::handler));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.map_err(Into::into)
});
}

args.indexers.into_iter().for_each(|indexer| {
let db = db.clone();
set.spawn(async move {
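For context, the spawn above serves the metrics router alongside the indexer tasks. A self-contained sketch of the same wiring, assuming axum 0.6 with tokio; the address and handler body here are placeholders rather than hubble's actual handler:

use std::net::SocketAddr;

use axum::{routing::get, Router};

// Placeholder handler; hubble's real handler lives in metrics.rs below.
async fn metrics_handler() -> String {
    "blocks{chain_id=\"example-chain\"} 1\n".to_string()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr: SocketAddr = "0.0.0.0:9090".parse()?;
    let app = Router::new().route("/metrics", get(metrics_handler));
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await?;
    Ok(())
}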
47 changes: 47 additions & 0 deletions hubble/src/metrics.rs
@@ -0,0 +1,47 @@
use lazy_static::lazy_static;
use prometheus::{IntCounterVec, Opts, Registry};
use reqwest::StatusCode;

lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
pub static ref EVENT_COLLECTOR: IntCounterVec =
IntCounterVec::new(Opts::new("events", "Events"), &["chain_id", "block_hash"])
.expect("register EVENT_COLLECTOR");
pub static ref BLOCK_COLLECTOR: IntCounterVec =
IntCounterVec::new(Opts::new("blocks", "Blocks"), &["chain_id"])
.expect("register BLOCK_COLLECTOR");
pub static ref POST_COLLECTOR: IntCounterVec =
IntCounterVec::new(Opts::new("posts", "Posts to Hasura"), &["chain_id"])
.expect("register POSTS");
}

pub fn register_custom_metrics() {
REGISTRY
.register(Box::new(EVENT_COLLECTOR.clone()))
.expect("EVENT_COLLECTOR can be registered");
REGISTRY
.register(Box::new(BLOCK_COLLECTOR.clone()))
.expect("BLOCK_COLLECTOR can be registered");
REGISTRY
.register(Box::new(POST_COLLECTOR.clone()))
.expect("BLOCK_COLLECTOR can be registered");
}

#[axum::debug_handler]
pub async fn handler() -> Result<String, StatusCode> {
let encoder = prometheus::TextEncoder::new();
let mut response = encoder
.encode_to_string(&REGISTRY.gather())
.map_err(|err| {
tracing::error!("could not gather metrics: {}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?;

encoder
.encode_utf8(&prometheus::gather(), &mut response)
.map_err(|err| {
tracing::error!("could not gather metrics: {}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(response)
}
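As a rough illustration of how these counters surface through the handler above: incrementing them (as hubble/src/tm.rs does below) and scraping /metrics yields Prometheus text-exposition output along these lines. This is a hypothetical call site inside the hubble crate; the chain id, hash, and counts are made up.

use crate::metrics::{BLOCK_COLLECTOR, EVENT_COLLECTOR};

// Hypothetical helper; the real increments happen in hubble/src/tm.rs.
fn record_block(chain_id: &str, block_hash: &str, num_events: u64) {
    // Events are labelled by chain and block hash, blocks by chain only.
    EVENT_COLLECTOR
        .with_label_values(&[chain_id, block_hash])
        .inc_by(num_events);
    BLOCK_COLLECTOR.with_label_values(&[chain_id]).inc();
}

// The /metrics response would then contain lines roughly like:
//   # TYPE blocks counter
//   blocks{chain_id="example-chain"} 1
//   # TYPE events counter
//   events{block_hash="0xABCD...",chain_id="example-chain"} 12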
120 changes: 77 additions & 43 deletions hubble/src/tm.rs
@@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration};
use tracing::{debug, info};
use url::Url;

use crate::hasura::*;
use crate::{hasura::*, metrics};

#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config {
@@ -37,14 +37,22 @@ impl Config {
}
};

let (height, chain_db_id) = get_current_data(&db, chain_id).await?;
let (height, chain_db_id) = get_current_data(&db, &chain_id).await?;
let mut height: Height = (height + 1).into();

// Fast sync protocol. We sync up to latest.height - batch-size + 1
while let Some(up_to) = should_fast_sync_up_to(&client, Self::BATCH_SIZE, height).await? {
info!("starting fast sync protocol up to: {}", up_to);
loop {
height = batch_sync(&client, &db, chain_db_id, Self::BATCH_SIZE, height).await?;
height = batch_sync(
&client,
&db,
&chain_id,
chain_db_id,
Self::BATCH_SIZE,
height,
)
.await?;
if height >= up_to {
info!("re-evaluating fast sync protocol");
break; // go back to the should_fast_sync_up_to. If this returns None, we continue to slow sync.
@@ -57,7 +65,7 @@
debug!("starting regular sync protocol");
// Regular sync protocol. This fetches blocks one-by-one.
retry_count += 1;
match sync_next(&client, &db, chain_db_id, height).await? {
match sync_next(&client, &db, &chain_id, chain_db_id, height).await? {
Some(h) => {
height = h;
retry_count = 0
@@ -93,13 +101,13 @@ pub fn is_height_exceeded_error(err: &Error) -> bool {
/// Obtains the current height and chain_db_id for the chain_id. If the chain_id is not stored yet, an entry is created.
async fn get_current_data<D: Datastore>(
db: &D,
chain_id: String,
chain_id: &str,
) -> Result<(u32, i64), color_eyre::eyre::Report> {
// We query for the last indexed block to not waste resources re-indexing.
debug!("fetching latest stored block");
let latest_stored = db
.do_post::<GetLatestBlock>(get_latest_block::Variables {
chain_id: chain_id.clone(),
chain_id: chain_id.to_string(),
})
.await?;

@@ -118,7 +126,9 @@ async fn get_current_data<D: Datastore>(
chains.id
} else {
let created = db
.do_post::<InsertChain>(insert_chain::Variables { chain_id })
.do_post::<InsertChain>(insert_chain::Variables {
chain_id: chain_id.to_string(),
})
.await?;
created.data.unwrap().insert_chains_one.unwrap().id
};
@@ -150,12 +160,13 @@ async fn should_fast_sync_up_to(
async fn batch_sync<D: Datastore>(
client: &HttpClient,
db: &D,
chain_id: &str,
chain_db_id: i64,
batch_size: u32,
from: Height,
) -> Result<Height, Report> {
if from.value() == 1 {
sync_next(client, db, chain_db_id, from).await?;
sync_next(client, db, chain_id, chain_db_id, from).await?;
}

let min = from.value() as u32;
@@ -169,49 +180,63 @@ async fn batch_sync<D: Datastore>(
)
.await?;

let objects: Result<Vec<_>, Report> =
join_all(headers.block_metas.iter().rev().map(|header| async {
debug!("fetching block results for height {}", header.header.height);
let block = client.block_results(header.header.height).await?;
let events: Vec<_> = block
.events()
.enumerate()
.map(|event| event.into())
.collect();
debug!(
"found {} events for block {}",
events.len(),
header.header.height
);
Ok(insert_blocks_many::BlocksInsertInput {
chain_id: Some(chain_db_id),
chain: None,
events: Some(insert_blocks_many::EventsArrRelInsertInput {
data: events,
on_conflict: None,
}),
hash: Some(header.header.hash().to_string()),
height: Some(header.header.height.into()),
id: None,
created_at: None,
updated_at: None,
is_finalized: Some(true),
extra_data: Some(serde_json::to_value(header.clone())?),
})
}))
.await
.into_iter()
.collect();

let variables = insert_blocks_many::Variables { objects: objects? };
let objects: Vec<_> = join_all(headers.block_metas.iter().rev().map(|header| async {
debug!("fetching block results for height {}", header.header.height);
let block = client.block_results(header.header.height).await?;
let events: Vec<_> = block
.events()
.enumerate()
.map(|event| event.into())
.collect();
debug!(
"found {} events for block {}",
events.len(),
header.header.height
);
Ok(insert_blocks_many::BlocksInsertInput {
chain_id: Some(chain_db_id),
chain: None,
events: Some(insert_blocks_many::EventsArrRelInsertInput {
data: events,
on_conflict: None,
}),
hash: Some(header.header.hash().to_string()),
height: Some(header.header.height.into()),
id: None,
created_at: None,
updated_at: None,
is_finalized: Some(true),
extra_data: Some(serde_json::to_value(header.clone())?),
})
}))
.await
.into_iter()
.collect::<Result<Vec<_>, Report>>()?;

objects.iter().for_each(|block| {
let num_events = block
.events
.as_ref()
.map(|input| input.data.len())
.unwrap_or_default();
metrics::EVENT_COLLECTOR
.with_label_values(&[chain_id, block.hash.as_ref().unwrap()])
.inc_by(num_events as u64);
});
metrics::BLOCK_COLLECTOR
.with_label_values(&[chain_id])
.inc_by(objects.len() as u64);
let variables = insert_blocks_many::Variables { objects };
debug!("inserting batch of blocks");
db.do_post::<InsertBlocksMany>(variables).await?;
metrics::POST_COLLECTOR.with_label_values(&[chain_id]).inc();
Ok((from.value() as u32 + headers.block_metas.len() as u32).into())
}

async fn sync_next<D: Datastore>(
client: &HttpClient,
db: &D,
chain_id: &str,
chain_db_id: i64,
height: Height,
) -> Result<Option<Height>, Report> {
@@ -236,6 +261,14 @@ async fn sync_next<D: Datastore>(
info!("found {} events for block {}", &events.len(), &height);

debug!("storing events for block {}", &height);

metrics::EVENT_COLLECTOR
.with_label_values(&[chain_id, &header.hash().to_string()])
.inc_by(events.len() as u64);

metrics::BLOCK_COLLECTOR
.with_label_values(&[chain_id])
.inc();
let v = insert_block::Variables {
object: insert_block::BlocksInsertInput {
chain: None,
@@ -255,6 +288,7 @@ async fn sync_next<D: Datastore>(
};

db.do_post::<InsertBlock>(v).await?;
metrics::POST_COLLECTOR.with_label_values(&[chain_id]).inc();
Ok(Some(height.increment()))
}

