Skip to content

Commit

Permalink
Merge pull request #8601 from gyuho/notify
Browse files Browse the repository at this point in the history
clientv3: wait for ConnectNotify before sending RPCs
  • Loading branch information
gyuho committed Sep 27, 2017
2 parents b6b4898 + 6368159 commit 398e6ba
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
14 changes: 7 additions & 7 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ func TestKVGetErrConnClosed(t *testing.T) {
go func() {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
if err != nil && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
if err != nil && err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
}()

Expand Down Expand Up @@ -473,8 +473,8 @@ func TestKVNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Get(context.TODO(), "foo"); err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
if _, err := cli.Get(context.TODO(), "foo"); err != context.Canceled {
t.Fatalf("expected %v, got %v", context.Canceled, err)
}
close(donec)
}()
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
// this Get fails and triggers an asynchronous connection retry
_, err := cli.Get(ctx, "abc")
cancel()
if !strings.Contains(err.Error(), "context deadline") {
if err != nil && err != context.DeadlineExceeded {
t.Fatal(err)
}
}
Expand All @@ -813,14 +813,14 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// grpc finds out the original connection is down due to the member shutdown.
_, err := cli.Get(ctx, "abc")
cancel()
if !strings.Contains(err.Error(), "context deadline") {
if err != nil && err != context.DeadlineExceeded {
t.Fatal(err)
}

// this Put fails and triggers an asynchronous connection retry
_, err = cli.Put(ctx, "abc", "123")
cancel()
if !strings.Contains(err.Error(), "context deadline") {
if err != nil && err != context.DeadlineExceeded {
t.Fatal(err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Grant(context.TODO(), 5); err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
if _, err := cli.Grant(context.TODO(), 5); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
Expand Down Expand Up @@ -351,8 +351,8 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {

donec := make(chan struct{})
go func() {
if _, err := cli.Revoke(context.TODO(), leaseID); err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err)
if _, err := cli.Revoke(context.TODO(), leaseID); err != context.Canceled && err != grpc.ErrClientConnClosing {
t.Fatalf("expected %v or %v, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
Expand Down
7 changes: 7 additions & 0 deletions clientv3/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ func isWriteStopError(err error) bool {
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
return func(rpcCtx context.Context, f rpcFunc) error {
for {
select {
case <-c.balancer.ConnectNotify():
case <-rpcCtx.Done():
return rpcCtx.Err()
case <-c.ctx.Done():
return c.ctx.Err()
}
err := f(rpcCtx)
if err == nil {
return nil
Expand Down

0 comments on commit 398e6ba

Please sign in to comment.