Skip to content

Commit

Permalink
Merge pull request #48 from harukizaemon/causation-ids
Browse files Browse the repository at this point in the history
Adds causation_id alongside correlation_id
  • Loading branch information
slashdotdash authored Apr 16, 2017
2 parents f4dd031 + b1cb785 commit 7ea2600
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 6 deletions.
2 changes: 2 additions & 0 deletions lib/event_store/event_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ defmodule EventStore.EventData do
EventData contains the data for a single event before being persisted to storage
"""
defstruct correlation_id: nil,
causation_id: nil,
event_type: nil ,
data: nil,
metadata: nil

@type t :: %EventStore.EventData{
correlation_id: String.t,
causation_id: String.t,
event_type: String.t,
data: binary,
metadata: binary
Expand Down
2 changes: 2 additions & 0 deletions lib/event_store/recorded_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule EventStore.RecordedEvent do
stream_id: non_neg_integer,
stream_version: non_neg_integer,
correlation_id: String.t,
causation_id: String.t,
event_type: String.t,
data: binary,
metadata: binary,
Expand All @@ -21,6 +22,7 @@ defmodule EventStore.RecordedEvent do
stream_id: nil,
stream_version: nil,
correlation_id: nil,
causation_id: nil,
event_type: nil,
data: nil,
metadata: nil,
Expand Down
10 changes: 7 additions & 3 deletions lib/event_store/sql/statements.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ CREATE TABLE events
stream_version bigint NOT NULL,
event_type text NOT NULL,
correlation_id text,
causation_id text,
data bytea NOT NULL,
metadata bytea NULL,
created_at timestamp without time zone default (now() at time zone 'utc') NOT NULL
Expand Down Expand Up @@ -111,12 +112,12 @@ RETURNING stream_id;
end

def create_events(number_of_events \\ 1) do
insert = ["INSERT INTO events (event_id, stream_id, stream_version, correlation_id, event_type, data, metadata, created_at) VALUES"]
insert = ["INSERT INTO events (event_id, stream_id, stream_version, correlation_id, causation_id, event_type, data, metadata, created_at) VALUES"]

params =
1..number_of_events
|> Enum.map(fn event_number ->
index = (event_number - 1) * 8
index = (event_number - 1) * 9
event_params = [
"($",
Integer.to_string(index + 1), ", $",
Expand All @@ -126,7 +127,8 @@ RETURNING stream_id;
Integer.to_string(index + 5), ", $",
Integer.to_string(index + 6), ", $",
Integer.to_string(index + 7), ", $",
Integer.to_string(index + 8), ")"
Integer.to_string(index + 8), ", $",
Integer.to_string(index + 9), ")"
]

if event_number == number_of_events do
Expand Down Expand Up @@ -248,6 +250,7 @@ SELECT
stream_version,
event_type,
correlation_id,
causation_id,
data,
metadata,
created_at
Expand All @@ -266,6 +269,7 @@ SELECT
stream_version,
event_type,
correlation_id,
causation_id,
data,
metadata,
created_at
Expand Down
1 change: 1 addition & 0 deletions lib/event_store/storage/appender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule EventStore.Storage.Appender do
event.stream_id,
event.stream_version,
event.correlation_id,
event.causation_id,
event.event_type,
event.data,
event.metadata,
Expand Down
3 changes: 2 additions & 1 deletion lib/event_store/storage/reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ defmodule EventStore.Storage.Reader do
|> Enum.map(&to_event_data_from_row/1)
end

def to_event_data_from_row([event_id, stream_id, stream_version, event_type, correlation_id, data, metadata, created_at]) do
def to_event_data_from_row([event_id, stream_id, stream_version, event_type, correlation_id, causation_id, data, metadata, created_at]) do
%RecordedEvent{
event_id: event_id,
stream_id: stream_id,
stream_version: stream_version,
event_type: event_type,
correlation_id: correlation_id,
causation_id: causation_id,
data: data,
metadata: metadata,
created_at: to_naive(created_at),
Expand Down
3 changes: 2 additions & 1 deletion lib/event_store/streams/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ defmodule EventStore.Streams.Stream do
end)
end

defp map_to_recorded_event(%EventData{correlation_id: correlation_id, event_type: event_type, data: data, metadata: metadata}, serializer) do
defp map_to_recorded_event(%EventData{correlation_id: correlation_id, causation_id: causation_id, event_type: event_type, data: data, metadata: metadata}, serializer) do
%RecordedEvent{
correlation_id: correlation_id,
causation_id: causation_id,
event_type: event_type,
data: serializer.serialize(data),
metadata: serializer.serialize(metadata),
Expand Down
3 changes: 2 additions & 1 deletion scripts/events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ select
stream_version,
event_type,
correlation_id,
causation_id,
convert_from(data, current_setting('server_encoding')) as data,
convert_from(metadata, current_setting('server_encoding')) as metadata,
created_at
from events
order by event_id;
order by event_id;
7 changes: 7 additions & 0 deletions test/subscriptions/all_streams_subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do
expected_events = EventFactory.deserialize_events(recorded_events)

assert pluck(received_events, :correlation_id) == pluck(expected_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(expected_events, :causation_id)
assert pluck(received_events, :data) == pluck(expected_events, :data)
end

Expand All @@ -91,6 +92,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do
assert_receive {:events, received_events, nil}

assert pluck(received_events, :correlation_id) == pluck(events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(events, :causation_id)
assert pluck(received_events, :data) == pluck(events, :data)
end

Expand Down Expand Up @@ -192,6 +194,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do

assert length(received_events) == 3
assert pluck(received_events, :correlation_id) == pluck(initial_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(initial_events, :causation_id)
assert pluck(received_events, :data) == pluck(initial_events, :data)

# don't receive remaining events until ack received for all initial events
Expand All @@ -209,6 +212,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do

assert length(received_events) == 3
assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id)
assert pluck(received_events, :data) == pluck(remaining_events, :data)
end

Expand All @@ -232,6 +236,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do

assert length(received_events) == 3
assert pluck(received_events, :correlation_id) == pluck(initial_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(initial_events, :causation_id)
assert pluck(received_events, :data) == pluck(initial_events, :data)

subscription = StreamSubscription.ack(subscription, 3)
Expand All @@ -243,6 +248,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do

assert length(received_events) == 3
assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id)
assert pluck(received_events, :data) == pluck(remaining_events, :data)
end

Expand Down Expand Up @@ -282,6 +288,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do

assert length(received_events) == 3
assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id)
assert pluck(received_events, :data) == pluck(remaining_events, :data)
end
end
Expand Down
4 changes: 4 additions & 0 deletions test/subscriptions/single_stream_subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do
expected_events = EventFactory.deserialize_events(recorded_events)

assert pluck(received_events, :correlation_id) == pluck(expected_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(expected_events, :causation_id)
assert pluck(received_events, :data) == pluck(expected_events, :data)
end

Expand Down Expand Up @@ -131,6 +132,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do
assert_receive {:events, received_events, ^self}

assert pluck(received_events, :correlation_id) == pluck(events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(events, :causation_id)
assert pluck(received_events, :data) == pluck(events, :data)
end

Expand Down Expand Up @@ -268,6 +270,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do

assert length(received_events) == 3
assert pluck(received_events, :correlation_id) == pluck(initial_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(initial_events, :causation_id)
assert pluck(received_events, :data) == pluck(initial_events, :data)

subscription =
Expand All @@ -283,6 +286,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do

assert length(received_events) == 3
assert pluck(received_events, :correlation_id) == pluck(remaining_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(remaining_events, :causation_id)
assert pluck(received_events, :data) == pluck(remaining_events, :data)
end

Expand Down
4 changes: 4 additions & 0 deletions test/support/event_factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ defmodule EventStore.EventFactory do

def create_events(number_of_events, initial_event_number \\ 1) when number_of_events > 0 do
correlation_id = UUID.uuid4
causation_id = UUID.uuid4

1..number_of_events
|> Enum.map(fn number ->
%EventData{
correlation_id: correlation_id,
causation_id: causation_id,
event_type: "Elixir.EventStore.EventFactory.Event",
data: %EventStore.EventFactory.Event{event: (initial_event_number + number - 1)},
metadata: %{"user" => "user@example.com"}
Expand All @@ -21,6 +23,7 @@ defmodule EventStore.EventFactory do

def create_recorded_events(number_of_events, stream_id, initial_event_id \\ 1, initial_stream_version \\ 1) when number_of_events > 0 do
correlation_id = UUID.uuid4
causation_id = UUID.uuid4

1..number_of_events
|> Enum.map(fn number ->
Expand All @@ -32,6 +35,7 @@ defmodule EventStore.EventFactory do
stream_id: stream_id,
stream_version: stream_version,
correlation_id: correlation_id,
causation_id: causation_id,
event_type: "Elixir.EventStore.EventFactory.Event",
data: serialize(%EventStore.EventFactory.Event{event: event_id}),
metadata: serialize(%{"user" => "user@example.com"}),
Expand Down

0 comments on commit 7ea2600

Please sign in to comment.