diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0649c6c4..fb78a26e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -148,11 +148,11 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - - name: Install nightly toolchain + - name: Install stable toolchain uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly + toolchain: stable override: true components: clippy,rustfmt diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..c0b1d54e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "rust-analyzer.cargo.features": "all", + "rust-analyzer.check.command": "clippy" +} diff --git a/eventually-macros/Cargo.toml b/eventually-macros/Cargo.toml index c083ee6a..c1a038a6 100644 --- a/eventually-macros/Cargo.toml +++ b/eventually-macros/Cargo.toml @@ -1,12 +1,25 @@ [package] name = "eventually-macros" +description = "Macros for eventually crate" version = "0.1.0" edition = "2021" +authors = ["Danilo Cianfrone "] +license = "MIT" +readme = "../README.md" +repository = "https://github.com/get-eventually/eventually-rs" + +categories = [ + "rust-patterns", + "web-programming", + "asynchronous", + "data-structures", +] +keywords = ["architecture", "ddd", "event-sourcing", "cqrs", "es"] [lib] proc-macro = true [dependencies] syn = { version = "1.0.109", features = ["full"] } -quote = "1.0.33" +quote = "1.0.35" eventually = { path = "../eventually" } diff --git a/eventually-macros/src/lib.rs b/eventually-macros/src/lib.rs index efae80fe..db6ad603 100644 --- a/eventually-macros/src/lib.rs +++ b/eventually-macros/src/lib.rs @@ -1,55 +1,21 @@ -//! Module containing useful macros for the [eventually] crate. +//! `eventually-macros` contains useful macros that provides +//! different implementations of traits and functionalities from [eventually]. -#![deny(unsafe_code, unused_qualifications, trivial_casts)] -#![warn(missing_docs)] -#![deny(clippy::all)] -#![warn(clippy::pedantic)] +#![deny(unsafe_code, unused_qualifications, trivial_casts, missing_docs)] +#![deny(clippy::all, clippy::pedantic, clippy::cargo)] use proc_macro::TokenStream; use quote::quote; -use syn::{parse_macro_input, AttributeArgs, Fields, ItemEnum, ItemStruct, Meta, NestedMeta, Path}; +use syn::{parse_macro_input, AttributeArgs, Fields, ItemStruct, Meta, NestedMeta, Path}; -#[proc_macro_derive(Message)] -pub fn derive_message(input: TokenStream) -> TokenStream { - let item = parse_macro_input!(input as ItemEnum); - let item_name = item.ident; - let event_prefix = item_name - .to_string() - .strip_suffix("Event") - .unwrap() - .to_owned(); - - let match_cases = item.variants.iter().fold(quote! {}, |acc, variant| { - let event_type = &variant.ident; - let event_name = format!("{}{}", event_prefix, event_type); - - quote! { - #acc - #item_name::#event_type { .. } => #event_name, - } - }); - - let result = quote! { - impl eventually::message::Message for #item_name { - fn name(&self) -> &'static str { - match self { - #match_cases - } - } - } - }; - - result.into() -} - -/// Implements a newtype to use the [eventually::aggregate::Root] instance with -/// user-defined [eventually::aggregate::Aggregate] types. +/// Implements a newtype to use the [`eventually::aggregate::Root`] instance with +/// user-defined [`eventually::aggregate::Aggregate`] types. /// /// # Context /// -/// The eventually API uses `aggregate::Root` to manage the versioning and -/// list of events to commit for an `Aggregate` instance. Domain commands -/// are to be implemented on the `aggregate::Root` instance, as it gives +/// The eventually API uses [`aggregate::Root`][eventually::aggregate::Root] +/// to manage the versioning and list of events to commit for an `Aggregate` instance. +/// Domain commands are to be implemented on the `aggregate::Root` instance, as it gives /// access to use `Root.record_that` or `Root.record_new` to record Domain Events. /// /// However, it's not possible to use `impl aggregate::Root` (`MyAggregateType` @@ -58,7 +24,11 @@ pub fn derive_message(input: TokenStream) -> TokenStream { /// /// This attribute macro makes the implementation of a newtype easy, as it Implements /// conversion traits from and to `aggregate::Root` and implements automatic deref -/// through [std::ops::Deref] and [std::ops::DerefMut]. +/// through [`std::ops::Deref`] and [`std::ops::DerefMut`]. +/// +/// # Panics +/// +/// This method will panic if the Aggregate Root type is not provided as a macro parameter. #[proc_macro_attribute] pub fn aggregate_root(args: TokenStream, item: TokenStream) -> TokenStream { let args = parse_macro_input!(args as AttributeArgs); diff --git a/eventually-postgres/Cargo.toml b/eventually-postgres/Cargo.toml index 2b0c8f4f..87882545 100644 --- a/eventually-postgres/Cargo.toml +++ b/eventually-postgres/Cargo.toml @@ -12,26 +12,27 @@ categories = ["web-programming", "asynchronous"] keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"] [dependencies] -async-trait = "0.1.74" -chrono = "0.4.31" +anyhow = "1.0.80" +async-trait = "0.1.77" +chrono = "0.4.34" eventually = { path = "../eventually", version = "0.5.0", features = [ "serde-json", ] } -futures = "0.3.29" +futures = "0.3.30" lazy_static = "1.4.0" -regex = "1.10.2" -sqlx = { version = "0.7.2", features = [ +regex = "1.10.3" +sqlx = { version = "0.7.3", features = [ "runtime-tokio-rustls", "postgres", "migrate", ] } -thiserror = "1.0.50" +thiserror = "1.0.57" [dev-dependencies] -tokio = { version = "1.34.0", features = ["macros", "rt"] } +tokio = { version = "1.36.0", features = ["macros", "rt"] } eventually = { path = "../eventually", version = "0.5.0", features = [ "serde-json", ] } eventually-macros = { path = "../eventually-macros", version = "0.1.0" } -serde = { version = "1.0.192", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } rand = "0.8.5" diff --git a/eventually-postgres/src/aggregate.rs b/eventually-postgres/src/aggregate.rs index 1322c93d..d7adafda 100644 --- a/eventually-postgres/src/aggregate.rs +++ b/eventually-postgres/src/aggregate.rs @@ -1,42 +1,49 @@ +//! This module contains the implementation of the [`eventually::aggregate::Repository`] trait, +//! to work specifically with `PostgreSQL` databases. +//! +//! Check out the [Repository] type for more information. + use std::marker::PhantomData; +use anyhow::anyhow; use async_trait::async_trait; use eventually::aggregate::Aggregate; -use eventually::serde::Serde; use eventually::version::Version; -use eventually::{aggregate, version}; +use eventually::{aggregate, serde, version}; use sqlx::{PgPool, Postgres, Row}; +/// Implements the [`eventually::aggregate::Repository`] trait for +/// `PostgreSQL` databases. #[derive(Debug, Clone)] -pub struct Repository +pub struct Repository where T: Aggregate, ::Id: ToString, - OutT: From, - OutEvt: From, - TSerde: Serde, - EvtSerde: Serde, + Serde: serde::Serde, + EvtSerde: serde::Serde, { pool: PgPool, - aggregate_serde: TSerde, + aggregate_serde: Serde, event_serde: EvtSerde, t: PhantomData, - out_t: PhantomData, - out_evt: PhantomData, } -impl Repository +impl Repository where T: Aggregate, ::Id: ToString, - OutT: From, - OutEvt: From, - TSerde: Serde, - EvtSerde: Serde, + Serde: serde::Serde, + EvtSerde: serde::Serde, { + /// Runs the latest migrations necessary for the implementation to work, + /// then returns a new [`Repository`] instance. + /// + /// # Errors + /// + /// An error is returned if the migrations fail to run. pub async fn new( pool: PgPool, - aggregate_serde: TSerde, + aggregate_serde: Serde, event_serde: EvtSerde, ) -> Result { // Make sure the latest migrations are used before using the Repository instance. @@ -47,60 +54,16 @@ where aggregate_serde, event_serde, t: PhantomData, - out_t: PhantomData, - out_evt: PhantomData, }) } } -#[derive(Debug, thiserror::Error)] -pub enum GetError { - #[error("failed to fetch the aggregate state row: {0}")] - FetchAggregateRow(#[source] sqlx::Error), - #[error("failed to deserialize the aggregate state from the database row: {0}")] - DeserializeAggregate(#[source] Box), - #[error("failed to convert the aggregate state into its domain type: {0}")] - ConvertAggregate(#[source] Box), - #[error("database returned an error: {0}")] - Database(#[from] sqlx::Error), -} - -#[derive(Debug, thiserror::Error)] -pub enum SaveError { - #[error("failed to begin a new transaction: {0}")] - BeginTransaction(#[source] sqlx::Error), - #[error("conflict error detected: {0})")] - Conflict(#[source] version::ConflictError), - #[error("concurrent update detected, represented as a conflict error: {0})")] - Concurrency(#[source] version::ConflictError), - #[error("failed to save the new aggregate state: {0}")] - SaveAggregateState(#[source] sqlx::Error), - #[error("failed to append a new domain event: {0}")] - AppendEvent(#[source] sqlx::Error), - #[error("failed to commit transaction: {0}")] - CommitTransaction(#[source] sqlx::Error), - #[error("database returned an error: {0}")] - Database(#[from] sqlx::Error), -} - -impl From for Option { - fn from(err: SaveError) -> Self { - match err { - SaveError::Conflict(v) => Some(v), - SaveError::Concurrency(v) => Some(v), - _ => None, - } - } -} - -impl Repository +impl Repository where T: Aggregate + Send + Sync, ::Id: ToString, - OutT: From + Send + Sync, - OutEvt: From, - TSerde: Serde + Send + Sync, - EvtSerde: Serde, + Serde: serde::Serde + Send + Sync, + EvtSerde: serde::Serde + Send + Sync, { async fn save_aggregate_state( &self, @@ -108,10 +71,14 @@ where aggregate_id: &str, expected_version: Version, root: &mut aggregate::Root, - ) -> Result<(), SaveError> { - let out_state = root.to_aggregate_type::(); - let bytes_state = self.aggregate_serde.serialize(out_state); + ) -> Result<(), aggregate::repository::SaveError> { + let out_state = root.to_aggregate_type::(); + let bytes_state = self + .aggregate_serde + .serialize(out_state) + .map_err(|err| anyhow!("failed to serialize aggregate root state: {}", err))?; + #[allow(clippy::cast_possible_truncation)] sqlx::query("CALL upsert_aggregate($1, $2, $3, $4, $5)") .bind(aggregate_id) .bind(T::type_name()) @@ -121,15 +88,17 @@ where .execute(&mut **tx) .await .map_err(|err| match crate::check_for_conflict_error(&err) { - Some(err) => SaveError::Conflict(err), - None => match err.as_database_error().and_then(|err| err.code()) { - Some(code) if code == "40001" => { - SaveError::Concurrency(version::ConflictError { - expected: expected_version, - actual: root.version(), - }) - }, - _ => SaveError::SaveAggregateState(err), + Some(err) => aggregate::repository::SaveError::Conflict(err), + None => match err + .as_database_error() + .and_then(sqlx::error::DatabaseError::code) + { + Some(code) if code == "40001" => version::ConflictError { + expected: expected_version, + actual: root.version(), + } + .into(), + _ => anyhow!("failed to save aggregate state: {}", err).into(), }, })?; @@ -138,25 +107,14 @@ where } #[async_trait] -impl aggregate::Repository - for Repository +impl aggregate::repository::Getter for Repository where - T: Aggregate + TryFrom + Send + Sync, + T: Aggregate + Send + Sync, ::Id: ToString, - >::Error: std::error::Error + Send + Sync + 'static, - OutT: From + Send + Sync, - OutEvt: From + Send + Sync, - TSerde: Serde + Send + Sync, - >::Error: std::error::Error + Send + Sync + 'static, - EvtSerde: Serde + Send + Sync, + Serde: serde::Serde + Send + Sync, + EvtSerde: serde::Serde + Send + Sync, { - type GetError = GetError; - type SaveError = SaveError; - - async fn get( - &self, - id: &T::Id, - ) -> Result, aggregate::repository::GetError> { + async fn get(&self, id: &T::Id) -> Result, aggregate::repository::GetError> { let aggregate_id = id.to_string(); let row = sqlx::query( @@ -170,27 +128,47 @@ where .await .map_err(|err| match err { sqlx::Error::RowNotFound => aggregate::repository::GetError::NotFound, - _ => aggregate::repository::GetError::Inner(GetError::FetchAggregateRow(err)), + _ => anyhow!("failed to fetch the aggregate state row: {}", err).into(), })?; - let version: i32 = row.try_get("version").map_err(GetError::Database)?; - let bytes_state: Vec = row.try_get("state").map_err(GetError::Database)?; + let version: i32 = row + .try_get("version") + .map_err(|err| anyhow!("failed to get 'version' column from row: {}", err))?; + + let bytes_state: Vec = row + .try_get("state") + .map_err(|err| anyhow!("failed to get 'state' column from row: {}", err))?; let aggregate: T = self .aggregate_serde - .deserialize(bytes_state) - .map_err(|err| GetError::DeserializeAggregate(Box::new(err))) - .and_then(|out_t| { - T::try_from(out_t).map_err(|err| GetError::ConvertAggregate(Box::new(err))) + .deserialize(&bytes_state) + .map_err(|err| { + anyhow!( + "failed to deserialize the aggregate state from the database row: {}", + err + ) })?; + #[allow(clippy::cast_sign_loss)] Ok(aggregate::Root::rehydrate_from_state( version as Version, aggregate, )) } +} - async fn save(&self, root: &mut aggregate::Root) -> Result<(), Self::SaveError> { +#[async_trait] +impl aggregate::repository::Saver for Repository +where + T: Aggregate + Send + Sync, + ::Id: ToString, + Serde: serde::Serde + Send + Sync, + EvtSerde: serde::Serde + Send + Sync, +{ + async fn save( + &self, + root: &mut aggregate::Root, + ) -> Result<(), aggregate::repository::SaveError> { let events_to_commit = root.take_uncommitted_events(); if events_to_commit.is_empty() { @@ -201,11 +179,12 @@ where .pool .begin() .await - .map_err(SaveError::BeginTransaction)?; + .map_err(|err| anyhow!("failed to begin transaction: {}", err))?; sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE") .execute(&mut *tx) - .await?; + .await + .map_err(|err| anyhow!("failed to begin transaction: {}", err))?; let aggregate_id = root.aggregate_id().to_string(); let expected_root_version = root.version() - (events_to_commit.len() as Version); @@ -213,6 +192,7 @@ where self.save_aggregate_state(&mut tx, &aggregate_id, expected_root_version, root) .await?; + #[allow(clippy::cast_possible_truncation)] crate::event::append_domain_events( &mut tx, &self.event_serde, @@ -221,9 +201,11 @@ where events_to_commit, ) .await - .map_err(SaveError::AppendEvent)?; + .map_err(|err| anyhow!("failed to append aggregate root domain events: {}", err))?; - tx.commit().await.map_err(SaveError::CommitTransaction)?; + tx.commit() + .await + .map_err(|err| anyhow!("failed to commit transaction: {}", err))?; Ok(()) } diff --git a/eventually-postgres/src/event.rs b/eventually-postgres/src/event.rs index f2a354a9..19cbfcb0 100644 --- a/eventually-postgres/src/event.rs +++ b/eventually-postgres/src/event.rs @@ -1,24 +1,21 @@ use std::marker::PhantomData; use std::string::ToString; +use anyhow::anyhow; use async_trait::async_trait; use chrono::Utc; use eventually::message::{Message, Metadata}; -use eventually::serde::Serde; use eventually::version::Version; -use eventually::{event, version}; +use eventually::{event, serde, version}; use futures::future::ready; use futures::{StreamExt, TryStreamExt}; -use regex::Regex; -use sqlx::postgres::{PgDatabaseError, PgRow}; +use sqlx::postgres::PgRow; use sqlx::{PgPool, Postgres, Row, Transaction}; #[derive(Debug, thiserror::Error)] pub enum StreamError { - #[error("failed to convert domain event from its serialization type: {0}")] - ConvertEvent(#[source] Box), #[error("failed to deserialize event from database: {0}")] - DeserializeEvent(#[source] Box), + DeserializeEvent(#[source] anyhow::Error), #[error("failed to get column '{name}' from result row: {error}")] ReadColumn { name: &'static str, @@ -29,50 +26,22 @@ pub enum StreamError { Database(#[source] sqlx::Error), } -#[derive(Debug, thiserror::Error)] -pub enum AppendError { - #[error("conflict error detected: {0}")] - Conflict(#[source] version::ConflictError), - #[error("concurrent update detected, represented as a conflict error: {0}")] - Concurrency(#[source] version::ConflictError), - #[error("failed to begin transaction: {0}")] - BeginTransaction(#[source] sqlx::Error), - #[error("failed to upsert new event stream version: {0}")] - UpsertEventStream(#[source] sqlx::Error), - #[error("failed to append a new domain event: {0}")] - AppendEvent(#[source] sqlx::Error), - #[error("failed to commit transaction: {0}")] - CommitTransaction(#[source] sqlx::Error), - #[error("db returned an error: {0}")] - Database(#[from] sqlx::Error), -} - -impl From for Option { - fn from(err: AppendError) -> Self { - match err { - AppendError::Conflict(v) => Some(v), - AppendError::Concurrency(v) => Some(v), - _ => None, - } - } -} - -pub(crate) async fn append_domain_event( +pub(crate) async fn append_domain_event( tx: &mut Transaction<'_, Postgres>, - serde: &impl Serde, + serde: &impl serde::Serializer, event_stream_id: &str, event_version: i32, new_event_stream_version: i32, event: event::Envelope, -) -> Result<(), sqlx::Error> +) -> anyhow::Result<()> where Evt: Message, - OutEvt: From, { let event_type = event.message.name(); - let out_event = OutEvt::from(event.message); - let serialized_event = serde.serialize(out_event); let mut metadata = event.metadata; + let serialized_event = serde + .serialize(event.message) + .map_err(|err| anyhow!("failed to serialize event message: {}", err))?; metadata.insert("Recorded-At".to_owned(), Utc::now().to_rfc3339()); metadata.insert( @@ -94,20 +63,21 @@ where Ok(()) } -pub(crate) async fn append_domain_events( +pub(crate) async fn append_domain_events( tx: &mut Transaction<'_, Postgres>, - serde: &impl Serde, + serde: &impl serde::Serializer, event_stream_id: &str, new_version: i32, events: Vec>, -) -> Result<(), sqlx::Error> +) -> anyhow::Result<()> where Evt: Message, - OutEvt: From, { + #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)] let current_event_stream_version = new_version - (events.len() as i32); for (i, event) in events.into_iter().enumerate() { + #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)] let event_version = current_event_stream_version + (i as i32) + 1; append_domain_event( @@ -125,28 +95,29 @@ where } #[derive(Debug, Clone)] -pub struct Store +pub struct Store where Id: ToString + Clone, - Evt: TryFrom, - OutEvt: From, - S: Serde, + Serde: serde::Serde, { pool: PgPool, - serde: S, + serde: Serde, id_type: PhantomData, evt_type: PhantomData, - out_evt_type: PhantomData, } -impl Store +impl Store where Id: ToString + Clone, - Evt: TryFrom, - OutEvt: From, - S: Serde, + Serde: serde::Serde, { - pub async fn new(pool: PgPool, serde: S) -> Result { + /// Runs the latest migrations necessary for the implementation to work, + /// then returns a new [`Store`] instance. + /// + /// # Errors + /// + /// An error is returned if the migrations fail to run. + pub async fn new(pool: PgPool, serde: Serde) -> Result { // Make sure the latest migrations are used before using the Store instance. crate::MIGRATIONS.run(&pool).await?; @@ -155,7 +126,6 @@ where serde, id_type: PhantomData, evt_type: PhantomData, - out_evt_type: PhantomData, }) } } @@ -168,65 +138,58 @@ where .map_err(|err| StreamError::ReadColumn { name, error: err }) } -impl Store +impl Store where Id: ToString + Clone + Send + Sync, - Evt: TryFrom + Message + Send + Sync, - >::Error: std::error::Error + Send + Sync + 'static, - OutEvt: From + Send + Sync, - S: Serde + Send + Sync, - >::Error: std::error::Error + Send + Sync + 'static, + Evt: Message + Send + Sync, + Serde: serde::Serde + Send + Sync, { fn event_row_to_persisted_event( &self, stream_id: Id, - row: PgRow, + row: &PgRow, ) -> Result, StreamError> { - let version_column: i32 = try_get_column(&row, "version")?; - let event_column: Vec = try_get_column(&row, "event")?; - let metadata_column: sqlx::types::Json = try_get_column(&row, "metadata")?; + let version_column: i32 = try_get_column(row, "version")?; + let event_column: Vec = try_get_column(row, "event")?; + let metadata_column: sqlx::types::Json = try_get_column(row, "metadata")?; let deserialized_event = self .serde - .deserialize(event_column) - .map_err(|err| StreamError::DeserializeEvent(Box::new(err)))?; - - let converted_event = Evt::try_from(deserialized_event) - .map_err(|err| StreamError::ConvertEvent(Box::new(err)))?; + .deserialize(&event_column) + .map_err(StreamError::DeserializeEvent)?; + #[allow(clippy::cast_sign_loss)] Ok(event::Persisted { stream_id, version: version_column as Version, event: event::Envelope { - message: converted_event, + message: deserialized_event, metadata: metadata_column.0, }, }) } } -impl event::Streamer for Store +impl event::store::Streamer for Store where Id: ToString + Clone + Send + Sync, - Evt: TryFrom + Message + std::fmt::Debug + Send + Sync, - >::Error: std::error::Error + Send + Sync + 'static, - OutEvt: From + Send + Sync, - S: Serde + Send + Sync, - >::Error: std::error::Error + Send + Sync + 'static, + Evt: Message + Send + Sync, + Serde: serde::Serde + Send + Sync, { type Error = StreamError; fn stream(&self, id: &Id, select: event::VersionSelect) -> event::Stream { + #[allow(clippy::cast_possible_truncation)] let from_version: i32 = match select { event::VersionSelect::All => 0, event::VersionSelect::From(v) => v as i32, }; let query = sqlx::query( - r#"SELECT version, event, metadata + r"SELECT version, event, metadata FROM events WHERE event_stream_id = $1 AND version >= $2 - ORDER BY version"#, + ORDER BY version", ); let id = id.clone(); @@ -236,43 +199,40 @@ where .bind(from_version) .fetch(&self.pool) .map_err(StreamError::Database) - .and_then(move |row| ready(self.event_row_to_persisted_event(id.clone(), row))) + .and_then(move |row| ready(self.event_row_to_persisted_event(id.clone(), &row))) .boxed() } } #[async_trait] -impl event::Appender for Store +impl event::store::Appender for Store where Id: ToString + Clone + Send + Sync, - Evt: TryFrom + Message + std::fmt::Debug + Send + Sync, - >::Error: std::error::Error + Send + Sync + 'static, - OutEvt: From + Send + Sync, - S: Serde + Send + Sync, - >::Error: std::error::Error + Send + Sync + 'static, + Evt: Message + Send + Sync, + Serde: serde::Serde + Send + Sync, { - type Error = AppendError; - async fn append( &self, id: Id, - version_check: event::StreamVersionExpected, + version_check: version::Check, events: Vec>, - ) -> Result { + ) -> Result { let mut tx = self .pool .begin() .await - .map_err(AppendError::BeginTransaction)?; + .map_err(|err| anyhow!("failed to begin transaction: {}", err))?; sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE") .execute(&mut *tx) - .await?; + .await + .map_err(|err| anyhow!("failed to begin transaction: {}", err))?; let string_id = id.to_string(); let new_version: i32 = match version_check { - event::StreamVersionExpected::Any => { + version::Check::Any => { + #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)] let events_len = events.len() as i32; sqlx::query("SELECT * FROM upsert_event_stream_with_no_version_check($1, $2)") @@ -280,11 +240,13 @@ where .bind(events_len) .fetch_one(&mut *tx) .await - .and_then(|row| row.try_get(0))? + .and_then(|row| row.try_get(0)) + .map_err(|err| anyhow!("failed to upsert new event stream version: {}", err))? }, - event::StreamVersionExpected::MustBe(v) => { + version::Check::MustBe(v) => { let new_version = v + (events.len() as Version); + #[allow(clippy::cast_possible_truncation)] sqlx::query("CALL upsert_event_stream($1, $2, $3)") .bind(&string_id) .bind(v as i32) @@ -292,15 +254,21 @@ where .execute(&mut *tx) .await .map_err(|err| match crate::check_for_conflict_error(&err) { - Some(err) => AppendError::Conflict(err), - None => match err.as_database_error().and_then(|err| err.code()) { + Some(err) => event::store::AppendError::Conflict(err), + None => match err + .as_database_error() + .and_then(sqlx::error::DatabaseError::code) + { Some(code) if code == "40001" => { - AppendError::Concurrency(version::ConflictError { + event::store::AppendError::Conflict(version::ConflictError { expected: v, actual: new_version, }) }, - _ => AppendError::UpsertEventStream(err), + _ => event::store::AppendError::Internal(anyhow!( + "failed to upsert new event stream version: {}", + err + )), }, }) .map(|_| new_version as i32)? @@ -309,10 +277,13 @@ where append_domain_events(&mut tx, &self.serde, &string_id, new_version, events) .await - .map_err(AppendError::AppendEvent)?; + .map_err(|err| anyhow!("failed to append new domain events: {}", err))?; - tx.commit().await.map_err(AppendError::CommitTransaction)?; + tx.commit() + .await + .map_err(|err| anyhow!("failed to commit transaction: {}", err))?; + #[allow(clippy::cast_sign_loss)] Ok(new_version as Version) } } diff --git a/eventually-postgres/src/lib.rs b/eventually-postgres/src/lib.rs index c90e5c62..c51849c6 100644 --- a/eventually-postgres/src/lib.rs +++ b/eventually-postgres/src/lib.rs @@ -1,12 +1,17 @@ +//! `eventually-postgres` contains different implementations of traits +//! from the [eventually] crate that are specific for `PostgreSQL` databases. +//! +//! Check out the [`aggregate::Repository`] and [`event::Store`] implementations +//! to know more. + #![deny(unsafe_code, unused_qualifications, trivial_casts)] -#![deny(clippy::all)] -#![warn(clippy::pedantic)] +#![deny(clippy::all, clippy::pedantic, clippy::cargo)] #![warn(missing_docs)] pub mod aggregate; pub mod event; -pub static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("./migrations"); +pub(crate) static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("./migrations"); use eventually::version::{ConflictError, Version}; use lazy_static::lazy_static; @@ -14,7 +19,7 @@ use regex::Regex; lazy_static! { static ref CONFLICT_ERROR_REGEX: Regex = - Regex::new(r#"version check failed, expected: (?P\d), got: (?P\d)"#) + Regex::new(r"version check failed, expected: (?P\d), got: (?P\d)") .expect("regex compiles successfully"); } @@ -27,7 +32,10 @@ pub(crate) fn check_for_conflict_error(err: &sqlx::Error) -> Option() .expect("field should be a valid integer"); - v as Version + #[allow(clippy::cast_sign_loss)] + { + v as Version + } } if let sqlx::Error::Database(ref pg_err) = err { diff --git a/eventually-postgres/tests/aggregate_repository.rs b/eventually-postgres/tests/aggregate_repository.rs index 7c1565cf..06df6173 100644 --- a/eventually-postgres/tests/aggregate_repository.rs +++ b/eventually-postgres/tests/aggregate_repository.rs @@ -1,9 +1,6 @@ -use eventually::aggregate::repository::GetError; -use eventually::aggregate::Repository; -use eventually::serde::json::JsonSerde; -use eventually::version; +use eventually::aggregate::repository::{self, GetError, Getter, Saver}; +use eventually::serde; use eventually_postgres::aggregate; -use futures::TryFutureExt; use rand::Rng; mod setup; @@ -16,8 +13,8 @@ async fn it_works() { let aggregate_repository = aggregate::Repository::new( pool, - JsonSerde::::default(), - JsonSerde::::default(), + serde::Json::::default(), + serde::Json::::default(), ) .await .unwrap(); @@ -65,8 +62,8 @@ async fn it_detects_data_races_and_returns_conflict_error() { let aggregate_repository = aggregate::Repository::new( pool, - JsonSerde::::default(), - JsonSerde::::default(), + serde::Json::::default(), + serde::Json::::default(), ) .await .unwrap(); @@ -84,17 +81,13 @@ async fn it_detects_data_races_and_returns_conflict_error() { let mut cloned_root = root.clone(); let result = futures::join!( - aggregate_repository - .save(&mut root) - .map_err(Option::::from), - aggregate_repository - .save(&mut cloned_root) - .map_err(Option::::from), + aggregate_repository.save(&mut root), + aggregate_repository.save(&mut cloned_root), ); match result { - (Ok(()), Err(Some(_))) => (), - (Err(Some(_)), Ok(())) => (), + (Ok(()), Err(repository::SaveError::Conflict(_))) => (), + (Err(repository::SaveError::Conflict(_)), Ok(())) => (), (first, second) => panic!( "invalid state detected, first: {:?}, second: {:?}", first, second diff --git a/eventually-postgres/tests/event_store.rs b/eventually-postgres/tests/event_store.rs index 83d06fa8..704af90d 100644 --- a/eventually-postgres/tests/event_store.rs +++ b/eventually-postgres/tests/event_store.rs @@ -1,9 +1,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; -use eventually::event::{Appender, Persisted, StreamVersionExpected, Streamer, VersionSelect}; -use eventually::serde::json::JsonSerde; -use eventually::version; +use eventually::event::store::{self, AppendError, Appender, Streamer}; +use eventually::event::{Persisted, VersionSelect}; use eventually::version::Version; +use eventually::{serde, version}; use eventually_postgres::event; use futures::TryStreamExt; use rand::Rng; @@ -16,7 +16,7 @@ async fn append_with_no_version_check_works() { .await .expect("connection to the database should work"); - let event_store = event::Store::new(pool, JsonSerde::::default()) + let event_store = event::Store::new(pool, serde::Json::::default()) .await .unwrap(); @@ -49,7 +49,7 @@ async fn append_with_no_version_check_works() { let new_event_stream_version = event_store .append( event_stream_id.clone(), - StreamVersionExpected::Any, + version::Check::Any, expected_events, ) .await @@ -72,7 +72,7 @@ async fn it_works_with_version_check_for_conflict() { .await .expect("connection to the database should work"); - let event_store = event::Store::new(pool, JsonSerde::::default()) + let event_store = event::Store::new(pool, serde::Json::::default()) .await .unwrap(); @@ -105,7 +105,7 @@ async fn it_works_with_version_check_for_conflict() { let new_event_stream_version = event_store .append( event_stream_id.clone(), - StreamVersionExpected::MustBe(0), + version::Check::MustBe(0), expected_events, ) .await @@ -123,23 +123,22 @@ async fn it_works_with_version_check_for_conflict() { // Appending twice the with an unexpected Event Stream version should // result in a version::ConflictError. - let error: Option = event_store - .append( - event_stream_id.clone(), - StreamVersionExpected::MustBe(0), - vec![], - ) + let error = event_store + .append(event_stream_id.clone(), version::Check::MustBe(0), vec![]) .await - .expect_err("the event store should have returned a conflict error") - .into(); - - assert_eq!( - error, - Some(version::ConflictError { - expected: 0, - actual: new_event_stream_version, - }) - ); + .expect_err("the event store should have returned a conflict error"); + + if let AppendError::Conflict(err) = error { + return assert_eq!( + err, + version::ConflictError { + expected: 0, + actual: new_event_stream_version, + } + ); + } + + panic!("unexpected error received: {}", error); } #[tokio::test] @@ -148,7 +147,7 @@ async fn it_handles_concurrent_writes_to_the_same_stream() { .await .expect("connection to the database should work"); - let event_store = event::Store::new(pool, JsonSerde::::default()) + let event_store = event::Store::new(pool, serde::Json::::default()) .await .unwrap(); @@ -168,23 +167,20 @@ async fn it_handles_concurrent_writes_to_the_same_stream() { let result = futures::join!( event_store.append( event_stream_id.clone(), - StreamVersionExpected::MustBe(0), + version::Check::MustBe(0), expected_events.clone(), ), event_store.append( event_stream_id.clone(), - StreamVersionExpected::MustBe(0), + version::Check::MustBe(0), expected_events, ) ); match result { - (Ok(_), Err(err)) | (Err(err), Ok(_)) => { - if let event::AppendError::Conflict(_) | event::AppendError::Concurrency(_) = err { - // This is the expected scenario :) - } else { - panic!("unexpected error, {:?}", err); - } + (Ok(_), Err(store::AppendError::Conflict(_))) + | (Err(store::AppendError::Conflict(_)), Ok(_)) => { + // This is the expected scenario :) }, (first, second) => panic!( "invalid state detected, first: {:?}, second: {:?}", diff --git a/eventually/Cargo.toml b/eventually/Cargo.toml index ea257b27..ecaefe67 100644 --- a/eventually/Cargo.toml +++ b/eventually/Cargo.toml @@ -24,18 +24,16 @@ serde-json = ["dep:serde_json"] full = ["serde-prost", "serde-json", "tracing"] [dependencies] -async-trait = "0.1.74" -futures = "0.3.29" -thiserror = "1.0.50" -prost = { version = "0.12.1", optional = true } -serde_json = { version = "1.0.108", optional = true } -serde = { version = "1.0.192", features = ["derive"] } +anyhow = "1.0.80" +async-trait = "0.1.77" +futures = "0.3.30" +thiserror = "1.0.57" +prost = { version = "0.12.3", optional = true } +serde_json = { version = "1.0.114", optional = true } +serde = { version = "1.0.197", features = ["derive"] } tracing = { version = "0.1.40", features = ["async-await"], optional = true } -anyhow = "1.0.75" [dev-dependencies] -# NOTE: this is only used for test components and assertions. -anyhow = "1.0.75" lazy_static = "1.4.0" -serde_json = "1.0.108" -tokio = { version = "1.34.0", features = ["macros", "rt-multi-thread"] } +serde_json = "1.0.114" +tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] } diff --git a/eventually/src/aggregate/mod.rs b/eventually/src/aggregate/mod.rs index 87f49744..a5b0e7c6 100644 --- a/eventually/src/aggregate/mod.rs +++ b/eventually/src/aggregate/mod.rs @@ -29,7 +29,9 @@ use crate::version::Version; use crate::{event, message}; pub mod repository; +pub mod test; +use futures::TryStreamExt; pub use repository::{EventSourced as EventSourcedRepository, Repository}; /// An Aggregate represents a Domain Model that, through an Aggregate [Root], @@ -176,51 +178,6 @@ where std::mem::take(&mut self.recorded_events) } - /// Rehydrates an [Aggregate] Root from its state and version. - /// Useful for [Repository] implementations outside the [EventSourcedRepository] one. - #[doc(hidden)] - pub fn rehydrate_from_state(version: Version, aggregate: T) -> Root { - Root { - version, - aggregate, - recorded_events: Vec::default(), - } - } - - /// Creates a new [Root] instance from a Domain [Event] - /// while rehydrating an [Aggregate]. - /// - /// # Errors - /// - /// The method can return an error if the event to apply is unexpected - /// given the current state of the Aggregate. - #[doc(hidden)] - pub(crate) fn rehydrate_from(event: event::Envelope) -> Result, T::Error> { - Ok(Root { - version: 1, - aggregate: T::apply(None, event.message)?, - recorded_events: Vec::default(), - }) - } - - /// Applies a new Domain [Event] to the [Root] while rehydrating - /// an [Aggregate]. - /// - /// # Errors - /// - /// The method can return an error if the event to apply is unexpected - /// given the current state of the Aggregate. - #[doc(hidden)] - pub(crate) fn apply_rehydrated_event( - mut self, - event: event::Envelope, - ) -> Result, T::Error> { - self.aggregate = T::apply(Some(self.aggregate), event.message)?; - self.version += 1; - - Ok(self) - } - /// Creates a new [Aggregate] [Root] instance by applying the specified /// Domain Event. /// @@ -285,6 +242,106 @@ where } } +/// List of possible errors that can be returned by [`Root::rehydrate_async`]. +#[derive(Debug, thiserror::Error)] +pub enum RehydrateError { + /// Error returned during rehydration when the [Aggregate Root][Root] + /// is applying a Domain Event using [Aggregate::apply]. + /// + /// This usually implies the Event Stream for the [Aggregate] + /// contains corrupted or unexpected data. + #[error("failed to apply domain event while rehydrating aggregate: {0}")] + Domain(#[source] T), + + /// This error is returned by [Root::rehydrate_async] when the underlying + /// [futures::TryStream] has returned an error. + #[error("failed to rehydrate aggregate from event stream: {0}")] + Inner(#[source] I), +} + +impl Root +where + T: Aggregate, +{ + /// Rehydrates an [Aggregate] Root from its state and version. + /// Useful for [Repository] implementations outside the [EventSourcedRepository] one. + #[doc(hidden)] + pub fn rehydrate_from_state(version: Version, aggregate: T) -> Root { + Root { + version, + aggregate, + recorded_events: Vec::default(), + } + } + + /// Rehydrates an [Aggregate Root][Root] from a stream of Domain Events. + #[doc(hidden)] + pub(crate) fn rehydrate( + mut stream: impl Iterator>, + ) -> Result>, T::Error> { + stream.try_fold(None, |ctx: Option>, event| { + let new_ctx_result = match ctx { + None => Root::::rehydrate_from(event), + Some(ctx) => ctx.apply_rehydrated_event(event), + }; + + Ok(Some(new_ctx_result?)) + }) + } + + /// Rehydrates an [Aggregate Root][Root] from a stream of Domain Events. + #[doc(hidden)] + pub(crate) async fn rehydrate_async( + stream: impl futures::TryStream, Error = Err>, + ) -> Result>, RehydrateError> { + stream + .map_err(RehydrateError::Inner) + .try_fold(None, |ctx: Option>, event| async { + let new_ctx_result = match ctx { + None => Root::::rehydrate_from(event), + Some(ctx) => ctx.apply_rehydrated_event(event), + }; + + Ok(Some(new_ctx_result.map_err(RehydrateError::Domain)?)) + }) + .await + } + + /// Creates a new [Root] instance from a Domain [Event] + /// while rehydrating an [Aggregate]. + /// + /// # Errors + /// + /// The method can return an error if the event to apply is unexpected + /// given the current state of the Aggregate. + #[doc(hidden)] + pub(crate) fn rehydrate_from(event: event::Envelope) -> Result, T::Error> { + Ok(Root { + version: 1, + aggregate: T::apply(None, event.message)?, + recorded_events: Vec::default(), + }) + } + + /// Applies a new Domain [Event] to the [Root] while rehydrating + /// an [Aggregate]. + /// + /// # Errors + /// + /// The method can return an error if the event to apply is unexpected + /// given the current state of the Aggregate. + #[doc(hidden)] + pub(crate) fn apply_rehydrated_event( + mut self, + event: event::Envelope, + ) -> Result, T::Error> { + self.aggregate = T::apply(Some(self.aggregate), event.message)?; + self.version += 1; + + Ok(self) + } +} + // The warnings are happening due to usage of the methods only inside #[cfg(test)] #[allow(dead_code)] #[doc(hidden)] @@ -365,9 +422,7 @@ pub(crate) mod test_user_domain { return Err(UserError::EmptyPassword); } - Ok(Self::record_new( - UserEvent::WasCreated { email, password }.into(), - )?) + Self::record_new(UserEvent::WasCreated { email, password }.into()) } pub(crate) fn change_password(&mut self, password: String) -> Result<(), UserError> { @@ -384,11 +439,11 @@ pub(crate) mod test_user_domain { #[allow(clippy::semicolon_if_nothing_returned)] // False positives :shrugs: #[cfg(test)] -mod test { +mod tests { use std::error::Error; + use crate::aggregate::repository::{Getter, Saver}; use crate::aggregate::test_user_domain::{User, UserEvent}; - use crate::aggregate::Repository; use crate::event::store::EventStoreExt; use crate::{aggregate, event, version}; diff --git a/eventually/src/aggregate/repository.rs b/eventually/src/aggregate/repository.rs new file mode 100644 index 00000000..8c3591dc --- /dev/null +++ b/eventually/src/aggregate/repository.rs @@ -0,0 +1,161 @@ +//! Module containing the definition of a [Repository], to fetch and store +//! Aggregate Roots from a data store. +//! +//! If you are looking for the Event-sourced implementation of an Aggregate Repository, +//! take a look at [`EventSourced`]. + +use std::fmt::Debug; +use std::marker::PhantomData; + +use async_trait::async_trait; +use futures::TryStreamExt; + +use crate::aggregate::Aggregate; +use crate::{aggregate, event, version}; + +/// All possible errors returned by [`Getter::get`]. +#[derive(Debug, thiserror::Error)] +pub enum GetError { + /// Error returned when the [Aggregate Root][aggregate::Root] could not be found in the data store. + #[error("failed to get aggregate root: not found")] + NotFound, + /// Error returned when the [Getter] implementation has encountered an error. + #[error("failed to get aggregate root, an error occurred: {0}")] + Internal(#[from] anyhow::Error), +} + +/// Trait used to implement read access to a data store from which +/// to load an [aggregate::Root] instance, given its id. +#[async_trait] +pub trait Getter: Send + Sync +where + T: Aggregate, +{ + /// Loads an [aggregate::Root] instance from the data store, + /// referenced by its unique identifier. + async fn get(&self, id: &T::Id) -> Result, GetError>; +} + +/// All possible errors returned by [`Saver::save`]. +#[derive(Debug, thiserror::Error)] +pub enum SaveError { + /// Error returned when [Saver::save] encounters a conflict error while saving the new Aggregate Root. + #[error("failed to save aggregate root: {0}")] + Conflict(#[from] version::ConflictError), + /// Error returned when the [Saver] implementation has encountered an error. + #[error("failed to save aggregate root, an error occurred: {0}")] + Internal(#[from] anyhow::Error), +} + +/// Trait used to implement write access to a data store, which can be used +/// to save the latest state of an [aggregate::Root] instance. +#[async_trait] +pub trait Saver: Send + Sync +where + T: Aggregate, +{ + /// Saves a new version of an [aggregate::Root] instance to the data store. + async fn save(&self, root: &mut aggregate::Root) -> Result<(), SaveError>; +} + +/// A Repository is an object that allows to load and save +/// an [Aggregate Root][aggregate::Root] from and to a persistent data store. +pub trait Repository: Getter + Saver + Send + Sync +where + T: Aggregate, +{ +} + +impl Repository for R +where + T: Aggregate, + R: Getter + Saver + Send + Sync, +{ +} + +/// An Event-sourced implementation of the [Repository] interface. +/// +/// It uses an [Event Store][event::Store] instance to stream Domain Events +/// for a particular Aggregate, and append uncommitted Domain Events +/// recorded by an Aggregate Root. +#[derive(Debug, Clone)] +pub struct EventSourced +where + T: Aggregate, + S: event::Store, +{ + store: S, + aggregate: PhantomData, +} + +impl From for EventSourced +where + T: Aggregate, + S: event::Store, +{ + fn from(store: S) -> Self { + Self { + store, + aggregate: PhantomData, + } + } +} + +#[async_trait] +impl Getter for EventSourced +where + T: Aggregate, + T::Id: Clone, + T::Error: std::error::Error + Send + Sync + 'static, + S: event::Store, + >::Error: + std::error::Error + Send + Sync + 'static, +{ + async fn get(&self, id: &T::Id) -> Result, GetError> { + let stream = self + .store + .stream(id, event::VersionSelect::All) + .map_ok(|persisted| persisted.event); + + let ctx = aggregate::Root::::rehydrate_async(stream) + .await + .map_err(anyhow::Error::from) + .map_err(GetError::Internal)?; + + ctx.ok_or(GetError::NotFound) + } +} + +#[async_trait] +impl Saver for EventSourced +where + T: Aggregate, + T::Id: Clone, + S: event::Store, +{ + async fn save(&self, root: &mut aggregate::Root) -> Result<(), SaveError> { + let events_to_commit = root.take_uncommitted_events(); + let aggregate_id = root.aggregate_id(); + + if events_to_commit.is_empty() { + return Ok(()); + } + + let current_event_stream_version = + root.version() - (events_to_commit.len() as version::Version); + + self.store + .append( + aggregate_id.clone(), + version::Check::MustBe(current_event_stream_version), + events_to_commit, + ) + .await + .map_err(|err| match err { + event::store::AppendError::Conflict(err) => SaveError::Conflict(err), + event::store::AppendError::Internal(err) => SaveError::Internal(err), + })?; + + Ok(()) + } +} diff --git a/eventually/src/aggregate/repository/any.rs b/eventually/src/aggregate/repository/any.rs deleted file mode 100644 index a5923c54..00000000 --- a/eventually/src/aggregate/repository/any.rs +++ /dev/null @@ -1,109 +0,0 @@ -//! Contains an super-type that decorates a [Repository] implementation -//! to return opaque errors using [anyhow::Error]. -//! -//! Check out [AnyRepository] for more information. - -use std::marker::PhantomData; - -use async_trait::async_trait; - -use crate::aggregate::repository::{self, Repository}; -use crate::aggregate::{self, Aggregate}; - -/// Represents a generic, opaque kind of error. Powered by [anyhow::Error], -/// but implemented in its own type to satisfy [std::error::Error] trait. -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct AnyError(#[from] anyhow::Error); - -/// A [Repository] trait that uses opaque errors through [AnyError], -/// rather than requiring concrete error types for [Repository::get] and [Repository::save]. -/// -/// This trait makes it easier to be used with trait objects, like `Arc>`. -/// -/// Use [AnyRepositoryExt::with_any_errors] to make use of this trait. -pub trait AnyRepository: Repository -where - T: Aggregate, -{ -} - -/// Extension trait for [Repository] instances which errors implement [std::error::Error], -/// that can be used to convert to an [AnyRepository] instance instead. -pub trait AnyRepositoryExt: Sized + Repository -where - T: Aggregate, - >::GetError: std::error::Error + Send + Sync + 'static, - >::SaveError: std::error::Error + Send + Sync + 'static, -{ - /// Converts the current [Repository] instance into an [AnyRepository] implementation. - #[must_use] - fn with_any_errors(self) -> AnyRepositoryImpl { - AnyRepositoryImpl { - inner: self, - aggregate_type: PhantomData, - } - } -} - -impl AnyRepositoryExt for R -where - T: Aggregate, - R: Repository, - >::GetError: std::error::Error + Send + Sync + 'static, - >::SaveError: std::error::Error + Send + Sync + 'static, -{ -} - -/// The concrete super-type implementation that decorates a [Repository] -/// to return opaque errors using [AnyError]. -/// -/// Must use the [AnyRepositoryExt] super-trait extension to obtain an instance -/// of this type. -pub struct AnyRepositoryImpl -where - T: Aggregate, - R: Repository, - >::GetError: std::error::Error + Send + Sync + 'static, - >::SaveError: std::error::Error + Send + Sync + 'static, -{ - inner: R, - aggregate_type: PhantomData, -} - -#[async_trait] -impl Repository for AnyRepositoryImpl -where - T: Aggregate, - R: Repository, - >::GetError: std::error::Error + Send + Sync + 'static, - >::SaveError: std::error::Error + Send + Sync + 'static, -{ - type GetError = AnyError; - type SaveError = AnyError; - - async fn get( - &self, - id: &T::Id, - ) -> Result, repository::GetError> { - self.inner.get(id).await.map_err(|err| match err { - repository::GetError::NotFound => repository::GetError::NotFound, - repository::GetError::Inner(v) => { - repository::GetError::Inner(anyhow::Error::from(v).into()) - }, - }) - } - - async fn save(&self, root: &mut aggregate::Root) -> Result<(), Self::SaveError> { - Ok(self.inner.save(root).await.map_err(anyhow::Error::from)?) - } -} - -impl AnyRepository for AnyRepositoryImpl -where - T: Aggregate, - R: Repository, - >::GetError: std::error::Error + Send + Sync + 'static, - >::SaveError: std::error::Error + Send + Sync + 'static, -{ -} diff --git a/eventually/src/aggregate/repository/event_sourced.rs b/eventually/src/aggregate/repository/event_sourced.rs deleted file mode 100644 index 27fe8e33..00000000 --- a/eventually/src/aggregate/repository/event_sourced.rs +++ /dev/null @@ -1,129 +0,0 @@ -//! Contains the impementation of the [EventSourced] [Repository] trait, -//! necessary to use the Event Sourcing pattern to rehydrate an [Aggregate] -//! state using an [event::Store]. - -use std::fmt::Debug; -use std::marker::PhantomData; - -use async_trait::async_trait; -use futures::TryStreamExt; - -use crate::aggregate::{self, repository, Aggregate, Repository}; -use crate::event; -use crate::version::Version; - -/// List of possible errors that can be returned by [EventSourced::get]. -#[derive(Debug, thiserror::Error)] -pub enum GetError { - /// This error is returned by [`EventSourced::get`] when - /// the desired [Aggregate] returns an error while applying a Domain Event - /// from the Event [Store][`event::Store`] during the _rehydration_ phase. - /// - /// This usually implies the Event Stream for the Aggregate - /// contains corrupted or unexpected data. - #[error("failed to rehydrate aggregate from event stream: {0}")] - Rehydrate(#[source] R), - - /// This error is returned by [`EventSourced::get`] when the - /// [Event Store][`event::Store`] used by the Repository returns - /// an unexpected error while streaming back the Aggregate's Event Stream. - #[error("event store failed while streaming events: {0}")] - Stream(#[source] S), -} - -/// List of possible errors that can be returned by [EventSourced::save]. -#[derive(Debug, thiserror::Error)] -pub enum SaveError { - /// This error is returned by [EventSourced::save] when - /// the [event::Store] used by the Repository returns - /// an error while saving the uncommitted Domain Events - /// to the Aggregate's Event Stream. - #[error("event store failed while appending events: {0}")] - Append(#[from] T), -} - -/// An Event-sourced implementation of the [Repository] interface. -/// -/// It uses an [Event Store][`event::Store`] instance to stream Domain Events -/// for a particular Aggregate, and append uncommitted Domain Events -/// recorded by an Aggregate Root. -#[derive(Debug, Clone)] -pub struct EventSourced -where - T: Aggregate, - S: event::Store, -{ - store: S, - aggregate: PhantomData, -} - -impl From for EventSourced -where - T: Aggregate, - S: event::Store, -{ - fn from(store: S) -> Self { - Self { - store, - aggregate: PhantomData, - } - } -} - -#[async_trait] -impl Repository for EventSourced -where - T: Aggregate, - T::Id: Clone, - T::Error: Debug, - S: event::Store, -{ - type GetError = GetError>::Error>; - type SaveError = SaveError<>::Error>; - - async fn get( - &self, - id: &T::Id, - ) -> Result, repository::GetError> { - let ctx = self - .store - .stream(id, event::VersionSelect::All) - .map_ok(|persisted| persisted.event) - .map_err(GetError::Stream) - .try_fold(None, |ctx: Option>, event| async { - let new_ctx_result = match ctx { - None => aggregate::Root::::rehydrate_from(event), - Some(ctx) => ctx.apply_rehydrated_event(event), - }; - - let new_ctx = new_ctx_result.map_err(GetError::Rehydrate)?; - - Ok(Some(new_ctx)) - }) - .await?; - - ctx.ok_or(repository::GetError::NotFound) - } - - async fn save(&self, root: &mut aggregate::Root) -> Result<(), Self::SaveError> { - let events_to_commit = root.take_uncommitted_events(); - let aggregate_id = root.aggregate_id(); - - if events_to_commit.is_empty() { - return Ok(()); - } - - let current_event_stream_version = root.version() - (events_to_commit.len() as Version); - - self.store - .append( - aggregate_id.clone(), - event::StreamVersionExpected::MustBe(current_event_stream_version), - events_to_commit, - ) - .await - .map_err(SaveError::Append)?; - - Ok(()) - } -} diff --git a/eventually/src/aggregate/repository/mod.rs b/eventually/src/aggregate/repository/mod.rs deleted file mode 100644 index 59419648..00000000 --- a/eventually/src/aggregate/repository/mod.rs +++ /dev/null @@ -1,56 +0,0 @@ -//! Module containing the definition of a [Repository], to fetch and store -//! Aggregate Roots from a data store. -//! -//! If you are looking for the Event-sourced implementation of an Aggregate Repository, -//! take a look at [EventSourced]. - -pub mod any; -pub mod event_sourced; - -use std::fmt::Debug; - -use async_trait::async_trait; - -// Public re-exports. -pub use self::any::{AnyRepository, AnyRepositoryExt}; -pub use self::event_sourced::EventSourced; -// Crate imports. -use crate::aggregate; -use crate::aggregate::Aggregate; - -/// Error returned by a call to [Repository::get]. -/// This type is used to check whether an Aggregate Root has been found or not. -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum GetError { - /// This error is retured by [Repository::get] when the - /// desired Aggregate [Root] could not be found in the data store. - #[error("aggregate root was not found")] - NotFound, - - /// Error variant returned by [Repository::get] when the underlying - /// concrete implementation has encountered an error. - #[error("failed to get aggregate root: {0}")] - Inner(#[from] I), -} - -/// A Repository is an object that allows to load and save -/// an [Aggregate Root][Root] from and to a persistent data store. -#[async_trait] -pub trait Repository: Send + Sync -where - T: Aggregate, -{ - /// Error type returned by the concrete implementation of the trait. - /// It is returned in [get] using [GetError::Other]. - type GetError: Send + Sync; - - /// Error type returned by the concrete implementation of the trait. - type SaveError: Send + Sync; - - /// Loads an Aggregate Root instance from the data store, - /// referenced by its unique identifier. - async fn get(&self, id: &T::Id) -> Result, GetError>; - - /// Saves a new version of an Aggregate Root instance to the data store. - async fn save(&self, root: &mut aggregate::Root) -> Result<(), Self::SaveError>; -} diff --git a/eventually/src/aggregate/test.rs b/eventually/src/aggregate/test.rs new file mode 100644 index 00000000..9749dad1 --- /dev/null +++ b/eventually/src/aggregate/test.rs @@ -0,0 +1,197 @@ +//! Module exposing a [Scenario] type to test [Aggregate]s using +//! the [given-then-when canvas](https://www.agilealliance.org/glossary/gwt/). + +use std::fmt::Debug; +use std::marker::PhantomData; +use std::ops::Deref; +use std::sync::Arc; + +use crate::aggregate::{Aggregate, Root}; +use crate::event; + +/// A test scenario that can be used to test an [Aggregate] and [Aggregate Root][Root] +/// using a [given-then-when canvas](https://www.agilealliance.org/glossary/gwt/) approach. +#[derive(Default, Clone, Copy)] +pub struct Scenario(PhantomData) +where + T: Aggregate, + T::Id: Clone, + T::Event: Debug + PartialEq, + T::Error: Debug; + +impl Scenario +where + T: Aggregate, + T::Id: Clone, + T::Event: Debug + PartialEq, + T::Error: Debug, +{ + /// Specifies the precondition for the test [Scenario]. + /// + /// In other words, it can be used to specify all the Domain [Event][event::Envelope]s + /// that make up the state of the [Aggregate Root][Root]. + #[must_use] + pub fn given(self, events: Vec>) -> ScenarioGiven { + ScenarioGiven { + events, + marker: PhantomData, + } + } + + /// Specifies the action/mutation to execute in this [Scenario]. + /// + /// Use this branch when testing actions/mutations that create new [Aggregate Root][Root] + /// instances, i.e. with no prior Domain Events recorded. + #[must_use] + pub fn when(self, f: F) -> ScenarioWhen + where + R: From>, + F: Fn() -> Result, + { + ScenarioWhen { + mutate: f, + marker: PhantomData, + err_marker: PhantomData, + root_marker: PhantomData, + } + } +} + +#[doc(hidden)] +pub struct ScenarioGiven +where + T: Aggregate, + T::Id: Clone, + T::Event: Debug + PartialEq, + T::Error: Debug, +{ + events: Vec>, + marker: PhantomData, +} + +impl ScenarioGiven +where + T: Aggregate, + T::Id: Clone, + T::Event: Debug + PartialEq, + T::Error: Debug, +{ + /// Specifies the action/mutation to execute in this [Scenario]. + /// + /// Use this branch when testing actions/mutations that modify the state + /// of an [Aggregate Root][Root] that already exists, by specifying its + /// current state using [`Scenario::given`]. + /// + /// # Panics + /// + /// Please note: as this method expects that an [Aggregate Root][Root] instance + /// is available when executing the domain method, it will panic if a `Root` instance + /// could not be obtained by rehydrating the [`Aggregate`] state through the events + /// provided in [`Scenario::given`]. + #[must_use] + pub fn when(self, f: F) -> ScenarioWhen Result, Err> + where + R: From>, + F: Fn(&mut R) -> Result<(), Err>, + { + let events = Arc::new(self.events); + + ScenarioWhen { + marker: PhantomData, + err_marker: PhantomData, + root_marker: PhantomData, + mutate: move || -> Result { + let mut root: R = Root::::rehydrate(events.iter().cloned()) + .expect( + "no error is expected when applying domain events from a 'given' clause", + ) + .expect("an aggregate root instance is expected, but none was produced") + .into(); + + match f(&mut root) { + Ok(()) => Ok(root), + Err(err) => Err(err), + } + }, + } + } +} + +#[doc(hidden)] +pub struct ScenarioWhen +where + T: Aggregate, + T::Event: Debug + PartialEq, + R: From>, + F: Fn() -> Result, +{ + mutate: F, + marker: PhantomData, + err_marker: PhantomData, + root_marker: PhantomData, +} + +impl ScenarioWhen +where + T: Aggregate, + T::Event: Debug + PartialEq, + R: From> + Deref>, + F: Fn() -> Result, +{ + /// Specifies that the outcome of the [Scenario] is positive, and + /// should result in the creation of the specified Domain Events. + #[must_use] + pub fn then(self, result: Vec>) -> ScenarioThen { + ScenarioThen { + mutate: self.mutate, + expected: Ok(result), + marker: PhantomData, + } + } + + /// Specified that the outcome of the [Scenario] is negative. + /// + /// Use this method to assert the specific Error value that the + /// [Aggregate Root][Root] method should return. + #[must_use] + pub fn then_error(self, err: Err) -> ScenarioThen { + ScenarioThen { + mutate: self.mutate, + expected: Err(err), + marker: PhantomData, + } + } +} + +#[doc(hidden)] +pub struct ScenarioThen +where + T: Aggregate, + T::Event: Debug + PartialEq, + R: From> + Deref>, + F: Fn() -> Result, +{ + mutate: F, + expected: Result>, Err>, + marker: PhantomData, +} + +impl ScenarioThen +where + T: Aggregate, + T::Event: Debug + PartialEq, + R: From> + Deref>, + F: Fn() -> Result, + Err: PartialEq + Debug, +{ + /// Runs the [Scenario] and performs the various assertion for the test. + /// + /// # Panics + /// + /// This method will panic if the assertions have not passed, making + /// the test fail. + pub fn assert(self) { + let result = (self.mutate)().map(|root| root.recorded_events.clone()); + assert_eq!(self.expected, result); + } +} diff --git a/eventually/src/command/mod.rs b/eventually/src/command/mod.rs index d34ac9ba..589e7290 100644 --- a/eventually/src/command/mod.rs +++ b/eventually/src/command/mod.rs @@ -68,20 +68,17 @@ mod test_user_domain { use async_trait::async_trait; - use crate::aggregate::repository::AnyRepositoryExt; use crate::aggregate::test_user_domain::{User, UserEvent}; use crate::{aggregate, command, event, message}; - struct UserService(Arc>); + struct UserService(Arc>); impl From for UserService where R: aggregate::Repository + 'static, - >::GetError: std::error::Error + Send + Sync + 'static, - >::SaveError: std::error::Error + Send + Sync + 'static, { - fn from(value: R) -> Self { - Self(Arc::new(value.with_any_errors())) + fn from(repository: R) -> Self { + Self(Arc::new(repository)) } } diff --git a/eventually/src/command/test.rs b/eventually/src/command/test.rs index 95be5a4b..f2cc3833 100644 --- a/eventually/src/command/test.rs +++ b/eventually/src/command/test.rs @@ -1,20 +1,19 @@ -//! Module exposing a test [Scenario] type to write Domain [Command]s +//! Module exposing a test [Scenario] type to write Domain [Command][command::Envelope]s //! test cases using the [given-then-when canvas](https://www.agilealliance.org/glossary/gwt/). use std::fmt::Debug; use std::hash::Hash; -use crate::event::store::EventStoreExt; -use crate::event::Appender; -use crate::{command, event, message}; +use crate::event::store::{Appender, EventStoreExt}; +use crate::{command, event, message, version}; -/// A test scenario that can be used to test a [Command] [Handler][command::Handler] +/// A test scenario that can be used to test a [Command][command::Envelope] [Handler][command::Handler] /// using a [given-then-when canvas](https://www.agilealliance.org/glossary/gwt/) approach. pub struct Scenario; impl Scenario { /// Sets the precondition state of the system for the [Scenario], which - /// is expressed by a list of Domain [Event]s in an Event-sourced system. + /// is expressed by a list of Domain [Event][event::Envelope]s in an Event-sourced system. #[must_use] pub fn given(self, events: Vec>) -> ScenarioGiven where @@ -23,7 +22,7 @@ impl Scenario { ScenarioGiven { given: events } } - /// Specifies the [Command] to test in the [Scenario], in the peculiar case + /// Specifies the [Command][command::Envelope] to test in the [Scenario], in the peculiar case /// of having a clean system. /// /// This is a shortcut for: @@ -55,7 +54,7 @@ impl ScenarioGiven where Evt: message::Message, { - /// Specifies the [Command] to test in the [Scenario]. + /// Specifies the [Command][command::Envelope] to test in the [Scenario]. #[must_use] pub fn when(self, command: command::Envelope) -> ScenarioWhen where @@ -130,7 +129,7 @@ where Evt: message::Message + Clone + PartialEq + Send + Sync + Debug, Cmd: message::Message, { - /// Executes the whole [Scenario] by constructing a Command [Handler] + /// Executes the whole [Scenario] by constructing a Command [Handler][command::Handler] /// with the provided closure function and running the specified assertions. /// /// # Panics @@ -148,7 +147,7 @@ where event_store .append( event.stream_id, - event::StreamVersionExpected::MustBe(event.version - 1), + version::Check::MustBe(event.version - 1), vec![event.event], ) .await diff --git a/eventually/src/event/mod.rs b/eventually/src/event/mod.rs index c6e0ea80..f6b751b9 100644 --- a/eventually/src/event/mod.rs +++ b/eventually/src/event/mod.rs @@ -1,18 +1,16 @@ //! Module `event` contains types and abstractions helpful for working //! with Domain Events. +pub mod store; use std::fmt::Debug; -use async_trait::async_trait; use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; -use crate::message; -use crate::version::{ConflictError, Version}; - -pub mod store; +pub use crate::event::store::Store; +use crate::{message, version}; -/// An Event is a [Message] carring the information about a Domain Event, +/// An Event is a [Message][message::Message] carring the information about a Domain Event, /// an occurrence in the system lifetime that is relevant for the Domain /// that is being implemented. pub type Envelope = message::Envelope; @@ -31,8 +29,8 @@ where /// This value is used for optimistic concurrency checks, to avoid /// data races in parallel command evaluations. /// - /// Check the [Version] type and module documentation for more info. - pub version: Version, + /// Check the [Version][version::Version] type and module documentation for more info. + pub version: version::Version, /// The actual Domain Event carried by this envelope. pub event: Envelope, @@ -41,87 +39,13 @@ where /// Specifies the slice of the Event Stream to select when calling [`Store::stream`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum VersionSelect { - /// Selects all [Events] in the Event [Stream]. + /// Selects all [Event][Envelope]s in the Event [Stream]. All, - /// Selects all [Events] in the Event [Stream] starting from the [Event] - /// with the specified [Version]. - From(Version), -} - -/// Specifies an expectation on the Event [Stream] version targeted -/// when calling [`Store::append`]. -/// -/// This type allows for optimistic concurrency checks, avoiding data races -/// when modifying the same Event Stream at the same time. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum StreamVersionExpected { - /// Disables any kind of optimistic concurrency check, instructing the [Store] - /// to append the new [Event] no matter the current [Version] of the [Stream]. - Any, - - /// Sets the expectation that the Event [Stream] must be at the specified - /// [Version] for the [`Store::append`] call to succeed. - MustBe(Version), + /// Selects all [Event][Envelope]s in the Event [Stream] starting from the [Event] + /// with the specified [Version][version::Version]. + From(version::Version), } /// Stream is a stream of [Persisted] Domain Events. pub type Stream<'a, Id, Evt, Err> = BoxStream<'a, Result, Err>>; - -/// Interface used to stream [Persisted] Domain Events from an Event Store to an application. -pub trait Streamer: Send + Sync -where - StreamId: Send + Sync, - Event: message::Message + Send + Sync, -{ - /// The error type returned by the Store during a [`stream`] call. - type Error: Send + Sync; - - /// Opens an Event Stream, effectively streaming all Domain Events - /// of an Event Stream back in the application. - fn stream(&self, id: &StreamId, select: VersionSelect) -> Stream; -} - -#[async_trait] -/// Interface used to append new Domain Events in an Event Store. -pub trait Appender: Send + Sync -where - StreamId: Send + Sync, - Event: message::Message + Send + Sync, -{ - /// The error type returned by the Store during an [`append`] call. - /// It could be a [version::ConflictError], which is why the bound to - /// `Into>` is required. - type Error: Into> + Send + Sync; - - /// Appens new Domain Events to the specified Event Stream. - /// - /// The result of this operation is the new [Version] of the Event Stream - /// with the specified Domain Events added to it. - async fn append( - &self, - id: StreamId, - version_check: StreamVersionExpected, - events: Vec>, - ) -> Result; -} - -/// An [Event] Store, used to store Domain Events in Event Streams -- a stream -/// of Domain Events -- and retrieve them. -/// -/// Each Event Stream is represented by a unique Stream identifier. -pub trait Store: - Streamer + Appender + Send + Sync -where - StreamId: Send + Sync, - Event: message::Message + Send + Sync, -{ -} - -impl Store for T -where - T: Streamer + Appender + Send + Sync, - StreamId: Send + Sync, - Event: message::Message + Send + Sync, -{ -} diff --git a/eventually/src/event/store.rs b/eventually/src/event/store.rs index e2b8abaf..64af0ad1 100644 --- a/eventually/src/event/store.rs +++ b/eventually/src/event/store.rs @@ -1,5 +1,5 @@ -//! Contains implementations of the [event::Store] trait and connected abstractions, -//! such as the [std::collections::HashMap]'s based [InMemory] Event Store implementation. +//! Contains implementations of the [`event::Store`] trait and connected abstractions, +//! such as the [`std::collections::HashMap`]'s based [`InMemory`] Event Store implementation. use std::collections::HashMap; use std::convert::Infallible; @@ -9,8 +9,77 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use futures::stream::{iter, StreamExt}; -use crate::version::{ConflictError, Version}; -use crate::{event, message}; +use crate::{event, message, version}; + +/// Interface used to stream [Persisted][event::Persisted] Domain Events +/// from an Event Store to an application. +pub trait Streamer: Send + Sync +where + StreamId: Send + Sync, + Event: message::Message + Send + Sync, +{ + /// The error type returned by the Store during a [`stream`] call. + type Error: Send + Sync; + + /// Opens an Event Stream, effectively streaming all Domain Events + /// of an Event Stream back in the application. + fn stream( + &self, + id: &StreamId, + select: event::VersionSelect, + ) -> event::Stream; +} + +/// All possible error types returned by [`Appender::append`]. +#[derive(Debug, thiserror::Error)] +pub enum AppendError { + /// Error returned when [Appender::append] encounters a conflict error + /// while appending the new Domain Events. + #[error("failed to append new domain events: {0}")] + Conflict(#[from] version::ConflictError), + /// Error returned when the [Appender] implementation has encountered an error. + #[error("failed to append new domain events, an error occurred: {0}")] + Internal(#[from] anyhow::Error), +} + +#[async_trait] +/// Interface used to append new Domain Events in an Event Store. +pub trait Appender: Send + Sync +where + StreamId: Send + Sync, + Event: message::Message + Send + Sync, +{ + /// Appens new Domain Events to the specified Event Stream. + /// + /// The result of this operation is the new [Version][version::Version] + /// of the Event Stream with the specified Domain Events added to it. + async fn append( + &self, + id: StreamId, + version_check: version::Check, + events: Vec>, + ) -> Result; +} + +/// An [Event][event::Envelope] Store, used to store Domain Events in Event Streams -- a stream +/// of Domain Events -- and retrieve them. +/// +/// Each Event Stream is represented by a unique Stream identifier. +pub trait Store: + Streamer + Appender + Send + Sync +where + StreamId: Send + Sync, + Event: message::Message + Send + Sync, +{ +} + +impl Store for T +where + T: Streamer + Appender + Send + Sync, + StreamId: Send + Sync, + Event: message::Message + Send + Sync, +{ +} #[derive(Debug)] struct InMemoryBackend @@ -31,8 +100,8 @@ where } } -/// In-memory implementation of [event::Store] trait, -/// backed by a thread-safe [std::collections::HashMap]. +/// In-memory implementation of [`event::Store`] trait, +/// backed by a thread-safe [`std::collections::HashMap`]. #[derive(Debug, Clone)] pub struct InMemory where @@ -52,7 +121,7 @@ where } } -impl event::Streamer for InMemory +impl Streamer for InMemory where Id: Clone + Eq + Hash + Send + Sync, Evt: message::Message + Clone + Send + Sync, @@ -81,19 +150,17 @@ where } #[async_trait] -impl event::Appender for InMemory +impl Appender for InMemory where Id: Clone + Eq + Hash + Send + Sync, Evt: message::Message + Clone + Send + Sync, { - type Error = ConflictError; - async fn append( &self, id: Id, - version_check: event::StreamVersionExpected, + version_check: version::Check, events: Vec>, - ) -> Result { + ) -> Result { let mut backend = self .backend .write() @@ -106,12 +173,12 @@ where .map(|event| event.version) .unwrap_or_default(); - if let event::StreamVersionExpected::MustBe(expected) = version_check { + if let version::Check::MustBe(expected) = version_check { if last_event_stream_version != expected { - return Err(ConflictError { + return Err(AppendError::Conflict(version::ConflictError { expected, actual: last_event_stream_version, - }); + })); } } @@ -140,7 +207,7 @@ where } } -/// Decorator type for an [event::Store] implementation that tracks the list of +/// Decorator type for an [`event::Store`] implementation that tracks the list of /// recorded Domain Events through it. /// /// Useful for testing purposes, i.e. asserting that Domain Events written throguh @@ -148,7 +215,7 @@ where #[derive(Debug, Clone)] pub struct Tracking where - T: event::Store + Send + Sync, + T: Store + Send + Sync, StreamId: Send + Sync, Event: message::Message + Send + Sync, { @@ -160,11 +227,16 @@ where impl Tracking where - T: event::Store + Send + Sync, + T: Store + Send + Sync, StreamId: Clone + Send + Sync, Event: message::Message + Clone + Send + Sync, { /// Returns the list of recoded Domain Events through this decorator so far. + /// + /// # Panics + /// + /// Since the internal data is thread-safe through an [`RwLock`], this method + /// could potentially panic while attempting to get a read-only lock on the data recorded. pub fn recorded_events(&self) -> Vec> { self.events .read() @@ -173,6 +245,11 @@ where } /// Resets the list of recorded Domain Events through this decorator. + /// + /// # Panics + /// + /// Since the internal data is thread-safe through an [`RwLock`], this method + /// could potentially panic while attempting to get a read-write lock to empty the internal store. pub fn reset_recorded_events(&self) { self.events .write() @@ -181,13 +258,13 @@ where } } -impl event::Streamer for Tracking +impl Streamer for Tracking where - T: event::Store + Send + Sync, + T: Store + Send + Sync, StreamId: Clone + Send + Sync, Event: message::Message + Clone + Send + Sync, { - type Error = >::Error; + type Error = >::Error; fn stream( &self, @@ -199,34 +276,32 @@ where } #[async_trait] -impl event::Appender for Tracking +impl Appender for Tracking where - T: event::Store + Send + Sync, + T: Store + Send + Sync, StreamId: Clone + Send + Sync, Event: message::Message + Clone + Send + Sync, { - type Error = >::Error; - async fn append( &self, id: StreamId, - version_check: event::StreamVersionExpected, + version_check: version::Check, events: Vec>, - ) -> Result { + ) -> Result { let new_version = self .store .append(id.clone(), version_check, events.clone()) .await?; let events_size = events.len(); - let previous_version = new_version - (events_size as Version); + let previous_version = new_version - (events_size as version::Version); let mut persisted_events = events .into_iter() .enumerate() .map(|(i, event)| event::Persisted { stream_id: id.clone(), - version: previous_version + (i as Version) + 1, + version: previous_version + (i as version::Version) + 1, event, }) .collect(); @@ -242,13 +317,12 @@ where /// Extension trait that can be used to pull in supertypes implemented /// in this module. -pub trait EventStoreExt: - event::Store + Send + Sync + Sized +pub trait EventStoreExt: Store + Send + Sync + Sized where StreamId: Clone + Send + Sync, Event: message::Message + Clone + Send + Sync, { - /// Returns a [Tracking] instance that decorates the original [event::Store] + /// Returns a [`Tracking`] instance that decorates the original [`event::Store`] /// instanca this method has been called on. fn with_recorded_events_tracking(self) -> Tracking { Tracking { @@ -260,7 +334,7 @@ where impl EventStoreExt for T where - T: event::Store + Send + Sync, + T: Store + Send + Sync, StreamId: Clone + Send + Sync, Event: message::Message + Clone + Send + Sync, { @@ -274,7 +348,7 @@ mod test { use super::*; use crate::event; - use crate::event::{Appender, Streamer}; + use crate::event::store::{Appender, Streamer}; use crate::message::tests::StringMessage; use crate::version::Version; @@ -293,11 +367,7 @@ mod test { let event_store = InMemory::<&'static str, StringMessage>::default(); let new_event_stream_version = event_store - .append( - STREAM_ID, - event::StreamVersionExpected::MustBe(0), - EVENTS.clone(), - ) + .append(STREAM_ID, version::Check::MustBe(0), EVENTS.clone()) .await .expect("append should not fail"); @@ -330,11 +400,7 @@ mod test { let tracking_event_store = event_store.with_recorded_events_tracking(); tracking_event_store - .append( - STREAM_ID, - event::StreamVersionExpected::MustBe(0), - EVENTS.clone(), - ) + .append(STREAM_ID, version::Check::MustBe(0), EVENTS.clone()) .await .expect("append should not fail"); @@ -352,20 +418,20 @@ mod test { let event_store = InMemory::<&'static str, StringMessage>::default(); let append_error = event_store - .append( - STREAM_ID, - event::StreamVersionExpected::MustBe(3), - EVENTS.clone(), - ) + .append(STREAM_ID, version::Check::MustBe(3), EVENTS.clone()) .await .expect_err("the event stream version should be zero"); - assert_eq!( - ConflictError { - expected: 3, - actual: 0, - }, - append_error - ); + if let AppendError::Conflict(err) = append_error { + return assert_eq!( + version::ConflictError { + expected: 3, + actual: 0, + }, + err + ); + } + + panic!("expected conflict error, received: {append_error}") } } diff --git a/eventually/src/lib.rs b/eventually/src/lib.rs index ba5f9249..66f3b827 100644 --- a/eventually/src/lib.rs +++ b/eventually/src/lib.rs @@ -1,12 +1,15 @@ -#![deny(unsafe_code, unused_qualifications, trivial_casts)] -#![deny(clippy::all)] -#![warn(clippy::pedantic)] -#![warn(missing_docs)] +//! `eventually` is a crate that helps you apply different patterns to your Rust +//! application domain code, such as: Event Sourcing, Aggregate Root, Outbox Pattern, +//! and so on. + +#![deny(unsafe_code, unused_qualifications, trivial_casts, missing_docs)] +#![deny(clippy::all, clippy::pedantic, clippy::cargo)] pub mod aggregate; pub mod command; pub mod event; pub mod message; +// pub mod query; pub mod serde; #[cfg(feature = "tracing")] pub mod tracing; diff --git a/eventually/src/message.rs b/eventually/src/message.rs index c7eb2b66..b570feb1 100644 --- a/eventually/src/message.rs +++ b/eventually/src/message.rs @@ -1,19 +1,37 @@ +//! This module contains the definition of a [Message] type, which +//! can be used to describe some sort of domain value such as a [Domain Event][crate::event::Envelope], +//! a [Domain Command][crate::command::Envelope], and so on. + use std::collections::HashMap; use serde::{Deserialize, Serialize}; +/// Represents a piece of domain data that occurs in the system. +/// +/// Each Message has a specific name to it, which should ideally be +/// unique within the domain you're operating in. Example: a Domain Event +/// that represents when an Order was created can have a `name()`: `"OrderWasCreated"`. pub trait Message { + /// Returns the domain name of the [Message]. fn name(&self) -> &'static str; } +/// Optional metadata to attach to an [Envelope] to provide additional context +/// to the [Message] carried out. pub type Metadata = HashMap; +/// Represents a [Message] packaged for persistance and/or processing by other +/// parts of the system. +/// +/// It carries both the actual message (i.e. a payload) and some optional [Metadata]. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Envelope where T: Message, { + /// The message payload. pub message: T, + /// Optional metadata to provide additional context to the message. pub metadata: Metadata, } @@ -21,12 +39,10 @@ impl Envelope where T: Message, { + /// Adds a new entry in the [Envelope]'s [Metadata]. #[must_use] - pub fn and_metadata(mut self, f: F) -> Self - where - F: Fn(&mut Metadata), - { - f(&mut self.metadata); + pub fn with_metadata(mut self, key: String, value: String) -> Self { + self.metadata.insert(key, value); self } } @@ -72,13 +88,13 @@ pub(crate) mod tests { metadata: Metadata::default(), }; - let new_message = message.clone().and_metadata(|m| { - m.insert("hello_world".to_owned(), "test".to_owned()); - m.insert("test_number".to_owned(), 1.to_string()); - }); + let new_message = message + .clone() + .with_metadata("hello_world".into(), "test".into()) + .with_metadata("test_number".into(), 1.to_string()); - println!("Message: {:?}", message); - println!("New message: {:?}", new_message); + println!("Message: {message:?}"); + println!("New message: {new_message:?}"); // Metadata does not affect equality of message. assert_eq!(message, new_message); diff --git a/eventually/src/serde.rs b/eventually/src/serde.rs new file mode 100644 index 00000000..5460c2e8 --- /dev/null +++ b/eventually/src/serde.rs @@ -0,0 +1,226 @@ +//! This module provides traits and implementations for serialization and +//! deserialization, allowing you to convert Rust data structures to and from +//! different formats like JSON, Protobuf, etc. + +use std::fmt::Display; +use std::marker::PhantomData; + +use anyhow::anyhow; +#[cfg(feature = "serde-prost")] +use prost::bytes::Bytes; +#[cfg(feature = "serde-json")] +use serde::{Deserialize, Serialize}; + +/// A serializer interface that can be used to serialize a Rust data type +/// into a specific wire format as a byte array. +pub trait Serializer: Send + Sync { + /// Serializes the given value into the protocol supported by this implementation. + /// + /// # Errors + /// + /// An error ([`anyhow::Error`]) is returned in case the serialization could not + /// succeed as expected. + fn serialize(&self, value: T) -> anyhow::Result>; +} + +/// A deserializer interface that can be used to deserialize a byte array +/// into an instance of a specific Rust data type from a specific wire format. +pub trait Deserializer: Send + Sync { + /// Deserializes the given value from a message encoded in the wire format + /// supported by this implementation. + /// + /// # Errors + /// + /// An error ([`anyhow::Error`]) is returned in case the deserialization could not + /// succeed as expected. + fn deserialize(&self, data: &[u8]) -> anyhow::Result; +} + +/// [Serializer] and [Deserializer] that can be used to serialize into and deserialize +/// from a given type into a specific wire format, such as JSON, Protobuf, etc. +pub trait Serde: Serializer + Deserializer + Send + Sync {} + +impl Serde for S where S: Serializer + Deserializer {} + +/// Implements the [Serde] trait to translate between two different types, +/// and using the specified [Serde] for serialization and deserialization +/// using the new `Out` type. +#[derive(Clone)] +pub struct Convert +where + In: Send + Sync, + Out: Send + Sync, + S: Serde + Send + Sync, +{ + serde: S, + inn: PhantomData, + out: PhantomData, +} + +impl Convert +where + In: Send + Sync, + Out: Send + Sync, + S: Serde + Send + Sync, +{ + /// Creates a new [Convert] serde instance. + pub fn new(serde: S) -> Self { + Self { + serde, + inn: PhantomData, + out: PhantomData, + } + } +} + +impl Serializer for Convert +where + In: TryFrom + Send + Sync, + Out: TryFrom + Send + Sync, + >::Error: Display, + S: Serde + Send + Sync, +{ + fn serialize(&self, value: In) -> anyhow::Result> { + self.serde.serialize( + value + .try_into() + .map_err(|err| anyhow!("failed to convert type values: {}", err))?, + ) + } +} + +impl Deserializer for Convert +where + In: TryFrom + Send + Sync, + Out: TryFrom + Send + Sync, + >::Error: Display, + S: Serde + Send + Sync, +{ + fn deserialize(&self, data: &[u8]) -> anyhow::Result { + let inn = self.serde.deserialize(data)?; + + inn.try_into() + .map_err(|err| anyhow!("failed to convert type values: {}", err)) + } +} + +/// Implements the [Serializer] and [Deserializer] traits, which use the [serde] crate +/// to serialize and deserialize a message into JSON. +#[cfg(feature = "serde-json")] +#[derive(Debug, Clone, Copy)] +pub struct Json(PhantomData) +where + T: Serialize + Send + Sync, + for<'d> T: Deserialize<'d>; + +#[cfg(feature = "serde-json")] +impl Default for Json +where + T: Serialize + Send + Sync, + for<'d> T: Deserialize<'d>, +{ + fn default() -> Self { + Self(PhantomData) + } +} + +#[cfg(feature = "serde-json")] +impl Serializer for Json +where + T: Serialize + Send + Sync, + for<'d> T: Deserialize<'d>, +{ + fn serialize(&self, value: T) -> anyhow::Result> { + serde_json::to_vec(&value) + .map_err(|err| anyhow!("failed to serialize value to json: {}", err)) + } +} + +#[cfg(feature = "serde-json")] +impl Deserializer for Json +where + T: Serialize + Send + Sync, + for<'d> T: Deserialize<'d>, +{ + fn deserialize(&self, data: &[u8]) -> anyhow::Result { + serde_json::from_slice(data) + .map_err(|err| anyhow!("failed to deserialize value from json: {}", err)) + } +} + +/// Implements the [Serde] trait which serializes and deserializes +/// the message using Protobuf format through the [`prost::Message`] trait. +#[cfg(feature = "serde-prost")] +#[derive(Debug, Clone, Copy, Default)] +pub struct Protobuf(PhantomData) +where + T: prost::Message + Default; + +#[cfg(feature = "serde-prost")] +impl Serializer for Protobuf +where + T: prost::Message + Default, +{ + fn serialize(&self, value: T) -> anyhow::Result> { + Ok(value.encode_to_vec()) + } +} + +#[cfg(feature = "serde-prost")] +impl Deserializer for Protobuf +where + T: prost::Message + Default, +{ + fn deserialize(&self, data: &[u8]) -> anyhow::Result { + let buf = Bytes::copy_from_slice(data); + + T::decode(buf) + .map_err(|err| anyhow!("failed to deserialize protobuf message into value: {}", err)) + } +} + +/// Implementation of [Serde] traits that uses [ProtoJson](https://protobuf.dev/programming-guides/proto3/#json) +/// as wire protocol. +#[cfg(feature = "serde-prost")] +#[cfg(feature = "serde-json")] +#[derive(Clone, Copy)] +pub struct ProtoJson(PhantomData) +where + T: prost::Message + Serialize + Default, + for<'de> T: Deserialize<'de>; + +#[cfg(feature = "serde-prost")] +#[cfg(feature = "serde-json")] +impl Default for ProtoJson +where + T: prost::Message + Serialize + Default, + for<'de> T: Deserialize<'de>, +{ + fn default() -> Self { + Self(PhantomData) + } +} + +#[cfg(feature = "serde-prost")] +#[cfg(feature = "serde-json")] +impl Serializer for ProtoJson +where + T: prost::Message + Serialize + Default, + for<'de> T: Deserialize<'de>, +{ + fn serialize(&self, value: T) -> anyhow::Result> { + Json::::default().serialize(value) + } +} + +#[cfg(feature = "serde-prost")] +#[cfg(feature = "serde-json")] +impl Deserializer for ProtoJson +where + T: prost::Message + Serialize + Default, + for<'de> T: Deserialize<'de>, +{ + fn deserialize(&self, data: &[u8]) -> anyhow::Result { + Json::::default().deserialize(data) + } +} diff --git a/eventually/src/serde/json.rs b/eventually/src/serde/json.rs deleted file mode 100644 index 1d09f22f..00000000 --- a/eventually/src/serde/json.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! Contains the [Serializer] and [Deserializer] compatible -//! implementation using JSON. - -use std::marker::PhantomData; - -use serde::{Deserialize, Serialize}; - -use crate::serde::Serde; - -/// Implements the [Serializer] and [Deserializer] traits from [crate::serde] module, -/// which uses the [serde] crate to serialize and deserialize a message into JSON. -#[derive(Debug, Clone, Copy)] -pub struct JsonSerde(PhantomData) -where - T: Serialize + Send + Sync, - for<'d> T: Deserialize<'d>; - -impl Default for JsonSerde -where - T: Serialize + Send + Sync, - for<'d> T: Deserialize<'d>, -{ - fn default() -> Self { - Self(PhantomData) - } -} - -impl Serde for JsonSerde -where - T: Serialize + Send + Sync, - for<'d> T: Deserialize<'d>, -{ - type Error = serde_json::Error; - - fn deserialize(&self, data: Vec) -> Result { - serde_json::from_slice(&data) - } - - fn serialize(&self, value: T) -> Vec { - serde_json::to_vec(&value).expect("json serialization should not fail") - } -} diff --git a/eventually/src/serde/mod.rs b/eventually/src/serde/mod.rs deleted file mode 100644 index 397cc734..00000000 --- a/eventually/src/serde/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -#[cfg(feature = "serde-json")] -pub mod json; -#[cfg(feature = "serde-prost")] -pub mod prost; - -/// A [Serde] can be used to serialize into and deserialize from a given type -/// into a wire format, such as [JSON][json] or [Protobuf][prost]. -pub trait Serde: Send + Sync { - /// The error returned by the [Serde::deserialize] method. - type Error: Send + Sync; - - /// Serializes the given value into the wire format supported by this [Serde]. - fn serialize(&self, value: T) -> Vec; - - /// Deserializes the given value from a message encoded in the wire format - /// supported by this [Serde]. - fn deserialize(&self, data: Vec) -> Result; -} diff --git a/eventually/src/serde/prost.rs b/eventually/src/serde/prost.rs deleted file mode 100644 index f8d9075f..00000000 --- a/eventually/src/serde/prost.rs +++ /dev/null @@ -1,32 +0,0 @@ -//! Contains the [Serde] compatible implementation using Protobuf wire format through [prost]. - -use std::marker::PhantomData; - -use prost::bytes::Bytes; -use prost::Message; - -use crate::serde::Serde; - -/// Implements the [Serde] trait which serializes and deserializes -/// the message using Protobuf format through the [Message] trait in [prost]. -#[derive(Debug, Clone, Copy, Default)] -pub struct MessageSerde(PhantomData) -where - T: Message + Default; - -impl Serde for MessageSerde -where - T: Message + Default, -{ - type Error = prost::DecodeError; - - fn serialize(&self, value: T) -> Vec { - value.encode_to_vec() - } - - fn deserialize(&self, data: Vec) -> Result { - let buf = Bytes::from(data); - - T::decode(buf) - } -} diff --git a/eventually/src/tracing.rs b/eventually/src/tracing.rs index f31ed770..f0464dac 100644 --- a/eventually/src/tracing.rs +++ b/eventually/src/tracing.rs @@ -1,17 +1,17 @@ //! Module containing some extension traits to support code instrumentation //! using the `tracing` crate. -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::marker::PhantomData; use async_trait::async_trait; use tracing::instrument; use crate::aggregate::Aggregate; -use crate::version::Version; +use crate::version::{self, Version}; use crate::{aggregate, event, message}; -/// [aggregate::Repository] type wrapper that provides instrumentation +/// [`aggregate::Repository`] type wrapper that provides instrumentation /// features through the `tracing` crate. #[derive(Debug, Clone)] pub struct InstrumentedAggregateRepository @@ -20,51 +20,53 @@ where ::Id: Debug, ::Event: Debug, Inner: aggregate::Repository, - >::GetError: Display, - >::SaveError: Display, { inner: Inner, t: PhantomData, } #[async_trait] -impl aggregate::Repository for InstrumentedAggregateRepository +impl aggregate::repository::Getter for InstrumentedAggregateRepository where T: Aggregate + Debug, ::Id: Debug, ::Event: Debug, Inner: aggregate::Repository, - >::GetError: Display, - >::SaveError: Display, { - type GetError = >::GetError; - type SaveError = >::SaveError; - - #[instrument(name = "aggregate::Repository.get", ret, err, skip(self))] - async fn get( - &self, - id: &T::Id, - ) -> Result, aggregate::repository::GetError> { + #[allow(clippy::blocks_in_conditions)] // NOTE(ar3s3ru): seems to be a false positive. + #[instrument(name = "aggregate::repository::Getter.get", ret, err, skip(self))] + async fn get(&self, id: &T::Id) -> Result, aggregate::repository::GetError> { self.inner.get(id).await } +} - #[instrument(name = "aggregate::Repository.save", ret, err, skip(self))] - async fn save(&self, root: &mut aggregate::Root) -> Result<(), Self::SaveError> { +#[async_trait] +impl aggregate::repository::Saver for InstrumentedAggregateRepository +where + T: Aggregate + Debug, + ::Id: Debug, + ::Event: Debug, + Inner: aggregate::Repository, +{ + #[allow(clippy::blocks_in_conditions)] // NOTE(ar3s3ru): seems to be a false positive. + #[instrument(name = "aggregate::repository::Saver.save", ret, err, skip(self))] + async fn save( + &self, + root: &mut aggregate::Root, + ) -> Result<(), aggregate::repository::SaveError> { self.inner.save(root).await } } -/// Extension trait for any [aggregate::Repository] type to provide +/// Extension trait for any [`aggregate::Repository`] type to provide /// instrumentation features through the `tracing` crate. pub trait AggregateRepositoryExt: aggregate::Repository + Sized where T: Aggregate + Debug, ::Id: Debug, ::Event: Debug, - >::GetError: Display, - >::SaveError: Display, { - /// Returns an instrumented version of the [aggregate::Repository] instance. + /// Returns an instrumented version of the [`aggregate::Repository`] instance. fn with_tracing(self) -> InstrumentedAggregateRepository { InstrumentedAggregateRepository { inner: self, @@ -76,21 +78,18 @@ where impl AggregateRepositoryExt for R where R: aggregate::Repository, - >::GetError: Display, - >::SaveError: Display, T: Aggregate + Debug, ::Id: Debug, ::Event: Debug, { } -/// [event::Store] type wrapper that provides instrumentation +/// [`event::Store`] type wrapper that provides instrumentation /// features through the `tracing` crate. #[derive(Debug, Clone)] pub struct InstrumentedEventStore where T: event::Store + Send + Sync, - >::Error: Display + Send + Sync, StreamId: Debug + Send + Sync, Event: message::Message + Debug + Send + Sync, { @@ -99,15 +98,14 @@ where event: PhantomData, } -impl event::Streamer +impl event::store::Streamer for InstrumentedEventStore where T: event::Store + Send + Sync, - >::Error: Display + Send + Sync, StreamId: Debug + Send + Sync, Event: message::Message + Debug + Send + Sync, { - type Error = >::Error; + type Error = >::Error; #[instrument(name = "event::Store.stream", skip(self))] fn stream( @@ -120,36 +118,33 @@ where } #[async_trait] -impl event::Appender +impl event::store::Appender for InstrumentedEventStore where T: event::Store + Send + Sync, - >::Error: Display + Send + Sync, StreamId: Debug + Send + Sync, Event: message::Message + Debug + Send + Sync, { - type Error = >::Error; - + #[allow(clippy::blocks_in_conditions)] // NOTE(ar3s3ru): seems to be a false positive. #[instrument(name = "event::Store.append", ret, err, skip(self))] async fn append( &self, id: StreamId, - version_check: event::StreamVersionExpected, + version_check: version::Check, events: Vec>, - ) -> Result { + ) -> Result { self.store.append(id, version_check, events).await } } -/// Extension trait for any [event::Store] type to provide +/// Extension trait for any [`event::Store`] type to provide /// instrumentation features through the `tracing` crate. pub trait EventStoreExt: event::Store + Sized where - >::Error: Display, StreamId: Debug + Send + Sync, Event: message::Message + Debug + Send + Sync, { - /// Returns an instrumented version of the [event::Store] instance. + /// Returns an instrumented version of the [`event::Store`] instance. fn with_tracing(self) -> InstrumentedEventStore { InstrumentedEventStore { store: self, @@ -162,7 +157,6 @@ where impl EventStoreExt for T where T: event::Store + Send + Sync, - >::Error: Display + Send + Sync, StreamId: Debug + Send + Sync, Event: message::Message + Debug + Send + Sync, { diff --git a/eventually/src/version.rs b/eventually/src/version.rs index c2d59484..0f429f76 100644 --- a/eventually/src/version.rs +++ b/eventually/src/version.rs @@ -2,10 +2,26 @@ /// A version used for Optimistic Locking. /// -/// Used by the [crate::aggregate::Root] to avoid concurrency issues, -/// and [crate::event::Store] to implement stream-local ordering to the messages. +/// Used by the [`aggregate::Root`][crate::aggregate::Root] to avoid concurrency issues, +/// and [`event::Store`][crate::event::Store] to implement stream-local ordering to the messages. pub type Version = u64; +/// Used to set a specific expectation during an operation +/// that mutates some sort of resource (e.g. an [Event Stream][crate::event::Stream]) +/// that supports versioning. +/// +/// It allows for optimistic locking, avoiding data races +/// when modifying the same resource at the same time. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Check { + /// Disables any kind of optimistic locking check, allowing + /// for any [Version] to be used compared to the new one. + Any, + /// Expects that the previous [Version] used for the operation + /// must have the value specified. + MustBe(Version), +} + /// This error is returned by a function when a version conflict error has /// been detected. #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] diff --git a/examples/bank-accounting/Cargo.toml b/examples/bank-accounting/Cargo.toml index e8dfbbe1..d4c85ae5 100644 --- a/examples/bank-accounting/Cargo.toml +++ b/examples/bank-accounting/Cargo.toml @@ -6,8 +6,8 @@ readme = "README.md" publish = false [dependencies] -anyhow = "1.0.75" -async-trait = "0.1.74" +anyhow = "1.0.80" +async-trait = "0.1.77" eventually = { path = "../../eventually", features = [ "serde-prost", "tracing", @@ -16,19 +16,18 @@ eventually-macros = { path = "../../eventually-macros" } eventually-postgres = { path = "../../eventually-postgres" } opentelemetry = "0.21.0" opentelemetry-jaeger = "0.20.0" -prost = "0.12.1" -rust_decimal = "1.32.0" -sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "postgres"] } -thiserror = "1.0.50" -tokio = { version = "1.34.0", features = ["macros", "rt-multi-thread"] } -tonic = { version = "0.10.2", features = ["gzip", "transport"] } -tonic-health = "0.10.2" -tonic-reflection = "0.10.2" +prost = "0.12.3" +rust_decimal = "1.34.3" +sqlx = { version = "0.7.3", features = ["runtime-tokio-rustls", "postgres"] } +thiserror = "1.0.57" +tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] } +tonic = { version = "0.11.0", features = ["gzip", "transport"] } +tonic-health = "0.11.0" +tonic-reflection = "0.11.0" tower = "0.4.13" -tower-http = { version = "0.4.4", features = ["trace"] } tracing = "0.1.40" tracing-opentelemetry = "0.22.0" -tracing-subscriber = { version = "0.3.17", features = [ +tracing-subscriber = { version = "0.3.18", features = [ "fmt", "std", "registry", @@ -38,4 +37,4 @@ tracing-subscriber = { version = "0.3.17", features = [ [dev-dependencies] [build-dependencies] -tonic-build = { version = "0.10.2", features = ["prost"] } +tonic-build = { version = "0.11.0", features = ["prost"] } diff --git a/examples/bank-accounting/src/application.rs b/examples/bank-accounting/src/application.rs index 3ee29ee0..29357bbf 100644 --- a/examples/bank-accounting/src/application.rs +++ b/examples/bank-accounting/src/application.rs @@ -1,10 +1,7 @@ -use std::error::Error as StdError; use std::sync::Arc; use async_trait::async_trait; -use eventually::aggregate::repository::AnyRepositoryExt; -use eventually::aggregate::{self}; -use eventually::{command, message}; +use eventually::{aggregate, command, message}; use rust_decimal::Decimal; use crate::domain::{ @@ -13,18 +10,16 @@ use crate::domain::{ #[derive(Clone)] pub struct Service { - repository: Arc>, + repository: Arc>, } impl From for Service where R: aggregate::Repository + 'static, - >::GetError: StdError + Send + Sync + 'static, - >::SaveError: StdError + Send + Sync + 'static, { fn from(repository: R) -> Self { Self { - repository: Arc::new(repository.with_any_errors()), + repository: Arc::new(repository), } } } diff --git a/examples/bank-accounting/src/domain.rs b/examples/bank-accounting/src/domain.rs index bf6cac86..898fc8ab 100644 --- a/examples/bank-accounting/src/domain.rs +++ b/examples/bank-accounting/src/domain.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use eventually::aggregate; -use eventually_macros::{aggregate_root, Message}; +use eventually::message::Message; +use eventually_macros::aggregate_root; use rust_decimal::Decimal; pub type BankAccountRepository = aggregate::EventSourcedRepository; @@ -18,7 +19,7 @@ pub struct Transaction { pub type BankAccountHolderId = String; pub type BankAccountId = String; -#[derive(Debug, Clone, PartialEq, Eq, Message)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum BankAccountEvent { WasOpened { id: BankAccountId, @@ -49,6 +50,21 @@ pub enum BankAccountEvent { }, } +impl Message for BankAccountEvent { + fn name(&self) -> &'static str { + match self { + BankAccountEvent::WasOpened { .. } => "BankAccountWasOpened", + BankAccountEvent::DepositWasRecorded { .. } => "BankAccountDepositWasRecorded", + BankAccountEvent::TransferWasSent { .. } => "BankAccountTransferWasSent", + BankAccountEvent::TransferWasReceived { .. } => "BankAccountTransferWasReceived", + BankAccountEvent::TransferWasDeclined { .. } => "BankAccountTransferWasDeclined", + BankAccountEvent::TransferWasConfirmed { .. } => "BankAccountTransferWasConfirmed", + BankAccountEvent::WasClosed { .. } => "BankAccountWasClosed", + BankAccountEvent::WasReopened { .. } => "BankAccountWasReopened", + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] pub enum BankAccountError { #[error("bank account has not been opened yet")] diff --git a/examples/bank-accounting/src/grpc.rs b/examples/bank-accounting/src/grpc.rs index 4c05fdd6..05d6e7c0 100644 --- a/examples/bank-accounting/src/grpc.rs +++ b/examples/bank-accounting/src/grpc.rs @@ -49,7 +49,7 @@ impl proto::bank_accounting_server::BankAccounting for BankAccountingApi { if let Some(EmptyAccountId | EmptyAccountHolderId) = bank_error { tonic::Status::invalid_argument(e.to_string()) - } else if let Some(e) = conflict_error { + } else if conflict_error.is_some() { tonic::Status::already_exists(AlreadyOpened.to_string()) } else { tonic::Status::internal(e.to_string()) diff --git a/examples/bank-accounting/src/lib.rs b/examples/bank-accounting/src/lib.rs index 4f16d4cd..a847cc34 100644 --- a/examples/bank-accounting/src/lib.rs +++ b/examples/bank-accounting/src/lib.rs @@ -1,7 +1,3 @@ -#![deny(unsafe_code, unused_qualifications, trivial_casts)] -#![deny(clippy::all)] -#![warn(clippy::pedantic)] - pub mod application; pub mod domain; pub mod grpc; diff --git a/examples/bank-accounting/src/main.rs b/examples/bank-accounting/src/main.rs index a23729ad..19c1113c 100644 --- a/examples/bank-accounting/src/main.rs +++ b/examples/bank-accounting/src/main.rs @@ -1,12 +1,11 @@ use std::time::Duration; use anyhow::anyhow; -use bank_accounting::domain::BankAccountRepository; +use bank_accounting::domain::{BankAccountEvent, BankAccountRepository}; use bank_accounting::{application, grpc, proto}; -use eventually::serde::prost::MessageSerde; +use eventually::serde; use eventually::tracing::{AggregateRepositoryExt, EventStoreExt}; use eventually_postgres::event; -use tower_http::trace::TraceLayer; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -14,7 +13,10 @@ async fn main() -> anyhow::Result<()> { let pool = bank_accounting::postgres::connect().await?; - let bank_account_event_serde = MessageSerde::::default(); + let bank_account_event_serde = serde::Convert::::new( + serde::Protobuf::::default(), + ); + let bank_account_event_store = event::Store::new(pool, bank_account_event_serde) .await? .with_tracing(); @@ -43,11 +45,11 @@ async fn main() -> anyhow::Result<()> { ); let layer = tower::ServiceBuilder::new() - .layer(TraceLayer::new_for_grpc()) .timeout(Duration::from_secs(5)) .into_inner(); tonic::transport::Server::builder() + .trace_fn(|r| tracing::info_span!("server", uri = r.uri().to_string())) .layer(layer) .add_service(health_svc) .add_service(reflection_svc)