Skip to content

Commit

Permalink
Cap the max reconnect rate by delaying before the first connect
Browse files Browse the repository at this point in the history
If the Connection GenServer crashes or stops due to an error it gets
restarted immediately. This is problematic when remaking the connection
immediately doesn't allow the issue to resolve itself. A trivial way of
relieving the issue is to add a short manditory reconnect delay. Since
the delay is mandatory, it can't be long. Given typical connect times,
the default of 1 second probably won't be noticed. 50% jitter is added.

The delay can be changed using the `:first_connect_delay` option.
  • Loading branch information
fhunleth authored and jfcloutier committed Aug 1, 2024
1 parent 1a60f2c commit e58fcdb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 21 deletions.
29 changes: 24 additions & 5 deletions lib/tortoise311/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ defmodule Tortoise311.Connection do
| Tortoise311.Package.Subscribe.t()}
| {:clean_session, boolean()}
| {:enable_telemetry, boolean()}
| {:handler, {atom(), term()}},
| {:handler, {atom(), term()}}
| {:first_connect_delay, non_neg_integer()},
options: [option]
def start_link(connection_opts, opts \\ []) do
client_id = Keyword.fetch!(connection_opts, :client_id)
Expand Down Expand Up @@ -76,7 +77,14 @@ defmodule Tortoise311.Connection do
end

# @todo, validate that the handler is valid
connection_opts = Keyword.take(connection_opts, [:client_id, :handler, :enable_telemetry])
connection_opts =
Keyword.take(connection_opts, [
:client_id,
:handler,
:enable_telemetry,
:first_connect_delay
])

initial = {server, connect, backoff, subscriptions, connection_opts}
opts = Keyword.merge(opts, name: via_name(client_id))
GenServer.start_link(__MODULE__, initial, opts)
Expand Down Expand Up @@ -381,9 +389,7 @@ defmodule Tortoise311.Connection do
Tortoise311.Registry.put_meta(via_name(client_id), :connecting)
{:ok, _pid} = Tortoise311.Events.register(client_id, :status)

# eventually, switch to handle_continue
send(self(), :connect)
{:ok, state}
{:ok, state, {:continue, :first_connect}}
end

@impl GenServer
Expand All @@ -393,6 +399,19 @@ defmodule Tortoise311.Connection do
:ok
end

@impl GenServer
def handle_continue(:first_connect, state) do
# Apply a short delay before connecting to limit the max rate of reconnects
# if the GenServer crashes. The delay is jittered by 50% of its value. The
# default is 1 second.
delay = Keyword.get(state.opts, :first_connect_delay, 1000)
delay_with_jitter = round(delay * (1.0 + (:rand.uniform() - 0.5)))

Process.send_after(self(), :connect, delay_with_jitter)

{:noreply, state}
end

@impl GenServer
def handle_info(:connect, state) do
# make sure we will not fall for a keep alive timeout while we reconnect
Expand Down
48 changes: 32 additions & 16 deletions test/tortoise/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, _pid} = Connection.start_link(opts)
Expand All @@ -89,7 +90,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, _pid} = Connection.start_link(opts)
Expand Down Expand Up @@ -119,7 +121,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, pid} = Connection.start_link(opts)
Expand All @@ -141,7 +144,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, pid} = Connection.start_link(opts)
Expand All @@ -162,7 +166,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, pid} = Connection.start_link(opts)
Expand All @@ -183,7 +188,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, pid} = Connection.start_link(opts)
Expand All @@ -204,7 +210,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, pid} = Connection.start_link(opts)
Expand Down Expand Up @@ -242,7 +249,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

# connection
Expand Down Expand Up @@ -304,7 +312,8 @@ defmodule Tortoise311.ConnectionTest do
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []},
subscriptions: subscribe
subscriptions: subscribe,
first_connect_delay: 0
]

assert {:ok, _pid} = Connection.start_link(opts)
Expand Down Expand Up @@ -440,7 +449,8 @@ defmodule Tortoise311.ConnectionTest do
client_id: client_id,
server: {ScriptedTransport, host: ~c"localhost", port: 1883},
backoff: [min_interval: 1],
handler: {Tortoise311.Handler.Logger, []}
handler: {Tortoise311.Handler.Logger, []},
first_connect_delay: 0
)

assert_receive {ScriptedTransport, {:refute_connection, ^refusal}}
Expand Down Expand Up @@ -484,7 +494,8 @@ defmodule Tortoise311.ConnectionTest do
client_id: client_id,
server: {ScriptedTransport, host: ~c"localhost", port: 1883},
backoff: [min_interval: 0],
handler: {Tortoise311.Handler.Logger, []}
handler: {Tortoise311.Handler.Logger, []},
first_connect_delay: 0
)

assert_receive {ScriptedTransport, :connected}
Expand Down Expand Up @@ -514,7 +525,8 @@ defmodule Tortoise311.ConnectionTest do
Tortoise311.Connection.start_link(
client_id: client_id,
server: {ScriptedTransport, host: ~c"localhost", port: 1883},
handler: {Tortoise311.Handler.Logger, []}
handler: {Tortoise311.Handler.Logger, []},
first_connect_delay: 0
)

assert_receive {ScriptedTransport, :connected}
Expand Down Expand Up @@ -544,7 +556,8 @@ defmodule Tortoise311.ConnectionTest do
client_id: client_id,
server: {ScriptedTransport, host: ~c"localhost", port: 1883},
backoff: [min_interval: 1],
handler: {Tortoise311.Handler.Logger, []}
handler: {Tortoise311.Handler.Logger, []},
first_connect_delay: 0
)

assert_receive {ScriptedTransport, {:refute_connection, ^refusal}}
Expand Down Expand Up @@ -572,7 +585,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, _pid} = Connection.start_link(opts)
Expand All @@ -596,7 +610,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, _pid} = Connection.start_link(opts)
Expand Down Expand Up @@ -628,7 +643,8 @@ defmodule Tortoise311.ConnectionTest do
opts = [
client_id: client_id,
server: {Tortoise311.Transport.Tcp, [host: ip, port: port]},
handler: {Tortoise311.Handler.Default, []}
handler: {Tortoise311.Handler.Default, []},
first_connect_delay: 0
]

assert {:ok, pid} = Connection.start_link(opts)
Expand Down

0 comments on commit e58fcdb

Please sign in to comment.