Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transient subscriptions #105

Merged
merged 1 commit into from
Feb 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Use PostgreSQL's `NOTIFY` / `LISTEN` for event pub/sub ([#100](https://github.com/commanded/eventstore/pull/100)).
- Link existing events to another stream ([#103](https://github.com/commanded/eventstore/pull/103)).
- Subscription notification message once successfully subscribed ([#104](https://github.com/commanded/eventstore/pull/104)).
- Transient subscriptions ([#105](https://github.com/commanded/eventstore/pull/105)).

## v0.13.2

Expand Down
2 changes: 1 addition & 1 deletion guides/Cluster.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Cluster
# Running on a cluster of nodes

EventStore supports running on multiple nodes as either a [distributed Erlang](http://erlang.org/doc/reference_manual/distributed.html) cluster or as multiple single instance nodes.

Expand Down
2 changes: 1 addition & 1 deletion guides/Event Serialization.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Event serialization

The default serialization of event data and metadata uses Erlang's [external term format](http://erlang.org/doc/apps/erts/erl_ext_dist.html). This is not a recommended serialization format for production usage.
The default serialization of event data and metadata uses Erlang's [external term format](http://erlang.org/doc/apps/erts/erl_ext_dist.html). This is not a recommended serialization format for production usage as backwards compatibility is only guaranteed going back at least two major releases.

You must implement the `EventStore.Serializer` behaviour to provide your preferred serialization format.

Expand Down
65 changes: 48 additions & 17 deletions guides/Subscriptions.md
Original file line number Diff line number Diff line change
@@ -1,38 +1,65 @@
# Subscriptions

Subscriptions to a stream will guarantee *at least once* delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped.
There are two types of subscriptions provided by EventStore:

A subscription can be created to receive events appended to a single or all streams.

Events are received in batches after being persisted to storage. A batch contains the events appended to a single stream using `EventStore.append_to_stream/4`.

Subscriptions must be uniquely named and support a single subscriber. Attempting to connect two subscribers to the same subscription will return an `{:error, :subscription_already_exists}` error.
1. [Transient subscriptions](#transient-subscriptions) where new events are broadcast to subscribers immediately after they have been appended to storage.
2. [Persistent subscriptions](#persistent-subscriptions) which guarantee at-least-once delivery of every persisted event, provide back-pressure, and can be started, paused, and resumed from any position, including from the stream's origin.

## Event pub/sub

PostgreSQL's `LISTEN` and `NOTIFY` commands are used to pub/sub event notifications from the database. An after update trigger on the `streams` table is used to execute `NOTIFY` for each batch of inserted events. The notification payload contains the stream uuid, stream id, and first / last stream versions (e.g. `stream-12345,1,1,5`).

A single listener process will connect to the database to listen for these notifications. It fetches the event data and broadcasts to all interested subscriptions. This approach supports running the EventStore on multiple nodes, regardless of whether they are connected together to form a cluster. A single listener will be used when nodes form a cluster, otherwise one connection per node is used.

## `:subscribed` message
## Transient subscriptions

Use `EventStore.subscribe/1` to create a transient subscription to a single stream identified by its `stream_uuid`. Events will be received in batches as an `{:events, events}` message, where `events` is a collection of `EventStore.RecordedEvent` structs.

You can use `$all` as the stream identity to subscribe to events appended to all streams. With transient subscriptions you do not need to acknowledge receipt of the published events. The subscription will terminate when the subscriber process stops running.

#### Subscribe to single stream events

Subscribe to events appended to a *single* stream:

```elixir
:ok = EventStore.subscribe(stream_uuid)

# receive first batch of events
receive do
{:events, events} ->
IO.puts "Received events: " <> inspect(events)
end
```

## Persistent subscriptions

Persistent subscriptions to a stream will guarantee *at least once* delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped. The last received and acknowledged event is stored by the EventStore to support resuming at a later time later or whenever the subscriber process restarts.

A subscription can be created to receive events appended to a single or all streams.

Events are received in batches after being persisted to storage. A batch contains the events appended to a single stream using `EventStore.append_to_stream/4`.

Subscriptions must be uniquely named and support a single subscriber. Attempting to connect two subscribers to the same subscription will return an `{:error, :subscription_already_exists}` error.

### `:subscribed` message

Once the subscription has successfully subscribed to the stream it will send the subscriber a `{:subscribed, subscription}` message. This indicates the subscription succeeded and you will begin receiving events.

Only one instance of a subscription named subscription to a stream can connect to the database. This guarantees that starting the same subscription on each node when run on a cluster, or when running multiple single instance nodes, will only allow one subscription to actually connect. Therefore you can defer any initialisation until receipt of the `{:subscribed, subscription}` message to prevent duplicate effort by multiple nodes racing to create or subscribe to the same subscription.

## `:events` message
### `:events` message

For each batch of events appended to the event store your subscriber will receive a `{:events, events}` message. The `events` list is a collection of `EventStore.RecordedEvent` structs.

## Subscription start from
### Subscription start from

By default subscriptions are created from the stream origin; they will receive all events from the stream. You can optionally specify a given start position:

- `:origin` - subscribe to events from the start of the stream (identical to using `0`). This is the default behaviour.
- `:current` - subscribe to events from the current version.
- `event_number` (integer) - specify an exact event number to subscribe from. This will be the same as the stream version for single stream subscriptions.

## Ack received events
### Ack received events

Receipt of each event by the subscriber must be acknowledged. This allows the subscription to resume on failure without missing an event.

Expand All @@ -46,7 +73,7 @@ A subscriber can confirm receipt of each event in a batch by sending multiple ac

A subscriber will not receive further published events until it has confirmed receipt of all received events. This provides back pressure to the subscription to prevent the subscriber from being overwhelmed with messages if it cannot keep up. The subscription will buffer events until the subscriber is ready to receive, or an overflow occurs. At which point it will move into a catch-up mode and query events and replay them from storage until caught up.

### Subscribe to all events
#### Subscribe to all events

Subscribe to events appended to all streams:

Expand Down Expand Up @@ -74,7 +101,7 @@ Unsubscribe from all streams:
:ok = EventStore.unsubscribe_from_all_streams("example_all_subscription")
```

### Subscribe to single stream events
#### Subscribe to single stream events

Subscribe to events appended to a *single* stream:

Expand All @@ -101,7 +128,7 @@ Unsubscribe from a single stream:
:ok = EventStore.unsubscribe_from_stream(stream_uuid, "example_single_subscription")
```

### Start subscription from a given position
#### Start subscription from a given position

You can choose to receive events from a given starting position.

Expand All @@ -117,7 +144,7 @@ Example all stream subscription that will receive new events appended after the
{:ok, subscription} = EventStore.subscribe_to_all_streams("example_subscription", self(), start_from: :current)
```

### Mapping events
#### Mapping events

You can provide an event mapping function that runs in the subscription process, before sending the event to your subscriber. You can use this to change the data received.

Expand Down Expand Up @@ -145,7 +172,9 @@ receive do
end
```

## Example subscriber
### Example persistent subscriber

Use a `GenServer` process to subscribe to the event store and track all notified events:

```elixir
# An example subscriber
Expand Down Expand Up @@ -173,7 +202,9 @@ defmodule Subscriber do
end

# Event notification
def handle_info({:events, events}, %{events: existing_events, subscription: subscription} = state) do
def handle_info({:events, events}, state) do
%{events: existing_events, subscription: subscription} = state

# confirm receipt of received events
EventStore.ack(subscription, events)

Expand All @@ -186,7 +217,7 @@ defmodule Subscriber do
end
```

Start your subscriber process, which subscribes to all streams in the event store.
Start your subscriber process, which subscribes to all streams in the event store:

```elixir
{:ok, subscriber} = Subscriber.start_link()
Expand Down
Loading