diff --git a/Cargo.lock b/Cargo.lock index 0435cd457..532110614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,6 +2085,7 @@ dependencies = [ "sqlx", "thiserror", "tokio", + "tracing", "triggered", ] diff --git a/db_store/Cargo.toml b/db_store/Cargo.toml index fa2cc15bc..8eca6781f 100644 --- a/db_store/Cargo.toml +++ b/db_store/Cargo.toml @@ -15,6 +15,7 @@ serde = {workspace = true} http = {workspace = true} http-serde = {workspace = true} tokio = {workspace = true} +tracing = {workspace = true} triggered = {workspace = true} futures = {workspace = true} diff --git a/db_store/src/lib.rs b/db_store/src/lib.rs index 4d441029d..fb8ec211e 100644 --- a/db_store/src/lib.rs +++ b/db_store/src/lib.rs @@ -1,6 +1,7 @@ use std::str::FromStr; mod error; mod iam_auth_pool; +mod metric_tracker; mod settings; pub use error::{Error, Result}; diff --git a/db_store/src/metric_tracker.rs b/db_store/src/metric_tracker.rs new file mode 100644 index 000000000..f0175de3e --- /dev/null +++ b/db_store/src/metric_tracker.rs @@ -0,0 +1,47 @@ +use std::time::Duration; + +use crate::{Error, Result}; + +const DURATION: Duration = Duration::from_secs(300); + +pub async fn start( + app_name: &str, + pool: sqlx::Pool, + shutdown: triggered::Listener, +) -> Result> { + let pool_size_name = format!("{app_name}_db_pool_size"); + let pool_idle_name = format!("{app_name}_db_pool_idle"); + let join_handle = + tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool, shutdown).await }); + + Ok(Box::pin(async move { + match join_handle.await { + Ok(()) => Ok(()), + Err(err) => Err(Error::from(err)), + } + })) +} + +async fn run( + size_name: String, + idle_name: String, + pool: sqlx::Pool, + shutdown: triggered::Listener, +) { + let mut trigger = tokio::time::interval(DURATION); + + loop { + let shutdown = shutdown.clone(); + + tokio::select! { + _ = shutdown => { + tracing::info!("db_store: MetricTracker shutting down"); + break; + } + _ = trigger.tick() => { + metrics::gauge!(size_name.clone(), pool.size() as f64); + metrics::gauge!(idle_name.clone(), pool.num_idle() as f64); + } + } + } +} diff --git a/db_store/src/settings.rs b/db_store/src/settings.rs index 5e9599f74..602ae17b5 100644 --- a/db_store/src/settings.rs +++ b/db_store/src/settings.rs @@ -1,4 +1,4 @@ -use crate::{iam_auth_pool, Error, Result}; +use crate::{iam_auth_pool, metric_tracker, Error, Result}; use serde::Deserialize; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; @@ -39,14 +39,36 @@ fn default_auth_type() -> AuthType { impl Settings { pub async fn connect( &self, + app_name: &str, shutdown: triggered::Listener, ) -> Result<(Pool, futures::future::BoxFuture<'static, Result>)> { match self.auth_type { AuthType::Postgres => match self.simple_connect().await { - Ok(pool) => Ok((pool, Box::pin(async move { Ok(()) }))), + Ok(pool) => Ok(( + pool.clone(), + metric_tracker::start(app_name, pool, shutdown).await?, + )), Err(err) => Err(err), }, - AuthType::Iam => iam_auth_pool::connect(self, shutdown).await, + AuthType::Iam => { + let (pool, iam_auth_handle) = + iam_auth_pool::connect(self, shutdown.clone()).await?; + let metric_handle = metric_tracker::start(app_name, pool.clone(), shutdown).await?; + + let handle = + tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) }); + + Ok(( + pool, + Box::pin(async move { + match handle.await { + Ok(Err(err)) => Err(err), + Err(err) => Err(Error::from(err)), + Ok(_) => Ok(()), + } + }), + )) + } } } diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index a0b2c92c4..dc8d95628 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -69,7 +69,10 @@ impl Daemon { }); // Create database pool - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let listen_addr = settings.listen_addr()?; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 3094304da..037b1b07c 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -80,7 +80,10 @@ pub async fn run_daemon(settings: &Settings) -> Result<()> { }); // Set up the postgres pool: - let (pool, db_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; // Set up the solana RpcClient: diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index f2ea7e50d..2e246d6f0 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -65,7 +65,10 @@ impl Server { }); // Create database pool and run migrations - let (pool, db_join_handle) = settings.database.connect(shutdown.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let count_all_beacons = Report::count_all_beacons(&pool).await?; diff --git a/mobile_rewards/src/main.rs b/mobile_rewards/src/main.rs index a604248a3..833ef58f7 100644 --- a/mobile_rewards/src/main.rs +++ b/mobile_rewards/src/main.rs @@ -62,7 +62,10 @@ impl Server { }); // Create database pool and migrate - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; // Reward server diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index dc896a8a1..769b73ffa 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -33,7 +33,10 @@ impl Cmd { let mut follower = settings.follower.connect_follower(); let (shutdown_trigger, shutdown_listener) = triggered::trigger(); - let (pool, _join_handle) = settings.database.connect(shutdown_listener).await?; + let (pool, _join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener) + .await?; let heartbeats = Heartbeats::validated(&pool).await?; let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 5cabdd1ed..023c09f4a 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -20,7 +20,10 @@ impl Cmd { shutdown_trigger.trigger() }); - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); diff --git a/poc_iot_injector/src/cli/server.rs b/poc_iot_injector/src/cli/server.rs index 6279a4ecf..aadf32cc0 100644 --- a/poc_iot_injector/src/cli/server.rs +++ b/poc_iot_injector/src/cli/server.rs @@ -20,7 +20,10 @@ impl Cmd { }); // Create database pool - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; // poc_iot_injector server diff --git a/reward_index/src/main.rs b/reward_index/src/main.rs index 9605e8f9a..3568e13b6 100644 --- a/reward_index/src/main.rs +++ b/reward_index/src/main.rs @@ -66,7 +66,11 @@ impl Server { }); // Create database pool - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let app_name = format!("{}_{}", settings.mode, env!("CARGO_PKG_NAME")); + let (pool, db_join_handle) = settings + .database + .connect(&app_name, shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let file_store = FileStore::from_settings(&settings.verifier).await?;