Skip to content

Commit

Permalink
db_store will now record metrics on db connection pool size and num_i…
Browse files Browse the repository at this point in the history
…dle (#412)

* db_store will now record metrics on db connection pool size and num_idle

* Remove unnecessary Box and change app_name to &str

* Fix missed paramter change
  • Loading branch information
bbalser authored Mar 15, 2023
1 parent 25c9f45 commit a944241
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions db_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ serde = {workspace = true}
http = {workspace = true}
http-serde = {workspace = true}
tokio = {workspace = true}
tracing = {workspace = true}
triggered = {workspace = true}
futures = {workspace = true}

Expand Down
1 change: 1 addition & 0 deletions db_store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::str::FromStr;
mod error;
mod iam_auth_pool;
mod metric_tracker;
mod settings;

pub use error::{Error, Result};
Expand Down
47 changes: 47 additions & 0 deletions db_store/src/metric_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::time::Duration;

use crate::{Error, Result};

const DURATION: Duration = Duration::from_secs(300);

pub async fn start(
app_name: &str,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) -> Result<futures::future::BoxFuture<'static, Result>> {
let pool_size_name = format!("{app_name}_db_pool_size");
let pool_idle_name = format!("{app_name}_db_pool_idle");
let join_handle =
tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool, shutdown).await });

Ok(Box::pin(async move {
match join_handle.await {
Ok(()) => Ok(()),
Err(err) => Err(Error::from(err)),
}
}))
}

async fn run(
size_name: String,
idle_name: String,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) {
let mut trigger = tokio::time::interval(DURATION);

loop {
let shutdown = shutdown.clone();

tokio::select! {
_ = shutdown => {
tracing::info!("db_store: MetricTracker shutting down");
break;
}
_ = trigger.tick() => {
metrics::gauge!(size_name.clone(), pool.size() as f64);
metrics::gauge!(idle_name.clone(), pool.num_idle() as f64);
}
}
}
}
28 changes: 25 additions & 3 deletions db_store/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{iam_auth_pool, Error, Result};
use crate::{iam_auth_pool, metric_tracker, Error, Result};
use serde::Deserialize;
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};

Expand Down Expand Up @@ -39,14 +39,36 @@ fn default_auth_type() -> AuthType {
impl Settings {
pub async fn connect(
&self,
app_name: &str,
shutdown: triggered::Listener,
) -> Result<(Pool<Postgres>, futures::future::BoxFuture<'static, Result>)> {
match self.auth_type {
AuthType::Postgres => match self.simple_connect().await {
Ok(pool) => Ok((pool, Box::pin(async move { Ok(()) }))),
Ok(pool) => Ok((
pool.clone(),
metric_tracker::start(app_name, pool, shutdown).await?,
)),
Err(err) => Err(err),
},
AuthType::Iam => iam_auth_pool::connect(self, shutdown).await,
AuthType::Iam => {
let (pool, iam_auth_handle) =
iam_auth_pool::connect(self, shutdown.clone()).await?;
let metric_handle = metric_tracker::start(app_name, pool.clone(), shutdown).await?;

let handle =
tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) });

Ok((
pool,
Box::pin(async move {
match handle.await {
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
Ok(_) => Ok(()),
}
}),
))
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ impl Daemon {
});

// Create database pool
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let listen_addr = settings.listen_addr()?;
Expand Down
5 changes: 4 additions & 1 deletion iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ pub async fn run_daemon(settings: &Settings) -> Result<()> {
});

// Set up the postgres pool:
let (pool, db_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// Set up the solana RpcClient:
Expand Down
5 changes: 4 additions & 1 deletion iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ impl Server {
});

// Create database pool and run migrations
let (pool, db_join_handle) = settings.database.connect(shutdown.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let count_all_beacons = Report::count_all_beacons(&pool).await?;
Expand Down
5 changes: 4 additions & 1 deletion mobile_rewards/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ impl Server {
});

// Create database pool and migrate
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// Reward server
Expand Down
5 changes: 4 additions & 1 deletion mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ impl Cmd {

let mut follower = settings.follower.connect_follower();
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
let (pool, _join_handle) = settings.database.connect(shutdown_listener).await?;
let (pool, _join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener)
.await?;

let heartbeats = Heartbeats::validated(&pool).await?;
let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?;
Expand Down
5 changes: 4 additions & 1 deletion mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ impl Cmd {
shutdown_trigger.trigger()
});

let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
Expand Down
5 changes: 4 additions & 1 deletion poc_iot_injector/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ impl Cmd {
});

// Create database pool
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// poc_iot_injector server
Expand Down
6 changes: 5 additions & 1 deletion reward_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ impl Server {
});

// Create database pool
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let app_name = format!("{}_{}", settings.mode, env!("CARGO_PKG_NAME"));
let (pool, db_join_handle) = settings
.database
.connect(&app_name, shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let file_store = FileStore::from_settings(&settings.verifier).await?;
Expand Down

0 comments on commit a944241

Please sign in to comment.