diff --git a/Cargo.lock b/Cargo.lock index c51e4bed20..074cd0bd09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -560,6 +560,16 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" +dependencies = [ + "cfg-if 1.0.0", + "num_cpus", +] + [[package]] name = "der" version = "0.3.3" @@ -1244,10 +1254,15 @@ dependencies = [ "hdpath", "hex", "humantime-serde", + "hyper", "ibc", "ibc-proto", "itertools 0.10.0", "k256", + "lazy_static", + "opentelemetry", + "opentelemetry-prometheus", + "prometheus", "prost", "prost-types", "retry", @@ -1684,6 +1699,36 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "492848ff47f11b7f9de0443b404e2c5775f695e1af6b7076ca25f999581d547a" +dependencies = [ + "async-trait", + "crossbeam-channel 0.5.1", + "dashmap", + "fnv", + "futures", + "js-sys", + "lazy_static", + "percent-encoding", + "pin-project", + "rand 0.8.3", + "thiserror", +] + +[[package]] +name = "opentelemetry-prometheus" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f41760047df46012aaf2bb87fec0efed4d97f7a6af6825858c3b4d9438dadb94" +dependencies = [ + "opentelemetry", + "prometheus", + "protobuf", +] + [[package]] name = "os_str_bytes" version = "2.4.0" @@ -1838,6 +1883,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "prometheus" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.7.0" @@ -1871,6 +1931,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45604fc7a88158e7d514d8e22e14ac746081e7a70d7690074dd0029ee37458d6" + [[package]] name = "quote" version = "1.0.9" diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index 20c26ce4d6..f0303401a2 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -72,3 +72,19 @@ ibc = { version = "0.3.0", path = "../modules", features = ["mocks"] } # Needed for generating (synthetic) light blocks. tendermint-testgen = { version = "=0.19.0" } + +# Dependencies needed for telemetry support +[dependencies.opentelemetry] +version = "0.14.0" + +[dependencies.opentelemetry-prometheus] +version = "0.7.0" + +[dependencies.hyper] +version = "0.14.7" + +[dependencies.lazy_static] +version = "1.4.0" + +[dependencies.prometheus] +version = "0.12.0" diff --git a/relayer/src/config.rs b/relayer/src/config.rs index 6f26e62be3..b6329690cd 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -87,6 +87,10 @@ pub struct GlobalConfig { /// All valid log levels, as defined in tracing: /// https://docs.rs/tracing-core/0.1.17/tracing_core/struct.Level.html pub log_level: String, + + pub telemetry_enabled: bool, + + pub telemetry_port: u16, } impl Default for GlobalConfig { @@ -94,6 +98,8 @@ impl Default for GlobalConfig { Self { strategy: Strategy::default(), log_level: "info".to_string(), + telemetry_enabled: true, + telemetry_port: 3000, } } } diff --git a/relayer/src/lib.rs b/relayer/src/lib.rs index 687695c5f5..e1a22eabf0 100644 --- a/relayer/src/lib.rs +++ b/relayer/src/lib.rs @@ -13,6 +13,9 @@ //! //! [Hermes]: https://docs.rs/ibc-relayer-cli/0.2.0/ +#[macro_use] +extern crate lazy_static; + pub mod chain; pub mod channel; pub mod config; @@ -28,6 +31,7 @@ pub mod object; pub mod registry; pub mod relay; pub mod supervisor; +pub mod telemetry; pub mod transfer; pub mod upgrade_chain; pub mod util; diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 5692e48abd..85b304994d 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -29,6 +29,7 @@ use crate::{ }; mod error; +use crate::telemetry::service::TelemetryService; pub use error::Error; /// The supervisor listens for events on multiple pairs of chains, @@ -38,6 +39,7 @@ pub struct Supervisor { config: Config, registry: Registry, workers: HashMap, + telemetry: Option, } impl Supervisor { @@ -45,10 +47,25 @@ impl Supervisor { pub fn spawn(config: Config) -> Result { let registry = Registry::new(config.clone()); + // Start the telemetry service + let telemetry = match config.global.telemetry_enabled { + true => { + println!( + "TELEMETRY ENABLED ON PORT: {:?}", + config.global.telemetry_port + ); + Some(TelemetryService { + listen_port: config.global.telemetry_port, + }) + } + false => None, + }; + Ok(Self { config, registry, workers: HashMap::new(), + telemetry, }) } diff --git a/relayer/src/telemetry.rs b/relayer/src/telemetry.rs new file mode 100644 index 0000000000..29f058ecde --- /dev/null +++ b/relayer/src/telemetry.rs @@ -0,0 +1,2 @@ +pub mod relayer_state; +pub mod service; diff --git a/relayer/src/telemetry/relayer_state.rs b/relayer/src/telemetry/relayer_state.rs new file mode 100644 index 0000000000..709df64e3c --- /dev/null +++ b/relayer/src/telemetry/relayer_state.rs @@ -0,0 +1,7 @@ +use opentelemetry::metrics::BoundCounter; +use opentelemetry_prometheus::PrometheusExporter; + +pub struct RelayerState { + pub exporter: PrometheusExporter, + pub tx_counter: BoundCounter<'static, u64>, +} diff --git a/relayer/src/telemetry/service.rs b/relayer/src/telemetry/service.rs new file mode 100644 index 0000000000..a54c29ef81 --- /dev/null +++ b/relayer/src/telemetry/service.rs @@ -0,0 +1,75 @@ +use hyper::{ + header::CONTENT_TYPE, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, +}; + +use crate::telemetry::relayer_state::RelayerState; +use opentelemetry::{global, KeyValue}; +use prometheus::{Encoder, TextEncoder}; +use std::convert::Infallible; +use std::sync::Arc; + +lazy_static! { + static ref HANDLER_ALL: [KeyValue; 1] = [KeyValue::new("handler", "all")]; +} + +pub struct TelemetryService { + pub(crate) listen_port: u16, +} + +async fn serve_req( + _req: Request, + state: Arc, +) -> Result, hyper::Error> { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = state.exporter.registry().gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + state.tx_counter.add(1); + + let response = Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap(); + + Ok(response) +} + +impl TelemetryService { + pub async fn run(self) -> Result> { + let exporter = opentelemetry_prometheus::exporter().init(); + + let meter = global::meter("hermes/relayer"); + let state = Arc::new(RelayerState { + exporter, + tx_counter: meter + .u64_counter("hermes.tx_count") + .with_description("Total number of transactions processed via the relayer.") + .init() + .bind(HANDLER_ALL.as_ref()), + }); + + // For every connection, we must make a `Service` to handle all + // incoming HTTP requests on said connection. + let make_svc = make_service_fn(move |_conn| { + let state = state.clone(); + // This is the `Service` that will handle the connection. + // `service_fn` is a helper to convert a function that + // returns a Response into a `Service`. + async move { Ok::<_, Infallible>(service_fn(move |req| serve_req(req, state.clone()))) } + }); + + let addr = ([127, 0, 0, 1], self.listen_port).into(); + + let server = Server::bind(&addr).serve(make_svc); + + println!("Telemetry service listening on http://{}", addr); + + server.await?; + + Ok(self) + } +}