From fba460a49cf34e9ebe1f5cae13a3613c74e5ef03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT=20/=20PALO-IT?= Date: Fri, 17 Mar 2023 10:31:44 +0100 Subject: [PATCH 1/3] add stake distribution service --- .../src/database/provider/stake_pool.rs | 109 +++--- mithril-aggregator/src/lib.rs | 2 +- mithril-aggregator/src/multi_signer.rs | 11 +- mithril-aggregator/src/runtime/runner.rs | 3 +- .../src/stake_distribution_service.rs | 353 ++++++++++++++++++ mithril-aggregator/tests/certificate_chain.rs | 43 +-- .../tests/test_extensions/runtime_tester.rs | 5 +- mithril-common/src/chain_observer/mod.rs | 2 +- mithril-common/src/entities/type_alias.rs | 4 +- mithril-common/src/store/stake_store.rs | 12 +- .../src/test_utils/mithril_fixture.rs | 3 +- 11 files changed, 428 insertions(+), 119 deletions(-) create mode 100644 mithril-aggregator/src/stake_distribution_service.rs diff --git a/mithril-aggregator/src/database/provider/stake_pool.rs b/mithril-aggregator/src/database/provider/stake_pool.rs index 3898518bf93..807b6f91004 100644 --- a/mithril-aggregator/src/database/provider/stake_pool.rs +++ b/mithril-aggregator/src/database/provider/stake_pool.rs @@ -1,7 +1,4 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use chrono::NaiveDateTime; @@ -121,7 +118,7 @@ impl<'client> Provider<'client> for StakePoolProvider<'client> { let aliases = SourceAlias::new(&[("{:stake_pool:}", "sp")]); let projection = Self::Entity::get_projection().expand(aliases); - format!("select {projection} from stake_pool as sp where {condition} order by epoch asc, stake desc") + format!("select {projection} from stake_pool as sp where {condition} order by epoch asc, stake desc, stake_pool_id asc") } } @@ -254,7 +251,7 @@ impl StakeStorer for StakePoolStore { .lock() .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; let provider = UpdateStakePoolProvider::new(connection); - let mut new_stakes: HashMap = HashMap::new(); + let mut new_stakes = StakeDistribution::new(); connection .execute("begin transaction") .map_err(|e| AdapterError::QueryError(e.into()))?; @@ -287,7 +284,7 @@ impl StakeStorer for StakePoolStore { let cursor = provider .get_by_epoch(&epoch) .map_err(|e| AdapterError::GeneralError(format!("Could not get stakes: {e}")))?; - let mut stake_distribution: HashMap = HashMap::new(); + let mut stake_distribution = StakeDistribution::new(); for stake_pool in cursor { stake_distribution.insert(stake_pool.stake_pool_id, stake_pool.stake); @@ -297,10 +294,54 @@ impl StakeStorer for StakePoolStore { } } -#[cfg(test)] -mod tests { +#[cfg(any(test, feature = "test_only"))] +pub fn setup_stake_db(connection: &Connection) -> Result<(), StdError> { use crate::database::migration::get_migrations; + let migrations = get_migrations(); + let migration = migrations + .iter() + .find(|&m| m.version == 1) + .ok_or_else(|| -> StdError { + "There should be a migration version 1".to_string().into() + })?; + let query = { + // leverage the expanded parameter from this provider which is unit + // tested on its own above. + let update_provider = UpdateStakePoolProvider::new(connection); + let (sql_values, _) = update_provider + .get_update_condition("pool_id", Epoch(1), 1000) + .expand(); + + connection.execute(&migration.alterations)?; + + format!("insert into stake_pool {sql_values}") + }; + let stake_distribution: &[(&str, i64, i64); 9] = &[ + ("pool1", 1, 1000), + ("pool2", 1, 1100), + ("pool3", 1, 1300), + ("pool1", 2, 1230), + ("pool2", 2, 1090), + ("pool3", 2, 1300), + ("pool1", 3, 1250), + ("pool2", 3, 1370), + ("pool3", 3, 1300), + ]; + for (pool_id, epoch, stake) in stake_distribution { + let mut statement = connection.prepare(&query)?; + + statement.bind(1, *pool_id).unwrap(); + statement.bind(2, *epoch).unwrap(); + statement.bind(3, *stake).unwrap(); + statement.next().unwrap(); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { use super::*; #[test] @@ -357,54 +398,10 @@ mod tests { assert_eq!(vec![Value::Integer(5)], params); } - fn setup_db(connection: &Connection) -> Result<(), StdError> { - let migrations = get_migrations(); - let migration = - migrations - .iter() - .find(|&m| m.version == 1) - .ok_or_else(|| -> StdError { - "There should be a migration version 1".to_string().into() - })?; - let query = { - // leverage the expanded parameter from this provider which is unit - // tested on its own above. - let update_provider = UpdateStakePoolProvider::new(connection); - let (sql_values, _) = update_provider - .get_update_condition("pool_id", Epoch(1), 1000) - .expand(); - - connection.execute(&migration.alterations)?; - - format!("insert into stake_pool {sql_values}") - }; - let stake_distribution: &[(&str, i64, i64); 9] = &[ - ("pool1", 1, 1000), - ("pool2", 1, 1100), - ("pool3", 1, 1300), - ("pool1", 2, 1230), - ("pool2", 2, 1090), - ("pool3", 2, 1300), - ("pool1", 3, 1250), - ("pool2", 3, 1370), - ("pool3", 3, 1300), - ]; - for (pool_id, epoch, stake) in stake_distribution { - let mut statement = connection.prepare(&query)?; - - statement.bind(1, *pool_id).unwrap(); - statement.bind(2, *epoch).unwrap(); - statement.bind(3, *stake).unwrap(); - statement.next().unwrap(); - } - - Ok(()) - } - #[test] fn test_get_stake_pools() { let connection = Connection::open(":memory:").unwrap(); - setup_db(&connection).unwrap(); + setup_stake_db(&connection).unwrap(); let provider = StakePoolProvider::new(&connection); let mut cursor = provider.get_by_epoch(&Epoch(1)).unwrap(); @@ -430,7 +427,7 @@ mod tests { #[test] fn test_update_stakes() { let connection = Connection::open(":memory:").unwrap(); - setup_db(&connection).unwrap(); + setup_stake_db(&connection).unwrap(); let provider = UpdateStakePoolProvider::new(&connection); let stake_pool = provider.persist("pool4", Epoch(3), 9999).unwrap(); @@ -452,7 +449,7 @@ mod tests { #[test] fn test_prune() { let connection = Connection::open(":memory:").unwrap(); - setup_db(&connection).unwrap(); + setup_stake_db(&connection).unwrap(); let provider = DeleteStakePoolProvider::new(&connection); let cursor = provider.prune(Epoch(2)).unwrap(); diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index fff1ea3d83b..316893b3411 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -25,7 +25,7 @@ mod signer_registerer; mod snapshot_stores; mod snapshot_uploaders; mod snapshotter; -//pub mod stake_pools; +pub mod stake_distribution_service; mod store; mod tools; diff --git a/mithril-aggregator/src/multi_signer.rs b/mithril-aggregator/src/multi_signer.rs index e3866d680ea..9d37b0e60b9 100644 --- a/mithril-aggregator/src/multi_signer.rs +++ b/mithril-aggregator/src/multi_signer.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use chrono::prelude::*; use hex::ToHex; use slog_scope::{debug, trace, warn}; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use thiserror::Error; use mithril_common::{ @@ -12,7 +12,7 @@ use mithril_common::{ ProtocolPartyId, ProtocolRegistrationError, ProtocolSignerVerificationKey, ProtocolSingleSignature, ProtocolStakeDistribution, }, - entities::{self, Epoch, SignerWithStake}, + entities::{self, Epoch, SignerWithStake, StakeDistribution}, store::{StakeStorer, StoreError}, }; @@ -492,7 +492,7 @@ impl MultiSigner for MultiSignerImpl { .ok_or_else(ProtocolError::UnavailableBeacon)? .epoch .offset_to_recording_epoch(); - let stakes = HashMap::from_iter(stakes.iter().cloned()); + let stakes = StakeDistribution::from_iter(stakes.iter().cloned()); self.stake_store.save_stakes(epoch, stakes).await?; Ok(()) @@ -742,10 +742,7 @@ mod tests { None, ); let stake_store = StakeStore::new( - Box::new( - MemoryAdapter::>::new(None) - .unwrap(), - ), + Box::new(MemoryAdapter::::new(None).unwrap()), None, ); let single_signature_store = SingleSignatureStore::new( diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 26f85bdd1a0..3b7d1dff1fa 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -755,7 +755,6 @@ pub mod tests { use mithril_common::test_utils::MithrilFixtureBuilder; use mithril_common::{entities::ProtocolMessagePartKey, test_utils::fake_data}; use mithril_common::{BeaconProviderImpl, CardanoNetwork}; - use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use tempfile::NamedTempFile; @@ -876,7 +875,7 @@ pub mod tests { let beacon = fake_data::beacon(); let recording_epoch = beacon.epoch.offset_to_recording_epoch(); let stake_distribution: StakeDistribution = - HashMap::from([("a".to_string(), 5), ("b".to_string(), 10)]); + StakeDistribution::from([("a".to_string(), 5), ("b".to_string(), 10)]); stake_store .save_stakes(recording_epoch, stake_distribution.clone()) diff --git a/mithril-aggregator/src/stake_distribution_service.rs b/mithril-aggregator/src/stake_distribution_service.rs new file mode 100644 index 00000000000..68fe11913e1 --- /dev/null +++ b/mithril-aggregator/src/stake_distribution_service.rs @@ -0,0 +1,353 @@ +//! Stake Pool manager for the Runners +//! + +use std::{ + fmt::Display, + sync::{Arc, RwLock}, +}; + +use async_trait::async_trait; +use mithril_common::{ + chain_observer::ChainObserver, + entities::{Epoch, StakeDistribution}, + store::StakeStorer, + StdError, +}; +use tokio::sync::{Mutex, MutexGuard}; + +use crate::database::provider::StakePoolStore; + +/// Errors related to the [StakePoolDistributionService]. +#[derive(Debug)] +pub enum StakePoolDistributionServiceError { + /// Critical errors cannot be recovered. + Technical { + /// Error message + message: String, + /// Eventual nested error + error: Option, + }, + /// The stake distribution for the given Epoch is not available. + Unavailable(Epoch), + /// The stake distribution compute is in progress for this Epoch. + Busy(Epoch), +} + +impl StakePoolDistributionServiceError { + /// Simple way to nest technical errors + pub fn technical_subsystem(error: StdError) -> Box { + Box::new(Self::Technical { + message: "Stake pool service subsystem error occured.".to_string(), + error: Some(error), + }) + } +} + +impl TryFrom for StakePoolDistributionServiceError { + type Error = Box; + + fn try_from(value: StdError) -> Result { + Err(Box::new(Self::Technical { + message: "subsystem error".to_string(), + error: Some(value), + })) + } +} + +impl Display for StakePoolDistributionServiceError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Technical { message, error } => { + if let Some(nested_error) = error { + write!( + f, + "Critical error: {message} (nested error: '{nested_error}')" + ) + } else { + write!(f, "Critical error: {message}") + } + } + Self::Unavailable(epoch) => { + write!( + f, + "The stake distribution for epoch {epoch:?} is not available." + ) + } + Self::Busy(epoch) => { + write!( + f, + "The stake distribution for epoch {epoch:?} is actually processed." + ) + } + } + } +} + +impl std::error::Error for StakePoolDistributionServiceError {} + +/// Responsible of synchronizing with Cardano stake distribution. +#[async_trait] +pub trait StakeDistributionService { + /// Return the stake distribution fot the given epoch. + async fn get_stake_distribution( + &self, + epoch: Epoch, + ) -> Result>; + + /// This launches the stake distribution computation if not already started. + async fn update_stake_distribution(&self) + -> Result<(), Box>; +} + +/// Token to manage stake distribution update +struct UpdateToken { + /// Stake distribution update semaphore + is_busy: Mutex<()>, + /// Last computed stake distribution + busy_on_epoch: RwLock, +} + +impl Default for UpdateToken { + fn default() -> Self { + Self { + is_busy: Mutex::new(()), + busy_on_epoch: RwLock::new(Epoch(0)), + } + } +} + +impl UpdateToken { + pub fn update(&self, epoch: Epoch) -> Result, StdError> { + let update_semaphore = self.is_busy.try_lock().map_err(|_| { + let last_updated_epoch = self.busy_on_epoch.read().unwrap(); + + StakePoolDistributionServiceError::Busy(*last_updated_epoch) + })?; + let mut last_updated_epoch = self.busy_on_epoch.write().unwrap(); + *last_updated_epoch = epoch; + + Ok(update_semaphore) + } + + pub fn is_busy(&self) -> Option { + if self.is_busy.try_lock().is_err() { + Some(*self.busy_on_epoch.read().unwrap()) + } else { + None + } + } +} +/// Implementation of the stake distribution service. +pub struct MithrilStakeDistributionService { + /// internal stake persistent layer + stake_store: Arc, + /// Chain interaction subsystem + chain_observer: Arc, + /// Lock management for updates + update_token: UpdateToken, +} + +impl MithrilStakeDistributionService { + /// Create a new service instance + pub fn new(stake_store: Arc, chain_observer: Arc) -> Self { + Self { + stake_store, + chain_observer, + update_token: UpdateToken::default(), + } + } +} + +#[async_trait] +impl StakeDistributionService for MithrilStakeDistributionService { + async fn get_stake_distribution( + &self, + epoch: Epoch, + ) -> Result> { + let stake_distribution = self + .stake_store + .get_stakes(epoch) + .await + .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))? + .ok_or_else(|| StakePoolDistributionServiceError::Technical { + message: "The stake distribution should be at least an empty list.".to_string(), + error: None, + })?; + + if !stake_distribution.is_empty() { + Ok(stake_distribution) + } else if let Some(last_epoch) = self.update_token.is_busy() { + if last_epoch == epoch { + Err(StakePoolDistributionServiceError::Busy(epoch).into()) + } else { + Err(StakePoolDistributionServiceError::Unavailable(epoch).into()) + } + } else { + Err(StakePoolDistributionServiceError::Unavailable(epoch).into()) + } + } + + async fn update_stake_distribution( + &self, + ) -> Result<(), Box> { + let current_epoch = self + .chain_observer + .get_current_epoch() + .await + .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))? + .expect("Chain observer get_current_epoch should never return None.") + .offset_to_recording_epoch(); + + match self.get_stake_distribution(current_epoch).await { + Ok(_) => return Ok(()), + Err(e) if matches!(*e, StakePoolDistributionServiceError::Unavailable(_)) => (), + Err(e) => return Err(e), + }; + let _mutex = self + .update_token + .update(current_epoch) + .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e))?; + let stake_distribution = self + .chain_observer + .get_current_stake_distribution() + .await + .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))? + .expect("ChainObserver get_current_stake_distribution should never return None."); + + let _ = self + .stake_store + .save_stakes(current_epoch, stake_distribution) + .await + .map_err(|e| StakePoolDistributionServiceError::technical_subsystem(e.into()))?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use crate::database::provider::setup_stake_db; + + use super::*; + use mithril_common::chain_observer::MockChainObserver; + use sqlite::Connection; + + fn get_service(chain_observer: MockChainObserver) -> MithrilStakeDistributionService { + let connection = Connection::open(":memory:").unwrap(); + setup_stake_db(&connection).unwrap(); + let stake_store = StakePoolStore::new(Arc::new(Mutex::new(connection))); + + MithrilStakeDistributionService::new(Arc::new(stake_store), Arc::new(chain_observer)) + } + + #[tokio::test] + async fn get_current_stake_distribution() { + let chain_observer = MockChainObserver::new(); + let service = get_service(chain_observer); + let expected_stake_distribution: StakeDistribution = + [("pool2", 1370), ("pool3", 1300), ("pool1", 1250)] + .into_iter() + .map(|(pool_id, stake)| (pool_id.to_string(), stake as u64)) + .collect(); + + assert_eq!( + expected_stake_distribution, + service.get_stake_distribution(Epoch(3)).await.unwrap() + ); + } + + #[tokio::test] + async fn get_unavailable_stake_distribution() { + let chain_observer = MockChainObserver::new(); + let service = get_service(chain_observer); + let result = service.get_stake_distribution(Epoch(5)).await.unwrap_err(); + + assert!(matches!( + *result, + StakePoolDistributionServiceError::Unavailable(Epoch(x)) if x == 5 + )); + } + + #[tokio::test] + async fn update_stake_distribution_ok() { + let expected_stake_distribution = StakeDistribution::from_iter( + [("pool1", 2000), ("pool2", 2000), ("pool3", 2000)] + .into_iter() + .map(|(p, s)| (p.to_string(), s as u64)), + ); + let returned_stake_distribution = expected_stake_distribution.clone(); + let mut chain_observer = MockChainObserver::new(); + chain_observer + .expect_get_current_epoch() + .returning(|| Ok(Some(Epoch(3)))); + chain_observer + .expect_get_current_stake_distribution() + .return_once(|| Ok(Some(returned_stake_distribution))); + let service = get_service(chain_observer); + service.update_stake_distribution().await.unwrap(); + let sd = service.get_stake_distribution(Epoch(4)).await.unwrap(); + + assert_eq!(expected_stake_distribution, sd); + } + + #[tokio::test] + async fn update_stake_distribution_already() { + let mut chain_observer = MockChainObserver::new(); + chain_observer + .expect_get_current_epoch() + .returning(|| Ok(Some(Epoch(2)))) + .times(1); + let service = get_service(chain_observer); + service.update_stake_distribution().await.unwrap(); + } + + #[tokio::test] + async fn get_not_ready_yet() { + let mut chain_observer = MockChainObserver::new(); + chain_observer + .expect_get_current_epoch() + .returning(|| Ok(Some(Epoch(3)))); + let service = get_service(chain_observer); + let _mutex = service.update_token.update(Epoch(4)).unwrap(); + let result = service.get_stake_distribution(Epoch(4)).await.unwrap_err(); + + assert!(matches!( + *result, + StakePoolDistributionServiceError::Busy(Epoch(x)) if x == 4 + )); + } + + #[tokio::test] + async fn get_not_ready_but_unavailable() { + let mut chain_observer = MockChainObserver::new(); + chain_observer + .expect_get_current_epoch() + .returning(|| Ok(Some(Epoch(3)))); + let service = get_service(chain_observer); + let _mutex = service.update_token.update(Epoch(4)).unwrap(); + let result = service.get_stake_distribution(Epoch(0)).await.unwrap_err(); + + assert!(matches!( + *result, + StakePoolDistributionServiceError::Unavailable(Epoch(x)) if x == 0 + )); + } + + #[tokio::test] + async fn update_but_busy() { + let mut chain_observer = MockChainObserver::new(); + chain_observer + .expect_get_current_epoch() + .returning(|| Ok(Some(Epoch(3)))); + let service = get_service(chain_observer); + let _mutex = service.update_token.update(Epoch(4)).unwrap(); + let result = service.update_stake_distribution().await.unwrap_err(); + + assert!(matches!( + *result, + StakePoolDistributionServiceError::Busy(Epoch(x)) if x == 4 + )); + } +} diff --git a/mithril-aggregator/tests/certificate_chain.rs b/mithril-aggregator/tests/certificate_chain.rs index 5bf64862743..b752c6316a2 100644 --- a/mithril-aggregator/tests/certificate_chain.rs +++ b/mithril-aggregator/tests/certificate_chain.rs @@ -1,13 +1,8 @@ mod test_extensions; -use std::collections::BTreeMap; - use mithril_aggregator::VerificationKeyStorer; use mithril_common::{ - chain_observer::ChainObserver, - crypto_helper::{ProtocolPartyId, ProtocolStake}, - entities::ProtocolParameters, - test_utils::MithrilFixtureBuilder, + chain_observer::ChainObserver, entities::ProtocolParameters, test_utils::MithrilFixtureBuilder, }; use test_extensions::RuntimeTester; @@ -151,22 +146,8 @@ async fn certificate_chain() { "The new epoch certificate should be linked to the first certificate of the previous epoch" ); assert_eq!( - BTreeMap::from_iter( - last_certificates[0] - .metadata - .get_stake_distribution() - .into_iter() - .collect::>() - .into_iter(), - ), - BTreeMap::from_iter( - last_certificates[2] - .metadata - .get_stake_distribution() - .into_iter() - .collect::>() - .into_iter(), - ), + last_certificates[0].metadata.get_stake_distribution(), + last_certificates[2].metadata.get_stake_distribution(), "The stake distribution update should only be taken into account at the next epoch", ); @@ -214,22 +195,8 @@ async fn certificate_chain() { "The new epoch certificate should be linked to the first certificate of the previous epoch" ); assert_ne!( - BTreeMap::from_iter( - last_certificates[0] - .metadata - .get_stake_distribution() - .into_iter() - .collect::>() - .into_iter(), - ), - BTreeMap::from_iter( - last_certificates[2] - .metadata - .get_stake_distribution() - .into_iter() - .collect::>() - .into_iter(), - ), + last_certificates[0].metadata.get_stake_distribution(), + last_certificates[2].metadata.get_stake_distribution(), "The stake distribution update should have been applied for this epoch", ); } diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index b83a4f29cbd..c27fcaee215 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -7,7 +7,6 @@ use mithril_common::test_utils::{ MithrilFixtureBuilder, SignerFixture, StakeDistributionGenerationMethod, }; use slog::Drain; -use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::UnboundedReceiver; @@ -20,7 +19,7 @@ use mithril_common::crypto_helper::{key_encode_hex, ProtocolClerk, ProtocolGenes use mithril_common::digesters::DumbImmutableFileObserver; use mithril_common::entities::{ Certificate, Epoch, ImmutableFileNumber, ProtocolParameters, SignerWithStake, SingleSignatures, - Snapshot, + Snapshot, StakeDistribution, }; use mithril_common::{chain_observer::FakeObserver, digesters::DumbImmutableDigester}; @@ -314,7 +313,7 @@ impl RuntimeTester { .with_signers(signers_with_stake.len()) .with_protocol_parameters(protocol_parameters) .with_stake_distribution(StakeDistributionGenerationMethod::Custom( - HashMap::from_iter( + StakeDistribution::from_iter( signers_with_stake .into_iter() .map(|s| (s.party_id, s.stake)), diff --git a/mithril-common/src/chain_observer/mod.rs b/mithril-common/src/chain_observer/mod.rs index 314fe479d28..2ae725e91a0 100644 --- a/mithril-common/src/chain_observer/mod.rs +++ b/mithril-common/src/chain_observer/mod.rs @@ -9,7 +9,7 @@ mod model; pub use cli_observer::{CardanoCliChainObserver, CardanoCliRunner}; #[cfg(any(test, feature = "test_only"))] pub use fake_observer::FakeObserver; -pub use interface::{ChainObserver, ChainObserverError}; +pub use interface::{ChainObserver, ChainObserverError, MockChainObserver}; pub use model::{ ChainAddress, TxDatum, TxDatumBuilder, TxDatumError, TxDatumFieldTypeName, TxDatumFieldValue, }; diff --git a/mithril-common/src/entities/type_alias.rs b/mithril-common/src/entities/type_alias.rs index a694e7507e0..10116d67ff3 100644 --- a/mithril-common/src/entities/type_alias.rs +++ b/mithril-common/src/entities/type_alias.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::BTreeMap; /// ImmutableFileNumber represents the id of immutable files in the Cardano node database pub type ImmutableFileNumber = u64; @@ -13,7 +13,7 @@ pub type PartyId = String; pub type Stake = u64; /// StakeDistribution represents the stakes of multiple participants in the Cardano chain -pub type StakeDistribution = HashMap; +pub type StakeDistribution = BTreeMap; /// LotteryIndex represents the index of a Mithril single signature lottery pub type LotteryIndex = u64; diff --git a/mithril-common/src/store/stake_store.rs b/mithril-common/src/store/stake_store.rs index b93442eb6c0..07282379d8f 100644 --- a/mithril-common/src/store/stake_store.rs +++ b/mithril-common/src/store/stake_store.rs @@ -81,8 +81,6 @@ impl StakeStorer for StakeStore { #[cfg(test)] mod tests { - use std::collections::HashMap; - use super::super::adapter::MemoryAdapter; use super::*; @@ -94,7 +92,7 @@ mod tests { let mut values: Vec<(Epoch, StakeDistribution)> = Vec::new(); for epoch in 1..=nb_epoch { - let mut signers: StakeDistribution = HashMap::new(); + let mut signers: StakeDistribution = StakeDistribution::new(); for party_idx in 1..=signers_per_epoch { let party_id = format!("{party_idx}"); @@ -116,7 +114,7 @@ mod tests { async fn save_key_in_empty_store() { let store = init_store(0, 0, None); let res = store - .save_stakes(Epoch(1), HashMap::from([("1".to_string(), 123)])) + .save_stakes(Epoch(1), StakeDistribution::from([("1".to_string(), 123)])) .await .expect("Test adapter should not fail."); @@ -127,12 +125,12 @@ mod tests { async fn update_signer_in_store() { let store = init_store(1, 1, None); let res = store - .save_stakes(Epoch(1), HashMap::from([("1".to_string(), 123)])) + .save_stakes(Epoch(1), StakeDistribution::from([("1".to_string(), 123)])) .await .expect("Test adapter should not fail."); assert_eq!( - HashMap::from([("1".to_string(), 101)]), + StakeDistribution::from([("1".to_string(), 101)]), res.expect("the result should not be empty"), ); } @@ -164,7 +162,7 @@ mod tests { async fn check_retention_limit() { let store = init_store(2, 2, Some(2)); let _res = store - .save_stakes(Epoch(3), HashMap::from([("1".to_string(), 123)])) + .save_stakes(Epoch(3), StakeDistribution::from([("1".to_string(), 123)])) .await .unwrap(); assert!(store.get_stakes(Epoch(1)).await.unwrap().is_none()); diff --git a/mithril-common/src/test_utils/mithril_fixture.rs b/mithril-common/src/test_utils/mithril_fixture.rs index 3cbb8288130..8cd4509f75e 100644 --- a/mithril-common/src/test_utils/mithril_fixture.rs +++ b/mithril-common/src/test_utils/mithril_fixture.rs @@ -2,7 +2,6 @@ use crate::{ crypto_helper::{ProtocolInitializer, ProtocolSigner, ProtocolStakeDistribution}, entities::{ProtocolParameters, Signer, SignerWithStake, StakeDistribution}, }; -use std::collections::HashMap; /// A fixture of Mithril data types. #[derive(Debug, Clone)] @@ -70,7 +69,7 @@ impl MithrilFixture { /// Get the fixture stake distribution. pub fn stake_distribution(&self) -> StakeDistribution { - HashMap::from_iter(self.stake_distribution.clone()) + StakeDistribution::from_iter(self.stake_distribution.clone()) } /// Get the fixture protocol stake distribution. From 91853b5a6e01ad14b6dd23524e62df4e2d7260a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT=20/=20PALO-IT?= Date: Wed, 22 Mar 2023 12:10:53 +0100 Subject: [PATCH 2/3] use stake distribution service in dependencies --- mithril-aggregator/src/command_args.rs | 56 ++++--- .../src/database/provider/stake_pool.rs | 13 +- mithril-aggregator/src/dependency.rs | 71 ++++++--- mithril-aggregator/src/lib.rs | 2 +- mithril-aggregator/src/runtime/runner.rs | 2 + .../src/stake_distribution_service.rs | 4 +- .../tests/test_extensions/dependency.rs | 27 ++-- .../src/database/version_checker.rs | 141 ++++++++++-------- .../src/store/adapter/sqlite_adapter.rs | 43 ++---- mithril-signer/src/runtime/signer_services.rs | 6 +- 10 files changed, 191 insertions(+), 174 deletions(-) diff --git a/mithril-aggregator/src/command_args.rs b/mithril-aggregator/src/command_args.rs index 3a707aba318..9d9b313b1d8 100644 --- a/mithril-aggregator/src/command_args.rs +++ b/mithril-aggregator/src/command_args.rs @@ -3,16 +3,9 @@ use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value, ValueKind use slog::Level; use slog_scope::{crit, debug, info}; use sqlite::Connection; -use std::{ - error::Error, - ffi::OsStr, - fs, - net::IpAddr, - path::PathBuf, - sync::{Arc, Mutex}, -}; +use std::{error::Error, ffi::OsStr, fs, net::IpAddr, path::PathBuf, sync::Arc}; use tokio::{ - sync::{oneshot, RwLock}, + sync::{oneshot, Mutex, RwLock}, task::JoinSet, time::Duration, }; @@ -40,6 +33,7 @@ use crate::{ database::provider::StakePoolStore, event_store::{self, TransmitterService}, http_server::routes::router, + stake_distribution_service::MithrilStakeDistributionService, tools::{EraTools, GenesisTools, GenesisToolsDependency}, AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore, CertificateStore, Configuration, DefaultConfiguration, DependencyManager, GenesisConfiguration, @@ -49,12 +43,10 @@ use crate::{ const SQLITE_MONITORING_FILE: &str = "monitoring.sqlite3"; -fn setup_genesis_dependencies( +async fn setup_genesis_dependencies( config: &GenesisConfiguration, ) -> Result> { let sqlite_db_path = Some(config.get_sqlite_file()); - check_database_migration(config.get_sqlite_file())?; - let chain_observer = Arc::new( mithril_common::chain_observer::CardanoCliChainObserver::new(Box::new( CardanoCliRunner::new( @@ -70,6 +62,11 @@ fn setup_genesis_dependencies( .map(|path| path.as_os_str()) .unwrap_or(OsStr::new(":memory:")), )?)); + + // DATABASE MIGRATION + check_database_migration(sqlite_connection.clone()).await?; + + let stake_store = Arc::new(StakePoolStore::new(sqlite_connection.clone())); let immutable_file_observer = Arc::new(ImmutableFileSystemObserver::new(&config.db_directory)); let beacon_provider = Arc::new(BeaconProviderImpl::new( chain_observer, @@ -99,7 +96,6 @@ fn setup_genesis_dependencies( )?), config.store_retention_limit, )); - let stake_store = Arc::new(StakePoolStore::new(sqlite_connection.clone())); let single_signature_store = Arc::new(SingleSignatureStore::new( Box::new(SQLiteAdapter::new("single_signature", sqlite_connection)?), config.store_retention_limit, @@ -159,18 +155,23 @@ async fn do_first_launch_initialization_if_needed( /// Database version checker. /// This is the place where migrations are to be registered. -fn check_database_migration(sql_file_path: PathBuf) -> Result<(), Box> { +pub async fn check_database_migration( + connection: Arc>, +) -> Result<(), Box> { let mut db_checker = DatabaseVersionChecker::new( slog_scope::logger(), ApplicationNodeType::Aggregator, - sql_file_path, + connection, ); for migration in crate::database::migration::get_migrations() { db_checker.add_migration(migration); } - db_checker.apply().map_err(|e| -> Box { e }) + db_checker + .apply() + .await + .map_err(|e| -> Box { e }) } /// Mithril Aggregator Node @@ -345,11 +346,14 @@ impl ServeCommand { .try_deserialize() .map_err(|e| format!("configuration deserialize error: {e}"))?; debug!("SERVE command"; "config" => format!("{config:?}")); - check_database_migration(config.get_sqlite_file())?; // Init dependencies let sqlite_db_path = config.get_sqlite_file(); let sqlite_connection = Arc::new(Mutex::new(Connection::open(sqlite_db_path)?)); + + // DATABASE MIGRATION + check_database_migration(sqlite_connection.clone()).await?; + let snapshot_store = Arc::new(LocalSnapshotStore::new( Box::new(SQLiteAdapter::new("snapshot", sqlite_connection.clone())?), LIST_SNAPSHOTS_MAX_ITEMS, @@ -370,7 +374,6 @@ impl ServeCommand { )?), config.store_retention_limit, )); - let stake_store = Arc::new(StakePoolStore::new(sqlite_connection.clone())); let single_signature_store = Arc::new(SingleSignatureStore::new( Box::new(SQLiteAdapter::new( "single_signature", @@ -381,7 +384,7 @@ impl ServeCommand { let protocol_parameters_store = Arc::new(ProtocolParametersStore::new( Box::new(SQLiteAdapter::new( "protocol_parameters", - sqlite_connection, + sqlite_connection.clone(), )?), config.store_retention_limit, )); @@ -394,6 +397,11 @@ impl ServeCommand { ), )), ); + let stake_store = Arc::new(StakePoolStore::new(sqlite_connection.clone())); + let stake_distribution_service = Arc::new(MithrilStakeDistributionService::new( + stake_store.clone(), + chain_observer.clone(), + )); let immutable_file_observer = Arc::new(ImmutableFileSystemObserver::new(&config.db_directory)); let beacon_provider = Arc::new(BeaconProviderImpl::new( @@ -452,13 +460,14 @@ impl ServeCommand { // Init dependency manager let dependency_manager = DependencyManager { config: config.clone(), + sqlite_connection, + stake_store, snapshot_store: snapshot_store.clone(), snapshot_uploader: snapshot_uploader.clone(), multi_signer: multi_signer.clone(), certificate_pending_store: certificate_pending_store.clone(), certificate_store: certificate_store.clone(), verification_key_store: verification_key_store.clone(), - stake_store: stake_store.clone(), single_signature_store: single_signature_store.clone(), protocol_parameters_store: protocol_parameters_store.clone(), chain_observer: chain_observer.clone(), @@ -474,6 +483,7 @@ impl ServeCommand { era_reader: era_reader.clone(), event_transmitter, api_version_provider, + stake_distribution_service, }; let dependency_manager = Arc::new(dependency_manager); @@ -633,7 +643,7 @@ impl ExportGenesisSubCommand { "Genesis export payload to sign to {}", self.target_path.display() ); - let dependencies = setup_genesis_dependencies(&config)?; + let dependencies = setup_genesis_dependencies(&config).await?; let genesis_tools = GenesisTools::from_dependencies(dependencies).await?; genesis_tools.export_payload_to_sign(&self.target_path) @@ -662,7 +672,7 @@ impl ImportGenesisSubCommand { "Genesis import signed payload from {}", self.signed_payload_path.to_string_lossy() ); - let dependencies = setup_genesis_dependencies(&config)?; + let dependencies = setup_genesis_dependencies(&config).await?; let genesis_tools = GenesisTools::from_dependencies(dependencies).await?; genesis_tools @@ -690,7 +700,7 @@ impl BootstrapGenesisSubCommand { .map_err(|e| format!("configuration deserialize error: {e}"))?; debug!("BOOTSTRAP GENESIS command"; "config" => format!("{config:?}")); println!("Genesis bootstrap for test only!"); - let dependencies = setup_genesis_dependencies(&config)?; + let dependencies = setup_genesis_dependencies(&config).await?; let genesis_tools = GenesisTools::from_dependencies(dependencies).await?; let genesis_secret_key = key_decode_hex(&self.genesis_secret_key)?; diff --git a/mithril-aggregator/src/database/provider/stake_pool.rs b/mithril-aggregator/src/database/provider/stake_pool.rs index 807b6f91004..a9ca6d57c2b 100644 --- a/mithril-aggregator/src/database/provider/stake_pool.rs +++ b/mithril-aggregator/src/database/provider/stake_pool.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use async_trait::async_trait; use chrono::NaiveDateTime; @@ -14,6 +14,7 @@ use mithril_common::{ }; use mithril_common::StdError; +use tokio::sync::Mutex; /// Delete stake pools for Epoch older than this. const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(2); @@ -246,10 +247,7 @@ impl StakeStorer for StakePoolStore { epoch: Epoch, stakes: StakeDistribution, ) -> Result, StoreError> { - let connection = &*self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let connection = &*self.connection.lock().await; let provider = UpdateStakePoolProvider::new(connection); let mut new_stakes = StakeDistribution::new(); connection @@ -276,10 +274,7 @@ impl StakeStorer for StakePoolStore { } async fn get_stakes(&self, epoch: Epoch) -> Result, StoreError> { - let connection = &*self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let connection = &*self.connection.lock().await; let provider = StakePoolProvider::new(connection); let cursor = provider .get_by_epoch(&epoch) diff --git a/mithril-aggregator/src/dependency.rs b/mithril-aggregator/src/dependency.rs index 7192f80c146..549aa6c8df2 100644 --- a/mithril-aggregator/src/dependency.rs +++ b/mithril-aggregator/src/dependency.rs @@ -11,18 +11,22 @@ use mithril_common::{ store::StakeStorer, BeaconProvider, }; +use sqlite::Connection; use std::{collections::HashMap, sync::Arc}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; -use crate::snapshot_uploaders::SnapshotUploader; use crate::{ - configuration::*, CertificatePendingStore, CertificateStore, ProtocolParametersStore, - ProtocolParametersStorer, SignerRegisterer, SignerRegistrationRoundOpener, - SingleSignatureStore, Snapshotter, VerificationKeyStore, VerificationKeyStorer, + configuration::*, database::provider::StakePoolStore, CertificatePendingStore, + CertificateStore, ProtocolParametersStore, ProtocolParametersStorer, SignerRegisterer, + SignerRegistrationRoundOpener, SingleSignatureStore, Snapshotter, VerificationKeyStore, + VerificationKeyStorer, }; use crate::{event_store::EventMessage, snapshot_stores::SnapshotStore}; use crate::{event_store::TransmitterService, multi_signer::MultiSigner}; +use crate::{ + snapshot_uploaders::SnapshotUploader, stake_distribution_service::StakeDistributionService, +}; /// MultiSignerWrapper wraps a MultiSigner pub type MultiSignerWrapper = Arc>; @@ -32,6 +36,15 @@ pub struct DependencyManager { /// Configuration structure. pub config: Configuration, + /// SQLite database connection + /// This is not a real service but is is needed to instanciate all store + /// services. Shall be private dependency. + pub sqlite_connection: Arc>, + + /// Stake Store used by the StakeDistributionService + /// It shall be a private dependency. + pub stake_store: Arc, + /// Snapshot store. pub snapshot_store: Arc, @@ -50,9 +63,6 @@ pub struct DependencyManager { /// Verification key store. pub verification_key_store: Arc, - /// Stake store. - pub stake_store: Arc, - /// Signer single signature store. pub single_signature_store: Arc, @@ -97,6 +107,9 @@ pub struct DependencyManager { /// API Version provider pub api_version_provider: Arc, + + /// Stake Distribution Service + pub stake_distribution_service: Arc, } #[doc(hidden)] @@ -167,7 +180,7 @@ impl DependencyManager { for (epoch, params) in parameters_per_epoch { self.fill_verification_key_store(epoch, ¶ms.0).await; - self.fill_stakes_store(epoch, ¶ms.0).await; + self.fill_stakes_store(epoch, params.0.to_vec()).await; self.protocol_parameters_store .save_protocol_parameters(epoch, params.1) .await @@ -215,7 +228,7 @@ impl DependencyManager { (epoch_to_sign, second_epoch_signers), ] { self.fill_verification_key_store(epoch, &signers).await; - self.fill_stakes_store(epoch, &signers).await; + self.fill_stakes_store(epoch, signers).await; } self.init_protocol_parameter_store(genesis_protocol_parameters) @@ -252,8 +265,9 @@ impl DependencyManager { } } - async fn fill_stakes_store(&self, target_epoch: Epoch, signers: &[SignerWithStake]) { - self.stake_store + async fn fill_stakes_store(&self, target_epoch: Epoch, signers: Vec) { + let _ = self + .stake_store .save_stakes( target_epoch, signers @@ -269,10 +283,13 @@ impl DependencyManager { #[cfg(test)] pub mod tests { use crate::{ - event_store::TransmitterService, AggregatorConfig, CertificatePendingStore, - CertificateStore, Configuration, DependencyManager, DumbSnapshotUploader, DumbSnapshotter, - LocalSnapshotStore, MithrilSignerRegisterer, MultiSignerImpl, ProtocolParametersStore, - SingleSignatureStore, SnapshotUploaderType, VerificationKeyStore, + check_database_migration, database::provider::StakePoolStore, + event_store::TransmitterService, + stake_distribution_service::MithrilStakeDistributionService, AggregatorConfig, + CertificatePendingStore, CertificateStore, Configuration, DependencyManager, + DumbSnapshotUploader, DumbSnapshotter, LocalSnapshotStore, MithrilSignerRegisterer, + MultiSignerImpl, ProtocolParametersStore, SingleSignatureStore, SnapshotUploaderType, + VerificationKeyStore, }; use mithril_common::{ api_version::APIVersionProvider, @@ -284,14 +301,15 @@ pub mod tests { adapters::{EraReaderAdapterType, EraReaderBootstrapAdapter}, EraChecker, EraReader, }, - store::{adapter::MemoryAdapter, StakeStore}, + store::adapter::MemoryAdapter, test_utils::fake_data, BeaconProvider, BeaconProviderImpl, CardanoNetwork, }; + use sqlite::Connection; use std::{path::PathBuf, sync::Arc}; use tokio::sync::{ mpsc::{self}, - RwLock, + Mutex, RwLock, }; pub async fn initialize_dependencies() -> (DependencyManager, AggregatorConfig) { @@ -319,6 +337,8 @@ pub mod tests { era_reader_adapter_type: EraReaderAdapterType::Bootstrap, era_reader_adapter_params: None, }; + let connection = Arc::new(Mutex::new(Connection::open(":memory:").unwrap())); + check_database_migration(connection.clone()).await.unwrap(); let snapshot_store = Arc::new(LocalSnapshotStore::new( Box::new(MemoryAdapter::new(None).unwrap()), 20, @@ -334,10 +354,6 @@ pub mod tests { Box::new(MemoryAdapter::new(None).unwrap()), config.store_retention_limit, )); - let stake_store = Arc::new(StakeStore::new( - Box::new(MemoryAdapter::new(None).unwrap()), - config.store_retention_limit, - )); let single_signature_store = Arc::new(SingleSignatureStore::new( Box::new(MemoryAdapter::new(None).unwrap()), config.store_retention_limit, @@ -346,9 +362,10 @@ pub mod tests { Box::new(MemoryAdapter::new(None).unwrap()), None, )); + let stake_pool_store = Arc::new(StakePoolStore::new(connection.clone())); let multi_signer = MultiSignerImpl::new( verification_key_store.clone(), - stake_store.clone(), + stake_pool_store.clone(), single_signature_store.clone(), protocol_parameters_store.clone(), ); @@ -360,6 +377,10 @@ pub mod tests { immutable_file_observer.clone(), CardanoNetwork::TestNet(42), )); + let stake_distribution_service = Arc::new(MithrilStakeDistributionService::new( + stake_pool_store.clone(), + chain_observer.clone(), + )); let certificate_verifier = Arc::new(MithrilCertificateVerifier::new(slog_scope::logger())); let signer_registerer = Arc::new(MithrilSignerRegisterer::new( chain_observer.clone(), @@ -382,6 +403,8 @@ pub mod tests { let api_version_provider = Arc::new(APIVersionProvider::new(era_checker.clone())); let dependency_manager = DependencyManager { + sqlite_connection: connection, + stake_store: stake_pool_store, config, snapshot_store, snapshot_uploader, @@ -389,7 +412,6 @@ pub mod tests { certificate_pending_store, certificate_store, verification_key_store, - stake_store, single_signature_store, protocol_parameters_store, chain_observer, @@ -405,6 +427,7 @@ pub mod tests { era_reader, event_transmitter, api_version_provider, + stake_distribution_service, }; let config = AggregatorConfig::new( diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index 316893b3411..a2c549064a6 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -35,7 +35,7 @@ pub use crate::configuration::{ pub use crate::multi_signer::{MultiSigner, MultiSignerImpl, ProtocolError}; pub use crate::snapshot_stores::{LocalSnapshotStore, SnapshotStore}; pub use certificate_creator::{CertificateCreator, MithrilCertificateCreator}; -pub use command_args::MainOpts; +pub use command_args::{check_database_migration, MainOpts}; pub use dependency::DependencyManager; pub use message_adapters::{ FromRegisterSignerAdapter, ToCertificatePendingMessageAdapter, ToEpochSettingsMessageAdapter, diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs index 3b7d1dff1fa..d229cb5178f 100644 --- a/mithril-aggregator/src/runtime/runner.rs +++ b/mithril-aggregator/src/runtime/runner.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use chrono::Utc; use mithril_common::entities::Epoch; use mithril_common::entities::PartyId; +use mithril_common::store::StakeStorer; use slog_scope::{debug, info, warn}; use mithril_common::crypto_helper::ProtocolStakeDistribution; @@ -752,6 +753,7 @@ pub mod tests { use mithril_common::entities::{ Beacon, CertificatePending, HexEncodedKey, ProtocolMessage, StakeDistribution, }; + use mithril_common::store::StakeStorer; use mithril_common::test_utils::MithrilFixtureBuilder; use mithril_common::{entities::ProtocolMessagePartKey, test_utils::fake_data}; use mithril_common::{BeaconProviderImpl, CardanoNetwork}; diff --git a/mithril-aggregator/src/stake_distribution_service.rs b/mithril-aggregator/src/stake_distribution_service.rs index 68fe11913e1..184daa04f63 100644 --- a/mithril-aggregator/src/stake_distribution_service.rs +++ b/mithril-aggregator/src/stake_distribution_service.rs @@ -87,7 +87,7 @@ impl std::error::Error for StakePoolDistributionServiceError {} /// Responsible of synchronizing with Cardano stake distribution. #[async_trait] -pub trait StakeDistributionService { +pub trait StakeDistributionService: Sync + Send { /// Return the stake distribution fot the given epoch. async fn get_stake_distribution( &self, @@ -226,8 +226,6 @@ impl StakeDistributionService for MithrilStakeDistributionService { #[cfg(test)] mod tests { - use std::sync::Mutex; - use crate::database::provider::setup_stake_db; use super::*; diff --git a/mithril-aggregator/tests/test_extensions/dependency.rs b/mithril-aggregator/tests/test_extensions/dependency.rs index 033261f42aa..e439078ca45 100644 --- a/mithril-aggregator/tests/test_extensions/dependency.rs +++ b/mithril-aggregator/tests/test_extensions/dependency.rs @@ -1,9 +1,11 @@ +use mithril_aggregator::database::provider::StakePoolStore; use mithril_aggregator::event_store::{EventMessage, TransmitterService}; +use mithril_aggregator::stake_distribution_service::MithrilStakeDistributionService; use mithril_aggregator::{ - AggregatorConfig, CertificatePendingStore, CertificateStore, Configuration, DependencyManager, - DumbSnapshotUploader, DumbSnapshotter, LocalSnapshotStore, MithrilSignerRegisterer, - MultiSignerImpl, ProtocolParametersStore, SingleSignatureStore, SnapshotUploaderType, - VerificationKeyStore, + check_database_migration, AggregatorConfig, CertificatePendingStore, CertificateStore, + Configuration, DependencyManager, DumbSnapshotUploader, DumbSnapshotter, LocalSnapshotStore, + MithrilSignerRegisterer, MultiSignerImpl, ProtocolParametersStore, SingleSignatureStore, + SnapshotUploaderType, VerificationKeyStore, }; use mithril_common::api_version::APIVersionProvider; use mithril_common::certificate_chain::MithrilCertificateVerifier; @@ -15,12 +17,12 @@ use mithril_common::era::{ adapters::EraReaderAdapterType, EraChecker, EraReader, EraReaderAdapter, }; use mithril_common::store::adapter::MemoryAdapter; -use mithril_common::store::StakeStore; use mithril_common::{BeaconProvider, BeaconProviderImpl, CardanoNetwork}; +use sqlite::Connection; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; // TODO: remove this allow, create a real dependency injection builder. #[allow(clippy::too_many_arguments)] @@ -61,6 +63,8 @@ pub async fn initialize_dependencies( era_reader_adapter_type: EraReaderAdapterType::Bootstrap, era_reader_adapter_params: None, }; + let connection = Arc::new(Mutex::new(Connection::open(":memory:").unwrap())); + check_database_migration(connection.clone()).await.unwrap(); let certificate_pending_store = Arc::new(CertificatePendingStore::new(Box::new( MemoryAdapter::new(None).unwrap(), ))); @@ -71,9 +75,10 @@ pub async fn initialize_dependencies( Box::new(MemoryAdapter::new(None).unwrap()), config.store_retention_limit, )); - let stake_store = Arc::new(StakeStore::new( - Box::new(MemoryAdapter::new(None).unwrap()), - config.store_retention_limit, + let stake_store = Arc::new(StakePoolStore::new(connection.clone())); + let stake_distribution_service = Arc::new(MithrilStakeDistributionService::new( + stake_store.clone(), + chain_observer.clone(), )); let single_signature_store = Arc::new(SingleSignatureStore::new( Box::new(MemoryAdapter::new(None).unwrap()), @@ -123,13 +128,14 @@ pub async fn initialize_dependencies( let dependency_manager = DependencyManager { config, + sqlite_connection: connection, + stake_store, snapshot_store, snapshot_uploader, multi_signer, certificate_pending_store, certificate_store, verification_key_store, - stake_store, single_signature_store, protocol_parameters_store, chain_observer, @@ -145,6 +151,7 @@ pub async fn initialize_dependencies( era_reader, event_transmitter, api_version_provider, + stake_distribution_service, }; let config = AggregatorConfig::new( diff --git a/mithril-common/src/database/version_checker.rs b/mithril-common/src/database/version_checker.rs index 6d87657bb4b..70a391fa3f4 100644 --- a/mithril-common/src/database/version_checker.rs +++ b/mithril-common/src/database/version_checker.rs @@ -2,8 +2,11 @@ use chrono::Local; use slog::{debug, error}; use slog::{info, Logger}; use sqlite::Connection; +use tokio::sync::Mutex; -use std::{cmp::Ordering, collections::BTreeSet, path::PathBuf}; +use std::ops::Deref; +use std::sync::Arc; +use std::{cmp::Ordering, collections::BTreeSet}; use crate::StdError; @@ -13,10 +16,9 @@ use super::{ }; /// Struct to perform application version check in the database. -#[derive(Debug)] pub struct DatabaseVersionChecker { /// Pathbuf to the SQLite3 file. - sqlite_file_path: PathBuf, + connection: Arc>, /// Application type which vesion is verified. application_type: ApplicationNodeType, @@ -33,12 +35,12 @@ impl DatabaseVersionChecker { pub fn new( logger: Logger, application_type: ApplicationNodeType, - sqlite_file_path: PathBuf, + connection: Arc>, ) -> Self { let migrations = BTreeSet::new(); Self { - sqlite_file_path, + connection, application_type, logger, migrations, @@ -52,18 +54,14 @@ impl DatabaseVersionChecker { self } - /// Performs an actual version check in the database. This method creates a - /// connection to the SQLite3 file and drops it at the end. - pub fn apply(&self) -> Result<(), StdError> { - debug!( - &self.logger, - "check database version, database file = '{}'", - self.sqlite_file_path.display() - ); - let connection = Connection::open(&self.sqlite_file_path)?; - let provider = DatabaseVersionProvider::new(&connection); + /// Apply migrations + pub async fn apply(&self) -> Result<(), StdError> { + debug!(&self.logger, "check database version",); + let lock = self.connection.lock().await; + let connection = lock.deref(); + let provider = DatabaseVersionProvider::new(connection); provider.create_table_if_not_exists(&self.application_type)?; - let updater = DatabaseVersionUpdater::new(&connection); + let updater = DatabaseVersionUpdater::new(connection); let db_version = provider .get_application_version(&self.application_type)? .unwrap(); // At least a record exists. @@ -80,7 +78,7 @@ impl DatabaseVersionChecker { "Database needs upgrade from version '{}' to version '{}', applying new migrations…", db_version.version, migration_version ); - self.apply_migrations(&db_version, &updater, &connection)?; + self.apply_migrations(&db_version, &updater, connection)?; info!( &self.logger, "database upgraded to version '{}'", migration_version @@ -171,11 +169,13 @@ impl Eq for SqlMigration {} #[cfg(test)] mod tests { + use std::path::PathBuf; + use super::*; - fn check_database_version(filepath: &PathBuf, db_version: DbVersion) { - let connection = Connection::open(filepath).unwrap(); - let provider = DatabaseVersionProvider::new(&connection); + async fn check_database_version(connection: Arc>, db_version: DbVersion) { + let lock = connection.lock().await; + let provider = DatabaseVersionProvider::new(lock.deref()); let version = provider .get_application_version(&ApplicationNodeType::Aggregator) .unwrap() @@ -184,7 +184,7 @@ mod tests { assert_eq!(db_version, version.version); } - fn create_sqlite_file(name: &str) -> PathBuf { + fn create_sqlite_file(name: &str) -> Result<(PathBuf, Connection), StdError> { let dirpath = std::env::temp_dir().join("mithril_test_database"); std::fs::create_dir_all(&dirpath).unwrap(); let filepath = dirpath.join(name); @@ -193,11 +193,14 @@ mod tests { std::fs::remove_file(filepath.as_path()).unwrap(); } - filepath + let connection = Connection::open(&filepath).map_err(|e| -> StdError { e.into() })?; + + Ok((filepath, connection)) } - fn get_table_whatever_column_count(filepath: &PathBuf) -> i64 { - let connection = Connection::open(filepath).unwrap(); + async fn get_table_whatever_column_count(cnt_mutex: Arc>) -> i64 { + let lock = cnt_mutex.lock().await; + let connection = lock.deref(); let sql = "select count(*) as column_count from pragma_table_info('whatever');"; let column_count = connection .prepare(sql) @@ -213,20 +216,22 @@ mod tests { column_count } - #[test] - fn test_upgrade_with_migration() { - let filepath = create_sqlite_file("test_upgrade_with_migration.sqlite3"); + #[tokio::test] + async fn test_upgrade_with_migration() { + let (_filepath, connection) = + create_sqlite_file("test_upgrade_with_migration.sqlite3").unwrap(); + let connection = Arc::new(Mutex::new(connection)); let mut db_checker = DatabaseVersionChecker::new( slog_scope::logger(), ApplicationNodeType::Aggregator, - filepath.clone(), + connection.clone(), ); - db_checker.apply().unwrap(); - assert_eq!(0, get_table_whatever_column_count(&filepath)); + db_checker.apply().await.unwrap(); + assert_eq!(0, get_table_whatever_column_count(connection.clone()).await); - db_checker.apply().unwrap(); - assert_eq!(0, get_table_whatever_column_count(&filepath)); + db_checker.apply().await.unwrap(); + assert_eq!(0, get_table_whatever_column_count(connection.clone()).await); let alterations = "create table whatever (thing_id integer); insert into whatever (thing_id) values (1), (2), (3), (4);"; let migration = SqlMigration { @@ -234,13 +239,13 @@ mod tests { alterations: alterations.to_string(), }; db_checker.add_migration(migration); - db_checker.apply().unwrap(); - assert_eq!(1, get_table_whatever_column_count(&filepath)); - check_database_version(&filepath, 1); + db_checker.apply().await.unwrap(); + assert_eq!(1, get_table_whatever_column_count(connection.clone()).await); + check_database_version(connection.clone(), 1).await; - db_checker.apply().unwrap(); - assert_eq!(1, get_table_whatever_column_count(&filepath)); - check_database_version(&filepath, 1); + db_checker.apply().await.unwrap(); + assert_eq!(1, get_table_whatever_column_count(connection.clone()).await); + check_database_version(connection.clone(), 1).await; let alterations = "alter table whatever add column thing_content text; update whatever set thing_content = 'some content'"; let migration = SqlMigration { @@ -248,9 +253,9 @@ mod tests { alterations: alterations.to_string(), }; db_checker.add_migration(migration); - db_checker.apply().unwrap(); - assert_eq!(2, get_table_whatever_column_count(&filepath)); - check_database_version(&filepath, 2); + db_checker.apply().await.unwrap(); + assert_eq!(2, get_table_whatever_column_count(connection.clone()).await); + check_database_version(connection.clone(), 2).await; // in the test below both migrations are declared in reversed order to // ensure they are played in the right order. The last one depends on @@ -267,18 +272,20 @@ mod tests { alterations: alterations.to_string(), }; db_checker.add_migration(migration); - db_checker.apply().unwrap(); - assert_eq!(4, get_table_whatever_column_count(&filepath)); - check_database_version(&filepath, 4); + db_checker.apply().await.unwrap(); + assert_eq!(4, get_table_whatever_column_count(connection.clone()).await); + check_database_version(connection, 4).await; } - #[test] - fn starting_with_migration() { - let filepath = create_sqlite_file("starting_with_migration.sqlite3"); + #[tokio::test] + async fn starting_with_migration() { + let (_filepath, connection) = + create_sqlite_file("starting_with_migration.sqlite3").unwrap(); + let connection = Arc::new(Mutex::new(connection)); let mut db_checker = DatabaseVersionChecker::new( slog_scope::logger(), ApplicationNodeType::Aggregator, - filepath.clone(), + connection.clone(), ); let alterations = "create table whatever (thing_id integer); insert into whatever (thing_id) values (1), (2), (3), (4);"; @@ -287,21 +294,22 @@ mod tests { alterations: alterations.to_string(), }; db_checker.add_migration(migration); - db_checker.apply().unwrap(); - assert_eq!(1, get_table_whatever_column_count(&filepath)); - check_database_version(&filepath, 1); + db_checker.apply().await.unwrap(); + assert_eq!(1, get_table_whatever_column_count(connection.clone()).await); + check_database_version(connection, 1).await; } - #[test] + #[tokio::test] /// This test case ensure that when multiple migrations are played and one fails: /// * previous migrations are ok and the database version is updated /// * further migrations are not played. - fn test_failing_migration() { - let filepath = create_sqlite_file("test_failing_migration.sqlite3"); + async fn test_failing_migration() { + let (_filepath, connection) = create_sqlite_file("test_failing_migration.sqlite3").unwrap(); + let connection = Arc::new(Mutex::new(connection)); let mut db_checker = DatabaseVersionChecker::new( slog_scope::logger(), ApplicationNodeType::Aggregator, - filepath.clone(), + connection.clone(), ); // Table whatever does not exist, this should fail with error. let alterations = "create table whatever (thing_id integer); insert into whatever (thing_id) values (1), (2), (3), (4);"; @@ -322,17 +330,18 @@ mod tests { alterations: alterations.to_string(), }; db_checker.add_migration(migration); - db_checker.apply().unwrap_err(); - check_database_version(&filepath, 1); + db_checker.apply().await.unwrap_err(); + check_database_version(connection, 1).await; } - #[test] - fn test_fail_downgrading() { - let filepath = create_sqlite_file("test_fail_downgrading.sqlite3"); + #[tokio::test] + async fn test_fail_downgrading() { + let (_filepath, connection) = create_sqlite_file("test_fail_downgrading.sqlite3").unwrap(); + let connection = Arc::new(Mutex::new(connection)); let mut db_checker = DatabaseVersionChecker::new( slog_scope::logger(), ApplicationNodeType::Aggregator, - filepath.clone(), + connection.clone(), ); let alterations = "create table whatever (thing_id integer); insert into whatever (thing_id) values (1), (2), (3), (4);"; let migration = SqlMigration { @@ -340,19 +349,19 @@ mod tests { alterations: alterations.to_string(), }; db_checker.add_migration(migration); - db_checker.apply().unwrap(); - check_database_version(&filepath, 1); + db_checker.apply().await.unwrap(); + check_database_version(connection.clone(), 1).await; // re instanciate a new checker with no migration registered (version 0). let db_checker = DatabaseVersionChecker::new( slog_scope::logger(), ApplicationNodeType::Aggregator, - filepath.clone(), + connection.clone(), ); assert!( - db_checker.apply().is_err(), + db_checker.apply().await.is_err(), "using an old version with an up to date database should fail" ); - check_database_version(&filepath, 1); + check_database_version(connection, 1).await; } } diff --git a/mithril-common/src/store/adapter/sqlite_adapter.rs b/mithril-common/src/store/adapter/sqlite_adapter.rs index d736e07d3f5..bfb3f807f86 100644 --- a/mithril-common/src/store/adapter/sqlite_adapter.rs +++ b/mithril-common/src/store/adapter/sqlite_adapter.rs @@ -2,14 +2,9 @@ use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; use sha2::{Digest, Sha256}; use sqlite::{Connection, State, Statement}; +use tokio::sync::Mutex; -use std::{ - marker::PhantomData, - ops::Deref, - sync::{Arc, Mutex}, - thread::sleep, - time::Duration, -}; +use std::{marker::PhantomData, ops::Deref, sync::Arc, thread::sleep, time::Duration}; use super::{AdapterError, StoreAdapter}; @@ -34,9 +29,7 @@ where /// Create a new SQLiteAdapter instance. pub fn new(table_name: &str, connection: Arc>) -> Result { { - let conn = &*connection.lock().map_err(|e| { - AdapterError::GeneralError(format!("SQLite adapter instanciation failed: '{e}'.")) - })?; + let conn = &*connection.blocking_lock(); Self::check_table_exists(conn, table_name)?; } @@ -156,10 +149,7 @@ where type Record = V; async fn store_record(&mut self, key: &Self::Key, record: &Self::Record) -> Result<()> { - let lock = self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let lock = self.connection.lock().await; let connection = lock.deref(); let sql = format!( "insert into {} (key_hash, key, value) values (?1, ?2, ?3) on conflict (key_hash) do update set value = excluded.value", @@ -190,10 +180,7 @@ where } async fn get_record(&self, key: &Self::Key) -> Result> { - let lock = self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let lock = self.connection.lock().await; let connection = lock.deref(); let sql = format!("select value from {} where key_hash = ?1", self.table); let statement = self.get_statement_for_key(connection, sql, key)?; @@ -202,10 +189,7 @@ where } async fn record_exists(&self, key: &Self::Key) -> Result { - let lock = self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let lock = self.connection.lock().await; let connection = lock.deref(); let sql = format!( "select exists(select 1 from {} where key_hash = ?1) as record_exists", @@ -225,10 +209,7 @@ where } async fn get_last_n_records(&self, how_many: usize) -> Result> { - let lock = self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let lock = self.connection.lock().await; let connection = lock.deref(); let sql = format!( "select cast(key as text) as key, cast(value as text) as value from {} order by ROWID desc limit ?1", @@ -260,10 +241,7 @@ where "delete from {} where key_hash = ?1 returning value", self.table ); - let lock = self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let lock = self.connection.lock().await; let connection = lock.deref(); let statement = self.get_statement_for_key(connection, sql, key)?; @@ -271,10 +249,7 @@ where } async fn get_iter(&self) -> Result + '_>> { - let lock = self - .connection - .lock() - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let lock = self.connection.lock().await; let connection = lock.deref(); let iterator = SQLiteResultIterator::new(connection, &self.table)?; diff --git a/mithril-signer/src/runtime/signer_services.rs b/mithril-signer/src/runtime/signer_services.rs index ca6af4ac5f8..108671d7e81 100644 --- a/mithril-signer/src/runtime/signer_services.rs +++ b/mithril-signer/src/runtime/signer_services.rs @@ -1,9 +1,7 @@ use async_trait::async_trait; use sqlite::Connection; -use std::{ - fs, - sync::{Arc, Mutex}, -}; +use std::{fs, sync::Arc}; +use tokio::sync::Mutex; use mithril_common::{ api_version::APIVersionProvider, From 8f296a909e1547ff731f33712925bc3b1ad52036 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT=20/=20PALO-IT?= Date: Wed, 22 Mar 2023 17:27:26 +0100 Subject: [PATCH 3/3] fix dependencies in Signer and Client --- Cargo.lock | 6 +++--- mithril-aggregator/Cargo.toml | 2 +- mithril-aggregator/src/stake_distribution_service.rs | 2 +- mithril-common/Cargo.toml | 2 +- mithril-common/src/store/adapter/sqlite_adapter.rs | 4 +++- mithril-common/src/store/store_pruner.rs | 6 ++---- mithril-signer/Cargo.toml | 2 +- mithril-signer/src/main.rs | 8 -------- mithril-signer/src/runtime/signer_services.rs | 11 +++++++++++ 9 files changed, 23 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c30ddb79b53..c6e380e9d58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2010,7 +2010,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.2.33" +version = "0.2.34" dependencies = [ "async-trait", "chrono", @@ -2072,7 +2072,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.2.28" +version = "0.2.29" dependencies = [ "async-trait", "bech32", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.24" +version = "0.2.25" dependencies = [ "async-trait", "clap 4.1.8", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index 4af023cb97a..513c76ee8ef 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.2.33" +version = "0.2.34" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/stake_distribution_service.rs b/mithril-aggregator/src/stake_distribution_service.rs index 184daa04f63..408588c67c5 100644 --- a/mithril-aggregator/src/stake_distribution_service.rs +++ b/mithril-aggregator/src/stake_distribution_service.rs @@ -17,7 +17,7 @@ use tokio::sync::{Mutex, MutexGuard}; use crate::database::provider::StakePoolStore; -/// Errors related to the [StakePoolDistributionService]. +/// Errors related to the [StakeDistributionService]. #[derive(Debug)] pub enum StakePoolDistributionServiceError { /// Critical errors cannot be recovered. diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 7ef1111b89e..1d6cd2e223d 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.2.28" +version = "0.2.29" authors = { workspace = true } edition = { workspace = true } documentation = { workspace = true } diff --git a/mithril-common/src/store/adapter/sqlite_adapter.rs b/mithril-common/src/store/adapter/sqlite_adapter.rs index bfb3f807f86..23810e2c980 100644 --- a/mithril-common/src/store/adapter/sqlite_adapter.rs +++ b/mithril-common/src/store/adapter/sqlite_adapter.rs @@ -29,7 +29,9 @@ where /// Create a new SQLiteAdapter instance. pub fn new(table_name: &str, connection: Arc>) -> Result { { - let conn = &*connection.blocking_lock(); + let conn = &*connection + .try_lock() + .map_err(|e| AdapterError::InitializationError(e.into()))?; Self::check_table_exists(conn, table_name)?; } diff --git a/mithril-common/src/store/store_pruner.rs b/mithril-common/src/store/store_pruner.rs index e802c1ac0f3..85e336d252d 100644 --- a/mithril-common/src/store/store_pruner.rs +++ b/mithril-common/src/store/store_pruner.rs @@ -41,12 +41,10 @@ pub trait StorePruner { #[cfg(test)] mod tests { - use std::{ - cmp::min, - sync::{Arc, Mutex}, - }; + use std::{cmp::min, sync::Arc}; use sqlite::Connection; + use tokio::sync::Mutex; use crate::store::adapter::SQLiteAdapter; diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index 777adca4c72..f249a739f25 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.24" +version = "0.2.25" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/src/main.rs b/mithril-signer/src/main.rs index cb0676dab7a..c62374d24fc 100644 --- a/mithril-signer/src/main.rs +++ b/mithril-signer/src/main.rs @@ -5,7 +5,6 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use mithril_common::database::{ApplicationNodeType, DatabaseVersionChecker}; use mithril_signer::{ Configuration, DefaultConfiguration, ProductionServiceBuilder, ServiceBuilder, SignerRunner, SignerState, StateMachine, @@ -102,13 +101,6 @@ async fn main() -> Result<(), String> { .build() .await .map_err(|e| e.to_string())?; - DatabaseVersionChecker::new( - slog_scope::logger(), - ApplicationNodeType::Signer, - config.get_sqlite_file(), - ) - .apply() - .map_err(|e| e.to_string())?; debug!("Started"; "run_mode" => &args.run_mode, "config" => format!("{config:?}")); let mut state_machine = StateMachine::new( diff --git a/mithril-signer/src/runtime/signer_services.rs b/mithril-signer/src/runtime/signer_services.rs index 108671d7e81..d0caf034b5f 100644 --- a/mithril-signer/src/runtime/signer_services.rs +++ b/mithril-signer/src/runtime/signer_services.rs @@ -7,6 +7,7 @@ use mithril_common::{ api_version::APIVersionProvider, chain_observer::{CardanoCliChainObserver, CardanoCliRunner, ChainObserver}, crypto_helper::{OpCert, ProtocolPartyId, SerDeShelleyFileFormat}, + database::{ApplicationNodeType, DatabaseVersionChecker}, digesters::{ cache::{ImmutableFileDigestCacheProvider, JsonImmutableFileDigestCacheProviderBuilder}, ImmutableFileObserver, @@ -143,6 +144,16 @@ impl<'a> ServiceBuilder for ProductionServiceBuilder<'a> { let sqlite_db_path = self.config.get_sqlite_file(); let sqlite_connection = Arc::new(Mutex::new(Connection::open(sqlite_db_path)?)); + DatabaseVersionChecker::new( + slog_scope::logger(), + ApplicationNodeType::Signer, + sqlite_connection.clone(), + ) + .apply() + .await + .map_err(|e| -> Box { + format!("Database migration error {e}").into() + })?; let protocol_initializer_store = Arc::new(ProtocolInitializerStore::new( Box::new(SQLiteAdapter::new( "protocol_initializer",