From fac0d202d16be2b5c655f55c4fe1447158dcfc91 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 5 Nov 2021 14:04:17 -0500 Subject: [PATCH 1/7] add support for :ssl connections --- lib/extreme/connection.ex | 67 ++++++++++++++++--- lib/extreme/connection_impl.ex | 19 ++++-- lib/extreme/tcp.ex | 49 -------------- .../{tcp_test.exs => connection_test.exs} | 8 +-- 4 files changed, 75 insertions(+), 68 deletions(-) delete mode 100644 lib/extreme/tcp.ex rename test/extreme/{tcp_test.exs => connection_test.exs} (63%) diff --git a/lib/extreme/connection.ex b/lib/extreme/connection.ex index af65dcd..d4a12b1 100644 --- a/lib/extreme/connection.ex +++ b/lib/extreme/connection.ex @@ -1,11 +1,11 @@ defmodule Extreme.Connection do use GenServer - alias Extreme.{Configuration, Tcp, RequestManager} + alias Extreme.{Configuration, RequestManager} alias Extreme.ConnectionImpl, as: Impl require Logger defmodule State do - defstruct ~w(base_name socket received_data)a + defstruct ~w(base_name socket received_data transport)a end def start_link(base_name, configuration), @@ -18,13 +18,40 @@ defmodule Extreme.Connection do |> GenServer.cast({:execute, message}) end + @doc """ + Opens a connection with EventStore. Returns `{:ok, socket}` on success or + `{:error, :max_attempt_exceeded}` if connection wasn't made in `:max_attempts` + provided in `configuration`. If not specified, `max_attempts` defaults to :infinity + """ + def connect(host, port, configuration, attempt \\ 1) do + configuration + |> Keyword.get(:max_attempts, :infinity) + |> case do + :infinity -> true + max when attempt <= max -> true + _any -> false + end + |> if do + if attempt > 1 do + configuration + |> Keyword.get(:reconnect_delay, 1_000) + |> :timer.sleep() + end + + _connect(host, port, configuration, attempt) + else + {:error, :max_attempt_exceeded} + end + end + @impl true def init({base_name, configuration}) do GenServer.cast(self(), {:connect, configuration, 1}) state = %State{ base_name: base_name, - received_data: "" + received_data: "", + transport: Keyword.get(configuration, :transport, :tcp) } {:ok, state} @@ -32,9 +59,9 @@ defmodule Extreme.Connection do @impl true def handle_cast({:connect, configuration, attempt}, state) do - configuration - |> _connect(attempt) - |> case do + {:ok, host, port} = Configuration.get_node(configuration) + + case connect(host, port, configuration, attempt) do {:ok, socket} -> Logger.info(fn -> "Successfully connected to EventStore" end) @@ -72,10 +99,28 @@ defmodule Extreme.Connection do RequestManager.kill_all_subscriptions(state.base_name) end - defp _connect(configuration, attempt) do - {:ok, host, port} = Configuration.get_node(configuration) - Tcp.connect(host, port, configuration, attempt) - end - def _name(base_name), do: Module.concat(base_name, Connection) + + defp _connect(host, port, configuration, attempt) do + Logger.info(fn -> "Connecting Extreme to #{host}:#{port}" end) + + transport_module = + case Keyword.get(configuration, :transport, :tcp) do + :tcp -> :gen_tcp + :ssl -> :ssl + end + + opts = Keyword.get(configuration, :transport_opts, []) ++ [:binary, active: :once] + + host + |> transport_module.connect(port, opts) + |> case do + {:ok, socket} -> + {:ok, socket} + + reason -> + Logger.warn(fn -> "Error connecting to EventStore: #{inspect(reason)}" end) + connect(host, port, configuration, attempt + 1) + end + end end diff --git a/lib/extreme/connection_impl.ex b/lib/extreme/connection_impl.ex index 5579ee1..d465e8c 100644 --- a/lib/extreme/connection_impl.ex +++ b/lib/extreme/connection_impl.ex @@ -7,15 +7,26 @@ defmodule Extreme.ConnectionImpl do require Logger - def execute(message, %State{socket: socket}), - do: :gen_tcp.send(socket, message) + def execute(message, %State{transport: :tcp, socket: socket}) do + :gen_tcp.send(socket, message) + end + + def execute(message, %State{transport: :ssl, socket: socket}) do + :ssl.send(socket, message) + end - def receive_package(pkg, %State{socket: socket, received_data: received_data} = state) do - :inet.setopts(socket, active: :once) + def receive_package(pkg, %State{received_data: received_data} = state) do + set_active_once(state) state = _process_package(state, received_data <> pkg) {:ok, state} end + defp set_active_once(%State{transport: :tcp, socket: socket}), + do: :inet.setopts(socket, active: :once) + + defp set_active_once(%State{transport: :ssl, socket: socket}), + do: :ssl.setopts(socket, active: :once) + defp _process_package( state, < Keyword.get(:max_attempts, :infinity) - |> case do - :infinity -> true - max when attempt <= max -> true - _any -> false - end - |> if do - if attempt > 1 do - configuration - |> Keyword.get(:reconnect_delay, 1_000) - |> :timer.sleep() - end - - _connect(host, port, configuration, attempt) - else - {:error, :max_attempt_exceeded} - end - end - - defp _connect(host, port, configuration, attempt) do - Logger.info(fn -> "Connecting Extreme to #{host}:#{port}" end) - opts = [:binary, active: :once] - - host - |> :gen_tcp.connect(port, opts) - |> case do - {:ok, socket} -> - {:ok, socket} - - reason -> - Logger.warn(fn -> "Error connecting to EventStore: #{inspect(reason)}" end) - connect(host, port, configuration, attempt + 1) - end - end -end diff --git a/test/extreme/tcp_test.exs b/test/extreme/connection_test.exs similarity index 63% rename from test/extreme/tcp_test.exs rename to test/extreme/connection_test.exs index d866194..31bc378 100644 --- a/test/extreme/tcp_test.exs +++ b/test/extreme/connection_test.exs @@ -1,6 +1,6 @@ -defmodule Extreme.TcpTest do +defmodule Extreme.ConnectionTest do use ExUnit.Case, async: true - alias Extreme.{Tcp, Configuration} + alias Extreme.{Connection, Configuration} @test_configuration Application.get_env(:extreme, TestConn) @@ -8,14 +8,14 @@ defmodule Extreme.TcpTest do test "returns {:ok, socket} for correct host and port" do {:ok, host, port} = Configuration.get_node(@test_configuration) - assert {:ok, _socket} = Tcp.connect(host, port, []) + assert {:ok, _socket} = Connection.connect(host, port, []) end test "returns {:error, :max_attempt_exceeded} for incorrect port when `max_attempts` exceeds" do host = 'localhost' port = 1609 - assert {:error, :max_attempt_exceeded} = Tcp.connect(host, port, max_attempts: 1) + assert {:error, :max_attempt_exceeded} = Connection.connect(host, port, max_attempts: 1) end end end From 8bdbd3b51e37015783f52ea144ea661536bea742 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 5 Nov 2021 14:21:49 -0500 Subject: [PATCH 2/7] add :ssl to :extra_applications --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 97a9435..0e9be4f 100644 --- a/mix.exs +++ b/mix.exs @@ -25,7 +25,7 @@ defmodule Extreme.Mixfile do def application do [ - extra_applications: [:logger, :inets] + extra_applications: [:logger, :inets, :ssl] ] end From 616373bac68e7e6a659e83d6164ab8e7db891268 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 15 Nov 2021 07:49:21 -0600 Subject: [PATCH 3/7] unroll unnecessary pipe --- lib/extreme/connection.ex | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/extreme/connection.ex b/lib/extreme/connection.ex index d4a12b1..2edf34b 100644 --- a/lib/extreme/connection.ex +++ b/lib/extreme/connection.ex @@ -112,9 +112,7 @@ defmodule Extreme.Connection do opts = Keyword.get(configuration, :transport_opts, []) ++ [:binary, active: :once] - host - |> transport_module.connect(port, opts) - |> case do + case transport_module.connect(host, port, opts) do {:ok, socket} -> {:ok, socket} From 5c1b9dd482d77d1726e6ea45c824f412492a47c6 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 15 Nov 2021 08:56:58 -0600 Subject: [PATCH 4/7] test with SSL in ci on 21.6.0 container use run instead of args use docker run instead of uses-run use bash -c in generation step add some logging don't rely on unpublished es-gencert version set entrypoint as bash create crt dirs ahead of time add all read/write priviledges on certs dir fix backslashes in esdb run move ./certs dir to /certs add sudo to directory creation commands fix formatting fix transport-opts config use verify_peer config and fix cert path point at cacertfile env var show docker logs fix backslash in docker run point esdb at cacerts directory fix trusted cert cli switch name fix path for ca cert file remove logging --- .github/workflows/test.yml | 41 +++++++++++++++++++++++++------------- config/test.exs | 30 +++++++++++++++++++++------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6a0cd0d..0917ba9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -58,26 +58,39 @@ jobs: run: mix test --cover ES-21: - name: ES 21.6.0 + name: ES 21.6.0 with SSL runs-on: ubuntu-latest strategy: matrix: elixir: ['1.12.3'] erlang: ['24.1'] - - services: - es: - image: eventstore/eventstore:21.6.0-buster-slim - ports: ['1113:1113'] - env: - EVENTSTORE_RUN_PROJECTIONS: "All" - EVENTSTORE_START_STANDARD_PROJECTIONS: "true" - EVENTSTORE_CLUSTER_SIZE: 1 - EVENTSTORE_EXT_TCP_PORT: 1113 - EVENTSTORE_INSECURE: "true" - EVENTSTORE_ENABLE_EXTERNAL_TCP: "true" + env: + EXTREME_CACERTFILE: /certs/ca/ca.crt steps: + - name: Generate EventStoreDB SSL certificates + run: | + sudo mkdir -p /certs + sudo chmod a+rw /certs + docker run \ + --entrypoint /bin/bash \ + --mount type=bind,source=/certs,target=/certs \ + eventstore/es-gencert-cli:1.0.2 \ + -c "mkdir -p ./certs && cd /certs && es-gencert-cli create-ca && es-gencert-cli create-node -out ./node -ip-addresses 127.0.0.1 -dns-names localhost" + - name: Start the EventStoreDB container + run: | + docker run \ + --name eventstore \ + --detach \ + --mount type=bind,source=/certs,target=/etc/eventstore/certs \ + --publish 1113:1113 \ + eventstore/eventstore:21.6.0-buster-slim \ + --run-projections=All \ + --enable-external-tcp=true \ + --trusted-root-certificates-path=/etc/eventstore/certs/ca \ + --certificate-file=/etc/eventstore/certs/node/node.crt \ + --certificate-private-key-file=/etc/eventstore/certs/node/node.key \ + --advertise-host-to-client-as=127.0.0.1 - uses: actions/checkout@v2 - name: Set up Elixir uses: erlef/setup-beam@v1 @@ -103,4 +116,4 @@ jobs: - name: Check formatting run: mix format --check-formatted - name: Run tests - run: mix test --cover --exclude=authentication + run: mix test --cover diff --git a/config/test.exs b/config/test.exs index 5e45d5d..c06d8d6 100644 --- a/config/test.exs +++ b/config/test.exs @@ -9,10 +9,26 @@ config :ex_unit, assert_receive_timeout: 10_000, capture_log: true -config :extreme, TestConn, - db_type: "node", - host: "localhost", - port: "1113", - username: "admin", - password: "changeit", - connection_name: "extreme_test" +transport_opts = + if cacertfile = System.get_env("EXTREME_CACERTFILE") do + [ + transport: :ssl, + transport_opts: [ + verify: :verify_peer, + cacertfile: cacertfile + ] + ] + else + [] + end + +config :extreme, + TestConn, + [ + db_type: "node", + host: "localhost", + port: "1113", + username: "admin", + password: "changeit", + connection_name: "extreme_test" + ] ++ transport_opts From 59315876e0016c48b0e08cce8ec933bced9102e4 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 15 Nov 2021 10:10:09 -0600 Subject: [PATCH 5/7] allow :ssl tags in handle_info/2 --- lib/extreme/connection.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/extreme/connection.ex b/lib/extreme/connection.ex index 2edf34b..9f36e75 100644 --- a/lib/extreme/connection.ex +++ b/lib/extreme/connection.ex @@ -85,7 +85,7 @@ defmodule Extreme.Connection do end @impl true - def handle_info({:tcp, socket, pkg}, %State{socket: socket} = state) do + def handle_info({tag, socket, pkg}, %State{socket: socket} = state) when tag in [:tcp, :ssl] do {:ok, state} = Impl.receive_package(pkg, state) {:noreply, state} end From 0694a8e1bef2ed73d4cf2b79fb0b702724a9127c Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 15 Nov 2021 11:01:04 -0600 Subject: [PATCH 6/7] use ssl for gossip requests via :httpc --- .github/workflows/test.yml | 2 +- lib/extreme/cluster_connection.ex | 20 ++++++++++++++------ lib/extreme/configuration.ex | 23 +++++++++++------------ test/extreme/cluster_connection_test.exs | 6 ++++-- test/extreme_test.exs | 1 + 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0917ba9..550bd37 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -116,4 +116,4 @@ jobs: - name: Check formatting run: mix format --check-formatted - name: Run tests - run: mix test --cover + run: mix test --cover --exclude gossip diff --git a/lib/extreme/cluster_connection.ex b/lib/extreme/cluster_connection.ex index d612ba4..491d6cb 100644 --- a/lib/extreme/cluster_connection.ex +++ b/lib/extreme/cluster_connection.ex @@ -5,15 +5,23 @@ defmodule Extreme.ClusterConnection do require Logger - def gossip_with(nodes, gossip_timeout, mode) + def gossip_with(nodes, opts) - def gossip_with([], _, _), do: {:error, :no_more_gossip_seeds} + def gossip_with([], _opts), do: {:error, :no_more_gossip_seeds} + + def gossip_with([node | rest_nodes], opts) do + mode = Keyword.fetch!(opts, :mode) + scheme = if Keyword.fetch!(opts, :transport) == :ssl, do: 'https', else: 'http' + url = '#{scheme}://#{node.host}:#{node.port}/gossip?format=json' + + request_opts = [ + timeout: Keyword.fetch!(opts, :timeout), + ssl: Keyword.fetch!(opts, :transport_opts) + ] - def gossip_with([node | rest_nodes], gossip_timeout, mode) do - url = 'http://#{node.host}:#{node.port}/gossip?format=json' Logger.info("Gossip with #{url}") - case :httpc.request(:get, {url, []}, [timeout: gossip_timeout], []) do + case :httpc.request(:get, {url, []}, request_opts, []) do {:ok, {{_version, 200, _status}, _headers, body}} -> body |> Jason.decode!() @@ -21,7 +29,7 @@ defmodule Extreme.ClusterConnection do error -> Logger.error("Error getting gossip: #{inspect(error)}") - gossip_with(rest_nodes, gossip_timeout, mode) + gossip_with(rest_nodes, opts) end end diff --git a/lib/extreme/configuration.ex b/lib/extreme/configuration.ex index 97e98d7..2430e4b 100644 --- a/lib/extreme/configuration.ex +++ b/lib/extreme/configuration.ex @@ -47,12 +47,9 @@ defmodule Extreme.Configuration do do: {:ok, _get_host(configuration), _get_port(configuration)} defp _get_node(:cluster, configuration) do - gossip_timeout = Keyword.get(configuration, :gossip_timeout, 1_000) - mode = Keyword.get(configuration, :mode, :write) - configuration |> Keyword.fetch!(:nodes) - |> ClusterConnection.gossip_with(gossip_timeout, mode) + |> ClusterConnection.gossip_with(gossip_opts(configuration)) end defp _get_node(:cluster_dns, configuration) do @@ -66,18 +63,11 @@ defmodule Extreme.Configuration do |> Keyword.get(:port, 2113) |> Tools.cast_to_integer() - gossip_timeout = - configuration - |> Keyword.get(:gossip_timeout, 1_000) - |> Tools.cast_to_integer() - - mode = Keyword.get(configuration, :mode, :write) - ips |> Enum.map(fn ip -> %{host: to_string(:inet.ntoa(ip)), port: gossip_port} end) - |> ClusterConnection.gossip_with(gossip_timeout, mode) + |> ClusterConnection.gossip_with(gossip_opts(configuration)) end # Returns `:host` value from `configuration` as charlist. @@ -95,4 +85,13 @@ defmodule Extreme.Configuration do |> Keyword.fetch!(:port) |> Tools.cast_to_integer() end + + defp gossip_opts(configuration) do + [ + mode: Keyword.get(configuration, :mode, :write), + timeout: Keyword.get(configuration, :gossip_timeout, 1_000) |> Tools.cast_to_integer(), + transport: Keyword.get(configuration, :transport, :tcp), + transport_opts: Keyword.get(configuration, :transport_opts, []) + ] + end end diff --git a/test/extreme/cluster_connection_test.exs b/test/extreme/cluster_connection_test.exs index 9a88fae..3d6ca6a 100644 --- a/test/extreme/cluster_connection_test.exs +++ b/test/extreme/cluster_connection_test.exs @@ -10,8 +10,10 @@ defmodule Extreme.ClusterConnectionTest do assert {:ok, 'localhost', 1113} = Extreme.ClusterConnection.gossip_with( [%{host: "0.0.0.0", port: "2113"}], - 20_000, - :write + timeout: 20_000, + mode: :write, + transport: :tcp, + transport_opts: [] ) end end diff --git a/test/extreme_test.exs b/test/extreme_test.exs index cacbd38..3f50b74 100644 --- a/test/extreme_test.exs +++ b/test/extreme_test.exs @@ -18,6 +18,7 @@ defmodule ExtremeTest do defmodule(ClusterConn, do: use(Extreme)) + @tag :gossip test "Connects on EventStore cluster with a list of nodes" do use_cassette "gossip_with_clusters_existing_node" do nodes = [ From f5a1f81ff414d91e7cc98d197090e1dcbd07f5f9 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 15 Nov 2021 11:03:49 -0600 Subject: [PATCH 7/7] fix --exclude format in test workflow --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 550bd37..96d2a69 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -116,4 +116,4 @@ jobs: - name: Check formatting run: mix format --check-formatted - name: Run tests - run: mix test --cover --exclude gossip + run: mix test --cover --exclude=gossip