Skip to content

Commit

Permalink
Shared database connection pools
Browse files Browse the repository at this point in the history
Include #216 in CHANGELOG
  • Loading branch information
slashdotdash committed Oct 18, 2020
1 parent c5b1902 commit 1a8b65a
Show file tree
Hide file tree
Showing 17 changed files with 419 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Remove distributed registry ([#210](https://github.com/commanded/eventstore/pull/210)).
- Hibernate subscription process after inactivity ([#214](https://github.com/commanded/eventstore/pull/214)).
- Runtime event store configuration ((#217)[https://github.com/commanded/eventstore/pull/217]).
- Shared database connection pools ([#216](https://github.com/commanded/eventstore/pull/216)).

### Bug fixes

Expand Down
26 changes: 26 additions & 0 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,32 @@ defmodule EventStore do
The above can be used for multi-tenancy where the data for each tenant is
stored in a separate, isolated schema.
## Shared database connection pools
By default each event store will start its own `Postgrex` database connection
pool. The size of the pool is configured with the `pool_size` config option.
When you have multiple event stores running you will also end up with multiple
connection pools. If they are all to the same physical Postgres database then
it can be useful to share a single pool amongst all event stores. Use the
`shared_connection_pool` config option to specify a name for the shared connection
pool. Then configure the event stores you'd like to share the pool with the
same name.
This can be done in config:
# config/config.exs
config :my_app, MyApp.EventStore, shared_connection_pool: :shared_pool
Or when starting the event stores, such as via a `Supervisor`:
Supervisor.start_link(
[
{MyApp.EventStore, name: :eventstore1, shared_connection_pool: :shared_pool},
{MyApp.EventStore, name: :eventstore2, shared_connection_pool: :shared_pool},
{MyApp.EventStore, name: :eventstore3, shared_connection_pool: :shared_pool}
], opts)
## Guides
Please refer to the following guides to learn more:
Expand Down
4 changes: 1 addition & 3 deletions lib/event_store/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ defmodule EventStore.Config do
Keyword.take(config, @postgrex_connection_opts)
end

def postgrex_opts(config) do
{name, config} = Keyword.pop(config, :conn)

def postgrex_opts(config, name) do
[
pool_size: 10,
queue_target: 50,
Expand Down
76 changes: 46 additions & 30 deletions lib/event_store/monitored_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,27 @@ defmodule EventStore.MonitoredServer do
defmodule State do
@moduledoc false

defstruct [:mfa, :name, :backoff, :pid, :shutdown, :queue, monitors: MapSet.new()]
defstruct [
:mfa,
:name,
:backoff,
:pid,
:terminate?,
:shutdown,
:queue,
monitors: MapSet.new()
]

def new(monitor_opts, start_opts) do
{_module, _fun, _args} = mfa = Keyword.fetch!(monitor_opts, :mfa)

backoff_ops =
monitor_opts
|> Keyword.take([:backoff_type, :backoff_min, :backoff_max])
|> Keyword.put_new(:backoff_type, :exp)

%State{
backoff: Backoff.new(backoff_type: :exp),
backoff: Backoff.new(backoff_ops),
mfa: mfa,
name: Keyword.get(start_opts, :name),
queue: :queue.new(),
Expand Down Expand Up @@ -53,14 +67,10 @@ defmodule EventStore.MonitoredServer do
def handle_call({__MODULE__, :monitor, monitor}, _from, %State{} = state) do
%State{monitors: monitors, name: name, pid: pid} = state

_ref = Process.monitor(monitor)

case pid do
pid when is_pid(pid) ->
Process.send(monitor, {:UP, name, pid}, [])
Process.monitor(monitor)

_ ->
:ok
if is_pid(pid) do
Process.send(monitor, {:UP, name, pid}, [])
end

state = %State{state | monitors: MapSet.put(monitors, monitor)}
Expand Down Expand Up @@ -88,23 +98,15 @@ defmodule EventStore.MonitoredServer do
{:noreply, state}
end

def handle_info(:start_process, %State{} = state) do
def handle_info({__MODULE__, :start_process}, %State{} = state) do
{:noreply, start_process(state)}
end

@doc """
Handle process exit by attempting to restart, after a delay.
"""
def handle_info({:EXIT, pid, reason}, %State{pid: pid} = state) do
%State{name: name} = state

Logger.debug(fn -> "Monitored process EXIT due to: #{inspect(reason)}" end)

notify_monitors({:DOWN, name, pid, reason}, state)

state = %State{state | pid: nil}

{:noreply, delayed_start(state)}
on_process_exit(pid, reason, state)
end

def handle_info({:EXIT, pid, _reason}, %State{} = state) do
Expand All @@ -115,6 +117,10 @@ defmodule EventStore.MonitoredServer do
{:noreply, state}
end

def handle_info({:DOWN, _ref, :process, pid, reason}, %State{pid: pid} = state) do
on_process_exit(pid, reason, state)
end

def handle_info(msg, %State{pid: nil} = state) do
{:noreply, enqueue({:info, msg}, state)}
end
Expand All @@ -126,13 +132,12 @@ defmodule EventStore.MonitoredServer do
end

def terminate(_reason, %State{pid: nil}), do: :ok
def terminate(_reason, %State{terminate?: false}), do: :ok

def terminate(reason, %State{} = state) do
%State{pid: pid, shutdown: shutdown, mfa: {module, _fun, _args}} = state

Logger.debug(fn ->
"Monitored server #{inspect(module)} terminate due to: #{inspect(reason)}"
end)
Logger.debug("Monitored server #{inspect(module)} terminate due to: #{inspect(reason)}")

Process.exit(pid, reason)

Expand Down Expand Up @@ -162,12 +167,15 @@ defmodule EventStore.MonitoredServer do
{:ok, pid} ->
Logger.debug("Successfully started #{inspect(module)} (#{inspect(pid)})")

on_process_start(pid, state)
on_process_start(pid, %State{state | terminate?: true})

{:error, {:already_started, pid}} ->
Logger.debug("Monitored process already started #{inspect(module)} (#{inspect(pid)})")

on_process_start(pid, state)
# Monitor already started process to enable it to be restarted on exit
Process.monitor(pid)

on_process_start(pid, %State{state | terminate?: false})

{:error, reason} ->
Logger.info("Failed to start #{inspect(module)} due to: #{inspect(reason)}")
Expand All @@ -185,6 +193,18 @@ defmodule EventStore.MonitoredServer do
%State{state | pid: pid, queue: :queue.new()}
end

defp on_process_exit(pid, reason, %State{} = state) do
%State{name: name} = state

Logger.debug("Monitored process EXIT due to: " <> inspect(reason))

notify_monitors({:DOWN, name, pid, reason}, state)

state = %State{state | pid: nil, terminate?: nil}

{:noreply, delayed_start(state)}
end

defp enqueue(item, %State{queue: queue} = state) do
%State{state | queue: :queue.in(item, queue)}
end
Expand Down Expand Up @@ -219,17 +239,13 @@ defmodule EventStore.MonitoredServer do
defp notify_monitors(message, %State{} = state) do
%State{monitors: monitors} = state

for monitor <- monitors do
:ok = Process.send(monitor, message, [])
end

:ok
Enum.each(monitors, &Process.send(&1, message, []))
end

defp delayed_start(%State{backoff: backoff} = state) do
{delay, backoff} = Backoff.backoff(backoff)

Process.send_after(self(), :start_process, delay)
Process.send_after(self(), {__MODULE__, :start_process}, delay)

%State{state | backoff: backoff}
end
Expand Down
54 changes: 43 additions & 11 deletions lib/event_store/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,21 @@ defmodule EventStore.Supervisor do
subscriptions_name = Module.concat([name, Subscriptions.Supervisor])
subscriptions_registry_name = Module.concat([name, Subscriptions.Registry])
schema = Keyword.fetch!(config, :schema)
conn = Keyword.fetch!(config, :conn)

children =
[
{Postgrex, Config.postgrex_opts(config)},
MonitoredServer.child_spec(
mfa: {Postgrex, :start_link, [Config.sync_connect_postgrex_opts(config)]},
name: advisory_locks_postgrex_name
Supervisor.child_spec(
{MonitoredServer,
mfa: {Postgrex, :start_link, [Config.postgrex_opts(config, conn)]},
name: Module.concat([name, Postgrex, MonitoredServer])},
id: Module.concat([conn, MonitoredServer])
),
Supervisor.child_spec(
{MonitoredServer,
mfa: {Postgrex, :start_link, [Config.sync_connect_postgrex_opts(config)]},
name: advisory_locks_postgrex_name},
id: Module.concat([advisory_locks_postgrex_name, MonitoredServer])
),
{AdvisoryLocks,
conn: advisory_locks_postgrex_name, schema: schema, name: advisory_locks_name},
Expand Down Expand Up @@ -94,17 +102,41 @@ defmodule EventStore.Supervisor do
end

defp validate_config!(event_store, name, config) do
conn = Module.concat([name, Postgrex])
conn = postgrex_conn(name, config)
column_data_type = Config.column_data_type(event_store, config)
serializer = Serializer.serializer(event_store, config)
subscription_retry_interval = Subscriptions.retry_interval(event_store, config)
subscription_hibernate_after = Subscriptions.hibernate_after(event_store, config)

config
|> Keyword.put(:conn, conn)
|> Keyword.put(:column_data_type, column_data_type)
|> Keyword.put(:serializer, serializer)
|> Keyword.put(:subscription_retry_interval, subscription_retry_interval)
|> Keyword.put(:subscription_hibernate_after, subscription_hibernate_after)
Keyword.merge(config,
conn: conn,
column_data_type: column_data_type,
serializer: serializer,
subscription_retry_interval: subscription_retry_interval,
subscription_hibernate_after: subscription_hibernate_after
)
end

# Get the name of the main Postgres database connection pool.
#
# By default each event store instance will start its own connection pool.
#
# The `:shared_connection_pool` config option can be used to share the same
# database connection pool between multiple event store instances when they
# connect to the same physical database. This will reduce the total number of
# connections.
defp postgrex_conn(name, config) do
case Keyword.get(config, :shared_connection_pool) do
nil ->
Module.concat([name, Postgrex])

shared_connection_pool when is_atom(shared_connection_pool) ->
Module.concat([shared_connection_pool, Postgrex])

invalid ->
raise ArgumentError,
"Invalid `:shared_connection_pool` specified, expected an atom but got: " <>
inspect(invalid)
end
end
end
21 changes: 21 additions & 0 deletions test/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,27 @@ defmodule EventStore.ConfigTest do
]
end

test "parse `:shared_connection_pool`" do
config = [
username: "postgres",
database: "eventstore_test",
password: "postgres",
shared_connection_pool: :shared_pool
]

assert Config.parse(config) ==
[
enable_hard_deletes: false,
column_data_type: "bytea",
schema: "public",
pool: EventStore.Config.get_pool(),
shared_connection_pool: :shared_pool,
password: "postgres",
database: "eventstore_test",
username: "postgres"
]
end

test "parse socket_dir" do
config = [
username: "postgres",
Expand Down
2 changes: 1 addition & 1 deletion test/dynamic_event_store_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule DynamicEventStoreTest do
defmodule EventStore.DynamicEventStoreTest do
use EventStore.StorageCase

alias EventStore.EventFactory
Expand Down
2 changes: 1 addition & 1 deletion test/event_store_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule EventStoreTest do
defmodule EventStore.EventStoreTest do
use EventStore.StorageCase

alias EventStore.{EventData, EventFactory, RecordedEvent}
Expand Down
2 changes: 1 addition & 1 deletion test/list_event_store_migrations_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule ListEventStoreMigrationsTest do
defmodule EventStore.ListEventStoreMigrationsTest do
use ExUnit.Case

import ExUnit.CaptureIO
Expand Down
2 changes: 1 addition & 1 deletion test/migrate_event_store_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule MigrateEventStoreTest do
defmodule EventStore.MigrateEventStoreTest do
use ExUnit.Case

import ExUnit.CaptureIO
Expand Down
2 changes: 1 addition & 1 deletion test/migrated_event_store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Snapshot do
defstruct [:data, version: "1"]
end

defmodule MigratedEventStoreTest do
defmodule EventStore.MigratedEventStoreTest do
use ExUnit.Case

alias EventStore.RecordedEvent
Expand Down
Loading

0 comments on commit 1a8b65a

Please sign in to comment.