Skip to content

Commit

Permalink
[Doc] Cluster and PostgreSQL LISTEN / NOTIFY usage
Browse files Browse the repository at this point in the history
Include #100 in CHANGELOG
  • Loading branch information
slashdotdash committed Jan 11, 2018
1 parent 68b65d2 commit 5e8d7d9
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Publish events directly to subscriptions ([#93](https://github.com/commanded/eventstore/pull/93)).
- Use PostgreSQL advisory locks to enforce only one subscription instance ([#98](https://github.com/commanded/eventstore/pull/98)).
- Remove stream process ([#99](https://github.com/commanded/eventstore/pull/99)).
- Use PostgreSQL's `NOTIFY` / `LISTEN` for event pub/sub ([#100](https://github.com/commanded/eventstore/pull/100)).

## v0.13.2

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CQRS event store implemented in Elixir. Uses [PostgreSQL](http://www.postgresql.

Requires Elixir v1.5 and PostgreSQL v9.5 or newer.

EventStore supports [running on a cluster of nodes](guides/Cluster.md) (since v0.11). Stream processes will be distributed amongst all available nodes, events are published to subscribers running on any node.
EventStore supports [running on a cluster of nodes](guides/Cluster.md).

[Changelog](CHANGELOG.md)

Expand Down
60 changes: 13 additions & 47 deletions guides/Cluster.md
Original file line number Diff line number Diff line change
@@ -1,49 +1,27 @@
# Cluster

EventStore supports running on a cluster of nodes. It uses the [Swarm](https://hex.pm/packages/swarm) library for process distribution.
EventStore supports running on multiple nodes as either a [distributed Erlang](http://erlang.org/doc/reference_manual/distributed.html) cluster or as multiple single instance nodes.

## Running on a cluster
## Event publication

1. Add `:swarm` as a dependency in your `mix.exs` file:
PostgreSQL `LISTEN` / `NOTIFY` is used to pub/sub event notifications.

```elixir
defp deps do
[
{:swarm, "~> 3.1"},
]
end
```
A single listener process will connect to the database to listen for events when using a distributed cluster. Events will be broadcast to all connected nodes using Erlang's [pg2](http://erlang.org/doc/man/pg2.html) process groups. This limits the number of database connections to at most the number of running clusters.

2. Fetch the dependencies:
Running EventStore on multiple nodes that are not connected together to form a cluster will result in one listener process and database connection per node.

```console
$ mix deps.get
```
## Running on a cluster

3. Configure the EventStore to use the `:distributed` registry in the environment config (e.g. `config/config.exs`):
1. Configure the EventStore to use the `:distributed` registry in the environment config (e.g. `config/config.exs`):

```elixir
config :eventstore,
registry: :distributed
```

4. Swarm must be configured to use the `Swarm.Distribution.StaticQuorumRing` distribution strategy:

```elixir
config :swarm,
nodes: [:"node1@127.0.0.1", :"node2@127.0.0.1", :"node3@127.0.0.1"],
node_blacklist: [~r/^primary@.+$/],
distribution_strategy: Swarm.Distribution.StaticQuorumRing,
static_quorum_size: 2,
sync_nodes_timeout: 0,
debug: false
```

This is to ensure consistency during a network partition. The `static_quorum_size` setting defines the minimum number of nodes that must be connected in the cluster to allow process registration and distribution. If there are fewer nodes currently available than the quorum size, any calls to the `EventStore` will return `{:error, :no_node_available}`.

## Automatic cluster formation

Swarm can be used with [libcluster](https://github.com/bitwalker/libcluster), a library that provides a mechanism for automatically forming clusters of Erlang nodes, with either static or dynamic node membership.
You can use [libcluster](https://github.com/bitwalker/libcluster) to automatically form clusters of Erlang nodes, with either static or dynamic node membership.

You will need to include `libcluster` as an additional dependency:

Expand All @@ -55,7 +33,7 @@ defp deps do
end
```

Then configure the cluster topology in the environment config (e.g. `config/config.exs`). An example is shown below using the standard Erlang `epmd` daemon strategy:
Then configure your preferred cluster topology in the environment config (e.g. `config/config.exs`). An example is shown below using the standard Erlang `epmd` daemon strategy:

```elixir
config :libcluster,
Expand All @@ -67,7 +45,7 @@ config :libcluster,
]
```

Please refer to the [libcluster docs](https://hexdocs.pm/libcluster/) for more detail.
Please refer to the [libcluster documentation](https://hexdocs.pm/libcluster/) for more detail.

### Starting a cluster

Expand Down Expand Up @@ -111,7 +89,7 @@ config :kernel,
The `sync_nodes_timeout` can be configured as `:infinity` to wait indefinitely for all nodes to
connect. All involved nodes must have the same value for `sync_nodes_timeout`.

This approach will only work for Elixir releases. You will need to use [Erlang's `sys.config`](http://erlang.org/doc/man/config.html) file for development purposes.
The above approach will *only work* for Elixir releases. You will need to use [Erlang's `sys.config`](http://erlang.org/doc/man/config.html) file for development purposes.

The Erlang equivalent of the `:kernerl` mix config, as above, is:

Expand Down Expand Up @@ -153,6 +131,8 @@ Once the cluster has formed, you can use the EventStore API from any node. Strea

## Usage

Using the EventStore when run on a cluster of nodes is identical to single node usage. You can subscibe to a stream, or all streams, on one node and append events to the stream on another. The subscription will be notified of the appended events.

### Append events to a stream

```elixir
Expand Down Expand Up @@ -182,17 +162,3 @@ receive do
IO.puts reply
end
```

## Cluster diagnostics

Peek into the Swarm process registry:

```elixir
Swarm.Registry.registered()
```

Discover which node a stream process is running on:

```elixir
stream_uuid |> EventStore.Streams.Stream.name() |> Swarm.whereis_name() |> node()
```
18 changes: 15 additions & 3 deletions guides/Subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@

Subscriptions to a stream will guarantee *at least once* delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped.

A subscription can be created to receive events published from a single logical stream or from all streams.
A subscription can be created to receive events published from a single or all streams.

Events are received in batches after being persisted to storage. Each batch contains events from a single stream and for the same correlation id.

Subscriptions must be uniquely named and support a single subscriber. Attempting to connect two subscribers to the same subscription will return an error.

By default subscriptions are created from the single stream, or all stream, origin. So it will receive all events from the single stream, or all streams. You can optionally specify a given start position:
## Event pub/sub

PostgreSQL's `LISTEN` and `NOTIFY` commands are used to pub/sub event notifications from the database. An after update trigger on the `event_counter` table is used to execute `NOTIFY` for each batch of inserted events. The notification payload contains the first and last event number (e.g. `1,5`).

A single listener process will connect to the database to listen for these notifications. It fetches the event data and broadcasts to all interested subscriptions. This approach supports running the EventStore on multiple nodes, regardless of whether they are connected together to form a cluster. A single listener will be used when nodes form a cluster, otherwise one connection per node is used.

## Subscription start from

By default subscriptions are created from the stream origin; they will receive all events from the stream. You can optionally specify a given start position:

- `:origin` - subscribe to events from the start of the stream (identical to using 0). This is the current behaviour and will remain the default.
- `:current` - subscribe to events from the current version.
Expand Down Expand Up @@ -92,7 +100,11 @@ You can provide an event mapping function that runs in the subscription process,
Subscribe to all streams and provide a `mapper` function that sends only the event data:

```elixir
{:ok, subscription} = EventStore.subscribe_to_all_streams("example_subscription", self(), mapper: fn %EventStore.RecordedEvent{event_number: event_number: data: data} -> {event_number, data} end)
mapper = fn %EventStore.RecordedEvent{event_number: event_number, data: data} ->
{event_number, data}
end

{:ok, subscription} = EventStore.subscribe_to_all_streams("example_subscription", self(), mapper: mapper)

receive do
{:events, mapped_events} ->
Expand Down
8 changes: 4 additions & 4 deletions lib/event_store/subscriptions/stream_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,19 @@ defmodule EventStore.Subscriptions.StreamSubscription do

case first_event_number do
past when past < expected_event ->
Logger.info(fn -> describe(data) <> " received past event(s), ignoring" end)
Logger.debug(fn -> describe(data) <> " received past event(s), ignoring" end)

# ignore already seen events
next_state(:subscribed, data)

future when future > expected_event ->
Logger.info(fn -> describe(data) <> " received unexpected event(s), requesting catch up" end)
Logger.debug(fn -> describe(data) <> " received unexpected event(s), requesting catch up" end)

# missed events, go back and catch-up with unseen
next_state(:request_catch_up, data)

^next_ack ->
Logger.info(fn -> describe(data) <> " is notifying subscriber with #{length(events)} event(s)" end)
Logger.debug(fn -> describe(data) <> " is notifying subscriber with #{length(events)} event(s)" end)

# subscriber is up-to-date, so send events
notify_subscriber(data, events)
Expand All @@ -187,7 +187,7 @@ defmodule EventStore.Subscriptions.StreamSubscription do
next_state(:subscribed, data)

^expected_event ->
Logger.info(fn -> describe(data) <> " received event(s) but still waiting for subscriber to ack, queueing event(s)" end)
Logger.debug(fn -> describe(data) <> " received event(s) but still waiting for subscriber to ack, queueing event(s)" end)

# subscriber has not yet ack'd last seen event so store pending events
# until subscriber ready to receive (back pressure)
Expand Down
6 changes: 1 addition & 5 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,11 @@
"ex_doc": {:hex, :ex_doc, "0.18.1", "37c69d2ef62f24928c1f4fdc7c724ea04aecfdf500c4329185f8e3649c915baf", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"fs": {:hex, :fs, "0.9.2", "ed17036c26c3f70ac49781ed9220a50c36775c6ca2cf8182d123b6566e49ec59", [:rebar], [], "hexpm"},
"fsm": {:hex, :fsm, "0.3.0", "d00e0a3c68f8cf8feb24ce3a732164638ec652c48ce416b66d4e375b6ee415eb", [:mix], [], "hexpm"},
"gen_stage": {:hex, :gen_stage, "0.12.2", "e0e347cbb1ceb5f4e68a526aec4d64b54ad721f0a8b30aa9d28e0ad749419cbb", [], [], "hexpm"},
"gen_state_machine": {:hex, :gen_state_machine, "2.0.1", "85efd5a0376929c3a4246dd943e17564a2908c7ddd7acd242d84594e785d83f8", [:mix], [], "hexpm"},
"gen_stage": {:hex, :gen_stage, "0.12.2", "e0e347cbb1ceb5f4e68a526aec4d64b54ad721f0a8b30aa9d28e0ad749419cbb", [:mix], [], "hexpm"},
"hoedown": {:git, "https://github.com/hoedown/hoedown.git", "980b9c549b4348d50b683ecee6abee470b98acda", []},
"libring": {:hex, :libring, "1.3.0", "2379c64d0fb4077fc9750fa9b6ee2a08880064e9be3c9514d7ea72c67f843a05", [:mix], [], "hexpm"},
"markdown": {:git, "https://github.com/devinus/markdown.git", "d065dbcc4e242a85ca2516fdadd0082712871fd8", []},
"mix_test_watch": {:hex, :mix_test_watch, "0.5.0", "2c322d119a4795c3431380fca2bca5afa4dc07324bd3c0b9f6b2efbdd99f5ed3", [:mix], [{:fs, "~> 0.9.1", [hex: :fs, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm"},
"postgrex": {:hex, :postgrex, "0.13.3", "c277cfb2a9c5034d445a722494c13359e361d344ef6f25d604c2353185682bfc", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"},
"singleton": {:hex, :singleton, "1.2.0", "b65dcd745de10ab71d5cf149a8ddde496f33f2c401f4e0b4398dbab79a91be51", [], [], "hexpm"},
"swarm": {:hex, :swarm, "3.1.0", "e13b22627cbe3c32c45f91d11206b7834c0b942073c553ad76b65801e7638b78", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"}}
1 change: 1 addition & 0 deletions test/subscriptions/subscribe_to_stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do
# received events should not include events from the other stream
assert_receive {:events, received_events}
assert pluck(received_events, :data) == pluck(interested_events, :data)
refute_receive {:events, _received_events}
end

test "subscribe to single stream with mapper function should receive all its mapped events", %{subscription_name: subscription_name} do
Expand Down

0 comments on commit 5e8d7d9

Please sign in to comment.