Refactor: rework snapshot_uploaders module to improve genericity #2165

Merged · 13 commits · Dec 11, 2024
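
This PR replaces the snapshot-specific upload abstraction with a generic one: SnapshotUploader and SnapshotLocation become FileUploader and FileUri, and the concrete uploaders DumbSnapshotUploader, LocalSnapshotUploader and RemoteSnapshotUploader (which wrapped GcpFileUploader) become DumbUploader, LocalUploader and GcpUploader. The trait and newtype definitions themselves are not part of this diff; the sketch below is inferred from how DumbUploader uses them, and the derive list and the Send + Sync bounds are assumptions:

use std::path::Path;

use async_trait::async_trait;
use mithril_common::StdResult;

/// URI returned by an upload; the tuple field is assumed public since the
/// diff constructs FileUri(...) directly and compares values in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileUri(pub String);

/// Generic uploader: upload replaces the former upload_snapshot method of
/// SnapshotUploader.
#[async_trait]
pub trait FileUploader: Send + Sync {
    /// Upload the file at filepath and return its remote location.
    async fn upload(&self, filepath: &Path) -> StdResult<FileUri>;
}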
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.6.1"
version = "0.6.2"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
@@ -5,10 +5,7 @@ use slog::{debug, warn, Logger};
use std::sync::Arc;
use thiserror::Error;

use crate::{
snapshot_uploaders::SnapshotLocation, snapshotter::OngoingSnapshot, SnapshotUploader,
Snapshotter,
};
use crate::{file_uploaders::FileUri, snapshotter::OngoingSnapshot, FileUploader, Snapshotter};

use super::ArtifactBuilder;
use mithril_common::logging::LoggerExtensions;
@@ -33,7 +30,7 @@ pub struct CardanoImmutableFilesFullArtifactBuilder {
cardano_network: CardanoNetwork,
cardano_node_version: Version,
snapshotter: Arc<dyn Snapshotter>,
snapshot_uploader: Arc<dyn SnapshotUploader>,
snapshot_uploader: Arc<dyn FileUploader>,
compression_algorithm: CompressionAlgorithm,
logger: Logger,
}
@@ -44,7 +41,7 @@ impl CardanoImmutableFilesFullArtifactBuilder {
cardano_network: CardanoNetwork,
cardano_node_version: &Version,
snapshotter: Arc<dyn Snapshotter>,
snapshot_uploader: Arc<dyn SnapshotUploader>,
snapshot_uploader: Arc<dyn FileUploader>,
compression_algorithm: CompressionAlgorithm,
logger: Logger,
) -> Self {
@@ -89,11 +86,11 @@ impl CardanoImmutableFilesFullArtifactBuilder {
async fn upload_snapshot_archive(
&self,
ongoing_snapshot: &OngoingSnapshot,
) -> StdResult<Vec<SnapshotLocation>> {
) -> StdResult<Vec<FileUri>> {
debug!(self.logger, ">> upload_snapshot_archive");
let location = self
.snapshot_uploader
.upload_snapshot(ongoing_snapshot.get_file_path())
.upload(ongoing_snapshot.get_file_path())
.await;

if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
@@ -158,7 +155,12 @@ impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for CardanoImmutableFilesFullArtifactBuilder
})?;

let snapshot = self
.create_snapshot(beacon, &ongoing_snapshot, snapshot_digest, locations)
.create_snapshot(
beacon,
&ongoing_snapshot,
snapshot_digest,
locations.into_iter().map(Into::into).collect(),
)
.await?;

Ok(snapshot)
@@ -174,8 +176,7 @@ mod tests {
use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data};

use crate::{
snapshot_uploaders::MockSnapshotUploader, test_tools::TestLogger, DumbSnapshotUploader,
DumbSnapshotter,
file_uploaders::MockFileUploader, test_tools::TestLogger, DumbSnapshotter, DumbUploader,
};

use super::*;
@@ -190,7 +191,7 @@ mod tests {
.unwrap();

let dumb_snapshotter = Arc::new(DumbSnapshotter::new());
let dumb_snapshot_uploader = Arc::new(DumbSnapshotUploader::new());
let dumb_snapshot_uploader = Arc::new(DumbUploader::new());

let cardano_immutable_files_full_artifact_builder =
CardanoImmutableFilesFullArtifactBuilder::new(
@@ -213,6 +214,7 @@ mod tests {
let remote_locations = vec![dumb_snapshot_uploader
.get_last_upload()
.unwrap()
.map(Into::into)
.expect("A snapshot should have been 'uploaded'")];
let artifact_expected = Snapshot::new(
snapshot_digest.to_owned(),
@@ -237,7 +239,7 @@ mod tests {
fake_data::network(),
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
Arc::new(DumbUploader::new()),
CompressionAlgorithm::default(),
TestLogger::stdout(),
);
@@ -264,7 +266,7 @@ mod tests {
network,
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
Arc::new(DumbUploader::new()),
CompressionAlgorithm::Gzip,
TestLogger::stdout(),
);
@@ -293,7 +295,7 @@ mod tests {
fake_data::network(),
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
Arc::new(DumbUploader::new()),
algorithm,
TestLogger::stdout(),
);
@@ -325,9 +327,9 @@ mod tests {
let file = NamedTempFile::new().unwrap();
let file_path = file.path();
let snapshot = OngoingSnapshot::new(file_path.to_path_buf(), 7331);
let mut snapshot_uploader = MockSnapshotUploader::new();
let mut snapshot_uploader = MockFileUploader::new();
snapshot_uploader
.expect_upload_snapshot()
.expect_upload()
.return_once(|_| Err(anyhow!("an error")))
.once();

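
The locations.into_iter().map(Into::into).collect() and .map(Into::into) calls above rely on FileUri converting back into the plain String locations still stored on the Snapshot entity. That conversion is not shown in this diff; a minimal sketch, assuming a straightforward From impl on the newtype:

// Assumed conversion (not part of this diff): unwrap the newtype back into
// the plain String location expected by the Snapshot entity.
impl From<FileUri> for String {
    fn from(file_uri: FileUri) -> Self {
        file_uri.0
    }
}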
28 changes: 14 additions & 14 deletions mithril-aggregator/src/dependency_injection/builder.rs
@@ -64,6 +64,7 @@ use crate::{
},
entities::AggregatorEpochSettings,
event_store::{EventMessage, EventStore, TransmitterService},
file_uploaders::GcpUploader,
http_server::routes::router::{self, RouterConfig, RouterState},
services::{
AggregatorSignableSeedBuilder, AggregatorUpkeepService, BufferedCertifierService,
Expand All @@ -74,12 +75,12 @@ use crate::{
UpkeepService, UsageReporter,
},
store::CertificatePendingStorer,
tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter},
tools::{CExplorerSignerRetriever, GenesisToolsDependency, SignersImporter},
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CompressedArchiveSnapshotter,
Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter, EpochSettingsStorer,
LocalSnapshotUploader, MetricsService, MithrilSignerRegisterer, MultiSigner, MultiSignerImpl,
RemoteSnapshotUploader, SingleSignatureAuthenticator, SnapshotUploader, SnapshotUploaderType,
Snapshotter, SnapshotterCompressionAlgorithm, VerificationKeyStorer,
Configuration, DependencyContainer, DumbSnapshotter, DumbUploader, EpochSettingsStorer,
FileUploader, LocalUploader, MetricsService, MithrilSignerRegisterer, MultiSigner,
MultiSignerImpl, SingleSignatureAuthenticator, SnapshotUploaderType, Snapshotter,
SnapshotterCompressionAlgorithm, VerificationKeyStorer,
};

const SQLITE_FILE: &str = "aggregator.sqlite3";
@@ -118,7 +119,7 @@ pub struct DependenciesBuilder {
pub stake_store: Option<Arc<StakePoolStore>>,

/// Snapshot uploader service.
pub snapshot_uploader: Option<Arc<dyn SnapshotUploader>>,
pub snapshot_uploader: Option<Arc<dyn FileUploader>>,

/// Multisigner service.
pub multi_signer: Option<Arc<dyn MultiSigner>>,
@@ -446,7 +447,7 @@ impl DependenciesBuilder {
Ok(self.stake_store.as_ref().cloned().unwrap())
}

async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn SnapshotUploader>> {
async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
let logger = self.root_logger();
if self.configuration.environment == ExecutionEnvironment::Production {
match self.configuration.snapshot_uploader_type {
@@ -461,26 +462,25 @@ impl DependenciesBuilder {
)
})?;

Ok(Arc::new(RemoteSnapshotUploader::new(
Box::new(GcpFileUploader::new(bucket.clone(), logger.clone())),
Ok(Arc::new(GcpUploader::new(
bucket,
self.configuration.snapshot_use_cdn_domain,
logger,
logger.clone(),
)))
}
SnapshotUploaderType::Local => Ok(Arc::new(LocalSnapshotUploader::new(
SnapshotUploaderType::Local => Ok(Arc::new(LocalUploader::new(
self.configuration.get_server_url(),
&self.configuration.snapshot_directory,
logger,
))),
}
} else {
Ok(Arc::new(DumbSnapshotUploader::new()))
Ok(Arc::new(DumbUploader::new()))
}
}

/// Get a [SnapshotUploader]
pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn SnapshotUploader>> {
/// Get a [FileUploader]
pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
if self.snapshot_uploader.is_none() {
self.snapshot_uploader = Some(self.build_snapshot_uploader().await?);
}
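
Outside of ExecutionEnvironment::Production the builder now falls back to DumbUploader. Because snapshot_uploader is a public Option field, a test can also inject its own implementation before anything is built; a hedged sketch, where the module path of DependenciesBuilder and the surrounding test harness are assumptions and only the field and get_snapshot_uploader come from this diff:

use std::sync::Arc;

use crate::dependency_injection::DependenciesBuilder;
use crate::{DumbUploader, FileUploader};

/// Sketch: force the dumb uploader so that neither GCP nor local snapshot
/// upload configuration is required.
async fn with_dumb_uploader(mut builder: DependenciesBuilder) -> Arc<dyn FileUploader> {
    builder.snapshot_uploader = Some(Arc::new(DumbUploader::new()));
    builder
        .get_snapshot_uploader()
        .await
        .expect("building the snapshot uploader should not fail")
}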
4 changes: 2 additions & 2 deletions mithril-aggregator/src/dependency_injection/containers.rs
@@ -32,13 +32,13 @@ use crate::{
},
entities::AggregatorEpochSettings,
event_store::{EventMessage, TransmitterService},
file_uploaders::FileUploader,
multi_signer::MultiSigner,
services::{
CertifierService, EpochService, MessageService, ProverService, SignedEntityService,
StakeDistributionService, TransactionStore, UpkeepService,
},
signer_registerer::SignerRecorder,
snapshot_uploaders::SnapshotUploader,
store::CertificatePendingStorer,
EpochSettingsStorer, MetricsService, SignerRegisterer, SignerRegistrationRoundOpener,
SingleSignatureAuthenticator, Snapshotter, VerificationKeyStorer,
@@ -71,7 +71,7 @@ pub struct DependencyContainer {
pub stake_store: Arc<StakePoolStore>,

/// Snapshot uploader service.
pub snapshot_uploader: Arc<dyn SnapshotUploader>,
pub snapshot_uploader: Arc<dyn FileUploader>,

/// Multisigner service.
pub multi_signer: Arc<dyn MultiSigner>,
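
With the container holding an Arc<dyn FileUploader>, any consumer that publishes a file can receive the same handle; for instance, the artifact builder from the first file in this diff can be wired straight from it. A rough sketch in the style of the tests above, where the import paths, the container argument and the placeholder snapshotter are assumptions and the constructor signature is the one shown earlier:

use std::sync::Arc;

use semver::Version;

use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data};

use crate::dependency_injection::DependencyContainer;
use crate::{test_tools::TestLogger, CardanoImmutableFilesFullArtifactBuilder, DumbSnapshotter};

/// Sketch: build the artifact builder from the container's generic uploader.
fn artifact_builder_from(container: &DependencyContainer) -> CardanoImmutableFilesFullArtifactBuilder {
    CardanoImmutableFilesFullArtifactBuilder::new(
        fake_data::network(),
        &Version::parse("1.0.0").unwrap(),
        Arc::new(DumbSnapshotter::new()), // placeholder snapshotter
        container.snapshot_uploader.clone(), // the shared Arc<dyn FileUploader>
        CompressionAlgorithm::default(),
        TestLogger::stdout(),
    )
}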
@@ -3,17 +3,17 @@ use async_trait::async_trait;
use mithril_common::StdResult;
use std::{path::Path, sync::RwLock};

use super::{SnapshotLocation, SnapshotUploader};
use crate::file_uploaders::{FileUploader, FileUri};

/// Dummy uploader for test purposes.
///
/// It actually does NOT upload any snapshot but remembers the last snapshot it
/// It actually does NOT upload any file but remembers the last file it
/// was asked to upload. This is intended to be used by integration tests.
pub struct DumbSnapshotUploader {
last_uploaded: RwLock<Option<String>>,
pub struct DumbUploader {
last_uploaded: RwLock<Option<FileUri>>,
}

impl DumbSnapshotUploader {
impl DumbUploader {
/// Create a new instance.
pub fn new() -> Self {
Self {
Expand All @@ -22,32 +22,32 @@ impl DumbSnapshotUploader {
}

/// Return the last upload that was triggered.
pub fn get_last_upload(&self) -> StdResult<Option<String>> {
pub fn get_last_upload(&self) -> StdResult<Option<FileUri>> {
let value = self
.last_uploaded
.read()
.map_err(|e| anyhow!("Error while saving filepath location: {e}"))?;

Ok(value.as_ref().map(|v| v.to_string()))
Ok(value.as_ref().map(Clone::clone))
}
}

impl Default for DumbSnapshotUploader {
impl Default for DumbUploader {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl SnapshotUploader for DumbSnapshotUploader {
/// Upload a snapshot
async fn upload_snapshot(&self, snapshot_filepath: &Path) -> StdResult<SnapshotLocation> {
impl FileUploader for DumbUploader {
/// Upload a file
async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
let mut value = self
.last_uploaded
.write()
.map_err(|e| anyhow!("Error while saving filepath location: {e}"))?;

let location = snapshot_filepath.to_string_lossy().to_string();
let location = FileUri(filepath.to_string_lossy().to_string());
*value = Some(location.clone());

Ok(location)
@@ -60,18 +60,18 @@ mod tests {

#[tokio::test]
async fn test_dumb_uploader() {
let uploader = DumbSnapshotUploader::new();
let uploader = DumbUploader::new();
assert!(uploader
.get_last_upload()
.expect("uploader should not fail")
.is_none());
let res = uploader
.upload_snapshot(Path::new("/tmp/whatever"))
.upload(Path::new("/tmp/whatever"))
.await
.expect("uploading with a dumb uploader should not fail");
assert_eq!(res, "/tmp/whatever".to_string());
assert_eq!(res, FileUri("/tmp/whatever".to_string()));
assert_eq!(
Some("/tmp/whatever".to_string()),
Some(FileUri("/tmp/whatever".to_string())),
uploader
.get_last_upload()
.expect("getting dumb uploader last value after a fake download should not fail")
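
For code that implemented the old trait, the migration is mechanical: upload_snapshot(&Path) -> StdResult<SnapshotLocation> becomes upload(&Path) -> StdResult<FileUri>. A hypothetical custom uploader (the type, its field and the URL scheme are made up purely for illustration) would now look like this:

use std::path::Path;

use async_trait::async_trait;
use mithril_common::StdResult;

use crate::file_uploaders::{FileUploader, FileUri};

/// Hypothetical uploader, only here to illustrate the new trait surface.
struct HttpPutUploader {
    base_url: String,
}

#[async_trait]
impl FileUploader for HttpPutUploader {
    async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
        // A real implementation would perform the HTTP PUT here; this sketch
        // only builds the location it would report back.
        let file_name = filepath
            .file_name()
            .map(|name| name.to_string_lossy().to_string())
            .unwrap_or_default();
        Ok(FileUri(format!("{}/{}", self.base_url, file_name)))
    }
}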