Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow subscriptions to filter the events they receive #114

Merged
merged 5 commits into from
May 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions guides/Subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ receive do
end
```

#### Filtering events

You can provide an event selector function that filters each `RecordedEvent` before sending it to the subscriber:

```elixir
EventStore.subscribe(stream_uuid, selector: fn
%EventStore.RecordedEvent{data: data} -> data != nil
end)

# receive first batch of mapped event data
receive do
{:events, %EventStore.RecordedEvent{} = event_data} ->
IO.puts("Received non nil event data: " <> inspect(event_data))
end
```


#### Mapping events

You can provide an event mapping function that maps each `RecordedEvent` before sending it to the subscriber:
Expand Down Expand Up @@ -160,6 +177,34 @@ Example all stream subscription that will receive new events appended after the
{:ok, subscription} = EventStore.subscribe_to_all_streams("example_subscription", self(), start_from: :current)
```

#### Event Filtering

You can provide an event selector function that run in the subscription process, before sending the event to your mapper and subscriber. You can use this to filter events before dispatching to a subscriber.

Subscribe to all streams and provide a `selector` function that only sends data that the selector function returns `true` for.

```elixir
selector = fn %EventStore.RecordedEvent{event_number: event_number} ->
rem(event_number) == 0
end

{:ok, subscription} = EventStore.subscribe_to_all_streams("example_subscription", self(), selector: selector)

# wait for the subscription confirmation
receive do
{:subscribed, ^subscription} ->
IO.puts("Successfully subscribed to all streams")
end

receive do
{:events, filtered_events} ->
# ... process events & ack receipt using last `event_number`
RecordedEvent{event_number: event_number} = List.last(filtered_events)

EventStore.ack(subscription, event_number)
end
```

#### Mapping events

