Skip to content

Commit

Permalink
refactor(eventually): move to nested, local mod.rs pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
ar3s3ru committed Dec 11, 2023
1 parent ca0bf78 commit 191273c
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ pub(crate) mod test_user_domain {
mod test {
use std::error::Error;

use crate::aggregate::repository::{Getter, Saver};
use crate::aggregate::test_user_domain::{User, UserEvent};
use crate::aggregate::Repository;
use crate::event::store::EventStoreExt;
use crate::{aggregate, event, version};

Expand Down
46 changes: 22 additions & 24 deletions eventually/src/command.rs → eventually/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,27 @@ where

#[cfg(test)]
mod test_user_domain {
use std::sync::Arc;

use async_trait::async_trait;

use crate::aggregate::repository::AnyRepositoryExt;
use crate::aggregate::test_user_domain::{User, UserEvent};
use crate::{aggregate, command, event, message};

struct UserService(Arc<dyn aggregate::repository::AnyRepository<User>>);

impl<R> From<R> for UserService
where
R: aggregate::Repository<User> + 'static,
<R as aggregate::Repository<User>>::GetError: std::error::Error + Send + Sync + 'static,
<R as aggregate::Repository<User>>::SaveError: std::error::Error + Send + Sync + 'static,
{
fn from(value: R) -> Self {
Self(Arc::new(value.with_any_errors()))
}
}

struct CreateUser {
email: String,
password: String,
Expand All @@ -80,16 +96,8 @@ mod test_user_domain {
}
}

struct CreateUserHandler<R>(R)
where
R: aggregate::repository::Saver<User>;

#[async_trait]
impl<R> command::Handler<CreateUser> for CreateUserHandler<R>
where
R: aggregate::repository::Saver<User>,
R::Error: std::error::Error + Send + Sync + 'static,
{
impl command::Handler<CreateUser> for UserService {
type Error = anyhow::Error;

async fn handle(&self, command: command::Envelope<CreateUser>) -> Result<(), Self::Error> {
Expand All @@ -113,18 +121,8 @@ mod test_user_domain {
}
}

struct ChangeUserPasswordHandler<R>(R)
where
R: aggregate::Repository<User>;

#[async_trait]
impl<R> command::Handler<ChangeUserPassword> for ChangeUserPasswordHandler<R>
where
R: aggregate::Repository<User>,
<R as aggregate::repository::Getter<User>>::Error:
std::error::Error + Send + Sync + 'static,
<R as aggregate::repository::Saver<User>>::Error: std::error::Error + Send + Sync + 'static,
{
impl command::Handler<ChangeUserPassword> for UserService {
type Error = anyhow::Error;

async fn handle(
Expand Down Expand Up @@ -159,7 +157,7 @@ mod test_user_domain {
}),
}])
.assert_on(|event_store| {
CreateUserHandler(aggregate::EventSourcedRepository::from(event_store))
UserService::from(aggregate::EventSourcedRepository::from(event_store))
})
.await;
}
Expand All @@ -181,7 +179,7 @@ mod test_user_domain {
}))
.then_fails()
.assert_on(|event_store| {
CreateUserHandler(aggregate::EventSourcedRepository::from(event_store))
UserService::from(aggregate::EventSourcedRepository::from(event_store))
})
.await;
}
Expand Down Expand Up @@ -209,7 +207,7 @@ mod test_user_domain {
}),
}])
.assert_on(|event_store| {
ChangeUserPasswordHandler(aggregate::EventSourcedRepository::from(event_store))
UserService::from(aggregate::EventSourcedRepository::from(event_store))
})
.await;
}
Expand All @@ -223,7 +221,7 @@ mod test_user_domain {
}))
.then_fails()
.assert_on(|event_store| {
ChangeUserPasswordHandler(aggregate::EventSourcedRepository::from(event_store))
UserService::from(aggregate::EventSourcedRepository::from(event_store))
})
.await;
}
Expand Down
File renamed without changes.
51 changes: 21 additions & 30 deletions eventually/src/event/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,48 +270,53 @@ where
#[cfg(test)]
mod test {
use futures::TryStreamExt;
use lazy_static::lazy_static;

use super::*;
use crate::event;
use crate::event::{Appender, Streamer};
use crate::message::tests::StringMessage;
use crate::version::Version;

#[tokio::test]
async fn it_works() {
let event_store = InMemory::<&'static str, StringMessage>::default();
const STREAM_ID: &str = "stream:test";

let stream_id = "stream:test";
let events = vec![
lazy_static! {
static ref EVENTS: Vec<event::Envelope<StringMessage>> = vec![
event::Envelope::from(StringMessage("event-1")),
event::Envelope::from(StringMessage("event-2")),
event::Envelope::from(StringMessage("event-3")),
];
}

#[tokio::test]
async fn it_works() {
let event_store = InMemory::<&'static str, StringMessage>::default();

let new_event_stream_version = event_store
.append(
stream_id,
STREAM_ID,
event::StreamVersionExpected::MustBe(0),
events.clone(),
EVENTS.clone(),
)
.await
.expect("append should not fail");

let expected_version = events.len() as Version;
let expected_version = EVENTS.len() as Version;
assert_eq!(expected_version, new_event_stream_version);

let expected_events = events
let expected_events = EVENTS
.clone()
.into_iter()
.enumerate()
.map(|(i, event)| event::Persisted {
stream_id,
stream_id: STREAM_ID,
version: (i as Version) + 1,
event,
})
.collect::<Vec<_>>();

let event_stream: Vec<_> = event_store
.stream(&stream_id, event::VersionSelect::All)
.stream(&STREAM_ID, event::VersionSelect::All)
.try_collect()
.await
.expect("opening an event stream should not fail");
Expand All @@ -324,24 +329,17 @@ mod test {
let event_store = InMemory::<&'static str, StringMessage>::default();
let tracking_event_store = event_store.with_recorded_events_tracking();

let stream_id = "stream:test";
let events = vec![
event::Envelope::from(StringMessage("event-1")),
event::Envelope::from(StringMessage("event-2")),
event::Envelope::from(StringMessage("event-3")),
];

tracking_event_store
.append(
stream_id,
STREAM_ID,
event::StreamVersionExpected::MustBe(0),
events.clone(),
EVENTS.clone(),
)
.await
.expect("append should not fail");

let event_stream: Vec<_> = tracking_event_store
.stream(&stream_id, event::VersionSelect::All)
.stream(&STREAM_ID, event::VersionSelect::All)
.try_collect()
.await
.expect("opening an event stream should not fail");
Expand All @@ -353,18 +351,11 @@ mod test {
async fn version_conflict_checks_work_as_expected() {
let event_store = InMemory::<&'static str, StringMessage>::default();

let stream_id = "stream:test";
let events = vec![
event::Envelope::from(StringMessage("event-1")),
event::Envelope::from(StringMessage("event-2")),
event::Envelope::from(StringMessage("event-3")),
];

let append_error = event_store
.append(
stream_id,
STREAM_ID,
event::StreamVersionExpected::MustBe(3),
events.clone(),
EVENTS.clone(),
)
.await
.expect_err("the event stream version should be zero");
Expand Down
11 changes: 11 additions & 0 deletions eventually/src/version.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
//! Contains the types necessary for Optimistic Locking through versioning.
/// A version used for Optimistic Locking.
///
/// Used by the [crate::aggregate::Root] to avoid concurrency issues,

Check warning on line 5 in eventually/src/version.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

warning: item in documentation is missing backticks --> eventually/src/version.rs:5:18 | 5 | /// Used by the [crate::aggregate::Root] to avoid concurrency issues, | ^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 5 | /// Used by the [`crate::aggregate::Root`] to avoid concurrency issues, | ~~~~~~~~~~~~~~~~~~~~~~~~

Check warning on line 5 in eventually/src/version.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

warning: item in documentation is missing backticks --> eventually/src/version.rs:5:18 | 5 | /// Used by the [crate::aggregate::Root] to avoid concurrency issues, | ^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 5 | /// Used by the [`crate::aggregate::Root`] to avoid concurrency issues, | ~~~~~~~~~~~~~~~~~~~~~~~~

Check warning on line 5 in eventually/src/version.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

warning: item in documentation is missing backticks --> eventually/src/version.rs:5:18 | 5 | /// Used by the [crate::aggregate::Root] to avoid concurrency issues, | ^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 5 | /// Used by the [`crate::aggregate::Root`] to avoid concurrency issues, | ~~~~~~~~~~~~~~~~~~~~~~~~
/// and [crate::event::Store] to implement stream-local ordering to the messages.

Check warning on line 6 in eventually/src/version.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

warning: item in documentation is missing backticks --> eventually/src/version.rs:6:10 | 6 | /// and [crate::event::Store] to implement stream-local ordering to the messages. | ^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 6 | /// and [`crate::event::Store`] to implement stream-local ordering to the messages. | ~~~~~~~~~~~~~~~~~~~~~

Check warning on line 6 in eventually/src/version.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

warning: item in documentation is missing backticks --> eventually/src/version.rs:6:10 | 6 | /// and [crate::event::Store] to implement stream-local ordering to the messages. | ^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 6 | /// and [`crate::event::Store`] to implement stream-local ordering to the messages. | ~~~~~~~~~~~~~~~~~~~~~

Check warning on line 6 in eventually/src/version.rs

View workflow job for this annotation

GitHub Actions / clippy

item in documentation is missing backticks

warning: item in documentation is missing backticks --> eventually/src/version.rs:6:10 | 6 | /// and [crate::event::Store] to implement stream-local ordering to the messages. | ^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown help: try | 6 | /// and [`crate::event::Store`] to implement stream-local ordering to the messages. | ~~~~~~~~~~~~~~~~~~~~~
pub type Version = u64;

/// This error is returned by a function when a version conflict error has
/// been detected.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
#[error("conflict error detected, expected version was: {expected}, found: {actual}")]
pub struct ConflictError {
/// The [Version] value that was expected when calling the function that failed.
pub expected: Version,

/// The actual [Version] value, which mismatch caused this error.
pub actual: Version,
}

0 comments on commit 191273c

Please sign in to comment.