Skip to content

Commit

Permalink
Merge pull request #739 from helium/andymck/boost-man-purger
Browse files Browse the repository at this point in the history
purge successfully processed activations from db after a period
  • Loading branch information
andymck authored Feb 23, 2024
2 parents 45e2110 + b7f7869 commit 8547769
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 2 deletions.
17 changes: 16 additions & 1 deletion boost_manager/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::OnChainStatus;
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use file_store::hex_boost::BoostedHexActivation;
use sqlx::{postgres::PgRow, FromRow, Pool, Postgres, Row, Transaction};

Expand Down Expand Up @@ -220,3 +220,18 @@ pub async fn update_verified_txns_not_onchain(
.await
.map(|_| ())?)
}

pub async fn purge_stale_records(
db: &Pool<Postgres>,
retention_period: Duration,
) -> anyhow::Result<u64> {
let stale_period = Utc::now() - retention_period;
Ok(
sqlx::query(" DELETE FROM activated_hexes WHERE status = $1 AND updated_at < $2 ")
.bind(OnChainStatus::Success)
.bind(stale_period)
.execute(db)
.await
.map(|result| result.rows_affected())?,
)
}
1 change: 1 addition & 0 deletions boost_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};

pub mod activator;
pub mod db;
pub mod purger;
pub mod settings;
pub mod telemetry;
pub use settings::Settings;
Expand Down
6 changes: 5 additions & 1 deletion boost_manager/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{bail, Result};
use boost_manager::{
activator::Activator, settings::Settings, telemetry, updater::Updater, watcher::Watcher,
activator::Activator, purger::Purger, settings::Settings, telemetry, updater::Updater,
watcher::Watcher,
};
use chrono::Duration;
use clap::Parser;
Expand Down Expand Up @@ -128,13 +129,16 @@ impl Server {
solana,
)?;

let purger = Purger::new(pool.clone(), settings.retention_period());

TaskManager::builder()
.add_task(file_upload_server)
.add_task(manifest_server)
.add_task(updated_hexes_sink_server)
.add_task(activator)
.add_task(watcher)
.add_task(updater)
.add_task(purger)
.start()
.await
}
Expand Down
57 changes: 57 additions & 0 deletions boost_manager/src/purger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::db;
use chrono::Duration as ChronoDuration;
use futures::{future::LocalBoxFuture, TryFutureExt};
use sqlx::{Pool, Postgres};
use std::time::Duration;
use task_manager::ManagedTask;

const PURGE_INTERVAL: Duration = Duration::from_secs(30);

pub struct Purger {
pool: Pool<Postgres>,
retention_period: ChronoDuration,
}

impl ManagedTask for Purger {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));
Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl Purger {
pub fn new(pool: Pool<Postgres>, retention_period: ChronoDuration) -> Self {
Self {
pool,
retention_period,
}
}

async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting Purger");
loop {
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = tokio::time::sleep(PURGE_INTERVAL) => {
purge(&self.pool, self.retention_period).await?;
}
}
}
tracing::info!("stopping Purger");
Ok(())
}
}

pub async fn purge(pool: &Pool<Postgres>, retention_period: ChronoDuration) -> anyhow::Result<()> {
let num_records_purged = db::purge_stale_records(pool, retention_period).await?;
tracing::info!("purged {} stale records", num_records_purged);
Ok(())
}
11 changes: 11 additions & 0 deletions boost_manager/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ pub struct Settings {
// the number of records to fit per solana txn
#[serde(default = "default_txn_batch_size")]
pub txn_batch_size: u32,
// default retention period in seconds
#[serde(default = "default_retention_period")]
pub retention_period: i64,
}

fn default_retention_period() -> i64 {
86400 * 7 // 7 days
}

fn default_txn_batch_size() -> u32 {
Expand Down Expand Up @@ -84,6 +91,10 @@ impl Settings {
Duration::from_secs(self.activation_check_interval as u64)
}

pub fn retention_period(&self) -> ChronoDuration {
ChronoDuration::seconds(self.retention_period)
}

pub fn txn_batch_size(&self) -> usize {
self.txn_batch_size as usize
}
Expand Down
120 changes: 120 additions & 0 deletions boost_manager/tests/purger_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
mod common;
use boost_manager::purger;
use boost_manager::OnChainStatus;
use chrono::{DateTime, Duration, Utc};
use sqlx::{PgPool, Postgres, Transaction};

const BOOST_HEX_PUBKEY: &str = "J9JiLTpjaShxL8eMvUs8txVw6TZ36E38SiJ89NxnMbLU";
const BOOST_CONFIG_PUBKEY: &str = "BZM1QTud72B2cpTW7PhEnFmRX7ZWzvY7DpPpNJJuDrWG";

#[sqlx::test]
async fn test_purge(pool: PgPool) -> anyhow::Result<()> {
// seed test data to db
seed_data(&pool).await?;

// assert the db contains the expected number of records pre purge
let count: i64 = sqlx::query_scalar("select count(*) from activated_hexes")
.fetch_one(&pool)
.await?;
assert_eq!(7, count);

// do da purge
purger::purge(&pool, Duration::days(7)).await?;

// assert the db contains the expected number of records post purge
let count: i64 = sqlx::query_scalar("select count(*) from activated_hexes")
.fetch_one(&pool)
.await?;
assert_eq!(4, count);

Ok(())
}

async fn seed_data(db: &PgPool) -> anyhow::Result<()> {
let now = Utc::now();
let mut txn = db.begin().await?;

insert_data(
&mut txn,
0x8c2681a306601ff_u64,
OnChainStatus::Queued,
now - Duration::days(1),
)
.await?;
insert_data(
&mut txn,
0x8c2681a306602ff_u64,
OnChainStatus::Queued,
now - Duration::days(8),
)
.await?;
insert_data(
&mut txn,
0x8c2681a306603ff_u64,
OnChainStatus::Success,
now - Duration::days(2),
)
.await?;
insert_data(
&mut txn,
0x8c2681a306604ff_u64,
OnChainStatus::Success,
now - Duration::days(6),
)
.await?;
insert_data(
&mut txn,
0x8c2681a306605ff_u64,
OnChainStatus::Success,
now - Duration::days(8),
)
.await?;
insert_data(
&mut txn,
0x8c2681a306606ff_u64,
OnChainStatus::Success,
now - Duration::days(9),
)
.await?;
insert_data(
&mut txn,
0x8c2681a306607ff_u64,
OnChainStatus::Success,
now - Duration::days(10),
)
.await?;

txn.commit().await?;

Ok(())
}

pub async fn insert_data(
txn: &mut Transaction<'_, Postgres>,
location: u64,
status: OnChainStatus,
last_updated_at: DateTime<Utc>,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
insert into activated_hexes (
location,
activation_ts,
boosted_hex_pubkey,
boost_config_pubkey,
status,
updated_at
) values ($1, $2, $3, $4, $5, $6)
on conflict do nothing
"#,
)
.bind(location as i64)
.bind(last_updated_at)
.bind(BOOST_HEX_PUBKEY)
.bind(BOOST_CONFIG_PUBKEY)
.bind(status)
.bind(last_updated_at)
.execute(txn)
.await?;
Ok(())
}

0 comments on commit 8547769

Please sign in to comment.