-
Notifications
You must be signed in to change notification settings - Fork 699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FIXED] Added jitter in the reconnect logic #564
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,19 +45,21 @@ import ( | |
|
||
// Default Constants | ||
const ( | ||
Version = "1.9.2" | ||
DefaultURL = "nats://127.0.0.1:4222" | ||
DefaultPort = 4222 | ||
DefaultMaxReconnect = 60 | ||
DefaultReconnectWait = 2 * time.Second | ||
DefaultTimeout = 2 * time.Second | ||
DefaultPingInterval = 2 * time.Minute | ||
DefaultMaxPingOut = 2 | ||
DefaultMaxChanLen = 8192 // 8k | ||
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB | ||
RequestChanLen = 8 | ||
DefaultDrainTimeout = 30 * time.Second | ||
LangString = "go" | ||
Version = "1.9.3" | ||
DefaultURL = "nats://127.0.0.1:4222" | ||
DefaultPort = 4222 | ||
DefaultMaxReconnect = 60 | ||
DefaultReconnectWait = 2 * time.Second | ||
DefaultReconnectJitter = 100 * time.Millisecond | ||
DefaultReconnectJitterTLS = time.Second | ||
DefaultTimeout = 2 * time.Second | ||
DefaultPingInterval = 2 * time.Minute | ||
DefaultMaxPingOut = 2 | ||
DefaultMaxChanLen = 8192 // 8k | ||
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB | ||
RequestChanLen = 8 | ||
DefaultDrainTimeout = 30 * time.Second | ||
LangString = "go" | ||
) | ||
|
||
const ( | ||
|
@@ -127,15 +129,17 @@ func init() { | |
// GetDefaultOptions returns default configuration options for the client. | ||
func GetDefaultOptions() Options { | ||
return Options{ | ||
AllowReconnect: true, | ||
MaxReconnect: DefaultMaxReconnect, | ||
ReconnectWait: DefaultReconnectWait, | ||
Timeout: DefaultTimeout, | ||
PingInterval: DefaultPingInterval, | ||
MaxPingsOut: DefaultMaxPingOut, | ||
SubChanLen: DefaultMaxChanLen, | ||
ReconnectBufSize: DefaultReconnectBufSize, | ||
DrainTimeout: DefaultDrainTimeout, | ||
AllowReconnect: true, | ||
MaxReconnect: DefaultMaxReconnect, | ||
ReconnectWait: DefaultReconnectWait, | ||
ReconnectJitter: DefaultReconnectJitter, | ||
ReconnectJitterTLS: DefaultReconnectJitterTLS, | ||
Timeout: DefaultTimeout, | ||
PingInterval: DefaultPingInterval, | ||
MaxPingsOut: DefaultMaxPingOut, | ||
SubChanLen: DefaultMaxChanLen, | ||
ReconnectBufSize: DefaultReconnectBufSize, | ||
DrainTimeout: DefaultDrainTimeout, | ||
} | ||
} | ||
|
||
|
@@ -182,6 +186,12 @@ type SignatureHandler func([]byte) ([]byte, error) | |
// AuthTokenHandler is used to generate a new token. | ||
type AuthTokenHandler func() string | ||
|
||
// ReconnectDelayHandler is used to get from the user the desired | ||
// delay the library should pause before attempting to reconnect | ||
// again. Note that this is invoked after the library tried the | ||
// whole list of URLs and failed to reconnect. | ||
type ReconnectDelayHandler func(attempts int) time.Duration | ||
|
||
// asyncCB is used to preserve order for async callbacks. | ||
type asyncCB struct { | ||
f func() | ||
|
@@ -258,6 +268,24 @@ type Options struct { | |
// to a server that we were already connected to previously. | ||
ReconnectWait time.Duration | ||
|
||
// CustomReconnectDelayCB is invoked after the library tried every | ||
// URL in the server list and failed to reconnect. It passes to the | ||
// user the current number of attempts. This function returns the | ||
// amount of time the library will sleep before attempting to reconnect | ||
// again. It is strongly recommended that this value contains some | ||
// jitter to prevent all connections to attempt reconnecting at the same time. | ||
CustomReconnectDelayCB ReconnectDelayHandler | ||
|
||
// ReconnectJitter sets the upper bound for a random delay added to | ||
// ReconnectWait during a reconnect when no TLS is used. | ||
// Note that any jitter is capped with ReconnectJitterMax. | ||
ReconnectJitter time.Duration | ||
|
||
// ReconnectJitterTLS sets the upper bound for a random delay added to | ||
// ReconnectWait during a reconnect when TLS is used. | ||
// Note that any jitter is capped with ReconnectJitterMax. | ||
ReconnectJitterTLS time.Duration | ||
|
||
// Timeout sets the timeout for a Dial operation on a connection. | ||
Timeout time.Duration | ||
|
||
|
@@ -411,6 +439,7 @@ type Conn struct { | |
ptmr *time.Timer | ||
pout int | ||
ar bool // abort reconnect | ||
rqch chan struct{} | ||
|
||
// New style response handler | ||
respSub string // The wildcard subject | ||
|
@@ -672,6 +701,24 @@ func MaxReconnects(max int) Option { | |
} | ||
} | ||
|
||
// ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait. | ||
func ReconnectJitter(jitter, jitterForTLS time.Duration) Option { | ||
return func(o *Options) error { | ||
o.ReconnectJitter = jitter | ||
o.ReconnectJitterTLS = jitterForTLS | ||
return nil | ||
} | ||
} | ||
|
||
// CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option. | ||
// See CustomReconnectDelayCB Option for more details. | ||
func CustomReconnectDelay(cb ReconnectDelayHandler) Option { | ||
return func(o *Options) error { | ||
o.CustomReconnectDelayCB = cb | ||
return nil | ||
} | ||
} | ||
|
||
// PingInterval is an Option to set the period for client ping commands. | ||
func PingInterval(t time.Duration) Option { | ||
return func(o *Options) error { | ||
|
@@ -1396,6 +1443,7 @@ func (nc *Conn) setup() { | |
nc.pongs = make([]chan struct{}, 0, 8) | ||
|
||
nc.fch = make(chan struct{}, flushChanSize) | ||
nc.rqch = make(chan struct{}) | ||
|
||
// Setup scratch outbound buffer for PUB | ||
pub := nc.scratch[:len(_PUB_P_)] | ||
|
@@ -1818,33 +1866,63 @@ func (nc *Conn) doReconnect(err error) { | |
// This is used to wait on go routines exit if we start them in the loop | ||
// but an error occurs after that. | ||
waitForGoRoutines := false | ||
var rt *time.Timer | ||
// Channel used to kick routine out of sleep when conn is closed. | ||
rqch := nc.rqch | ||
// Counter that is increased when the whole list of servers has been tried. | ||
var wlf int | ||
|
||
var jitter time.Duration | ||
var rw time.Duration | ||
// If a custom reconnect delay handler is set, this takes precedence. | ||
crd := nc.Opts.CustomReconnectDelayCB | ||
if crd == nil { | ||
rw = nc.Opts.ReconnectWait | ||
// TODO: since we sleep only after the whole list has been tried, we can't | ||
// rely on individual *srv to know if it is a TLS or non-TLS url. | ||
// We have to pick which type of jitter to use, for now, we use these hints: | ||
jitter = nc.Opts.ReconnectJitter | ||
if nc.Opts.Secure || nc.Opts.TLSConfig != nil { | ||
jitter = nc.Opts.ReconnectJitterTLS | ||
} | ||
} | ||
|
||
for len(nc.srvPool) > 0 { | ||
for i := 0; len(nc.srvPool) > 0; { | ||
cur, err := nc.selectNextServer() | ||
if err != nil { | ||
nc.err = err | ||
break | ||
} | ||
|
||
sleepTime := int64(0) | ||
|
||
// Sleep appropriate amount of time before the | ||
// connection attempt if connecting to same server | ||
// we just got disconnected from.. | ||
if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait { | ||
sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt)) | ||
} | ||
|
||
// On Windows, createConn() will take more than a second when no | ||
// server is running at that address. So it could be that the | ||
// time elapsed between reconnect attempts is always > than | ||
// the set option. Release the lock to give a chance to a parallel | ||
// nc.Close() to break the loop. | ||
doSleep := i+1 >= len(nc.srvPool) | ||
nc.mu.Unlock() | ||
if sleepTime <= 0 { | ||
|
||
if !doSleep { | ||
i++ | ||
// Release the lock to give a chance to a concurrent nc.Close() to break the loop. | ||
runtime.Gosched() | ||
} else { | ||
time.Sleep(time.Duration(sleepTime)) | ||
i = 0 | ||
var st time.Duration | ||
if crd != nil { | ||
wlf++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we count wlf regardless of crd? Noop now but we may use it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we will move it out of the condition when we actually need it, will be easy to do. |
||
st = crd(wlf) | ||
} else { | ||
st = rw | ||
if jitter > 0 { | ||
st += time.Duration(rand.Int63n(int64(jitter))) | ||
} | ||
} | ||
if rt == nil { | ||
rt = time.NewTimer(st) | ||
} else { | ||
rt.Reset(st) | ||
} | ||
select { | ||
case <-rqch: | ||
rt.Stop() | ||
case <-rt.C: | ||
} | ||
} | ||
// If the readLoop, etc.. go routines were started, wait for them to complete. | ||
if waitForGoRoutines { | ||
|
@@ -3655,9 +3733,13 @@ func (nc *Conn) close(status Status, doCBs bool, err error) { | |
|
||
// Kick the Go routines so they fall out. | ||
nc.kickFlusher() | ||
nc.mu.Unlock() | ||
|
||
nc.mu.Lock() | ||
// If the reconnect timer is waiting between a reconnect attempt, | ||
// this will kick it out. | ||
if nc.rqch != nil { | ||
close(nc.rqch) | ||
nc.rqch = nil | ||
} | ||
|
||
// Clear any queued pongs, e.g. pending flush calls. | ||
nc.clearPendingFlushCalls() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sweet 👍