Skip to content

Commit

Permalink
Merge pull request #123 from danieleades/thiserror
Browse files Browse the repository at this point in the history
use 'thiserror'
  • Loading branch information
serverlesstechnology authored Jan 6, 2025
2 parents b2fdba5 + 48026a1 commit 3615030
Show file tree
Hide file tree
Showing 19 changed files with 57 additions and 116 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ documentation = "https://docs.rs/cqrs-es"
repository.workspace = true
readme = "README.md"
exclude = ["docs"]
rust-version = "1.70.0"
rust-version = "1.79.0"

[dependencies]
async-trait = "0.1"
Expand Down
1 change: 1 addition & 0 deletions demo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tower = "0.4"
tower-http = "0.4"

lambda_http = "0.8"
thiserror = "2.0.9"

[[bin]]
name = "cqrs-demo"
Expand Down
7 changes: 2 additions & 5 deletions demo/src/domain/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@ pub struct BankAccount {

#[async_trait]
impl Aggregate for BankAccount {
// This identifier should be unique to the system.
const TYPE: &'static str = "account";
type Command = BankAccountCommand;
type Event = BankAccountEvent;
type Error = BankAccountError;
type Services = BankAccountServices;

// This identifier should be unique to the system.
fn aggregate_type() -> String {
"account".to_string()
}

// The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system
// so expect to use helper functions elsewhere to keep the code clean.
async fn handle(
Expand Down
12 changes: 2 additions & 10 deletions demo/src/domain/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use cqrs_es::DomainEvent;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display, Formatter};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BankAccountEvent {
Expand Down Expand Up @@ -37,19 +36,12 @@ impl DomainEvent for BankAccountEvent {
}
}

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct BankAccountError(String);

impl From<&str> for BankAccountError {
fn from(msg: &str) -> Self {
Self(msg.to_string())
}
}

impl Display for BankAccountError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl std::error::Error for BankAccountError {}
1 change: 1 addition & 0 deletions persistence/dynamo-es/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ futures = "0.3"
serde = { workspace = true, features = ["derive"]}
serde_json = "1.0"
tokio = { workspace = true, features = ["rt"] }
thiserror = "2.0.9"

[dev-dependencies]
aws-config = "1.1.5"
Expand Down
27 changes: 9 additions & 18 deletions persistence/dynamo-es/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::fmt::{Debug, Display, Formatter};

use aws_sdk_dynamodb::error::{BuildError, SdkError};
use aws_sdk_dynamodb::operation::query::QueryError;
use aws_sdk_dynamodb::operation::scan::ScanError;
Expand All @@ -8,31 +6,24 @@ use cqrs_es::persist::PersistenceError;
use cqrs_es::AggregateError;
use serde::de::StdError;

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum DynamoAggregateError {
#[error("optimistic lock error")]
OptimisticLock,
#[error(transparent)]
ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error(transparent)]
DeserializationError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error(
"Too many operations: {0}, DynamoDb supports only up to 25 operations per transactions"
)]
TransactionListTooLong(usize),
#[error("missing attribute: {0}")]
MissingAttribute(String),
#[error(transparent)]
UnknownError(Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl Display for DynamoAggregateError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::OptimisticLock => write!(f, "optimistic lock error"),
Self::MissingAttribute(attribute) => write!(f, "missing attribute: {attribute}"),
Self::ConnectionError(msg) => write!(f, "{msg}"),
Self::DeserializationError(msg) => write!(f, "{msg}"),
Self::UnknownError(msg) => write!(f, "{msg}"),
Self::TransactionListTooLong(length) => write!(f, "Too many operations: {length}, DynamoDb supports only up to 25 operations per transactions"),
}
}
}

impl std::error::Error for DynamoAggregateError {}

