From d88e81d5bbc6189b3e3c3a6d8f845c933f344d0c Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 15 Mar 2023 09:54:02 -0400 Subject: [PATCH 1/3] db_store will now record metrics on db connection pool size and num_idle --- Cargo.lock | 1 + db_store/Cargo.toml | 1 + db_store/src/lib.rs | 1 + db_store/src/metric_tracker.rs | 41 +++++++++++++++++++++++ db_store/src/settings.rs | 29 ++++++++++++++-- iot_config/src/main.rs | 5 ++- iot_packet_verifier/src/daemon.rs | 5 ++- iot_verifier/src/main.rs | 5 ++- mobile_rewards/src/main.rs | 5 ++- mobile_verifier/src/cli/reward_from_db.rs | 5 ++- mobile_verifier/src/cli/server.rs | 5 ++- poc_iot_injector/src/cli/server.rs | 5 ++- reward_index/src/main.rs | 6 +++- 13 files changed, 103 insertions(+), 11 deletions(-) create mode 100644 db_store/src/metric_tracker.rs diff --git a/Cargo.lock b/Cargo.lock index 0435cd457..532110614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,6 +2085,7 @@ dependencies = [ "sqlx", "thiserror", "tokio", + "tracing", "triggered", ] diff --git a/db_store/Cargo.toml b/db_store/Cargo.toml index fa2cc15bc..8eca6781f 100644 --- a/db_store/Cargo.toml +++ b/db_store/Cargo.toml @@ -15,6 +15,7 @@ serde = {workspace = true} http = {workspace = true} http-serde = {workspace = true} tokio = {workspace = true} +tracing = {workspace = true} triggered = {workspace = true} futures = {workspace = true} diff --git a/db_store/src/lib.rs b/db_store/src/lib.rs index 4d441029d..fb8ec211e 100644 --- a/db_store/src/lib.rs +++ b/db_store/src/lib.rs @@ -1,6 +1,7 @@ use std::str::FromStr; mod error; mod iam_auth_pool; +mod metric_tracker; mod settings; pub use error::{Error, Result}; diff --git a/db_store/src/metric_tracker.rs b/db_store/src/metric_tracker.rs new file mode 100644 index 000000000..2eab1de43 --- /dev/null +++ b/db_store/src/metric_tracker.rs @@ -0,0 +1,41 @@ +use std::time::Duration; + +use crate::{Error, Result}; + +const DURATION: Duration = Duration::from_secs(300); + +pub async fn start( + app_name: String, + pool: sqlx::Pool, + shutdown: triggered::Listener, +) -> Result> { + let join_handle = tokio::spawn(async move { run(app_name, pool, shutdown).await }); + + Ok(Box::pin(async move { + match join_handle.await { + Ok(()) => Ok(()), + Err(err) => Err(Error::from(err)), + } + })) +} + +async fn run(app_name: String, pool: sqlx::Pool, shutdown: triggered::Listener) { + let mut trigger = tokio::time::interval(DURATION); + let pool_size_name = format!("{app_name}_db_pool_size"); + let pool_idle_name = format!("{app_name}_db_pool_idle"); + + loop { + let shutdown = shutdown.clone(); + + tokio::select! { + _ = shutdown => { + tracing::info!("db_store: MetricTracker shutting down"); + break; + } + _ = trigger.tick() => { + metrics::gauge!(pool_size_name.clone(), pool.size() as f64); + metrics::gauge!(pool_idle_name.clone(), pool.num_idle() as f64); + } + } + } +} diff --git a/db_store/src/settings.rs b/db_store/src/settings.rs index 5e9599f74..4f82ba4e7 100644 --- a/db_store/src/settings.rs +++ b/db_store/src/settings.rs @@ -1,4 +1,4 @@ -use crate::{iam_auth_pool, Error, Result}; +use crate::{iam_auth_pool, metric_tracker, Error, Result}; use serde::Deserialize; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; @@ -39,14 +39,37 @@ fn default_auth_type() -> AuthType { impl Settings { pub async fn connect( &self, + app_name: impl Into, shutdown: triggered::Listener, ) -> Result<(Pool, futures::future::BoxFuture<'static, Result>)> { match self.auth_type { AuthType::Postgres => match self.simple_connect().await { - Ok(pool) => Ok((pool, Box::pin(async move { Ok(()) }))), + Ok(pool) => Ok(( + pool.clone(), + Box::pin(metric_tracker::start(app_name.into(), pool, shutdown).await?), + )), Err(err) => Err(err), }, - AuthType::Iam => iam_auth_pool::connect(self, shutdown).await, + AuthType::Iam => { + let (pool, iam_auth_handle) = + iam_auth_pool::connect(self, shutdown.clone()).await?; + let metric_handle = + metric_tracker::start(app_name.into(), pool.clone(), shutdown).await?; + + let handle = + tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) }); + + Ok(( + pool, + Box::pin(async move { + match handle.await { + Ok(Err(err)) => Err(err), + Err(err) => Err(Error::from(err)), + Ok(_) => Ok(()), + } + }), + )) + } } } diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index a0b2c92c4..dc8d95628 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -69,7 +69,10 @@ impl Daemon { }); // Create database pool - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let listen_addr = settings.listen_addr()?; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 3094304da..037b1b07c 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -80,7 +80,10 @@ pub async fn run_daemon(settings: &Settings) -> Result<()> { }); // Set up the postgres pool: - let (pool, db_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; // Set up the solana RpcClient: diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index f2ea7e50d..2e246d6f0 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -65,7 +65,10 @@ impl Server { }); // Create database pool and run migrations - let (pool, db_join_handle) = settings.database.connect(shutdown.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let count_all_beacons = Report::count_all_beacons(&pool).await?; diff --git a/mobile_rewards/src/main.rs b/mobile_rewards/src/main.rs index a604248a3..833ef58f7 100644 --- a/mobile_rewards/src/main.rs +++ b/mobile_rewards/src/main.rs @@ -62,7 +62,10 @@ impl Server { }); // Create database pool and migrate - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; // Reward server diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index dc896a8a1..769b73ffa 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -33,7 +33,10 @@ impl Cmd { let mut follower = settings.follower.connect_follower(); let (shutdown_trigger, shutdown_listener) = triggered::trigger(); - let (pool, _join_handle) = settings.database.connect(shutdown_listener).await?; + let (pool, _join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener) + .await?; let heartbeats = Heartbeats::validated(&pool).await?; let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 5cabdd1ed..023c09f4a 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -20,7 +20,10 @@ impl Cmd { shutdown_trigger.trigger() }); - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); diff --git a/poc_iot_injector/src/cli/server.rs b/poc_iot_injector/src/cli/server.rs index 6279a4ecf..aadf32cc0 100644 --- a/poc_iot_injector/src/cli/server.rs +++ b/poc_iot_injector/src/cli/server.rs @@ -20,7 +20,10 @@ impl Cmd { }); // Create database pool - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let (pool, db_join_handle) = settings + .database + .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; // poc_iot_injector server diff --git a/reward_index/src/main.rs b/reward_index/src/main.rs index 9605e8f9a..e18c8ef35 100644 --- a/reward_index/src/main.rs +++ b/reward_index/src/main.rs @@ -66,7 +66,11 @@ impl Server { }); // Create database pool - let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?; + let app_name = format!("{}_{}", settings.mode, env!("CARGO_PKG_NAME")); + let (pool, db_join_handle) = settings + .database + .connect(app_name, shutdown_listener.clone()) + .await?; sqlx::migrate!().run(&pool).await?; let file_store = FileStore::from_settings(&settings.verifier).await?; From 574c46aa308fe1c8f508be2785d88a5e477fab31 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 15 Mar 2023 10:08:46 -0400 Subject: [PATCH 2/3] Remove unnecessary Box and change app_name to &str --- db_store/src/metric_tracker.rs | 20 +++++++++++++------- db_store/src/settings.rs | 7 +++---- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/db_store/src/metric_tracker.rs b/db_store/src/metric_tracker.rs index 2eab1de43..f0175de3e 100644 --- a/db_store/src/metric_tracker.rs +++ b/db_store/src/metric_tracker.rs @@ -5,11 +5,14 @@ use crate::{Error, Result}; const DURATION: Duration = Duration::from_secs(300); pub async fn start( - app_name: String, + app_name: &str, pool: sqlx::Pool, shutdown: triggered::Listener, ) -> Result> { - let join_handle = tokio::spawn(async move { run(app_name, pool, shutdown).await }); + let pool_size_name = format!("{app_name}_db_pool_size"); + let pool_idle_name = format!("{app_name}_db_pool_idle"); + let join_handle = + tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool, shutdown).await }); Ok(Box::pin(async move { match join_handle.await { @@ -19,10 +22,13 @@ pub async fn start( })) } -async fn run(app_name: String, pool: sqlx::Pool, shutdown: triggered::Listener) { +async fn run( + size_name: String, + idle_name: String, + pool: sqlx::Pool, + shutdown: triggered::Listener, +) { let mut trigger = tokio::time::interval(DURATION); - let pool_size_name = format!("{app_name}_db_pool_size"); - let pool_idle_name = format!("{app_name}_db_pool_idle"); loop { let shutdown = shutdown.clone(); @@ -33,8 +39,8 @@ async fn run(app_name: String, pool: sqlx::Pool, shutdown: trigg break; } _ = trigger.tick() => { - metrics::gauge!(pool_size_name.clone(), pool.size() as f64); - metrics::gauge!(pool_idle_name.clone(), pool.num_idle() as f64); + metrics::gauge!(size_name.clone(), pool.size() as f64); + metrics::gauge!(idle_name.clone(), pool.num_idle() as f64); } } } diff --git a/db_store/src/settings.rs b/db_store/src/settings.rs index 4f82ba4e7..602ae17b5 100644 --- a/db_store/src/settings.rs +++ b/db_store/src/settings.rs @@ -39,22 +39,21 @@ fn default_auth_type() -> AuthType { impl Settings { pub async fn connect( &self, - app_name: impl Into, + app_name: &str, shutdown: triggered::Listener, ) -> Result<(Pool, futures::future::BoxFuture<'static, Result>)> { match self.auth_type { AuthType::Postgres => match self.simple_connect().await { Ok(pool) => Ok(( pool.clone(), - Box::pin(metric_tracker::start(app_name.into(), pool, shutdown).await?), + metric_tracker::start(app_name, pool, shutdown).await?, )), Err(err) => Err(err), }, AuthType::Iam => { let (pool, iam_auth_handle) = iam_auth_pool::connect(self, shutdown.clone()).await?; - let metric_handle = - metric_tracker::start(app_name.into(), pool.clone(), shutdown).await?; + let metric_handle = metric_tracker::start(app_name, pool.clone(), shutdown).await?; let handle = tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) }); From 28465d8d7a46d34c4db0273345deca14ce2a999b Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 15 Mar 2023 10:11:45 -0400 Subject: [PATCH 3/3] Fix missed paramter change --- reward_index/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reward_index/src/main.rs b/reward_index/src/main.rs index e18c8ef35..3568e13b6 100644 --- a/reward_index/src/main.rs +++ b/reward_index/src/main.rs @@ -69,7 +69,7 @@ impl Server { let app_name = format!("{}_{}", settings.mode, env!("CARGO_PKG_NAME")); let (pool, db_join_handle) = settings .database - .connect(app_name, shutdown_listener.clone()) + .connect(&app_name, shutdown_listener.clone()) .await?; sqlx::migrate!().run(&pool).await?;