Skip to content

Commit

Permalink
Create a task to prune EpochSettings and add it to the UpkeepService.
Browse files Browse the repository at this point in the history
  • Loading branch information
sfauvel committed Nov 8, 2024
1 parent 52f97ac commit e4ee45b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 31 deletions.
62 changes: 45 additions & 17 deletions mithril-aggregator/src/database/repository/epoch_settings_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::database::query::{
DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery,
};
use crate::entities::AggregatorEpochSettings;
use crate::services::EpochPruningTask;
use crate::EpochSettingsStorer;

/// Service to deal with epoch settings (read & write).
Expand Down Expand Up @@ -68,17 +69,6 @@ impl EpochSettingsStorer for EpochSettingsStore {
.map_err(|e| AdapterError::GeneralError(e.context("persist epoch settings failure")))?
.unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}"));

// Prune useless old epoch settings.
if let Some(threshold) = self.retention_limit {
let _ = self
.connection
.fetch(DeleteEpochSettingsQuery::below_epoch_threshold(
epoch - threshold,
))
.map_err(AdapterError::QueryError)?
.count();
}

Ok(Some(epoch_settings_record.into()))
}

Expand All @@ -102,6 +92,25 @@ impl EpochSettingsStorer for EpochSettingsStore {
}
}

#[async_trait]
impl EpochPruningTask for EpochSettingsStore {
fn pruned_data(&self) -> &'static str {
"Epoch settings"
}

