diff --git a/CHANGELOG.md b/CHANGELOG.md index e307421a..deef943d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Remove distributed registry ([#210](https://github.com/commanded/eventstore/pull/210)). - Hibernate subscription process after inactivity ([#214](https://github.com/commanded/eventstore/pull/214)). - Runtime event store configuration ((#217)[https://github.com/commanded/eventstore/pull/217]). +- Shared database connection pools ([#216](https://github.com/commanded/eventstore/pull/216)). ### Bug fixes diff --git a/lib/event_store.ex b/lib/event_store.ex index b1afd4ff..7ff15500 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -145,6 +145,32 @@ defmodule EventStore do The above can be used for multi-tenancy where the data for each tenant is stored in a separate, isolated schema. + ## Shared database connection pools + + By default each event store will start its own `Postgrex` database connection + pool. The size of the pool is configured with the `pool_size` config option. + + When you have multiple event stores running you will also end up with multiple + connection pools. If they are all to the same physical Postgres database then + it can be useful to share a single pool amongst all event stores. Use the + `shared_connection_pool` config option to specify a name for the shared connection + pool. Then configure the event stores you'd like to share the pool with the + same name. + + This can be done in config: + + # config/config.exs + config :my_app, MyApp.EventStore, shared_connection_pool: :shared_pool + + Or when starting the event stores, such as via a `Supervisor`: + + Supervisor.start_link( + [ + {MyApp.EventStore, name: :eventstore1, shared_connection_pool: :shared_pool}, + {MyApp.EventStore, name: :eventstore2, shared_connection_pool: :shared_pool}, + {MyApp.EventStore, name: :eventstore3, shared_connection_pool: :shared_pool} + ], opts) + ## Guides Please refer to the following guides to learn more: diff --git a/lib/event_store/config.ex b/lib/event_store/config.ex index c777c6e5..76c241cd 100644 --- a/lib/event_store/config.ex +++ b/lib/event_store/config.ex @@ -86,9 +86,7 @@ defmodule EventStore.Config do Keyword.take(config, @postgrex_connection_opts) end - def postgrex_opts(config) do - {name, config} = Keyword.pop(config, :conn) - + def postgrex_opts(config, name) do [ pool_size: 10, queue_target: 50, diff --git a/lib/event_store/monitored_server.ex b/lib/event_store/monitored_server.ex index 62da7017..3e6710c2 100644 --- a/lib/event_store/monitored_server.ex +++ b/lib/event_store/monitored_server.ex @@ -16,13 +16,27 @@ defmodule EventStore.MonitoredServer do defmodule State do @moduledoc false - defstruct [:mfa, :name, :backoff, :pid, :shutdown, :queue, monitors: MapSet.new()] + defstruct [ + :mfa, + :name, + :backoff, + :pid, + :terminate?, + :shutdown, + :queue, + monitors: MapSet.new() + ] def new(monitor_opts, start_opts) do {_module, _fun, _args} = mfa = Keyword.fetch!(monitor_opts, :mfa) + backoff_ops = + monitor_opts + |> Keyword.take([:backoff_type, :backoff_min, :backoff_max]) + |> Keyword.put_new(:backoff_type, :exp) + %State{ - backoff: Backoff.new(backoff_type: :exp), + backoff: Backoff.new(backoff_ops), mfa: mfa, name: Keyword.get(start_opts, :name), queue: :queue.new(), @@ -53,14 +67,10 @@ defmodule EventStore.MonitoredServer do def handle_call({__MODULE__, :monitor, monitor}, _from, %State{} = state) do %State{monitors: monitors, name: name, pid: pid} = state - _ref = Process.monitor(monitor) - - case pid do - pid when is_pid(pid) -> - Process.send(monitor, {:UP, name, pid}, []) + Process.monitor(monitor) - _ -> - :ok + if is_pid(pid) do + Process.send(monitor, {:UP, name, pid}, []) end state = %State{state | monitors: MapSet.put(monitors, monitor)} @@ -88,7 +98,7 @@ defmodule EventStore.MonitoredServer do {:noreply, state} end - def handle_info(:start_process, %State{} = state) do + def handle_info({__MODULE__, :start_process}, %State{} = state) do {:noreply, start_process(state)} end @@ -96,15 +106,7 @@ defmodule EventStore.MonitoredServer do Handle process exit by attempting to restart, after a delay. """ def handle_info({:EXIT, pid, reason}, %State{pid: pid} = state) do - %State{name: name} = state - - Logger.debug(fn -> "Monitored process EXIT due to: #{inspect(reason)}" end) - - notify_monitors({:DOWN, name, pid, reason}, state) - - state = %State{state | pid: nil} - - {:noreply, delayed_start(state)} + on_process_exit(pid, reason, state) end def handle_info({:EXIT, pid, _reason}, %State{} = state) do @@ -115,6 +117,10 @@ defmodule EventStore.MonitoredServer do {:noreply, state} end + def handle_info({:DOWN, _ref, :process, pid, reason}, %State{pid: pid} = state) do + on_process_exit(pid, reason, state) + end + def handle_info(msg, %State{pid: nil} = state) do {:noreply, enqueue({:info, msg}, state)} end @@ -126,13 +132,12 @@ defmodule EventStore.MonitoredServer do end def terminate(_reason, %State{pid: nil}), do: :ok + def terminate(_reason, %State{terminate?: false}), do: :ok def terminate(reason, %State{} = state) do %State{pid: pid, shutdown: shutdown, mfa: {module, _fun, _args}} = state - Logger.debug(fn -> - "Monitored server #{inspect(module)} terminate due to: #{inspect(reason)}" - end) + Logger.debug("Monitored server #{inspect(module)} terminate due to: #{inspect(reason)}") Process.exit(pid, reason) @@ -162,12 +167,15 @@ defmodule EventStore.MonitoredServer do {:ok, pid} -> Logger.debug("Successfully started #{inspect(module)} (#{inspect(pid)})") - on_process_start(pid, state) + on_process_start(pid, %State{state | terminate?: true}) {:error, {:already_started, pid}} -> Logger.debug("Monitored process already started #{inspect(module)} (#{inspect(pid)})") - on_process_start(pid, state) + # Monitor already started process to enable it to be restarted on exit + Process.monitor(pid) + + on_process_start(pid, %State{state | terminate?: false}) {:error, reason} -> Logger.info("Failed to start #{inspect(module)} due to: #{inspect(reason)}") @@ -185,6 +193,18 @@ defmodule EventStore.MonitoredServer do %State{state | pid: pid, queue: :queue.new()} end + defp on_process_exit(pid, reason, %State{} = state) do + %State{name: name} = state + + Logger.debug("Monitored process EXIT due to: " <> inspect(reason)) + + notify_monitors({:DOWN, name, pid, reason}, state) + + state = %State{state | pid: nil, terminate?: nil} + + {:noreply, delayed_start(state)} + end + defp enqueue(item, %State{queue: queue} = state) do %State{state | queue: :queue.in(item, queue)} end @@ -219,17 +239,13 @@ defmodule EventStore.MonitoredServer do defp notify_monitors(message, %State{} = state) do %State{monitors: monitors} = state - for monitor <- monitors do - :ok = Process.send(monitor, message, []) - end - - :ok + Enum.each(monitors, &Process.send(&1, message, [])) end defp delayed_start(%State{backoff: backoff} = state) do {delay, backoff} = Backoff.backoff(backoff) - Process.send_after(self(), :start_process, delay) + Process.send_after(self(), {__MODULE__, :start_process}, delay) %State{state | backoff: backoff} end diff --git a/lib/event_store/supervisor.ex b/lib/event_store/supervisor.ex index dacac1d0..5a08ba5d 100644 --- a/lib/event_store/supervisor.ex +++ b/lib/event_store/supervisor.ex @@ -57,13 +57,21 @@ defmodule EventStore.Supervisor do subscriptions_name = Module.concat([name, Subscriptions.Supervisor]) subscriptions_registry_name = Module.concat([name, Subscriptions.Registry]) schema = Keyword.fetch!(config, :schema) + conn = Keyword.fetch!(config, :conn) children = [ - {Postgrex, Config.postgrex_opts(config)}, - MonitoredServer.child_spec( - mfa: {Postgrex, :start_link, [Config.sync_connect_postgrex_opts(config)]}, - name: advisory_locks_postgrex_name + Supervisor.child_spec( + {MonitoredServer, + mfa: {Postgrex, :start_link, [Config.postgrex_opts(config, conn)]}, + name: Module.concat([name, Postgrex, MonitoredServer])}, + id: Module.concat([conn, MonitoredServer]) + ), + Supervisor.child_spec( + {MonitoredServer, + mfa: {Postgrex, :start_link, [Config.sync_connect_postgrex_opts(config)]}, + name: advisory_locks_postgrex_name}, + id: Module.concat([advisory_locks_postgrex_name, MonitoredServer]) ), {AdvisoryLocks, conn: advisory_locks_postgrex_name, schema: schema, name: advisory_locks_name}, @@ -94,17 +102,41 @@ defmodule EventStore.Supervisor do end defp validate_config!(event_store, name, config) do - conn = Module.concat([name, Postgrex]) + conn = postgrex_conn(name, config) column_data_type = Config.column_data_type(event_store, config) serializer = Serializer.serializer(event_store, config) subscription_retry_interval = Subscriptions.retry_interval(event_store, config) subscription_hibernate_after = Subscriptions.hibernate_after(event_store, config) - config - |> Keyword.put(:conn, conn) - |> Keyword.put(:column_data_type, column_data_type) - |> Keyword.put(:serializer, serializer) - |> Keyword.put(:subscription_retry_interval, subscription_retry_interval) - |> Keyword.put(:subscription_hibernate_after, subscription_hibernate_after) + Keyword.merge(config, + conn: conn, + column_data_type: column_data_type, + serializer: serializer, + subscription_retry_interval: subscription_retry_interval, + subscription_hibernate_after: subscription_hibernate_after + ) + end + + # Get the name of the main Postgres database connection pool. + # + # By default each event store instance will start its own connection pool. + # + # The `:shared_connection_pool` config option can be used to share the same + # database connection pool between multiple event store instances when they + # connect to the same physical database. This will reduce the total number of + # connections. + defp postgrex_conn(name, config) do + case Keyword.get(config, :shared_connection_pool) do + nil -> + Module.concat([name, Postgrex]) + + shared_connection_pool when is_atom(shared_connection_pool) -> + Module.concat([shared_connection_pool, Postgrex]) + + invalid -> + raise ArgumentError, + "Invalid `:shared_connection_pool` specified, expected an atom but got: " <> + inspect(invalid) + end end end diff --git a/test/config_test.exs b/test/config_test.exs index c1e3cbd8..0d7a509b 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -52,6 +52,27 @@ defmodule EventStore.ConfigTest do ] end + test "parse `:shared_connection_pool`" do + config = [ + username: "postgres", + database: "eventstore_test", + password: "postgres", + shared_connection_pool: :shared_pool + ] + + assert Config.parse(config) == + [ + enable_hard_deletes: false, + column_data_type: "bytea", + schema: "public", + pool: EventStore.Config.get_pool(), + shared_connection_pool: :shared_pool, + password: "postgres", + database: "eventstore_test", + username: "postgres" + ] + end + test "parse socket_dir" do config = [ username: "postgres", diff --git a/test/dynamic_event_store_test.exs b/test/dynamic_event_store_test.exs index 6c930510..8fcc5cba 100644 --- a/test/dynamic_event_store_test.exs +++ b/test/dynamic_event_store_test.exs @@ -1,4 +1,4 @@ -defmodule DynamicEventStoreTest do +defmodule EventStore.DynamicEventStoreTest do use EventStore.StorageCase alias EventStore.EventFactory diff --git a/test/event_store_test.exs b/test/event_store_test.exs index 461bd441..788a55d3 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -1,4 +1,4 @@ -defmodule EventStoreTest do +defmodule EventStore.EventStoreTest do use EventStore.StorageCase alias EventStore.{EventData, EventFactory, RecordedEvent} diff --git a/test/list_event_store_migrations_test.exs b/test/list_event_store_migrations_test.exs index e9c9158d..a3c18636 100644 --- a/test/list_event_store_migrations_test.exs +++ b/test/list_event_store_migrations_test.exs @@ -1,4 +1,4 @@ -defmodule ListEventStoreMigrationsTest do +defmodule EventStore.ListEventStoreMigrationsTest do use ExUnit.Case import ExUnit.CaptureIO diff --git a/test/migrate_event_store_test.exs b/test/migrate_event_store_test.exs index 47e49255..b1f4c68a 100644 --- a/test/migrate_event_store_test.exs +++ b/test/migrate_event_store_test.exs @@ -1,4 +1,4 @@ -defmodule MigrateEventStoreTest do +defmodule EventStore.MigrateEventStoreTest do use ExUnit.Case import ExUnit.CaptureIO diff --git a/test/migrated_event_store_test.exs b/test/migrated_event_store_test.exs index f4e0f583..b5700027 100644 --- a/test/migrated_event_store_test.exs +++ b/test/migrated_event_store_test.exs @@ -8,7 +8,7 @@ defmodule Snapshot do defstruct [:data, version: "1"] end -defmodule MigratedEventStoreTest do +defmodule EventStore.MigratedEventStoreTest do use ExUnit.Case alias EventStore.RecordedEvent diff --git a/test/monitored_server_test.exs b/test/monitored_server_test.exs index ad8236ab..d2fe333c 100644 --- a/test/monitored_server_test.exs +++ b/test/monitored_server_test.exs @@ -1,96 +1,167 @@ defmodule EventStore.MonitoredServerTest do use ExUnit.Case - alias EventStore.{MonitoredServer, ObservedServer, ProcessHelper, Wait} + alias EventStore.{MonitoredServer, ObservedServer, ProcessHelper} - test "should start process" do - {:ok, _pid} = start_monitored_process() + describe "monitored server" do + test "should start process" do + start_monitored_process!() - assert ObservedServer |> Process.whereis() |> Process.alive?() - end + assert_receive {:UP, MonitoredServer, pid} + + assert Process.whereis(ObservedServer) == pid + assert Process.alive?(pid) + end + + test "should stop observed process when monitored process stopped" do + start_monitored_process!() + + assert_receive {:UP, MonitoredServer, pid} + + ref = Process.monitor(pid) - test "should restart process after exit" do - {:ok, _pid} = start_monitored_process() + :ok = stop_supervised(MonitoredServer) - pid1 = shutdown_observed_process() + assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown} + end - Wait.until(fn -> - pid2 = Process.whereis(ObservedServer) + test "should restart process after exit" do + start_monitored_process!() - assert pid2 != nil + assert_receive {:UP, MonitoredServer, pid1} + + shutdown_observed_process() + + assert_receive {:UP, MonitoredServer, pid2} + + assert Process.whereis(ObservedServer) == pid2 + + assert is_pid(pid2) assert pid1 != pid2 assert Process.alive?(pid2) - end) - end + end - test "should send `DOWN` message after process shutdown" do - {:ok, _pid} = start_monitored_process() + test "should retry start on failure" do + start_monitored_process!(start_successfully: false) - refute_receive {:DOWN, MonitoredServer, _pid, _reason} + assert_receive {:init, _pid} + assert_receive {:init, _pid} + assert_receive {:init, _pid} + end - _pid = shutdown_observed_process() + test "should send `:DOWN` message after process shutdown" do + start_monitored_process!() - assert_receive {:DOWN, MonitoredServer, _pid, :shutdown} - end + assert_receive {:UP, MonitoredServer, _pid1} + refute_receive {:DOWN, MonitoredServer, _pid, _reason} - test "should send `:UP` message after process restarted" do - {:ok, _pid} = start_monitored_process() + shutdown_observed_process() - assert_receive {:UP, MonitoredServer, _pid} + assert_receive {:DOWN, MonitoredServer, _pid, :shutdown} + end - _pid = shutdown_observed_process() + test "should send `:UP` message after process restarted" do + start_monitored_process!() - assert_receive {:UP, MonitoredServer, _pid} - end + assert_receive {:UP, MonitoredServer, pid1} + assert pid1 == Process.whereis(ObservedServer) - test "should forward calls to observed process using registered name" do - {:ok, _pid} = start_monitored_process() + shutdown_observed_process() - assert {:ok, :pong} = GenServer.call(MonitoredServer, :ping) - end + assert_receive {:UP, MonitoredServer, pid2} + assert pid2 == Process.whereis(ObservedServer) + assert pid1 != pid2 + end - test "should forward calls to observed process" do - {:ok, pid} = start_monitored_process() + test "should forward calls to observed process using registered name" do + start_monitored_process!() - assert {:ok, :pong} = GenServer.call(pid, :ping) - end + assert {:ok, :pong} = GenServer.call(MonitoredServer, :ping) + end - test "should forward casts to observed process" do - {:ok, pid} = start_monitored_process() + test "should forward calls to observed process using pid" do + pid = start_monitored_process!() - assert :ok = GenServer.cast(pid, :ping) + assert {:ok, :pong} = GenServer.call(pid, :ping) + end - assert_receive :pong - end + test "should forward casts to observed process" do + pid = start_monitored_process!() - test "should forward sent messages to observed process" do - {:ok, pid} = start_monitored_process() + assert :ok = GenServer.cast(pid, :ping) - send(pid, :ping) + assert_receive :pong + end - assert_receive :pong - end + test "should forward info messages to observed process" do + pid = start_monitored_process!() - defp start_monitored_process do - reply_to = self() + send(pid, :ping) - {:ok, pid} = - MonitoredServer.start_link( - mfa: {ObservedServer, :start_link, [[reply_to: reply_to, name: ObservedServer]]}, - name: MonitoredServer - ) + assert_receive :pong + end - :ok = MonitoredServer.monitor(MonitoredServer) + test "allow monitored process to monitor an already started process" do + pid = start_supervised!({ObservedServer, reply_to: self(), name: ObservedServer}) + + assert {:ok, :pong} = GenServer.call(pid, :ping) - {:ok, pid} + monitor = start_monitored_process!() + + assert_receive {:UP, MonitoredServer, ^pid} + + assert {:ok, :pong} = GenServer.call(monitor, :ping) + end + + test "stopping monitored observer associated with an already started process should not terminate process" do + pid = start_supervised!({ObservedServer, reply_to: self(), name: ObservedServer}) + + start_monitored_process!() + + assert_receive {:UP, MonitoredServer, ^pid} + + ref = Process.monitor(pid) + + :ok = stop_supervised(MonitoredServer) + + refute_receive {:DOWN, ^ref, :process, ^pid, :shutdown} + end + + test "monitored observer should attempt to restart an already started process on exit" do + pid = start_supervised!({ObservedServer, reply_to: self(), name: ObservedServer}) + + start_monitored_process!() + + assert_receive {:UP, MonitoredServer, ^pid} + + shutdown_observed_process() + + assert_receive {:DOWN, MonitoredServer, ^pid, :shutdown} + assert_receive {:UP, MonitoredServer, _pid1} + end end - defp shutdown_observed_process do - pid = Process.whereis(ObservedServer) + defp start_monitored_process!(opts \\ []) do + opts = Keyword.merge([reply_to: self(), name: ObservedServer], opts) + + spec = + Supervisor.child_spec( + {MonitoredServer, + mfa: {ObservedServer, :start_link, [opts]}, + name: MonitoredServer, + backoff_min: 1, + backoff_max: 100}, + id: MonitoredServer + ) - ProcessHelper.shutdown(pid) - refute Process.alive?(pid) + pid = start_supervised!(spec) + + :ok = MonitoredServer.monitor(MonitoredServer) pid end + + defp shutdown_observed_process do + ProcessHelper.shutdown(ObservedServer) + end end diff --git a/test/multi_event_store_test.exs b/test/multi_event_store_test.exs index 08c1d529..a0ef2cff 100644 --- a/test/multi_event_store_test.exs +++ b/test/multi_event_store_test.exs @@ -1,4 +1,4 @@ -defmodule MultiEventStoreTest do +defmodule EventStore.MultiEventStoreTest do use EventStore.StorageCase alias EventStore.{Config, EventData, EventFactory, RecordedEvent, Storage} diff --git a/test/schema_test.exs b/test/schema_test.exs index f4104166..efdde045 100644 --- a/test/schema_test.exs +++ b/test/schema_test.exs @@ -1,4 +1,4 @@ -defmodule SchemaTest do +defmodule EventStore.SchemaTest do use EventStore.StorageCase alias EventStore.Config diff --git a/test/shared_connection_pool_test.exs b/test/shared_connection_pool_test.exs new file mode 100644 index 00000000..40e46e8e --- /dev/null +++ b/test/shared_connection_pool_test.exs @@ -0,0 +1,126 @@ +defmodule EventStore.SharedConnectionPoolTest do + use EventStore.StorageCase + + alias EventStore.EventFactory + alias EventStore.MonitoredServer + alias EventStore.MonitoredServer.State, as: MonitoredServerState + alias EventStore.Tasks.{Create, Drop, Init} + + describe "connection pool sharing" do + setup do + for schema <- ["schema1", "schema2"] do + config = TestEventStore.config() |> Keyword.put(:schema, schema) + + Create.exec(config, quiet: true) + Init.exec(config, quiet: true) + end + + start_supervised!( + {TestEventStore, + name: :eventstore1, shared_connection_pool: :shared_pool, schema: "schema1"} + ) + + start_supervised!( + {TestEventStore, + name: :eventstore2, shared_connection_pool: :shared_pool, schema: "schema2"} + ) + + on_exit(fn -> + for schema <- ["schema1", "schema2"] do + config = TestEventStore.config() |> Keyword.put(:schema, schema) + + Drop.exec(config, quiet: true) + end + end) + end + + test "should only start one Postgrex connection pool" do + # Event stores sharing a connection pool should use the same `Postgrex` conn + conn = Process.whereis(Module.concat([:shared_pool, Postgrex])) + assert is_pid(conn) + + pid1 = Process.whereis(Module.concat([:eventstore1, Postgrex, MonitoredServer])) + + assert is_pid(pid1) + assert %MonitoredServerState{pid: ^conn} = :sys.get_state(pid1) + + pid2 = Process.whereis(Module.concat([:eventstore2, Postgrex, MonitoredServer])) + + assert is_pid(pid2) + assert %MonitoredServerState{pid: ^conn} = :sys.get_state(pid2) + + # An event store started without specifying a connection pool should start its own pool + pid = Process.whereis(Module.concat([TestEventStore, Postgrex])) + assert is_pid(pid) + + # Rudimentary check that this is a `Postgrex` process based on its state + assert {:ready, _ref, _state} = :sys.get_state(pid) + end + + test "append and read events" do + stream_uuid = UUID.uuid4() + + {:ok, events} = append_events_to_stream(:eventstore1, stream_uuid, 3) + + assert_recorded_events(:eventstore1, stream_uuid, events) + refute_stream_exists(:eventstore2, stream_uuid) + end + + test "subscribe to stream" do + stream_uuid = UUID.uuid4() + + {:ok, subscription1} = + TestEventStore.subscribe_to_stream(stream_uuid, "subscriber1", self(), name: :eventstore1) + + {:ok, subscription2} = + TestEventStore.subscribe_to_stream(stream_uuid, "subscriber2", self(), name: :eventstore2) + + assert_receive {:subscribed, ^subscription1} + assert_receive {:subscribed, ^subscription2} + + {:ok, _events} = append_events_to_stream(:eventstore1, stream_uuid, 3) + + assert_receive {:events, _events} + refute_receive {:events, _events} + end + end + + defp append_events_to_stream(event_store_name, stream_uuid, count, expected_version \\ 0) do + events = EventFactory.create_events(count, expected_version + 1) + + :ok = + TestEventStore.append_to_stream(stream_uuid, expected_version, events, + name: event_store_name + ) + + {:ok, events} + end + + defp assert_recorded_events(event_store_name, stream_uuid, expected_events) do + actual_events = + TestEventStore.stream_forward(stream_uuid, 0, name: event_store_name) |> Enum.to_list() + + assert_events(expected_events, actual_events) + end + + defp assert_events(expected_events, actual_events) do + assert length(expected_events) == length(actual_events) + + for {expected, actual} <- Enum.zip(expected_events, actual_events) do + assert_event(expected, actual) + end + end + + defp assert_event(expected_event, actual_event) do + assert expected_event.correlation_id == actual_event.correlation_id + assert expected_event.causation_id == actual_event.causation_id + assert expected_event.event_type == actual_event.event_type + assert expected_event.data == actual_event.data + assert expected_event.metadata == actual_event.metadata + end + + defp refute_stream_exists(event_store_name, stream_uuid) do + assert {:error, :stream_not_found} == + TestEventStore.stream_forward(stream_uuid, 0, name: event_store_name) + end +end diff --git a/test/support/observed_server.ex b/test/support/observed_server.ex index 65d5a49d..31745c6c 100644 --- a/test/support/observed_server.ex +++ b/test/support/observed_server.ex @@ -4,26 +4,37 @@ defmodule EventStore.ObservedServer do use GenServer def start_link(opts) do - GenServer.start_link(__MODULE__, Keyword.take(opts, [:reply_to]), Keyword.take(opts, [:name])) + {start_opts, observer_opts} = + Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after]) + + GenServer.start_link(__MODULE__, observer_opts, start_opts) end - def init(state) do - {:ok, state} + def init(opts) do + reply_to = Keyword.fetch!(opts, :reply_to) + + send(reply_to, {:init, self()}) + + if Keyword.get(opts, :start_successfully, true) do + {:ok, reply_to} + else + {:error, :failed} + end end - def handle_call(:ping, _from, state) do - {:reply, {:ok, :pong}, state} + def handle_call(:ping, _from, reply_to) do + {:reply, {:ok, :pong}, reply_to} end - def handle_cast(:ping, [reply_to: reply_to] = state) do + def handle_cast(:ping, reply_to) do send(reply_to, :pong) - {:noreply, state} + {:noreply, reply_to} end - def handle_info(:ping, [reply_to: reply_to] = state) do + def handle_info(:ping, reply_to) do send(reply_to, :pong) - {:noreply, state} + {:noreply, reply_to} end end diff --git a/test/support/process_helper.ex b/test/support/process_helper.ex index 692eadc8..58783aeb 100644 --- a/test/support/process_helper.ex +++ b/test/support/process_helper.ex @@ -15,6 +15,6 @@ defmodule EventStore.ProcessHelper do Process.exit(pid, :shutdown) ref = Process.monitor(pid) - assert_receive {:DOWN, ^ref, _, _, _}, 1_000 + assert_receive {:DOWN, ^ref, :process, _object, _reason}, 1_000 end end