Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cardano Immutable Files Full Artifact builder in aggregator #900

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.3.11"
version = "0.3.12"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
68 changes: 56 additions & 12 deletions mithril-aggregator/src/artifact_builder/artifact_builder_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use std::sync::Arc;

use mithril_common::{
entities::{Certificate, Epoch, SignedEntityType},
entities::{Beacon, Certificate, Epoch, SignedEntityType, Snapshot},
signable_builder::Artifact,
StdResult,
};
Expand Down Expand Up @@ -31,6 +31,7 @@ pub trait ArtifactBuilderService: Send + Sync {
pub struct MithrilArtifactBuilderService {
mithril_stake_distribution_artifact_builder:
Arc<dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>>,
cardano_immutable_files_full_artifact_builder: Arc<dyn ArtifactBuilder<Beacon, Snapshot>>,
}

impl MithrilArtifactBuilderService {
Expand All @@ -40,9 +41,11 @@ impl MithrilArtifactBuilderService {
mithril_stake_distribution_artifact_builder: Arc<
dyn ArtifactBuilder<Epoch, MithrilStakeDistribution>,
>,
cardano_immutable_files_full_artifact_builder: Arc<dyn ArtifactBuilder<Beacon, Snapshot>>,
) -> Self {
Self {
mithril_stake_distribution_artifact_builder,
cardano_immutable_files_full_artifact_builder,
}
}
}
Expand All @@ -55,16 +58,19 @@ impl ArtifactBuilderService for MithrilArtifactBuilderService {
signed_entity_type: SignedEntityType,
certificate: &Certificate,
) -> StdResult<Arc<dyn Artifact>> {
let artifact = match signed_entity_type {
SignedEntityType::MithrilStakeDistribution(e) => Arc::new(
match signed_entity_type {
SignedEntityType::MithrilStakeDistribution(epoch) => Ok(Arc::new(
self.mithril_stake_distribution_artifact_builder
.compute_artifact(e, certificate)
.compute_artifact(epoch, certificate)
.await?,
),
)),
SignedEntityType::CardanoImmutableFilesFull(beacon) => Ok(Arc::new(
self.cardano_immutable_files_full_artifact_builder
.compute_artifact(beacon, certificate)
.await?,
)),
_ => todo!(),
};

Ok(artifact)
}
}
}