/// Prune useless old epoch settings.
async fn prune(&self, epoch: Epoch) -> StdResult<()> {
if let Some(threshold) = self.retention_limit {
self.connection
.apply(DeleteEpochSettingsQuery::below_epoch_threshold(
epoch - threshold,
))
.map_err(AdapterError::QueryError)?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use mithril_common::entities::BlockNumber;
Expand Down Expand Up @@ -172,7 +181,7 @@ mod tests {
}

#[tokio::test]
async fn save_epoch_settings_prune_older_epoch_settings() {
async fn prune_epoch_settings_older_than_threshold() {
const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;

let connection = main_db_connection().unwrap();
Expand All @@ -183,12 +192,10 @@ mod tests {
);

store
.save_epoch_settings(
Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD,
AggregatorEpochSettings::dummy(),
)
.prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
.await
.expect("saving epoch settings should not fails");
.unwrap();

let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

Expand All @@ -202,6 +209,27 @@ mod tests {
);
}

#[tokio::test]
async fn without_threshold_nothing_is_pruned() {
let connection = main_db_connection().unwrap();
insert_epoch_settings(&connection, &[1, 2]).unwrap();
let store = EpochSettingsStore::new(Arc::new(connection), None);

store.prune(Epoch(100)).await.unwrap();

let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

assert!(
epoch1_params.is_some(),
"Epoch settings at epoch 1 should have been pruned",
);
assert!(
epoch2_params.is_some(),
"Epoch settings at epoch 2 should still exist",
);
}

#[tokio::test]
async fn save_epoch_settings_stores_in_database() {
let connection = main_db_connection().unwrap();
Expand Down
25 changes: 13 additions & 12 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ pub struct DependenciesBuilder {
/// Verification key store.
pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,

/// Epoch settings storer.
pub epoch_settings_storer: Option<Arc<dyn EpochSettingsStorer>>,
/// Epoch settings store.
pub epoch_settings_store: Option<Arc<EpochSettingsStore>>,

/// Cardano CLI Runner for the [ChainObserver]
pub cardano_cli_runner: Option<Box<CardanoCliRunner>>,
Expand Down Expand Up @@ -261,7 +261,7 @@ impl DependenciesBuilder {
certificate_repository: None,
open_message_repository: None,
verification_key_store: None,
epoch_settings_storer: None,
epoch_settings_store: None,
cardano_cli_runner: None,
chain_observer: None,
chain_block_reader: None,
Expand Down Expand Up @@ -587,7 +587,7 @@ impl DependenciesBuilder {
Ok(self.verification_key_store.as_ref().cloned().unwrap())
}

async fn build_epoch_settings_storer(&mut self) -> Result<Arc<dyn EpochSettingsStorer>> {
async fn build_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
let logger = self.root_logger();
let epoch_settings_store = EpochSettingsStore::new(
self.get_sqlite_connection().await?,
Expand Down Expand Up @@ -638,12 +638,12 @@ impl DependenciesBuilder {
}

/// Get a configured [EpochSettingsStorer].
pub async fn get_epoch_settings_storer(&mut self) -> Result<Arc<dyn EpochSettingsStorer>> {
if self.epoch_settings_storer.is_none() {
self.epoch_settings_storer = Some(self.build_epoch_settings_storer().await?);
pub async fn get_epoch_settings_store(&mut self) -> Result<Arc<EpochSettingsStore>> {
if self.epoch_settings_store.is_none() {
self.epoch_settings_store = Some(self.build_epoch_settings_store().await?);
}

Ok(self.epoch_settings_storer.as_ref().cloned().unwrap())
Ok(self.epoch_settings_store.as_ref().cloned().unwrap())
}

async fn build_chain_observer(&mut self) -> Result<Arc<dyn ChainObserver>> {
Expand Down Expand Up @@ -1255,7 +1255,7 @@ impl DependenciesBuilder {

async fn build_epoch_service(&mut self) -> Result<EpochServiceWrapper> {
let verification_key_store = self.get_verification_key_store().await?;
let epoch_settings_storer = self.get_epoch_settings_storer().await?;
let epoch_settings_storer = self.get_epoch_settings_store().await?;
let epoch_settings = self.get_epoch_settings_configuration()?;
let network = self.configuration.get_network()?;
let allowed_discriminants = self.get_allowed_signed_entity_types_discriminants()?;
Expand Down Expand Up @@ -1330,14 +1330,15 @@ impl DependenciesBuilder {

async fn build_upkeep_service(&mut self) -> Result<Arc<dyn UpkeepService>> {
let stake_pool_pruning_task = self.get_stake_store().await?;
let epoch_settings_pruning_task = self.get_epoch_settings_store().await?;

let upkeep_service = Arc::new(AggregatorUpkeepService::new(
self.get_sqlite_connection().await?,
self.get_sqlite_connection_cardano_transaction_pool()
.await?,
self.get_event_store_sqlite_connection().await?,
self.get_signed_entity_lock().await?,
vec![stake_pool_pruning_task],
vec![stake_pool_pruning_task, epoch_settings_pruning_task],
self.root_logger(),
));

Expand Down Expand Up @@ -1428,7 +1429,7 @@ impl DependenciesBuilder {
certificate_repository: self.get_certificate_repository().await?,
open_message_repository: self.get_open_message_repository().await?,
verification_key_store: self.get_verification_key_store().await?,
epoch_settings_storer: self.get_epoch_settings_storer().await?,
epoch_settings_storer: self.get_epoch_settings_store().await?,
chain_observer: self.get_chain_observer().await?,
immutable_file_observer: self.get_immutable_file_observer().await?,
digester: self.get_immutable_digester().await?,
Expand Down Expand Up @@ -1555,7 +1556,7 @@ impl DependenciesBuilder {
certificate_repository: self.get_certificate_repository().await?,
certificate_verifier: self.get_certificate_verifier().await?,
genesis_verifier: self.get_genesis_verifier().await?,
epoch_settings_storer: self.get_epoch_settings_storer().await?,
epoch_settings_storer: self.get_epoch_settings_store().await?,
verification_key_store: self.get_verification_key_store().await?,
};

Expand Down
15 changes: 13 additions & 2 deletions mithril-aggregator/src/store/epoch_settings_storer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use tokio::sync::RwLock;

use mithril_common::entities::{Epoch, ProtocolParameters};

use crate::entities::AggregatorEpochSettings;
use crate::{entities::AggregatorEpochSettings, services::EpochPruningTask};

/// Store and get [aggregator epoch settings][AggregatorEpochSettings] for given epoch.
#[async_trait]
pub trait EpochSettingsStorer: Sync + Send {
pub trait EpochSettingsStorer: EpochPruningTask + Sync + Send {
/// Save the given `AggregatorEpochSettings` for the given [Epoch].
async fn save_epoch_settings(
&self,
Expand Down Expand Up @@ -83,6 +83,17 @@ impl EpochSettingsStorer for FakeEpochSettingsStorer {
}
}

#[async_trait]
impl EpochPruningTask for FakeEpochSettingsStorer {
fn pruned_data(&self) -> &'static str {
"Fake epoch settings"
}

async fn prune(&self, _epoch: Epoch) -> StdResult<()> {
Ok(())
}
}

#[cfg(test)]
mod tests {

Expand Down

0 comments on commit e4ee45b

Please sign in to comment.