You can provide an event mapping function that runs in the subscription process, before sending the event to your subscriber. You can use this to change the data received.
Expand Down Expand Up @@ -188,6 +233,7 @@ receive do
end
```


### Example persistent subscriber

Use a `GenServer` process to subscribe to the event store and track all notified events:
Expand Down
13 changes: 12 additions & 1 deletion lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ defmodule EventStore do
Use the `$all` identifier to subscribe to events from all streams.

- `opts` is an optional map providing additional subscription configuration:
- `selector` to define a function to filter each event, i.e. returns
only those elements for which fun returns a truthy value
- `mapper` to define a function to map each recorded event before sending
to the subscriber.

Expand Down Expand Up @@ -319,7 +321,11 @@ defmodule EventStore do
end

"""
@spec subscribe(String.t(), mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}
@spec subscribe(
String.t(),
selector: (RecordedEvent.t() -> any()),
mapper: (RecordedEvent.t() -> any())
) :: :ok | {:error, term}

def subscribe(stream_uuid, opts \\ [])

Expand All @@ -345,8 +351,11 @@ defmodule EventStore do
- `:current` for any new events appended to the stream after the
subscription has been created.
- any positive integer for a stream version to receive events after.
- `selector` to define a function to filter each event, i.e. returns
only those elements for which fun returns a truthy value
- `mapper` to define a function to map each recorded event before sending
to the subscriber.


The subscription will resume from the last acknowledged event if it already
exists. It will ignore the `start_from` argument in this case.
Expand Down Expand Up @@ -416,6 +425,8 @@ defmodule EventStore do
subscription has been created.
- any positive integer for an event id to receive events after that
exact event.
- `selector` to define a function to filter each event, i.e. returns
only those elements for which fun returns a truthy value
- `mapper` to define a function to map each recorded event before sending
to the subscriber.

Expand Down
6 changes: 5 additions & 1 deletion lib/event_store/registration/distributed_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ defmodule EventStore.Registration.DistributedRegistry do
@doc """
Subscribes the caller to the given topic.
"""
@spec subscribe(binary, mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}
@spec subscribe(
binary,
selector: (RecodedEvent.t() -> any()),
mapper: (RecordedEvent.t() -> any())
) :: :ok | {:error, term}
@impl EventStore.Registration
def subscribe(topic, opts) do
LocalRegistry.subscribe(topic, opts)
Expand Down
23 changes: 20 additions & 3 deletions lib/event_store/registration/local_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ defmodule EventStore.Registration.LocalRegistry do
@doc """
Subscribes the caller to the given topic.
"""
@spec subscribe(binary, mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}
@spec subscribe(
binary,
selector: (RecordedEvent.t() -> any()),
mapper: (RecordedEvent.t() -> any())
) :: :ok | {:error, term}
@impl EventStore.Registration
def subscribe(topic, opts) do
with {:ok, _} <- Registry.register(EventStore.PubSub, topic, opts) do
Expand All @@ -47,11 +51,24 @@ defmodule EventStore.Registration.LocalRegistry do
end)
end

defp notify_subscriber(pid, {:events, events}, mapper: mapper) when is_function(mapper, 1) do
send(pid, {:events, Enum.map(events, mapper)})
defp notify_subscriber(_pid, {:events, []}, _), do: nil

defp notify_subscriber(pid, {:events, events}, opts) do
selector = Keyword.get(opts, :selector)
mapper = Keyword.get(opts, :mapper)

events = events |> filter(selector) |> map(mapper)

send(pid, {:events, events})
end

defp notify_subscriber(pid, message, _opts) do
send(pid, message)
end

defp filter(events, selector) when is_function(selector, 1), do: Enum.filter(events, selector)
defp filter(events, _selector), do: events

defp map(events, mapper) when is_function(mapper, 1), do: Enum.map(events, mapper)
defp map(events, _mapper), do: events
end
6 changes: 5 additions & 1 deletion lib/event_store/registration/registration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ defmodule EventStore.Registration do
@doc """
Subscribes the caller to the given topic.
"""
@spec subscribe(binary, mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}
@spec subscribe(
binary,
selector: (RecordedEvent.t() -> any()),
mapper: (RecordedEvent.t() -> any())
) :: :ok | {:error, term}
def subscribe(topic, opts \\ []), do: registry_provider().subscribe(topic, opts)

@doc """
Expand Down
18 changes: 4 additions & 14 deletions lib/event_store/subscriptions/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ defmodule EventStore.Subscriptions.Subscription do
GenServer.cast(subscription, :disconnect)
end

@doc false
def caught_up(subscription, last_seen) do
GenServer.cast(subscription, {:caught_up, last_seen})
end

@doc false
def unsubscribe(subscription) do
GenServer.call(subscription, :unsubscribe)
Expand Down Expand Up @@ -135,12 +130,6 @@ defmodule EventStore.Subscriptions.Subscription do
{:noreply, apply_subscription_to_state(subscription, state)}
end

def handle_cast({:caught_up, last_seen}, %Subscription{subscription: subscription} = state) do
subscription = SubscriptionFsm.caught_up(subscription, last_seen)

{:noreply, apply_subscription_to_state(subscription, state)}
end

def handle_cast(:disconnect, %Subscription{subscription: subscription} = state) do
_ = Logger.debug(fn -> describe(state) <> " disconnected" end)

Expand Down Expand Up @@ -182,15 +171,15 @@ defmodule EventStore.Subscriptions.Subscription do
defp handle_subscription_state(%Subscription{subscription: %SubscriptionFsm{state: :subscribe_to_events}} = state) do
_ = Logger.debug(fn -> describe(state) <> " subscribing to events" end)

GenServer.cast(self(), :subscribe_to_events)
:ok = GenServer.cast(self(), :subscribe_to_events)

state
end

defp handle_subscription_state(%Subscription{subscription: %SubscriptionFsm{state: :request_catch_up}} = state) do
_ = Logger.debug(fn -> describe(state) <> " requesting catch-up" end)
_ = Logger.debug(fn -> describe(state) <> " catching-up" end)

GenServer.cast(self(), :catch_up)
:ok = GenServer.cast(self(), :catch_up)

state
end
Expand Down Expand Up @@ -240,6 +229,7 @@ defmodule EventStore.Subscriptions.Subscription do
# notify the subscriber that this subscription has successfully subscribed to events
defp notify_subscribed(%Subscription{subscriber: subscriber}) do
send(subscriber, {:subscribed, self()})

:ok
end

Expand Down
Loading