diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6a0cd0d..96d2a69 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -58,26 +58,39 @@ jobs: run: mix test --cover ES-21: - name: ES 21.6.0 + name: ES 21.6.0 with SSL runs-on: ubuntu-latest strategy: matrix: elixir: ['1.12.3'] erlang: ['24.1'] - - services: - es: - image: eventstore/eventstore:21.6.0-buster-slim - ports: ['1113:1113'] - env: - EVENTSTORE_RUN_PROJECTIONS: "All" - EVENTSTORE_START_STANDARD_PROJECTIONS: "true" - EVENTSTORE_CLUSTER_SIZE: 1 - EVENTSTORE_EXT_TCP_PORT: 1113 - EVENTSTORE_INSECURE: "true" - EVENTSTORE_ENABLE_EXTERNAL_TCP: "true" + env: + EXTREME_CACERTFILE: /certs/ca/ca.crt steps: + - name: Generate EventStoreDB SSL certificates + run: | + sudo mkdir -p /certs + sudo chmod a+rw /certs + docker run \ + --entrypoint /bin/bash \ + --mount type=bind,source=/certs,target=/certs \ + eventstore/es-gencert-cli:1.0.2 \ + -c "mkdir -p ./certs && cd /certs && es-gencert-cli create-ca && es-gencert-cli create-node -out ./node -ip-addresses 127.0.0.1 -dns-names localhost" + - name: Start the EventStoreDB container + run: | + docker run \ + --name eventstore \ + --detach \ + --mount type=bind,source=/certs,target=/etc/eventstore/certs \ + --publish 1113:1113 \ + eventstore/eventstore:21.6.0-buster-slim \ + --run-projections=All \ + --enable-external-tcp=true \ + --trusted-root-certificates-path=/etc/eventstore/certs/ca \ + --certificate-file=/etc/eventstore/certs/node/node.crt \ + --certificate-private-key-file=/etc/eventstore/certs/node/node.key \ + --advertise-host-to-client-as=127.0.0.1 - uses: actions/checkout@v2 - name: Set up Elixir uses: erlef/setup-beam@v1 @@ -103,4 +116,4 @@ jobs: - name: Check formatting run: mix format --check-formatted - name: Run tests - run: mix test --cover --exclude=authentication + run: mix test --cover --exclude=gossip diff --git a/config/test.exs b/config/test.exs index 5e45d5d..c06d8d6 100644 --- a/config/test.exs +++ b/config/test.exs @@ -9,10 +9,26 @@ config :ex_unit, assert_receive_timeout: 10_000, capture_log: true -config :extreme, TestConn, - db_type: "node", - host: "localhost", - port: "1113", - username: "admin", - password: "changeit", - connection_name: "extreme_test" +transport_opts = + if cacertfile = System.get_env("EXTREME_CACERTFILE") do + [ + transport: :ssl, + transport_opts: [ + verify: :verify_peer, + cacertfile: cacertfile + ] + ] + else + [] + end + +config :extreme, + TestConn, + [ + db_type: "node", + host: "localhost", + port: "1113", + username: "admin", + password: "changeit", + connection_name: "extreme_test" + ] ++ transport_opts diff --git a/lib/extreme/cluster_connection.ex b/lib/extreme/cluster_connection.ex index d612ba4..491d6cb 100644 --- a/lib/extreme/cluster_connection.ex +++ b/lib/extreme/cluster_connection.ex @@ -5,15 +5,23 @@ defmodule Extreme.ClusterConnection do require Logger - def gossip_with(nodes, gossip_timeout, mode) + def gossip_with(nodes, opts) - def gossip_with([], _, _), do: {:error, :no_more_gossip_seeds} + def gossip_with([], _opts), do: {:error, :no_more_gossip_seeds} + + def gossip_with([node | rest_nodes], opts) do + mode = Keyword.fetch!(opts, :mode) + scheme = if Keyword.fetch!(opts, :transport) == :ssl, do: 'https', else: 'http' + url = '#{scheme}://#{node.host}:#{node.port}/gossip?format=json' + + request_opts = [ + timeout: Keyword.fetch!(opts, :timeout), + ssl: Keyword.fetch!(opts, :transport_opts) + ] - def gossip_with([node | rest_nodes], gossip_timeout, mode) do - url = 'http://#{node.host}:#{node.port}/gossip?format=json' Logger.info("Gossip with #{url}") - case :httpc.request(:get, {url, []}, [timeout: gossip_timeout], []) do + case :httpc.request(:get, {url, []}, request_opts, []) do {:ok, {{_version, 200, _status}, _headers, body}} -> body |> Jason.decode!() @@ -21,7 +29,7 @@ defmodule Extreme.ClusterConnection do error -> Logger.error("Error getting gossip: #{inspect(error)}") - gossip_with(rest_nodes, gossip_timeout, mode) + gossip_with(rest_nodes, opts) end end diff --git a/lib/extreme/configuration.ex b/lib/extreme/configuration.ex index 97e98d7..2430e4b 100644 --- a/lib/extreme/configuration.ex +++ b/lib/extreme/configuration.ex @@ -47,12 +47,9 @@ defmodule Extreme.Configuration do do: {:ok, _get_host(configuration), _get_port(configuration)} defp _get_node(:cluster, configuration) do - gossip_timeout = Keyword.get(configuration, :gossip_timeout, 1_000) - mode = Keyword.get(configuration, :mode, :write) - configuration |> Keyword.fetch!(:nodes) - |> ClusterConnection.gossip_with(gossip_timeout, mode) + |> ClusterConnection.gossip_with(gossip_opts(configuration)) end defp _get_node(:cluster_dns, configuration) do @@ -66,18 +63,11 @@ defmodule Extreme.Configuration do |> Keyword.get(:port, 2113) |> Tools.cast_to_integer() - gossip_timeout = - configuration - |> Keyword.get(:gossip_timeout, 1_000) - |> Tools.cast_to_integer() - - mode = Keyword.get(configuration, :mode, :write) - ips |> Enum.map(fn ip -> %{host: to_string(:inet.ntoa(ip)), port: gossip_port} end) - |> ClusterConnection.gossip_with(gossip_timeout, mode) + |> ClusterConnection.gossip_with(gossip_opts(configuration)) end # Returns `:host` value from `configuration` as charlist. @@ -95,4 +85,13 @@ defmodule Extreme.Configuration do |> Keyword.fetch!(:port) |> Tools.cast_to_integer() end + + defp gossip_opts(configuration) do + [ + mode: Keyword.get(configuration, :mode, :write), + timeout: Keyword.get(configuration, :gossip_timeout, 1_000) |> Tools.cast_to_integer(), + transport: Keyword.get(configuration, :transport, :tcp), + transport_opts: Keyword.get(configuration, :transport_opts, []) + ] + end end diff --git a/lib/extreme/connection.ex b/lib/extreme/connection.ex index af65dcd..9f36e75 100644 --- a/lib/extreme/connection.ex +++ b/lib/extreme/connection.ex @@ -1,11 +1,11 @@ defmodule Extreme.Connection do use GenServer - alias Extreme.{Configuration, Tcp, RequestManager} + alias Extreme.{Configuration, RequestManager} alias Extreme.ConnectionImpl, as: Impl require Logger defmodule State do - defstruct ~w(base_name socket received_data)a + defstruct ~w(base_name socket received_data transport)a end def start_link(base_name, configuration), @@ -18,13 +18,40 @@ defmodule Extreme.Connection do |> GenServer.cast({:execute, message}) end + @doc """ + Opens a connection with EventStore. Returns `{:ok, socket}` on success or + `{:error, :max_attempt_exceeded}` if connection wasn't made in `:max_attempts` + provided in `configuration`. If not specified, `max_attempts` defaults to :infinity + """ + def connect(host, port, configuration, attempt \\ 1) do + configuration + |> Keyword.get(:max_attempts, :infinity) + |> case do + :infinity -> true + max when attempt <= max -> true + _any -> false + end + |> if do + if attempt > 1 do + configuration + |> Keyword.get(:reconnect_delay, 1_000) + |> :timer.sleep() + end + + _connect(host, port, configuration, attempt) + else + {:error, :max_attempt_exceeded} + end + end + @impl true def init({base_name, configuration}) do GenServer.cast(self(), {:connect, configuration, 1}) state = %State{ base_name: base_name, - received_data: "" + received_data: "", + transport: Keyword.get(configuration, :transport, :tcp) } {:ok, state} @@ -32,9 +59,9 @@ defmodule Extreme.Connection do @impl true def handle_cast({:connect, configuration, attempt}, state) do - configuration - |> _connect(attempt) - |> case do + {:ok, host, port} = Configuration.get_node(configuration) + + case connect(host, port, configuration, attempt) do {:ok, socket} -> Logger.info(fn -> "Successfully connected to EventStore" end) @@ -58,7 +85,7 @@ defmodule Extreme.Connection do end @impl true - def handle_info({:tcp, socket, pkg}, %State{socket: socket} = state) do + def handle_info({tag, socket, pkg}, %State{socket: socket} = state) when tag in [:tcp, :ssl] do {:ok, state} = Impl.receive_package(pkg, state) {:noreply, state} end @@ -72,10 +99,26 @@ defmodule Extreme.Connection do RequestManager.kill_all_subscriptions(state.base_name) end - defp _connect(configuration, attempt) do - {:ok, host, port} = Configuration.get_node(configuration) - Tcp.connect(host, port, configuration, attempt) - end - def _name(base_name), do: Module.concat(base_name, Connection) + + defp _connect(host, port, configuration, attempt) do + Logger.info(fn -> "Connecting Extreme to #{host}:#{port}" end) + + transport_module = + case Keyword.get(configuration, :transport, :tcp) do + :tcp -> :gen_tcp + :ssl -> :ssl + end + + opts = Keyword.get(configuration, :transport_opts, []) ++ [:binary, active: :once] + + case transport_module.connect(host, port, opts) do + {:ok, socket} -> + {:ok, socket} + + reason -> + Logger.warn(fn -> "Error connecting to EventStore: #{inspect(reason)}" end) + connect(host, port, configuration, attempt + 1) + end + end end diff --git a/lib/extreme/connection_impl.ex b/lib/extreme/connection_impl.ex index 5579ee1..d465e8c 100644 --- a/lib/extreme/connection_impl.ex +++ b/lib/extreme/connection_impl.ex @@ -7,15 +7,26 @@ defmodule Extreme.ConnectionImpl do require Logger - def execute(message, %State{socket: socket}), - do: :gen_tcp.send(socket, message) + def execute(message, %State{transport: :tcp, socket: socket}) do + :gen_tcp.send(socket, message) + end + + def execute(message, %State{transport: :ssl, socket: socket}) do + :ssl.send(socket, message) + end - def receive_package(pkg, %State{socket: socket, received_data: received_data} = state) do - :inet.setopts(socket, active: :once) + def receive_package(pkg, %State{received_data: received_data} = state) do + set_active_once(state) state = _process_package(state, received_data <> pkg) {:ok, state} end + defp set_active_once(%State{transport: :tcp, socket: socket}), + do: :inet.setopts(socket, active: :once) + + defp set_active_once(%State{transport: :ssl, socket: socket}), + do: :ssl.setopts(socket, active: :once) + defp _process_package( state, < Keyword.get(:max_attempts, :infinity) - |> case do - :infinity -> true - max when attempt <= max -> true - _any -> false - end - |> if do - if attempt > 1 do - configuration - |> Keyword.get(:reconnect_delay, 1_000) - |> :timer.sleep() - end - - _connect(host, port, configuration, attempt) - else - {:error, :max_attempt_exceeded} - end - end - - defp _connect(host, port, configuration, attempt) do - Logger.info(fn -> "Connecting Extreme to #{host}:#{port}" end) - opts = [:binary, active: :once] - - host - |> :gen_tcp.connect(port, opts) - |> case do - {:ok, socket} -> - {:ok, socket} - - reason -> - Logger.warn(fn -> "Error connecting to EventStore: #{inspect(reason)}" end) - connect(host, port, configuration, attempt + 1) - end - end -end diff --git a/mix.exs b/mix.exs index 97a9435..0e9be4f 100644 --- a/mix.exs +++ b/mix.exs @@ -25,7 +25,7 @@ defmodule Extreme.Mixfile do def application do [ - extra_applications: [:logger, :inets] + extra_applications: [:logger, :inets, :ssl] ] end diff --git a/test/extreme/cluster_connection_test.exs b/test/extreme/cluster_connection_test.exs index 9a88fae..3d6ca6a 100644 --- a/test/extreme/cluster_connection_test.exs +++ b/test/extreme/cluster_connection_test.exs @@ -10,8 +10,10 @@ defmodule Extreme.ClusterConnectionTest do assert {:ok, 'localhost', 1113} = Extreme.ClusterConnection.gossip_with( [%{host: "0.0.0.0", port: "2113"}], - 20_000, - :write + timeout: 20_000, + mode: :write, + transport: :tcp, + transport_opts: [] ) end end diff --git a/test/extreme/tcp_test.exs b/test/extreme/connection_test.exs similarity index 63% rename from test/extreme/tcp_test.exs rename to test/extreme/connection_test.exs index d866194..31bc378 100644 --- a/test/extreme/tcp_test.exs +++ b/test/extreme/connection_test.exs @@ -1,6 +1,6 @@ -defmodule Extreme.TcpTest do +defmodule Extreme.ConnectionTest do use ExUnit.Case, async: true - alias Extreme.{Tcp, Configuration} + alias Extreme.{Connection, Configuration} @test_configuration Application.get_env(:extreme, TestConn) @@ -8,14 +8,14 @@ defmodule Extreme.TcpTest do test "returns {:ok, socket} for correct host and port" do {:ok, host, port} = Configuration.get_node(@test_configuration) - assert {:ok, _socket} = Tcp.connect(host, port, []) + assert {:ok, _socket} = Connection.connect(host, port, []) end test "returns {:error, :max_attempt_exceeded} for incorrect port when `max_attempts` exceeds" do host = 'localhost' port = 1609 - assert {:error, :max_attempt_exceeded} = Tcp.connect(host, port, max_attempts: 1) + assert {:error, :max_attempt_exceeded} = Connection.connect(host, port, max_attempts: 1) end end end diff --git a/test/extreme_test.exs b/test/extreme_test.exs index cacbd38..3f50b74 100644 --- a/test/extreme_test.exs +++ b/test/extreme_test.exs @@ -18,6 +18,7 @@ defmodule ExtremeTest do defmodule(ClusterConn, do: use(Extreme)) + @tag :gossip test "Connects on EventStore cluster with a list of nodes" do use_cassette "gossip_with_clusters_existing_node" do nodes = [