Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Reconnect Jitter #385

Merged
merged 8 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions src/NATS.Client/Conn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ public Options Opts
internal Exception lastEx;

Timer ptmr = null;

int pout = 0;

internal AutoResetEvent ReconnectDelayARE = new AutoResetEvent(false);
ColinSullivan1 marked this conversation as resolved.
Show resolved Hide resolved

private AsyncSubscription globalRequestSubscription;
private readonly string globalRequestInbox;

Expand Down Expand Up @@ -723,6 +724,10 @@ internal Channel<Msg> getMessageChannel()
internal Connection(Options options)
{
opts = new Options(options);
if (opts.ReconnectDelayHandler == null)
{
opts.ReconnectDelayHandler = DefaultReconnectDelayHandler;
}

PING_P_BYTES = Encoding.UTF8.GetBytes(IC.pingProto);
PING_P_BYTES_LEN = PING_P_BYTES.Length;
Expand Down Expand Up @@ -1526,6 +1531,15 @@ private void scheduleConnEvent(EventHandler<ConnEventArgs> connEvent, Exception
}
}

private void DefaultReconnectDelayHandler(object o, ReconnectDelayEventArgs args)
{
Random rand = new Random();
ColinSullivan1 marked this conversation as resolved.
Show resolved Hide resolved
int jitter = srvPool.HasSecureServer() ? rand.Next(opts.ReconnectJitterTLS) : rand.Next(opts.ReconnectJitter);

ReconnectDelayARE.Reset();
ReconnectDelayARE.WaitOne(opts.ReconnectWait + jitter);
}

// Try to reconnect using the option parameters.
// This function assumes we are allowed to reconnect.
//
Expand Down Expand Up @@ -1562,43 +1576,43 @@ private void doReconnect()

scheduleConnEvent(Opts.DisconnectedEventHandler, errorForHandler);

