diff --git a/birdie_snapshots/sqlite_store.accepted b/birdie_snapshots/sqlite_store.accepted new file mode 100644 index 0000000..e4acb0f --- /dev/null +++ b/birdie_snapshots/sqlite_store.accepted @@ -0,0 +1,5 @@ +--- +version: 1.1.8 +title: sqlite store +--- +BankAccount(True, 4.01) \ No newline at end of file diff --git a/gleam.toml b/gleam.toml index 3daa479..567c888 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,5 +1,5 @@ name = "eventsourcing" -version = "0.1.3" +version = "0.2.0" # Fill out these fields if you intend to generate HTML documentation or publish # your project to the Hex package manager. @@ -14,6 +14,7 @@ gleam_otp = ">= 0.10.0 and < 1.0.0" gleam_erlang = ">= 0.25.0 and < 1.0.0" gleam_pgo = ">= 0.13.0 and < 1.0.0" gleam_json = ">= 1.0.1 and < 2.0.0" +sqlight = ">= 0.9.0 and < 1.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index ed39b9a..4a8e072 100644 --- a/manifest.toml +++ b/manifest.toml @@ -6,6 +6,7 @@ packages = [ { name = "backoff", version = "1.1.6", build_tools = ["rebar3"], requirements = [], otp_app = "backoff", source = "hex", outer_checksum = "CF0CFFF8995FB20562F822E5CC47D8CCF664C5ECDC26A684CBE85C225F9D7C39" }, { name = "birdie", version = "1.1.8", build_tools = ["gleam"], requirements = ["argv", "filepath", "glance", "gleam_community_ansi", "gleam_erlang", "gleam_stdlib", "justin", "rank", "simplifile", "trie_again"], otp_app = "birdie", source = "hex", outer_checksum = "D225C0A3035FCD73A88402925A903AAD3567A1515C9EAE8364F11C17AD1805BB" }, { name = "decode", version = "0.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "decode", source = "hex", outer_checksum = "965F517F67B8C172CA27A5C8E34C73733139E8C9E64736181B8C3179281F9793" }, + { name = "esqlite", version = "0.8.8", build_tools = ["rebar3"], requirements = [], otp_app = "esqlite", source = "hex", outer_checksum = "374902457C7D94DC9409C98D3BDD1CA0D50A60DC9F3BDF1FD8EB74C0DCDF02D6" }, { name = "filepath", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "EFB6FF65C98B2A16378ABC3EE2B14124168C0CE5201553DE652E2644DCFDB594" }, { name = "glam", version = "2.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "glam", source = "hex", outer_checksum = "66EC3BCD632E51EED029678F8DF419659C1E57B1A93D874C5131FE220DFAD2B2" }, { name = "glance", version = "0.11.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "glexer"], otp_app = "glance", source = "hex", outer_checksum = "8F3314D27773B7C3B9FB58D8C02C634290422CE531988C0394FA0DF8676B964D" }, @@ -26,6 +27,7 @@ packages = [ { name = "pprint", version = "1.0.3", build_tools = ["gleam"], requirements = ["glam", "gleam_stdlib"], otp_app = "pprint", source = "hex", outer_checksum = "76BBB92E23D12D954BD452686543F29EDE8EBEBB7FC0ACCBCA66EEF276EC3A06" }, { name = "rank", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "rank", source = "hex", outer_checksum = "5660E361F0E49CBB714CC57CC4C89C63415D8986F05B2DA0C719D5642FAD91C9" }, { name = "simplifile", version = "2.0.1", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "5FFEBD0CAB39BDD343C3E1CCA6438B2848847DC170BA2386DF9D7064F34DF000" }, + { name = "sqlight", version = "0.9.0", build_tools = ["gleam"], requirements = ["esqlite", "gleam_stdlib"], otp_app = "sqlight", source = "hex", outer_checksum = "2D9C9BA420A5E7DCE7DB2DAAE4CAB0BE6218BEB48FD1531C583550B3D1316E94" }, { name = "thoas", version = "1.2.1", build_tools = ["rebar3"], requirements = [], otp_app = "thoas", source = "hex", outer_checksum = "E38697EDFFD6E91BD12CEA41B155115282630075C2A727E7A6B2947F5408B86A" }, { name = "trie_again", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "trie_again", source = "hex", outer_checksum = "5B19176F52B1BD98831B57FDC97BD1F88C8A403D6D8C63471407E78598E27184" }, ] @@ -40,3 +42,4 @@ gleam_pgo = { version = ">= 0.13.0 and < 1.0.0" } gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } pprint = { version = ">= 1.0.3 and < 2.0.0" } +sqlight = { version = ">= 0.9.0 and < 1.0.0" } diff --git a/src/eventsourcing/sqlite_store.gleam b/src/eventsourcing/sqlite_store.gleam new file mode 100644 index 0000000..a8ab830 --- /dev/null +++ b/src/eventsourcing/sqlite_store.gleam @@ -0,0 +1,240 @@ +import eventsourcing +import gleam/dynamic +import gleam/int +import gleam/io +import gleam/list +import gleam/pair +import gleam/result +import sqlight + +// CONSTANTS ---- + +const insert_event_query = " + INSERT INTO event + (aggregate_type, aggregate_id, sequence, event_type, event_version, payload) + VALUES + ($1, $2, $3, $4, $5, $6) + " + +const select_events_query = " + SELECT aggregate_type, aggregate_id, sequence, event_type, event_version, payload + FROM event + WHERE aggregate_type = $1 AND aggregate_id = $2 + ORDER BY sequence + " + +const create_event_table_query = " + CREATE TABLE IF NOT EXISTS event + ( + aggregate_type text NOT NULL, + aggregate_id text NOT NULL, + sequence bigint CHECK (sequence >= 0) NOT NULL, + event_type text NOT NULL, + event_version text NOT NULL, + payload text NOT NULL, + PRIMARY KEY (aggregate_type, aggregate_id, sequence) + ); + " + +// TYPES ---- + +pub opaque type SqliteStore(entity, command, event, error) { + SqliteStore( + db: sqlight.Connection, + empty_aggregate: eventsourcing.Aggregate(entity, command, event, error), + event_encoder: fn(event) -> String, + event_decoder: fn(String) -> Result(event, List(dynamic.DecodeError)), + event_type: String, + event_version: String, + aggregate_type: String, + ) +} + +// CONSTRUCTORS ---- + +pub fn new( + sqlight_connection sqlight_connection: sqlight.Connection, + empty_entity empty_entity: entity, + handle_command_function handle: eventsourcing.Handle( + entity, + command, + event, + error, + ), + apply_function apply: eventsourcing.Apply(entity, event), + event_encoder event_encoder: fn(event) -> String, + event_decoder event_decoder: fn(String) -> + Result(event, List(dynamic.DecodeError)), + event_type event_type: String, + event_version event_version: String, + aggregate_type aggregate_type: String, +) -> eventsourcing.EventStore( + SqliteStore(entity, command, event, error), + entity, + command, + event, + error, +) { + let eventstore = + SqliteStore( + db: sqlight_connection, + empty_aggregate: eventsourcing.Aggregate(empty_entity, handle, apply), + event_encoder:, + event_decoder:, + event_type:, + event_version:, + aggregate_type:, + ) + + eventsourcing.EventStore( + eventstore:, + commit: commit, + load_aggregate: load_aggregate, + ) +} + +pub fn create_event_table( + sqlite_store: SqliteStore(entity, command, event, error), +) { + sqlight.query( + create_event_table_query, + on: sqlite_store.db, + with: [], + expecting: dynamic.dynamic, + ) +} + +pub fn load_aggregate_entity( + sqlite_store: SqliteStore(entity, command, event, error), + aggregate_id: eventsourcing.AggregateId, +) -> entity { + load_aggregate(sqlite_store, aggregate_id).aggregate.entity +} + +pub fn load_events( + sqlite_store: SqliteStore(entity, command, event, error), + aggregate_id: eventsourcing.AggregateId, +) { + use resulted <- result.map(sqlight.query( + select_events_query, + on: sqlite_store.db, + with: [ + sqlight.text(sqlite_store.aggregate_type), + sqlight.text(aggregate_id), + ], + expecting: dynamic.decode6( + eventsourcing.SerializedEventEnvelop, + dynamic.element(1, dynamic.string), + dynamic.element(2, dynamic.int), + dynamic.element(5, fn(dyn) { + let assert Ok(payload) = + dynamic.string(dyn) |> result.map(sqlite_store.event_decoder) + payload + }), + dynamic.element(3, dynamic.string), + dynamic.element(4, dynamic.string), + dynamic.element(0, dynamic.string), + ), + )) + resulted +} + +fn load_aggregate( + sqlite_store: SqliteStore(entity, command, event, error), + aggregate_id: eventsourcing.AggregateId, +) -> eventsourcing.AggregateContext(entity, command, event, error) { + let assert Ok(commited_events) = load_events(sqlite_store, aggregate_id) + + let #(aggregate, sequence) = + list.fold( + over: commited_events, + from: #(sqlite_store.empty_aggregate, 0), + with: fn(aggregate_and_sequence, event_envelop) { + let #(aggregate, _) = aggregate_and_sequence + #( + eventsourcing.Aggregate( + ..aggregate, + entity: aggregate.apply(aggregate.entity, event_envelop.payload), + ), + event_envelop.sequence, + ) + }, + ) + eventsourcing.AggregateContext(aggregate_id:, aggregate:, sequence:) +} + +fn commit( + sqlite_store: SqliteStore(entity, command, event, error), + context: eventsourcing.AggregateContext(entity, command, event, error), + events: List(event), +) { + let eventsourcing.AggregateContext(aggregate_id, _, sequence) = context + let wrapped_events = wrap_events(sqlite_store, aggregate_id, events, sequence) + persist_events(sqlite_store, wrapped_events) + io.println( + "storing: " + <> wrapped_events |> list.length |> int.to_string + <> " events for Aggregate ID '" + <> aggregate_id + <> "'", + ) + wrapped_events +} + +fn wrap_events( + postgres_store: SqliteStore(entity, command, event, error), + aggregate_id: eventsourcing.AggregateId, + events: List(event), + sequence: Int, +) -> List(eventsourcing.EventEnvelop(event)) { + list.map_fold( + over: events, + from: sequence, + with: fn(sequence: Int, event: event) { + let next_sequence = sequence + 1 + #( + next_sequence, + eventsourcing.SerializedEventEnvelop( + aggregate_id:, + sequence: sequence + 1, + payload: event, + event_type: postgres_store.event_type, + event_version: postgres_store.event_version, + aggregate_type: postgres_store.aggregate_type, + ), + ) + }, + ) + |> pair.second +} + +fn persist_events( + sqlite_store: SqliteStore(entity, command, event, error), + wrapped_events: List(eventsourcing.EventEnvelop(event)), +) { + wrapped_events + |> list.map(fn(event) { + let assert eventsourcing.SerializedEventEnvelop( + aggregate_id, + sequence, + payload, + event_type, + event_version, + aggregate_type, + ) = event + + sqlight.query( + insert_event_query, + on: sqlite_store.db, + with: [ + sqlight.text(aggregate_type), + sqlight.text(aggregate_id), + sqlight.int(sequence), + sqlight.text(event_type), + sqlight.text(event_version), + sqlight.text(payload |> sqlite_store.event_encoder), + ], + expecting: dynamic.dynamic, + ) + }) +} diff --git a/test/sqlite_store_test.gleam b/test/sqlite_store_test.gleam new file mode 100644 index 0000000..32169cb --- /dev/null +++ b/test/sqlite_store_test.gleam @@ -0,0 +1,197 @@ +import birdie +import decode +import eventsourcing +import eventsourcing/sqlite_store +import gleam/dynamic +import gleam/int +import gleam/io +import gleam/json +import gleam/list +import gleam/result +import gleeunit +import gleeunit/should +import pprint +import sqlight + +pub fn main() { + gleeunit.main() +} + +pub fn sqlite_store_test() { + let assert Ok(db) = sqlight.open(":memory:") + let sqlite_store = + sqlite_store.new( + sqlight_connection: db, + empty_entity: BankAccount(opened: False, balance: 0.0), + handle_command_function: handle, + apply_function: apply, + event_encoder: event_encoder, + event_decoder: event_decoder, + event_type: bank_account_event_type, + event_version: "1.0", + aggregate_type: bank_account_type, + ) + let query = fn( + aggregate_id: String, + events: List(eventsourcing.EventEnvelop(BankAccountEvent)), + ) { + io.println_error( + "Aggregate Bank Account with ID: " + <> aggregate_id + <> " commited " + <> events |> list.length |> int.to_string + <> " events.", + ) + } + sqlite_store.create_event_table(sqlite_store.eventstore) + |> should.be_ok + + let event_sourcing = eventsourcing.new(sqlite_store, [query]) + + eventsourcing.execute( + event_sourcing, + "92085b42-032c-4d7a-84de-a86d67123858", + OpenAccount("92085b42-032c-4d7a-84de-a86d67123858"), + ) + |> should.be_ok + |> should.equal(Nil) + + eventsourcing.execute( + event_sourcing, + "92085b42-032c-4d7a-84de-a86d67123858", + DepositMoney(10.0), + ) + |> should.be_ok + |> should.equal(Nil) + + eventsourcing.execute( + event_sourcing, + "92085b42-032c-4d7a-84de-a86d67123858", + WithDrawMoney(5.99), + ) + |> should.be_ok + |> should.equal(Nil) + + sqlite_store.load_aggregate_entity( + sqlite_store.eventstore, + "92085b42-032c-4d7a-84de-a86d67123858", + ) + |> pprint.format + |> birdie.snap(title: "sqlite store") +} + +pub type BankAccount { + BankAccount(opened: Bool, balance: Float) +} + +const bank_account_type = "BankAccount" + +pub type BankAccountCommand { + OpenAccount(account_id: String) + DepositMoney(amount: Float) + WithDrawMoney(amount: Float) +} + +pub type BankAccountEvent { + AccountOpened(account_id: String) + CustomerDepositedCash(amount: Float, balance: Float) + CustomerWithdrewCash(amount: Float, balance: Float) +} + +const bank_account_event_type = "BankAccountEvent" + +pub fn handle( + bank_account: BankAccount, + command: BankAccountCommand, +) -> Result(List(BankAccountEvent), Nil) { + case command { + OpenAccount(account_id) -> Ok([AccountOpened(account_id)]) + DepositMoney(amount) -> { + let balance = bank_account.balance +. amount + case amount >. 0.0 { + True -> Ok([CustomerDepositedCash(amount:, balance:)]) + False -> Error(Nil) + } + } + WithDrawMoney(amount) -> { + let balance = bank_account.balance -. amount + case amount >. 0.0 && balance >. 0.0 { + True -> Ok([CustomerWithdrewCash(amount:, balance:)]) + False -> Error(Nil) + } + } + } +} + +pub fn apply(bank_account: BankAccount, event: BankAccountEvent) { + case event { + AccountOpened(_) -> BankAccount(..bank_account, opened: True) + CustomerDepositedCash(_, balance) -> BankAccount(..bank_account, balance:) + CustomerWithdrewCash(_, balance) -> BankAccount(..bank_account, balance:) + } +} + +pub fn event_encoder(event: BankAccountEvent) -> String { + case event { + AccountOpened(account_id) -> + json.object([ + #("event-type", json.string("account-opened")), + #("account-id", json.string(account_id)), + ]) + CustomerDepositedCash(amount, balance) -> + json.object([ + #("event-type", json.string("customer-deposited-cash")), + #("amount", json.float(amount)), + #("balance", json.float(balance)), + ]) + CustomerWithdrewCash(amount, balance) -> + json.object([ + #("event-type", json.string("customer-withdrew-cash")), + #("amount", json.float(amount)), + #("balance", json.float(balance)), + ]) + } + |> json.to_string +} + +pub fn event_decoder( + string: String, +) -> Result(BankAccountEvent, List(dynamic.DecodeError)) { + let account_opened_decoder = + decode.into({ + use account_id <- decode.parameter + AccountOpened(account_id) + }) + |> decode.field("account-id", decode.string) + + let customer_deposited_cash = + decode.into({ + use amount <- decode.parameter + use balance <- decode.parameter + CustomerDepositedCash(amount, balance) + }) + |> decode.field("amount", decode.float) + |> decode.field("balance", decode.float) + + let customer_withdrew_cash = + decode.into({ + use amount <- decode.parameter + use balance <- decode.parameter + CustomerWithdrewCash(amount, balance) + }) + |> decode.field("amount", decode.float) + |> decode.field("balance", decode.float) + + let decoder = + decode.at(["event-type"], decode.string) + |> decode.then(fn(event_type) { + case event_type { + "account-opened" -> account_opened_decoder + "customer-deposited-cash" -> customer_deposited_cash + "customer-withdrew-cash" -> customer_withdrew_cash + _ -> decode.fail("event-type") + } + }) + json.decode(from: string, using: decode.from(decoder, _)) + |> result.map_error(fn(_) { [] }) +}