Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor aggregator pruning with upkeep service #2103

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/mithril-persistence/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-persistence"
version = "0.2.31"
version = "0.2.32"
description = "Common types, interfaces, and utilities to persist data for Mithril nodes."
authors = { workspace = true }
edition = { workspace = true }
Expand Down
60 changes: 60 additions & 0 deletions internal/mithril-persistence/src/sqlite/connection_extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub trait ConnectionExtensions {
fn fetch_collect<Q: Query, B: FromIterator<Q::Entity>>(&self, query: Q) -> StdResult<B> {
Ok(self.fetch(query)?.collect::<B>())
}

/// Apply a query that do not return data from the database(ie: insert, delete, ...).
fn apply<Q: Query>(&self, query: Q) -> StdResult<()> {
self.fetch(query)?.count();
Ok(())
}
}

impl ConnectionExtensions for SqliteConnection {
Expand Down Expand Up @@ -79,6 +85,8 @@ fn prepare_statement<'conn>(
mod tests {
use sqlite::Connection;

use crate::sqlite::{HydrationError, SqLiteEntity, WhereCondition};

use super::*;

#[test]
Expand Down Expand Up @@ -115,4 +123,56 @@ mod tests {

assert_eq!(value, 45);
}

#[test]
fn test_apply_execute_the_query() {
sfauvel marked this conversation as resolved.
Show resolved Hide resolved
struct DummySqLiteEntity {}
impl SqLiteEntity for DummySqLiteEntity {
fn hydrate(_row: sqlite::Row) -> Result<Self, HydrationError>
where
Self: Sized,
{
unimplemented!()
}

fn get_projection() -> crate::sqlite::Projection {
unimplemented!()
}
}

struct FakeQuery {
sql: String,
}
impl Query for FakeQuery {
type Entity = DummySqLiteEntity;

fn filters(&self) -> WhereCondition {
WhereCondition::default()
}

fn get_definition(&self, _condition: &str) -> String {
self.sql.clone()
}
}

let connection = Connection::open_thread_safe(":memory:").unwrap();
connection
.execute("create table query_test(text_data);")
.unwrap();

let value: i64 = connection
.query_single_cell("select count(*) from query_test", &[])
.unwrap();
assert_eq!(value, 0);

let query = FakeQuery {
sql: "insert into query_test(text_data) values ('row 1')".to_string(),
};
connection.apply(query).unwrap();

let value: i64 = connection
.query_single_cell("select count(*) from query_test", &[])
.unwrap();
assert_eq!(value, 1);
}
}
2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.5.107"
version = "0.5.108"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
62 changes: 45 additions & 17 deletions mithril-aggregator/src/database/repository/epoch_settings_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::database::query::{
DeleteEpochSettingsQuery, GetEpochSettingsQuery, UpdateEpochSettingsQuery,
};
use crate::entities::AggregatorEpochSettings;
use crate::services::EpochPruningTask;
use crate::EpochSettingsStorer;

/// Service to deal with epoch settings (read & write).
Expand Down Expand Up @@ -68,17 +69,6 @@ impl EpochSettingsStorer for EpochSettingsStore {
.map_err(|e| AdapterError::GeneralError(e.context("persist epoch settings failure")))?
.unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}"));

// Prune useless old epoch settings.
if let Some(threshold) = self.retention_limit {
let _ = self
.connection
.fetch(DeleteEpochSettingsQuery::below_epoch_threshold(
epoch - threshold,
))
.map_err(AdapterError::QueryError)?
.count();
}

Ok(Some(epoch_settings_record.into()))
}

Expand All @@ -102,6 +92,25 @@ impl EpochSettingsStorer for EpochSettingsStore {
}
}

#[async_trait]
impl EpochPruningTask for EpochSettingsStore {
fn pruned_data(&self) -> &'static str {
"Epoch settings"
}

