diff --git a/CHANGELOG.md b/CHANGELOG.md index 232181b3..4ff66eff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Publish events directly to subscriptions ([#93](https://github.com/commanded/eventstore/pull/93)). - Use PostgreSQL advisory locks to enforce only one subscription instance ([#98](https://github.com/commanded/eventstore/pull/98)). - Remove stream process ([#99](https://github.com/commanded/eventstore/pull/99)). +- Use PostgreSQL's `NOTIFY` / `LISTEN` for event pub/sub ([#100](https://github.com/commanded/eventstore/pull/100)). ## v0.13.2 diff --git a/README.md b/README.md index 10a87c54..14881802 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ CQRS event store implemented in Elixir. Uses [PostgreSQL](http://www.postgresql. Requires Elixir v1.5 and PostgreSQL v9.5 or newer. -EventStore supports [running on a cluster of nodes](guides/Cluster.md) (since v0.11). Stream processes will be distributed amongst all available nodes, events are published to subscribers running on any node. +EventStore supports [running on a cluster of nodes](guides/Cluster.md). [Changelog](CHANGELOG.md) diff --git a/guides/Cluster.md b/guides/Cluster.md index 213a9890..fb29a562 100644 --- a/guides/Cluster.md +++ b/guides/Cluster.md @@ -1,49 +1,27 @@ # Cluster -EventStore supports running on a cluster of nodes. It uses the [Swarm](https://hex.pm/packages/swarm) library for process distribution. +EventStore supports running on multiple nodes as either a [distributed Erlang](http://erlang.org/doc/reference_manual/distributed.html) cluster or as multiple single instance nodes. -## Running on a cluster +## Event publication -1. Add `:swarm` as a dependency in your `mix.exs` file: +PostgreSQL `LISTEN` / `NOTIFY` is used to pub/sub event notifications. - ```elixir - defp deps do - [ - {:swarm, "~> 3.1"}, - ] - end - ``` +A single listener process will connect to the database to listen for events when using a distributed cluster. Events will be broadcast to all connected nodes using Erlang's [pg2](http://erlang.org/doc/man/pg2.html) process groups. This limits the number of database connections to at most the number of running clusters. -2. Fetch the dependencies: +Running EventStore on multiple nodes that are not connected together to form a cluster will result in one listener process and database connection per node. - ```console - $ mix deps.get - ``` +## Running on a cluster -3. Configure the EventStore to use the `:distributed` registry in the environment config (e.g. `config/config.exs`): +1. Configure the EventStore to use the `:distributed` registry in the environment config (e.g. `config/config.exs`): ```elixir config :eventstore, registry: :distributed ``` -4. Swarm must be configured to use the `Swarm.Distribution.StaticQuorumRing` distribution strategy: - - ```elixir - config :swarm, - nodes: [:"node1@127.0.0.1", :"node2@127.0.0.1", :"node3@127.0.0.1"], - node_blacklist: [~r/^primary@.+$/], - distribution_strategy: Swarm.Distribution.StaticQuorumRing, - static_quorum_size: 2, - sync_nodes_timeout: 0, - debug: false - ``` - - This is to ensure consistency during a network partition. The `static_quorum_size` setting defines the minimum number of nodes that must be connected in the cluster to allow process registration and distribution. If there are fewer nodes currently available than the quorum size, any calls to the `EventStore` will return `{:error, :no_node_available}`. - ## Automatic cluster formation -Swarm can be used with [libcluster](https://github.com/bitwalker/libcluster), a library that provides a mechanism for automatically forming clusters of Erlang nodes, with either static or dynamic node membership. +You can use [libcluster](https://github.com/bitwalker/libcluster) to automatically form clusters of Erlang nodes, with either static or dynamic node membership. You will need to include `libcluster` as an additional dependency: @@ -55,7 +33,7 @@ defp deps do end ``` -Then configure the cluster topology in the environment config (e.g. `config/config.exs`). An example is shown below using the standard Erlang `epmd` daemon strategy: +Then configure your preferred cluster topology in the environment config (e.g. `config/config.exs`). An example is shown below using the standard Erlang `epmd` daemon strategy: ```elixir config :libcluster, @@ -67,7 +45,7 @@ config :libcluster, ] ``` -Please refer to the [libcluster docs](https://hexdocs.pm/libcluster/) for more detail. +Please refer to the [libcluster documentation](https://hexdocs.pm/libcluster/) for more detail. ### Starting a cluster @@ -111,7 +89,7 @@ config :kernel, The `sync_nodes_timeout` can be configured as `:infinity` to wait indefinitely for all nodes to connect. All involved nodes must have the same value for `sync_nodes_timeout`. -This approach will only work for Elixir releases. You will need to use [Erlang's `sys.config`](http://erlang.org/doc/man/config.html) file for development purposes. +The above approach will *only work* for Elixir releases. You will need to use [Erlang's `sys.config`](http://erlang.org/doc/man/config.html) file for development purposes. The Erlang equivalent of the `:kernerl` mix config, as above, is: @@ -153,6 +131,8 @@ Once the cluster has formed, you can use the EventStore API from any node. Strea ## Usage +Using the EventStore when run on a cluster of nodes is identical to single node usage. You can subscibe to a stream, or all streams, on one node and append events to the stream on another. The subscription will be notified of the appended events. + ### Append events to a stream ```elixir @@ -182,17 +162,3 @@ receive do IO.puts reply end ``` - -## Cluster diagnostics - -Peek into the Swarm process registry: - -```elixir -Swarm.Registry.registered() -``` - -Discover which node a stream process is running on: - -```elixir -stream_uuid |> EventStore.Streams.Stream.name() |> Swarm.whereis_name() |> node() -``` diff --git a/guides/Subscriptions.md b/guides/Subscriptions.md index ac47b971..eac62379 100644 --- a/guides/Subscriptions.md +++ b/guides/Subscriptions.md @@ -2,13 +2,21 @@ Subscriptions to a stream will guarantee *at least once* delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped. -A subscription can be created to receive events published from a single logical stream or from all streams. +A subscription can be created to receive events published from a single or all streams. Events are received in batches after being persisted to storage. Each batch contains events from a single stream and for the same correlation id. Subscriptions must be uniquely named and support a single subscriber. Attempting to connect two subscribers to the same subscription will return an error. -By default subscriptions are created from the single stream, or all stream, origin. So it will receive all events from the single stream, or all streams. You can optionally specify a given start position: +## Event pub/sub + +PostgreSQL's `LISTEN` and `NOTIFY` commands are used to pub/sub event notifications from the database. An after update trigger on the `event_counter` table is used to execute `NOTIFY` for each batch of inserted events. The notification payload contains the first and last event number (e.g. `1,5`). + +A single listener process will connect to the database to listen for these notifications. It fetches the event data and broadcasts to all interested subscriptions. This approach supports running the EventStore on multiple nodes, regardless of whether they are connected together to form a cluster. A single listener will be used when nodes form a cluster, otherwise one connection per node is used. + +## Subscription start from + +By default subscriptions are created from the stream origin; they will receive all events from the stream. You can optionally specify a given start position: - `:origin` - subscribe to events from the start of the stream (identical to using 0). This is the current behaviour and will remain the default. - `:current` - subscribe to events from the current version. @@ -92,7 +100,11 @@ You can provide an event mapping function that runs in the subscription process, Subscribe to all streams and provide a `mapper` function that sends only the event data: ```elixir -{:ok, subscription} = EventStore.subscribe_to_all_streams("example_subscription", self(), mapper: fn %EventStore.RecordedEvent{event_number: event_number: data: data} -> {event_number, data} end) +mapper = fn %EventStore.RecordedEvent{event_number: event_number, data: data} -> + {event_number, data} +end + +{:ok, subscription} = EventStore.subscribe_to_all_streams("example_subscription", self(), mapper: mapper) receive do {:events, mapped_events} -> diff --git a/lib/event_store/subscriptions/stream_subscription.ex b/lib/event_store/subscriptions/stream_subscription.ex index e98fc0a2..40acc070 100644 --- a/lib/event_store/subscriptions/stream_subscription.ex +++ b/lib/event_store/subscriptions/stream_subscription.ex @@ -162,19 +162,19 @@ defmodule EventStore.Subscriptions.StreamSubscription do case first_event_number do past when past < expected_event -> - Logger.info(fn -> describe(data) <> " received past event(s), ignoring" end) + Logger.debug(fn -> describe(data) <> " received past event(s), ignoring" end) # ignore already seen events next_state(:subscribed, data) future when future > expected_event -> - Logger.info(fn -> describe(data) <> " received unexpected event(s), requesting catch up" end) + Logger.debug(fn -> describe(data) <> " received unexpected event(s), requesting catch up" end) # missed events, go back and catch-up with unseen next_state(:request_catch_up, data) ^next_ack -> - Logger.info(fn -> describe(data) <> " is notifying subscriber with #{length(events)} event(s)" end) + Logger.debug(fn -> describe(data) <> " is notifying subscriber with #{length(events)} event(s)" end) # subscriber is up-to-date, so send events notify_subscriber(data, events) @@ -187,7 +187,7 @@ defmodule EventStore.Subscriptions.StreamSubscription do next_state(:subscribed, data) ^expected_event -> - Logger.info(fn -> describe(data) <> " received event(s) but still waiting for subscriber to ack, queueing event(s)" end) + Logger.debug(fn -> describe(data) <> " received event(s) but still waiting for subscriber to ack, queueing event(s)" end) # subscriber has not yet ack'd last seen event so store pending events # until subscriber ready to receive (back pressure) diff --git a/mix.lock b/mix.lock index 3a422533..3d6f1d93 100644 --- a/mix.lock +++ b/mix.lock @@ -9,15 +9,11 @@ "ex_doc": {:hex, :ex_doc, "0.18.1", "37c69d2ef62f24928c1f4fdc7c724ea04aecfdf500c4329185f8e3649c915baf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, "fs": {:hex, :fs, "0.9.2", "ed17036c26c3f70ac49781ed9220a50c36775c6ca2cf8182d123b6566e49ec59", [:rebar], [], "hexpm"}, "fsm": {:hex, :fsm, "0.3.0", "d00e0a3c68f8cf8feb24ce3a732164638ec652c48ce416b66d4e375b6ee415eb", [:mix], [], "hexpm"}, - "gen_stage": {:hex, :gen_stage, "0.12.2", "e0e347cbb1ceb5f4e68a526aec4d64b54ad721f0a8b30aa9d28e0ad749419cbb", [], [], "hexpm"}, - "gen_state_machine": {:hex, :gen_state_machine, "2.0.1", "85efd5a0376929c3a4246dd943e17564a2908c7ddd7acd242d84594e785d83f8", [:mix], [], "hexpm"}, + "gen_stage": {:hex, :gen_stage, "0.12.2", "e0e347cbb1ceb5f4e68a526aec4d64b54ad721f0a8b30aa9d28e0ad749419cbb", [:mix], [], "hexpm"}, "hoedown": {:git, "https://github.com/hoedown/hoedown.git", "980b9c549b4348d50b683ecee6abee470b98acda", []}, - "libring": {:hex, :libring, "1.3.0", "2379c64d0fb4077fc9750fa9b6ee2a08880064e9be3c9514d7ea72c67f843a05", [:mix], [], "hexpm"}, "markdown": {:git, "https://github.com/devinus/markdown.git", "d065dbcc4e242a85ca2516fdadd0082712871fd8", []}, "mix_test_watch": {:hex, :mix_test_watch, "0.5.0", "2c322d119a4795c3431380fca2bca5afa4dc07324bd3c0b9f6b2efbdd99f5ed3", [:mix], [{:fs, "~> 0.9.1", [hex: :fs, repo: "hexpm", optional: false]}], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm"}, "postgrex": {:hex, :postgrex, "0.13.3", "c277cfb2a9c5034d445a722494c13359e361d344ef6f25d604c2353185682bfc", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"}, - "singleton": {:hex, :singleton, "1.2.0", "b65dcd745de10ab71d5cf149a8ddde496f33f2c401f4e0b4398dbab79a91be51", [], [], "hexpm"}, - "swarm": {:hex, :swarm, "3.1.0", "e13b22627cbe3c32c45f91d11206b7834c0b942073c553ad76b65801e7638b78", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm"}, "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"}} diff --git a/test/subscriptions/subscribe_to_stream_test.exs b/test/subscriptions/subscribe_to_stream_test.exs index ce747d54..075fc938 100644 --- a/test/subscriptions/subscribe_to_stream_test.exs +++ b/test/subscriptions/subscribe_to_stream_test.exs @@ -80,6 +80,7 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do # received events should not include events from the other stream assert_receive {:events, received_events} assert pluck(received_events, :data) == pluck(interested_events, :data) + refute_receive {:events, _received_events} end test "subscribe to single stream with mapper function should receive all its mapped events", %{subscription_name: subscription_name} do