// TODO: Look at using a predicate delegate in the server pool to
// pass a method to, but locking is complex and would need to be
// reworked.
Srv cur;
int wlf = 0;
ColinSullivan1 marked this conversation as resolved.
Show resolved Hide resolved
bool doSleep = false;
while ((cur = srvPool.SelectNextServer(Opts.MaxReconnect)) != null)
{
url = cur.url;

lastEx = null;

// Sleep appropriate amount of time before the
// connection attempt if connecting to same server
// we just got disconnected from.
double elapsedMillis = cur.TimeSinceLastAttempt.TotalMilliseconds;
double sleepTime = 0;

if (elapsedMillis < Opts.ReconnectWait)
// check if we've been through the list
if (cur == srvPool.First())
{
sleepTime = Opts.ReconnectWait - elapsedMillis;
doSleep = (wlf != 0);
wlf++;
}

if (sleepTime <= 0)
else
{
// Release to allow parallel processes to close,
// unsub, etc. Note: Use the sleep API - yield is
// heavy handed here.
sleepTime = 50;
doSleep = false;
}

url = cur.url;
lastEx = null;

if (lockWasTaken)
{
Monitor.Exit(mu);
lockWasTaken = false;
}

sleep((int)sleepTime);

if (doSleep)
{
try
{
// If unset, the default handler will be called which uses an
// auto reset event to wait, unless kicked out of a close
// call.
opts?.ReconnectDelayHandler(this, new ReconnectDelayEventArgs(wlf - 1));
}
catch { } // swallow user exceptions
}

Monitor.Enter(mu, ref lockWasTaken);

if (isClosed())
Expand Down Expand Up @@ -3701,6 +3715,8 @@ private void close(ConnState closeState, bool invokeDelegates, Exception error =

lock (mu)
{
ReconnectDelayARE?.Set();

// Clear any queued pongs, e.g. pending flush calls.
clearPendingFlushCalls();
if (pending != null)
Expand Down
33 changes: 33 additions & 0 deletions src/NATS.Client/NATS.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ public static class Defaults
/// </summary>
public const int ReconnectBufferSize = 8 * 1024 * 1024; // 8MB

/// <summary>
/// Default non-TLS reconnect jitter of 100ms.
/// </summary>
public const int ReconnectJitter = 100;

/// <summary>
/// Default TLS reconnect jitter of 1s.
/// </summary>
public const int ReconnectJitterTLS = 1000;

/*
* Namespace level defaults
*/
Expand Down Expand Up @@ -186,6 +196,29 @@ internal ConnEventArgs(Connection c, Exception error = null)
public Exception Error { get; }
}

/// <summary>
/// Provides details for the ReconnectDelayEvent.
/// </summary>
/// <remarks>
/// This event handler is a good place to apply backoff logic. The associated
ColinSullivan1 marked this conversation as resolved.
Show resolved Hide resolved
/// connection will be RECONNECTING so access or calling methods will result
ColinSullivan1 marked this conversation as resolved.
Show resolved Hide resolved
/// in undefined behavior.
ColinSullivan1 marked this conversation as resolved.
Show resolved Hide resolved
/// </remarks>
public class ReconnectDelayEventArgs : EventArgs
{
internal ReconnectDelayEventArgs(int attempts)
{
Attempts = attempts;
}

/// <Summary>
/// Gets the number of times the client has traversed the
/// server list in attempting to reconnect.
/// </Summary>>
ColinSullivan1 marked this conversation as resolved.
Show resolved Hide resolved
public int Attempts { get; }
}


/// <summary>
/// Provides details when a user JWT is read during a connection. The
/// JWT must be set or a <see cref="NATSConnectionException"/> will
Expand Down
72 changes: 69 additions & 3 deletions src/NATS.Client/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public sealed class Options
int reconnectWait = Defaults.ReconnectWait;
int pingInterval = Defaults.PingInterval;
int timeout = Defaults.Timeout;
int reconnectJitter = Defaults.ReconnectJitter;
int reconnectJitterTLS = Defaults.ReconnectJitterTLS;

internal X509Certificate2Collection certificates = null;

Expand Down Expand Up @@ -72,6 +74,20 @@ public sealed class Options
/// </summary>
public EventHandler<ErrEventArgs> AsyncErrorEventHandler = null;

/// <summary>
/// Represents the optional method that is used to get from the
/// user the desired delay the client should pause before attempting
/// to reconnect again.
/// </summary>
/// <remarks>
/// Note that this is invoked after the library tried the
/// entire list of URLs and failed to reconnect. By default, the client
/// will use the sum of <see cref="ReconnectWait"/> and a random value between
/// zero and <see cref="Options.ReconnectJitter"/> or
/// <see cref="Options.ReconnectJitterTLS"/>
/// </remarks>
public EventHandler<ReconnectDelayEventArgs> ReconnectDelayHandler = null;

/// <summary>
/// Represents the optional method that is used to fetch and
/// return the account signed JWT for this user. Exceptions thrown
Expand Down Expand Up @@ -206,6 +222,7 @@ internal Options(Options o)
DisconnectedEventHandler = o.DisconnectedEventHandler;
UserJWTEventHandler = o.UserJWTEventHandler;
UserSignatureEventHandler = o.UserSignatureEventHandler;
ReconnectDelayHandler = o.ReconnectDelayHandler;
maxPingsOut = o.maxPingsOut;
maxReconnect = o.maxReconnect;
name = o.name;
Expand All @@ -216,6 +233,8 @@ internal Options(Options o)
useOldRequestStyle = o.useOldRequestStyle;
pingInterval = o.pingInterval;
ReconnectedEventHandler = o.ReconnectedEventHandler;
reconnectJitter = o.reconnectJitter;
reconnectJitterTLS = o.reconnectJitterTLS;
reconnectWait = o.reconnectWait;
secure = o.secure;
user = o.user;
Expand Down Expand Up @@ -669,6 +688,48 @@ public int ReconnectBufferSize
}
}

/// <summary>
/// Sets the the upper bound for a random delay in milliseconds added to
/// ReconnectWait during a reconnect for clear and TLS connections.
/// </summary>
/// <remarks>
/// Defaults are 100 ms and 1s for TLS.
/// </remarks>
/// <seealso cref="ReconnectDelayHandler"/>
/// <seealso cref="ReconnectJitter"/>
/// <seealso cref="ReconnectJitterTLS"/>
/// <seealso cref="ReconnectWait"/>
public void SetReconnectJitter(int jitter, int tlsJitter)
{
if (jitter < 0 || tlsJitter < 0)
{
throw new ArgumentOutOfRangeException("value", "Reconnect jitter must be positive");
}

reconnectJitter = jitter;
reconnectJitterTLS = tlsJitter;
}

