From e4ee45be0689c051b3014f02ab10763b80eca446 Mon Sep 17 00:00:00 2001 From: sfauvel Date: Fri, 8 Nov 2024 16:52:16 +0100 Subject: [PATCH] Create a task to prune EpochSettings and add it to the `UpkeepService`. --- .../repository/epoch_settings_store.rs | 62 ++++++++++++++----- .../src/dependency_injection/builder.rs | 25 ++++---- .../src/store/epoch_settings_storer.rs | 15 ++++- 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/mithril-aggregator/src/database/repository/epoch_settings_store.rs b/mithril-aggregator/src/database/repository/epoch_settings_store.rs index 80491a7e6c7..5101927c088 100644 --- a/mithril-aggregator/src/database/repository/epoch_settings_store.rs +++ b/mithril-aggregator/src/database/repository/epoch_settings_store.rs @@ -12,6 +12,7 @@ use crate::database::query::{ DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery, }; use crate::entities::AggregatorEpochSettings; +use crate::services::EpochPruningTask; use crate::EpochSettingsStorer; /// Service to deal with epoch settings (read & write). @@ -68,17 +69,6 @@ impl EpochSettingsStorer for EpochSettingsStore { .map_err(|e| AdapterError::GeneralError(e.context("persist epoch settings failure")))? .unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}")); - // Prune useless old epoch settings. - if let Some(threshold) = self.retention_limit { - let _ = self - .connection - .fetch(DeleteEpochSettingsQuery::below_epoch_threshold( - epoch - threshold, - )) - .map_err(AdapterError::QueryError)? - .count(); - } - Ok(Some(epoch_settings_record.into())) } @@ -102,6 +92,25 @@ impl EpochSettingsStorer for EpochSettingsStore { } } +#[async_trait] +impl EpochPruningTask for EpochSettingsStore { + fn pruned_data(&self) -> &'static str { + "Epoch settings" + } + + /// Prune useless old epoch settings. + async fn prune(&self, epoch: Epoch) -> StdResult<()> { + if let Some(threshold) = self.retention_limit { + self.connection + .apply(DeleteEpochSettingsQuery::below_epoch_threshold( + epoch - threshold, + )) + .map_err(AdapterError::QueryError)?; + } + Ok(()) + } +} + #[cfg(test)] mod tests { use mithril_common::entities::BlockNumber; @@ -172,7 +181,7 @@ mod tests { } #[tokio::test] - async fn save_epoch_settings_prune_older_epoch_settings() { + async fn prune_epoch_settings_older_than_threshold() { const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5; let connection = main_db_connection().unwrap(); @@ -183,12 +192,10 @@ mod tests { ); store - .save_epoch_settings( - Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD, - AggregatorEpochSettings::dummy(), - ) + .prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD) .await - .expect("saving epoch settings should not fails"); + .unwrap(); + let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap(); let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap(); @@ -202,6 +209,27 @@ mod tests { ); } + #[tokio::test] + async fn without_threshold_nothing_is_pruned() { + let connection = main_db_connection().unwrap(); + insert_epoch_settings(&connection, &[1, 2]).unwrap(); + let store = EpochSettingsStore::new(Arc::new(connection), None); + + store.prune(Epoch(100)).await.unwrap(); + + let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap(); + let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap(); + + assert!( + epoch1_params.is_some(), + "Epoch settings at epoch 1 should have been pruned", + ); + assert!( + epoch2_params.is_some(), + "Epoch settings at epoch 2 should still exist", + ); + } + #[tokio::test] async fn save_epoch_settings_stores_in_database() { let connection = main_db_connection().unwrap(); diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 8570cb526a8..2e1d80e4f5d 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -133,8 +133,8 @@ pub struct DependenciesBuilder { /// Verification key store. pub verification_key_store: Option>, - /// Epoch settings storer. - pub epoch_settings_storer: Option>, + /// Epoch settings store. + pub epoch_settings_store: Option>, /// Cardano CLI Runner for the [ChainObserver] pub cardano_cli_runner: Option>, @@ -261,7 +261,7 @@ impl DependenciesBuilder { certificate_repository: None, open_message_repository: None, verification_key_store: None, - epoch_settings_storer: None, + epoch_settings_store: None, cardano_cli_runner: None, chain_observer: None, chain_block_reader: None, @@ -587,7 +587,7 @@ impl DependenciesBuilder { Ok(self.verification_key_store.as_ref().cloned().unwrap()) } - async fn build_epoch_settings_storer(&mut self) -> Result> { + async fn build_epoch_settings_store(&mut self) -> Result> { let logger = self.root_logger(); let epoch_settings_store = EpochSettingsStore::new( self.get_sqlite_connection().await?, @@ -638,12 +638,12 @@ impl DependenciesBuilder { } /// Get a configured [EpochSettingsStorer]. - pub async fn get_epoch_settings_storer(&mut self) -> Result> { - if self.epoch_settings_storer.is_none() { - self.epoch_settings_storer = Some(self.build_epoch_settings_storer().await?); + pub async fn get_epoch_settings_store(&mut self) -> Result> { + if self.epoch_settings_store.is_none() { + self.epoch_settings_store = Some(self.build_epoch_settings_store().await?); } - Ok(self.epoch_settings_storer.as_ref().cloned().unwrap()) + Ok(self.epoch_settings_store.as_ref().cloned().unwrap()) } async fn build_chain_observer(&mut self) -> Result> { @@ -1255,7 +1255,7 @@ impl DependenciesBuilder { async fn build_epoch_service(&mut self) -> Result { let verification_key_store = self.get_verification_key_store().await?; - let epoch_settings_storer = self.get_epoch_settings_storer().await?; + let epoch_settings_storer = self.get_epoch_settings_store().await?; let epoch_settings = self.get_epoch_settings_configuration()?; let network = self.configuration.get_network()?; let allowed_discriminants = self.get_allowed_signed_entity_types_discriminants()?; @@ -1330,6 +1330,7 @@ impl DependenciesBuilder { async fn build_upkeep_service(&mut self) -> Result> { let stake_pool_pruning_task = self.get_stake_store().await?; + let epoch_settings_pruning_task = self.get_epoch_settings_store().await?; let upkeep_service = Arc::new(AggregatorUpkeepService::new( self.get_sqlite_connection().await?, @@ -1337,7 +1338,7 @@ impl DependenciesBuilder { .await?, self.get_event_store_sqlite_connection().await?, self.get_signed_entity_lock().await?, - vec![stake_pool_pruning_task], + vec![stake_pool_pruning_task, epoch_settings_pruning_task], self.root_logger(), )); @@ -1428,7 +1429,7 @@ impl DependenciesBuilder { certificate_repository: self.get_certificate_repository().await?, open_message_repository: self.get_open_message_repository().await?, verification_key_store: self.get_verification_key_store().await?, - epoch_settings_storer: self.get_epoch_settings_storer().await?, + epoch_settings_storer: self.get_epoch_settings_store().await?, chain_observer: self.get_chain_observer().await?, immutable_file_observer: self.get_immutable_file_observer().await?, digester: self.get_immutable_digester().await?, @@ -1555,7 +1556,7 @@ impl DependenciesBuilder { certificate_repository: self.get_certificate_repository().await?, certificate_verifier: self.get_certificate_verifier().await?, genesis_verifier: self.get_genesis_verifier().await?, - epoch_settings_storer: self.get_epoch_settings_storer().await?, + epoch_settings_storer: self.get_epoch_settings_store().await?, verification_key_store: self.get_verification_key_store().await?, }; diff --git a/mithril-aggregator/src/store/epoch_settings_storer.rs b/mithril-aggregator/src/store/epoch_settings_storer.rs index 504c055ddb6..edcab635c86 100644 --- a/mithril-aggregator/src/store/epoch_settings_storer.rs +++ b/mithril-aggregator/src/store/epoch_settings_storer.rs @@ -6,11 +6,11 @@ use tokio::sync::RwLock; use mithril_common::entities::{Epoch, ProtocolParameters}; -use crate::entities::AggregatorEpochSettings; +use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask}; /// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch. #[async_trait] -pub trait EpochSettingsStorer: Sync + Send { +pub trait EpochSettingsStorer: EpochPruningTask + Sync + Send { /// Save the given `AggregatorEpochSettings` for the given [Epoch]. async fn save_epoch_settings( &self, @@ -83,6 +83,17 @@ impl EpochSettingsStorer for FakeEpochSettingsStorer { } } +#[async_trait] +impl EpochPruningTask for FakeEpochSettingsStorer { + fn pruned_data(&self) -> &'static str { + "Fake epoch settings" + } + + async fn prune(&self, _epoch: Epoch) -> StdResult<()> { + Ok(()) + } +} + #[cfg(test)] mod tests {