diff --git a/Cargo.lock b/Cargo.lock index 0df01673b8..eb3547741e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,7 +109,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -119,7 +119,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -433,6 +433,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "axum-macros", "bitflags 1.3.2", "bytes", "futures-util", @@ -447,7 +448,11 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-layer", "tower-service", @@ -470,6 +475,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -1460,7 +1477,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1675,7 +1692,7 @@ checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" dependencies = [ "errno-dragonfly", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2629,7 +2646,7 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2670,10 +2687,13 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" name = "hubble" version = "0.1.0" dependencies = [ + "axum", "clap", "color-eyre 0.6.2", "futures", "graphql_client", + "lazy_static", + "prometheus", "reqwest", "serde", "serde_json", @@ -2959,6 +2979,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -2972,8 +3003,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", - "rustix", - "windows-sys", + "rustix 0.38.11", + "windows-sys 0.48.0", ] [[package]] @@ -3109,6 +3140,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "linux-raw-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" + [[package]] name = "linux-raw-sys" version = "0.4.5" @@ -3218,7 +3255,7 @@ checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3493,7 +3530,7 @@ dependencies = [ "libc", "redox_syscall 0.3.5", "smallvec 1.11.0", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -3800,6 +3837,36 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "hex", + "lazy_static", + "rustix 0.36.15", +] + +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "libc", + "memchr", + "parking_lot", + "procfs", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.11.9" @@ -3832,6 +3899,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "protos" version = "0.1.0" @@ -4129,6 +4202,20 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.36.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c37f1bd5ef1b5422177b7646cba67430579cfe2ace80f284fee876bca52ad941" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.1.4", + "windows-sys 0.45.0", +] + [[package]] name = "rustix" version = "0.38.11" @@ -4138,8 +4225,8 @@ dependencies = [ "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys", - "windows-sys", + "linux-raw-sys 0.4.5", + "windows-sys 0.48.0", ] [[package]] @@ -4339,7 +4426,7 @@ version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -4566,6 +4653,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.16" @@ -4740,7 +4837,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -4929,8 +5026,8 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall 0.3.5", - "rustix", - "windows-sys", + "rustix 0.38.11", + "windows-sys 0.48.0", ] [[package]] @@ -5202,7 +5299,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.5.3", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -6105,13 +6202,37 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", ] [[package]] @@ -6120,51 +6241,93 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -6187,7 +6350,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ "cfg-if", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] diff --git a/hubble/Cargo.toml b/hubble/Cargo.toml index fb012e93a4..a60b1d2312 100644 --- a/hubble/Cargo.toml +++ b/hubble/Cargo.toml @@ -18,3 +18,6 @@ url = { version = "2.4.1", features = ["serde"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" futures = "0.3.28" +prometheus = { version = "0.13.3", features = ["process"] } +lazy_static = "1.4.0" +axum = { version = "0.6.20", features = ["macros"] } diff --git a/hubble/hubble.nix b/hubble/hubble.nix index abae99a357..c9766cdad2 100644 --- a/hubble/hubble.nix +++ b/hubble/hubble.nix @@ -40,6 +40,10 @@ type = types.str; default = "https://graphql.union.build"; }; + metrics-addr = mkOption { + type = types.str; + default = "0.0.0.0:9090"; + }; hasura-admin-secret = mkOption { type = types.str; default = ""; @@ -66,7 +70,10 @@ indexersJson = builtins.toJSON cfg.indexers; in '' - ${pkgs.lib.getExe cfg.package} --url ${cfg.url} ${secretArg} --indexers '${indexersJson}' + ${pkgs.lib.getExe cfg.package} \ + --metrics-addr ${cfg.metrics-addr} \ + --url ${cfg.url} ${secretArg} \ + --indexers '${indexersJson}' ''; }; in diff --git a/hubble/src/cli.rs b/hubble/src/cli.rs index cded221109..143e7d805c 100644 --- a/hubble/src/cli.rs +++ b/hubble/src/cli.rs @@ -1,4 +1,4 @@ -use std::str::FromStr; +use std::{net::SocketAddr, str::FromStr}; use clap::Parser; use url::Url; @@ -18,6 +18,10 @@ pub struct Args { /// Indexer configurations to start. #[arg(short, long, env = "HUBBLE_INDEXERS")] pub indexers: Indexers, + + /// Indexer configurations to start. + #[arg(short, long, env = "HUBBLE_METRICS_PORT")] + pub metrics_addr: Option, } #[derive(Clone, Debug, serde::Deserialize)] diff --git a/hubble/src/hasura.rs b/hubble/src/hasura.rs index d7f6648c28..72173fed33 100644 --- a/hubble/src/hasura.rs +++ b/hubble/src/hasura.rs @@ -90,7 +90,7 @@ pub struct InsertChain; #[graphql( schema_path = "src/graphql/schema.graphql", query_path = "src/graphql/operations.graphql", - response_derives = "Debug, Default", + response_derives = "Clone, Debug, Default", normalization = "rust", skip_serializing_none )] diff --git a/hubble/src/main.rs b/hubble/src/main.rs index 793b57ee78..7571bfbe37 100644 --- a/hubble/src/main.rs +++ b/hubble/src/main.rs @@ -1,6 +1,7 @@ #![feature(return_position_impl_trait_in_trait)] #![feature(result_option_inspect)] +use axum::{routing::get, Router}; use clap::Parser; use hasura::HasuraDataStore; use reqwest::Client; @@ -9,6 +10,7 @@ use tracing::{error, info, warn}; mod cli; mod hasura; +mod metrics; mod tm; #[tokio::main] @@ -17,6 +19,7 @@ async fn main() -> color_eyre::eyre::Result<()> { let args = crate::cli::Args::parse(); tracing_subscriber::fmt::init(); + metrics::register_custom_metrics(); let url = args.url.clone(); let secret = args.secret.clone(); @@ -24,6 +27,16 @@ async fn main() -> color_eyre::eyre::Result<()> { let db = HasuraDataStore::new(client, url, secret); let mut set = JoinSet::new(); + if let Some(addr) = args.metrics_addr { + set.spawn(async move { + let app = Router::new().route("/metrics", get(metrics::handler)); + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .map_err(Into::into) + }); + } + args.indexers.into_iter().for_each(|indexer| { let db = db.clone(); set.spawn(async move { diff --git a/hubble/src/metrics.rs b/hubble/src/metrics.rs new file mode 100644 index 0000000000..25c01ac1e0 --- /dev/null +++ b/hubble/src/metrics.rs @@ -0,0 +1,47 @@ +use lazy_static::lazy_static; +use prometheus::{IntCounterVec, Opts, Registry}; +use reqwest::StatusCode; + +lazy_static! { + pub static ref REGISTRY: Registry = Registry::new(); + pub static ref EVENT_COLLECTOR: IntCounterVec = + IntCounterVec::new(Opts::new("events", "Events"), &["chain_id", "block_hash"]) + .expect("register EVENT_COLLECTOR"); + pub static ref BLOCK_COLLECTOR: IntCounterVec = + IntCounterVec::new(Opts::new("blocks", "Blocks"), &["chain_id"]) + .expect("register BLOCK_COLLECTOR"); + pub static ref POST_COLLECTOR: IntCounterVec = + IntCounterVec::new(Opts::new("posts", "Posts to Hasura"), &["chain_id"]) + .expect("register POSTS"); +} + +pub fn register_custom_metrics() { + REGISTRY + .register(Box::new(EVENT_COLLECTOR.clone())) + .expect("EVENT_COLLECTOR can be registered"); + REGISTRY + .register(Box::new(BLOCK_COLLECTOR.clone())) + .expect("BLOCK_COLLECTOR can be registered"); + REGISTRY + .register(Box::new(POST_COLLECTOR.clone())) + .expect("BLOCK_COLLECTOR can be registered"); +} + +#[axum::debug_handler] +pub async fn handler() -> Result { + let encoder = prometheus::TextEncoder::new(); + let mut response = encoder + .encode_to_string(®ISTRY.gather()) + .map_err(|err| { + tracing::error!("could not gather metrics: {}", err); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + encoder + .encode_utf8(&prometheus::gather(), &mut response) + .map_err(|err| { + tracing::error!("could not gather metrics: {}", err); + StatusCode::INTERNAL_SERVER_ERROR + })?; + Ok(response) +} diff --git a/hubble/src/tm.rs b/hubble/src/tm.rs index d2f53e8b9a..7229d8d91a 100644 --- a/hubble/src/tm.rs +++ b/hubble/src/tm.rs @@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration}; use tracing::{debug, info}; use url::Url; -use crate::hasura::*; +use crate::{hasura::*, metrics}; #[derive(Clone, Debug, serde::Deserialize)] pub struct Config { @@ -37,14 +37,22 @@ impl Config { } }; - let (height, chain_db_id) = get_current_data(&db, chain_id).await?; + let (height, chain_db_id) = get_current_data(&db, &chain_id).await?; let mut height: Height = (height + 1).into(); // Fast sync protocol. We sync up to latest.height - batch-size + 1 while let Some(up_to) = should_fast_sync_up_to(&client, Self::BATCH_SIZE, height).await? { info!("starting fast sync protocol up to: {}", up_to); loop { - height = batch_sync(&client, &db, chain_db_id, Self::BATCH_SIZE, height).await?; + height = batch_sync( + &client, + &db, + &chain_id, + chain_db_id, + Self::BATCH_SIZE, + height, + ) + .await?; if height >= up_to { info!("re-evaluating fast sync protocol"); break; // go back to the should_fast_sync_up_to. If this returns None, we continue to slow sync. @@ -57,7 +65,7 @@ impl Config { debug!("starting regular sync protocol"); // Regular sync protocol. This fetches blocks one-by-one. retry_count += 1; - match sync_next(&client, &db, chain_db_id, height).await? { + match sync_next(&client, &db, &chain_id, chain_db_id, height).await? { Some(h) => { height = h; retry_count = 0 @@ -93,13 +101,13 @@ pub fn is_height_exceeded_error(err: &Error) -> bool { /// Obtains the current height and chain_db_id for the chain_id. If the chain_id is not stored yet, an entry is created. async fn get_current_data( db: &D, - chain_id: String, + chain_id: &str, ) -> Result<(u32, i64), color_eyre::eyre::Report> { // We query for the last indexed block to not waste resources re-indexing. debug!("fetching latest stored block"); let latest_stored = db .do_post::(get_latest_block::Variables { - chain_id: chain_id.clone(), + chain_id: chain_id.to_string(), }) .await?; @@ -118,7 +126,9 @@ async fn get_current_data( chains.id } else { let created = db - .do_post::(insert_chain::Variables { chain_id }) + .do_post::(insert_chain::Variables { + chain_id: chain_id.to_string(), + }) .await?; created.data.unwrap().insert_chains_one.unwrap().id }; @@ -150,12 +160,13 @@ async fn should_fast_sync_up_to( async fn batch_sync( client: &HttpClient, db: &D, + chain_id: &str, chain_db_id: i64, batch_size: u32, from: Height, ) -> Result { if from.value() == 1 { - sync_next(client, db, chain_db_id, from).await?; + sync_next(client, db, chain_id, chain_db_id, from).await?; } let min = from.value() as u32; @@ -169,49 +180,63 @@ async fn batch_sync( ) .await?; - let objects: Result, Report> = - join_all(headers.block_metas.iter().rev().map(|header| async { - debug!("fetching block results for height {}", header.header.height); - let block = client.block_results(header.header.height).await?; - let events: Vec<_> = block - .events() - .enumerate() - .map(|event| event.into()) - .collect(); - debug!( - "found {} events for block {}", - events.len(), - header.header.height - ); - Ok(insert_blocks_many::BlocksInsertInput { - chain_id: Some(chain_db_id), - chain: None, - events: Some(insert_blocks_many::EventsArrRelInsertInput { - data: events, - on_conflict: None, - }), - hash: Some(header.header.hash().to_string()), - height: Some(header.header.height.into()), - id: None, - created_at: None, - updated_at: None, - is_finalized: Some(true), - extra_data: Some(serde_json::to_value(header.clone())?), - }) - })) - .await - .into_iter() - .collect(); - - let variables = insert_blocks_many::Variables { objects: objects? }; + let objects: Vec<_> = join_all(headers.block_metas.iter().rev().map(|header| async { + debug!("fetching block results for height {}", header.header.height); + let block = client.block_results(header.header.height).await?; + let events: Vec<_> = block + .events() + .enumerate() + .map(|event| event.into()) + .collect(); + debug!( + "found {} events for block {}", + events.len(), + header.header.height + ); + Ok(insert_blocks_many::BlocksInsertInput { + chain_id: Some(chain_db_id), + chain: None, + events: Some(insert_blocks_many::EventsArrRelInsertInput { + data: events, + on_conflict: None, + }), + hash: Some(header.header.hash().to_string()), + height: Some(header.header.height.into()), + id: None, + created_at: None, + updated_at: None, + is_finalized: Some(true), + extra_data: Some(serde_json::to_value(header.clone())?), + }) + })) + .await + .into_iter() + .collect::, Report>>()?; + + objects.iter().for_each(|block| { + let num_events = block + .events + .as_ref() + .map(|input| input.data.len()) + .unwrap_or_default(); + metrics::EVENT_COLLECTOR + .with_label_values(&[chain_id, block.hash.as_ref().unwrap()]) + .inc_by(num_events as u64); + }); + metrics::BLOCK_COLLECTOR + .with_label_values(&[chain_id]) + .inc_by(objects.len() as u64); + let variables = insert_blocks_many::Variables { objects }; debug!("inserting batch of blocks"); db.do_post::(variables).await?; + metrics::POST_COLLECTOR.with_label_values(&[chain_id]).inc(); Ok((from.value() as u32 + headers.block_metas.len() as u32).into()) } async fn sync_next( client: &HttpClient, db: &D, + chain_id: &str, chain_db_id: i64, height: Height, ) -> Result, Report> { @@ -236,6 +261,14 @@ async fn sync_next( info!("found {} events for block {}", &events.len(), &height); debug!("storing events for block {}", &height); + + metrics::EVENT_COLLECTOR + .with_label_values(&[chain_id, &header.hash().to_string()]) + .inc_by(events.len() as u64); + + metrics::BLOCK_COLLECTOR + .with_label_values(&[chain_id]) + .inc(); let v = insert_block::Variables { object: insert_block::BlocksInsertInput { chain: None, @@ -255,6 +288,7 @@ async fn sync_next( }; db.do_post::(v).await?; + metrics::POST_COLLECTOR.with_label_values(&[chain_id]).inc(); Ok(Some(height.increment())) }