Skip to content

Commit

Permalink
add stake pool autoprune
Browse files Browse the repository at this point in the history
  • Loading branch information
ghubertpalo committed Mar 14, 2023
1 parent 635e271 commit 61b2717
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 16 deletions.
14 changes: 7 additions & 7 deletions mithril-aggregator/src/command_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use mithril_common::{
};

use crate::{
database::provider::StakePoolRepository,
database::provider::StakePoolStore,
event_store::{self, TransmitterService},
http_server::routes::router,
tools::{EraTools, GenesisTools, GenesisToolsDependency},
Expand Down Expand Up @@ -90,9 +90,9 @@ fn setup_genesis_dependencies(
)?),
config.store_retention_limit,
));
let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new(
Connection::open(sqlite_db_path.clone().unwrap())?,
))));
let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open(
sqlite_db_path.clone().unwrap(),
)?))));
let single_signature_store = Arc::new(SingleSignatureStore::new(
Box::new(SQLiteAdapter::new("single_signature", sqlite_db_path)?),
config.store_retention_limit,
Expand Down Expand Up @@ -359,9 +359,9 @@ impl ServeCommand {
)?),
config.store_retention_limit,
));
let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new(
Connection::open(sqlite_db_path.clone().unwrap())?,
))));
let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open(
sqlite_db_path.clone().unwrap(),
)?))));
let single_signature_store = Arc::new(SingleSignatureStore::new(
Box::new(SQLiteAdapter::new(
"single_signature",
Expand Down
65 changes: 62 additions & 3 deletions mithril-aggregator/src/database/provider/stake_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use mithril_common::{

use mithril_common::StdError;

/// Delete stake pools for Epoch older than this.
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 3;

/// Stake pool as read from Chain.
/// TODO remove this compile directive ↓
#[allow(dead_code)]
Expand Down Expand Up @@ -186,20 +189,62 @@ impl<'conn> Provider<'conn> for UpdateStakePoolProvider<'conn> {
}
}

/// Provider to remove old data from the stake_pool table
pub struct DeleteStakePoolProvider<'conn> {
connection: &'conn Connection,
}

impl<'conn> Provider<'conn> for DeleteStakePoolProvider<'conn> {
type Entity = StakePool;

fn get_connection(&'conn self) -> &'conn Connection {
self.connection
}

fn get_definition(&self, condition: &str) -> String {
// it is important to alias the fields with the same name as the table
// since the table cannot be aliased in a RETURNING statement in SQLite.
let projection = Self::Entity::get_projection()
.expand(SourceAlias::new(&[("{:stake_pool:}", "stake_pool")]));

format!("delete from stake_pool where {condition} returning {projection}")
}
}

impl<'conn> DeleteStakePoolProvider<'conn> {
/// Create a new instance
pub fn new(connection: &'conn Connection) -> Self {
Self { connection }
}

/// Create the SQL condition to prune data older than the given Epoch.
fn get_prune_condition(&self, epoch_threshold: Epoch) -> WhereCondition {
let epoch_value = Value::Integer(i64::try_from(epoch_threshold.0).unwrap());

WhereCondition::new("epoch < ?*", vec![epoch_value])
}

/// Prune the stake pools data older than the given epoch.
pub fn prune(&self, epoch_threshold: Epoch) -> Result<EntityCursor<StakePool>, StdError> {
let filters = self.get_prune_condition(epoch_threshold);

self.find(filters)
}
}
/// Service to deal with stake pools (read & write).
pub struct StakePoolRepository {
pub struct StakePoolStore {
connection: Arc<Mutex<Connection>>,
}

impl StakePoolRepository {
impl StakePoolStore {
/// Create a new StakePool service
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
Self { connection }
}
}

#[async_trait]
impl StakeStorer for StakePoolRepository {
impl StakeStorer for StakePoolStore {
async fn save_stakes(
&self,
epoch: Epoch,
Expand All @@ -221,6 +266,9 @@ impl StakeStorer for StakePoolRepository {
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;
new_stakes.insert(pool_id.to_string(), stake_pool.stake);
}
let _ = DeleteStakePoolProvider::new(connection)
.prune(Epoch(epoch.0 - STAKE_POOL_PRUNE_EPOCH_THRESHOLD))
.map_err(AdapterError::InitializationError)?;
connection
.execute("commit transaction")
.map_err(|e| AdapterError::QueryError(e.into()))?;
Expand Down Expand Up @@ -293,4 +341,15 @@ mod tests {
params
);
}

#[test]
fn prune() {
let connection = Connection::open(":memory:").unwrap();
let provider = DeleteStakePoolProvider::new(&connection);
let condition = provider.get_prune_condition(Epoch(5));
let (condition, params) = condition.expand();

assert_eq!("epoch < ?1".to_string(), condition);
assert_eq!(vec![Value::Integer(5)], params);
}
}
11 changes: 5 additions & 6 deletions mithril-common/src/sqlite/source_alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ mod tests {
#[test]
fn simple_source_alias() {
let source_alias = SourceAlias::new(&[("first", "one"), ("second", "two")]);
let target = source_alias
.get_iterator()
.map(|(name, alias)| format!("{name} => {alias}"))
.collect::<Vec<String>>()
.join(", ");
let mut fields = "first.one, second.two".to_string();

assert_eq!("first => one, second => two".to_string(), target);
for (alias, source) in source_alias.get_iterator() {
fields = fields.replace(alias, source);
}
assert_eq!("one.one, two.two".to_string(), fields);
}
}

0 comments on commit 61b2717

Please sign in to comment.