Skip to content

Commit

Permalink
- Add ReconnectJitter as an option
Browse files Browse the repository at this point in the history
- Give it a default value of 1 second (DefaultReconnectJitter)
- Add the jitter to existing reconnect wait
- Update tests to set the jitter to 0 so they do not have to wait too long

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed May 1, 2020
1 parent fa41411 commit eaa6848
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 37 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
module github.com/nats-io/nats.go

require (
github.com/golang/protobuf v1.4.0
github.com/nats-io/jwt v0.3.2
github.com/nats-io/nats-server/v2 v2.1.6
github.com/nats-io/nkeys v0.1.4
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.21.0
)

go 1.13
22 changes: 22 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA=
github.com/nats-io/nats-server/v2 v2.1.6 h1:qAaHZaS8pRRNQLFaiBA1rq5WynyEGp9DFgmMfoaiXGY=
github.com/nats-io/nats-server/v2 v2.1.6/go.mod h1:BL1NOtaBQ5/y97djERRVWNouMW7GT3gxnmbE/eC8u8A=
github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
Expand All @@ -12,4 +26,12 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
27 changes: 25 additions & 2 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultReconnectJitter = time.Second
DefaultTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
Expand Down Expand Up @@ -130,6 +131,7 @@ func GetDefaultOptions() Options {
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
ReconnectJitter: DefaultReconnectJitter,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
Expand Down Expand Up @@ -258,6 +260,10 @@ type Options struct {
// to a server that we were already connected to previously.
ReconnectWait time.Duration

// ReconnectJitter sets the upper bound for a random delay added to
// ReconnectWait during a reconnect.
ReconnectJitter time.Duration

// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration

Expand Down Expand Up @@ -673,6 +679,14 @@ func MaxReconnects(max int) Option {
}
}

// ReconnectJitter is an Option that sets the upper bound of the random delay
// added to ReconnectWait during a reconnect.
//
// The returned Option stores jitter in Options.ReconnectJitter and never
// reports an error.
func ReconnectJitter(jitter time.Duration) Option {
	setJitter := func(opts *Options) error {
		opts.ReconnectJitter = jitter
		return nil
	}
	return setJitter
}

// PingInterval is an Option to set the period for client ping commands.
func PingInterval(t time.Duration) Option {
return func(o *Options) error {
Expand Down Expand Up @@ -1820,7 +1834,7 @@ func (nc *Conn) doReconnect(err error) {
// This is used to wait on go routines exit if we start them in the loop
// but an error occurs after that.
waitForGoRoutines := false
rt := time.NewTimer(0)
var rt *time.Timer
rqch := nc.rqch

for len(nc.srvPool) > 0 {
Expand Down Expand Up @@ -1848,9 +1862,18 @@ func (nc *Conn) doReconnect(err error) {
if sleepTime <= 0 {
runtime.Gosched()
} else {
rt.Reset(time.Duration(rand.Int63n(sleepTime)))
if nc.Opts.ReconnectJitter > 0 {
sleepTime += rand.Int63n(int64(nc.Opts.ReconnectJitter))
}
st := time.Duration(sleepTime)
if rt == nil {
rt = time.NewTimer(st)
} else {
rt.Reset(st)
}
select {
case <-rqch:
rt.Stop()
case <-rt.C:
}
}
Expand Down
56 changes: 22 additions & 34 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ func TestExpiredUserCredentials(t *testing.T) {
url := fmt.Sprintf("nats://127.0.0.1:%d", addr.Port)
nc, err := Connect(url,
ReconnectWait(25*time.Millisecond),
ReconnectJitter(0),
MaxReconnects(-1),
ErrorHandler(func(_ *Conn, _ *Subscription, e error) {
select {
Expand Down Expand Up @@ -1583,6 +1584,7 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) {
nc, err := Connect(url,
UserCredentials(chainedFile),
ReconnectWait(25*time.Millisecond),
ReconnectJitter(0),
MaxReconnects(2),
ReconnectHandler(func(nc *Conn) {
rch <- true
Expand Down Expand Up @@ -2075,6 +2077,7 @@ func TestAuthErrorOnReconnect(t *testing.T) {
urls := fmt.Sprintf("nats://%s:%d, nats://%s:%d", o1.Host, o1.Port, o2.Host, o2.Port)
nc, err := Connect(urls,
ReconnectWait(25*time.Millisecond),
ReconnectJitter(0),
MaxReconnects(-1),
DontRandomize(),
DisconnectErrHandler(func(_ *Conn, e error) {
Expand Down Expand Up @@ -2269,7 +2272,7 @@ func TestGetRTT(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := Connect(s.ClientURL(), ReconnectWait(10*time.Millisecond))
nc, err := Connect(s.ClientURL(), ReconnectWait(10*time.Millisecond), ReconnectJitter(0))
if err != nil {
t.Fatalf("Expected to connect to server, got %v", err)
}
Expand Down Expand Up @@ -2390,53 +2393,41 @@ func TestNoPanicOnSrvPoolSizeChanging(t *testing.T) {
wg.Wait()
}

func TestReconnectWaitRandom(t *testing.T) {
func TestReconnectWaitJitter(t *testing.T) {
s := RunServerOnPort(TEST_PORT)
defer s.Shutdown()

dch := make(chan bool, 1)
rch := make(chan time.Time, 1)
nc, err := Connect(s.ClientURL(),
ReconnectWait(100*time.Millisecond),
DisconnectErrHandler(func(_ *Conn, err error) {
dch <- true
ReconnectJitter(time.Second),
ReconnectHandler(func(_ *Conn) {
rch <- time.Now()
}),
)
if err != nil {
t.Fatalf("Error during connect: %v", err)
}
defer nc.Close()

nr := 25
total := time.Duration(0)
for i := 0; i < nr; i++ {
nc.mu.Lock()
nc.conn.Close()
nc.mu.Unlock()
if err := WaitTime(dch, time.Second); err != nil {
t.Fatal(err.Error())
}
start := time.Now()
nc.Flush()
// The time will count for the reconnect + the ping pong, so it could
// be that we are a bit over the 100ms max.
dur := time.Since(start)
total += dur
if dur > 150*time.Millisecond {
t.Fatalf("Waited too long: %v", dur)
nc.mu.Lock()
start := time.Now()
nc.conn.Close()
nc.mu.Unlock()
select {
case end := <-rch:
dur := end.Sub(start)
// We should wait at least the reconnect wait + random up to 1 sec.
if dur < 90*time.Millisecond || dur > 1200*time.Millisecond {
t.Fatalf("Wrong wait: %v", dur)
}
}
if avg := total / time.Duration(nr); avg >= 95*time.Millisecond {
t.Fatalf("Average of reconnect suspiciously high: %v", avg)
case <-time.After(5 * time.Second):
t.Fatalf("Should have reconnected")
}
nc.Close()

// Use a long reconnect wait
nc, err = Connect(s.ClientURL(),
ReconnectWait(100*time.Second),
DisconnectErrHandler(func(_ *Conn, err error) {
dch <- true
}),
)
nc, err = Connect(s.ClientURL(), ReconnectWait(100*time.Second))
if err != nil {
t.Fatalf("Error during connect: %v", err)
}
Expand All @@ -2446,9 +2437,6 @@ func TestReconnectWaitRandom(t *testing.T) {
nc.mu.Lock()
nc.conn.Close()
nc.mu.Unlock()
if err := WaitTime(dch, time.Second); err != nil {
t.Fatal(err.Error())
}
// Wait a bit for the reconnect loop to go into wait mode.
time.Sleep(50 * time.Millisecond)
// Now close and expect the reconnect go routine to return..
Expand Down
2 changes: 2 additions & 0 deletions test/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func TestAuthFailAllowReconnect(t *testing.T) {
copts.NoRandomize = true
copts.MaxReconnect = 10
copts.ReconnectWait = 100 * time.Millisecond
copts.ReconnectJitter = 0

copts.ReconnectedCB = func(_ *nats.Conn) {
reconnectch <- true
Expand Down Expand Up @@ -174,6 +175,7 @@ func TestTokenHandlerReconnect(t *testing.T) {
copts.NoRandomize = true
copts.MaxReconnect = 10
copts.ReconnectWait = 100 * time.Millisecond
copts.ReconnectJitter = 0

copts.TokenHandler = func() string {
return secret
Expand Down
1 change: 1 addition & 0 deletions test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ func TestOptions(t *testing.T) {
nats.Name("myName"),
nats.MaxReconnects(2),
nats.ReconnectWait(50*time.Millisecond),
nats.ReconnectJitter(0),
nats.PingInterval(20*time.Millisecond))
if err != nil {
t.Fatalf("Failed to connect: %v", err)
Expand Down
7 changes: 7 additions & 0 deletions test/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func TestHotSpotReconnect(t *testing.T) {

opts := []nats.Option{
nats.ReconnectWait(50 * time.Millisecond),
nats.ReconnectJitter(0),
nats.ReconnectHandler(func(_ *nats.Conn) { wg.Done() }),
}

Expand Down Expand Up @@ -389,6 +390,7 @@ func TestProperFalloutAfterMaxAttempts(t *testing.T) {
}
opts.NoRandomize = true
opts.ReconnectWait = (25 * time.Millisecond)
opts.ReconnectJitter = 0

dch := make(chan bool)
opts.DisconnectedErrCB = func(_ *nats.Conn, _ error) {
Expand Down Expand Up @@ -456,6 +458,7 @@ func TestProperFalloutAfterMaxAttemptsWithAuthMismatch(t *testing.T) {
opts.MaxReconnect = 5
}
opts.ReconnectWait = (25 * time.Millisecond)
opts.ReconnectJitter = 0

dch := make(chan bool)
opts.DisconnectedErrCB = func(_ *nats.Conn, _ error) {
Expand Down Expand Up @@ -519,11 +522,13 @@ func TestTimeoutOnNoServers(t *testing.T) {
opts.Servers = testServers[:2]
opts.MaxReconnect = 2
opts.ReconnectWait = (100 * time.Millisecond)
opts.ReconnectJitter = 0
} else {
opts.Servers = testServers
// 1 second total time wait
opts.MaxReconnect = 10
opts.ReconnectWait = (100 * time.Millisecond)
opts.ReconnectJitter = 0
}
opts.NoRandomize = true

Expand Down Expand Up @@ -584,6 +589,7 @@ func TestPingReconnect(t *testing.T) {
opts.Servers = testServers
opts.NoRandomize = true
opts.ReconnectWait = 200 * time.Millisecond
opts.ReconnectJitter = 0
opts.PingInterval = 50 * time.Millisecond
opts.MaxPingsOut = -1

Expand Down Expand Up @@ -813,6 +819,7 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
nc, err = nats.Connect(s1Url,
nats.MaxReconnects(10),
nats.ReconnectWait(15*time.Millisecond),
nats.ReconnectJitter(0),
nats.SetCustomDialer(d),
nats.ReconnectHandler(connHandler),
nats.ClosedHandler(connHandler))
Expand Down
7 changes: 6 additions & 1 deletion test/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ func TestCallbacksOrder(t *testing.T) {
nats.ClosedHandler(cch),
nats.ErrorHandler(ech),
nats.ReconnectWait(50*time.Millisecond),
nats.ReconnectJitter(0),
nats.DontRandomize())

if err != nil {
Expand Down Expand Up @@ -1158,6 +1159,7 @@ func TestErrStaleConnection(t *testing.T) {
opts.ReconnectedCB = func(_ *nats.Conn) { rch <- true }
opts.ClosedCB = func(_ *nats.Conn) { cch <- true }
opts.ReconnectWait = 20 * time.Millisecond
opts.ReconnectJitter = 0
opts.MaxReconnect = 100
opts.Servers = []string{natsURL}
nc, err := opts.Connect()
Expand Down Expand Up @@ -1247,6 +1249,7 @@ func TestServerErrorClosesConnection(t *testing.T) {
opts.ReconnectedCB = func(_ *nats.Conn) { atomic.AddInt64(&reconnected, 1) }
opts.ClosedCB = func(_ *nats.Conn) { cch <- true }
opts.ReconnectWait = 20 * time.Millisecond
opts.ReconnectJitter = 0
opts.MaxReconnect = 100
opts.Servers = []string{natsURL}
nc, err := opts.Connect()
Expand Down Expand Up @@ -1321,7 +1324,8 @@ func TestNoRaceOnLastError(t *testing.T) {
nats.DisconnectHandler(dch),
nats.ClosedHandler(cch),
nats.MaxReconnects(-1),
nats.ReconnectWait(5*time.Millisecond))
nats.ReconnectWait(5*time.Millisecond),
nats.ReconnectJitter(0))
if err != nil {
t.Fatalf("Unable to connect: %v\n", err)
}
Expand Down Expand Up @@ -1985,6 +1989,7 @@ func TestReceiveInfoWithEmptyConnectURLs(t *testing.T) {
rch := make(chan bool)
nc, err := nats.Connect("nats://127.0.0.1:4222",
nats.ReconnectWait(50*time.Millisecond),
nats.ReconnectJitter(0),
nats.ReconnectHandler(func(_ *nats.Conn) {
rch <- true
}))
Expand Down
10 changes: 10 additions & 0 deletions test/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func TestReconnectTotalTime(t *testing.T) {
}
}

// TestDefaultReconnectJitter checks that the options returned by
// nats.GetDefaultOptions carry nats.DefaultReconnectJitter as their
// ReconnectJitter value.
func TestDefaultReconnectJitter(t *testing.T) {
	want := nats.DefaultReconnectJitter
	if got := nats.GetDefaultOptions().ReconnectJitter; got != want {
		t.Fatalf("Expected default jitter to be %v, got %v", want, got)
	}
}

func TestReconnectDisallowedFlags(t *testing.T) {
ts := startReconnectServer(t)
defer ts.Shutdown()
Expand Down Expand Up @@ -73,6 +80,7 @@ func TestReconnectAllowedFlags(t *testing.T) {
opts.AllowReconnect = true
opts.MaxReconnect = 2
opts.ReconnectWait = 1 * time.Second
opts.ReconnectJitter = 0

opts.ClosedCB = func(_ *nats.Conn) {
ch <- true
Expand Down Expand Up @@ -435,6 +443,7 @@ func TestIsReconnectingAndStatus(t *testing.T) {
opts.AllowReconnect = true
opts.MaxReconnect = 10000
opts.ReconnectWait = 100 * time.Millisecond
opts.ReconnectJitter = 0

opts.DisconnectedErrCB = func(_ *nats.Conn, _ error) {
disconnectedch <- true
Expand Down Expand Up @@ -504,6 +513,7 @@ func TestFullFlushChanDuringReconnect(t *testing.T) {
opts.AllowReconnect = true
opts.MaxReconnect = 10000
opts.ReconnectWait = 100 * time.Millisecond
opts.ReconnectJitter = 0

opts.ReconnectedCB = func(_ *nats.Conn) {
reconnectch <- true
Expand Down
Loading

0 comments on commit eaa6848

Please sign in to comment.