diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5646ece2a73..d963f159a6a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,7 +17,7 @@ As a minor extension, we have adopted a slightly different versioning convention
 
 - Extended CI build and test steps for MacOS `arm64` runners and include pre-built binaries for MacOS `arm64` in the releases.
 
-- Vacuum aggregator & signer SQLite databases at startup to reduce fragmentation and disk space usage.
+- Add a regularly run upkeep task to the `mithril-aggregator` and `mithril-signer` to clean up stale data and optimize their databases.
 
 - **UNSTABLE** Cardano transactions certification:
   - Optimize the performances of the computation of the proof with a Merkle map.
diff --git a/Cargo.lock b/Cargo.lock
index 2b5eddc25db..c6acb427097 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3547,7 +3547,7 @@ dependencies = [
 
 [[package]]
 name = "mithril-aggregator"
-version = "0.5.32"
+version = "0.5.33"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -3703,7 +3703,7 @@ dependencies = [
 
 [[package]]
 name = "mithril-common"
-version = "0.4.24"
+version = "0.4.25"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -3801,7 +3801,7 @@ dependencies = [
 
 [[package]]
 name = "mithril-persistence"
-version = "0.2.13"
+version = "0.2.14"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -3848,7 +3848,7 @@ dependencies = [
 
 [[package]]
 name = "mithril-signer"
-version = "0.2.155"
+version = "0.2.156"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/internal/mithril-persistence/Cargo.toml b/internal/mithril-persistence/Cargo.toml
index 108b3029295..837a175bbae 100644
--- a/internal/mithril-persistence/Cargo.toml
+++ b/internal/mithril-persistence/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "mithril-persistence"
-version = "0.2.13"
+version = "0.2.14"
 description = "Common types, interfaces, and utilities to persist data for Mithril nodes."
 authors = { workspace = true }
 edition = { workspace = true }
diff --git a/internal/mithril-persistence/src/sqlite/cleaner.rs b/internal/mithril-persistence/src/sqlite/cleaner.rs
new file mode 100644
index 00000000000..841023539d8
--- /dev/null
+++ b/internal/mithril-persistence/src/sqlite/cleaner.rs
@@ -0,0 +1,250 @@
+use slog::{debug, Logger};
+
+use mithril_common::StdResult;
+
+use crate::sqlite::SqliteConnection;
+
+/// Tasks that can be performed by the SqliteCleaner
+#[derive(Eq, PartialEq, Copy, Clone)]
+pub enum SqliteCleaningTask {
+    /// Reconstruct the database file, repacking it into a minimal amount of disk space.
+    ///
+    /// see: <https://www.sqlite.org/lang_vacuum.html>
+    ///
+    /// ⚠ This operation can be very slow on large databases ⚠
+    Vacuum,
+    /// Run a checkpoint to transfer the data from the WAL file to the main db file and truncate
+    /// it afterward.
+    ///
+    /// see: <https://www.sqlite.org/pragma.html#pragma_wal_checkpoint>
+    WalCheckpointTruncate,
+}
+
+impl SqliteCleaningTask {
+    /// Get the log message for the task.
+    pub fn log_message(self: SqliteCleaningTask) -> &'static str {
+        match self {
+            SqliteCleaningTask::Vacuum => "SqliteCleaner::Running `vacuum` on the database",
+            SqliteCleaningTask::WalCheckpointTruncate => {
+                "SqliteCleaner::Running `wal_checkpoint(TRUNCATE)` on the database"
+            }
+        }
+    }
+}
+
+/// The SqliteCleaner is responsible for cleaning up databases by performing tasks defined
+/// in [SqliteCleaningTask].
+pub struct SqliteCleaner<'a> {
+    connection: &'a SqliteConnection,
+    logger: Logger,
+    tasks: Vec<SqliteCleaningTask>,
+}
+
+impl<'a> SqliteCleaner<'a> {
+    /// Create a new instance of the `SqliteCleaner`.
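+    ///
+    /// By default no cleaning task is scheduled and logs are discarded: use
+    /// [Self::with_tasks] and [Self::with_logger] to configure it.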
+    pub fn new(connection: &'a SqliteConnection) -> Self {
+        Self {
+            connection,
+            logger: Logger::root(slog::Discard, slog::o!()),
+            tasks: vec![],
+        }
+    }
+
+    /// Set the logger to be used by the cleaner.
+    pub fn with_logger(mut self, logger: Logger) -> Self {
+        self.logger = logger;
+        self
+    }
+
+    /// Set the [SqliteCleaningTask] to be performed by the cleaner.
+    pub fn with_tasks(mut self, tasks: &[SqliteCleaningTask]) -> Self {
+        for option in tasks {
+            self.tasks.push(*option);
+        }
+        self
+    }
+
+    /// Cleanup the database by performing the defined tasks.
+    pub fn run(self) -> StdResult<()> {
+        if self.tasks.contains(&SqliteCleaningTask::Vacuum) {
+            debug!(self.logger, "{}", SqliteCleaningTask::Vacuum.log_message());
+            self.connection.execute("vacuum")?;
+        }
+
+        // Important: if WAL is enabled, vacuuming will not shrink the database until a
+        // checkpoint is run, so it must be done after vacuuming.
+        // Note: running a checkpoint when the WAL is disabled is harmless.
+        if self
+            .tasks
+            .contains(&SqliteCleaningTask::WalCheckpointTruncate)
+        {
+            debug!(
+                self.logger,
+                "{}",
+                SqliteCleaningTask::WalCheckpointTruncate.log_message()
+            );
+            self.connection
+                .execute("PRAGMA wal_checkpoint(TRUNCATE);")?;
+        } else {
+            self.connection.execute("PRAGMA wal_checkpoint(PASSIVE);")?;
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::ops::Range;
+    use std::path::Path;
+
+    use mithril_common::test_utils::TempDir;
+
+    use crate::sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection};
+
+    use super::*;
+
+    fn add_test_table(connection: &SqliteConnection) {
+        connection
+            .execute("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, text TEXT);")
+            .unwrap();
+    }
+
+    fn fill_test_table(connection: &SqliteConnection, ids: Range<u64>) {
+        connection
+            .execute(format!(
+                "INSERT INTO test (id, text) VALUES {}",
+                ids.map(|i| format!("({}, 'some text to fill the db')", i))
+                    .collect::<Vec<String>>()
+                    .join(", ")
+            ))
+            .unwrap();
+    }
+
+    fn delete_test_rows(connection: &SqliteConnection, ids: Range<u64>) {
+        connection
+            .execute(format!(
+                "DELETE FROM test WHERE id >= {} and id < {}",
+                ids.start, ids.end
+            ))
+            .unwrap();
+    }
+
+    /// Apply migrations, disable auto_vacuum, and mangle the database to create some free pages
+    /// for the vacuum to reclaim
+    fn prepare_db_for_vacuum(connection: &SqliteConnection) {
+        // Disable auto_vacuum to allow the test to check if the vacuum was run
+        connection
+            .execute("pragma auto_vacuum = none; vacuum;")
+            .unwrap();
+        add_test_table(connection);
+        fill_test_table(connection, 0..10_000);
+        // Checkpoint before deletion so entries are transferred from the WAL file to the main db
+        connection
+            .execute("PRAGMA wal_checkpoint(PASSIVE)")
+            .unwrap();
+        delete_test_rows(connection, 0..5_000);
+        // Checkpoint after deletion to create free pages in the main db
+        connection
+            .execute("PRAGMA wal_checkpoint(PASSIVE)")
+            .unwrap();
+    }
+
+    fn file_size(path: &Path) -> u64 {
+        path.metadata()
+            .unwrap_or_else(|_| panic!("Failed to read len of '{}'", path.display()))
+            .len()
+    }
+
+    #[test]
+    fn cleanup_empty_in_memory_db_should_not_crash() {
+        let connection = ConnectionBuilder::open_memory().build().unwrap();
+
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::Vacuum])
+            .run()
+            .expect("Vacuum should not fail");
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
+            .run()
+            .expect("WalCheckpointTruncate should not fail");
+    }
+
+    #[test]
+    fn cleanup_empty_file_without_wal_db_should_not_crash() {
+        let db_path = TempDir::create(
+            "sqlite_cleaner",
+            "cleanup_empty_file_without_wal_db_should_not_crash",
+        )
+        .join("test.db");
+        let connection = ConnectionBuilder::open_file(&db_path).build().unwrap();
+
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::Vacuum])
+            .run()
+            .expect("Vacuum should not fail");
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
+            .run()
+            .expect("WalCheckpointTruncate should not fail");
+    }
+
+    #[test]
+    fn test_vacuum() {
+        let db_dir = TempDir::create("sqlite_cleaner", "test_vacuum");
+        let (db_path, db_wal_path) = (db_dir.join("test.db"), db_dir.join("test.db-wal"));
+        let connection = ConnectionBuilder::open_file(&db_path)
+            .with_options(&[ConnectionOptions::EnableWriteAheadLog])
+            .build()
+            .unwrap();
+        prepare_db_for_vacuum(&connection);
+
+        let db_initial_size = file_size(&db_path);
+        assert!(db_initial_size > 0);
+
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::Vacuum])
+            .run()
+            .unwrap();
+
+        let db_after_vacuum_size = file_size(&db_path);
+
+        assert!(
+            db_initial_size > db_after_vacuum_size,
+            "db size should have decreased (vacuum enabled)"
+        );
+        assert!(
+            file_size(&db_wal_path) > 0,
+            "db wal file should not have been truncated (truncate disabled)"
+        );
+    }
+
+    #[test]
+    fn test_truncate_wal() {
+        let db_dir = TempDir::create("sqlite_cleaner", "test_truncate_wal");
+        let (db_path, db_wal_path) = (db_dir.join("test.db"), db_dir.join("test.db-wal"));
+        let connection = ConnectionBuilder::open_file(&db_path)
+            .with_options(&[ConnectionOptions::EnableWriteAheadLog])
+            .build()
+            .unwrap();
+
+        // Make "neutral" changes to the db, this will fill the WAL files with some data
+        // but won't change the db size after cleaning up.
+        add_test_table(&connection);
+        fill_test_table(&connection, 0..10_000);
+        delete_test_rows(&connection, 0..10_000);
+
+        assert!(file_size(&db_wal_path) > 0);
+
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
+            .run()
+            .unwrap();
+
+        assert_eq!(
+            file_size(&db_wal_path),
+            0,
+            "db wal file should have been truncated"
+        );
+    }
+}
diff --git a/internal/mithril-persistence/src/sqlite/connection_builder.rs b/internal/mithril-persistence/src/sqlite/connection_builder.rs
index 6dcf10f32aa..63039700513 100644
--- a/internal/mithril-persistence/src/sqlite/connection_builder.rs
+++ b/internal/mithril-persistence/src/sqlite/connection_builder.rs
@@ -2,13 +2,12 @@ use std::ops::Not;
 use std::path::{Path, PathBuf};
 
 use anyhow::Context;
-use slog::{info, Logger};
+use slog::Logger;
 use sqlite::{Connection, ConnectionThreadSafe};
 
 use mithril_common::StdResult;
 
 use crate::database::{ApplicationNodeType, DatabaseVersionChecker, SqlMigration};
-use crate::sqlite::vacuum_database;
 
 /// Builder of SQLite connection
 pub struct ConnectionBuilder {
@@ -32,11 +31,6 @@ pub enum ConnectionOptions {
     ///
     /// This option take priority over [ConnectionOptions::EnableForeignKeys] if both are enabled.
     ForceDisableForeignKeys,
-
-    /// Run a VACUUM operation on the database after the connection is opened
-    ///
-    /// ⚠ This operation can be very slow on large databases ⚠
-    Vacuum,
 }
 
 impl ConnectionBuilder {
@@ -92,23 +86,6 @@ impl ConnectionBuilder {
             )
         })?;
 
-        if self.options.contains(&ConnectionOptions::Vacuum) {
-            info!(
-                self.logger,
-                "Vacuuming SQLite database, this may take a while...";
-                "database" => self.connection_path.display()
-            );
-
-            vacuum_database(&connection)
-                .with_context(|| "SQLite initialization: database VACUUM error")?;
-
-            info!(
-                self.logger,
-                "SQLite database vacuumed successfully";
-                "database" => self.connection_path.display()
-            );
-        }
-
         if self
             .options
             .contains(&ConnectionOptions::EnableWriteAheadLog)
diff --git a/internal/mithril-persistence/src/sqlite/mod.rs b/internal/mithril-persistence/src/sqlite/mod.rs
index de28b8d92b7..8845753328a 100644
--- a/internal/mithril-persistence/src/sqlite/mod.rs
+++ b/internal/mithril-persistence/src/sqlite/mod.rs
@@ -2,6 +2,7 @@
 //! This module provides a minimal yet useful Entity framework on top of SQLite
 //! with ability to perform any SQL query possible and hydrate results in Rust
 //! structs.
+mod cleaner;
 mod condition;
 mod connection_builder;
 mod connection_extensions;
@@ -13,6 +14,7 @@ mod query;
 mod source_alias;
 mod transaction;
 
+pub use cleaner::{SqliteCleaner, SqliteCleaningTask};
 pub use condition::{GetAllCondition, WhereCondition};
 pub use connection_builder::{ConnectionBuilder, ConnectionOptions};
 pub use connection_extensions::ConnectionExtensions;
@@ -24,32 +26,24 @@ pub use query::Query;
 pub use source_alias::SourceAlias;
 pub use transaction::Transaction;
 
-use mithril_common::StdResult;
-use sqlite::ConnectionThreadSafe;
-
 /// Type of the connection used in Mithril
-pub type SqliteConnection = ConnectionThreadSafe;
+pub type SqliteConnection = sqlite::ConnectionThreadSafe;
 
-/// Do a [vacuum](https://www.sqlite.org/lang_vacuum.html) on the given connection, this will
-/// reconstruct the database file, repacking it into a minimal amount of disk space.
-pub fn vacuum_database(connection: &SqliteConnection) -> StdResult<()> {
-    connection.execute("vacuum")?;
+/// Helpers to handle SQLite errors
+pub mod error {
+    /// Sqlite error type used in Mithril
+    pub type SqliteError = sqlite::Error;
 
-    Ok(())
+    /// SQLITE_BUSY error code
+    ///
+    /// see: <https://www.sqlite.org/rescode.html#busy>
+    pub const SQLITE_BUSY: isize = 5;
 }
 
 #[cfg(test)]
 mod test {
-    use crate::sqlite::vacuum_database;
     use sqlite::Connection;
 
-    #[tokio::test]
-    async fn calling_vacuum_on_an_empty_in_memory_db_should_not_fail() {
-        let connection = Connection::open_thread_safe(":memory:").unwrap();
-
-        vacuum_database(&connection).expect("Vacuum should not fail");
-    }
-
     #[test]
     fn sqlite_version_should_be_3_42_or_more() {
         let connection = Connection::open_thread_safe(":memory:").unwrap();
diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml
index 2a9cc3e0146..77b1ae7a8cb 100644
--- a/mithril-aggregator/Cargo.toml
+++ b/mithril-aggregator/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "mithril-aggregator"
-version = "0.5.32"
+version = "0.5.33"
 description = "A Mithril Aggregator server"
 authors = { workspace = true }
 edition = { workspace = true }
diff --git a/mithril-aggregator/src/commands/tools_command.rs b/mithril-aggregator/src/commands/tools_command.rs
index ee387a54764..4ba49759a48 100644
--- a/mithril-aggregator/src/commands/tools_command.rs
+++ b/mithril-aggregator/src/commands/tools_command.rs
@@ -2,7 +2,7 @@ use anyhow::Context;
 use clap::{Parser, Subcommand};
 use config::{builder::DefaultState, ConfigBuilder};
 use mithril_common::StdResult;
-use mithril_persistence::sqlite::vacuum_database;
+use mithril_persistence::sqlite::{SqliteCleaner, SqliteCleaningTask};
 use slog_scope::debug;
 use std::sync::Arc;
 
@@ -74,7 +74,9 @@ impl RecomputeCertificatesHashCommand {
             .await
             .with_context(|| "recompute-certificates-hash: database migration error")?;
 
-        vacuum_database(&connection)
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::Vacuum])
+            .run()
             .with_context(|| "recompute-certificates-hash: database vacuum error")?;
 
         Ok(())
diff --git a/mithril-aggregator/src/database/test_helper.rs b/mithril-aggregator/src/database/test_helper.rs
index 6e3c6295f4b..ba23fb96633 100644
--- a/mithril-aggregator/src/database/test_helper.rs
+++ b/mithril-aggregator/src/database/test_helper.rs
@@ -1,5 +1,6 @@
 use chrono::Utc;
 use sqlite::{ConnectionThreadSafe, Value};
+use std::path::Path;
 use uuid::Uuid;
 
 use mithril_common::entities::{ProtocolParameters, SignerWithStake};
@@ -19,8 +20,20 @@ use crate::database::record::{
 };
 
 /// In-memory sqlite database without foreign key support with migrations applied
-pub fn main_db_connection() -> StdResult<ConnectionThreadSafe> {
-    let connection = ConnectionBuilder::open_memory()
+pub fn main_db_connection() -> StdResult<SqliteConnection> {
+    let builder = ConnectionBuilder::open_memory();
+    build_main_db_connection(builder)
+}
+
+/// File sqlite database without foreign key support with migrations applied and WAL activated
+pub fn main_db_file_connection(db_path: &Path) -> StdResult<SqliteConnection> {
+    let builder = ConnectionBuilder::open_file(db_path)
+        .with_options(&[ConnectionOptions::EnableWriteAheadLog]);
+    build_main_db_connection(builder)
+}
+
+fn build_main_db_connection(connection_builder: ConnectionBuilder) -> StdResult<SqliteConnection> {
+    let connection = connection_builder
         .with_options(&[ConnectionOptions::ForceDisableForeignKeys])
         .with_migrations(crate::database::migration::get_migrations())
         .build()?;
@@ -28,8 +41,22 @@ pub fn main_db_connection() -> StdResult<ConnectionThreadSafe> {
     Ok(connection)
 }
 
 /// In-memory sqlite database without foreign key support with cardano db migrations applied
-pub fn cardano_tx_db_connection() -> StdResult<ConnectionThreadSafe> {
-    let connection = ConnectionBuilder::open_memory()
+pub fn cardano_tx_db_connection() -> StdResult<SqliteConnection> {
+    let builder = ConnectionBuilder::open_memory();
+    build_cardano_tx_db_connection(builder)
+}
+
+/// File sqlite database without foreign key support with cardano db migrations applied and WAL activated
+pub fn cardano_tx_db_file_connection(db_path: &Path) -> StdResult<SqliteConnection> {
+    let builder = ConnectionBuilder::open_file(db_path)
+        .with_options(&[ConnectionOptions::EnableWriteAheadLog]);
+    build_cardano_tx_db_connection(builder)
+}
+
+fn build_cardano_tx_db_connection(
+    connection_builder: ConnectionBuilder,
+) -> StdResult<SqliteConnection> {
+    let connection = connection_builder
         .with_options(&[ConnectionOptions::ForceDisableForeignKeys])
         .with_migrations(
             mithril_persistence::database::cardano_transaction_migration::get_migrations(),
diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs
index d3c8aaea66c..13a202d6171 100644
--- a/mithril-aggregator/src/dependency_injection/builder.rs
+++ b/mithril-aggregator/src/dependency_injection/builder.rs
@@ -59,10 +59,10 @@ use crate::{
     event_store::{EventMessage, EventStore, TransmitterService},
     http_server::routes::router,
     services::{
-        CardanoTransactionsImporter, CertifierService, MessageService, MithrilCertifierService,
-        MithrilEpochService, MithrilMessageService, MithrilProverService,
+        AggregatorUpkeepService, CardanoTransactionsImporter, CertifierService, MessageService,
+        MithrilCertifierService, MithrilEpochService, MithrilMessageService, MithrilProverService,
         MithrilSignedEntityService, MithrilStakeDistributionService, ProverService,
-        SignedEntityService, StakeDistributionService,
+        SignedEntityService, StakeDistributionService, UpkeepService,
     },
     tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter},
     AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
@@ -221,6 +221,9 @@ pub struct DependenciesBuilder {
 
     /// Transactions Importer
     pub transactions_importer: Option<Arc<dyn TransactionsImporter>>,
+
+    /// Upkeep service
+    pub upkeep_service: Option<Arc<dyn UpkeepService>>,
 }
 
 impl DependenciesBuilder {
@@ -270,6 +273,7 @@ impl DependenciesBuilder {
             prover_service: None,
             signed_entity_type_lock: None,
             transactions_importer: None,
+            upkeep_service: None,
         }
     }
 
@@ -286,39 +290,25 @@ impl DependenciesBuilder {
         &self,
         sqlite_file_name: &str,
         migrations: Vec<SqlMigration>,
-        do_vacuum_database: bool,
     ) -> Result<SqliteConnection> {
         let logger = self.get_logger()?;
         let connection_builder = match self.configuration.environment {
-            ExecutionEnvironment::Production => ConnectionBuilder::open_file(
-                &self.configuration.get_sqlite_dir().join(sqlite_file_name),
-            ),
-            _ if self.configuration.data_stores_directory.to_string_lossy() == ":memory:" => {
+            ExecutionEnvironment::Test
+                if self.configuration.data_stores_directory.to_string_lossy() == ":memory:" =>
+            {
                 ConnectionBuilder::open_memory()
             }
             _ => ConnectionBuilder::open_file(
-                &self
-                    .configuration
-                    .data_stores_directory
-                    .join(sqlite_file_name),
+                &self.configuration.get_sqlite_dir().join(sqlite_file_name),
             ),
         };
 
-        let connection_options = {
-            let mut options = vec![
-                ConnectionOptions::EnableForeignKeys,
-                ConnectionOptions::EnableWriteAheadLog,
-            ];
-            if do_vacuum_database {
-                options.push(ConnectionOptions::Vacuum);
-            }
-
-            options
-        };
-
         let connection = connection_builder
             .with_node_type(ApplicationNodeType::Aggregator)
-            .with_options(&connection_options)
+            .with_options(&[
+                ConnectionOptions::EnableForeignKeys,
+                ConnectionOptions::EnableWriteAheadLog,
+            ])
            .with_logger(logger.clone())
             .with_migrations(migrations)
             .build()
@@ -348,7 +338,6 @@ impl DependenciesBuilder {
             self.sqlite_connection = Some(Arc::new(self.build_sqlite_connection(
                 SQLITE_FILE,
                 crate::database::migration::get_migrations(),
-                true,
             )?));
         }
 
@@ -367,11 +356,9 @@ impl DependenciesBuilder {
             SQLITE_FILE_CARDANO_TRANSACTION,
             mithril_persistence::database::cardano_transaction_migration::get_migrations(),
-            // Don't vacuum the Cardano transactions database as it can be very large
-            false,
         )?;
 
         let connection_pool = Arc::new(SqliteConnectionPool::build(connection_pool_size, || {
-            self.build_sqlite_connection(SQLITE_FILE_CARDANO_TRANSACTION, vec![], false)
+            self.build_sqlite_connection(SQLITE_FILE_CARDANO_TRANSACTION, vec![])
                 .with_context(|| {
                     "Dependencies Builder can not build SQLite connection for Cardano transactions"
                 })
@@ -1238,6 +1226,26 @@ impl DependenciesBuilder {
         Ok(self.transactions_importer.as_ref().cloned().unwrap())
     }
 
+    async fn build_upkeep_service(&mut self) -> Result<Arc<dyn UpkeepService>> {
+        let upkeep_service = Arc::new(AggregatorUpkeepService::new(
+            self.get_sqlite_connection().await?,
+            self.get_sqlite_connection_cardano_transaction_pool()
+                .await?,
+            self.get_signed_entity_lock().await?,
+            self.get_logger()?,
+        ));
+
+        Ok(upkeep_service)
+    }
+
+    async fn get_upkeep_service(&mut self) -> Result<Arc<dyn UpkeepService>> {
+        if self.upkeep_service.is_none() {
+            self.upkeep_service = Some(self.build_upkeep_service().await?);
+        }
+
+        Ok(self.upkeep_service.as_ref().cloned().unwrap())
+    }
+
     /// Return an unconfigured [DependencyContainer]
     pub async fn build_dependency_container(&mut self) -> Result<DependencyContainer> {
         let dependency_manager = DependencyContainer {
@@ -1281,6 +1289,7 @@ impl DependenciesBuilder {
             transaction_store: self.get_transaction_repository().await?,
             prover_service: self.get_prover_service().await?,
             signed_entity_type_lock: self.get_signed_entity_lock().await?,
+            upkeep_service: self.get_upkeep_service().await?,
         };
 
         Ok(dependency_manager)
diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs
index 3b00224019f..e89798084c6 100644
--- a/mithril-aggregator/src/dependency_injection/containers.rs
+++ b/mithril-aggregator/src/dependency_injection/containers.rs
@@ -28,7 +28,7 @@ use crate::{
     multi_signer::MultiSigner,
     services::{
         CertifierService, EpochService, MessageService, ProverService, SignedEntityService,
-        StakeDistributionService, TransactionStore,
+        StakeDistributionService, TransactionStore, UpkeepService,
     },
     signer_registerer::SignerRecorder,
     snapshot_uploaders::SnapshotUploader,
@@ -161,6 +161,9 @@ pub struct DependencyContainer {
 
     /// Signed Entity Type Lock
     pub signed_entity_type_lock: Arc<SignedEntityTypeLock>,
+
+    /// Upkeep service
+    pub upkeep_service: Arc<dyn UpkeepService>,
 }
 
 #[doc(hidden)]
diff --git a/mithril-aggregator/src/http_server/routes/artifact_routes/cardano_transaction.rs b/mithril-aggregator/src/http_server/routes/artifact_routes/cardano_transaction.rs
index 9ff27177f22..4b79f766f43 100644
--- a/mithril-aggregator/src/http_server/routes/artifact_routes/cardano_transaction.rs
+++ b/mithril-aggregator/src/http_server/routes/artifact_routes/cardano_transaction.rs
@@ -55,7 +55,7 @@ pub mod handlers {
             Err(err) => {
                 warn!("list_artifacts_cardano_transactions"; "error" => ?err);
 
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -78,7 +78,7 @@ pub mod handlers {
             }
             Err(err) => {
                 warn!("get_cardano_transaction_details::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
diff --git a/mithril-aggregator/src/http_server/routes/artifact_routes/mithril_stake_distribution.rs b/mithril-aggregator/src/http_server/routes/artifact_routes/mithril_stake_distribution.rs
index 88c174ca94f..ff7e28ba824 100644
--- a/mithril-aggregator/src/http_server/routes/artifact_routes/mithril_stake_distribution.rs
+++ b/mithril-aggregator/src/http_server/routes/artifact_routes/mithril_stake_distribution.rs
@@ -55,8 +55,7 @@ pub mod handlers {
             Ok(message) => Ok(reply::json(&message, StatusCode::OK)),
             Err(err) => {
                 warn!("list_artifacts_mithril_stake_distribution"; "error" => ?err);
-
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -79,7 +78,7 @@ pub mod handlers {
             }
             Err(err) => {
                 warn!("get_mithril_stake_distribution_details::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
diff --git a/mithril-aggregator/src/http_server/routes/artifact_routes/snapshot.rs b/mithril-aggregator/src/http_server/routes/artifact_routes/snapshot.rs
index 4aee3ed4bcd..7a1e40426b4 100644
--- a/mithril-aggregator/src/http_server/routes/artifact_routes/snapshot.rs
+++ b/mithril-aggregator/src/http_server/routes/artifact_routes/snapshot.rs
@@ -114,7 +114,7 @@ mod handlers {
             Ok(message) => Ok(reply::json(&message, StatusCode::OK)),
             Err(err) => {
                 warn!("list_artifacts_snapshot"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -136,7 +136,7 @@ mod handlers {
             }
             Err(err) => {
                 warn!("snapshot_details::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -212,7 +212,7 @@ mod handlers {
             }
             Err(err) => {
                 warn!("snapshot_download::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
diff --git a/mithril-aggregator/src/http_server/routes/certificate_routes.rs b/mithril-aggregator/src/http_server/routes/certificate_routes.rs
index f5b7e597579..b415631da2b 100644
--- a/mithril-aggregator/src/http_server/routes/certificate_routes.rs
+++ b/mithril-aggregator/src/http_server/routes/certificate_routes.rs
@@ -86,7 +86,7 @@ mod handlers {
             Ok(None) => Ok(reply::empty(StatusCode::NO_CONTENT)),
             Err(err) => {
                 warn!("certificate_pending::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -104,7 +104,7 @@ mod handlers {
             Ok(certificates) => Ok(reply::json(&certificates, StatusCode::OK)),
             Err(err) => {
                 warn!("certificate_certificates::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -127,7 +127,7 @@ mod handlers {
             Ok(None) => Ok(reply::empty(StatusCode::NOT_FOUND)),
             Err(err) => {
                 warn!("certificate_certificate_hash::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
diff --git a/mithril-aggregator/src/http_server/routes/epoch_routes.rs b/mithril-aggregator/src/http_server/routes/epoch_routes.rs
index a3176cf8271..940414dbd46 100644
--- a/mithril-aggregator/src/http_server/routes/epoch_routes.rs
+++ b/mithril-aggregator/src/http_server/routes/epoch_routes.rs
@@ -52,7 +52,7 @@ mod handlers {
             }
             (Err(err), _, _) | (_, Err(err), _) | (_, _, Err(err)) => {
                 warn!("epoch_settings::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
diff --git a/mithril-aggregator/src/http_server/routes/mod.rs b/mithril-aggregator/src/http_server/routes/mod.rs
index 782c1b7147b..d806afe4326 100644
--- a/mithril-aggregator/src/http_server/routes/mod.rs
+++ b/mithril-aggregator/src/http_server/routes/mod.rs
@@ -19,7 +19,7 @@ macro_rules! unwrap_to_internal_server_error {
             Ok(res) => res,
             Err(err) => {
                 warn!($($warn_comment)*; "error" => ?err);
-                return Ok($crate::http_server::routes::reply::internal_server_error(
+                return Ok($crate::http_server::routes::reply::server_error(
                     err,
                 ));
             }
diff --git a/mithril-aggregator/src/http_server/routes/reply.rs b/mithril-aggregator/src/http_server/routes/reply.rs
index 12152fd4f78..7cc91d88551 100644
--- a/mithril-aggregator/src/http_server/routes/reply.rs
+++ b/mithril-aggregator/src/http_server/routes/reply.rs
@@ -1,7 +1,13 @@
-use mithril_common::entities::{ClientError, InternalServerError};
 use serde::Serialize;
 use warp::http::StatusCode;
 
+use mithril_common::entities::{ClientError, ServerError};
+use mithril_common::StdError;
+use mithril_persistence::sqlite::error::{SqliteError, SQLITE_BUSY};
+
+use crate::tools::downcast_check;
+use crate::SignerRegistrationError;
+
 pub fn json<T>(value: &T, status_code: StatusCode) -> Box<dyn warp::Reply>
 where
     T: Serialize,
@@ -20,10 +26,83 @@ pub fn bad_request(label: String, message: String) -> Box<dyn warp::Reply> {
     json(&ClientError::new(label, message), StatusCode::BAD_REQUEST)
 }
 
-pub fn internal_server_error<T: Into<InternalServerError>>(message: T) -> Box<dyn warp::Reply> {
+pub fn server_error<E: Into<StdError>>(error: E) -> Box<dyn warp::Reply> {
+    let std_error: StdError = error.into();
+    let status_code = {
+        let mut code = StatusCode::INTERNAL_SERVER_ERROR;
+
+        if downcast_check::<SqliteError>(&std_error, |e| {
+            e.code.is_some_and(|code| code == SQLITE_BUSY)
+        }) {
+            code = StatusCode::SERVICE_UNAVAILABLE;
+        }
+
+        if downcast_check::<SignerRegistrationError>(&std_error, |e| {
+            matches!(e, SignerRegistrationError::RegistrationRoundNotYetOpened)
+        }) {
+            code = StatusCode::SERVICE_UNAVAILABLE;
+        }
+
+        code
+    };
+
+    json(&ServerError::new(format!("{std_error:?}")), status_code)
+}
+
+pub fn internal_server_error<T: Into<ServerError>>(message: T) -> Box<dyn warp::Reply> {
     json(&message.into(), StatusCode::INTERNAL_SERVER_ERROR)
 }
 
-pub fn service_unavailable<T: Into<InternalServerError>>(message: T) -> Box<dyn warp::Reply> {
+pub fn service_unavailable<T: Into<ServerError>>(message: T) -> Box<dyn warp::Reply> {
     json(&message.into(), StatusCode::SERVICE_UNAVAILABLE)
 }
+
+#[cfg(test)]
+mod tests {
+    use anyhow::anyhow;
+    use warp::Reply;
+
+    use super::*;
+
+    #[test]
+    fn test_server_error_convert_std_error_to_500_by_default() {
+        let error = anyhow!("Some error");
+        let response = server_error(error).into_response();
+
+        assert_eq!(StatusCode::INTERNAL_SERVER_ERROR, response.status());
+    }
+
+    #[test]
+    fn test_server_error_convert_wrapped_sqlite_busy_error_to_503() {
+        let res = sqlite::Error {
+            code: Some(SQLITE_BUSY),
+            message: None,
+        };
+        let response = server_error(res).into_response();
+
+        assert_eq!(StatusCode::SERVICE_UNAVAILABLE, response.status());
+
+        // Wrapping the error in a StdError should also work
+        let res = anyhow!(sqlite::Error {
+            code: Some(SQLITE_BUSY),
+            message: None,
+        });
+        let response = server_error(res).into_response();
+
+        assert_eq!(StatusCode::SERVICE_UNAVAILABLE, response.status());
+    }
+
+    #[test]
+    fn test_server_error_convert_signer_registration_round_not_yet_opened_to_503() {
+        let err = SignerRegistrationError::RegistrationRoundNotYetOpened;
+        let response = server_error(err).into_response();
+
+        assert_eq!(StatusCode::SERVICE_UNAVAILABLE, response.status());
+
+        // Wrapping the error in a StdError should also work
+        let err = anyhow!(SignerRegistrationError::RegistrationRoundNotYetOpened);
+        let response = server_error(err).into_response();
+
+        assert_eq!(StatusCode::SERVICE_UNAVAILABLE, response.status());
+    }
+}
diff --git a/mithril-aggregator/src/http_server/routes/signatures_routes.rs b/mithril-aggregator/src/http_server/routes/signatures_routes.rs
index 1cc3797b468..03ae29bf485 100644
--- a/mithril-aggregator/src/http_server/routes/signatures_routes.rs
+++ b/mithril-aggregator/src/http_server/routes/signatures_routes.rs
@@ -89,7 +89,7 @@ mod handlers {
                     }
                     Some(_) | None => {
                         warn!("register_signatures::error"; "error" => ?err);
-                        Ok(reply::internal_server_error(err))
+                        Ok(reply::server_error(err))
                     }
                 },
                 Ok(()) => Ok(reply::empty(StatusCode::CREATED)),
             }
@@ -97,7 +97,7 @@ mod handlers {
             }
             Err(err) => {
                 warn!("register_signatures::cant_retrieve_signed_entity_type"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
diff --git a/mithril-aggregator/src/http_server/routes/signer_routes.rs b/mithril-aggregator/src/http_server/routes/signer_routes.rs
index 9b9edf30293..6ff1c627165 100644
--- a/mithril-aggregator/src/http_server/routes/signer_routes.rs
+++ b/mithril-aggregator/src/http_server/routes/signer_routes.rs
@@ -169,7 +169,7 @@ mod handlers {
             }
             Err(err) => {
                 warn!("register_signer::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err.to_string()))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -208,7 +208,7 @@ mod handlers {
             }
             Err(err) => {
                 warn!("registered_signers::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
@@ -237,7 +237,7 @@ mod handlers {
             }
             Err(err) => {
                 warn!("registered_signers::error"; "error" => ?err);
-                Ok(reply::internal_server_error(err))
+                Ok(reply::server_error(err))
             }
         }
     }
diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs
index 05457a70636..587cd9796ad 100644
--- a/mithril-aggregator/src/lib.rs
+++ b/mithril-aggregator/src/lib.rs
@@ -74,13 +74,30 @@ static GLOBAL: Jemalloc = Jemalloc;
 
 #[cfg(test)]
 pub(crate) mod test_tools {
-    use slog::Drain;
+    use std::fs::File;
+    use std::io;
     use std::sync::Arc;
 
-    pub fn logger_for_tests() -> slog::Logger {
-        let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter);
-        let drain = slog_term::CompactFormat::new(decorator).build().fuse();
-        let drain = slog_async::Async::new(drain).build().fuse();
-        slog::Logger::root(Arc::new(drain), slog::o!())
+    use slog::{Drain, Logger};
+    use slog_async::Async;
+    use slog_term::{CompactFormat, PlainDecorator};
+
+    pub struct TestLogger;
+
+    impl TestLogger {
+        fn from_writer<W: io::Write + Send + 'static>(writer: W) -> Logger {
+            let decorator = PlainDecorator::new(writer);
+            let drain = CompactFormat::new(decorator).build().fuse();
+            let drain = Async::new(drain).build().fuse();
+            Logger::root(Arc::new(drain), slog::o!())
+        }
+
+        pub fn stdout() -> Logger {
+            Self::from_writer(slog_term::TestStdoutWriter)
+        }
+
+        pub fn file(filepath: &std::path::Path) -> Logger {
+            Self::from_writer(File::create(filepath).unwrap())
+        }
     }
 }
diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs
index 28aebacf6ac..af439f45b04 100644
--- a/mithril-aggregator/src/runtime/runner.rs
+++ b/mithril-aggregator/src/runtime/runner.rs
@@ -118,6 +118,9 @@ pub trait AggregatorRunnerTrait: Sync + Send {
     /// Ask services to update themselves for the new epoch
     async fn inform_new_epoch(&self, epoch: Epoch) -> StdResult<()>;
 
+    /// Perform the upkeep tasks.
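+    ///
+    /// Delegates to the [UpkeepService] stored in the dependencies container.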
+    async fn upkeep(&self) -> StdResult<()>;
+
     /// Precompute what doesn't change for the actual epoch
     async fn precompute_epoch_data(&self) -> StdResult<()>;
 
@@ -475,6 +478,11 @@ impl AggregatorRunnerTrait for AggregatorRunner {
         Ok(())
     }
 
+    async fn upkeep(&self) -> StdResult<()> {
+        debug!("RUNNER: upkeep");
+        self.dependencies.upkeep_service.run().await
+    }
+
     async fn create_open_message(
         &self,
         signed_entity_type: &SignedEntityType,
@@ -489,7 +497,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
 
 #[cfg(test)]
 pub mod tests {
-    use crate::services::FakeEpochService;
+    use crate::services::{FakeEpochService, MockUpkeepService};
     use crate::{
         entities::OpenMessage,
         initialize_dependencies,
@@ -887,7 +895,6 @@ pub mod tests {
             .expect_inform_epoch()
             .returning(|_| Ok(()))
             .times(1);
-
         let mut deps = initialize_dependencies().await;
         let current_epoch = deps
             .chain_observer
@@ -907,6 +914,19 @@ pub mod tests {
         runner.inform_new_epoch(current_epoch).await.unwrap();
     }
 
+    #[tokio::test]
+    async fn test_upkeep() {
+        let mut upkeep_service = MockUpkeepService::new();
+        upkeep_service.expect_run().returning(|| Ok(())).times(1);
+
+        let mut deps = initialize_dependencies().await;
+        deps.upkeep_service = Arc::new(upkeep_service);
+
+        let runner = AggregatorRunner::new(Arc::new(deps));
+
+        runner.upkeep().await.unwrap();
+    }
+
     #[tokio::test]
     async fn test_update_protocol_parameters() {
         let mut mock_certifier_service = MockCertifierService::new();
diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs
index 0e60452ebc3..3bbf46ac857 100644
--- a/mithril-aggregator/src/runtime/state_machine.rs
+++ b/mithril-aggregator/src/runtime/state_machine.rs
@@ -290,6 +290,7 @@ impl AggregatorRuntime {
             self.runner
                 .update_stake_distribution(&new_time_point)
                 .await?;
+            self.runner.upkeep().await?;
             self.runner
                 .open_signer_registration_round(&new_time_point)
                 .await?;
@@ -469,6 +470,7 @@ mod tests {
             .expect_precompute_epoch_data()
             .once()
             .returning(|| Ok(()));
+        runner.expect_upkeep().once().returning(|| Ok(()));
 
         let mut runtime = init_runtime(
             Some(AggregatorState::Idle(IdleState {
@@ -525,6 +527,7 @@ mod tests {
             .expect_precompute_epoch_data()
             .once()
             .returning(|| Ok(()));
+        runner.expect_upkeep().once().returning(|| Ok(()));
 
         let mut runtime = init_runtime(
             Some(AggregatorState::Idle(IdleState {
diff --git a/mithril-aggregator/src/services/cardano_transactions_importer.rs b/mithril-aggregator/src/services/cardano_transactions_importer.rs
index 3e0edca7e17..ba73188ca40 100644
--- a/mithril-aggregator/src/services/cardano_transactions_importer.rs
+++ b/mithril-aggregator/src/services/cardano_transactions_importer.rs
@@ -197,6 +197,7 @@ mod tests {
     use mithril_persistence::database::repository::CardanoTransactionRepository;
 
     use crate::database::test_helper::cardano_tx_db_connection;
+    use crate::test_tools::TestLogger;
 
     use super::*;
 
@@ -223,7 +224,7 @@ mod tests {
             scanner,
             transaction_store,
             Path::new(""),
-            crate::test_tools::logger_for_tests(),
+            TestLogger::stdout(),
         )
     }
 }
@@ -650,9 +651,8 @@ mod tests {
 
         let (importer, repository) = {
             let connection = cardano_tx_db_connection().unwrap();
-            let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
-                SqliteConnectionPool::build_from_connection(connection),
-            )));
+            let connection_pool = Arc::new(SqliteConnectionPool::build_from_connection(connection));
+            let repository = Arc::new(CardanoTransactionRepository::new(connection_pool));
             let importer = CardanoTransactionsImporter::new_for_test(
                 Arc::new(DumbBlockScanner::new().forwards(vec![blocks.clone()])),
                 repository.clone(),
diff --git a/mithril-aggregator/src/services/mod.rs b/mithril-aggregator/src/services/mod.rs
index 3b9516466c5..73767b5efe3 100644
--- a/mithril-aggregator/src/services/mod.rs
+++ b/mithril-aggregator/src/services/mod.rs
@@ -16,6 +16,7 @@ mod message;
 mod prover;
 mod signed_entity;
 mod stake_distribution;
+mod upkeep;
 
 pub use cardano_transactions_importer::*;
 pub use certifier::*;
@@ -24,3 +25,4 @@ pub use message::*;
 pub use prover::*;
 pub use signed_entity::*;
 pub use stake_distribution::*;
+pub use upkeep::*;
diff --git a/mithril-aggregator/src/services/upkeep.rs b/mithril-aggregator/src/services/upkeep.rs
new file mode 100644
index 00000000000..60b89d351db
--- /dev/null
+++ b/mithril-aggregator/src/services/upkeep.rs
@@ -0,0 +1,206 @@
+//! ## Upkeep Service
+//!
+//! This service is responsible for the upkeep of the application.
+//!
+//! It is in charge of the following tasks:
+//! * free up space by executing vacuum and WAL checkpoint on the database
+
+use std::sync::Arc;
+
+use anyhow::Context;
+use async_trait::async_trait;
+use slog::{info, Logger};
+
+use mithril_common::signed_entity_type_lock::SignedEntityTypeLock;
+use mithril_common::StdResult;
+use mithril_persistence::sqlite::{
+    SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
+};
+
+/// Define the service responsible for the upkeep of the application.
+#[cfg_attr(test, mockall::automock)]
+#[async_trait]
+pub trait UpkeepService: Send + Sync {
+    /// Run the upkeep service.
+    async fn run(&self) -> StdResult<()>;
+}
+
+/// Implementation of the upkeep service for the aggregator.
+///
+/// To ensure that connections are cleaned up properly, it creates new connections itself
+/// instead of relying on a connection pool or a shared connection.
+pub struct AggregatorUpkeepService {
+    main_db_connection: Arc<SqliteConnection>,
+    cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
+    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
+    logger: Logger,
+}
+
+impl AggregatorUpkeepService {
+    /// Create a new instance of the aggregator upkeep service.
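+    ///
+    /// Note: upkeep is skipped entirely while any signed entity type is locked,
+    /// so it never competes with an in-progress certification.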
+ pub fn new( + main_db_connection: Arc, + cardano_tx_connection_pool: Arc, + signed_entity_type_lock: Arc, + logger: Logger, + ) -> Self { + Self { + main_db_connection, + cardano_tx_connection_pool, + signed_entity_type_lock, + logger, + } + } + + async fn upkeep_all_databases(&self) -> StdResult<()> { + if self.signed_entity_type_lock.has_locked_entities().await { + info!( + self.logger, + "UpkeepService::Some entities are locked - Skipping database upkeep" + ); + return Ok(()); + } + + let main_db_connection = self.main_db_connection.clone(); + let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone(); + let db_upkeep_logger = self.logger.clone(); + + // Run the database upkeep tasks in another thread to avoid blocking the tokio runtime + let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> { + info!(db_upkeep_logger, "UpkeepService::Cleaning main database"); + SqliteCleaner::new(&main_db_connection) + .with_logger(db_upkeep_logger.clone()) + .with_tasks(&[ + SqliteCleaningTask::Vacuum, + SqliteCleaningTask::WalCheckpointTruncate, + ]) + .run()?; + + info!( + db_upkeep_logger, + "UpkeepService::Cleaning cardano transactions database" + ); + let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?; + SqliteCleaner::new(&cardano_tx_db_connection) + .with_logger(db_upkeep_logger.clone()) + .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate]) + .run()?; + + Ok(()) + }); + + db_upkeep_thread + .await + .with_context(|| "Database Upkeep thread crashed")? + } +} + +#[async_trait] +impl UpkeepService for AggregatorUpkeepService { + async fn run(&self) -> StdResult<()> { + info!(self.logger, "UpkeepService::start"); + + self.upkeep_all_databases() + .await + .with_context(|| "Database upkeep failed")?; + + info!(self.logger, "UpkeepService::end"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use mithril_common::entities::SignedEntityTypeDiscriminants; + use mithril_common::test_utils::TempDir; + + use crate::database::test_helper::{ + cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection, + main_db_file_connection, + }; + use crate::test_tools::TestLogger; + + use super::*; + + #[tokio::test] + async fn test_cleanup_database() { + let (main_db_path, ctx_db_path, log_path) = { + let db_dir = TempDir::create("aggregator_upkeep", "test_cleanup_database"); + ( + db_dir.join("main.db"), + db_dir.join("cardano_tx.db"), + db_dir.join("upkeep.log"), + ) + }; + + let main_db_connection = main_db_file_connection(&main_db_path).unwrap(); + let cardano_tx_connection = cardano_tx_db_file_connection(&ctx_db_path).unwrap(); + + let service = AggregatorUpkeepService::new( + Arc::new(main_db_connection), + Arc::new(SqliteConnectionPool::build_from_connection( + cardano_tx_connection, + )), + Arc::new(SignedEntityTypeLock::default()), + TestLogger::file(&log_path), + ); + + // Separate block to ensure the log is flushed after run + { + service.run().await.expect("Upkeep service failed"); + } + + let logs = std::fs::read_to_string(&log_path).unwrap(); + + assert_eq!( + logs.matches(SqliteCleaningTask::Vacuum.log_message()) + .count(), + 1, + "Should have run only once since only the main database has a `Vacuum` cleanup" + ); + assert_eq!( + logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message()) + .count(), + 2, + "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup" + ); + } + + #[tokio::test] + async fn test_doesnt_cleanup_db_if_any_entity_is_locked() { + let log_path = 
TempDir::create( + "aggregator_upkeep", + "test_doesnt_cleanup_db_if_any_entity_is_locked", + ) + .join("upkeep.log"); + + let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default()); + let service = AggregatorUpkeepService::new( + Arc::new(main_db_connection().unwrap()), + Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()), + signed_entity_type_lock.clone(), + TestLogger::file(&log_path), + ); + + // Separate block to ensure the log is flushed after run + { + signed_entity_type_lock + .lock(SignedEntityTypeDiscriminants::CardanoTransactions) + .await; + service.run().await.expect("Upkeep service failed"); + } + + let logs = std::fs::read_to_string(&log_path).unwrap(); + + assert_eq!( + logs.matches(SqliteCleaningTask::Vacuum.log_message()) + .count(), + 0, + ); + assert_eq!( + logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message()) + .count(), + 0, + ); + } +} diff --git a/mithril-aggregator/src/tools/mod.rs b/mithril-aggregator/src/tools/mod.rs index be7c631202a..2f6be81ddc0 100644 --- a/mithril-aggregator/src/tools/mod.rs +++ b/mithril-aggregator/src/tools/mod.rs @@ -18,3 +18,17 @@ pub use signer_importer::{ #[cfg(test)] pub use remote_file_uploader::MockRemoteFileUploader; + +/// Downcast the error to the specified error type and check if the error satisfies the condition. +pub(crate) fn downcast_check( + error: &mithril_common::StdError, + check: impl Fn(&E) -> bool, +) -> bool +where + E: std::fmt::Display + std::fmt::Debug + Send + Sync + 'static, +{ + if let Some(inner_error) = error.downcast_ref::() { + return check(inner_error); + } + false +} diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index d8c02b0a673..6844708f376 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.4.24" +version = "0.4.25" description = "Common types, interfaces, and utilities for Mithril nodes." 
authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/src/entities/http_server_error.rs b/mithril-common/src/entities/http_server_error.rs index 7f62db87ce4..2e60bbd8d4c 100644 --- a/mithril-common/src/entities/http_server_error.rs +++ b/mithril-common/src/entities/http_server_error.rs @@ -1,35 +1,35 @@ use crate::StdError; use serde::{Deserialize, Serialize}; -/// Representation of a Internal Server Error raised by an http server +/// Representation of a Server Error raised by a http server #[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] -pub struct InternalServerError { +pub struct ServerError { /// error message pub message: String, } -impl InternalServerError { +impl ServerError { /// InternalServerError factory - pub fn new(message: String) -> InternalServerError { - InternalServerError { message } + pub fn new(message: String) -> ServerError { + ServerError { message } } } -impl From for InternalServerError { +impl From for ServerError { fn from(message: String) -> Self { - InternalServerError::new(message) + ServerError::new(message) } } -impl From<&str> for InternalServerError { +impl From<&str> for ServerError { fn from(message: &str) -> Self { - InternalServerError::new(message.to_string()) + ServerError::new(message.to_string()) } } -impl From for InternalServerError { +impl From for ServerError { fn from(error: StdError) -> Self { - InternalServerError::new(format!("{error:?}")) + ServerError::new(format!("{error:?}")) } } diff --git a/mithril-common/src/entities/mod.rs b/mithril-common/src/entities/mod.rs index e47e50b27e2..6d794ad5691 100644 --- a/mithril-common/src/entities/mod.rs +++ b/mithril-common/src/entities/mod.rs @@ -37,7 +37,7 @@ pub use certificate_metadata::{CertificateMetadata, StakeDistributionParty}; pub use certificate_pending::CertificatePending; pub use epoch::{Epoch, EpochError}; pub use epoch_settings::EpochSettings; -pub use http_server_error::{ClientError, InternalServerError}; +pub use http_server_error::{ClientError, ServerError}; pub use mithril_stake_distribution::MithrilStakeDistribution; pub use protocol_message::{ProtocolMessage, ProtocolMessagePartKey, ProtocolMessagePartValue}; pub use protocol_parameters::ProtocolParameters; diff --git a/mithril-common/src/signed_entity_type_lock.rs b/mithril-common/src/signed_entity_type_lock.rs index 640aef844e0..d22500f1c3b 100644 --- a/mithril-common/src/signed_entity_type_lock.rs +++ b/mithril-common/src/signed_entity_type_lock.rs @@ -45,6 +45,12 @@ impl SignedEntityTypeLock { locked_entities.remove(&entity_type.into()); } + /// Check if there are any locked signed entities. + pub async fn has_locked_entities(&self) -> bool { + let locked_entities = self.locked_entities.read().await; + !locked_entities.is_empty() + } + /// List only the unlocked signed entities in the given list. 
pub async fn filter_unlocked_entries + Clone>( &self, @@ -189,4 +195,16 @@ mod tests { vec![SignedEntityTypeDiscriminants::CardanoTransactions] ); } + + #[tokio::test] + async fn has_locked_entities() { + let signed_entity_type_lock = SignedEntityTypeLock::new(); + + assert!(!signed_entity_type_lock.has_locked_entities().await); + + signed_entity_type_lock + .lock(SignedEntityTypeDiscriminants::MithrilStakeDistribution) + .await; + assert!(signed_entity_type_lock.has_locked_entities().await); + } } diff --git a/mithril-common/src/test_utils/apispec.rs b/mithril-common/src/test_utils/apispec.rs index 3e28b4ddd08..67f86265ec4 100644 --- a/mithril-common/src/test_utils/apispec.rs +++ b/mithril-common/src/test_utils/apispec.rs @@ -460,7 +460,7 @@ components: // for this route, so it's the default response spec that is used. let response = build_json_response( StatusCode::INTERNAL_SERVER_ERROR.into(), - entities::InternalServerError::new("an error occurred".to_string()), + entities::ServerError::new("an error occurred".to_string()), ); APISpec::from_file(&APISpec::get_default_spec_file()) @@ -474,7 +474,7 @@ components: fn test_should_fail_when_the_status_code_is_not_the_expected_one() { let response = build_json_response( StatusCode::INTERNAL_SERVER_ERROR.into(), - entities::InternalServerError::new("an error occurred".to_string()), + entities::ServerError::new("an error occurred".to_string()), ); let mut api_spec = APISpec::from_file(&APISpec::get_default_spec_file()); @@ -500,7 +500,7 @@ components: fn test_should_be_ok_when_the_status_code_is_the_right_one() { let response = build_json_response( StatusCode::INTERNAL_SERVER_ERROR.into(), - entities::InternalServerError::new("an error occurred".to_string()), + entities::ServerError::new("an error occurred".to_string()), ); APISpec::from_file(&APISpec::get_default_spec_file()) diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index c7075523ce3..b890b1d168f 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.155" +version = "0.2.156" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/src/cardano_transactions_importer.rs b/mithril-signer/src/cardano_transactions_importer.rs index 448a0e32362..ba73188ca40 100644 --- a/mithril-signer/src/cardano_transactions_importer.rs +++ b/mithril-signer/src/cardano_transactions_importer.rs @@ -196,7 +196,8 @@ mod tests { use mithril_common::entities::{BlockNumber, BlockRangesSequence}; use mithril_persistence::database::repository::CardanoTransactionRepository; - use crate::database::test_utils::cardano_tx_db_connection; + use crate::database::test_helper::cardano_tx_db_connection; + use crate::test_tools::TestLogger; use super::*; @@ -223,7 +224,7 @@ mod tests { scanner, transaction_store, Path::new(""), - crate::test_tools::logger_for_tests(), + TestLogger::stdout(), ) } } diff --git a/mithril-signer/src/database/mod.rs b/mithril-signer/src/database/mod.rs index 079dea1ecdd..c435285cc41 100644 --- a/mithril-signer/src/database/mod.rs +++ b/mithril-signer/src/database/mod.rs @@ -3,20 +3,5 @@ //! representation with their associated providers. 
pub mod migration; pub mod repository; - #[cfg(test)] -pub mod test_utils { - use sqlite::ConnectionThreadSafe; - - use mithril_common::StdResult; - use mithril_persistence::database::cardano_transaction_migration; - use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionOptions}; - - pub fn cardano_tx_db_connection() -> StdResult { - let connection = ConnectionBuilder::open_memory() - .with_options(&[ConnectionOptions::ForceDisableForeignKeys]) - .with_migrations(cardano_transaction_migration::get_migrations()) - .build()?; - Ok(connection) - } -} +pub(crate) mod test_helper; diff --git a/mithril-signer/src/database/test_helper.rs b/mithril-signer/src/database/test_helper.rs new file mode 100644 index 00000000000..89b20636221 --- /dev/null +++ b/mithril-signer/src/database/test_helper.rs @@ -0,0 +1,50 @@ +use std::path::Path; + +use mithril_common::StdResult; +use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection}; + +/// In-memory sqlite database without foreign key support with migrations applied +pub fn main_db_connection() -> StdResult { + let builder = ConnectionBuilder::open_memory(); + build_main_db_connection(builder) +} + +/// File sqlite database without foreign key support with migrations applied and WAL activated +pub fn main_db_file_connection(db_path: &Path) -> StdResult { + let builder = ConnectionBuilder::open_file(db_path) + .with_options(&[ConnectionOptions::EnableWriteAheadLog]); + build_main_db_connection(builder) +} + +fn build_main_db_connection(connection_builder: ConnectionBuilder) -> StdResult { + let connection = connection_builder + .with_options(&[ConnectionOptions::ForceDisableForeignKeys]) + .with_migrations(crate::database::migration::get_migrations()) + .build()?; + Ok(connection) +} + +/// In-memory sqlite database without foreign key support with cardano db migrations applied +pub fn cardano_tx_db_connection() -> StdResult { + let builder = ConnectionBuilder::open_memory(); + build_cardano_tx_db_connection(builder) +} + +/// File sqlite database without foreign key support with cardano db migrations applied and WAL activated +pub fn cardano_tx_db_file_connection(db_path: &Path) -> StdResult { + let builder = ConnectionBuilder::open_file(db_path) + .with_options(&[ConnectionOptions::EnableWriteAheadLog]); + build_cardano_tx_db_connection(builder) +} + +fn build_cardano_tx_db_connection( + connection_builder: ConnectionBuilder, +) -> StdResult { + let connection = connection_builder + .with_options(&[ConnectionOptions::ForceDisableForeignKeys]) + .with_migrations( + mithril_persistence::database::cardano_transaction_migration::get_migrations(), + ) + .build()?; + Ok(connection) +} diff --git a/mithril-signer/src/lib.rs b/mithril-signer/src/lib.rs index ab4eb7168b9..2716aa7a54b 100644 --- a/mithril-signer/src/lib.rs +++ b/mithril-signer/src/lib.rs @@ -18,6 +18,7 @@ mod single_signer; mod transactions_importer_by_chunk; mod transactions_importer_with_pruner; mod transactions_importer_with_vacuum; +mod upkeep_service; #[cfg(test)] pub use aggregator_client::dumb::DumbAggregatorClient; @@ -34,6 +35,7 @@ pub use single_signer::*; pub use transactions_importer_by_chunk::*; pub use transactions_importer_with_pruner::*; pub use transactions_importer_with_vacuum::*; +pub use upkeep_service::*; /// HTTP request timeout duration in milliseconds const HTTP_REQUEST_TIMEOUT_DURATION: u64 = 30000; @@ -52,13 +54,30 @@ static GLOBAL: Jemalloc = Jemalloc; #[cfg(test)] pub mod test_tools { - use slog::Drain; + use std::fs::File; + 
use std::io; use std::sync::Arc; - pub fn logger_for_tests() -> slog::Logger { - let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter); - let drain = slog_term::CompactFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - slog::Logger::root(Arc::new(drain), slog::o!()) + use slog::{Drain, Logger}; + use slog_async::Async; + use slog_term::{CompactFormat, PlainDecorator}; + + pub struct TestLogger; + + impl TestLogger { + fn from_writer(writer: W) -> Logger { + let decorator = PlainDecorator::new(writer); + let drain = CompactFormat::new(decorator).build().fuse(); + let drain = Async::new(drain).build().fuse(); + Logger::root(Arc::new(drain), slog::o!()) + } + + pub fn stdout() -> Logger { + Self::from_writer(slog_term::TestStdoutWriter) + } + + pub fn file(filepath: &std::path::Path) -> Logger { + Self::from_writer(File::create(filepath).unwrap()) + } } } diff --git a/mithril-signer/src/runtime/runner.rs b/mithril-signer/src/runtime/runner.rs index e9e30f79889..879c183bbbe 100644 --- a/mithril-signer/src/runtime/runner.rs +++ b/mithril-signer/src/runtime/runner.rs @@ -75,6 +75,9 @@ pub trait Runner: Send + Sync { /// Read the current era and update the EraChecker. async fn update_era_checker(&self, epoch: Epoch) -> StdResult<()>; + + /// Perform the upkeep tasks. + async fn upkeep(&self) -> StdResult<()>; } /// This type represents the errors thrown from the Runner. @@ -452,6 +455,12 @@ impl Runner for SignerRunner { Ok(()) } + + async fn upkeep(&self) -> StdResult<()> { + debug!("RUNNER: upkeep"); + self.services.upkeep_service.run().await?; + Ok(()) + } } #[cfg(test)] @@ -482,7 +491,7 @@ mod tests { use crate::{ metrics::MetricsService, AggregatorClient, CardanoTransactionsImporter, DumbAggregatorClient, MithrilSingleSigner, MockAggregatorClient, MockTransactionStore, - ProtocolInitializerStore, SingleSigner, + MockUpkeepService, ProtocolInitializerStore, SingleSigner, }; use super::*; @@ -575,6 +584,7 @@ mod tests { chain_observer.clone(), slog_scope::logger(), )); + let upkeep_service = Arc::new(MockUpkeepService::new()); SignerServices { stake_store: Arc::new(StakeStore::new(Box::new(DumbStoreAdapter::new()), None)), @@ -594,6 +604,7 @@ mod tests { metrics_service, signed_entity_type_lock, cardano_transactions_preloader, + upkeep_service, } } @@ -929,4 +940,15 @@ mod tests { assert_eq!(time_point.epoch, era_checker.current_epoch()); } + + #[tokio::test] + async fn test_upkeep() { + let mut services = init_services().await; + let mut upkeep_service_mock = MockUpkeepService::new(); + upkeep_service_mock.expect_run().returning(|| Ok(())).once(); + services.upkeep_service = Arc::new(upkeep_service_mock); + + let runner = init_runner(Some(services), None).await; + runner.upkeep().await.expect("upkeep should not fail"); + } } diff --git a/mithril-signer/src/runtime/signer_services.rs b/mithril-signer/src/runtime/signer_services.rs index 0ba80cd9af6..7f36e2854db 100644 --- a/mithril-signer/src/runtime/signer_services.rs +++ b/mithril-signer/src/runtime/signer_services.rs @@ -26,16 +26,16 @@ use mithril_common::{ }; use mithril_persistence::{ database::{repository::CardanoTransactionRepository, ApplicationNodeType, SqlMigration}, - sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionPool}, + sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool}, store::{adapter::SQLiteAdapter, StakeStore}, }; use crate::{ aggregator_client::AggregatorClient, metrics::MetricsService, 
diff --git a/mithril-signer/src/runtime/signer_services.rs b/mithril-signer/src/runtime/signer_services.rs
index 0ba80cd9af6..7f36e2854db 100644
--- a/mithril-signer/src/runtime/signer_services.rs
+++ b/mithril-signer/src/runtime/signer_services.rs
@@ -26,16 +26,16 @@ use mithril_common::{
 };
 use mithril_persistence::{
     database::{repository::CardanoTransactionRepository, ApplicationNodeType, SqlMigration},
-    sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionPool},
+    sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool},
     store::{adapter::SQLiteAdapter, StakeStore},
 };

 use crate::{
     aggregator_client::AggregatorClient, metrics::MetricsService,
     single_signer::SingleSigner, AggregatorHTTPClient, CardanoTransactionsImporter,
     Configuration, MithrilSingleSigner,
-    ProtocolInitializerStore, ProtocolInitializerStorer, TransactionsImporterByChunk,
-    TransactionsImporterWithPruner, TransactionsImporterWithVacuum, HTTP_REQUEST_TIMEOUT_DURATION,
-    SQLITE_FILE, SQLITE_FILE_CARDANO_TRANSACTION,
+    ProtocolInitializerStore, ProtocolInitializerStorer, SignerUpkeepService,
+    TransactionsImporterByChunk, TransactionsImporterWithPruner, TransactionsImporterWithVacuum,
+    UpkeepService, HTTP_REQUEST_TIMEOUT_DURATION, SQLITE_FILE, SQLITE_FILE_CARDANO_TRANSACTION,
 };

 type StakeStoreService = Arc<StakeStore>;
@@ -174,7 +174,6 @@ impl<'a> ProductionServiceBuilder<'a> {
         let logger = slog_scope::logger();
         let connection = ConnectionBuilder::open_file(&sqlite_db_path)
             .with_node_type(ApplicationNodeType::Signer)
-            .with_options(&[ConnectionOptions::Vacuum])
             .with_migrations(migrations)
             .with_logger(logger.clone())
             .build()
@@ -226,7 +225,7 @@ impl<'a> ServiceBuilder for ProductionServiceBuilder<'a> {
             slog_scope::logger(),
         ));
         let stake_store = Arc::new(StakeStore::new(
-            Box::new(SQLiteAdapter::new("stake", sqlite_connection)?),
+            Box::new(SQLiteAdapter::new("stake", sqlite_connection.clone())?),
             self.config.store_retention_limit,
         ));
         let chain_observer = {
@@ -308,7 +307,7 @@
         let preloader_transactions_importer = Arc::new(TransactionsImporterByChunk::new(
             transaction_store.clone(),
             Arc::new(TransactionsImporterWithVacuum::new(
-                sqlite_connection_cardano_transaction_pool,
+                sqlite_connection_cardano_transaction_pool.clone(),
                 transactions_importer.clone(),
                 slog_scope::logger(),
             )),
@@ -334,6 +333,12 @@
             chain_observer.clone(),
             slog_scope::logger(),
         ));
+        let upkeep_service = Arc::new(SignerUpkeepService::new(
+            sqlite_connection.clone(),
+            sqlite_connection_cardano_transaction_pool,
+            signed_entity_type_lock.clone(),
+            slog_scope::logger(),
+        ));

         let services = SignerServices {
             ticker_service,
@@ -350,6 +355,7 @@
             metrics_service,
             signed_entity_type_lock,
             cardano_transactions_preloader,
+            upkeep_service,
         };

         Ok(services)
@@ -399,6 +405,9 @@ pub struct SignerServices {

     /// Cardano transactions preloader
     pub cardano_transactions_preloader: Arc<CardanoTransactionsPreloader>,
+
+    /// Upkeep service
+    pub upkeep_service: Arc<dyn UpkeepService>,
 }

 #[cfg(test)]
diff --git a/mithril-signer/src/runtime/state_machine.rs b/mithril-signer/src/runtime/state_machine.rs
index 396ddc78257..5b94d68079a 100644
--- a/mithril-signer/src/runtime/state_machine.rs
+++ b/mithril-signer/src/runtime/state_machine.rs
@@ -340,16 +340,17 @@ impl StateMachine {
                     .await
                     .map_err(|e| RuntimeError::KeepState {
                         message: format!("Could not update stake distribution in 'unregistered → registered' phase for epoch {:?}.", epoch),
-                        nested_error: Some(e) })?;
+                        nested_error: Some(e),
+                    })?;

-                self.runner.
-                    register_signer_to_aggregator(
+                self.runner.register_signer_to_aggregator(
                     epoch_settings.epoch,
                     &epoch_settings.next_protocol_parameters,
                 )
-                .await.map_err(|e| {
-                    if e.downcast_ref::().is_some(){
+                .await.map_err(|e| {
+                    if e.downcast_ref::().is_some() {
                         RuntimeError::Critical { message: format!("Could not register to aggregator in 'unregistered → registered' phase for epoch {:?}.", epoch), nested_error: Some(e) }
-                    }else{
+                    } else {
                         RuntimeError::KeepState { message: format!("Could not register to aggregator in 'unregistered → registered' phase for epoch {:?}.", epoch), nested_error: Some(e) }
                     }
                 })?;
@@ -359,6 +360,14 @@
                 self.metrics_service
                     .signer_registration_success_last_epoch_gauge_set(epoch);

+                self.runner
+                    .upkeep()
+                    .await
+                    .map_err(|e| RuntimeError::KeepState {
+                        message: "Failed to upkeep signer in 'unregistered → registered' phase".to_string(),
+                        nested_error: Some(e),
+                    })?;
+
                 Ok(SignerState::Registered { epoch })
             }
@@ -545,6 +554,7 @@ mod tests {
     #[tokio::test]
     async fn unregistered_to_registered() {
         let mut runner = MockSignerRunner::new();
+        runner.expect_upkeep().returning(|| Ok(())).once();
         runner
             .expect_get_epoch_settings()
             .once()
diff --git a/mithril-signer/src/transactions_importer_by_chunk.rs b/mithril-signer/src/transactions_importer_by_chunk.rs
index 19159071fad..287aed4c40c 100644
--- a/mithril-signer/src/transactions_importer_by_chunk.rs
+++ b/mithril-signer/src/transactions_importer_by_chunk.rs
@@ -68,6 +68,8 @@ mod tests {
     use mockall::predicate::eq;
     use mockall::{mock, Sequence};

+    use crate::test_tools::TestLogger;
+
     use super::*;

     mock! {
@@ -118,7 +120,7 @@ mod tests {
             highest_transaction_block_number_getter,
             Arc::new(wrapped_importer),
             chunk_size,
-            crate::test_tools::logger_for_tests(),
+            TestLogger::stdout(),
         );

         let up_to_beacon = highest_block_number;
@@ -146,7 +148,7 @@
             highest_transaction_block_number_getter,
             Arc::new(wrapped_importer),
             chunk_size,
-            crate::test_tools::logger_for_tests(),
+            TestLogger::stdout(),
         );

         importer.import(up_to_beacon).await.unwrap();
@@ -166,7 +168,7 @@
             highest_transaction_block_number_getter,
             Arc::new(wrapped_importer),
             chunk_size,
-            crate::test_tools::logger_for_tests(),
+            TestLogger::stdout(),
         );

         importer.import(up_to_beacon).await.unwrap();
@@ -190,7 +192,7 @@
             highest_transaction_block_number_getter,
             Arc::new(wrapped_importer),
             chunk_size,
-            crate::test_tools::logger_for_tests(),
+            TestLogger::stdout(),
         );

         importer.import(up_to_beacon).await.unwrap();
@@ -213,7 +215,7 @@
             highest_transaction_block_number_getter,
             Arc::new(wrapped_importer),
             chunk_size,
-            crate::test_tools::logger_for_tests(),
+            TestLogger::stdout(),
         );

         importer.import(up_to_beacon).await.unwrap();
diff --git a/mithril-signer/src/transactions_importer_with_pruner.rs b/mithril-signer/src/transactions_importer_with_pruner.rs
index 0abb3f35ebe..c7aeaf08593 100644
--- a/mithril-signer/src/transactions_importer_with_pruner.rs
+++ b/mithril-signer/src/transactions_importer_with_pruner.rs
@@ -68,6 +68,8 @@ mod tests {
     use mockall::mock;
     use mockall::predicate::eq;

+    use crate::test_tools::TestLogger;
+
     use super::*;

     mock! {
@@ -98,7 +100,7 @@ mod tests {
             number_of_blocks_to_keep,
             Arc::new(transaction_pruner),
             Arc::new(transaction_importer),
-            crate::test_tools::logger_for_tests(),
+            TestLogger::stdout(),
         )
     }
 }
diff --git a/mithril-signer/src/transactions_importer_with_vacuum.rs b/mithril-signer/src/transactions_importer_with_vacuum.rs
index 0c77aabb453..480530d9e24 100644
--- a/mithril-signer/src/transactions_importer_with_vacuum.rs
+++ b/mithril-signer/src/transactions_importer_with_vacuum.rs
@@ -6,7 +6,7 @@ use slog::{debug, Logger};
 use mithril_common::entities::BlockNumber;
 use mithril_common::signable_builder::TransactionsImporter;
 use mithril_common::StdResult;
-use mithril_persistence::sqlite::{vacuum_database, SqliteConnectionPool};
+use mithril_persistence::sqlite::{SqliteCleaner, SqliteCleaningTask, SqliteConnectionPool};

 /// A decorator of [TransactionsImporter] that vacuums the database after running the import.
 pub struct TransactionsImporterWithVacuum {
@@ -40,7 +40,10 @@ impl TransactionsImporter for TransactionsImporterWithVacuum {
             "Transaction Import finished - Vacuuming database to reclaim disk space"
         );
         let connection = self.connection_pool.connection()?;
-        vacuum_database(&connection)?;
+
+        SqliteCleaner::new(&connection)
+            .with_tasks(&[SqliteCleaningTask::Vacuum])
+            .run()?;

         Ok(())
     }
@@ -54,6 +57,8 @@ mod tests {
     use mithril_common::test_utils::TempDir;
     use mithril_persistence::sqlite::SqliteConnection;

+    use crate::test_tools::TestLogger;
+
     use super::*;

     mock! {
@@ -79,7 +84,7 @@
             Self::new(
                 connection_pool,
                 Arc::new(transaction_importer),
-                crate::test_tools::logger_for_tests(),
+                TestLogger::stdout(),
             )
         }
     }
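The `SqliteCleaner` used above replaces the former `vacuum_database` helper with a builder that can combine several cleaning tasks in a single pass. A minimal standalone sketch, assuming only an open connection (the in-memory database stands in for a real node database):

```rust
use mithril_common::StdResult;
use mithril_persistence::sqlite::{ConnectionBuilder, SqliteCleaner, SqliteCleaningTask};

fn clean_database_example() -> StdResult<()> {
    let connection = ConnectionBuilder::open_memory().build()?;

    // Repack the database file, then truncate the WAL file.
    SqliteCleaner::new(&connection)
        .with_tasks(&[
            SqliteCleaningTask::Vacuum,
            SqliteCleaningTask::WalCheckpointTruncate,
        ])
        .run()
}
```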
diff --git a/mithril-signer/src/upkeep_service.rs b/mithril-signer/src/upkeep_service.rs
new file mode 100644
index 00000000000..fad60d3c5ca
--- /dev/null
+++ b/mithril-signer/src/upkeep_service.rs
@@ -0,0 +1,206 @@
+//! ## Upkeep Service
+//!
+//! This service is responsible for the upkeep of the application.
+//!
+//! It is in charge of the following tasks:
+//! * free up space by executing vacuum and WAL checkpoint on the database
+
+use std::sync::Arc;
+
+use anyhow::Context;
+use async_trait::async_trait;
+use slog::{info, Logger};
+
+use mithril_common::signed_entity_type_lock::SignedEntityTypeLock;
+use mithril_common::StdResult;
+use mithril_persistence::sqlite::{
+    SqliteCleaner, SqliteCleaningTask, SqliteConnection, SqliteConnectionPool,
+};
+
+/// Define the service responsible for the upkeep of the application.
+#[cfg_attr(test, mockall::automock)]
+#[async_trait]
+pub trait UpkeepService: Send + Sync {
+    /// Run the upkeep service.
+    async fn run(&self) -> StdResult<()>;
+}
+
+/// Implementation of the upkeep service for the signer.
+///
+/// It runs the cleaning tasks against the main database connection and the cardano
+/// transactions connection pool that are shared with the rest of the application.
+pub struct SignerUpkeepService {
+    main_db_connection: Arc<SqliteConnection>,
+    cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
+    signed_entity_type_lock: Arc<SignedEntityTypeLock>,
+    logger: Logger,
+}
+
+impl SignerUpkeepService {
+    /// Create a new instance of the signer upkeep service.
+    pub fn new(
+        main_db_connection: Arc<SqliteConnection>,
+        cardano_tx_connection_pool: Arc<SqliteConnectionPool>,
+        signed_entity_type_lock: Arc<SignedEntityTypeLock>,
+        logger: Logger,
+    ) -> Self {
+        Self {
+            main_db_connection,
+            cardano_tx_connection_pool,
+            signed_entity_type_lock,
+            logger,
+        }
+    }
+
+    async fn upkeep_all_databases(&self) -> StdResult<()> {
+        if self.signed_entity_type_lock.has_locked_entities().await {
+            info!(
+                self.logger,
+                "UpkeepService::Some entities are locked - Skipping database upkeep"
+            );
+            return Ok(());
+        }
+
+        let main_db_connection = self.main_db_connection.clone();
+        let cardano_tx_db_connection_pool = self.cardano_tx_connection_pool.clone();
+        let db_upkeep_logger = self.logger.clone();
+
+        // Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
+        let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
+            info!(db_upkeep_logger, "UpkeepService::Cleaning main database");
+            SqliteCleaner::new(&main_db_connection)
+                .with_logger(db_upkeep_logger.clone())
+                .with_tasks(&[
+                    SqliteCleaningTask::Vacuum,
+                    SqliteCleaningTask::WalCheckpointTruncate,
+                ])
+                .run()?;
+
+            info!(
+                db_upkeep_logger,
+                "UpkeepService::Cleaning cardano transactions database"
+            );
+            let cardano_tx_db_connection = cardano_tx_db_connection_pool.connection()?;
+            SqliteCleaner::new(&cardano_tx_db_connection)
+                .with_logger(db_upkeep_logger.clone())
+                .with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
+                .run()?;
+
+            Ok(())
+        });
+
+        db_upkeep_thread
+            .await
+            .with_context(|| "Database Upkeep thread crashed")?
+    }
+}
+
+#[async_trait]
+impl UpkeepService for SignerUpkeepService {
+    async fn run(&self) -> StdResult<()> {
+        info!(self.logger, "UpkeepService::start");
+
+        self.upkeep_all_databases()
+            .await
+            .with_context(|| "Database upkeep failed")?;
+
+        info!(self.logger, "UpkeepService::end");
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use mithril_common::entities::SignedEntityTypeDiscriminants;
+    use mithril_common::test_utils::TempDir;
+
+    use crate::database::test_helper::{
+        cardano_tx_db_connection, cardano_tx_db_file_connection, main_db_connection,
+        main_db_file_connection,
+    };
+    use crate::test_tools::TestLogger;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_cleanup_database() {
+        let (main_db_path, ctx_db_path, log_path) = {
+            let db_dir = TempDir::create("signer_upkeep", "test_cleanup_database");
+            (
+                db_dir.join("main.db"),
+                db_dir.join("cardano_tx.db"),
+                db_dir.join("upkeep.log"),
+            )
+        };
+
+        let main_db_connection = main_db_file_connection(&main_db_path).unwrap();
+        let cardano_tx_connection = cardano_tx_db_file_connection(&ctx_db_path).unwrap();
+
+        let service = SignerUpkeepService::new(
+            Arc::new(main_db_connection),
+            Arc::new(SqliteConnectionPool::build_from_connection(
+                cardano_tx_connection,
+            )),
+            Arc::new(SignedEntityTypeLock::default()),
+            TestLogger::file(&log_path),
+        );
+
+        // Separate block to ensure the log is flushed after run
+        {
+            service.run().await.expect("Upkeep service failed");
+        }
+
+        let logs = std::fs::read_to_string(&log_path).unwrap();
+
+        assert_eq!(
+            logs.matches(SqliteCleaningTask::Vacuum.log_message())
+                .count(),
+            1,
+            "Should have run only once since only the main database has a `Vacuum` cleanup"
+        );
+        assert_eq!(
+            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
+                .count(),
+            2,
+            "Should have run twice since the two databases have a `WalCheckpointTruncate` cleanup"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
+        let log_path = TempDir::create(
+            "signer_upkeep",
+            "test_doesnt_cleanup_db_if_any_entity_is_locked",
+        )
+        .join("upkeep.log");
+
+        let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
+        let service = SignerUpkeepService::new(
+            Arc::new(main_db_connection().unwrap()),
+            Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()),
+            signed_entity_type_lock.clone(),
+            TestLogger::file(&log_path),
+        );
+
+        // Separate block to ensure the log is flushed after run
+        {
+            signed_entity_type_lock
+                .lock(SignedEntityTypeDiscriminants::CardanoTransactions)
+                .await;
+            service.run().await.expect("Upkeep service failed");
+        }
+
+        let logs = std::fs::read_to_string(&log_path).unwrap();
+
+        assert_eq!(
+            logs.matches(SqliteCleaningTask::Vacuum.log_message())
+                .count(),
+            0,
+        );
+        assert_eq!(
+            logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
+                .count(),
+            0,
+        );
+    }
+}
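To tie the pieces together: the production builder hands this service the shared main connection, the cardano transactions pool, and the signed entity type lock, and the state machine triggers it through `Runner::upkeep` on each 'unregistered → registered' transition. A minimal sketch of a direct invocation (function name illustrative):

```rust
use std::sync::Arc;

use mithril_common::signed_entity_type_lock::SignedEntityTypeLock;
use mithril_common::StdResult;
use mithril_persistence::sqlite::{SqliteConnection, SqliteConnectionPool};

use crate::{SignerUpkeepService, UpkeepService};

async fn run_upkeep_once(
    main_db_connection: Arc<SqliteConnection>,
    cardano_tx_pool: Arc<SqliteConnectionPool>,
) -> StdResult<()> {
    let service = SignerUpkeepService::new(
        main_db_connection,
        cardano_tx_pool,
        Arc::new(SignedEntityTypeLock::default()),
        slog_scope::logger(),
    );

    // Skips every cleaning task if any signed entity type is currently locked.
    service.run().await
}
```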
"signer_upkeep", + "test_doesnt_cleanup_db_if_any_entity_is_locked", + ) + .join("upkeep.log"); + + let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default()); + let service = SignerUpkeepService::new( + Arc::new(main_db_connection().unwrap()), + Arc::new(SqliteConnectionPool::build(1, cardano_tx_db_connection).unwrap()), + signed_entity_type_lock.clone(), + TestLogger::file(&log_path), + ); + + // Separate block to ensure the log is flushed after run + { + signed_entity_type_lock + .lock(SignedEntityTypeDiscriminants::CardanoTransactions) + .await; + service.run().await.expect("Upkeep service failed"); + } + + let logs = std::fs::read_to_string(&log_path).unwrap(); + + assert_eq!( + logs.matches(SqliteCleaningTask::Vacuum.log_message()) + .count(), + 0, + ); + assert_eq!( + logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message()) + .count(), + 0, + ); + } +} diff --git a/mithril-signer/tests/test_extensions/state_machine_tester.rs b/mithril-signer/tests/test_extensions/state_machine_tester.rs index f1d578d338e..b657c96d56c 100644 --- a/mithril-signer/tests/test_extensions/state_machine_tester.rs +++ b/mithril-signer/tests/test_extensions/state_machine_tester.rs @@ -25,16 +25,16 @@ use mithril_common::{ MithrilTickerService, StdError, TickerService, }; use mithril_persistence::database::repository::CardanoTransactionRepository; +use mithril_persistence::store::adapter::SQLiteAdapter; use mithril_persistence::{ sqlite::SqliteConnectionPool, - store::{adapter::MemoryAdapter, StakeStore, StakeStorer}, + store::{StakeStore, StakeStorer}, }; - use mithril_signer::{ metrics::*, AggregatorClient, CardanoTransactionsImporter, Configuration, MetricsService, MithrilSingleSigner, ProductionServiceBuilder, ProtocolInitializerStore, ProtocolInitializerStorer, RuntimeError, SignerRunner, SignerServices, SignerState, - StateMachine, + SignerUpkeepService, StateMachine, }; use super::FakeAggregator; @@ -92,6 +92,15 @@ impl StateMachineTester { let config = Configuration::new_sample(&selected_signer_party_id); let production_service_builder = ProductionServiceBuilder::new(&config); + let sqlite_connection = Arc::new( + production_service_builder + .build_sqlite_connection( + ":memory:", + mithril_signer::database::migration::get_migrations(), + ) + .await + .unwrap(), + ); let transaction_sqlite_connection = production_service_builder .build_sqlite_connection( ":memory:", @@ -131,14 +140,16 @@ impl StateMachineTester { )); let digester = Arc::new(DumbImmutableDigester::new("DIGEST", true)); let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new( - Box::new(MemoryAdapter::new(None).unwrap()), + Box::new( + SQLiteAdapter::new("protocol_initializer", sqlite_connection.clone()).unwrap(), + ), config.store_retention_limit, )); let single_signer = Arc::new(MithrilSingleSigner::new( config.party_id.to_owned().unwrap_or_default(), )); let stake_store = Arc::new(StakeStore::new( - Box::new(MemoryAdapter::new(None).unwrap()), + Box::new(SQLiteAdapter::new("stake", sqlite_connection.clone()).unwrap()), config.store_retention_limit, )); let era_reader_adapter = Arc::new(EraReaderDummyAdapter::from_markers(vec![ @@ -169,7 +180,7 @@ impl StateMachineTester { Arc::new(MithrilStakeDistributionSignableBuilder::default()); let block_scanner = Arc::new(DumbBlockScanner::new()); let transaction_store = Arc::new(CardanoTransactionRepository::new( - sqlite_connection_cardano_transaction_pool, + sqlite_connection_cardano_transaction_pool.clone(), )); let transactions_importer = 
         let transactions_importer = Arc::new(CardanoTransactionsImporter::new(
             block_scanner.clone(),
@@ -199,6 +210,12 @@
             chain_observer.clone(),
             slog_scope::logger(),
         ));
+        let upkeep_service = Arc::new(SignerUpkeepService::new(
+            sqlite_connection.clone(),
+            sqlite_connection_cardano_transaction_pool,
+            signed_entity_type_lock.clone(),
+            slog_scope::logger(),
+        ));

         let services = SignerServices {
             certificate_handler: certificate_handler.clone(),
@@ -215,6 +232,7 @@
             metrics_service: metrics_service.clone(),
             signed_entity_type_lock: Arc::new(SignedEntityTypeLock::default()),
             cardano_transactions_preloader,
+            upkeep_service,
         };

         // set up stake distribution
         chain_observer