/// <summary>
/// Get the the upper bound for a random delay added to
/// ReconnectWait during a reconnect for connections.
/// </summary>
/// <seealso cref="ReconnectDelayHandler"/>
/// <seealso cref="ReconnectJitterTLS"/>
/// <seealso cref="ReconnectWait"/>
/// <seealso cref="SetReconnectJitter(int, int)"/>
public int ReconnectJitter { get => reconnectJitter; }


/// <summary>
/// Get the the upper bound for a random delay added to
/// ReconnectWait during a reconnect for TLS connections.
/// </summary>
/// <seealso cref="ReconnectDelayHandler"/>
/// <seealso cref="ReconnectJitter"/>
/// <seealso cref="SetReconnectJitter(int, int)"/>
public int ReconnectJitterTLS { get => reconnectJitterTLS; }

/// <summary>
/// Returns a string representation of the
/// value of this Options instance.
Expand All @@ -684,7 +745,10 @@ public override string ToString()
appendEventHandler(sb, "AsyncErrorEventHandler", AsyncErrorEventHandler);
appendEventHandler(sb, "ClosedEventHandler", ClosedEventHandler);
appendEventHandler(sb, "DisconnectedEventHandler", DisconnectedEventHandler);

appendEventHandler(sb, "ReconnectedEventHandler", ReconnectedEventHandler);
appendEventHandler(sb, "ReconnectDelayHandler", ReconnectDelayHandler);
appendEventHandler(sb, "ServerDiscoveredEventHandler", ServerDiscoveredEventHandler);

sb.AppendFormat("MaxPingsOut={0};", MaxPingsOut);
sb.AppendFormat("MaxReconnect={0};", MaxReconnect);
sb.AppendFormat("Name={0};", Name != null ? Name : "null");
Expand All @@ -694,12 +758,14 @@ public override string ToString()
sb.AppendFormat("UseOldRequestStyle={0}", UseOldRequestStyle);
sb.AppendFormat("PingInterval={0};", PingInterval);
sb.AppendFormat("ReconnectBufferSize={0};", ReconnectBufferSize);
sb.AppendFormat("ReconnectJitter={0};", ReconnectJitter);
sb.AppendFormat("ReconnectJitterTLS={0};", ReconnectJitterTLS);
sb.AppendFormat("ReconnectWait={0};", ReconnectWait);
sb.AppendFormat("Secure={0};", Secure);
sb.AppendFormat("User={0};", User);
sb.AppendFormat("Token={0};", Token);
sb.AppendFormat("SubscriberDeliveryTaskCount={0};", SubscriberDeliveryTaskCount);
sb.AppendFormat("SubscriptionBatchSize={0};", SubscriptionBatchSize);
sb.AppendFormat("User={0};", User);
sb.AppendFormat("Token={0};", Token);

if (Servers == null)
{
Expand Down
13 changes: 13 additions & 0 deletions src/NATS.Client/ServerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,19 @@ internal Srv First()
return sList.First();
}
}

internal bool HasSecureServer()
{
lock (poolLock)
{
foreach (Srv s in sList)
{
if (s.Secure)
return true;
}
}
return false;
}
}

}
5 changes: 2 additions & 3 deletions src/Tests/IntegrationTests/TestConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ public void TestErrorHandlerWhenAllowingReconnectErrorShouldNotBeProvided()
var opts = Context.GetTestOptions(Context.Server1.Port);
var errors = new ConcurrentQueue<Exception>();
opts.AllowReconnect = true;
opts.MaxReconnect = 1;
opts.ClosedEventHandler = (sender, args) =>
{
if (args.Error != null)
Expand Down Expand Up @@ -179,12 +178,12 @@ public void TestErrorHandlerWhenAllowingReconnectErrorShouldNotBeProvided()
using (Context.ConnectionFactory.CreateConnection(opts))
{
s.Bounce(1000);
Assert.True(disconEv.WaitOne(1000));
Assert.True(reconEv.WaitOne(1000));
}
Assert.True(closedEv.WaitOne(1000));
}

Assert.True(closedEv.WaitOne(1000));
Assert.True(disconEv.WaitOne(1000));
Assert.Empty(errors);
}

Expand Down
Loading