diff --git a/Cargo.lock b/Cargo.lock index 3bfdfa77e21..40b6287013e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3586,7 +3586,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.6.1" +version = "0.6.2" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index c44d6386400..48de3f7bc84 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.6.1" +version = "0.6.2" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs index 293e3e8a57d..49a05488082 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs @@ -5,10 +5,7 @@ use slog::{debug, warn, Logger}; use std::sync::Arc; use thiserror::Error; -use crate::{ - snapshot_uploaders::SnapshotLocation, snapshotter::OngoingSnapshot, SnapshotUploader, - Snapshotter, -}; +use crate::{file_uploaders::FileUri, snapshotter::OngoingSnapshot, FileUploader, Snapshotter}; use super::ArtifactBuilder; use mithril_common::logging::LoggerExtensions; @@ -33,7 +30,7 @@ pub struct CardanoImmutableFilesFullArtifactBuilder { cardano_network: CardanoNetwork, cardano_node_version: Version, snapshotter: Arc, - snapshot_uploader: Arc, + snapshot_uploader: Arc, compression_algorithm: CompressionAlgorithm, logger: Logger, } @@ -44,7 +41,7 @@ impl CardanoImmutableFilesFullArtifactBuilder { cardano_network: CardanoNetwork, cardano_node_version: &Version, snapshotter: Arc, - snapshot_uploader: Arc, + snapshot_uploader: Arc, compression_algorithm: CompressionAlgorithm, logger: Logger, ) -> Self { @@ -89,11 +86,11 @@ impl CardanoImmutableFilesFullArtifactBuilder { async fn upload_snapshot_archive( &self, ongoing_snapshot: &OngoingSnapshot, - ) -> StdResult> { + ) -> StdResult> { debug!(self.logger, ">> upload_snapshot_archive"); let location = self .snapshot_uploader - .upload_snapshot(ongoing_snapshot.get_file_path()) + .upload(ongoing_snapshot.get_file_path()) .await; if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await { @@ -158,7 +155,12 @@ impl ArtifactBuilder for CardanoImmutableFilesFullArt })?; let snapshot = self - .create_snapshot(beacon, &ongoing_snapshot, snapshot_digest, locations) + .create_snapshot( + beacon, + &ongoing_snapshot, + snapshot_digest, + locations.into_iter().map(Into::into).collect(), + ) .await?; Ok(snapshot) @@ -174,8 +176,7 @@ mod tests { use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data}; use crate::{ - snapshot_uploaders::MockSnapshotUploader, test_tools::TestLogger, DumbSnapshotUploader, - DumbSnapshotter, + file_uploaders::MockFileUploader, test_tools::TestLogger, DumbSnapshotter, DumbUploader, }; use super::*; @@ -190,7 +191,7 @@ mod tests { .unwrap(); let dumb_snapshotter = Arc::new(DumbSnapshotter::new()); - let dumb_snapshot_uploader = Arc::new(DumbSnapshotUploader::new()); + let dumb_snapshot_uploader = Arc::new(DumbUploader::new()); let cardano_immutable_files_full_artifact_builder = CardanoImmutableFilesFullArtifactBuilder::new( @@ -213,6 +214,7 @@ mod tests { let remote_locations = vec![dumb_snapshot_uploader .get_last_upload() .unwrap() + .map(Into::into) .expect("A snapshot should have been 'uploaded'")]; let artifact_expected = Snapshot::new( snapshot_digest.to_owned(), @@ -237,7 +239,7 @@ mod tests { fake_data::network(), &Version::parse("1.0.0").unwrap(), Arc::new(DumbSnapshotter::new()), - Arc::new(DumbSnapshotUploader::new()), + Arc::new(DumbUploader::new()), CompressionAlgorithm::default(), TestLogger::stdout(), ); @@ -264,7 +266,7 @@ mod tests { network, &Version::parse("1.0.0").unwrap(), Arc::new(DumbSnapshotter::new()), - Arc::new(DumbSnapshotUploader::new()), + Arc::new(DumbUploader::new()), CompressionAlgorithm::Gzip, TestLogger::stdout(), ); @@ -293,7 +295,7 @@ mod tests { fake_data::network(), &Version::parse("1.0.0").unwrap(), Arc::new(DumbSnapshotter::new()), - Arc::new(DumbSnapshotUploader::new()), + Arc::new(DumbUploader::new()), algorithm, TestLogger::stdout(), ); @@ -325,9 +327,9 @@ mod tests { let file = NamedTempFile::new().unwrap(); let file_path = file.path(); let snapshot = OngoingSnapshot::new(file_path.to_path_buf(), 7331); - let mut snapshot_uploader = MockSnapshotUploader::new(); + let mut snapshot_uploader = MockFileUploader::new(); snapshot_uploader - .expect_upload_snapshot() + .expect_upload() .return_once(|_| Err(anyhow!("an error"))) .once(); diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index c09d6fb19ea..bf987fc4674 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -64,6 +64,7 @@ use crate::{ }, entities::AggregatorEpochSettings, event_store::{EventMessage, EventStore, TransmitterService}, + file_uploaders::GcpUploader, http_server::routes::router::{self, RouterConfig, RouterState}, services::{ AggregatorSignableSeedBuilder, AggregatorUpkeepService, BufferedCertifierService, @@ -74,12 +75,12 @@ use crate::{ UpkeepService, UsageReporter, }, store::CertificatePendingStorer, - tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter}, + tools::{CExplorerSignerRetriever, GenesisToolsDependency, SignersImporter}, AggregatorConfig, AggregatorRunner, AggregatorRuntime, CompressedArchiveSnapshotter, - Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter, EpochSettingsStorer, - LocalSnapshotUploader, MetricsService, MithrilSignerRegisterer, MultiSigner, MultiSignerImpl, - RemoteSnapshotUploader, SingleSignatureAuthenticator, SnapshotUploader, SnapshotUploaderType, - Snapshotter, SnapshotterCompressionAlgorithm, VerificationKeyStorer, + Configuration, DependencyContainer, DumbSnapshotter, DumbUploader, EpochSettingsStorer, + FileUploader, LocalUploader, MetricsService, MithrilSignerRegisterer, MultiSigner, + MultiSignerImpl, SingleSignatureAuthenticator, SnapshotUploaderType, Snapshotter, + SnapshotterCompressionAlgorithm, VerificationKeyStorer, }; const SQLITE_FILE: &str = "aggregator.sqlite3"; @@ -118,7 +119,7 @@ pub struct DependenciesBuilder { pub stake_store: Option>, /// Snapshot uploader service. - pub snapshot_uploader: Option>, + pub snapshot_uploader: Option>, /// Multisigner service. pub multi_signer: Option>, @@ -446,7 +447,7 @@ impl DependenciesBuilder { Ok(self.stake_store.as_ref().cloned().unwrap()) } - async fn build_snapshot_uploader(&mut self) -> Result> { + async fn build_snapshot_uploader(&mut self) -> Result> { let logger = self.root_logger(); if self.configuration.environment == ExecutionEnvironment::Production { match self.configuration.snapshot_uploader_type { @@ -461,26 +462,25 @@ impl DependenciesBuilder { ) })?; - Ok(Arc::new(RemoteSnapshotUploader::new( - Box::new(GcpFileUploader::new(bucket.clone(), logger.clone())), + Ok(Arc::new(GcpUploader::new( bucket, self.configuration.snapshot_use_cdn_domain, - logger, + logger.clone(), ))) } - SnapshotUploaderType::Local => Ok(Arc::new(LocalSnapshotUploader::new( + SnapshotUploaderType::Local => Ok(Arc::new(LocalUploader::new( self.configuration.get_server_url(), &self.configuration.snapshot_directory, logger, ))), } } else { - Ok(Arc::new(DumbSnapshotUploader::new())) + Ok(Arc::new(DumbUploader::new())) } } - /// Get a [SnapshotUploader] - pub async fn get_snapshot_uploader(&mut self) -> Result> { + /// Get a [FileUploader] + pub async fn get_snapshot_uploader(&mut self) -> Result> { if self.snapshot_uploader.is_none() { self.snapshot_uploader = Some(self.build_snapshot_uploader().await?); } diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs index e331d654c21..50de0d77a0f 100644 --- a/mithril-aggregator/src/dependency_injection/containers.rs +++ b/mithril-aggregator/src/dependency_injection/containers.rs @@ -32,13 +32,13 @@ use crate::{ }, entities::AggregatorEpochSettings, event_store::{EventMessage, TransmitterService}, + file_uploaders::FileUploader, multi_signer::MultiSigner, services::{ CertifierService, EpochService, MessageService, ProverService, SignedEntityService, StakeDistributionService, TransactionStore, UpkeepService, }, signer_registerer::SignerRecorder, - snapshot_uploaders::SnapshotUploader, store::CertificatePendingStorer, EpochSettingsStorer, MetricsService, SignerRegisterer, SignerRegistrationRoundOpener, SingleSignatureAuthenticator, Snapshotter, VerificationKeyStorer, @@ -71,7 +71,7 @@ pub struct DependencyContainer { pub stake_store: Arc, /// Snapshot uploader service. - pub snapshot_uploader: Arc, + pub snapshot_uploader: Arc, /// Multisigner service. pub multi_signer: Arc, diff --git a/mithril-aggregator/src/snapshot_uploaders/dumb_snapshot_uploader.rs b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs similarity index 62% rename from mithril-aggregator/src/snapshot_uploaders/dumb_snapshot_uploader.rs rename to mithril-aggregator/src/file_uploaders/dumb_uploader.rs index f077514aed4..0ad7fef61b3 100644 --- a/mithril-aggregator/src/snapshot_uploaders/dumb_snapshot_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs @@ -3,17 +3,17 @@ use async_trait::async_trait; use mithril_common::StdResult; use std::{path::Path, sync::RwLock}; -use super::{SnapshotLocation, SnapshotUploader}; +use crate::file_uploaders::{FileUploader, FileUri}; /// Dummy uploader for test purposes. /// -/// It actually does NOT upload any snapshot but remembers the last snapshot it +/// It actually does NOT upload any file but remembers the last file it /// was asked to upload. This is intended to by used by integration tests. -pub struct DumbSnapshotUploader { - last_uploaded: RwLock>, +pub struct DumbUploader { + last_uploaded: RwLock>, } -impl DumbSnapshotUploader { +impl DumbUploader { /// Create a new instance. pub fn new() -> Self { Self { @@ -22,32 +22,32 @@ impl DumbSnapshotUploader { } /// Return the last upload that was triggered. - pub fn get_last_upload(&self) -> StdResult> { + pub fn get_last_upload(&self) -> StdResult> { let value = self .last_uploaded .read() .map_err(|e| anyhow!("Error while saving filepath location: {e}"))?; - Ok(value.as_ref().map(|v| v.to_string())) + Ok(value.as_ref().map(Clone::clone)) } } -impl Default for DumbSnapshotUploader { +impl Default for DumbUploader { fn default() -> Self { Self::new() } } #[async_trait] -impl SnapshotUploader for DumbSnapshotUploader { - /// Upload a snapshot - async fn upload_snapshot(&self, snapshot_filepath: &Path) -> StdResult { +impl FileUploader for DumbUploader { + /// Upload a file + async fn upload(&self, filepath: &Path) -> StdResult { let mut value = self .last_uploaded .write() .map_err(|e| anyhow!("Error while saving filepath location: {e}"))?; - let location = snapshot_filepath.to_string_lossy().to_string(); + let location = FileUri(filepath.to_string_lossy().to_string()); *value = Some(location.clone()); Ok(location) @@ -60,18 +60,18 @@ mod tests { #[tokio::test] async fn test_dumb_uploader() { - let uploader = DumbSnapshotUploader::new(); + let uploader = DumbUploader::new(); assert!(uploader .get_last_upload() .expect("uploader should not fail") .is_none()); let res = uploader - .upload_snapshot(Path::new("/tmp/whatever")) + .upload(Path::new("/tmp/whatever")) .await .expect("uploading with a dumb uploader should not fail"); - assert_eq!(res, "/tmp/whatever".to_string()); + assert_eq!(res, FileUri("/tmp/whatever".to_string())); assert_eq!( - Some("/tmp/whatever".to_string()), + Some(FileUri("/tmp/whatever".to_string())), uploader .get_last_upload() .expect("getting dumb uploader last value after a fake download should not fail") diff --git a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs new file mode 100644 index 00000000000..a7f2824b2d1 --- /dev/null +++ b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs @@ -0,0 +1,138 @@ +use anyhow::{anyhow, Context}; +use async_trait::async_trait; +use cloud_storage::{ + bucket::Entity, bucket_access_control::Role, object_access_control::NewObjectAccessControl, + Client, +}; +use slog::{info, Logger}; +use std::{env, path::Path}; +use tokio_util::codec::{BytesCodec, FramedRead}; + +use mithril_common::{logging::LoggerExtensions, StdResult}; + +use crate::{file_uploaders::FileUri, FileUploader}; + +/// GcpUploader represents a Google Cloud Platform file uploader interactor +pub struct GcpUploader { + bucket: String, + use_cdn_domain: bool, + logger: Logger, +} + +impl GcpUploader { + /// GcpUploader factory + pub fn new(bucket: String, use_cdn_domain: bool, logger: Logger) -> Self { + Self { + bucket, + use_cdn_domain, + logger: logger.new_with_component_name::(), + } + } + + fn get_location(&self, filename: &str) -> FileUri { + let mut uri = vec![]; + if !self.use_cdn_domain { + uri.push("storage.googleapis.com"); + } + uri.push(&self.bucket); + uri.push(filename); + + FileUri(format!("https://{}", uri.join("/"))) + } +} + +#[async_trait] +impl FileUploader for GcpUploader { + async fn upload(&self, filepath: &Path) -> StdResult { + if env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_err() { + return Err(anyhow!( + "Missing GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable".to_string() + )); + }; + + let filename = filepath.file_name().unwrap().to_str().unwrap(); + + info!(self.logger, "Uploading {filename}"); + let client = Client::default(); + let file = tokio::fs::File::open(filepath).await.unwrap(); + let stream = FramedRead::new(file, BytesCodec::new()); + client + .object() + .create_streamed( + &self.bucket, + stream, + None, + filename, + "application/octet-stream", + ) + .await + .with_context(|| "remote uploading failure")?; + + info!(self.logger, "Uploaded {filename}"); + + // ensure the uploaded file as public read access + // when a file is uploaded to Google cloud storage its permissions are overwritten so + // we need to put them back + let new_bucket_access_control = NewObjectAccessControl { + entity: Entity::AllUsers, + role: Role::Reader, + }; + + info!( + self.logger, + "Updating acl for {filename}: {new_bucket_access_control:?}" + ); + + client + .object_access_control() + .create(&self.bucket, filename, &new_bucket_access_control) + .await + .with_context(|| "updating acl failure")?; + + info!(self.logger, "Updated acl for {filename}"); + + Ok(self.get_location(filename)) + } +} + +#[cfg(test)] +mod tests { + use crate::test_tools::TestLogger; + + use super::*; + + #[tokio::test] + async fn get_location_not_using_cdn_domain_return_google_api_uri() { + let use_cdn_domain = false; + + let file_uploader = GcpUploader::new( + "cardano-testnet".to_string(), + use_cdn_domain, + TestLogger::stdout(), + ); + let filename = "snapshot.xxx.tar.gz"; + let expected_location = + "https://storage.googleapis.com/cardano-testnet/snapshot.xxx.tar.gz".to_string(); + + let location = file_uploader.get_location(filename); + + assert_eq!(FileUri(expected_location), location); + } + + #[tokio::test] + async fn get_location_using_cdn_domain_return_cdn_in_uri() { + let use_cdn_domain = true; + + let file_uploader = GcpUploader::new( + "cdn.mithril.network".to_string(), + use_cdn_domain, + TestLogger::stdout(), + ); + let filename = "snapshot.xxx.tar.gz"; + let expected_location = "https://cdn.mithril.network/snapshot.xxx.tar.gz".to_string(); + + let location = file_uploader.get_location(filename); + + assert_eq!(FileUri(expected_location), location); + } +} diff --git a/mithril-aggregator/src/file_uploaders/interface.rs b/mithril-aggregator/src/file_uploaders/interface.rs new file mode 100644 index 00000000000..a46334aa5b6 --- /dev/null +++ b/mithril-aggregator/src/file_uploaders/interface.rs @@ -0,0 +1,21 @@ +use async_trait::async_trait; +use mithril_common::StdResult; +use std::path::Path; + +/// FileUri represents a file URI used to identify the file's location +#[derive(Debug, PartialEq, Clone)] +pub struct FileUri(pub String); + +impl From for String { + fn from(file_uri: FileUri) -> Self { + file_uri.0 + } +} + +/// FileUploader represents a file uploader interactor +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait FileUploader: Sync + Send { + /// Upload a file + async fn upload(&self, filepath: &Path) -> StdResult; +} diff --git a/mithril-aggregator/src/snapshot_uploaders/local_snapshot_uploader.rs b/mithril-aggregator/src/file_uploaders/local_uploader.rs similarity index 62% rename from mithril-aggregator/src/snapshot_uploaders/local_snapshot_uploader.rs rename to mithril-aggregator/src/file_uploaders/local_uploader.rs index 461ac0dda4d..b6a2c7933f6 100644 --- a/mithril-aggregator/src/snapshot_uploaders/local_snapshot_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_uploader.rs @@ -6,28 +6,28 @@ use std::path::{Path, PathBuf}; use mithril_common::logging::LoggerExtensions; use mithril_common::StdResult; +use crate::file_uploaders::{FileUploader, FileUri}; use crate::http_server; -use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader}; use crate::tools; -/// LocalSnapshotUploader is a snapshot uploader working using local files -pub struct LocalSnapshotUploader { - /// Snapshot server listening IP - snapshot_server_url: String, +/// LocalUploader is a file uploader working using local files +pub struct LocalUploader { + /// File server listening IP + file_server_url: String, - /// Target folder where to store snapshots archive + /// Target folder where to store files archive target_location: PathBuf, logger: Logger, } -impl LocalSnapshotUploader { - /// LocalSnapshotUploader factory - pub(crate) fn new(snapshot_server_url: String, target_location: &Path, logger: Logger) -> Self { +impl LocalUploader { + /// LocalUploader factory + pub(crate) fn new(file_server_url: String, target_location: &Path, logger: Logger) -> Self { let logger = logger.new_with_component_name::(); - debug!(logger, "New LocalSnapshotUploader created"; "snapshot_server_url" => &snapshot_server_url); + debug!(logger, "New LocalUploader created"; "file_server_url" => &file_server_url); Self { - snapshot_server_url, + file_server_url, target_location: target_location.to_path_buf(), logger, } @@ -35,24 +35,26 @@ impl LocalSnapshotUploader { } #[async_trait] -impl SnapshotUploader for LocalSnapshotUploader { - async fn upload_snapshot(&self, snapshot_filepath: &Path) -> StdResult { - let archive_name = snapshot_filepath.file_name().unwrap().to_str().unwrap(); +impl FileUploader for LocalUploader { + async fn upload(&self, filepath: &Path) -> StdResult { + let archive_name = filepath.file_name().unwrap().to_str().unwrap(); let target_path = &self.target_location.join(archive_name); - tokio::fs::copy(snapshot_filepath, target_path) + tokio::fs::copy(filepath, target_path) .await - .with_context(|| "Snapshot copy failure")?; + .with_context(|| "File copy failure")?; let digest = tools::extract_digest_from_path(Path::new(archive_name)); + let specific_route = "artifact/snapshot"; let location = format!( - "{}{}/artifact/snapshot/{}/download", - self.snapshot_server_url, + "{}{}/{}/{}/download", + self.file_server_url, http_server::SERVER_BASE_PATH, + specific_route, digest.unwrap() ); - debug!(self.logger, "Snapshot 'uploaded' to local storage"; "location" => &location); - Ok(location) + debug!(self.logger, "File 'uploaded' to local storage"; "location" => &location); + Ok(FileUri(location)) } } @@ -63,11 +65,11 @@ mod tests { use std::path::{Path, PathBuf}; use tempfile::tempdir; + use crate::file_uploaders::{FileUploader, FileUri}; use crate::http_server; - use crate::snapshot_uploaders::SnapshotUploader; use crate::test_tools::TestLogger; - use super::LocalSnapshotUploader; + use super::LocalUploader; fn create_fake_archive(dir: &Path, digest: &str) -> PathBuf { let file_path = dir.join(format!("test.{digest}.tar.gz")); @@ -94,14 +96,14 @@ mod tests { http_server::SERVER_BASE_PATH, &digest ); - let uploader = LocalSnapshotUploader::new(url, target_dir.path(), TestLogger::stdout()); + let uploader = LocalUploader::new(url, target_dir.path(), TestLogger::stdout()); let location = uploader - .upload_snapshot(&archive) + .upload(&archive) .await .expect("local upload should not fail"); - assert_eq!(expected_location, location); + assert_eq!(FileUri(expected_location), location); } #[tokio::test] @@ -110,12 +112,12 @@ mod tests { let target_dir = tempdir().unwrap(); let digest = "41e27b9ed5a32531b95b2b7ff3c0757591a06a337efaf19a524a998e348028e7"; let archive = create_fake_archive(source_dir.path(), digest); - let uploader = LocalSnapshotUploader::new( + let uploader = LocalUploader::new( "http://test.com:8080/".to_string(), target_dir.path(), TestLogger::stdout(), ); - uploader.upload_snapshot(&archive).await.unwrap(); + uploader.upload(&archive).await.unwrap(); assert!(target_dir .path() diff --git a/mithril-aggregator/src/file_uploaders/mod.rs b/mithril-aggregator/src/file_uploaders/mod.rs new file mode 100644 index 00000000000..5a6e2a44e1e --- /dev/null +++ b/mithril-aggregator/src/file_uploaders/mod.rs @@ -0,0 +1,13 @@ +mod dumb_uploader; +mod gcp_uploader; +mod interface; +mod local_uploader; + +pub use dumb_uploader::*; +pub use gcp_uploader::GcpUploader; +pub use interface::FileUploader; +pub use interface::FileUri; +pub use local_uploader::LocalUploader; + +#[cfg(test)] +pub use interface::MockFileUploader; diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index da8d6142161..4d7c4c03d07 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -18,6 +18,7 @@ pub mod database; pub mod dependency_injection; pub mod entities; pub mod event_store; +mod file_uploaders; mod http_server; mod message_adapters; pub mod metrics; @@ -25,7 +26,6 @@ mod multi_signer; mod runtime; pub mod services; mod signer_registerer; -mod snapshot_uploaders; mod snapshotter; mod store; mod tools; @@ -38,6 +38,7 @@ pub use crate::configuration::{ pub use crate::multi_signer::{MultiSigner, MultiSignerImpl}; pub use commands::{CommandType, MainOpts}; pub use dependency_injection::DependencyContainer; +pub use file_uploaders::{DumbUploader, FileUploader, LocalUploader}; pub use message_adapters::{FromRegisterSignerAdapter, ToCertificatePendingMessageAdapter}; pub use metrics::*; pub use runtime::{ @@ -47,9 +48,6 @@ pub use signer_registerer::{ MithrilSignerRegisterer, SignerRecorder, SignerRegisterer, SignerRegistrationError, SignerRegistrationRound, SignerRegistrationRoundOpener, }; -pub use snapshot_uploaders::{ - DumbSnapshotUploader, LocalSnapshotUploader, RemoteSnapshotUploader, SnapshotUploader, -}; pub use snapshotter::{ CompressedArchiveSnapshotter, DumbSnapshotter, SnapshotError, Snapshotter, SnapshotterCompressionAlgorithm, diff --git a/mithril-aggregator/src/snapshot_uploaders/mod.rs b/mithril-aggregator/src/snapshot_uploaders/mod.rs deleted file mode 100644 index 2b176a5ab05..00000000000 --- a/mithril-aggregator/src/snapshot_uploaders/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod dumb_snapshot_uploader; -mod local_snapshot_uploader; -mod remote_snapshot_uploader; -mod snapshot_uploader; - -pub use dumb_snapshot_uploader::*; -pub use local_snapshot_uploader::LocalSnapshotUploader; -pub use remote_snapshot_uploader::RemoteSnapshotUploader; -pub use snapshot_uploader::SnapshotLocation; -pub use snapshot_uploader::SnapshotUploader; - -#[cfg(test)] -pub use snapshot_uploader::MockSnapshotUploader; diff --git a/mithril-aggregator/src/snapshot_uploaders/remote_snapshot_uploader.rs b/mithril-aggregator/src/snapshot_uploaders/remote_snapshot_uploader.rs deleted file mode 100644 index be0345bd1d3..00000000000 --- a/mithril-aggregator/src/snapshot_uploaders/remote_snapshot_uploader.rs +++ /dev/null @@ -1,135 +0,0 @@ -use async_trait::async_trait; -use slog::{debug, Logger}; -use std::path::Path; - -use mithril_common::logging::LoggerExtensions; -use mithril_common::StdResult; - -use crate::snapshot_uploaders::{SnapshotLocation, SnapshotUploader}; -use crate::tools::RemoteFileUploader; - -/// GCPSnapshotUploader is a snapshot uploader working using Google Cloud Platform services -pub struct RemoteSnapshotUploader { - bucket: String, - file_uploader: Box, - use_cdn_domain: bool, - logger: Logger, -} - -impl RemoteSnapshotUploader { - /// GCPSnapshotUploader factory - pub fn new( - file_uploader: Box, - bucket: String, - use_cdn_domain: bool, - logger: Logger, - ) -> Self { - let logger = logger.new_with_component_name::(); - debug!(logger, "New GCPSnapshotUploader created"); - Self { - bucket, - file_uploader, - use_cdn_domain, - logger, - } - } -} - -#[async_trait] -impl SnapshotUploader for RemoteSnapshotUploader { - async fn upload_snapshot(&self, snapshot_filepath: &Path) -> StdResult { - let archive_name = snapshot_filepath.file_name().unwrap().to_str().unwrap(); - let location = if self.use_cdn_domain { - format!("https://{}/{}", self.bucket, archive_name) - } else { - format!( - "https://storage.googleapis.com/{}/{}", - self.bucket, archive_name - ) - }; - - debug!(self.logger, "Uploading snapshot to remote storage"; "location" => &location); - self.file_uploader.upload_file(snapshot_filepath).await?; - debug!(self.logger, "Snapshot upload to remote storage completed"; "location" => &location); - - Ok(location) - } -} - -#[cfg(test)] -mod tests { - use anyhow::anyhow; - use std::path::Path; - - use crate::snapshot_uploaders::SnapshotUploader; - use crate::test_tools::TestLogger; - use crate::tools::MockRemoteFileUploader; - - use super::RemoteSnapshotUploader; - - #[tokio::test] - async fn test_upload_snapshot_not_using_cdn_domain_ok() { - let use_cdn_domain = false; - let mut file_uploader = MockRemoteFileUploader::new(); - file_uploader.expect_upload_file().returning(|_| Ok(())); - let snapshot_uploader = RemoteSnapshotUploader::new( - Box::new(file_uploader), - "cardano-testnet".to_string(), - use_cdn_domain, - TestLogger::stdout(), - ); - let snapshot_filepath = Path::new("test/snapshot.xxx.tar.gz"); - let expected_location = - "https://storage.googleapis.com/cardano-testnet/snapshot.xxx.tar.gz".to_string(); - - let location = snapshot_uploader - .upload_snapshot(snapshot_filepath) - .await - .expect("remote upload should not fail"); - - assert_eq!(expected_location, location); - } - - #[tokio::test] - async fn test_upload_snapshot_using_cdn_domain_ok() { - let use_cdn_domain = true; - let mut file_uploader = MockRemoteFileUploader::new(); - file_uploader.expect_upload_file().returning(|_| Ok(())); - let snapshot_uploader = RemoteSnapshotUploader::new( - Box::new(file_uploader), - "cdn.mithril.network".to_string(), - use_cdn_domain, - TestLogger::stdout(), - ); - let snapshot_filepath = Path::new("test/snapshot.xxx.tar.gz"); - let expected_location = "https://cdn.mithril.network/snapshot.xxx.tar.gz".to_string(); - - let location = snapshot_uploader - .upload_snapshot(snapshot_filepath) - .await - .expect("remote upload should not fail"); - - assert_eq!(expected_location, location); - } - - #[tokio::test] - async fn test_upload_snapshot_ko() { - let mut file_uploader = MockRemoteFileUploader::new(); - file_uploader - .expect_upload_file() - .returning(|_| Err(anyhow!("unexpected error"))); - let snapshot_uploader = RemoteSnapshotUploader::new( - Box::new(file_uploader), - "".to_string(), - false, - TestLogger::stdout(), - ); - let snapshot_filepath = Path::new("test/snapshot.xxx.tar.gz"); - - let result = snapshot_uploader - .upload_snapshot(snapshot_filepath) - .await - .expect_err("remote upload should fail"); - assert_eq!("unexpected error".to_string(), result.to_string()); - } -} diff --git a/mithril-aggregator/src/snapshot_uploaders/snapshot_uploader.rs b/mithril-aggregator/src/snapshot_uploaders/snapshot_uploader.rs deleted file mode 100644 index d76b4a746e4..00000000000 --- a/mithril-aggregator/src/snapshot_uploaders/snapshot_uploader.rs +++ /dev/null @@ -1,13 +0,0 @@ -use async_trait::async_trait; -use mithril_common::StdResult; -use std::path::Path; - -pub type SnapshotLocation = String; - -/// SnapshotUploader represents a snapshot uploader interactor -#[cfg_attr(test, mockall::automock)] -#[async_trait] -pub trait SnapshotUploader: Sync + Send { - /// Upload a snapshot - async fn upload_snapshot(&self, snapshot_filepath: &Path) -> StdResult; -} diff --git a/mithril-aggregator/src/tools/mod.rs b/mithril-aggregator/src/tools/mod.rs index 1df67dc565f..bb13972d66e 100644 --- a/mithril-aggregator/src/tools/mod.rs +++ b/mithril-aggregator/src/tools/mod.rs @@ -4,7 +4,6 @@ mod era; mod genesis; #[cfg(test)] pub mod mocks; -mod remote_file_uploader; mod signer_importer; mod single_signature_authenticator; @@ -12,15 +11,11 @@ pub use certificates_hash_migrator::CertificatesHashMigrator; pub use digest_helpers::extract_digest_from_path; pub use era::EraTools; pub use genesis::{GenesisTools, GenesisToolsDependency}; -pub use remote_file_uploader::{GcpFileUploader, RemoteFileUploader}; pub use signer_importer::{ CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever, }; pub use single_signature_authenticator::*; -#[cfg(test)] -pub use remote_file_uploader::MockRemoteFileUploader; - /// Downcast the error to the specified error type and check if the error satisfies the condition. pub(crate) fn downcast_check( error: &mithril_common::StdError, diff --git a/mithril-aggregator/src/tools/remote_file_uploader.rs b/mithril-aggregator/src/tools/remote_file_uploader.rs deleted file mode 100644 index 3d3ce9f7513..00000000000 --- a/mithril-aggregator/src/tools/remote_file_uploader.rs +++ /dev/null @@ -1,90 +0,0 @@ -use anyhow::{anyhow, Context}; -use async_trait::async_trait; -use cloud_storage::{ - bucket::Entity, bucket_access_control::Role, object_access_control::NewObjectAccessControl, - Client, -}; -use slog::{info, Logger}; -use std::{env, path::Path}; -use tokio_util::{codec::BytesCodec, codec::FramedRead}; - -use mithril_common::logging::LoggerExtensions; -use mithril_common::StdResult; - -/// RemoteFileUploader represents a remote file uploader interactor -#[cfg_attr(test, mockall::automock)] -#[async_trait] -pub trait RemoteFileUploader: Sync + Send { - /// Upload a snapshot - async fn upload_file(&self, filepath: &Path) -> StdResult<()>; -} - -/// GcpFileUploader represents a Google Cloud Platform file uploader interactor -pub struct GcpFileUploader { - bucket: String, - logger: Logger, -} - -impl GcpFileUploader { - /// GcpFileUploader factory - pub fn new(bucket: String, logger: Logger) -> Self { - Self { - bucket, - logger: logger.new_with_component_name::(), - } - } -} - -#[async_trait] -impl RemoteFileUploader for GcpFileUploader { - async fn upload_file(&self, filepath: &Path) -> StdResult<()> { - if env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_err() { - return Err(anyhow!( - "Missing GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable".to_string() - )); - }; - - let filename = filepath.file_name().unwrap().to_str().unwrap(); - - info!(self.logger, "Uploading {filename}"); - let client = Client::default(); - let file = tokio::fs::File::open(filepath).await.unwrap(); - let stream = FramedRead::new(file, BytesCodec::new()); - client - .object() - .create_streamed( - &self.bucket, - stream, - None, - filename, - "application/octet-stream", - ) - .await - .with_context(|| "remote uploading failure")?; - - info!(self.logger, "Uploaded {filename}"); - - // ensure the uploaded file as public read access - // when a file is uploaded to Google cloud storage its permissions are overwritten so - // we need to put them back - let new_bucket_access_control = NewObjectAccessControl { - entity: Entity::AllUsers, - role: Role::Reader, - }; - - info!( - self.logger, - "Updating acl for {filename}: {new_bucket_access_control:?}" - ); - - client - .object_access_control() - .create(&self.bucket, filename, &new_bucket_access_control) - .await - .with_context(|| "updating acl failure")?; - - info!(self.logger, "Updated acl for {filename}"); - - Ok(()) - } -} diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index b27f874f11e..4c707632a91 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -7,7 +7,7 @@ use mithril_aggregator::{ database::{record::SignedEntityRecord, repository::OpenMessageRepository}, dependency_injection::DependenciesBuilder, event_store::EventMessage, - AggregatorRuntime, Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter, + AggregatorRuntime, Configuration, DependencyContainer, DumbSnapshotter, DumbUploader, SignerRegistrationError, }; use mithril_common::{ @@ -100,7 +100,7 @@ macro_rules! assert_metrics_eq { pub struct RuntimeTester { pub network: String, - pub snapshot_uploader: Arc, + pub snapshot_uploader: Arc, pub chain_observer: Arc, pub immutable_file_observer: Arc, pub digester: Arc, @@ -130,7 +130,7 @@ impl RuntimeTester { let logger = build_logger(); let global_logger = slog_scope::set_global_logger(logger.clone()); let network = configuration.network.clone(); - let snapshot_uploader = Arc::new(DumbSnapshotUploader::new()); + let snapshot_uploader = Arc::new(DumbUploader::new()); let immutable_file_observer = Arc::new(DumbImmutableFileObserver::new()); immutable_file_observer .shall_return(Some(start_time_point.immutable_file_number))