diff --git a/mithril-aggregator/src/dependency.rs b/mithril-aggregator/src/dependency.rs index 7247600f303..ac9ca35868d 100644 --- a/mithril-aggregator/src/dependency.rs +++ b/mithril-aggregator/src/dependency.rs @@ -7,6 +7,7 @@ use super::entities::*; use super::multi_signer::MultiSigner; use super::snapshot_stores::SnapshotStore; use crate::beacon_store::BeaconStore; +use crate::snapshot_uploaders::SnapshotUploader; use crate::{CertificatePendingStore, CertificateStore, VerificationKeyStore}; /// BeaconStoreWrapper wraps a BeaconStore @@ -30,10 +31,14 @@ pub type VerificationKeyStoreWrapper = Arc>; /// StakeStoreWrapper wraps a StakeStore pub type StakeStoreWrapper = Arc>; +/// StakeStoreWrapper wraps a StakeStore +pub type SnapshotUploaderWrapper = Arc>; + /// DependencyManager handles the dependencies pub struct DependencyManager { pub config: Config, pub snapshot_store: Option, + pub snapshot_uploader: Option, pub multi_signer: Option, pub beacon_store: Option, pub certificate_pending_store: Option, @@ -48,6 +53,7 @@ impl DependencyManager { Self { config, snapshot_store: None, + snapshot_uploader: None, multi_signer: None, beacon_store: None, certificate_pending_store: None, @@ -63,6 +69,15 @@ impl DependencyManager { self } + /// With SnapshotUploader middleware + pub fn with_snapshot_uploader( + &mut self, + snapshot_uploader: SnapshotUploaderWrapper, + ) -> &mut Self { + self.snapshot_uploader = Some(snapshot_uploader); + self + } + /// With MultiSigner middleware pub fn with_multi_signer(&mut self, multi_signer: MultiSignerWrapper) -> &mut Self { self.multi_signer = Some(multi_signer); diff --git a/mithril-aggregator/src/entities.rs b/mithril-aggregator/src/entities.rs index 56d9f9f83a3..e4c017bfca7 100644 --- a/mithril-aggregator/src/entities.rs +++ b/mithril-aggregator/src/entities.rs @@ -1,6 +1,5 @@ -use crate::dependency::SnapshotStoreWrapper; +use crate::dependency::{SnapshotStoreWrapper, SnapshotUploaderWrapper}; use crate::snapshot_stores::LocalSnapshotStore; -use crate::snapshot_uploaders::SnapshotUploader; use crate::tools::GcpFileUploader; use crate::{LocalSnapshotUploader, RemoteSnapshotStore, RemoteSnapshotUploader}; use serde::{Deserialize, Serialize}; @@ -70,14 +69,14 @@ impl Config { } } - pub fn build_snapshot_uploader(&self) -> Box { + pub fn build_snapshot_uploader(&self) -> SnapshotUploaderWrapper { match self.snapshot_uploader_type { - SnapshotUploaderType::Gcp => Box::new(RemoteSnapshotUploader::new(Box::new( - GcpFileUploader::default(), + SnapshotUploaderType::Gcp => Arc::new(RwLock::new(RemoteSnapshotUploader::new( + Box::new(GcpFileUploader::default()), + ))), + SnapshotUploaderType::Local => Arc::new(RwLock::new(LocalSnapshotUploader::new( + self.server_url.clone(), ))), - SnapshotUploaderType::Local => { - Box::new(LocalSnapshotUploader::new(self.server_url.clone())) - } } } } diff --git a/mithril-aggregator/src/http_server.rs b/mithril-aggregator/src/http_server.rs index 6245679a96d..6395fdcf8ef 100644 --- a/mithril-aggregator/src/http_server.rs +++ b/mithril-aggregator/src/http_server.rs @@ -1,6 +1,5 @@ use mithril_common::crypto_helper::{key_decode_hex, ProtocolLotteryIndex, ProtocolPartyId}; use mithril_common::entities; -use mithril_common::fake_data; use serde_json::Value::Null; use slog_scope::{debug, info}; use std::convert::Infallible; @@ -10,7 +9,7 @@ use warp::Future; use warp::{http::Method, http::StatusCode, Filter}; use super::dependency::{ - BeaconStoreWrapper, CertificateStoreWrapper, DependencyManager, MultiSignerWrapper, + CertificatePendingStoreWrapper, CertificateStoreWrapper, DependencyManager, MultiSignerWrapper, SnapshotStoreWrapper, }; use super::multi_signer; @@ -76,8 +75,7 @@ mod router { ) -> impl Filter + Clone { warp::path!("certificate-pending") .and(warp::get()) - .and(with_beacon_store(dependency_manager.clone())) - .and(with_multi_signer(dependency_manager)) + .and(with_certificate_pending_store(dependency_manager)) .and_then(handlers::certificate_pending) } @@ -155,13 +153,6 @@ mod router { .and_then(handlers::register_signatures) } - /// With beacon store middleware - fn with_beacon_store( - dependency_manager: Arc, - ) -> impl Filter + Clone { - warp::any().map(move || dependency_manager.beacon_store.as_ref().unwrap().clone()) - } - /// With snapshot store middleware fn with_snapshot_store( dependency_manager: Arc, @@ -182,6 +173,19 @@ mod router { }) } + /// With certificate pending store + fn with_certificate_pending_store( + dependency_manager: Arc, + ) -> impl Filter + Clone { + warp::any().map(move || { + dependency_manager + .certificate_pending_store + .as_ref() + .unwrap() + .clone() + }) + } + /// With multi signer middleware fn with_multi_signer( dependency_manager: Arc, @@ -198,77 +202,25 @@ mod router { } mod handlers { + use crate::dependency::CertificatePendingStoreWrapper; + use super::*; use std::str::FromStr; use warp::http::Uri; /// Certificate Pending pub async fn certificate_pending( - beacon_store: BeaconStoreWrapper, - multi_signer: MultiSignerWrapper, + certificate_pending_store: CertificatePendingStoreWrapper, ) -> Result { debug!("certificate_pending"); - let beacon_store = beacon_store.read().await; - match beacon_store.get_current_beacon().await { - Ok(Some(beacon)) => { - let multi_signer = multi_signer.read().await; - match multi_signer.get_multi_signature().await { - Ok(None) => { - let mut certificate_pending = fake_data::certificate_pending(); - certificate_pending.beacon = beacon.clone(); - - let protocol_parameters = multi_signer.get_protocol_parameters().await; - if protocol_parameters.is_none() { - return Ok(warp::reply::with_status( - warp::reply::json(&entities::Error::new( - "MITHRIL-E0004".to_string(), - "no protocol parameters available".to_string(), - )), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - let protocol_parameters = protocol_parameters.unwrap().into(); - - let previous_hash = certificate_pending.previous_hash; + let certificate_pending_store = certificate_pending_store.read().await; - let signers = multi_signer.get_signers().await; - if let Err(err) = signers { - return Ok(warp::reply::with_status( - warp::reply::json(&entities::Error::new( - "MITHRIL-E0007".to_string(), - err.to_string(), - )), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - let signers = signers.unwrap(); - - let certificate_pending = entities::CertificatePending::new( - beacon, - protocol_parameters, - previous_hash, - signers, - ); - - Ok(warp::reply::with_status( - warp::reply::json(&certificate_pending), - StatusCode::OK, - )) - } - Ok(_) => Ok(warp::reply::with_status( - warp::reply::json(&Null), - StatusCode::NO_CONTENT, - )), - Err(err) => Ok(warp::reply::with_status( - warp::reply::json(&entities::Error::new( - "MITHRIL-E0008".to_string(), - err.to_string(), - )), - StatusCode::INTERNAL_SERVER_ERROR, - )), - } - } + match certificate_pending_store.get().await { + Ok(Some(certificate_pending)) => Ok(warp::reply::with_status( + warp::reply::json(&certificate_pending), + StatusCode::OK, + )), Ok(None) => Ok(warp::reply::with_status( warp::reply::json(&Null), StatusCode::NO_CONTENT, @@ -526,7 +478,8 @@ mod tests { use tokio::sync::RwLock; use warp::test::request; - use super::super::beacon_store::{BeaconStoreError, MockBeaconStore}; + use crate::CertificatePendingStore; + use super::super::entities::*; use super::super::multi_signer::MockMultiSigner; use super::super::multi_signer::ProtocolError; @@ -556,29 +509,14 @@ mod tests { #[tokio::test] async fn test_certificate_pending_get_ok() { - let fake_protocol_parameters = fake_data::protocol_parameters(); - let fake_signers = fake_data::signers(5); let method = Method::GET.as_str(); let path = "/certificate-pending"; - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(Some(fake_data::beacon()))); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); - mock_multi_signer - .expect_get_signers() - .return_once(|| Ok(fake_signers)); - mock_multi_signer - .expect_get_multi_signature() - .return_once(|| Ok(None)); + let certificate_pending_store = + CertificatePendingStore::new(Box::new(DumbStoreAdapter::new())); let mut dependency_manager = setup_dependency_manager(); dependency_manager - .with_beacon_store(Arc::new(RwLock::new(beacon_store))) - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))); + .with_certificate_pending_store(Arc::new(RwLock::new(certificate_pending_store))); let response = request() .method(method) @@ -597,19 +535,11 @@ mod tests { #[tokio::test] async fn test_certificate_pending_get_ok_204() { - let fake_protocol_parameters = fake_data::protocol_parameters(); - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(None)); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); + let certificate_pending_store = + CertificatePendingStore::new(Box::new(DumbStoreAdapter::new())); let mut dependency_manager = setup_dependency_manager(); dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); + .with_certificate_pending_store(Arc::new(RwLock::new(certificate_pending_store))); let method = Method::GET.as_str(); let path = "/certificate-pending"; @@ -630,22 +560,14 @@ mod tests { } #[tokio::test] - async fn test_certificate_pending_get_ko_current_beacon_500() { - let fake_protocol_parameters = fake_data::protocol_parameters(); + async fn test_certificate_pending_get_ko_500() { let method = Method::GET.as_str(); let path = "/certificate-pending"; - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Err(BeaconStoreError::GenericError())); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); + let certificate_pending_store = + CertificatePendingStore::new(Box::new(DumbStoreAdapter::new())); let mut dependency_manager = setup_dependency_manager(); dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); + .with_certificate_pending_store(Arc::new(RwLock::new(certificate_pending_store))); let response = request() .method(method) @@ -662,81 +584,6 @@ mod tests { .expect("OpenAPI error"); } - #[tokio::test] - async fn test_certificate_pending_get_ko_signers_500() { - let fake_protocol_parameters = fake_data::protocol_parameters(); - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(Some(fake_data::beacon()))); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| Some(fake_protocol_parameters.into())); - mock_multi_signer - .expect_get_signers() - .return_once(|| Err(ProtocolError::Codec("an error occurred".to_string()))); - mock_multi_signer - .expect_get_multi_signature() - .return_once(|| Ok(None)); - let mut dependency_manager = setup_dependency_manager(); - dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); - - let method = Method::GET.as_str(); - let path = "/certificate-pending"; - - let response = request() - .method(method) - .path(&format!("/{}{}", SERVER_BASE_PATH, path)) - .reply(&router::routes(Arc::new(dependency_manager))) - .await; - - APISpec::from_file(API_SPEC_FILE) - .method(method) - .path(path) - .validate_request(&Null) - .unwrap() - .validate_response(&response) - .expect("OpenAPI error"); - } - - #[tokio::test] - async fn test_certificate_pending_get_ko_protocol_parameters_500() { - let mut beacon_store = MockBeaconStore::new(); - beacon_store - .expect_get_current_beacon() - .return_once(|| Ok(Some(fake_data::beacon()))); - let mut mock_multi_signer = MockMultiSigner::new(); - mock_multi_signer - .expect_get_protocol_parameters() - .return_once(|| None); - mock_multi_signer - .expect_get_multi_signature() - .return_once(|| Ok(None)); - let mut dependency_manager = setup_dependency_manager(); - dependency_manager - .with_multi_signer(Arc::new(RwLock::new(mock_multi_signer))) - .with_beacon_store(Arc::new(RwLock::new(beacon_store))); - - let method = Method::GET.as_str(); - let path = "/certificate-pending"; - - let response = request() - .method(method) - .path(&format!("/{}{}", SERVER_BASE_PATH, path)) - .reply(&router::routes(Arc::new(dependency_manager))) - .await; - APISpec::from_file(API_SPEC_FILE) - .method(method) - .path(path) - .validate_request(&Null) - .unwrap() - .validate_response(&response) - .expect("OpenAPI error"); - } - #[tokio::test] async fn test_certificate_certificate_hash_get_ok() { let mut certificate_store = CertificateStore::new(Box::new(DumbStoreAdapter::< diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index 5ea4f5debc2..290b9a20d3b 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -16,7 +16,7 @@ pub use crate::multi_signer::{MultiSigner, MultiSignerImpl, ProtocolError}; pub use crate::snapshot_stores::{RemoteSnapshotStore, SnapshotStore}; pub use beacon_store::{BeaconStore, BeaconStoreError, MemoryBeaconStore}; pub use dependency::DependencyManager; -pub use runtime::AggregatorRuntime; +pub use runtime::{AggregatorConfig, AggregatorRunner, AggregatorRuntime}; pub use snapshot_uploaders::{LocalSnapshotUploader, RemoteSnapshotUploader}; pub use snapshotter::{SnapshotError, Snapshotter}; pub use store::{ diff --git a/mithril-aggregator/src/main.rs b/mithril-aggregator/src/main.rs index 760e765a3a0..ee03badcf25 100644 --- a/mithril-aggregator/src/main.rs +++ b/mithril-aggregator/src/main.rs @@ -4,9 +4,9 @@ use clap::Parser; use config::{Map, Source, Value, ValueKind}; use mithril_aggregator::{ - AggregatorRuntime, BeaconStore, CertificatePendingStore, CertificateStore, Config, - DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl, Server, - VerificationKeyStore, + AggregatorConfig, AggregatorRunner, AggregatorRuntime, BeaconStore, CertificatePendingStore, + CertificateStore, Config, DependencyManager, MemoryBeaconStore, MultiSigner, MultiSignerImpl, + Server, VerificationKeyStore, }; use mithril_common::crypto_helper::ProtocolStakeDistribution; use mithril_common::fake_data; @@ -19,6 +19,7 @@ use std::error::Error; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; +use tokio::time::Duration; /// Node args #[derive(Parser, Debug, Clone)] @@ -155,6 +156,7 @@ async fn main() -> Result<(), Box> { let mut dependency_manager = DependencyManager::new(config.clone()); dependency_manager .with_snapshot_store(snapshot_store.clone()) + .with_snapshot_uploader(snapshot_uploader.clone()) .with_multi_signer(multi_signer.clone()) .with_beacon_store(beacon_store.clone()) .with_certificate_pending_store(certificate_pending_store.clone()) @@ -165,19 +167,22 @@ async fn main() -> Result<(), Box> { // Start snapshot uploader let snapshot_directory = config.snapshot_directory.clone(); + let runtime_dependencies = dependency_manager.clone(); let handle = tokio::spawn(async move { - let runtime = AggregatorRuntime::new( - args.runtime_interval * 1000, - config.network.clone(), - config.db_directory.clone(), - snapshot_directory, - beacon_store.clone(), - multi_signer.clone(), - snapshot_store.clone(), - snapshot_uploader, - certificate_pending_store.clone(), - certificate_store.clone(), + let config = AggregatorConfig::new( + args.runtime_interval, + &config.network.clone(), + &config.db_directory.clone(), + &snapshot_directory, + runtime_dependencies, ); + let mut runtime = AggregatorRuntime::new( + Duration::from_secs(config.interval.into()), + None, + Arc::new(AggregatorRunner::new(config)), + ) + .await + .unwrap(); runtime.run().await }); diff --git a/mithril-aggregator/src/runtime.rs b/mithril-aggregator/src/runtime.rs deleted file mode 100644 index 22b0ef53be6..00000000000 --- a/mithril-aggregator/src/runtime.rs +++ /dev/null @@ -1,282 +0,0 @@ -use super::dependency::{BeaconStoreWrapper, MultiSignerWrapper, SnapshotStoreWrapper}; -use super::{BeaconStoreError, ProtocolError, SnapshotError, Snapshotter}; - -use mithril_common::crypto_helper::Bytes; -use mithril_common::digesters::{Digester, DigesterError, ImmutableDigester}; -use mithril_common::entities::{Beacon, Certificate}; -use mithril_common::fake_data; - -use crate::dependency::{CertificatePendingStoreWrapper, CertificateStoreWrapper}; -use crate::snapshot_stores::SnapshotStoreError; -use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader}; -use chrono::{DateTime, Utc}; -use hex::ToHex; -use mithril_common::entities::Snapshot; -use slog_scope::{debug, error, info, warn}; -use std::fs::File; -use std::io; -use std::io::{Seek, SeekFrom}; -use std::path::{Path, PathBuf}; -use thiserror::Error; -use tokio::time::{sleep, Duration}; - -#[derive(Error, Debug)] -pub enum RuntimeError { - #[error("multi signer error")] - MultiSigner(#[from] ProtocolError), - - #[error("beacon store error")] - BeaconStore(#[from] BeaconStoreError), - - #[error("snapshotter error")] - Snapshotter(#[from] SnapshotError), - - #[error("digester error")] - Digester(#[from] DigesterError), - - #[error("snapshot store error")] - SnapshotStore(#[from] SnapshotStoreError), - - #[error("certificate store error")] - CertificateStore(String), - - #[error("snapshot uploader error: {0}")] - SnapshotUploader(String), - - #[error("snapshot build error")] - SnapshotBuild(#[from] io::Error), - - #[error("general error")] - General(String), -} - -/// AggregatorRuntime -pub struct AggregatorRuntime { - /// Interval between each snapshot, in seconds - interval: u32, - - /// Cardano network - network: String, - - /// DB directory to snapshot - db_directory: PathBuf, - - /// Directory to store snapshot - snapshot_directory: PathBuf, - - /// Beacon store - beacon_store: BeaconStoreWrapper, - - /// Multi signer - multi_signer: MultiSignerWrapper, - - /// Snapshot store - snapshot_store: SnapshotStoreWrapper, - - /// Snapshot uploader - snapshot_uploader: Box, - - /// Pending certificate store - #[allow(dead_code)] - certificate_pending_store: CertificatePendingStoreWrapper, - - /// Certificate store - certificate_store: CertificateStoreWrapper, -} - -impl AggregatorRuntime { - /// AggregatorRuntime factory - // TODO: Fix this by implementing an Aggregator Config that implements the From trait for a general Config - #[allow(clippy::too_many_arguments)] - pub fn new( - interval: u32, - network: String, - db_directory: PathBuf, - snapshot_directory: PathBuf, - beacon_store: BeaconStoreWrapper, - multi_signer: MultiSignerWrapper, - snapshot_store: SnapshotStoreWrapper, - snapshot_uploader: Box, - certificate_pending_store: CertificatePendingStoreWrapper, - certificate_store: CertificateStoreWrapper, - ) -> Self { - Self { - interval, - network, - db_directory, - snapshot_directory, - beacon_store, - multi_signer, - snapshot_store, - snapshot_uploader, - certificate_pending_store, - certificate_store, - } - } - - /// Run snapshotter loop - pub async fn run(&self) { - info!("Starting runtime"); - - loop { - if let Err(e) = self.do_work().await { - error!("{:?}", e) - } - - info!("Sleeping for {}", self.interval); - sleep(Duration::from_millis(self.interval.into())).await; - } - } - - async fn do_work(&self) -> Result<(), RuntimeError> { - let snapshotter = - Snapshotter::new(self.db_directory.clone(), self.snapshot_directory.clone()); - let digester = ImmutableDigester::new(self.db_directory.clone(), slog_scope::logger()); - - info!("Computing digest"; "db_directory" => self.db_directory.display()); - let digest_result = tokio::task::spawn_blocking(move || digester.compute_digest()) - .await - .map_err(|e| RuntimeError::General(e.to_string()))?; - match digest_result { - Ok(digest_result) => { - let mut beacon = fake_data::beacon(); - beacon.immutable_file_number = digest_result.last_immutable_file_number; - let message = &digest_result.digest.clone().into_bytes(); - - match self.manage_trigger_snapshot(message, &beacon).await { - Ok(Some(certificate)) => { - info!( - "Snapshotting immutables up to `{}` in an archive", - &beacon.immutable_file_number - ); - - let snapshot_name = - format!("{}.{}.tar.gz", self.network, &digest_result.digest); - - let snapshot_path = tokio::task::spawn_blocking( - move || -> Result { - snapshotter.snapshot(&snapshot_name) - }, - ) - .await - .map_err(|e| RuntimeError::General(e.to_string()))??; - - info!("Uploading snapshot archive"); - let uploaded_snapshot_location = self - .snapshot_uploader - .upload_snapshot(&snapshot_path) - .await - .map_err(RuntimeError::SnapshotUploader)?; - - info!( - "Snapshot archive uploaded, location: `{}`", - &uploaded_snapshot_location - ); - - let new_snapshot = build_new_snapshot( - digest_result.digest, - certificate.hash.to_owned(), - &snapshot_path, - uploaded_snapshot_location, - )?; - - info!("Storing snapshot data"; "snapshot" => format!("{:?}", new_snapshot)); - let mut snapshot_store = self.snapshot_store.write().await; - snapshot_store.add_snapshot(new_snapshot).await?; - - info!("Storing certificate data"; "certificate" => format!("{:?}", certificate)); - let mut certificate_store = self.certificate_store.write().await; - certificate_store - .save(certificate) - .await - .map_err(|e| RuntimeError::CertificateStore(e.to_string()))?; - - Ok(()) - } - Ok(None) => Ok(()), - Err(err) => Err(err), - } - } - Err(err) => { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.reset_current_beacon().await?; - Err(RuntimeError::Digester(err)) - } - } - } - - async fn manage_trigger_snapshot( - &self, - message: &Bytes, - beacon: &Beacon, - ) -> Result, RuntimeError> { - let mut multi_signer = self.multi_signer.write().await; - match multi_signer.get_multi_signature().await { - Ok(None) => { - { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.set_current_beacon(beacon.clone()).await?; - } - multi_signer - .update_current_message(message.to_owned()) - .await?; - match multi_signer.create_multi_signature().await { - Ok(Some(_)) => { - let message = multi_signer - .get_current_message() - .await - .unwrap() - .encode_hex::(); - debug!( - "A multi signature has been created for message: {}", - message - ); - let previous_hash = "".to_string(); - Ok(multi_signer - .create_certificate(beacon.clone(), previous_hash) - .await?) - } - Ok(None) => { - warn!("Not ready to create a multi signature: quorum is not reached yet"); - Ok(None) - } - Err(e) => { - warn!("Error while creating a multi signature: {}", e); - Err(RuntimeError::MultiSigner(e)) - } - } - } - Ok(_) => { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.reset_current_beacon().await?; - Ok(None) - } - Err(err) => { - let mut beacon_store = self.beacon_store.write().await; - beacon_store.reset_current_beacon().await?; - Err(RuntimeError::MultiSigner(err)) - } - } - } -} - -fn build_new_snapshot( - digest: String, - certificate_hash: String, - snapshot_filepath: &Path, - uploaded_snapshot_location: SnapshotLocation, -) -> Result { - let timestamp: DateTime = Utc::now(); - let created_at = format!("{:?}", timestamp); - - let mut tar_gz = File::open(&snapshot_filepath)?; - let size: u64 = tar_gz.seek(SeekFrom::End(0))?; - - Ok(Snapshot::new( - digest, - certificate_hash, - size, - created_at, - vec![uploaded_snapshot_location], - )) -} diff --git a/mithril-aggregator/src/runtime/error.rs b/mithril-aggregator/src/runtime/error.rs new file mode 100644 index 00000000000..12273eb6d51 --- /dev/null +++ b/mithril-aggregator/src/runtime/error.rs @@ -0,0 +1,40 @@ +use crate::snapshot_stores::SnapshotStoreError; +use crate::store::StoreError; +use crate::{BeaconStoreError, ProtocolError, SnapshotError}; + +use mithril_common::digesters::{DigesterError, ImmutableFileListingError}; +use std::io; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum RuntimeError { + #[error("multi signer error")] + MultiSigner(#[from] ProtocolError), + + #[error("beacon store error")] + BeaconStore(#[from] BeaconStoreError), + + #[error("snapshotter error")] + Snapshotter(#[from] SnapshotError), + + #[error("digester error")] + Digester(#[from] DigesterError), + + #[error("snapshot store error")] + SnapshotStore(#[from] SnapshotStoreError), + + #[error("store error")] + StoreError(#[from] StoreError), + + #[error("snapshot uploader error: {0}")] + SnapshotUploader(String), + + #[error("snapshot build error")] + SnapshotBuild(#[from] io::Error), + + #[error("immutable file scanning error")] + ImmutableFileError(#[from] ImmutableFileListingError), + + #[error("general error")] + General(String), +} diff --git a/mithril-aggregator/src/runtime/mod.rs b/mithril-aggregator/src/runtime/mod.rs new file mode 100644 index 00000000000..5bcad8c4ed5 --- /dev/null +++ b/mithril-aggregator/src/runtime/mod.rs @@ -0,0 +1,7 @@ +mod error; +mod runner; +mod state_machine; + +pub use error::RuntimeError; +pub use runner::{AggregatorConfig, AggregatorRunner, AggregatorRunnerTrait}; +pub use state_machine::*; diff --git a/mithril-aggregator/src/runtime/runner.rs b/mithril-aggregator/src/runtime/runner.rs new file mode 100644 index 00000000000..5ce28dd4de5 --- /dev/null +++ b/mithril-aggregator/src/runtime/runner.rs @@ -0,0 +1,406 @@ +use std::path::PathBuf; + +use crate::snapshot_uploaders::SnapshotLocation; +use crate::{DependencyManager, SnapshotError, Snapshotter}; +use async_trait::async_trait; +use chrono::Utc; +use hex::FromHex; +use mithril_common::digesters::{Digester, DigesterResult, ImmutableDigester, ImmutableFile}; +use mithril_common::entities::{Beacon, Certificate, CertificatePending, Snapshot}; + +use slog_scope::{debug, error, info, trace, warn}; +use std::path::Path; +use std::sync::Arc; + +#[cfg(test)] +use mockall::automock; + +use super::RuntimeError; +pub struct AggregatorConfig { + /// Interval between each snapshot, in seconds + pub interval: u32, + + /// Cardano network + pub network: String, + + /// DB directory to snapshot + pub db_directory: PathBuf, + + /// Directory to store snapshot + pub snapshot_directory: PathBuf, + + /// Services dependencies + pub dependencies: Arc, +} + +impl AggregatorConfig { + pub fn new( + interval: u32, + network: &str, + db_directory: &Path, + snapshot_directory: &Path, + dependencies: Arc, + ) -> Self { + Self { + interval, + network: network.to_string(), + db_directory: db_directory.to_path_buf(), + snapshot_directory: snapshot_directory.to_path_buf(), + dependencies, + } + } +} + +#[async_trait] +pub trait AggregatorRunnerTrait: Sync + Send { + /// Return the current beacon if it is newer than the given one. + async fn is_new_beacon(&self, beacon: Option) -> Result, RuntimeError>; + + async fn compute_digest(&self, new_beacon: &Beacon) -> Result; + + async fn update_message_in_multisigner( + &self, + digest_result: DigesterResult, + ) -> Result<(), RuntimeError>; + + async fn create_new_pending_certificate_from_multisigner( + &self, + beacon: Beacon, + ) -> Result; + + async fn save_pending_certificate( + &self, + pending_certificate: CertificatePending, + ) -> Result<(), RuntimeError>; + + async fn drop_pending_certificate(&self) -> Result; + + async fn is_multisig_created(&self) -> Result; + + async fn create_snapshot_archive(&self) -> Result; + + async fn upload_snapshot_archive( + &self, + path: &Path, + ) -> Result, RuntimeError>; + + async fn create_and_save_certificate( + &self, + beacon: &Beacon, + certificate_pending: &CertificatePending, + ) -> Result; + + async fn create_and_save_snapshot( + &self, + certificate: Certificate, + file_path: &Path, + remote_locations: Vec, + ) -> Result; +} + +pub struct AggregatorRunner { + config: AggregatorConfig, +} + +impl AggregatorRunner { + pub fn new(config: AggregatorConfig) -> Self { + Self { config } + } +} + +#[cfg_attr(test, automock)] +#[async_trait] +impl AggregatorRunnerTrait for AggregatorRunner { + /// Is there a new beacon? + /// returns a new beacon if there is one more recent than the given one + async fn is_new_beacon( + &self, + maybe_beacon: Option, + ) -> Result, RuntimeError> { + info!("checking if there is a new beacon"); + debug!( + "checking immutables in directory {}", + self.config.db_directory.to_string_lossy() + ); + let db_path: &Path = self.config.db_directory.as_path(); + let immutable_file_number = ImmutableFile::list_completed_in_dir(db_path) + .map_err(RuntimeError::ImmutableFileError)? + .into_iter() + .last() + .ok_or_else(|| RuntimeError::General("no immutable file was returned".to_string()))? + .number; + let current_beacon = Beacon { + network: self.config.network.clone(), + epoch: 0, + immutable_file_number, + }; + + match maybe_beacon { + Some(beacon) if current_beacon > beacon => Ok(Some(current_beacon)), + None => Ok(Some(current_beacon)), + _ => Ok(None), + } + } + + /// Is a multisignature ready? + /// Can we create a multisignature. + async fn is_multisig_created(&self) -> Result { + info!("check if we can create a multisignature"); + let has_multisig = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? + .write() + .await + .create_multi_signature() + .await? + .is_some(); + + if has_multisig { + debug!("new MULTISIG created"); + } else { + info!("no multisig created"); + } + Ok(has_multisig) + } + + async fn compute_digest(&self, new_beacon: &Beacon) -> Result { + info!("running runner::compute_digester"); + let digester = + ImmutableDigester::new(self.config.db_directory.clone(), slog_scope::logger()); + debug!("computing digest"; "db_directory" => self.config.db_directory.display()); + + // digest is done in a separate thread because it is blocking the whole task + debug!("launching digester thread"); + let digest_result = tokio::task::spawn_blocking(move || digester.compute_digest()) + .await + .map_err(|e| RuntimeError::General(e.to_string()))??; + debug!( + "last immutable file number: {}", + digest_result.last_immutable_file_number + ); + + if digest_result.last_immutable_file_number != new_beacon.immutable_file_number { + error!("digest beacon is different than the given beacon"); + Err(RuntimeError::General( + format!("The digest has been computed for a different immutable ({}) file than the one given in the beacon ({}).", digest_result.last_immutable_file_number, new_beacon.immutable_file_number) + )) + } else { + trace!("digest last immutable file number and new beacon file number are consistent"); + Ok(digest_result) + } + } + + async fn create_new_pending_certificate_from_multisigner( + &self, + beacon: Beacon, + ) -> Result { + info!("running runner::create_pending_certificate"); + let multi_signer = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? + .read() + .await; + + debug!("creating certificate pending using multisigner"); + warn!("pending certificate's previous hash is fake"); + let pending_certificate = CertificatePending::new( + beacon, + multi_signer + .get_protocol_parameters() + .await + .ok_or_else(|| RuntimeError::General("no protocol parameters".to_string()))? + .into(), + "123".to_string(), + multi_signer.get_signers().await?, + ); + + Ok(pending_certificate) + } + + async fn save_pending_certificate( + &self, + pending_certificate: CertificatePending, + ) -> Result<(), RuntimeError> { + info!("saving pending certificate"); + + self.config + .dependencies + .certificate_pending_store + .as_ref() + .ok_or_else(|| { + RuntimeError::General("no certificate pending store registered".to_string()) + })? + .write() + .await + .save(pending_certificate) + .await + .map_err(|e| e.into()) + } + + async fn update_message_in_multisigner( + &self, + digest_result: DigesterResult, + ) -> Result<(), RuntimeError> { + info!("update message in multisigner"); + + self.config + .dependencies + .multi_signer + .as_ref() + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? + .write() + .await + .update_current_message(digest_result.digest.into_bytes()) + .await + .map_err(RuntimeError::MultiSigner) + } + + async fn drop_pending_certificate(&self) -> Result { + info!("drop pending certificate"); + + let certificate_pending = self + .config + .dependencies + .certificate_pending_store + .as_ref() + .ok_or_else(|| { + RuntimeError::General("no certificate pending store registered".to_string()) + })? + .write() + .await + .remove() + .await? + .ok_or_else(|| { + RuntimeError::General("no certificate pending for the given beacon".to_string()) + })?; + + Ok(certificate_pending) + } + + async fn create_snapshot_archive(&self) -> Result { + info!("create snapshot archive"); + + let snapshotter = Snapshotter::new( + self.config.db_directory.clone(), + self.config.snapshot_directory.clone(), + ); + let message = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? + .read() + .await + .get_current_message() + .await + .ok_or_else(|| RuntimeError::General("no message found".to_string()))?; + let snapshot_name = format!( + "{}.{}.tar.gz", + self.config.network, + std::str::from_utf8(&message).map_err(|e| RuntimeError::General(e.to_string()))? + ); + // spawn a separate thread to prevent blocking + let snapshot_path = + tokio::task::spawn_blocking(move || -> Result { + snapshotter.snapshot(&snapshot_name) + }) + .await + .map_err(|e| RuntimeError::General(e.to_string()))??; + + debug!("snapshot created at '{}'", snapshot_path.to_string_lossy()); + + Ok(snapshot_path) + } + + async fn create_and_save_certificate( + &self, + beacon: &Beacon, + certificate_pending: &CertificatePending, + ) -> Result { + info!("create and save certificate"); + let multisigner = self + .config + .dependencies + .multi_signer + .as_ref() + .ok_or_else(|| RuntimeError::General("no multisigner registered".to_string()))? + .read() + .await; + let certificate = multisigner + .create_certificate(beacon.clone(), certificate_pending.previous_hash.clone()) + .await? + .ok_or_else(|| RuntimeError::General("no certificate generated".to_string()))?; + let _ = self + .config + .dependencies + .certificate_store + .as_ref() + .ok_or_else(|| RuntimeError::General("no certificate store registered".to_string()))? + .write() + .await + .save(certificate.clone()) + .await?; + + Ok(certificate) + } + + async fn upload_snapshot_archive( + &self, + path: &Path, + ) -> Result, RuntimeError> { + info!("upload snapshot archive"); + let location = self + .config + .dependencies + .snapshot_uploader + .as_ref() + .ok_or_else(|| { + RuntimeError::SnapshotUploader("no snapshot uploader registered".to_string()) + })? + .read() + .await + .upload_snapshot(path) + .await + .map_err(RuntimeError::SnapshotUploader)?; + + Ok(vec![location]) + } + + async fn create_and_save_snapshot( + &self, + certificate: Certificate, + file_path: &Path, + remote_locations: Vec, + ) -> Result { + let digest_hex = Vec::from_hex(certificate.digest).unwrap(); + let snapshot = Snapshot::new( + String::from_utf8(digest_hex).map_err(|e| RuntimeError::General(e.to_string()))?, + certificate.hash, + std::fs::metadata(file_path) + .map_err(|e| RuntimeError::General(e.to_string()))? + .len(), + format!("{:?}", Utc::now()), + remote_locations, + ); + + let _ = self + .config + .dependencies + .snapshot_store + .as_ref() + .ok_or_else(|| RuntimeError::General("no snapshot store registered".to_string()))? + .write() + .await + .add_snapshot(snapshot.clone()) + .await?; + + Ok(snapshot) + } +} diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs new file mode 100644 index 00000000000..782703daedb --- /dev/null +++ b/mithril-aggregator/src/runtime/state_machine.rs @@ -0,0 +1,388 @@ +use super::{AggregatorRunnerTrait, RuntimeError}; + +use mithril_common::entities::{Beacon, CertificatePending}; +use slog_scope::{debug, error, info, trace}; +use std::fmt::Display; +use std::sync::Arc; +use tokio::time::{sleep, Duration}; + +#[derive(Clone, Debug, PartialEq)] +pub struct IdleState { + current_beacon: Option, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct SigningState { + current_beacon: Beacon, + certificate_pending: CertificatePending, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum AggregatorState { + Idle(IdleState), + Signing(SigningState), +} + +impl Display for AggregatorState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + AggregatorState::Idle(_) => write!(f, "idle"), + AggregatorState::Signing(_) => write!(f, "signing"), + } + } +} + +/// AggregatorRuntime +pub struct AggregatorRuntime { + /// the internal state of the automate + state: AggregatorState, + + /// time between each state machine execution + state_sleep: Duration, + + /// specific runner for this state machine + runner: Arc, +} + +impl AggregatorRuntime { + pub fn get_state(&self) -> String { + self.state.to_string() + } + + pub async fn new( + state_sleep: Duration, + init_state: Option, + runner: Arc, + ) -> Result { + info!("initializing runtime"); + + let state = if init_state.is_none() { + trace!("idle state, no current beacon"); + AggregatorState::Idle(IdleState { + current_beacon: None, + }) + } else { + trace!("got initial state from caller"); + init_state.unwrap() + }; + + Ok::(Self { + state_sleep, + state, + runner, + }) + } + + /// run + /// + /// launches an infinite loop ticking the state machine + pub async fn run(&mut self) { + info!("Starting runtime"); + debug!("current state: {}", self.state); + + loop { + if let Err(e) = self.cycle().await { + error!("{:?}", e) + } + + info!("Sleeping for {} seconds", self.state_sleep.as_secs()); + sleep(self.state_sleep).await; + } + } + + /// cycle + /// + /// one tick of the state machine + pub async fn cycle(&mut self) -> Result<(), RuntimeError> { + info!("================================================================================"); + info!("new cycle"); + match self.state.clone() { + AggregatorState::Idle(state) => { + info!("state IDLE"); + + if let Some(beacon) = self + .runner + .is_new_beacon(state.current_beacon.clone()) + .await? + { + trace!( + "new beacon found, immutable file number = {}", + beacon.immutable_file_number + ); + let new_state = self.transition_from_idle_to_signing(beacon).await?; + self.state = AggregatorState::Signing(new_state); + } else { + trace!("nothing to do in IDLE state") + } + } + AggregatorState::Signing(state) => { + info!("state SIGNING"); + + if let Some(beacon) = self + .runner + .is_new_beacon(Some(state.current_beacon.clone())) + .await? + { + trace!( + "new beacon found, immutable file number = {}", + beacon.immutable_file_number + ); + let new_state = self + .transition_from_signing_to_idle_new_beacon(state) + .await?; + self.state = AggregatorState::Idle(new_state); + } else if self.runner.is_multisig_created().await? { + trace!("new multisignature found"); + let new_state = self + .transition_from_signing_to_idle_multisignature(state) + .await?; + self.state = AggregatorState::Idle(new_state); + } else { + trace!("nothing to do in SIGNING state") + } + } + } + Ok(()) + } + + /// transition + /// + /// from SIGNING to IDLE because NEW MULTISIGNATURE + async fn transition_from_signing_to_idle_multisignature( + &self, + state: SigningState, + ) -> Result { + let certificate_pending = self.runner.drop_pending_certificate().await?; + let path = self.runner.create_snapshot_archive().await?; + let locations = self.runner.upload_snapshot_archive(&path).await?; + let certificate = self + .runner + .create_and_save_certificate(&state.current_beacon, &certificate_pending) + .await?; + let _ = self + .runner + .create_and_save_snapshot(certificate, &path, locations) + .await?; + + Ok(IdleState { + current_beacon: Some(state.current_beacon), + }) + } + + /// transition + /// + /// from SIGNING to IDLE because NEW BEACON + async fn transition_from_signing_to_idle_new_beacon( + &self, + state: SigningState, + ) -> Result { + self.runner.drop_pending_certificate().await?; + + Ok(IdleState { + current_beacon: Some(state.current_beacon), + }) + } + + /// transition + /// + /// from IDLE state to SIGNING because NEW BEACON + async fn transition_from_idle_to_signing( + &self, + new_beacon: Beacon, + ) -> Result { + debug!("launching transition from IDLE to SIGNING state"); + let digester_result = self.runner.compute_digest(&new_beacon).await?; + let _ = self + .runner + .update_message_in_multisigner(digester_result) + .await?; + let certificate = self + .runner + .create_new_pending_certificate_from_multisigner(new_beacon.clone()) + .await?; + let _ = self + .runner + .save_pending_certificate(certificate.clone()) + .await?; + let state = SigningState { + current_beacon: new_beacon, + certificate_pending: certificate, + }; + + Ok(state) + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::super::runner::MockAggregatorRunner; + use super::*; + use mithril_common::digesters::DigesterResult; + use mithril_common::fake_data; + use mockall::predicate; + + async fn init_runtime( + init_state: Option, + runner: MockAggregatorRunner, + ) -> AggregatorRuntime { + AggregatorRuntime::new(Duration::from_millis(100), init_state, Arc::new(runner)) + .await + .unwrap() + } + + #[tokio::test] + pub async fn idle_check_no_new_beacon_with_current_beacon() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(None)); + let mut runtime = init_runtime( + Some(AggregatorState::Idle(IdleState { + current_beacon: Some(fake_data::beacon()), + })), + runner, + ) + .await; + let _ = runtime.cycle().await.unwrap(); + + assert_eq!("idle".to_string(), runtime.get_state()); + } + + #[tokio::test] + pub async fn idle_check_no_new_beacon_with_no_current_beacon() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(Some(fake_data::beacon()))); + runner.expect_compute_digest().times(1).returning(|_| { + Ok(DigesterResult { + digest: "whatever".to_string(), + last_immutable_file_number: 123, + }) + }); + runner + .expect_update_message_in_multisigner() + .with(predicate::eq(DigesterResult { + digest: "whatever".to_string(), + last_immutable_file_number: 123, + })) + .times(1) + .returning(|_| Ok(())); + runner + .expect_create_new_pending_certificate_from_multisigner() + .with(predicate::eq(fake_data::beacon())) + .times(1) + .returning(|_| Ok(fake_data::certificate_pending())); + runner + .expect_save_pending_certificate() + .times(1) + .returning(|_| Ok(())); + + let mut runtime = init_runtime( + Some(AggregatorState::Idle(IdleState { + current_beacon: None, + })), + runner, + ) + .await; + let _ = runtime.cycle().await.unwrap(); + + assert_eq!("signing".to_string(), runtime.get_state()); + } + + #[tokio::test] + async fn signing_changing_beacon_to_idle() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(Some(fake_data::beacon()))); + runner + .expect_drop_pending_certificate() + .times(1) + .returning(|| Ok(fake_data::certificate_pending())); + + let state = SigningState { + // this current beacon must be outdated so the state machine will + // return to idle state + current_beacon: { + let mut beacon = fake_data::beacon(); + beacon.immutable_file_number -= 1; + + beacon + }, + certificate_pending: fake_data::certificate_pending(), + }; + let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; + let _ = runtime.cycle().await.unwrap(); + + assert_eq!("idle".to_string(), runtime.get_state()); + } + + #[tokio::test] + async fn signing_same_beacon_to_signing() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(None)); + runner + .expect_is_multisig_created() + .times(1) + .returning(|| Ok(false)); + let state = SigningState { + current_beacon: fake_data::beacon(), + certificate_pending: fake_data::certificate_pending(), + }; + let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; + let _ = runtime.cycle().await.unwrap(); + + assert_eq!("signing".to_string(), runtime.get_state()); + } + + #[tokio::test] + async fn signing_multisig_ready_to_idle() { + let mut runner = MockAggregatorRunner::new(); + runner + .expect_is_new_beacon() + .times(1) + .returning(|_| Ok(None)); + runner + .expect_is_multisig_created() + .times(1) + .returning(|| Ok(true)); + runner + .expect_drop_pending_certificate() + .times(1) + .returning(|| Ok(fake_data::certificate_pending())); + runner + .expect_create_snapshot_archive() + .times(1) + .returning(|| Ok(PathBuf::new().join("/tmp/archive.zip"))); + runner + .expect_upload_snapshot_archive() + .times(1) + .returning(|_path| Ok(vec!["locA".to_string(), "locB".to_string()])); + runner + .expect_create_and_save_certificate() + .times(1) + .returning(|_, _| Ok(fake_data::certificate("whatever".to_string()))); + runner + .expect_create_and_save_snapshot() + .times(1) + .returning(|_, _, _| Ok(fake_data::snapshots(1)[0].clone())); + + let state = SigningState { + current_beacon: fake_data::beacon(), + certificate_pending: fake_data::certificate_pending(), + }; + let mut runtime = init_runtime(Some(AggregatorState::Signing(state)), runner).await; + let _ = runtime.cycle().await.unwrap(); + + assert_eq!("idle".to_string(), runtime.get_state()); + } +} diff --git a/mithril-aggregator/src/store/pending_certificate_store.rs b/mithril-aggregator/src/store/pending_certificate_store.rs index 4487d490407..e1b8da56778 100644 --- a/mithril-aggregator/src/store/pending_certificate_store.rs +++ b/mithril-aggregator/src/store/pending_certificate_store.rs @@ -1,9 +1,11 @@ use super::StoreError; -use mithril_common::entities::{Beacon, CertificatePending}; +use mithril_common::entities::CertificatePending; use mithril_common::store::adapter::StoreAdapter; -type Adapter = Box>; +type Adapter = Box>; + +const KEY: &str = "certificate_pending"; pub struct CertificatePendingStore { adapter: Adapter, @@ -14,33 +16,20 @@ impl CertificatePendingStore { Self { adapter } } - pub async fn get_from_beacon( - &self, - beacon: &Beacon, - ) -> Result, StoreError> { - Ok(self.adapter.get_record(beacon).await?) + pub async fn get(&self) -> Result, StoreError> { + Ok(self.adapter.get_record(&KEY.to_string()).await?) } pub async fn save(&mut self, certificate: CertificatePending) -> Result<(), StoreError> { Ok(self .adapter - .store_record(&certificate.beacon, &certificate) + .store_record(&KEY.to_string(), &certificate) .await?) } - pub async fn get_list(&self, last_n: usize) -> Result, StoreError> { - let vars = self.adapter.get_last_n_records(last_n).await?; - let result = vars.into_iter().map(|(_, y)| y).collect(); - - Ok(result) - } - - pub async fn remove( - &mut self, - beacon: &Beacon, - ) -> Result, StoreError> { + pub async fn remove(&mut self) -> Result, StoreError> { self.adapter - .remove(beacon) + .remove(&KEY.to_string()) .await .map_err(StoreError::AdapterError) } @@ -50,22 +39,23 @@ impl CertificatePendingStore { mod test { use super::*; + use mithril_common::entities::Beacon; use mithril_common::fake_data; use mithril_common::store::adapter::DumbStoreAdapter; - async fn get_certificate_pending_store(size: u64) -> CertificatePendingStore { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); + async fn get_certificate_pending_store(is_populated: bool) -> CertificatePendingStore { + let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - for ix in 0..size { - let beacon = Beacon::new("testnet".to_string(), ix / 3, ix); + if is_populated { + let beacon = Beacon::new("testnet".to_string(), 0, 0); let certificate_pending = CertificatePending::new( beacon.clone(), fake_data::protocol_parameters(), - ix.to_string(), + "previous_hash".to_string(), fake_data::signers(5), ); adapter - .store_record(&beacon, &certificate_pending) + .store_record(&KEY.to_string(), &certificate_pending) .await .unwrap(); } @@ -75,71 +65,60 @@ mod test { } #[tokio::test] - async fn list_is_empty() { - let store = get_certificate_pending_store(0).await; - - assert_eq!(0, store.get_list(100).await.unwrap().len()); - } - - #[tokio::test] - async fn list_has_some_members() { - let store = get_certificate_pending_store(1).await; + async fn get_certificate_pending_with_existing_certificate() { + let store = get_certificate_pending_store(true).await; + let result = store.get().await.unwrap(); - assert_eq!(1, store.get_list(100).await.unwrap().len()); - } - - #[tokio::test] - async fn get_certificate_pending_with_good_beacon() { - let beacon = Beacon::new("testnet".to_string(), 0, 0); - let store = get_certificate_pending_store(1).await; - let result = store.get_from_beacon(&beacon).await.unwrap(); assert!(result.is_some()); } #[tokio::test] - async fn get_certificate_pending_with_wrong_beacon() { - let beacon = Beacon::new("testnet".to_string(), 0, 1); - let store = get_certificate_pending_store(1).await; - let result = store.get_from_beacon(&beacon).await.unwrap(); + async fn get_certificate_pending_with_no_existing_certificate() { + let store = get_certificate_pending_store(false).await; + let result = store.get().await.unwrap(); + assert!(result.is_none()); } #[tokio::test] async fn save_certificate_pending_once() { - let mut store = get_certificate_pending_store(1).await; + let mut store = get_certificate_pending_store(false).await; let beacon = Beacon::new("testnet".to_string(), 0, 1); let certificate_pending = CertificatePending::new( beacon, fake_data::protocol_parameters(), - "0".to_string(), + "previous_hash".to_string(), fake_data::signers(1), ); assert!(store.save(certificate_pending).await.is_ok()); + assert!(store.get().await.unwrap().is_some()); } #[tokio::test] async fn update_certificate_pending() { - let mut store = get_certificate_pending_store(1).await; - let beacon = Beacon::new("testnet".to_string(), 0, 0); - let mut certificate_pending = store.get_from_beacon(&beacon).await.unwrap().unwrap(); + let mut store = get_certificate_pending_store(true).await; + let mut certificate_pending = store.get().await.unwrap().unwrap(); - assert_eq!("0".to_string(), certificate_pending.previous_hash); - certificate_pending.previous_hash = "one".to_string(); + assert_eq!( + "previous_hash".to_string(), + certificate_pending.previous_hash + ); + certificate_pending.previous_hash = "something".to_string(); assert!(store.save(certificate_pending).await.is_ok()); - let certificate_pending = store.get_from_beacon(&beacon).await.unwrap().unwrap(); + let certificate_pending = store.get().await.unwrap().unwrap(); - assert_eq!("one".to_string(), certificate_pending.previous_hash); + assert_eq!("something".to_string(), certificate_pending.previous_hash); } #[tokio::test] async fn remove_certificate_pending() { - let mut store = get_certificate_pending_store(1).await; + let mut store = get_certificate_pending_store(true).await; let beacon = Beacon::new("testnet".to_string(), 0, 0); - let certificate_pending = store.remove(&beacon).await.unwrap().unwrap(); + let certificate_pending = store.remove().await.unwrap().unwrap(); assert_eq!(beacon, certificate_pending.beacon); - assert!(store.get_from_beacon(&beacon).await.unwrap().is_none()); + assert!(store.get().await.unwrap().is_none()); } } diff --git a/mithril-common/src/digesters/digester.rs b/mithril-common/src/digesters/digester.rs index 08955ebab52..4168cd42c9b 100644 --- a/mithril-common/src/digesters/digester.rs +++ b/mithril-common/src/digesters/digester.rs @@ -3,7 +3,7 @@ use crate::entities::ImmutableFileNumber; use std::io; use thiserror::Error; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct DigesterResult { /// The computed digest pub digest: String, diff --git a/mithril-common/src/entities.rs b/mithril-common/src/entities.rs index 07da17763a3..cf974dd4da6 100644 --- a/mithril-common/src/entities.rs +++ b/mithril-common/src/entities.rs @@ -5,7 +5,7 @@ use sha2::{Digest, Sha256}; pub type ImmutableFileNumber = u64; /// Beacon represents a point in the Cardano chain at which a Mithril certificate should be produced -#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize, Hash, PartialOrd)] pub struct Beacon { /// Cardano network #[serde(rename = "network")] @@ -39,7 +39,6 @@ impl Beacon { hex::encode(hasher.finalize()) } } - /// CertificatePending represents a pending certificate in the process of production #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] pub struct CertificatePending { diff --git a/mithril-common/src/store/adapter/store_adapter.rs b/mithril-common/src/store/adapter/store_adapter.rs index 51ac5be3d14..16811963dea 100644 --- a/mithril-common/src/store/adapter/store_adapter.rs +++ b/mithril-common/src/store/adapter/store_adapter.rs @@ -35,5 +35,8 @@ pub trait StoreAdapter: Sync + Send { how_many: usize, ) -> Result, AdapterError>; + /// remove values from store + /// + /// if the value exists it is returned by the adapter otherwise None is returned async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError>; } diff --git a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs index ad28dd7e97f..79956d87d99 100644 --- a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs +++ b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs @@ -19,6 +19,8 @@ impl Spec { let aggregator_endpoint = self.infrastructure.aggregator().endpoint(); wait_for_pending_certificate(&aggregator_endpoint).await?; + let _ = self.infrastructure.add_immutable(); + let digest = assert_node_producing_snapshot(&aggregator_endpoint).await?; let certificate_hash = assert_signer_is_signing_snapshot(&aggregator_endpoint, &digest).await?; @@ -54,7 +56,7 @@ impl Spec { async fn wait_for_pending_certificate(aggregator_endpoint: &str) -> Result<(), String> { let url = format!("{}/certificate-pending", aggregator_endpoint); - match attempt!(10, Duration::from_millis(100), { + match attempt!(10, Duration::from_millis(1000), { match reqwest::get(url.clone()).await { Ok(response) => match response.status() { StatusCode::OK => { @@ -82,7 +84,7 @@ async fn wait_for_pending_certificate(aggregator_endpoint: &str) -> Result<(), S async fn assert_node_producing_snapshot(aggregator_endpoint: &str) -> Result { let url = format!("{}/snapshots", aggregator_endpoint); - match attempt!(10, Duration::from_millis(1000), { + match attempt!(20, Duration::from_millis(1500), { match reqwest::get(url.clone()).await { Ok(response) => match response.status() { StatusCode::OK => match response.json::>().await.as_deref() { diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs index d010c0d1c0d..8f14ff283fd 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs @@ -25,11 +25,14 @@ impl Aggregator { ("SNAPSHOT_UPLOADER_TYPE", "local"), ( "PENDING_CERTIFICATE_STORE_DIRECTORY", - "./store/pending-certs", + "./store/aggregator/pending-certs", ), - ("CERTIFICATE_STORE_DIRECTORY", "./store/certs"), - ("VERIFICATION_KEY_STORE_DIRECTORY", "./store/certs"), - ("STAKE_STORE_DIRECTORY", "./store/stakes"), + ("CERTIFICATE_STORE_DIRECTORY", "./store/aggregator/certs"), + ( + "VERIFICATION_KEY_STORE_DIRECTORY", + "./store/aggregator/certs", + ), + ("STAKE_STORE_DIRECTORY", "./store/aggregator/stakes"), ]); let args = vec![ "--db-directory", diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs index 65bbee1c645..322cb4c2790 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/client.rs @@ -48,7 +48,7 @@ impl Client { self.command.dump_logs_to_stdout().await?; Err(match status.code() { - Some(c) => format!("mithril-signer exited with code: {}", c), + Some(c) => format!("mithril-client exited with code: {}", c), None => "mithril-client was terminated with a signal".to_string(), }) } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs index 8ffcf182549..d6b8030249f 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf}; pub struct MithrilInfrastructure { work_dir: PathBuf, bin_dir: PathBuf, + db_dir: PathBuf, aggregator: Aggregator, signer: Signer, } @@ -25,6 +26,7 @@ impl MithrilInfrastructure { Ok(Self { work_dir: work_dir.to_path_buf(), bin_dir: bin_dir.to_path_buf(), + db_dir: db_dir.to_path_buf(), aggregator, signer, }) @@ -49,4 +51,22 @@ impl MithrilInfrastructure { pub fn build_client(&self) -> Result { Client::new(self.aggregator.endpoint(), &self.work_dir, &self.bin_dir) } + + pub fn add_immutable(&self) -> Result<(), Box> { + let db_path = self.db_dir.join("immutable"); + let glob_expr = format!("{}/*.chunk", db_path.to_string_lossy()); + + let mut filelist = glob::glob(&glob_expr)? + .map(|f| { + str::parse::(f.unwrap().file_stem().unwrap().to_str().unwrap()).unwrap() + }) + .collect::>(); + filelist.sort(); + let new_number = filelist.pop().unwrap() + 1; + + std::fs::File::create(db_path.join(format!("{:05}.chunk", new_number)))?; + std::fs::File::create(db_path.join(format!("{:05}.primary", new_number)))?; + std::fs::File::create(db_path.join(format!("{:05}.secondary", new_number)))?; + Ok(()) + } } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs index 0234e1805a4..7f687010c02 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs @@ -22,7 +22,7 @@ impl Signer { ("RUN_INTERVAL", "2000"), ("AGGREGATOR_ENDPOINT", &aggregator_endpoint), ("DB_DIRECTORY", db_directory.to_str().unwrap()), - ("STAKE_STORE_DIRECTORY", "./store/stakes") + ("STAKE_STORE_DIRECTORY", "./store/signer/stakes"), ]); let args = vec!["-vvv"];