diff --git a/mithril-aggregator/src/command_args.rs b/mithril-aggregator/src/command_args.rs index 520a9eff653..3cdf6d36629 100644 --- a/mithril-aggregator/src/command_args.rs +++ b/mithril-aggregator/src/command_args.rs @@ -34,7 +34,7 @@ use mithril_common::{ }; use crate::{ - database::provider::StakePoolRepository, + database::provider::StakePoolStore, event_store::{self, TransmitterService}, http_server::routes::router, tools::{EraTools, GenesisTools, GenesisToolsDependency}, @@ -90,9 +90,9 @@ fn setup_genesis_dependencies( )?), config.store_retention_limit, )); - let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new( - Connection::open(sqlite_db_path.clone().unwrap())?, - )))); + let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open( + sqlite_db_path.clone().unwrap(), + )?)))); let single_signature_store = Arc::new(SingleSignatureStore::new( Box::new(SQLiteAdapter::new("single_signature", sqlite_db_path)?), config.store_retention_limit, @@ -359,9 +359,9 @@ impl ServeCommand { )?), config.store_retention_limit, )); - let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new( - Connection::open(sqlite_db_path.clone().unwrap())?, - )))); + let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open( + sqlite_db_path.clone().unwrap(), + )?)))); let single_signature_store = Arc::new(SingleSignatureStore::new( Box::new(SQLiteAdapter::new( "single_signature", diff --git a/mithril-aggregator/src/database/provider/stake_pool.rs b/mithril-aggregator/src/database/provider/stake_pool.rs index 14c098dec01..546272694b2 100644 --- a/mithril-aggregator/src/database/provider/stake_pool.rs +++ b/mithril-aggregator/src/database/provider/stake_pool.rs @@ -18,6 +18,9 @@ use mithril_common::{ use mithril_common::StdError; +/// Delete stake pools for Epoch older than this. +const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 3; + /// Stake pool as read from Chain. /// TODO remove this compile directive ↓ #[allow(dead_code)] @@ -186,12 +189,54 @@ impl<'conn> Provider<'conn> for UpdateStakePoolProvider<'conn> { } } +/// Provider to remove old data from the stake_pool table +pub struct DeleteStakePoolProvider<'conn> { + connection: &'conn Connection, +} + +impl<'conn> Provider<'conn> for DeleteStakePoolProvider<'conn> { + type Entity = StakePool; + + fn get_connection(&'conn self) -> &'conn Connection { + self.connection + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + let projection = Self::Entity::get_projection() + .expand(SourceAlias::new(&[("{:stake_pool:}", "stake_pool")])); + + format!("delete from stake_pool where {condition} returning {projection}") + } +} + +impl<'conn> DeleteStakePoolProvider<'conn> { + /// Create a new instance + pub fn new(connection: &'conn Connection) -> Self { + Self { connection } + } + + /// Create the SQL condition to prune data older than the given Epoch. + fn get_prune_condition(&self, epoch_threshold: Epoch) -> WhereCondition { + let epoch_value = Value::Integer(i64::try_from(epoch_threshold.0).unwrap()); + + WhereCondition::new("epoch < ?*", vec![epoch_value]) + } + + /// Prune the stake pools data older than the given epoch. + pub fn prune(&self, epoch_threshold: Epoch) -> Result, StdError> { + let filters = self.get_prune_condition(epoch_threshold); + + self.find(filters) + } +} /// Service to deal with stake pools (read & write). -pub struct StakePoolRepository { +pub struct StakePoolStore { connection: Arc>, } -impl StakePoolRepository { +impl StakePoolStore { /// Create a new StakePool service pub fn new(connection: Arc>) -> Self { Self { connection } @@ -199,7 +244,7 @@ impl StakePoolRepository { } #[async_trait] -impl StakeStorer for StakePoolRepository { +impl StakeStorer for StakePoolStore { async fn save_stakes( &self, epoch: Epoch, @@ -221,6 +266,9 @@ impl StakeStorer for StakePoolRepository { .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; new_stakes.insert(pool_id.to_string(), stake_pool.stake); } + let _ = DeleteStakePoolProvider::new(connection) + .prune(Epoch(epoch.0 - STAKE_POOL_PRUNE_EPOCH_THRESHOLD)) + .map_err(AdapterError::InitializationError)?; connection .execute("commit transaction") .map_err(|e| AdapterError::QueryError(e.into()))?; @@ -293,4 +341,15 @@ mod tests { params ); } + + #[test] + fn prune() { + let connection = Connection::open(":memory:").unwrap(); + let provider = DeleteStakePoolProvider::new(&connection); + let condition = provider.get_prune_condition(Epoch(5)); + let (condition, params) = condition.expand(); + + assert_eq!("epoch < ?1".to_string(), condition); + assert_eq!(vec![Value::Integer(5)], params); + } } diff --git a/mithril-common/src/sqlite/source_alias.rs b/mithril-common/src/sqlite/source_alias.rs index c3df58feb6c..a0bde2a6a35 100644 --- a/mithril-common/src/sqlite/source_alias.rs +++ b/mithril-common/src/sqlite/source_alias.rs @@ -31,12 +31,11 @@ mod tests { #[test] fn simple_source_alias() { let source_alias = SourceAlias::new(&[("first", "one"), ("second", "two")]); - let target = source_alias - .get_iterator() - .map(|(name, alias)| format!("{name} => {alias}")) - .collect::>() - .join(", "); + let mut fields = "first.one, second.two".to_string(); - assert_eq!("first => one, second => two".to_string(), target); + for (alias, source) in source_alias.get_iterator() { + fields = fields.replace(alias, source); + } + assert_eq!("one.one, two.two".to_string(), fields); } }