Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db_store will now record metrics on db connection pool size and num_idle #412

Merged
merged 3 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions db_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ serde = {workspace = true}
http = {workspace = true}
http-serde = {workspace = true}
tokio = {workspace = true}
tracing = {workspace = true}
triggered = {workspace = true}
futures = {workspace = true}

Expand Down
1 change: 1 addition & 0 deletions db_store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::str::FromStr;
mod error;
mod iam_auth_pool;
mod metric_tracker;
mod settings;

pub use error::{Error, Result};
Expand Down
47 changes: 47 additions & 0 deletions db_store/src/metric_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::time::Duration;

use crate::{Error, Result};

const DURATION: Duration = Duration::from_secs(300);

pub async fn start(
app_name: &str,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) -> Result<futures::future::BoxFuture<'static, Result>> {
let pool_size_name = format!("{app_name}_db_pool_size");
let pool_idle_name = format!("{app_name}_db_pool_idle");
let join_handle =
tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool, shutdown).await });

Ok(Box::pin(async move {
match join_handle.await {
Ok(()) => Ok(()),
Err(err) => Err(Error::from(err)),
}
}))
}

async fn run(
size_name: String,
idle_name: String,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) {
let mut trigger = tokio::time::interval(DURATION);

loop {
let shutdown = shutdown.clone();

tokio::select! {
_ = shutdown => {
tracing::info!("db_store: MetricTracker shutting down");
break;
}
_ = trigger.tick() => {
metrics::gauge!(size_name.clone(), pool.size() as f64);
metrics::gauge!(idle_name.clone(), pool.num_idle() as f64);
}
}
}
}
28 changes: 25 additions & 3 deletions db_store/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{iam_auth_pool, Error, Result};
use crate::{iam_auth_pool, metric_tracker, Error, Result};
use serde::Deserialize;
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};

Expand Down Expand Up @@ -39,14 +39,36 @@ fn default_auth_type() -> AuthType {
impl Settings {
pub async fn connect(
&self,
app_name: &str,
shutdown: triggered::Listener,
) -> Result<(Pool<Postgres>, futures::future::BoxFuture<'static, Result>)> {
match self.auth_type {
AuthType::Postgres => match self.simple_connect().await {
Ok(pool) => Ok((pool, Box::pin(async move { Ok(()) }))),
Ok(pool) => Ok((
pool.clone(),
metric_tracker::start(app_name, pool, shutdown).await?,
)),
Err(err) => Err(err),
},
AuthType::Iam => iam_auth_pool::connect(self, shutdown).await,
AuthType::Iam => {
let (pool, iam_auth_handle) =
iam_auth_pool::connect(self, shutdown.clone()).await?;
let metric_handle = metric_tracker::start(app_name, pool.clone(), shutdown).await?;

let handle =
tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) });

Ok((
pool,
Box::pin(async move {
match handle.await {
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
Ok(_) => Ok(()),
}
}),
))
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ impl Daemon {
});

// Create database pool
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let listen_addr = settings.listen_addr()?;
Expand Down
5 changes: 4 additions & 1 deletion iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ pub async fn run_daemon(settings: &Settings) -> Result<()> {
});

// Set up the postgres pool:
let (pool, db_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// Set up the solana RpcClient:
Expand Down
5 changes: 4 additions & 1 deletion iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ impl Server {
});

// Create database pool and run migrations
let (pool, db_join_handle) = settings.database.connect(shutdown.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let count_all_beacons = Report::count_all_beacons(&pool).await?;
Expand Down
5 changes: 4 additions & 1 deletion mobile_rewards/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ impl Server {
});

// Create database pool and migrate
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// Reward server
Expand Down
5 changes: 4 additions & 1 deletion mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ impl Cmd {

let mut follower = settings.follower.connect_follower();
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
let (pool, _join_handle) = settings.database.connect(shutdown_listener).await?;
let (pool, _join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener)
.await?;

let heartbeats = Heartbeats::validated(&pool).await?;
let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?;
Expand Down
5 changes: 4 additions & 1 deletion mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ impl Cmd {
shutdown_trigger.trigger()
});

let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
Expand Down
5 changes: 4 additions & 1 deletion poc_iot_injector/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ impl Cmd {
});

// Create database pool
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

// poc_iot_injector server
Expand Down
6 changes: 5 additions & 1 deletion reward_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ impl Server {
});

// Create database pool
let (pool, db_join_handle) = settings.database.connect(shutdown_listener.clone()).await?;
let app_name = format!("{}_{}", settings.mode, env!("CARGO_PKG_NAME"));
let (pool, db_join_handle) = settings
.database
.connect(&app_name, shutdown_listener.clone())
.await?;
sqlx::migrate!().run(&pool).await?;

let file_store = FileStore::from_settings(&settings.verifier).await?;
Expand Down