impl<T: std::error::Error> From<DynamoAggregateError> for AggregateError<T> {
fn from(error: DynamoAggregateError) -> Self {
match error {
Expand Down
15 changes: 6 additions & 9 deletions persistence/dynamo-es/src/event_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,8 @@ impl DynamoEventRepository {
let expected_snapshot = current_snapshot - 1;
let (mut transactions, current_sequence) =
Self::build_event_put_transactions(&self.event_table, events);
let aggregate_type_and_id =
AttributeValue::S(format!("{}:{}", A::aggregate_type(), &aggregate_id));
let aggregate_type = AttributeValue::S(A::aggregate_type());
let aggregate_type_and_id = AttributeValue::S(format!("{}:{}", A::TYPE, &aggregate_id));
let aggregate_type = AttributeValue::S(A::TYPE.to_string());
let aggregate_id = AttributeValue::S(aggregate_id);
let current_sequence = AttributeValue::N(current_sequence.to_string());
let current_snapshot = AttributeValue::N(current_snapshot.to_string());
Expand Down Expand Up @@ -278,9 +277,7 @@ impl PersistedEventRepository for DynamoEventRepository {
&self,
aggregate_id: &str,
) -> Result<Vec<SerializedEvent>, PersistenceError> {
let request = self
.query_events(&A::aggregate_type(), aggregate_id)
.await?;
let request = self.query_events(A::TYPE, aggregate_id).await?;
Ok(request)
}

Expand All @@ -290,7 +287,7 @@ impl PersistedEventRepository for DynamoEventRepository {
number_events: usize,
) -> Result<Vec<SerializedEvent>, PersistenceError> {
Ok(self
.query_events_from(&A::aggregate_type(), aggregate_id, number_events)
.query_events_from(A::TYPE, aggregate_id, number_events)
.await?)
}

Expand All @@ -299,7 +296,7 @@ impl PersistedEventRepository for DynamoEventRepository {
aggregate_id: &str,
) -> Result<Option<SerializedSnapshot>, PersistenceError> {
let query_output = self
.query_table(&A::aggregate_type(), aggregate_id, &self.snapshot_table)
.query_table(A::TYPE, aggregate_id, &self.snapshot_table)
.await?;
let query_items_vec = match query_output.items {
None => return Ok(None),
Expand Down Expand Up @@ -343,7 +340,7 @@ impl PersistedEventRepository for DynamoEventRepository {
aggregate_id: &str,
) -> Result<ReplayStream, PersistenceError> {
let query = self
.create_query(&self.event_table, &A::aggregate_type(), aggregate_id)
.create_query(&self.event_table, A::TYPE, aggregate_id)
.await
.limit(self.stream_channel_size as i32);
Ok(stream_events(query, self.stream_channel_size))
Expand Down
7 changes: 2 additions & 5 deletions persistence/dynamo-es/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@ pub(crate) mod tests {

#[async_trait]
impl Aggregate for TestAggregate {
const TYPE: &'static str = "TestAggregate";
type Command = TestCommand;
type Event = TestEvent;
type Error = TestError;
type Services = TestServices;

fn aggregate_type() -> String {
"TestAggregate".to_string()
}

async fn handle(
&self,
_command: Self::Command,
Expand Down Expand Up @@ -145,7 +142,7 @@ pub(crate) mod tests {
SerializedEvent {
aggregate_id: id.to_string(),
sequence,
aggregate_type: TestAggregate::aggregate_type(),
aggregate_type: TestAggregate::TYPE.to_string(),
event_type: event.event_type(),
event_version: event.event_version(),
payload,
Expand Down
1 change: 1 addition & 0 deletions persistence/mysql-es/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"]}
serde_json = "1.0"
sqlx = { version = "0.8", features = [ "mysql", "json"] }
tokio = { workspace = true, features = ["rt"] }
thiserror = "2.0.9"

[dev-dependencies]
uuid.workspace = true
Expand Down
21 changes: 5 additions & 16 deletions persistence/mysql-es/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
use std::fmt::{Debug, Display, Formatter};

use cqrs_es::persist::PersistenceError;
use cqrs_es::AggregateError;
use sqlx::Error;

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum MysqlAggregateError {
#[error("optimistic lock error")]
OptimisticLock,
#[error(transparent)]
ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error(transparent)]
DeserializationError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error(transparent)]
UnknownError(Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl Display for MysqlAggregateError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::OptimisticLock => write!(f, "optimistic lock error"),
Self::ConnectionError(error) => write!(f, "{error}"),
Self::DeserializationError(error) => write!(f, "{error}"),
Self::UnknownError(error) => write!(f, "{error}"),
}
}
}

impl std::error::Error for MysqlAggregateError {}

impl From<sqlx::Error> for MysqlAggregateError {
fn from(err: sqlx::Error) -> Self {
// TODO: improve error handling
Expand Down
1 change: 1 addition & 0 deletions persistence/postgres-es/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = "1.0"
sqlx = { version = "0.8", features = ["postgres", "json"] }
tokio = { workspace = true, features = ["rt"] }
thiserror = "2.0.9"

[dev-dependencies]
uuid.workspace = true
Expand Down
21 changes: 5 additions & 16 deletions persistence/postgres-es/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
use std::fmt::{Debug, Display, Formatter};

use cqrs_es::persist::PersistenceError;
use cqrs_es::AggregateError;
use sqlx::Error;

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum PostgresAggregateError {
#[error("optimistic lock error")]
OptimisticLock,
#[error(transparent)]
ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error(transparent)]
DeserializationError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error(transparent)]
UnknownError(Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl Display for PostgresAggregateError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::OptimisticLock => write!(f, "optimistic lock error"),
Self::UnknownError(error) => write!(f, "{error}"),
Self::DeserializationError(error) => write!(f, "{error}"),
Self::ConnectionError(error) => write!(f, "{error}"),
}
}
}

impl std::error::Error for PostgresAggregateError {}

impl From<sqlx::Error> for PostgresAggregateError {
fn from(err: sqlx::Error) -> Self {
// TODO: improve error handling
Expand Down
14 changes: 7 additions & 7 deletions persistence/postgres-es/src/event_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl PersistedEventRepository for PostgresEventRepository {
aggregate_id: &str,
) -> Result<Option<SerializedSnapshot>, PersistenceError> {
let row: PgRow = match sqlx::query(self.query_factory.select_snapshot())
.bind(A::aggregate_type())
.bind(A::TYPE)
.bind(aggregate_id)
.fetch_optional(&self.pool)
.await
Expand Down Expand Up @@ -89,7 +89,7 @@ impl PersistedEventRepository for PostgresEventRepository {
) -> Result<ReplayStream, PersistenceError> {
Ok(stream_events(
self.query_factory.select_events().to_string(),
A::aggregate_type(),
A::TYPE.to_string(),
aggregate_id.to_string(),
self.pool.clone(),
self.stream_channel_size,
Expand All @@ -100,7 +100,7 @@ impl PersistedEventRepository for PostgresEventRepository {
async fn stream_all_events<A: Aggregate>(&self) -> Result<ReplayStream, PersistenceError> {
Ok(stream_events(
self.query_factory.all_events().to_string(),
A::aggregate_type(),
A::TYPE.to_string(),
"".to_string(),
self.pool.clone(),
self.stream_channel_size,
Expand Down Expand Up @@ -140,7 +140,7 @@ impl PostgresEventRepository {
query: &str,
) -> Result<Vec<SerializedEvent>, PersistenceError> {
let mut rows = sqlx::query(query)
.bind(A::aggregate_type())
.bind(A::TYPE)
.bind(aggregate_id)
.fetch(&self.pool);
let mut result: Vec<SerializedEvent> = Default::default();
Expand Down Expand Up @@ -239,7 +239,7 @@ impl PostgresEventRepository {
.persist_events::<A>(self.query_factory.insert_event(), &mut tx, events)
.await?;
sqlx::query(self.query_factory.insert_snapshot())
.bind(A::aggregate_type())
.bind(A::TYPE)
.bind(aggregate_id.as_str())
.bind(current_sequence as i32)
.bind(current_snapshot as i32)
Expand All @@ -264,7 +264,7 @@ impl PostgresEventRepository {

let aggregate_payload = serde_json::to_value(&aggregate)?;
let result = sqlx::query(self.query_factory.update_snapshot())
.bind(A::aggregate_type())
.bind(A::TYPE)
.bind(aggregate_id.as_str())
.bind(current_sequence as i32)
.bind(current_snapshot as i32)
Expand Down Expand Up @@ -330,7 +330,7 @@ impl PostgresEventRepository {
let payload = serde_json::to_value(&event.payload)?;
let metadata = serde_json::to_value(&event.metadata)?;
sqlx::query(inser_event_query)
.bind(A::aggregate_type())
.bind(A::TYPE)
.bind(event.aggregate_id.as_str())
.bind(event.sequence as i32)
.bind(event_type)
Expand Down
7 changes: 2 additions & 5 deletions persistence/postgres-es/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@ pub(crate) mod tests {

#[async_trait]
impl Aggregate for TestAggregate {
const TYPE: &'static str = "TestAggregate";
type Command = TestCommand;
type Event = TestEvent;
type Error = TestError;
type Services = TestServices;

fn aggregate_type() -> String {
"TestAggregate".to_string()
}

async fn handle(
&self,
_command: Self::Command,
Expand Down Expand Up @@ -115,7 +112,7 @@ pub(crate) mod tests {
SerializedEvent {
aggregate_id: id.to_string(),
sequence,
aggregate_type: TestAggregate::aggregate_type(),
aggregate_type: TestAggregate::TYPE.to_string(),
event_type: event.event_type(),
event_version: event.event_version(),
payload,
Expand Down
Loading

0 comments on commit 3615030

Please sign in to comment.