From 191273ccde36bcabdf75320aa27108f14e1ee079 Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Mon, 11 Dec 2023 10:52:25 +0100 Subject: [PATCH] refactor(eventually): move to nested, local mod.rs pattern --- .../src/{aggregate.rs => aggregate/mod.rs} | 2 +- eventually/src/{command.rs => command/mod.rs} | 46 ++++++++--------- eventually/src/{event.rs => event/mod.rs} | 0 eventually/src/event/store.rs | 51 ++++++++----------- eventually/src/version.rs | 11 ++++ 5 files changed, 55 insertions(+), 55 deletions(-) rename eventually/src/{aggregate.rs => aggregate/mod.rs} (99%) rename eventually/src/{command.rs => command/mod.rs} (86%) rename eventually/src/{event.rs => event/mod.rs} (100%) diff --git a/eventually/src/aggregate.rs b/eventually/src/aggregate/mod.rs similarity index 99% rename from eventually/src/aggregate.rs rename to eventually/src/aggregate/mod.rs index 4c32ab58..87f49744 100644 --- a/eventually/src/aggregate.rs +++ b/eventually/src/aggregate/mod.rs @@ -387,8 +387,8 @@ pub(crate) mod test_user_domain { mod test { use std::error::Error; - use crate::aggregate::repository::{Getter, Saver}; use crate::aggregate::test_user_domain::{User, UserEvent}; + use crate::aggregate::Repository; use crate::event::store::EventStoreExt; use crate::{aggregate, event, version}; diff --git a/eventually/src/command.rs b/eventually/src/command/mod.rs similarity index 86% rename from eventually/src/command.rs rename to eventually/src/command/mod.rs index 556c8ce5..d34ac9ba 100644 --- a/eventually/src/command.rs +++ b/eventually/src/command/mod.rs @@ -64,11 +64,27 @@ where #[cfg(test)] mod test_user_domain { + use std::sync::Arc; + use async_trait::async_trait; + use crate::aggregate::repository::AnyRepositoryExt; use crate::aggregate::test_user_domain::{User, UserEvent}; use crate::{aggregate, command, event, message}; + struct UserService(Arc>); + + impl From for UserService + where + R: aggregate::Repository + 'static, + >::GetError: std::error::Error + Send + Sync + 'static, + >::SaveError: std::error::Error + Send + Sync + 'static, + { + fn from(value: R) -> Self { + Self(Arc::new(value.with_any_errors())) + } + } + struct CreateUser { email: String, password: String, @@ -80,16 +96,8 @@ mod test_user_domain { } } - struct CreateUserHandler(R) - where - R: aggregate::repository::Saver; - #[async_trait] - impl command::Handler for CreateUserHandler - where - R: aggregate::repository::Saver, - R::Error: std::error::Error + Send + Sync + 'static, - { + impl command::Handler for UserService { type Error = anyhow::Error; async fn handle(&self, command: command::Envelope) -> Result<(), Self::Error> { @@ -113,18 +121,8 @@ mod test_user_domain { } } - struct ChangeUserPasswordHandler(R) - where - R: aggregate::Repository; - #[async_trait] - impl command::Handler for ChangeUserPasswordHandler - where - R: aggregate::Repository, - >::Error: - std::error::Error + Send + Sync + 'static, - >::Error: std::error::Error + Send + Sync + 'static, - { + impl command::Handler for UserService { type Error = anyhow::Error; async fn handle( @@ -159,7 +157,7 @@ mod test_user_domain { }), }]) .assert_on(|event_store| { - CreateUserHandler(aggregate::EventSourcedRepository::from(event_store)) + UserService::from(aggregate::EventSourcedRepository::from(event_store)) }) .await; } @@ -181,7 +179,7 @@ mod test_user_domain { })) .then_fails() .assert_on(|event_store| { - CreateUserHandler(aggregate::EventSourcedRepository::from(event_store)) + UserService::from(aggregate::EventSourcedRepository::from(event_store)) }) .await; } @@ -209,7 +207,7 @@ mod test_user_domain { }), }]) .assert_on(|event_store| { - ChangeUserPasswordHandler(aggregate::EventSourcedRepository::from(event_store)) + UserService::from(aggregate::EventSourcedRepository::from(event_store)) }) .await; } @@ -223,7 +221,7 @@ mod test_user_domain { })) .then_fails() .assert_on(|event_store| { - ChangeUserPasswordHandler(aggregate::EventSourcedRepository::from(event_store)) + UserService::from(aggregate::EventSourcedRepository::from(event_store)) }) .await; } diff --git a/eventually/src/event.rs b/eventually/src/event/mod.rs similarity index 100% rename from eventually/src/event.rs rename to eventually/src/event/mod.rs diff --git a/eventually/src/event/store.rs b/eventually/src/event/store.rs index 03ed235c..e2b8abaf 100644 --- a/eventually/src/event/store.rs +++ b/eventually/src/event/store.rs @@ -270,6 +270,7 @@ where #[cfg(test)] mod test { use futures::TryStreamExt; + use lazy_static::lazy_static; use super::*; use crate::event; @@ -277,41 +278,45 @@ mod test { use crate::message::tests::StringMessage; use crate::version::Version; - #[tokio::test] - async fn it_works() { - let event_store = InMemory::<&'static str, StringMessage>::default(); + const STREAM_ID: &str = "stream:test"; - let stream_id = "stream:test"; - let events = vec![ + lazy_static! { + static ref EVENTS: Vec> = vec![ event::Envelope::from(StringMessage("event-1")), event::Envelope::from(StringMessage("event-2")), event::Envelope::from(StringMessage("event-3")), ]; + } + + #[tokio::test] + async fn it_works() { + let event_store = InMemory::<&'static str, StringMessage>::default(); let new_event_stream_version = event_store .append( - stream_id, + STREAM_ID, event::StreamVersionExpected::MustBe(0), - events.clone(), + EVENTS.clone(), ) .await .expect("append should not fail"); - let expected_version = events.len() as Version; + let expected_version = EVENTS.len() as Version; assert_eq!(expected_version, new_event_stream_version); - let expected_events = events + let expected_events = EVENTS + .clone() .into_iter() .enumerate() .map(|(i, event)| event::Persisted { - stream_id, + stream_id: STREAM_ID, version: (i as Version) + 1, event, }) .collect::>(); let event_stream: Vec<_> = event_store - .stream(&stream_id, event::VersionSelect::All) + .stream(&STREAM_ID, event::VersionSelect::All) .try_collect() .await .expect("opening an event stream should not fail"); @@ -324,24 +329,17 @@ mod test { let event_store = InMemory::<&'static str, StringMessage>::default(); let tracking_event_store = event_store.with_recorded_events_tracking(); - let stream_id = "stream:test"; - let events = vec![ - event::Envelope::from(StringMessage("event-1")), - event::Envelope::from(StringMessage("event-2")), - event::Envelope::from(StringMessage("event-3")), - ]; - tracking_event_store .append( - stream_id, + STREAM_ID, event::StreamVersionExpected::MustBe(0), - events.clone(), + EVENTS.clone(), ) .await .expect("append should not fail"); let event_stream: Vec<_> = tracking_event_store - .stream(&stream_id, event::VersionSelect::All) + .stream(&STREAM_ID, event::VersionSelect::All) .try_collect() .await .expect("opening an event stream should not fail"); @@ -353,18 +351,11 @@ mod test { async fn version_conflict_checks_work_as_expected() { let event_store = InMemory::<&'static str, StringMessage>::default(); - let stream_id = "stream:test"; - let events = vec![ - event::Envelope::from(StringMessage("event-1")), - event::Envelope::from(StringMessage("event-2")), - event::Envelope::from(StringMessage("event-3")), - ]; - let append_error = event_store .append( - stream_id, + STREAM_ID, event::StreamVersionExpected::MustBe(3), - events.clone(), + EVENTS.clone(), ) .await .expect_err("the event stream version should be zero"); diff --git a/eventually/src/version.rs b/eventually/src/version.rs index 9980431b..c2d59484 100644 --- a/eventually/src/version.rs +++ b/eventually/src/version.rs @@ -1,8 +1,19 @@ +//! Contains the types necessary for Optimistic Locking through versioning. + +/// A version used for Optimistic Locking. +/// +/// Used by the [crate::aggregate::Root] to avoid concurrency issues, +/// and [crate::event::Store] to implement stream-local ordering to the messages. pub type Version = u64; +/// This error is returned by a function when a version conflict error has +/// been detected. #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] #[error("conflict error detected, expected version was: {expected}, found: {actual}")] pub struct ConflictError { + /// The [Version] value that was expected when calling the function that failed. pub expected: Version, + + /// The actual [Version] value, which mismatch caused this error. pub actual: Version, }