diff --git a/gleam.toml b/gleam.toml index 9940e52..68a7dd2 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,5 +1,5 @@ name = "eventsourcing" -version = "0.1.1" +version = "0.1.2" # Fill out these fields if you intend to generate HTML documentation or publish # your project to the Hex package manager. diff --git a/src/eventsourcing.gleam b/src/eventsourcing.gleam index 3f48f73..d76ddeb 100644 --- a/src/eventsourcing.gleam +++ b/src/eventsourcing.gleam @@ -1,10 +1,37 @@ import gleam/list import gleam/result +/// Used by the EventStore implementations +pub type Aggregate(entity, command, event, error) { + Aggregate( + entity: entity, + handle: Handle(entity, command, event, error), + apply: Apply(entity, event), + ) +} + +/// Used by the EventStore implementations +pub type AggregateContext(entity, command, event, error) { + AggregateContext( + aggregate_id: AggregateId, + aggregate: Aggregate(entity, command, event, error), + sequence: Int, + ) +} + type AggregateId = String -pub type EventSourcing( +type Handle(entity, command, event, error) = + fn(entity, command) -> Result(List(event), error) + +type Apply(entity, event) = + fn(entity, event) -> entity + +type Query(event) = + fn(AggregateId, List(EventEnvelop(event))) -> Nil + +pub opaque type EventSourcing( eventstore, entity, command, @@ -18,10 +45,7 @@ pub type EventSourcing( ) } -pub fn new(event_store, queries) { - EventSourcing(event_store:, queries:) -} - +/// Wrapper around the event store implementations pub type EventStore(eventstore, entity, command, event, error) { EventStore( eventstore: eventstore, @@ -36,32 +60,83 @@ pub type EventStore(eventstore, entity, command, event, error) { ) } -pub type AggregateContext(entity, command, event, error) { - AggregateContext( - aggregate_id: AggregateId, - aggregate: Aggregate(entity, command, event, error), - sequence: Int, - ) -} - -/// Aggregate -type Handle(entity, command, event, error) = - fn(entity, command) -> Result(List(event), error) - -type Apply(entity, event) = - fn(entity, event) -> entity - -type Query(event) = - fn(AggregateId, List(EventEnvelop(event))) -> Nil - -pub type Aggregate(entity, command, event, error) { - Aggregate( - entity: entity, - handle: Handle(entity, command, event, error), - apply: Apply(entity, event), - ) +/// Create a new EventSourcing instance providing +/// an Event Store and a list of queries you want +/// run whenever events are commited. +/// +/// # Examples +/// ```gleam +/// pub type BankAccount { +/// BankAccount(opened: Bool, balance: Float) +/// } +/// +/// pub type BankAccountCommand { +/// OpenAccount(account_id: String) +/// DepositMoney(amount: Float) +/// WithDrawMoney(amount: Float) +/// } +/// +/// pub type BankAccountEvent { +/// AccountOpened(account_id: String) +/// CustomerDepositedCash(amount: Float, balance: Float) +/// CustomerWithdrewCash(amount: Float, balance: Float) +/// } +/// +/// pub fn handle( +/// bank_account: BankAccount, +/// command: BankAccountCommand, +/// ) -> Result(List(BankAccountEvent), Nil) { +/// case command { +/// OpenAccount(account_id) -> Ok([AccountOpened(account_id)]) +/// DepositMoney(amount) -> { +/// let balance = bank_account.balance +. amount +/// case amount >. 0.0 { +/// True -> Ok([CustomerDepositedCash(amount:, balance:)]) +/// False -> Error(Nil) +/// } +/// } +/// WithDrawMoney(amount) -> { +/// let balance = bank_account.balance -. amount +/// case amount >. 0.0 && balance >. 0.0 { +/// True -> Ok([CustomerWithdrewCash(amount:, balance:)]) +/// False -> Error(Nil) +/// } +/// } +/// } +/// } +/// +/// pub fn apply(bank_account: BankAccount, event: BankAccountEvent) { +/// case event { +/// AccountOpened(_) -> BankAccount(..bank_account, opened: True) +/// CustomerDepositedCash(_, balance) -> BankAccount(..bank_account, balance:) +/// CustomerWithdrewCash(_, balance) -> BankAccount(..bank_account, balance:) +/// } +/// } +/// fn main() { +/// let mem_store = +/// memory_store.new(BankAccount(opened: False, balance: 0.0), handle, apply) +/// let query = fn( +/// aggregate_id: String, +/// events: List(eventsourcing.EventEnvelop(BankAccountEvent)), +/// ) { +/// io.println( +/// "Aggregate Bank Account with ID: " +/// <> aggregate_id +/// <> " commited " +/// <> events |> list.length |> int.to_string +/// <> " events.", +/// ) +/// } +/// let event_sourcing = eventsourcing.new(mem_store, [query]) +/// } +/// ``` +pub fn new(event_store, queries) { + EventSourcing(event_store:, queries:) } +/// The main function of the package. +/// Run execute with your event_sourcing instance and the command you want to apply. +/// It will return a Result with Ok(Nil) or Error(your domain error) if the command failed. pub fn execute( event_sourcing: EventSourcing( eventstore, @@ -95,6 +170,9 @@ pub fn execute( Nil } +/// An EventEnvelop is a wrapper around your domain events +/// used by the Event Stores. You can use this type constructor +/// if the event store provides a `load_events` function. pub type EventEnvelop(event) { MemoryStoreEventEnvelop( aggregate_id: AggregateId, diff --git a/src/eventsourcing/memory_store.gleam b/src/eventsourcing/memory_store.gleam index 9f26121..ed00b92 100644 --- a/src/eventsourcing/memory_store.gleam +++ b/src/eventsourcing/memory_store.gleam @@ -77,7 +77,7 @@ fn load_commited_events( pub fn load_events( memory_store: MemoryStore(entity, command, event, error), aggregate_id: eventsourcing.AggregateId, -) { +) -> List(eventsourcing.EventEnvelop(event)) { load_commited_events(memory_store, aggregate_id) |> fn(events) { io.println( @@ -91,7 +91,14 @@ pub fn load_events( } } -pub fn load_aggregate( +pub fn load_aggregate_entity( + memory_store: MemoryStore(entity, command, event, error), + aggregate_id: eventsourcing.AggregateId, +) -> entity { + load_aggregate(memory_store, aggregate_id).aggregate.entity +} + +fn load_aggregate( memory_store: MemoryStore(entity, command, event, error), aggregate_id: eventsourcing.AggregateId, ) -> eventsourcing.AggregateContext(entity, command, event, error) { @@ -119,7 +126,7 @@ fn commit( memory_store: MemoryStore(entity, command, event, error), context: eventsourcing.AggregateContext(entity, command, event, error), events: List(event), -) { +) -> List(eventsourcing.EventEnvelop(event)) { let eventsourcing.AggregateContext(aggregate_id, _, sequence) = context let wrapped_events = wrap_events(aggregate_id, sequence, events) let past_events = load_commited_events(memory_store, aggregate_id) diff --git a/src/eventsourcing/postgres_store.gleam b/src/eventsourcing/postgres_store.gleam index e0c23d8..9e8d5e6 100644 --- a/src/eventsourcing/postgres_store.gleam +++ b/src/eventsourcing/postgres_store.gleam @@ -56,10 +56,17 @@ pub fn new( ) } -pub fn load_aggregate( +pub fn load_aggregate_entity( postgres_store: PostgresStore(entity, command, event, error), aggregate_id: eventsourcing.AggregateId, -) { +) -> entity { + load_aggregate(postgres_store, aggregate_id).aggregate.entity +} + +fn load_aggregate( + postgres_store: PostgresStore(entity, command, event, error), + aggregate_id: eventsourcing.AggregateId, +) -> eventsourcing.AggregateContext(entity, command, event, error) { let assert Ok(commited_events) = load_events(postgres_store, aggregate_id) let #(aggregate, sequence) = @@ -83,7 +90,7 @@ pub fn load_aggregate( pub fn load_events( postgres_store: PostgresStore(entity, command, event, error), aggregate_id: eventsourcing.AggregateId, -) { +) -> Result(List(eventsourcing.EventEnvelop(event)), pgo.QueryError) { use resulted <- result.map(pgo.execute( select_events(), on: postgres_store.db, diff --git a/test/eventsourcing_test.gleam b/test/eventsourcing_test.gleam index c24d8bf..0e78cb4 100644 --- a/test/eventsourcing_test.gleam +++ b/test/eventsourcing_test.gleam @@ -2,8 +2,14 @@ import birdie import decode import eventsourcing import eventsourcing/memory_store +import eventsourcing/postgres_store import gleam/dynamic +import gleam/int +import gleam/io import gleam/json +import gleam/list +import gleam/option +import gleam/pgo import gleam/result import gleeunit import gleeunit/should @@ -16,7 +22,19 @@ pub fn main() { pub fn memory_store_execute_open_account_test() { let mem_store = memory_store.new(BankAccount(opened: False, balance: 0.0), handle, apply) - let event_sourcing = eventsourcing.new(mem_store, []) + let query = fn( + aggregate_id: String, + events: List(eventsourcing.EventEnvelop(BankAccountEvent)), + ) { + io.println( + "Aggregate Bank Account with ID: " + <> aggregate_id + <> " commited " + <> events |> list.length |> int.to_string + <> " events.", + ) + } + let event_sourcing = eventsourcing.new(mem_store, [query]) eventsourcing.execute( event_sourcing, "92085b42-032c-4d7a-84de-a86d67123858", @@ -25,9 +43,45 @@ pub fn memory_store_execute_open_account_test() { |> should.be_ok |> should.equal(Nil) - memory_store.load_aggregate( + memory_store.load_aggregate_entity( mem_store.eventstore, "92085b42-032c-4d7a-84de-a86d67123858", + ) + |> pprint.format + |> birdie.snap(title: "memory store open account") +} + +pub fn postgres_store_execute_open_account_test() { + let postgres_store = + postgres_store.new( + pgo_config: pgo.Config( + ..pgo.default_config(), + host: "localhost", + database: "postgres", + pool_size: 15, + password: option.Some("postgres"), + ), + emtpy_entity: BankAccount(opened: False, balance: 0.0), + handle_command_function: handle, + apply_function: apply, + event_encoder: encode_event, + event_decoder: decode_event, + event_type: event_type(), + event_version: "1", + aggregate_type: aggregate_type(), + ) + let event_sourcing = eventsourcing.new(postgres_store, []) + eventsourcing.execute( + event_sourcing, + "92085b42-032c-4d7a-84de-a86d67123858", + OpenAccount("92085b42-032c-4d7a-84de-a86d67123858"), + ) + |> should.be_ok + |> should.equal(Nil) + + postgres_store.load_aggregate( + postgres_store.eventstore, + "92085b42-032c-4d7a-84de-a86d67123858", ).aggregate.entity |> pprint.format |> birdie.snap(title: "memory store open account")