diff --git a/lib/event_store/event_data.ex b/lib/event_store/event_data.ex index 87c05d13..cabcb9e5 100644 --- a/lib/event_store/event_data.ex +++ b/lib/event_store/event_data.ex @@ -3,12 +3,14 @@ defmodule EventStore.EventData do EventData contains the data for a single event before being persisted to storage """ defstruct correlation_id: nil, + causation_id: nil, event_type: nil , data: nil, metadata: nil @type t :: %EventStore.EventData{ correlation_id: String.t, + causation_id: String.t, event_type: String.t, data: binary, metadata: binary diff --git a/lib/event_store/recorded_event.ex b/lib/event_store/recorded_event.ex index b910ff4e..67ec0c77 100644 --- a/lib/event_store/recorded_event.ex +++ b/lib/event_store/recorded_event.ex @@ -10,6 +10,7 @@ defmodule EventStore.RecordedEvent do stream_id: non_neg_integer, stream_version: non_neg_integer, correlation_id: String.t, + causation_id: String.t, event_type: String.t, data: binary, metadata: binary, @@ -21,6 +22,7 @@ defmodule EventStore.RecordedEvent do stream_id: nil, stream_version: nil, correlation_id: nil, + causation_id: nil, event_type: nil, data: nil, metadata: nil, diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex index 1bac7d6e..3e612f6e 100644 --- a/lib/event_store/sql/statements.ex +++ b/lib/event_store/sql/statements.ex @@ -49,6 +49,7 @@ CREATE TABLE events stream_version bigint NOT NULL, event_type text NOT NULL, correlation_id text, + causation_id text, data bytea NOT NULL, metadata bytea NULL, created_at timestamp without time zone default (now() at time zone 'utc') NOT NULL @@ -111,12 +112,12 @@ RETURNING stream_id; end def create_events(number_of_events \\ 1) do - insert = ["INSERT INTO events (event_id, stream_id, stream_version, correlation_id, event_type, data, metadata, created_at) VALUES"] + insert = ["INSERT INTO events (event_id, stream_id, stream_version, correlation_id, causation_id, event_type, data, metadata, created_at) VALUES"] params = 1..number_of_events |> Enum.map(fn event_number -> - index = (event_number - 1) * 8 + index = (event_number - 1) * 9 event_params = [ "($", Integer.to_string(index + 1), ", $", @@ -126,7 +127,8 @@ RETURNING stream_id; Integer.to_string(index + 5), ", $", Integer.to_string(index + 6), ", $", Integer.to_string(index + 7), ", $", - Integer.to_string(index + 8), ")" + Integer.to_string(index + 8), ", $", + Integer.to_string(index + 9), ")" ] if event_number == number_of_events do @@ -248,6 +250,7 @@ SELECT stream_version, event_type, correlation_id, + causation_id, data, metadata, created_at @@ -266,6 +269,7 @@ SELECT stream_version, event_type, correlation_id, + causation_id, data, metadata, created_at diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index 774161a2..5b066aa0 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -36,6 +36,7 @@ defmodule EventStore.Storage.Appender do event.stream_id, event.stream_version, event.correlation_id, + event.causation_id, event.event_type, event.data, event.metadata, diff --git a/lib/event_store/storage/reader.ex b/lib/event_store/storage/reader.ex index 49298c33..9e951edd 100644 --- a/lib/event_store/storage/reader.ex +++ b/lib/event_store/storage/reader.ex @@ -55,13 +55,14 @@ defmodule EventStore.Storage.Reader do |> Enum.map(&to_event_data_from_row/1) end - def to_event_data_from_row([event_id, stream_id, stream_version, event_type, correlation_id, data, metadata, created_at]) do + def to_event_data_from_row([event_id, stream_id, stream_version, event_type, correlation_id, causation_id, data, metadata, created_at]) do %RecordedEvent{ event_id: event_id, stream_id: stream_id, stream_version: stream_version, event_type: event_type, correlation_id: correlation_id, + causation_id: causation_id, data: data, metadata: metadata, created_at: to_naive(created_at), diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 8579a81e..b5464187 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -121,9 +121,10 @@ defmodule EventStore.Streams.Stream do end) end - defp map_to_recorded_event(%EventData{correlation_id: correlation_id, event_type: event_type, data: data, metadata: metadata}, serializer) do + defp map_to_recorded_event(%EventData{correlation_id: correlation_id, causation_id: causation_id, event_type: event_type, data: data, metadata: metadata}, serializer) do %RecordedEvent{ correlation_id: correlation_id, + causation_id: causation_id, event_type: event_type, data: serializer.serialize(data), metadata: serializer.serialize(metadata), diff --git a/scripts/events.sql b/scripts/events.sql index 360718d0..961405e4 100644 --- a/scripts/events.sql +++ b/scripts/events.sql @@ -4,8 +4,9 @@ select stream_version, event_type, correlation_id, + causation_id, convert_from(data, current_setting('server_encoding')) as data, convert_from(metadata, current_setting('server_encoding')) as metadata, created_at from events -order by event_id; \ No newline at end of file +order by event_id; diff --git a/test/subscriptions/all_streams_subscription_test.exs b/test/subscriptions/all_streams_subscription_test.exs index e2aa8024..c50ce94e 100644 --- a/test/subscriptions/all_streams_subscription_test.exs +++ b/test/subscriptions/all_streams_subscription_test.exs @@ -74,6 +74,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do expected_events = EventFactory.deserialize_events(recorded_events) assert pluck(received_events, :correlation_id) == pluck(expected_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(expected_events, :causation_id) assert pluck(received_events, :data) == pluck(expected_events, :data) end @@ -91,6 +92,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert_receive {:events, received_events, nil} assert pluck(received_events, :correlation_id) == pluck(events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(events, :causation_id) assert pluck(received_events, :data) == pluck(events, :data) end @@ -192,6 +194,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert length(received_events) == 3 assert pluck(received_events, :correlation_id) == pluck(initial_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(initial_events, :causation_id) assert pluck(received_events, :data) == pluck(initial_events, :data) # don't receive remaining events until ack received for all initial events @@ -209,6 +212,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert length(received_events) == 3 assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id) assert pluck(received_events, :data) == pluck(remaining_events, :data) end @@ -232,6 +236,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert length(received_events) == 3 assert pluck(received_events, :correlation_id) == pluck(initial_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(initial_events, :causation_id) assert pluck(received_events, :data) == pluck(initial_events, :data) subscription = StreamSubscription.ack(subscription, 3) @@ -243,6 +248,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert length(received_events) == 3 assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id) assert pluck(received_events, :data) == pluck(remaining_events, :data) end @@ -282,6 +288,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert length(received_events) == 3 assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id) assert pluck(received_events, :data) == pluck(remaining_events, :data) end end diff --git a/test/subscriptions/single_stream_subscription_test.exs b/test/subscriptions/single_stream_subscription_test.exs index 714c4f42..d905cc0b 100644 --- a/test/subscriptions/single_stream_subscription_test.exs +++ b/test/subscriptions/single_stream_subscription_test.exs @@ -81,6 +81,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do expected_events = EventFactory.deserialize_events(recorded_events) assert pluck(received_events, :correlation_id) == pluck(expected_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(expected_events, :causation_id) assert pluck(received_events, :data) == pluck(expected_events, :data) end @@ -131,6 +132,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert_receive {:events, received_events, ^self} assert pluck(received_events, :correlation_id) == pluck(events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(events, :causation_id) assert pluck(received_events, :data) == pluck(events, :data) end @@ -268,6 +270,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert length(received_events) == 3 assert pluck(received_events, :correlation_id) == pluck(initial_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(initial_events, :causation_id) assert pluck(received_events, :data) == pluck(initial_events, :data) subscription = @@ -283,6 +286,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert length(received_events) == 3 assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id) + assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id) assert pluck(received_events, :data) == pluck(remaining_events, :data) end diff --git a/test/support/event_factory.ex b/test/support/event_factory.ex index 1c0e0feb..1f150ad2 100644 --- a/test/support/event_factory.ex +++ b/test/support/event_factory.ex @@ -7,11 +7,13 @@ defmodule EventStore.EventFactory do def create_events(number_of_events, initial_event_number \\ 1) when number_of_events > 0 do correlation_id = UUID.uuid4 + causation_id = UUID.uuid4 1..number_of_events |> Enum.map(fn number -> %EventData{ correlation_id: correlation_id, + causation_id: causation_id, event_type: "Elixir.EventStore.EventFactory.Event", data: %EventStore.EventFactory.Event{event: (initial_event_number + number - 1)}, metadata: %{"user" => "user@example.com"} @@ -21,6 +23,7 @@ defmodule EventStore.EventFactory do def create_recorded_events(number_of_events, stream_id, initial_event_id \\ 1, initial_stream_version \\ 1) when number_of_events > 0 do correlation_id = UUID.uuid4 + causation_id = UUID.uuid4 1..number_of_events |> Enum.map(fn number -> @@ -32,6 +35,7 @@ defmodule EventStore.EventFactory do stream_id: stream_id, stream_version: stream_version, correlation_id: correlation_id, + causation_id: causation_id, event_type: "Elixir.EventStore.EventFactory.Event", data: serialize(%EventStore.EventFactory.Event{event: event_id}), metadata: serialize(%{"user" => "user@example.com"}),