[WIP] *: configure server keepalive, optimize client balancer with gray-list #8463

Closed · wants to merge 7 commits
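For orientation, here is a minimal sketch of how a client would opt into the keepalive probing this change builds on. The endpoint URLs are placeholders; DialTimeout, DialKeepAliveTime, and DialKeepAliveTimeout mirror the values used in the new integration test below.

package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		// placeholder endpoints; any members of the cluster work
		Endpoints:            []string{"http://127.0.0.1:2379", "http://127.0.0.1:22379"},
		DialTimeout:          3 * time.Second,
		DialKeepAliveTime:    2 * time.Second, // client-to-server ping interval when idle
		DialKeepAliveTimeout: 2 * time.Second, // drop the connection if a ping goes unanswered
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	// A blackholed endpoint now fails its keepalive probe, the balancer
	// gray-lists it, and subsequent requests go to a healthy endpoint.
}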
129 changes: 112 additions & 17 deletions clientv3/balancer.go
@@ -18,6 +18,7 @@ import (
"net/url"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -32,18 +33,19 @@ var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
// addrs are the client's endpoints for grpc
addrs []grpc.Address
// notifyCh notifies grpc of the set of addresses for connecting
notifyCh chan []grpc.Address

// readyc closes once the first connection is up
readyc chan struct{}
readyOnce sync.Once

// mu protects upEps, pinAddr, and connectingAddr
// mu protects addrs, upEps, pinAddr, and connectingAddr
mu sync.RWMutex

// addrs are the client's endpoints for grpc
addrs *addrConns

// upc closes when upEps transitions from empty to non-zero or the balancer closes.
upc chan struct{}

@@ -71,11 +73,15 @@ type simpleBalancer struct {
closed bool
}

func newSimpleBalancer(eps []string) *simpleBalancer {
func newSimpleBalancer(eps []string, dialTimeout time.Duration) *simpleBalancer {
notifyCh := make(chan []grpc.Address, 1)
addrs := make([]grpc.Address, len(eps))
addrs := &addrConns{
all: make(map[string]grpc.Address),
failed: make(map[string]addrConn),
}
for i := range eps {
addrs[i].Addr = getHost(eps[i])
addr := grpc.Address{Addr: getHost(eps[i])}
addrs.all[addr.Addr] = addr
}
sb := &simpleBalancer{
addrs: addrs,
@@ -90,6 +96,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
}
close(sb.downc)
go sb.updateNotifyLoop()
go sb.updateAddrConns(dialTimeout)
return sb
}

@@ -136,16 +143,23 @@ func (b *simpleBalancer) updateAddrs(eps []string) {

b.host2ep = np

addrs := make([]grpc.Address, 0, len(eps))
addrs := &addrConns{
all: make(map[string]grpc.Address),
failed: make(map[string]addrConn),
}
for i := range eps {
addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
addr := grpc.Address{Addr: getHost(eps[i])}
if v, ok := b.addrs.failed[addr.Addr]; ok {
addrs.failed[addr.Addr] = addrConn{addr: addr, failed: v.failed}
}
addrs.all[addr.Addr] = addr
}
b.addrs = addrs

// updating notifyCh can trigger new connections,
// only update addrs if all connections are down
// or addrs does not include pinAddr.
update := !hasAddr(addrs, b.pinAddr)
update := !addrs.hasAddr(b.pinAddr)
b.mu.Unlock()

if update {
@@ -156,13 +170,69 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
}
}

func hasAddr(addrs []grpc.Address, targetAddr string) bool {
for _, addr := range addrs {
if targetAddr == addr.Addr {
return true
type addrConn struct {
addr grpc.Address
failed time.Time
}

type addrConns struct {
all map[string]grpc.Address
failed map[string]addrConn
}

func (ac *addrConns) hasAddr(addr string) bool {
_, ok := ac.all[addr]
return ok
}

func (ac *addrConns) fail(addr string) {
ad, ok := ac.all[addr]
if !ok { // endpoints were changed
return
}
ac.failed[addr] = addrConn{addr: ad, failed: time.Now()}
}

func (ac *addrConns) shouldPin(addr string) bool {
if len(ac.all) == 1 { // only endpoint available
return true
}
if len(ac.all) == len(ac.failed) { // all failed
cur, _ := ac.failed[addr]
for k, ad := range ac.failed {
if k == addr {
continue
}
// a better alternative exists: another endpoint failed earlier, so retry that one instead
if ad.failed.Before(cur.failed) {
return false
}
}
// if no better alternatives, keep retrying
return true
}
_, ok := ac.failed[addr]
return !ok
}

func (b *simpleBalancer) updateAddrConns(d time.Duration) {
if d < 3*time.Second {
d = 3 * time.Second
}
for {
select {
case <-time.After(d):
b.mu.Lock()
for k, v := range b.addrs.failed {
if time.Since(v.failed) >= d {
delete(b.addrs.failed, k)
}
}
b.mu.Unlock()
case <-b.stopc:
return
}
}
return false
}

func (b *simpleBalancer) updateNotifyLoop() {
@@ -221,14 +291,31 @@ func (b *simpleBalancer) updateNotifyLoop() {

func (b *simpleBalancer) notifyAddrs() {
b.mu.RLock()
addrs := b.addrs
n := len(b.addrs.all)
check := n > 1 && n != len(b.addrs.failed)
addrs := make([]grpc.Address, 0, n)
for ac, addr := range b.addrs.all {
if check {
if _, ok := b.addrs.failed[ac]; ok {
continue
}
}
addrs = append(addrs, addr)
}
b.mu.RUnlock()
select {
case b.notifyCh <- addrs:
case <-b.stopc:
}
}

// Up is part of the gRPC balancer interface; simpleBalancer implements it.
// The balancer notifies gRPC of a set of endpoints and pins whichever endpoint
// gRPC calls Balancer.Up on first, closing the others. The client could get
// stuck retrying blackholed endpoints, because gRPC just marks network I/O
// errors as transient and retries until success; it can take several seconds
// to find a healthy endpoint on the next tries. To avoid wasting retries,
// gray-list unhealthy endpoints in notifyAddrs and unlist them after the
// dial timeout.
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
@@ -241,12 +328,16 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
}
// gRPC might call Up on a stale address.
// Prevent updating pinAddr with a stale address.
if !hasAddr(b.addrs, addr.Addr) {
if !b.addrs.hasAddr(addr.Addr) {
return func(err error) {}
}
if b.pinAddr != "" {
return func(err error) {}
}
// avoid pinning gray-listed addresses
if !b.addrs.shouldPin(addr.Addr) {
return func(err error) {}
}
// notify waiting Get()s and pin first connected address
close(b.upc)
b.downc = make(chan struct{})
Expand All @@ -255,8 +346,12 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
if err.Error() == "grpc: failed with network I/O error" ||
err.Error() == "grpc: the connection is drained" {
b.addrs.fail(addr.Addr)
}
b.upc = make(chan struct{})
close(b.downc)
close(b.downc) // trigger notifyAddrs
b.pinAddr = ""
b.mu.Unlock()
}
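To make the gray-list behavior above easier to follow outside the balancer plumbing, here is a small standalone sketch of the decision rule that addrConns.fail and addrConns.shouldPin implement: a failed endpoint is skipped while non-failed alternatives remain, and once every endpoint has failed, the one that failed longest ago is retried first. The grayList type and names here are illustrative only, not part of the clientv3 API.

package main

import (
	"fmt"
	"time"
)

// grayList mirrors the addrConns bookkeeping: the set of known endpoints plus
// the time each gray-listed endpoint last failed.
type grayList struct {
	all    map[string]bool
	failed map[string]time.Time
}

// fail gray-lists addr, recording when it failed.
func (g *grayList) fail(addr string) {
	if g.all[addr] {
		g.failed[addr] = time.Now()
	}
}

// shouldPin reports whether the balancer may pin addr: always when it is the
// only endpoint, never while non-failed alternatives exist, and when every
// endpoint has failed, only if no other endpoint failed earlier than addr.
func (g *grayList) shouldPin(addr string) bool {
	if len(g.all) == 1 {
		return true
	}
	if len(g.all) == len(g.failed) {
		cur := g.failed[addr]
		for other, t := range g.failed {
			if other != addr && t.Before(cur) {
				return false // the earlier failure is the better retry candidate
			}
		}
		return true
	}
	_, graylisted := g.failed[addr]
	return !graylisted
}

func main() {
	g := &grayList{
		all:    map[string]bool{"ep0": true, "ep1": true},
		failed: map[string]time.Time{},
	}
	g.fail("ep0")
	fmt.Println(g.shouldPin("ep0")) // false: ep1 has not failed, so skip ep0
	fmt.Println(g.shouldPin("ep1")) // true
	g.fail("ep1")
	fmt.Println(g.shouldPin("ep0")) // true: everything failed and ep0 failed first
}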
6 changes: 3 additions & 3 deletions clientv3/balancer_test.go
@@ -33,7 +33,7 @@ var (
)

func TestBalancerGetUnblocking(t *testing.T) {
sb := newSimpleBalancer(endpoints)
sb := newSimpleBalancer(endpoints, 0)
defer sb.Close()
if addrs := <-sb.Notify(); len(addrs) != len(endpoints) {
t.Errorf("Initialize newSimpleBalancer should have triggered Notify() chan, but it didn't")
@@ -77,7 +77,7 @@ func TestBalancerGetUnblocking(t *testing.T) {
}

func TestBalancerGetBlocking(t *testing.T) {
sb := newSimpleBalancer(endpoints)
sb := newSimpleBalancer(endpoints, 0)
defer sb.Close()
if addrs := <-sb.Notify(); len(addrs) != len(endpoints) {
t.Errorf("Initialize newSimpleBalancer should have triggered Notify() chan, but it didn't")
@@ -141,7 +141,7 @@ func TestBalancerDoNotBlockOnClose(t *testing.T) {
defer kcl.close()

for i := 0; i < 5; i++ {
sb := newSimpleBalancer(kcl.endpoints())
sb := newSimpleBalancer(kcl.endpoints(), 0)
conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(sb))
if err != nil {
t.Fatal(err)
2 changes: 1 addition & 1 deletion clientv3/client.go
@@ -375,7 +375,7 @@ func newClient(cfg *Config) (*Client, error) {
client.Password = cfg.Password
}

client.balancer = newSimpleBalancer(cfg.Endpoints)
client.balancer = newSimpleBalancer(cfg.Endpoints, cfg.DialTimeout)
// use Endpoints[0] so that, for https:// endpoints without any TLS config given,
// grpc will assume the ServerName is in the endpoint.
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
95 changes: 95 additions & 0 deletions clientv3/integration/watch_keepalive_test.go
@@ -0,0 +1,95 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !cluster_proxy

package integration

import (
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"

"golang.org/x/net/context"
)

// TestWatchKeepAlive tests that when a watch discovers it cannot talk to a
// blackholed endpoint, the client balancer switches to a healthy one.
// TODO: test with '-tags cluster_proxy'
func TestWatchKeepAlive(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 3,
GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings
GRPCKeepAliveInterval: 2 * time.Second, // server-to-client ping
GRPCKeepAliveTimeout: 2 * time.Second,
})
defer clus.Terminate(t)

ccfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: 3 * time.Second,
DialKeepAliveTime: 2 * time.Second,
DialKeepAliveTimeout: 2 * time.Second,
}
cli, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()

wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
if _, ok := <-wch; !ok {
t.Fatalf("watch failed on creation")
}

clus.Members[0].Blackhole()

// expects endpoint switch to ep[1]
cli.SetEndpoints(clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr())

// ep[0] keepalive times out after DialKeepAliveTime + DialKeepAliveTimeout;
// wait extra time for the network error to be processed and the endpoint switched
timeout := ccfg.DialKeepAliveTime + ccfg.DialKeepAliveTimeout + ccfg.DialTimeout
time.Sleep(timeout)

if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
select {
case <-wch:
case <-time.After(5 * time.Second):
t.Fatal("took too long to receive events")
}

clus.Members[0].Unblackhole()
clus.Members[1].Blackhole()
defer clus.Members[1].Unblackhole()

// wait for ep[0] to recover and ep[1] to fail
time.Sleep(timeout)

if _, err = clus.Client(0).Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
}
select {
case <-wch:
case <-time.After(5 * time.Second):
t.Fatal("took too long to receive events")
}
}
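The server side of the PR title ("configure server keepalive") shows up above only through the test's ClusterConfig fields (GRPCKeepAliveMinTime, GRPCKeepAliveInterval, GRPCKeepAliveTimeout). As a hedged sketch, those presumably map onto the standard gRPC server keepalive options roughly as follows; this uses the stock google.golang.org/grpc API directly rather than etcd's embed/test harness.

package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// newKeepaliveServer sketches the gRPC options the ClusterConfig fields likely
// translate into on the etcd server.
func newKeepaliveServer() *grpc.Server {
	return grpc.NewServer(
		// GRPCKeepAliveMinTime: reject client pings that arrive more often
		// than MinTime (otherwise the server responds with too_many_pings).
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime: time.Millisecond,
		}),
		// GRPCKeepAliveInterval / GRPCKeepAliveTimeout: server-to-client pings
		// after Time of inactivity; close the connection if a ping is not
		// acknowledged within Timeout.
		grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    2 * time.Second,
			Timeout: 2 * time.Second,
		}),
	)
}

func main() { _ = newKeepaliveServer() }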