Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align signer & aggregator state machines logs #486

Merged
merged 6 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod handlers {
pub async fn certificate_pending(
certificate_pending_store: Arc<CertificatePendingStore>,
) -> Result<impl warp::Reply, Infallible> {
debug!("certificate_pending");
debug!("⇄ HTTP SERVER: certificate_pending");

match certificate_pending_store.get().await {
Ok(Some(certificate_pending)) => Ok(reply::json(&certificate_pending, StatusCode::OK)),
Expand All @@ -61,7 +61,10 @@ mod handlers {
certificate_hash: String,
certificate_store: Arc<CertificateStore>,
) -> Result<impl warp::Reply, Infallible> {
debug!("certificate_certificate_hash/{}", certificate_hash);
debug!(
"⇄ HTTP SERVER: certificate_certificate_hash/{}",
certificate_hash
);

match certificate_store.get_from_hash(&certificate_hash).await {
Ok(Some(certificate)) => Ok(reply::json(&certificate, StatusCode::OK)),
Expand Down
2 changes: 1 addition & 1 deletion mithril-aggregator/src/http_server/routes/epoch_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod handlers {
protocol_parameters_store: Arc<ProtocolParametersStore>,
multi_signer: MultiSignerWrapper,
) -> Result<impl warp::Reply, Infallible> {
debug!("epoch_settings");
debug!("⇄ HTTP SERVER: epoch_settings");

match multi_signer.read().await.get_current_beacon().await {
Some(beacon) => match protocol_parameters_store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ mod handlers {
signature: entities::SingleSignatures,
multi_signer: MultiSignerWrapper,
) -> Result<impl warp::Reply, Infallible> {
debug!("register_signatures/{:?}", signature);
debug!("⇄ HTTP SERVER: register_signatures/{:?}", signature);

let mut multi_signer = multi_signer.write().await;
match multi_signer.register_single_signature(&signature).await {
Expand Down
2 changes: 1 addition & 1 deletion mithril-aggregator/src/http_server/routes/signer_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod handlers {
signer: entities::Signer,
multi_signer: MultiSignerWrapper,
) -> Result<impl warp::Reply, Infallible> {
debug!("register_signer/{:?}", signer);
debug!("⇄ HTTP SERVER: register_signer/{:?}", signer);

let mut multi_signer = multi_signer.write().await;
match key_decode_hex(&signer.verification_key) {
Expand Down
8 changes: 4 additions & 4 deletions mithril-aggregator/src/http_server/routes/snapshot_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ mod handlers {
pub async fn snapshots(
snapshot_store: Arc<dyn SnapshotStore>,
) -> Result<impl warp::Reply, Infallible> {
debug!("snapshots");
debug!("⇄ HTTP SERVER: snapshots");

match snapshot_store.list_snapshots().await {
Ok(snapshots) => Ok(reply::json(&snapshots, StatusCode::OK)),
Expand All @@ -86,7 +86,7 @@ mod handlers {
) -> Result<impl warp::Reply, Infallible> {
let filepath = reply.path().to_path_buf();
debug!(
"ensure_downloaded_file_is_a_snapshot / file: `{}`",
"⇄ HTTP SERVER: ensure_downloaded_file_is_a_snapshot / file: `{}`",
filepath.display()
);

Expand Down Expand Up @@ -115,7 +115,7 @@ mod handlers {
config: Configuration,
snapshot_store: Arc<dyn SnapshotStore>,
) -> Result<impl warp::Reply, Infallible> {
debug!("snapshot_download/{}", digest);
debug!("⇄ HTTP SERVER: snapshot_download/{}", digest);

match snapshot_store.get_snapshot_details(digest).await {
Ok(Some(snapshot)) => {
Expand Down Expand Up @@ -150,7 +150,7 @@ mod handlers {
digest: String,
snapshot_store: Arc<dyn SnapshotStore>,
) -> Result<impl warp::Reply, Infallible> {
debug!("snapshot_digest/{}", digest);
debug!("⇄ HTTP SERVER: snapshot_digest/{}", digest);

match snapshot_store.get_snapshot_details(digest).await {
Ok(snapshot) => match snapshot {
Expand Down
76 changes: 38 additions & 38 deletions mithril-aggregator/src/runtime/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,30 @@ impl AggregatorRunner {
impl AggregatorRunnerTrait for AggregatorRunner {
/// Return the current beacon from the chain
async fn get_beacon_from_chain(&self) -> Result<Beacon, RuntimeError> {
info!("RUNNER: get beacon from chain");
debug!("RUNNER: get beacon from chain");
Ok(self
.dependencies
.beacon_provider
.get_current_beacon()
.await?)
}

async fn does_certificate_exist_for_beacon(
&self,
beacon: &Beacon,
) -> Result<bool, RuntimeError> {
debug!("RUNNER: does_certificate_exist_for_beacon");
let certificate_exist = self
.dependencies
.certificate_store
.get_from_beacon(beacon)
.await?
.is_some();
Ok(certificate_exist)
}

async fn is_certificate_chain_valid(&self) -> Result<bool, RuntimeError> {
info!("RUNNER: is_certificate_chain_valid");
debug!("RUNNER: is_certificate_chain_valid");
let certificate_store = self.dependencies.certificate_store.clone();
let latest_certificates = certificate_store.get_list(1).await?;
let latest_certificate = latest_certificates.first();
Expand All @@ -181,44 +195,30 @@ impl AggregatorRunnerTrait for AggregatorRunner {
{
Ok(()) => Ok(true),
Err(error) => {
warn!("Invalid certificate chain"; "error" => ?error);
warn!(" > invalid certificate chain"; "error" => ?error);
Ok(false)
}
}
}

async fn does_certificate_exist_for_beacon(
&self,
beacon: &Beacon,
) -> Result<bool, RuntimeError> {
info!("RUNNER: does_certificate_exist_for_beacon");
let certificate_exist = self
.dependencies
.certificate_store
.get_from_beacon(beacon)
.await?
.is_some();
Ok(certificate_exist)
}

async fn compute_digest(&self, new_beacon: &Beacon) -> Result<String, RuntimeError> {
info!("RUNNER: compute_digest");
debug!("RUNNER: compute_digest");
let digester = self.dependencies.digester.clone();

debug!("computing digest"; "db_directory" => self.config.db_directory.display());
debug!(" > computing digest"; "cardano_db_directory" => self.config.db_directory.display());

debug!("launching digester thread");
debug!(" > launching digester thread");
let digest = digester
.compute_digest(new_beacon)
.await
.map_err(|e| RuntimeError::General(e.into()))?;
debug!("computed digest: {}", digest);
debug!(" > computed digest: {}", digest);

Ok(digest)
}

async fn update_beacon(&self, new_beacon: &Beacon) -> Result<(), RuntimeError> {
info!("RUNNER: update beacon"; "beacon" => #?new_beacon);
debug!("RUNNER: update beacon"; "beacon" => #?new_beacon);
self.dependencies
.multi_signer
.write()
Expand All @@ -229,7 +229,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
}

async fn update_stake_distribution(&self, new_beacon: &Beacon) -> Result<(), RuntimeError> {
info!("RUNNER: update stake distribution"; "beacon" => #?new_beacon);
debug!("RUNNER: update stake distribution"; "beacon" => #?new_beacon);
let stake_distribution = self
.dependencies
.chain_observer
Expand All @@ -254,7 +254,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
&self,
new_beacon: &Beacon,
) -> Result<(), RuntimeError> {
info!("RUNNER: update protocol parameters"; "beacon" => #?new_beacon);
debug!("RUNNER: update protocol parameters"; "beacon" => #?new_beacon);
let protocol_parameters = self.dependencies.config.protocol_parameters.clone();
Ok(self
.dependencies
Expand All @@ -266,7 +266,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
}

async fn update_message_in_multisigner(&self, digest: String) -> Result<(), RuntimeError> {
info!("RUNNER: update message in multisigner");
debug!("RUNNER: update message in multisigner");
let mut multi_signer = self.dependencies.multi_signer.write().await;
let mut protocol_message = ProtocolMessage::new();
protocol_message.set_message_part(ProtocolMessagePartKey::SnapshotDigest, digest);
Expand All @@ -288,7 +288,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
&self,
beacon: Beacon,
) -> Result<CertificatePending, RuntimeError> {
info!("RUNNER: create new pending certificate from multisigner");
debug!("RUNNER: create new pending certificate from multisigner");
let multi_signer = self.dependencies.multi_signer.read().await;

let signers = match multi_signer.get_signers().await {
Expand Down Expand Up @@ -328,7 +328,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
&self,
pending_certificate: CertificatePending,
) -> Result<(), RuntimeError> {
info!("RUNNER: saving pending certificate");
debug!("RUNNER: saving pending certificate");

self.dependencies
.certificate_pending_store
Expand All @@ -338,11 +338,11 @@ impl AggregatorRunnerTrait for AggregatorRunner {
}

async fn drop_pending_certificate(&self) -> Result<Option<CertificatePending>, RuntimeError> {
info!("RUNNER: drop pending certificate");
debug!("RUNNER: drop pending certificate");

let certificate_pending = self.dependencies.certificate_pending_store.remove().await?;
if certificate_pending.is_none() {
warn!("drop_pending_certificate::no certificate pending in store, did the previous loop crashed ?");
warn!(" > drop_pending_certificate::no certificate pending in store, did the previous loop crashed ?");
}

Ok(certificate_pending)
Expand All @@ -351,7 +351,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
/// Is a multi-signature ready?
/// Can we create a multi-signature.
async fn is_multisig_created(&self) -> Result<bool, RuntimeError> {
info!("RUNNER: check if we can create a multi-signature");
debug!("RUNNER: check if we can create a multi-signature");
let has_multisig = self
.dependencies
.multi_signer
Expand All @@ -362,9 +362,9 @@ impl AggregatorRunnerTrait for AggregatorRunner {
.is_some();

if has_multisig {
debug!("new MULTISIG created");
debug!(" > new multi-signature created");
} else {
info!("no multisig created");
info!(" > no multi-signature created");
}
Ok(has_multisig)
}
Expand All @@ -373,7 +373,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
&self,
beacon: &Beacon,
) -> Result<OngoingSnapshot, RuntimeError> {
info!("RUNNER: create snapshot archive");
debug!("RUNNER: create snapshot archive");

let snapshotter = self.dependencies.snapshotter.clone();
let protocol_message = self
Expand Down Expand Up @@ -401,7 +401,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
.await
.map_err(|e| RuntimeError::General(e.into()))??;

debug!("snapshot created: '{:?}'", ongoing_snapshot);
debug!(" > snapshot created: '{:?}'", ongoing_snapshot);

Ok(ongoing_snapshot)
}
Expand All @@ -410,7 +410,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
&self,
ongoing_snapshot: &OngoingSnapshot,
) -> Result<Vec<SnapshotLocation>, RuntimeError> {
info!("RUNNER: upload snapshot archive");
debug!("RUNNER: upload snapshot archive");
let location = self
.dependencies
.snapshot_uploader
Expand All @@ -420,7 +420,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {

if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
warn!(
"Post upload ongoing snapshot file removal failure: {}",
" > Post upload ongoing snapshot file removal failure: {}",
error
);
}
Expand All @@ -432,7 +432,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
&self,
beacon: &Beacon,
) -> Result<Certificate, RuntimeError> {
info!("RUNNER: create and save certificate");
debug!("RUNNER: create and save certificate");
let certificate_store = self.dependencies.certificate_store.clone();
let latest_certificates = certificate_store.get_list(2).await?;
let last_certificate = latest_certificates.get(0);
Expand Down Expand Up @@ -480,7 +480,7 @@ impl AggregatorRunnerTrait for AggregatorRunner {
ongoing_snapshot: &OngoingSnapshot,
remote_locations: Vec<String>,
) -> Result<Snapshot, RuntimeError> {
info!("RUNNER: create and save snapshot");
debug!("RUNNER: create and save snapshot");
let snapshot_digest = certificate
.protocol_message
.get_message_part(&ProtocolMessagePartKey::SnapshotDigest)
Expand Down
Loading