diff --git a/Cargo.lock b/Cargo.lock index 1e1688db75d..049c884e336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2062,7 +2062,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.2.48" +version = "0.2.49" dependencies = [ "async-trait", "chrono", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index 84d5e415874..1a746e03c84 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.2.48" +version = "0.2.49" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/database/migration.rs b/mithril-aggregator/src/database/migration.rs index 0e0c8ffb8b0..a652334d9fc 100644 --- a/mithril-aggregator/src/database/migration.rs +++ b/mithril-aggregator/src/database/migration.rs @@ -187,6 +187,40 @@ insert into signer_registration (signer_id, left join stake_pool on stake_pool.stake_pool_id = verification_key_signer.key and stake_pool.epoch = verification_key.key order by verification_key.key, verification_key_signer.key asc; drop table verification_key; +"#, + ), + // Migration 7 + // Add the `signed_entity` table and migration data from the previous + // `snapshot` JSON format. + SqlMigration::new( + 7, + r#" +create table signed_entity ( + signed_entity_id text not null, + signed_entity_type_id integer not null, + certificate_id text not null, + beacon json not null, + entity json not null, + created_at text not null default current_timestamp, + primary key (signed_entity_id) + foreign key (signed_entity_type_id) references signed_entity_type(signed_entity_type_id) + foreign key (certificate_id) references certificate(certificate_id) +); +create table if not exists snapshot (key_hash text primary key, key json not null, value json not null); +insert into signed_entity (signed_entity_id, + signed_entity_type_id, + certificate_id, + beacon, + entity) + select + json_extract(snapshot.value, '$.digest') as signed_entity_id, + 2 as signed_entity_type_id, + json_extract(snapshot.value, '$.certificate_hash') as certificate_id, + json_extract(snapshot.value, '$.beacon') as beacon, + snapshot.value as entity + from snapshot + order by ROWID asc; +drop table snapshot; "#, ), ] diff --git a/mithril-aggregator/src/database/provider/mod.rs b/mithril-aggregator/src/database/provider/mod.rs index 41bdf17c692..27f3b1784dd 100644 --- a/mithril-aggregator/src/database/provider/mod.rs +++ b/mithril-aggregator/src/database/provider/mod.rs @@ -2,11 +2,13 @@ mod certificate; mod epoch_setting; mod open_message; +mod signed_entity; mod signer_registration; mod stake_pool; pub use certificate::*; pub use epoch_setting::*; pub use open_message::*; +pub use signed_entity::*; pub use signer_registration::*; pub use stake_pool::*; diff --git a/mithril-aggregator/src/database/provider/signed_entity.rs b/mithril-aggregator/src/database/provider/signed_entity.rs new file mode 100644 index 00000000000..8cc84d7c474 --- /dev/null +++ b/mithril-aggregator/src/database/provider/signed_entity.rs @@ -0,0 +1,588 @@ +use std::sync::Arc; + +use sqlite::{Connection, Value}; + +use async_trait::async_trait; + +use mithril_common::{ + entities::{SignedEntityType, Snapshot}, + sqlite::{ + EntityCursor, HydrationError, Projection, Provider, SourceAlias, SqLiteEntity, + WhereCondition, + }, + store::adapter::{AdapterError, StoreAdapter}, +}; + +use mithril_common::StdError; +use tokio::sync::Mutex; + +/// SignedEntity record is the representation of a stored signed_entity. +#[derive(Debug, PartialEq, Clone)] +pub struct SignedEntityRecord { + /// Signed entity id. + signed_entity_id: String, + + /// Signed entity type. + signed_entity_type: SignedEntityType, + + /// Certificate id for this signed entity. + certificate_id: String, + + /// Raw signed entity (in JSON format). + pub entity: String, + + /// Date and time when the signed_entity was created + created_at: String, +} + +impl From for SignedEntityRecord { + fn from(other: Snapshot) -> Self { + let entity = serde_json::to_string(&other).unwrap(); + SignedEntityRecord { + signed_entity_id: other.digest, + signed_entity_type: SignedEntityType::CardanoImmutableFilesFull(other.beacon), + certificate_id: other.certificate_hash, + entity, + created_at: other.created_at, + } + } +} + +impl From for Snapshot { + fn from(other: SignedEntityRecord) -> Snapshot { + serde_json::from_str(&other.entity).unwrap() + } +} + +impl SqLiteEntity for SignedEntityRecord { + fn hydrate(row: sqlite::Row) -> Result + where + Self: Sized, + { + let signed_entity_id = row.get::(0); + let signed_entity_type_id_int = row.get::(1); + let certificate_id = row.get::(2); + let beacon_str = row.get::(3); + let entity_str = row.get::(4); + let created_at = row.get::(5); + + let signed_entity_record = Self { + signed_entity_id, + signed_entity_type: SignedEntityType::hydrate( + signed_entity_type_id_int.try_into().map_err(|e| { + HydrationError::InvalidData(format!( + "Could not cast i64 ({signed_entity_type_id_int}) to u64. Error: '{e}'" + )) + })?, + &beacon_str, + )?, + certificate_id, + entity: entity_str, + created_at, + }; + + Ok(signed_entity_record) + } + + fn get_projection() -> Projection { + Projection::from(&[ + ( + "signed_entity_id", + "{:signed_entity:}.signed_entity_id", + "text", + ), + ( + "signed_entity_type_id", + "{:signed_entity:}.signed_entity_type_id", + "integer", + ), + ("certificate_id", "{:signed_entity:}.certificate_id", "text"), + ("beacon", "{:signed_entity:}.beacon", "text"), + ("entity", "{:signed_entity:}.entity", "text"), + ("created_at", "{:signed_entity:}.created_at", "text"), + ]) + } +} + +/// Simple [SignedEntityRecord] provider. +pub struct SignedEntityRecordProvider<'client> { + client: &'client Connection, +} + +impl<'client> SignedEntityRecordProvider<'client> { + /// Create a new provider + pub fn new(client: &'client Connection) -> Self { + Self { client } + } + + fn condition_by_signed_entity_id( + &self, + signed_entity_id: String, + ) -> Result { + Ok(WhereCondition::new( + "signed_entity_id = ?*", + vec![Value::String(signed_entity_id)], + )) + } + + fn condition_by_signed_entity_type( + &self, + signed_entity_type: SignedEntityType, + ) -> Result { + let signed_entity_type_id: i64 = signed_entity_type.index() as i64; + + Ok(WhereCondition::new( + "signed_entity_type_id = ?*", + vec![Value::Integer(signed_entity_type_id)], + )) + } + + /// Get SignedEntityRecords for a given signed_entity id. + pub fn get_by_signed_entity_id( + &self, + signed_entity_id: String, + ) -> Result, StdError> { + let filters = self.condition_by_signed_entity_id(signed_entity_id)?; + let signed_entity_record = self.find(filters)?; + + Ok(signed_entity_record) + } + + /// Get SignedEntityRecords for a given signed entity type. + pub fn get_by_signed_entity_type( + &self, + signed_entity_type: SignedEntityType, + ) -> Result, StdError> { + let filters = self.condition_by_signed_entity_type(signed_entity_type)?; + let signed_entity_record = self.find(filters)?; + + Ok(signed_entity_record) + } + + /// Get all SignedEntityRecords. + pub fn get_all(&self) -> Result, StdError> { + let filters = WhereCondition::default(); + let signed_entity_record = self.find(filters)?; + + Ok(signed_entity_record) + } +} + +impl<'client> Provider<'client> for SignedEntityRecordProvider<'client> { + type Entity = SignedEntityRecord; + + fn get_connection(&'client self) -> &'client Connection { + self.client + } + + fn get_definition(&self, condition: &str) -> String { + let aliases = SourceAlias::new(&[("{:signed_entity:}", "se")]); + let projection = Self::Entity::get_projection().expand(aliases); + format!( + "select {projection} from signed_entity as se where {condition} order by ROWID desc" + ) + } +} + +/// Query to insert the signed_entity record +pub struct InsertSignedEntityRecordProvider<'conn> { + connection: &'conn Connection, +} + +impl<'conn> InsertSignedEntityRecordProvider<'conn> { + /// Create a new instance + pub fn new(connection: &'conn Connection) -> Self { + Self { connection } + } + + fn get_insert_condition(&self, signed_entity_record: SignedEntityRecord) -> WhereCondition { + WhereCondition::new( + "(signed_entity_id, signed_entity_type_id, certificate_id, beacon, entity, created_at) values (?*, ?*, ?*, ?*, ?*, ?*)", + vec![ + Value::String(signed_entity_record.signed_entity_id), + Value::Integer(signed_entity_record.signed_entity_type.index() as i64), + Value::String(signed_entity_record.certificate_id), + Value::String(signed_entity_record.signed_entity_type.get_json_beacon().unwrap()), + Value::String(signed_entity_record.entity), + Value::String(signed_entity_record.created_at), + ], + ) + } + + fn persist( + &self, + signed_entity_record: SignedEntityRecord, + ) -> Result { + let filters = self.get_insert_condition(signed_entity_record.clone()); + + let entity = self.find(filters)?.next().unwrap_or_else(|| { + panic!( + "No entity returned by the persister, signed_entity_record = {signed_entity_record:?}" + ) + }); + + Ok(entity) + } +} + +impl<'conn> Provider<'conn> for InsertSignedEntityRecordProvider<'conn> { + type Entity = SignedEntityRecord; + + fn get_connection(&'conn self) -> &'conn Connection { + self.connection + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + let projection = Self::Entity::get_projection() + .expand(SourceAlias::new(&[("{:signed_entity:}", "signed_entity")])); + + format!("insert into signed_entity {condition} returning {projection}") + } +} + +/// Service to deal with signed_entity (read & write). +pub struct SignedEntityStoreAdapter { + connection: Arc>, +} + +impl SignedEntityStoreAdapter { + /// Create a new SignedEntityStoreAdapter service + pub fn new(connection: Arc>) -> Self { + Self { connection } + } +} + +#[async_trait] +impl StoreAdapter for SignedEntityStoreAdapter { + type Key = String; + type Record = Snapshot; + + async fn store_record( + &mut self, + _key: &Self::Key, + record: &Self::Record, + ) -> Result<(), AdapterError> { + let connection = &*self.connection.lock().await; + let provider = InsertSignedEntityRecordProvider::new(connection); + let _signed_entity_record = provider + .persist(record.to_owned().into()) + .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + + Ok(()) + } + + async fn get_record(&self, key: &Self::Key) -> Result, AdapterError> { + let connection = &*self.connection.lock().await; + let provider = SignedEntityRecordProvider::new(connection); + let mut cursor = provider + .get_by_signed_entity_id(key.to_string()) + .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let signed_entity = cursor + .next() + .map(|signed_entity_record| signed_entity_record.into()); + + Ok(signed_entity) + } + + async fn record_exists(&self, key: &Self::Key) -> Result { + Ok(self.get_record(key).await?.is_some()) + } + + async fn get_last_n_records( + &self, + how_many: usize, + ) -> Result, AdapterError> { + Ok(self + .get_iter() + .await? + .take(how_many) + .map(|se| (se.digest.to_owned(), se)) + .collect()) + } + + async fn remove(&mut self, _key: &Self::Key) -> Result, AdapterError> { + unimplemented!() + } + + async fn get_iter(&self) -> Result + '_>, AdapterError> { + let connection = &*self.connection.lock().await; + let provider = SignedEntityRecordProvider::new(connection); + let cursor = provider + .get_all() + .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; + let signed_entities: Vec = cursor.map(|se| se.into()).collect(); + Ok(Box::new(signed_entities.into_iter())) + } +} + +#[cfg(test)] +mod tests { + use mithril_common::{entities::Beacon, test_utils::fake_data}; + + use crate::database::migration::get_migrations; + + use super::*; + + pub fn fake_signed_entity_records(total_records: usize) -> Vec { + let snapshots = fake_data::snapshots(total_records as u64); + (0..total_records) + .map(|idx| { + let snapshot = snapshots.get(idx).unwrap().to_owned(); + let entity = serde_json::to_string(&snapshot).unwrap(); + SignedEntityRecord { + signed_entity_id: snapshot.digest, + signed_entity_type: SignedEntityType::CardanoImmutableFilesFull( + snapshot.beacon, + ), + certificate_id: snapshot.certificate_hash, + entity, + created_at: snapshot.created_at, + } + }) + .collect() + } + + pub fn setup_signed_entity_db( + connection: &Connection, + signed_entity_records: Vec, + ) -> Result<(), StdError> { + for migration in get_migrations() { + connection.execute(&migration.alterations)?; + } + + if signed_entity_records.is_empty() { + return Ok(()); + } + + let query = { + // leverage the expanded parameter from this provider which is unit + // tested on its own above. + let insert_provider = InsertSignedEntityRecordProvider::new(connection); + let (sql_values, _) = insert_provider + .get_insert_condition(signed_entity_records.first().unwrap().to_owned()) + .expand(); + format!("insert into signed_entity {sql_values}") + }; + + for signed_entity_record in signed_entity_records { + let mut statement = connection.prepare(&query)?; + + statement + .bind(1, signed_entity_record.signed_entity_id.as_str()) + .unwrap(); + statement + .bind(2, signed_entity_record.signed_entity_type.index() as i64) + .unwrap(); + statement + .bind(3, signed_entity_record.certificate_id.as_str()) + .unwrap(); + statement + .bind( + 4, + signed_entity_record + .signed_entity_type + .get_json_beacon() + .unwrap() + .as_str(), + ) + .unwrap(); + statement + .bind(5, signed_entity_record.entity.as_str()) + .unwrap(); + statement + .bind(6, signed_entity_record.created_at.as_str()) + .unwrap(); + + statement.next().unwrap(); + } + + Ok(()) + } + + #[test] + fn test_convert_signed_entity() { + let snapshots = fake_data::snapshots(1); + let snapshot = snapshots.first().unwrap().to_owned(); + let snapshot_expected = snapshot.clone(); + + let signed_entity: SignedEntityRecord = snapshot.into(); + let snapshot: Snapshot = signed_entity.into(); + assert_eq!(snapshot_expected, snapshot); + } + + #[test] + fn projection() { + let projection = SignedEntityRecord::get_projection(); + let aliases = SourceAlias::new(&[("{:signed_entity:}", "se")]); + + assert_eq!( + "se.signed_entity_id as signed_entity_id, se.signed_entity_type_id as signed_entity_type_id, se.certificate_id as certificate_id, se.beacon as beacon, se.entity as entity, se.created_at as created_at" + .to_string(), + projection.expand(aliases) + ); + } + + #[test] + fn get_signed_entity_record_by_signed_entity_type() { + let connection = Connection::open(":memory:").unwrap(); + let provider = SignedEntityRecordProvider::new(&connection); + let condition = provider + .condition_by_signed_entity_type(SignedEntityType::dummy()) + .unwrap(); + let (filter, values) = condition.expand(); + + assert_eq!("signed_entity_type_id = ?1".to_string(), filter); + assert_eq!( + vec![Value::Integer(SignedEntityType::dummy().index() as i64)], + values + ); + } + + #[test] + fn get_signed_entity_record_by_signed_entity_id() { + let connection = Connection::open(":memory:").unwrap(); + let provider = SignedEntityRecordProvider::new(&connection); + let condition = provider + .condition_by_signed_entity_id("signed-ent-123".to_string()) + .unwrap(); + let (filter, values) = condition.expand(); + + assert_eq!("signed_entity_id = ?1".to_string(), filter); + assert_eq!(vec![Value::String("signed-ent-123".to_string())], values); + } + + #[test] + fn insert_signed_entity_record() { + let snapshots = fake_data::snapshots(1); + let snapshot = snapshots.first().unwrap().to_owned(); + let signed_entity_record: SignedEntityRecord = snapshot.into(); + let connection = Connection::open(":memory:").unwrap(); + let provider = InsertSignedEntityRecordProvider::new(&connection); + let condition = provider.get_insert_condition(signed_entity_record.clone()); + let (values, params) = condition.expand(); + + assert_eq!( + "(signed_entity_id, signed_entity_type_id, certificate_id, beacon, entity, created_at) values (?1, ?2, ?3, ?4, ?5, ?6)".to_string(), + values + ); + assert_eq!( + vec![ + Value::String(signed_entity_record.signed_entity_id), + Value::Integer(signed_entity_record.signed_entity_type.index() as i64), + Value::String(signed_entity_record.certificate_id), + Value::String( + signed_entity_record + .signed_entity_type + .get_json_beacon() + .unwrap() + ), + Value::String(signed_entity_record.entity), + Value::String(signed_entity_record.created_at), + ], + params + ); + } + + #[test] + fn test_get_signed_entity_records() { + let signed_entity_records = fake_signed_entity_records(5); + + let connection = Connection::open(":memory:").unwrap(); + setup_signed_entity_db(&connection, signed_entity_records.clone()).unwrap(); + + let provider = SignedEntityRecordProvider::new(&connection); + + let first_signed_entity_type = signed_entity_records.first().unwrap().to_owned(); + let signed_entity_records: Vec = provider + .get_by_signed_entity_id(first_signed_entity_type.clone().signed_entity_id) + .unwrap() + .collect(); + assert_eq!(vec![first_signed_entity_type], signed_entity_records); + + let signed_entity_records: Vec = provider + .get_by_signed_entity_type(SignedEntityType::CardanoImmutableFilesFull( + Beacon::default(), + )) + .unwrap() + .collect(); + let expected_signed_entity_records: Vec = signed_entity_records + .iter() + .filter_map(|se| { + (se.signed_entity_type.index() + == SignedEntityType::CardanoImmutableFilesFull(Beacon::default()).index()) + .then_some(se.to_owned()) + }) + .collect(); + assert_eq!(expected_signed_entity_records, signed_entity_records); + + let signed_entity_records: Vec = provider.get_all().unwrap().collect(); + let expected_signed_entity_records: Vec = + signed_entity_records.iter().map(|c| c.to_owned()).collect(); + assert_eq!(expected_signed_entity_records, signed_entity_records); + } + + #[test] + fn test_insert_signed_entity_record() { + let signed_entity_records = fake_signed_entity_records(5); + + let connection = Connection::open(":memory:").unwrap(); + setup_signed_entity_db(&connection, Vec::new()).unwrap(); + + let provider = InsertSignedEntityRecordProvider::new(&connection); + + for signed_entity_record in signed_entity_records { + let signed_entity_record_saved = + provider.persist(signed_entity_record.clone()).unwrap(); + assert_eq!(signed_entity_record, signed_entity_record_saved); + } + } + + #[tokio::test] + async fn test_store_adapter() { + let signed_entity_records = fake_signed_entity_records(5); + + let connection = Connection::open(":memory:").unwrap(); + setup_signed_entity_db(&connection, Vec::new()).unwrap(); + + let mut signed_entity_store_adapter = + SignedEntityStoreAdapter::new(Arc::new(Mutex::new(connection))); + + for signed_entity_record in &signed_entity_records { + assert!(signed_entity_store_adapter + .store_record( + &signed_entity_record.signed_entity_id, + &signed_entity_record.to_owned().into() + ) + .await + .is_ok()); + } + + for signed_entity_record in &signed_entity_records { + assert!(signed_entity_store_adapter + .record_exists(&signed_entity_record.signed_entity_id) + .await + .unwrap()); + assert_eq!( + Some(signed_entity_record.to_owned().into()), + signed_entity_store_adapter + .get_record(&signed_entity_record.signed_entity_id) + .await + .unwrap() + ); + } + + assert_eq!( + signed_entity_records, + signed_entity_store_adapter + .get_last_n_records(signed_entity_records.len()) + .await + .unwrap() + .into_iter() + .map(|(_k, v)| v.into()) + .rev() + .collect::>() + ) + } +} diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 4017321bc4f..cefafb8ce71 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -37,7 +37,8 @@ use warp::Filter; use crate::{ configuration::{ExecutionEnvironment, LIST_SNAPSHOTS_MAX_ITEMS}, database::provider::{ - CertificateStoreAdapter, EpochSettingStore, SignerRegistrationStoreAdapter, StakePoolStore, + CertificateStoreAdapter, EpochSettingStore, SignedEntityStoreAdapter, + SignerRegistrationStoreAdapter, StakePoolStore, }, event_store::{EventMessage, EventStore, TransmitterService}, http_server::routes::router, @@ -256,11 +257,7 @@ impl DependenciesBuilder { dyn StoreAdapter, > = match self.configuration.environment { ExecutionEnvironment::Production => { - let adapter = SQLiteAdapter::new("snapshot", self.get_sqlite_connection().await?) - .map_err(|e| DependenciesBuilderError::Initialization { - message: "Cannot create SQLite adapter for Snapshot Store.".to_string(), - error: Some(e.into()), - })?; + let adapter = SignedEntityStoreAdapter::new(self.get_sqlite_connection().await?); Box::new(adapter) }