/// Prune useless old epoch settings.
async fn prune(&self, epoch: Epoch) -> StdResult<()> {
if let Some(threshold) = self.retention_limit {
self.connection
.apply(DeleteEpochSettingsQuery::below_epoch_threshold(
epoch - threshold,
))
.map_err(AdapterError::QueryError)?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use mithril_common::entities::BlockNumber;
Expand Down Expand Up @@ -172,7 +181,7 @@ mod tests {
}

#[tokio::test]
async fn save_epoch_settings_prune_older_epoch_settings() {
async fn prune_epoch_settings_older_than_threshold() {
const EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD: u64 = 5;

let connection = main_db_connection().unwrap();
Expand All @@ -183,12 +192,10 @@ mod tests {
);

store
.save_epoch_settings(
Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD,
AggregatorEpochSettings::dummy(),
)
.prune(Epoch(2) + EPOCH_SETTINGS_PRUNE_EPOCH_THRESHOLD)
.await
.expect("saving epoch settings should not fails");
.unwrap();

let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

Expand All @@ -202,6 +209,27 @@ mod tests {
);
}

#[tokio::test]
async fn without_threshold_nothing_is_pruned() {
let connection = main_db_connection().unwrap();
insert_epoch_settings(&connection, &[1, 2]).unwrap();
let store = EpochSettingsStore::new(Arc::new(connection), None);

store.prune(Epoch(100)).await.unwrap();

let epoch1_params = store.get_epoch_settings(Epoch(1)).await.unwrap();
let epoch2_params = store.get_epoch_settings(Epoch(2)).await.unwrap();

assert!(
epoch1_params.is_some(),
"Epoch settings at epoch 1 should have been pruned",
);
assert!(
epoch2_params.is_some(),
"Epoch settings at epoch 2 should still exist",
);
}

#[tokio::test]
async fn save_epoch_settings_stores_in_database() {
let connection = main_db_connection().unwrap();
Expand Down
63 changes: 44 additions & 19 deletions mithril-aggregator/src/database/repository/stake_pool_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::database::query::{
DeleteStakePoolQuery, GetStakePoolQuery, InsertOrReplaceStakePoolQuery,
};
use crate::database::record::StakePool;
use crate::services::EpochPruningTask;

/// Service to deal with stake pools (read & write).
pub struct StakePoolStore {
Expand Down Expand Up @@ -53,17 +54,6 @@ impl StakeStorer for StakePoolStore {
.with_context(|| format!("persist stakes failure, epoch: {epoch}"))
.map_err(AdapterError::GeneralError)?;

// Prune useless old stake distributions.
if let Some(threshold) = self.retention_limit {
let _ = self
.connection
.fetch(DeleteStakePoolQuery::below_epoch_threshold(
epoch - threshold,
))
.map_err(AdapterError::QueryError)?
.count();
}

Ok(Some(StakeDistribution::from_iter(
pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
)))
Expand Down Expand Up @@ -91,9 +81,25 @@ impl StakeStorer for StakePoolStore {
#[async_trait]
impl StakeDistributionRetriever for StakePoolStore {
async fn retrieve(&self, epoch: Epoch) -> StdResult<Option<StakeDistribution>> {
let stake_distribution = self.get_stakes(epoch).await?;
self.get_stakes(epoch).await
}
}

Ok(stake_distribution)
#[async_trait]
impl EpochPruningTask for StakePoolStore {
fn pruned_data(&self) -> &'static str {
"Stake pool"
}

async fn prune(&self, epoch: Epoch) -> StdResult<()> {
if let Some(threshold) = self.retention_limit {
self.connection
.apply(DeleteStakePoolQuery::below_epoch_threshold(
epoch - threshold,
))
.map_err(AdapterError::QueryError)?;
}
Ok(())
}
}

Expand All @@ -104,20 +110,18 @@ mod tests {
use super::*;

#[tokio::test]
async fn save_protocol_parameters_prune_older_epoch_settings() {
async fn prune_epoch_settings_older_than_threshold() {
let connection = main_db_connection().unwrap();
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 10;
insert_stake_pool(&connection, &[1, 2]).unwrap();
let store =
StakePoolStore::new(Arc::new(connection), Some(STAKE_POOL_PRUNE_EPOCH_THRESHOLD));

store
.save_stakes(
Epoch(2) + STAKE_POOL_PRUNE_EPOCH_THRESHOLD,
StakeDistribution::from_iter([("pool1".to_string(), 100)]),
)
.prune(Epoch(2) + STAKE_POOL_PRUNE_EPOCH_THRESHOLD)
.await
.expect("saving stakes should not fails");
.unwrap();

let epoch1_stakes = store.get_stakes(Epoch(1)).await.unwrap();
let epoch2_stakes = store.get_stakes(Epoch(2)).await.unwrap();

Expand All @@ -131,6 +135,27 @@ mod tests {
);
}

#[tokio::test]
async fn without_threshold_nothing_is_pruned() {
let connection = main_db_connection().unwrap();
insert_stake_pool(&connection, &[1, 2]).unwrap();
let store = StakePoolStore::new(Arc::new(connection), None);

store.prune(Epoch(100)).await.unwrap();

let epoch1_stakes = store.get_stakes(Epoch(1)).await.unwrap();
let epoch2_stakes = store.get_stakes(Epoch(2)).await.unwrap();

assert!(
epoch1_stakes.is_some(),
"Stakes at epoch 1 should have been pruned",
);
assert!(
epoch2_stakes.is_some(),
"Stakes at epoch 2 should still exist",
);
}

#[tokio::test]
async fn retrieve_with_no_stakes_returns_none() {
let connection = main_db_connection().unwrap();
Expand Down
Loading