Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(logger): don't block when deleting old logs #1690

Merged
merged 1 commit into from
Mar 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions logger/src/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use prost_types::Timestamp;
use shuttle_proto::logger::{LogItem, LogLine};
use sqlx::{
migrate::Migrator,
postgres::PgConnectOptions,
types::chrono::{DateTime, Utc},
Executor, FromRow, PgPool, QueryBuilder,
};
Expand Down Expand Up @@ -60,14 +59,7 @@ impl Postgres {
pub async fn new(connection_uri: &Uri) -> Self {
let pool = PgPool::connect(connection_uri.to_string().as_str())
.await
.expect("to be able to connect to the postgres db using the connection url");
Self::from_pool(pool).await
}

pub async fn with_options(options: PgConnectOptions) -> Self {
let pool = PgPool::connect_with(options)
.await
.expect("to be able to connect to the postgres db using the pg connect options");
.expect("to connect to the db");
Self::from_pool(pool).await
}

Expand All @@ -77,13 +69,17 @@ impl Postgres {
.await
.expect("to run migrations successfully");

// Perform cleaning of old logs on startup
pool.execute("DELETE FROM logs WHERE tx_timestamp < (NOW() - INTERVAL '1 month')")
.await
.expect("to clean old logs successfully");
let pool_clone = pool.clone();
tokio::spawn(async move {
info!("cleaning old logs");
pool_clone
.execute("DELETE FROM logs WHERE tx_timestamp < (NOW() - INTERVAL '1 month')")
.await
.expect("to clean old logs successfully");
info!("done cleaning old logs");
});

let (tx, mut rx) = broadcast::channel::<(Vec<Log>, Span)>(1000);
let pool_spawn = pool.clone();

let interval_tx = tx.clone();
tokio::spawn(async move {
Expand All @@ -95,6 +91,7 @@ impl Postgres {
}
});

let pool_clone = pool.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Expand Down Expand Up @@ -124,7 +121,7 @@ impl Postgres {
});
let query = builder.build();

if let Err(error) = query.execute(&pool_spawn).instrument(parent_span).await
if let Err(error) = query.execute(&pool_clone).instrument(parent_span).await
{
error!(
error = &error as &dyn std::error::Error,
Expand Down