Expand All @@ -77,7 +83,8 @@ mod tests {
use crate::artifact_builder::MockArtifactBuilder;

#[tokio::test]
async fn test_artifact_builder_service_mithril_stake_distribution() {
async fn build_mithril_stake_distribution_artifact_when_given_mithril_stake_distribution_entity_type(
) {
let signers_with_stake = fake_data::signers_with_stakes(5);
let mithril_stake_distribution_expected = MithrilStakeDistribution::new(signers_with_stake);
let mithril_stake_distribution_clone = mithril_stake_distribution_expected.clone();
Expand All @@ -88,9 +95,13 @@ mod tests {
.once()
.return_once(move |_, _| Ok(mithril_stake_distribution_clone));

let artifact_builder_service = MithrilArtifactBuilderService::new(Arc::new(
mock_mithril_stake_distribution_artifact_builder,
));
let mock_cardano_immutable_files_full_artifact_builder =
MockArtifactBuilder::<Beacon, Snapshot>::new();

let artifact_builder_service = MithrilArtifactBuilderService::new(
Arc::new(mock_mithril_stake_distribution_artifact_builder),
Arc::new(mock_cardano_immutable_files_full_artifact_builder),
);
let certificate = Certificate::default();

let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(1));
Expand All @@ -105,4 +116,37 @@ mod tests {
serde_json::to_string(&mithril_stake_distribution_computed).unwrap()
);
}

#[tokio::test]
async fn build_snapshot_artifact_when_given_cardano_immutable_files_full_entity_type() {
let snapshot_expected = fake_data::snapshots(1).first().unwrap().to_owned();
let snapshot_expected_clone = snapshot_expected.clone();
let mock_mithril_stake_distribution_artifact_builder =
MockArtifactBuilder::<Epoch, MithrilStakeDistribution>::new();

let mut mock_cardano_immutable_files_full_artifact_builder =
MockArtifactBuilder::<Beacon, Snapshot>::new();
mock_cardano_immutable_files_full_artifact_builder
.expect_compute_artifact()
.once()
.return_once(move |_, _| Ok(snapshot_expected_clone));

let artifact_builder_service = MithrilArtifactBuilderService::new(
Arc::new(mock_mithril_stake_distribution_artifact_builder),
Arc::new(mock_cardano_immutable_files_full_artifact_builder),
);
let certificate = Certificate::default();

let signed_entity_type = SignedEntityType::CardanoImmutableFilesFull(Beacon::default());
let artifact = artifact_builder_service
.compute_artifact(signed_entity_type, &certificate)
.await
.unwrap();
let snapshot_computed: Snapshot =
serde_json::from_str(&serde_json::to_string(&artifact).unwrap()).unwrap();
assert_eq!(
serde_json::to_string(&snapshot_expected).unwrap(),
serde_json::to_string(&snapshot_computed).unwrap()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
use async_trait::async_trait;
use chrono::Utc;
use slog_scope::{debug, warn};
use std::sync::Arc;
use thiserror::Error;

use crate::{
snapshot_uploaders::SnapshotLocation, snapshotter::OngoingSnapshot, SnapshotError,
SnapshotUploader, Snapshotter,
};

use super::ArtifactBuilder;
use mithril_common::{
entities::{Beacon, Certificate, ProtocolMessage, ProtocolMessagePartKey, Snapshot},
StdResult,
};

/// [CardanoImmutableFilesFullArtifact] error
/// to fail.
#[derive(Debug, Error)]
pub enum CardanoImmutableFilesFullArtifactError {
/// Protocol message part is missing
#[error("Missing protocol message: '{0}'.")]
MissingProtocolMessage(String),
}

/// A [CardanoImmutableFilesFullArtifact] builder
pub struct CardanoImmutableFilesFullArtifactBuilder {
snapshotter: Arc<dyn Snapshotter>,
snapshot_uploader: Arc<dyn SnapshotUploader>,
}

impl CardanoImmutableFilesFullArtifactBuilder {
/// CardanoImmutableFilesFull artifact builder factory
pub fn new(
snapshotter: Arc<dyn Snapshotter>,
snapshot_uploader: Arc<dyn SnapshotUploader>,
) -> Self {
Self {
snapshotter,
snapshot_uploader,
}
}

async fn create_snapshot_archive(
&self,
beacon: &Beacon,
protocol_message: &ProtocolMessage,
) -> StdResult<OngoingSnapshot> {
debug!("CardanoImmutableFilesFullArtifactBuilder: create snapshot archive");

let snapshotter = self.snapshotter.clone();
let snapshot_digest = protocol_message
.get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
.ok_or_else(|| {
CardanoImmutableFilesFullArtifactError::MissingProtocolMessage(format!(
"no digest message part found for beacon '{beacon:?}'."
))
})?;
let snapshot_name = format!(
"{}-e{}-i{}.{}.tar.gz",
beacon.network, beacon.epoch.0, beacon.immutable_file_number, snapshot_digest
);
// spawn a separate thread to prevent blocking
let ongoing_snapshot =
tokio::task::spawn_blocking(move || -> Result<OngoingSnapshot, SnapshotError> {
snapshotter.snapshot(&snapshot_name)
})
.await??;

debug!(" > snapshot created: '{:?}'", ongoing_snapshot);

Ok(ongoing_snapshot)
}

async fn upload_snapshot_archive(
&self,
ongoing_snapshot: &OngoingSnapshot,
) -> StdResult<Vec<SnapshotLocation>> {
debug!("CardanoImmutableFilesFullArtifactBuilder: upload snapshot archive");
let location = self
.snapshot_uploader
.upload_snapshot(ongoing_snapshot.get_file_path())
.await?;

if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
warn!(
" > Post upload ongoing snapshot file removal failure: {}",
error
);
}

Ok(vec![location])
}

async fn create_snapshot(
&self,
certificate: &Certificate,
ongoing_snapshot: &OngoingSnapshot,
remote_locations: Vec<String>,
) -> StdResult<Snapshot> {
debug!("CardanoImmutableFilesFullArtifactBuilder: create snapshot");
let snapshot_digest = certificate
.protocol_message
.get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
.ok_or_else(|| {
CardanoImmutableFilesFullArtifactError::MissingProtocolMessage(format!(
"message part 'digest' not found for snapshot '{}'.",
ongoing_snapshot.get_file_path().display()
))
})?
.to_owned();
let snapshot = Snapshot::new(
snapshot_digest,
certificate.beacon.to_owned(),
certificate.hash.to_owned(),
*ongoing_snapshot.get_file_size(),
format!("{:?}", Utc::now()),
remote_locations,
);

Ok(snapshot)
}
}

#[async_trait]
impl ArtifactBuilder<Beacon, Snapshot> for CardanoImmutableFilesFullArtifactBuilder {
async fn compute_artifact(
&self,
beacon: Beacon,
certificate: &Certificate,
) -> StdResult<Snapshot> {
let ongoing_snapshot = self
.create_snapshot_archive(&beacon, &certificate.protocol_message)
.await?;
let locations = self.upload_snapshot_archive(&ongoing_snapshot).await?;

let snapshot = self
.create_snapshot(certificate, &ongoing_snapshot, locations)
.await?;

Ok(snapshot)
}
}

#[cfg(test)]
mod tests {
use std::path::Path;

use mithril_common::test_utils::fake_data;
use tempfile::NamedTempFile;

use super::*;

use crate::{DumbSnapshotUploader, DumbSnapshotter};

#[tokio::test]
async fn should_compute_valid_artifact() {
let beacon = fake_data::beacon();
let certificate = fake_data::certificate("cert-123".to_string());
let snapshot_digest = certificate
.protocol_message
.get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
.unwrap();

let dumb_snapshotter = Arc::new(DumbSnapshotter::new());
let dumb_snapshot_uploader = Arc::new(DumbSnapshotUploader::new());

let cardano_immutable_files_full_artifact_builder =
CardanoImmutableFilesFullArtifactBuilder::new(
dumb_snapshotter.clone(),
dumb_snapshot_uploader.clone(),
);
let artifact = cardano_immutable_files_full_artifact_builder
.compute_artifact(beacon, &certificate)
.await
.unwrap();
let last_ongoing_snapshot = dumb_snapshotter
.get_last_snapshot()
.unwrap()
.expect("A snapshot should have been 'created'");

let remote_locations = vec![dumb_snapshot_uploader
.get_last_upload()
.unwrap()
.expect("A snapshot should have been 'uploaded'")];
let artifact_expected = Snapshot::new(
snapshot_digest.to_owned(),
certificate.beacon.to_owned(),
certificate.hash.to_owned(),
*last_ongoing_snapshot.get_file_size(),
artifact.created_at.clone(),
remote_locations,
);
assert_eq!(artifact_expected, artifact);
}

#[tokio::test]
async fn remove_snapshot_archive_after_upload() {
let file = NamedTempFile::new().unwrap();
let file_path = file.path();
let snapshot = OngoingSnapshot::new(file_path.to_path_buf(), 7331);

let cardano_immutable_files_full_artifact_builder =
CardanoImmutableFilesFullArtifactBuilder::new(
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
);

cardano_immutable_files_full_artifact_builder
.upload_snapshot_archive(&snapshot)
.await
.expect("Snapshot upload should not fail");

assert!(
!file_path.exists(),
"Ongoing snapshot file should have been removed after upload"
);
}

#[tokio::test]
async fn snapshot_archive_name_after_beacon_values() {
let beacon = Beacon::new("network".to_string(), 20, 145);
let mut message = ProtocolMessage::new();
message.set_message_part(
ProtocolMessagePartKey::SnapshotDigest,
"test+digest".to_string(),
);

let cardano_immutable_files_full_artifact_builder =
CardanoImmutableFilesFullArtifactBuilder::new(
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
);

let ongoing_snapshot = cardano_immutable_files_full_artifact_builder
.create_snapshot_archive(&beacon, &message)
.await
.expect("create_snapshot_archive should not fail");

assert_eq!(
Path::new(
format!(
"{}-e{}-i{}.{}.tar.gz",
beacon.network, beacon.epoch.0, beacon.immutable_file_number, "test+digest"
)
.as_str()
),
ongoing_snapshot.get_file_path()
);
}
}
Loading