From 974296a91f0040bd89602cc209a37ec5f63add6e Mon Sep 17 00:00:00 2001 From: "daniel.eades" Date: Thu, 2 Jan 2025 14:47:48 +0000 Subject: [PATCH 1/3] use an associated constant for the aggregate type --- Cargo.toml | 2 +- src/aggregate.rs | 15 +++++++-------- src/doc.rs | 10 ++-------- src/persist/event_store.rs | 4 +--- src/persist/serialized_event.rs | 2 +- tests/lib.rs | 5 +---- 6 files changed, 13 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0634c42..b820b58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ documentation = "https://docs.rs/cqrs-es" repository.workspace = true readme = "README.md" exclude = ["docs"] -rust-version = "1.70.0" +rust-version = "1.79.0" [dependencies] async-trait = "0.1" diff --git a/src/aggregate.rs b/src/aggregate.rs index a9fa832..af07060 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -27,14 +27,12 @@ use crate::DomainEvent; /// /// #[async_trait] /// impl Aggregate for Customer { +/// const TYPE: &'static str = "customer"; /// type Command = CustomerCommand; /// type Event = CustomerEvent; /// type Error = CustomerError; /// type Services = CustomerService; /// -/// -/// fn aggregate_type() -> String { "customer".to_string() } -/// /// async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result, Self::Error> { /// match command { /// CustomerCommand::AddCustomerName{name: changed_name} => { @@ -65,6 +63,9 @@ use crate::DomainEvent; /// ``` #[async_trait] pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send { + /// The aggregate type is used as the unique identifier for this aggregate and its events. + /// This is used for persisting the events and snapshots to a database. + const TYPE: &'static str; /// Specifies the inbound command used to make changes in the state of the Aggregate. type Command; /// Specifies the published events representing some change in state of the Aggregate. @@ -74,9 +75,7 @@ pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send { type Error: std::error::Error; /// The external services required for the logic within the Aggregate type Services: Send + Sync; - /// The aggregate type is used as the unique identifier for this aggregate and its events. - /// This is used for persisting the events and snapshots to a database. - fn aggregate_type() -> String; + /// This method consumes and processes commands. /// The result should be either a vector of events if the command is successful, /// or an error if the command is rejected. @@ -96,11 +95,11 @@ pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send { /// # } /// # #[async_trait] /// # impl Aggregate for Customer { + /// # const TYPE: &'static str = "customer"; /// # type Command = CustomerCommand; /// # type Event = CustomerEvent; /// # type Error = CustomerError; /// # type Services = CustomerService; - /// # fn aggregate_type() -> String { "customer".to_string() } /// async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result, Self::Error> { /// match command { /// CustomerCommand::AddCustomerName{name: changed_name} => { @@ -149,11 +148,11 @@ pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send { /// # } /// # #[async_trait] /// # impl Aggregate for Customer { + /// # const TYPE: &'static str = "customer"; /// # type Command = CustomerCommand; /// # type Event = CustomerEvent; /// # type Error = CustomerError; /// # type Services = CustomerService; - /// # fn aggregate_type() -> String { "customer".to_string() } /// # async fn handle(&self, command: Self::Command, service: &Self::Services) -> Result, Self::Error> { /// # Ok(vec![]) /// # } diff --git a/src/doc.rs b/src/doc.rs index 5a1e8b4..bb51319 100644 --- a/src/doc.rs +++ b/src/doc.rs @@ -30,15 +30,12 @@ pub struct MyAggregate; #[async_trait] impl Aggregate for MyAggregate { + const TYPE: &'static str = "MyAggregate"; type Command = MyCommands; type Event = MyEvents; type Error = MyUserError; type Services = MyService; - fn aggregate_type() -> String { - "MyAggregate".to_string() - } - async fn handle( &self, command: Self::Command, @@ -83,15 +80,12 @@ impl Query for MyQuery { #[async_trait] impl Aggregate for Customer { + const TYPE: &'static str = "Customer"; type Command = CustomerCommand; type Event = CustomerEvent; type Error = CustomerError; type Services = CustomerService; - fn aggregate_type() -> String { - "Customer".to_string() - } - async fn handle( &self, command: Self::Command, diff --git a/src/persist/event_store.rs b/src/persist/event_store.rs index adc84ba..d9001a4 100644 --- a/src/persist/event_store.rs +++ b/src/persist/event_store.rs @@ -365,14 +365,12 @@ pub(crate) mod shared_test { #[async_trait] impl Aggregate for TestAggregate { + const TYPE: &'static str = "TestAggregate"; type Command = TestCommands; type Event = TestEvents; type Error = TestError; type Services = TestService; - fn aggregate_type() -> String { - "TestAggregate".to_string() - } async fn handle( &self, command: Self::Command, diff --git a/src/persist/serialized_event.rs b/src/persist/serialized_event.rs index 769aade..335f1a2 100644 --- a/src/persist/serialized_event.rs +++ b/src/persist/serialized_event.rs @@ -83,7 +83,7 @@ impl TryFrom<&EventEnvelope> for SerializedEvent { type Error = PersistenceError; fn try_from(event: &EventEnvelope) -> Result { - let aggregate_type = A::aggregate_type(); + let aggregate_type = A::TYPE.to_string(); let event_type = event.payload.event_type(); let event_version = event.payload.event_version(); let payload = serde_json::to_value(&event.payload)?; diff --git a/tests/lib.rs b/tests/lib.rs index acb4168..b0ca7a8 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -30,15 +30,12 @@ pub struct TestService; #[async_trait] impl Aggregate for TestAggregate { + const TYPE: &'static str = "TestAggregate"; type Command = TestCommand; type Event = TestEvent; type Error = TestError; type Services = TestService; - fn aggregate_type() -> String { - "TestAggregate".to_string() - } - async fn handle( &self, command: Self::Command, From e4770527f4d780967087cfac416e0a3288aa5cb8 Mon Sep 17 00:00:00 2001 From: "daniel.eades" Date: Mon, 6 Jan 2025 15:49:32 +0000 Subject: [PATCH 2/3] fixup --- demo/src/domain/aggregate.rs | 7 ++----- persistence/dynamo-es/src/event_repository.rs | 15 ++++++--------- persistence/dynamo-es/src/testing.rs | 7 ++----- persistence/postgres-es/src/event_repository.rs | 14 +++++++------- persistence/postgres-es/src/testing.rs | 7 ++----- 5 files changed, 19 insertions(+), 31 deletions(-) diff --git a/demo/src/domain/aggregate.rs b/demo/src/domain/aggregate.rs index 2259061..cd97a93 100644 --- a/demo/src/domain/aggregate.rs +++ b/demo/src/domain/aggregate.rs @@ -14,16 +14,13 @@ pub struct BankAccount { #[async_trait] impl Aggregate for BankAccount { + // This identifier should be unique to the system. + const TYPE: &'static str = "account"; type Command = BankAccountCommand; type Event = BankAccountEvent; type Error = BankAccountError; type Services = BankAccountServices; - // This identifier should be unique to the system. - fn aggregate_type() -> String { - "account".to_string() - } - // The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system // so expect to use helper functions elsewhere to keep the code clean. async fn handle( diff --git a/persistence/dynamo-es/src/event_repository.rs b/persistence/dynamo-es/src/event_repository.rs index 1b213a3..44861e5 100644 --- a/persistence/dynamo-es/src/event_repository.rs +++ b/persistence/dynamo-es/src/event_repository.rs @@ -196,9 +196,8 @@ impl DynamoEventRepository { let expected_snapshot = current_snapshot - 1; let (mut transactions, current_sequence) = Self::build_event_put_transactions(&self.event_table, events); - let aggregate_type_and_id = - AttributeValue::S(format!("{}:{}", A::aggregate_type(), &aggregate_id)); - let aggregate_type = AttributeValue::S(A::aggregate_type()); + let aggregate_type_and_id = AttributeValue::S(format!("{}:{}", A::TYPE, &aggregate_id)); + let aggregate_type = AttributeValue::S(A::TYPE.to_string()); let aggregate_id = AttributeValue::S(aggregate_id); let current_sequence = AttributeValue::N(current_sequence.to_string()); let current_snapshot = AttributeValue::N(current_snapshot.to_string()); @@ -278,9 +277,7 @@ impl PersistedEventRepository for DynamoEventRepository { &self, aggregate_id: &str, ) -> Result, PersistenceError> { - let request = self - .query_events(&A::aggregate_type(), aggregate_id) - .await?; + let request = self.query_events(A::TYPE, aggregate_id).await?; Ok(request) } @@ -290,7 +287,7 @@ impl PersistedEventRepository for DynamoEventRepository { number_events: usize, ) -> Result, PersistenceError> { Ok(self - .query_events_from(&A::aggregate_type(), aggregate_id, number_events) + .query_events_from(A::TYPE, aggregate_id, number_events) .await?) } @@ -299,7 +296,7 @@ impl PersistedEventRepository for DynamoEventRepository { aggregate_id: &str, ) -> Result, PersistenceError> { let query_output = self - .query_table(&A::aggregate_type(), aggregate_id, &self.snapshot_table) + .query_table(A::TYPE, aggregate_id, &self.snapshot_table) .await?; let query_items_vec = match query_output.items { None => return Ok(None), @@ -343,7 +340,7 @@ impl PersistedEventRepository for DynamoEventRepository { aggregate_id: &str, ) -> Result { let query = self - .create_query(&self.event_table, &A::aggregate_type(), aggregate_id) + .create_query(&self.event_table, A::TYPE, aggregate_id) .await .limit(self.stream_channel_size as i32); Ok(stream_events(query, self.stream_channel_size)) diff --git a/persistence/dynamo-es/src/testing.rs b/persistence/dynamo-es/src/testing.rs index d73b67b..9ebddcd 100644 --- a/persistence/dynamo-es/src/testing.rs +++ b/persistence/dynamo-es/src/testing.rs @@ -25,15 +25,12 @@ pub(crate) mod tests { #[async_trait] impl Aggregate for TestAggregate { + const TYPE: &'static str = "TestAggregate"; type Command = TestCommand; type Event = TestEvent; type Error = TestError; type Services = TestServices; - fn aggregate_type() -> String { - "TestAggregate".to_string() - } - async fn handle( &self, _command: Self::Command, @@ -145,7 +142,7 @@ pub(crate) mod tests { SerializedEvent { aggregate_id: id.to_string(), sequence, - aggregate_type: TestAggregate::aggregate_type(), + aggregate_type: TestAggregate::TYPE.to_string(), event_type: event.event_type(), event_version: event.event_version(), payload, diff --git a/persistence/postgres-es/src/event_repository.rs b/persistence/postgres-es/src/event_repository.rs index 2bc4174..c2f607d 100644 --- a/persistence/postgres-es/src/event_repository.rs +++ b/persistence/postgres-es/src/event_repository.rs @@ -47,7 +47,7 @@ impl PersistedEventRepository for PostgresEventRepository { aggregate_id: &str, ) -> Result, PersistenceError> { let row: PgRow = match sqlx::query(self.query_factory.select_snapshot()) - .bind(A::aggregate_type()) + .bind(A::TYPE) .bind(aggregate_id) .fetch_optional(&self.pool) .await @@ -89,7 +89,7 @@ impl PersistedEventRepository for PostgresEventRepository { ) -> Result { Ok(stream_events( self.query_factory.select_events().to_string(), - A::aggregate_type(), + A::TYPE.to_string(), aggregate_id.to_string(), self.pool.clone(), self.stream_channel_size, @@ -100,7 +100,7 @@ impl PersistedEventRepository for PostgresEventRepository { async fn stream_all_events(&self) -> Result { Ok(stream_events( self.query_factory.all_events().to_string(), - A::aggregate_type(), + A::TYPE.to_string(), "".to_string(), self.pool.clone(), self.stream_channel_size, @@ -140,7 +140,7 @@ impl PostgresEventRepository { query: &str, ) -> Result, PersistenceError> { let mut rows = sqlx::query(query) - .bind(A::aggregate_type()) + .bind(A::TYPE) .bind(aggregate_id) .fetch(&self.pool); let mut result: Vec = Default::default(); @@ -239,7 +239,7 @@ impl PostgresEventRepository { .persist_events::(self.query_factory.insert_event(), &mut tx, events) .await?; sqlx::query(self.query_factory.insert_snapshot()) - .bind(A::aggregate_type()) + .bind(A::TYPE) .bind(aggregate_id.as_str()) .bind(current_sequence as i32) .bind(current_snapshot as i32) @@ -264,7 +264,7 @@ impl PostgresEventRepository { let aggregate_payload = serde_json::to_value(&aggregate)?; let result = sqlx::query(self.query_factory.update_snapshot()) - .bind(A::aggregate_type()) + .bind(A::TYPE) .bind(aggregate_id.as_str()) .bind(current_sequence as i32) .bind(current_snapshot as i32) @@ -330,7 +330,7 @@ impl PostgresEventRepository { let payload = serde_json::to_value(&event.payload)?; let metadata = serde_json::to_value(&event.metadata)?; sqlx::query(inser_event_query) - .bind(A::aggregate_type()) + .bind(A::TYPE) .bind(event.aggregate_id.as_str()) .bind(event.sequence as i32) .bind(event_type) diff --git a/persistence/postgres-es/src/testing.rs b/persistence/postgres-es/src/testing.rs index c40f8b0..d70c992 100644 --- a/persistence/postgres-es/src/testing.rs +++ b/persistence/postgres-es/src/testing.rs @@ -17,15 +17,12 @@ pub(crate) mod tests { #[async_trait] impl Aggregate for TestAggregate { + const TYPE: &'static str = "TestAggregate"; type Command = TestCommand; type Event = TestEvent; type Error = TestError; type Services = TestServices; - fn aggregate_type() -> String { - "TestAggregate".to_string() - } - async fn handle( &self, _command: Self::Command, @@ -115,7 +112,7 @@ pub(crate) mod tests { SerializedEvent { aggregate_id: id.to_string(), sequence, - aggregate_type: TestAggregate::aggregate_type(), + aggregate_type: TestAggregate::TYPE.to_string(), event_type: event.event_type(), event_version: event.event_version(), payload, From 48026a1d1ca43dd609a956dc8077b9d0dd3e44ba Mon Sep 17 00:00:00 2001 From: "daniel.eades" Date: Mon, 6 Jan 2025 11:18:04 +0000 Subject: [PATCH 3/3] use 'thiserror' to generate errors --- demo/Cargo.toml | 1 + demo/src/domain/events.rs | 12 ++---------- persistence/dynamo-es/Cargo.toml | 1 + persistence/dynamo-es/src/error.rs | 27 +++++++++------------------ persistence/mysql-es/Cargo.toml | 1 + persistence/mysql-es/src/error.rs | 21 +++++---------------- persistence/postgres-es/Cargo.toml | 1 + persistence/postgres-es/src/error.rs | 21 +++++---------------- 8 files changed, 25 insertions(+), 60 deletions(-) diff --git a/demo/Cargo.toml b/demo/Cargo.toml index 4312514..b327169 100644 --- a/demo/Cargo.toml +++ b/demo/Cargo.toml @@ -27,6 +27,7 @@ tower = "0.4" tower-http = "0.4" lambda_http = "0.8" +thiserror = "2.0.9" [[bin]] name = "cqrs-demo" diff --git a/demo/src/domain/events.rs b/demo/src/domain/events.rs index d9743dc..32c10b3 100644 --- a/demo/src/domain/events.rs +++ b/demo/src/domain/events.rs @@ -1,6 +1,5 @@ use cqrs_es::DomainEvent; use serde::{Deserialize, Serialize}; -use std::fmt::{Debug, Display, Formatter}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum BankAccountEvent { @@ -37,7 +36,8 @@ impl DomainEvent for BankAccountEvent { } } -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] +#[error("{0}")] pub struct BankAccountError(String); impl From<&str> for BankAccountError { @@ -45,11 +45,3 @@ impl From<&str> for BankAccountError { Self(msg.to_string()) } } - -impl Display for BankAccountError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl std::error::Error for BankAccountError {} diff --git a/persistence/dynamo-es/Cargo.toml b/persistence/dynamo-es/Cargo.toml index 3dba3f2..a143d31 100644 --- a/persistence/dynamo-es/Cargo.toml +++ b/persistence/dynamo-es/Cargo.toml @@ -19,6 +19,7 @@ futures = "0.3" serde = { workspace = true, features = ["derive"]} serde_json = "1.0" tokio = { workspace = true, features = ["rt"] } +thiserror = "2.0.9" [dev-dependencies] aws-config = "1.1.5" diff --git a/persistence/dynamo-es/src/error.rs b/persistence/dynamo-es/src/error.rs index 9f5881c..441ab3f 100644 --- a/persistence/dynamo-es/src/error.rs +++ b/persistence/dynamo-es/src/error.rs @@ -1,5 +1,3 @@ -use std::fmt::{Debug, Display, Formatter}; - use aws_sdk_dynamodb::error::{BuildError, SdkError}; use aws_sdk_dynamodb::operation::query::QueryError; use aws_sdk_dynamodb::operation::scan::ScanError; @@ -8,31 +6,24 @@ use cqrs_es::persist::PersistenceError; use cqrs_es::AggregateError; use serde::de::StdError; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum DynamoAggregateError { + #[error("optimistic lock error")] OptimisticLock, + #[error(transparent)] ConnectionError(Box), + #[error(transparent)] DeserializationError(Box), + #[error( + "Too many operations: {0}, DynamoDb supports only up to 25 operations per transactions" + )] TransactionListTooLong(usize), + #[error("missing attribute: {0}")] MissingAttribute(String), + #[error(transparent)] UnknownError(Box), } -impl Display for DynamoAggregateError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::OptimisticLock => write!(f, "optimistic lock error"), - Self::MissingAttribute(attribute) => write!(f, "missing attribute: {attribute}"), - Self::ConnectionError(msg) => write!(f, "{msg}"), - Self::DeserializationError(msg) => write!(f, "{msg}"), - Self::UnknownError(msg) => write!(f, "{msg}"), - Self::TransactionListTooLong(length) => write!(f, "Too many operations: {length}, DynamoDb supports only up to 25 operations per transactions"), - } - } -} - -impl std::error::Error for DynamoAggregateError {} - impl From for AggregateError { fn from(error: DynamoAggregateError) -> Self { match error { diff --git a/persistence/mysql-es/Cargo.toml b/persistence/mysql-es/Cargo.toml index 53bee9a..2e16c6d 100644 --- a/persistence/mysql-es/Cargo.toml +++ b/persistence/mysql-es/Cargo.toml @@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"]} serde_json = "1.0" sqlx = { version = "0.8", features = [ "mysql", "json"] } tokio = { workspace = true, features = ["rt"] } +thiserror = "2.0.9" [dev-dependencies] uuid.workspace = true diff --git a/persistence/mysql-es/src/error.rs b/persistence/mysql-es/src/error.rs index b4ef0ca..9c01fba 100644 --- a/persistence/mysql-es/src/error.rs +++ b/persistence/mysql-es/src/error.rs @@ -1,30 +1,19 @@ -use std::fmt::{Debug, Display, Formatter}; - use cqrs_es::persist::PersistenceError; use cqrs_es::AggregateError; use sqlx::Error; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum MysqlAggregateError { + #[error("optimistic lock error")] OptimisticLock, + #[error(transparent)] ConnectionError(Box), + #[error(transparent)] DeserializationError(Box), + #[error(transparent)] UnknownError(Box), } -impl Display for MysqlAggregateError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::OptimisticLock => write!(f, "optimistic lock error"), - Self::ConnectionError(error) => write!(f, "{error}"), - Self::DeserializationError(error) => write!(f, "{error}"), - Self::UnknownError(error) => write!(f, "{error}"), - } - } -} - -impl std::error::Error for MysqlAggregateError {} - impl From for MysqlAggregateError { fn from(err: sqlx::Error) -> Self { // TODO: improve error handling diff --git a/persistence/postgres-es/Cargo.toml b/persistence/postgres-es/Cargo.toml index c04bed9..c970af6 100644 --- a/persistence/postgres-es/Cargo.toml +++ b/persistence/postgres-es/Cargo.toml @@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"] } serde_json = "1.0" sqlx = { version = "0.8", features = ["postgres", "json"] } tokio = { workspace = true, features = ["rt"] } +thiserror = "2.0.9" [dev-dependencies] uuid.workspace = true diff --git a/persistence/postgres-es/src/error.rs b/persistence/postgres-es/src/error.rs index 15cd231..e8dd403 100644 --- a/persistence/postgres-es/src/error.rs +++ b/persistence/postgres-es/src/error.rs @@ -1,30 +1,19 @@ -use std::fmt::{Debug, Display, Formatter}; - use cqrs_es::persist::PersistenceError; use cqrs_es::AggregateError; use sqlx::Error; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum PostgresAggregateError { + #[error("optimistic lock error")] OptimisticLock, + #[error(transparent)] ConnectionError(Box), + #[error(transparent)] DeserializationError(Box), + #[error(transparent)] UnknownError(Box), } -impl Display for PostgresAggregateError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::OptimisticLock => write!(f, "optimistic lock error"), - Self::UnknownError(error) => write!(f, "{error}"), - Self::DeserializationError(error) => write!(f, "{error}"), - Self::ConnectionError(error) => write!(f, "{error}"), - } - } -} - -impl std::error::Error for PostgresAggregateError {} - impl From for PostgresAggregateError { fn from(err: sqlx::Error) -> Self { // TODO: improve error handling