clientv3: health balancer #8545

Merged (4 commits), Sep 18, 2017
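
In short: this PR wraps the client's simpleBalancer in a health-aware balancer that probes endpoints with a pluggable check, gray-lists addresses that fail, and adds a next() primitive to force the connection off a bad endpoint onto another one.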
122 changes: 94 additions & 28 deletions clientv3/balancer.go
@@ -29,11 +29,40 @@ import (
// This error is returned only when opts.BlockingWait is true.
var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")

type notifyMsg int

const (
notifyReset notifyMsg = iota
notifyNext
)

type balancer interface {
grpc.Balancer
ConnectNotify() <-chan struct{}

endpoint(host string) string
endpoints() []string

// up is Up but includes whether the balancer will use the connection.
up(addr grpc.Address) (func(error), bool)

// updateAddrs changes the balancer's endpoints.
updateAddrs(endpoints ...string)
// ready returns a channel that closes when the balancer first connects.
ready() <-chan struct{}
// next forces the balancer to switch endpoints.
next()
}
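
The up/Up split above is what makes health filtering possible: a wrapper can see whether the inner balancer actually used a connection and veto addresses it distrusts. The PR's real healthBalancer is added outside this diff, so the following is a rough sketch only — graylistBalancer and its fields are invented names, and the code is assumed to sit in package clientv3 next to balancer.go so that sync, time, and google.golang.org/grpc are already imported:

	// graylistBalancer is a hypothetical wrapper over the balancer interface.
	// The unhealthy map must be initialized by a constructor (omitted here).
	type graylistBalancer struct {
		balancer // inner balancer, e.g. *simpleBalancer

		mu        sync.Mutex
		unhealthy map[string]time.Time // addr -> when it was gray-listed
		wait      time.Duration        // how long an address stays gray-listed
	}

	func (g *graylistBalancer) Up(addr grpc.Address) func(error) {
		g.mu.Lock()
		t, bad := g.unhealthy[addr.Addr]
		g.mu.Unlock()
		if bad && time.Since(t) < g.wait {
			// Still gray-listed: never hand the address to the inner
			// balancer, so it cannot be pinned.
			return func(error) {}
		}
		down, used := g.up(addr) // inner balancer decides whether to pin
		if !used {
			return down
		}
		return func(err error) {
			// The connection errored out: gray-list the address before
			// propagating the teardown to the inner balancer.
			g.mu.Lock()
			g.unhealthy[addr.Addr] = time.Now()
			g.mu.Unlock()
			down(err)
		}
	}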

// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
-// addrs are the client's endpoints for grpc
+// addrs are the client's endpoint addresses for grpc
addrs []grpc.Address

// eps holds the raw endpoints from the client
eps []string

// notifyCh notifies grpc of the set of addresses for connecting
notifyCh chan []grpc.Address

@@ -57,7 +86,7 @@ type simpleBalancer struct {
donec chan struct{}

// updateAddrsC notifies updateNotifyLoop to update addrs.
-updateAddrsC chan struct{}
+updateAddrsC chan notifyMsg

// grpc issues TLS cert checks using the string passed into dial so
// that string must be the host. To recover the full scheme://host URL,
@@ -72,20 +101,18 @@ type simpleBalancer struct {
}

func newSimpleBalancer(eps []string) *simpleBalancer {
-notifyCh := make(chan []grpc.Address, 1)
-addrs := make([]grpc.Address, len(eps))
-for i := range eps {
-addrs[i].Addr = getHost(eps[i])
-}
+notifyCh := make(chan []grpc.Address)
+addrs := eps2addrs(eps)
sb := &simpleBalancer{
addrs: addrs,
eps: eps,
notifyCh: notifyCh,
readyc: make(chan struct{}),
upc: make(chan struct{}),
stopc: make(chan struct{}),
downc: make(chan struct{}),
donec: make(chan struct{}),
-updateAddrsC: make(chan struct{}, 1),
+updateAddrsC: make(chan notifyMsg),
host2ep: getHost2ep(eps),
}
close(sb.downc)
@@ -101,12 +128,20 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
return b.upc
}

-func (b *simpleBalancer) getEndpoint(host string) string {
+func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc }

+func (b *simpleBalancer) endpoint(host string) string {
b.mu.Lock()
defer b.mu.Unlock()
return b.host2ep[host]
}

func (b *simpleBalancer) endpoints() []string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.eps
}

func getHost2ep(eps []string) map[string]string {
hm := make(map[string]string, len(eps))
for i := range eps {
@@ -116,7 +151,7 @@ func getHost2ep(eps []string) map[string]string {
return hm
}

-func (b *simpleBalancer) updateAddrs(eps []string) {
+func (b *simpleBalancer) updateAddrs(eps ...string) {
np := getHost2ep(eps)

b.mu.Lock()
@@ -135,27 +170,37 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
}

b.host2ep = np

-addrs := make([]grpc.Address, 0, len(eps))
-for i := range eps {
-addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
-}
-b.addrs = addrs
+b.addrs, b.eps = eps2addrs(eps), eps

// updating notifyCh can trigger new connections,
// only update addrs if all connections are down
// or addrs does not include pinAddr.
-update := !hasAddr(addrs, b.pinAddr)
+update := !hasAddr(b.addrs, b.pinAddr)
b.mu.Unlock()

if update {
select {
-case b.updateAddrsC <- struct{}{}:
+case b.updateAddrsC <- notifyReset:
case <-b.stopc:
}
}
}

func (b *simpleBalancer) next() {
b.mu.RLock()
downc := b.downc
b.mu.RUnlock()
select {
case b.updateAddrsC <- notifyNext:
case <-b.stopc:
}
Review comment (Contributor): Should we return on <-b.stopc?

Reply (Author): The channel is closed, so it'll pass through the next select without blocking.

// wait until disconnect so new RPCs are not issued on old connection
select {
case <-downc:
case <-b.stopc:
}
}
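
next() is the lever for abandoning an endpoint: notifyNext makes notifyAddrs (further down) first push an empty address list, which tells gRPC to tear down every connection, then re-push the full list so gRPC redials and may pin a different endpoint; the second select blocks until the old connection is actually down so no new RPC lands on it. A minimal sketch of a retry layer driving this, assuming it lives in the same package (withRetry and doRPC are invented names, not part of this PR):

	func withRetry(b balancer, attempts int, doRPC func() error) error {
		var err error
		for i := 0; i < attempts; i++ {
			if err = doRPC(); err == nil {
				return nil
			}
			// Force an endpoint switch; next() returns only after the
			// failed connection is torn down, so the retry cannot reuse it.
			b.next()
		}
		return err
	}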

func hasAddr(addrs []grpc.Address, targetAddr string) bool {
for _, addr := range addrs {
if targetAddr == addr.Addr {
@@ -192,11 +237,11 @@ func (b *simpleBalancer) updateNotifyLoop() {
default:
}
case downc == nil:
-b.notifyAddrs()
+b.notifyAddrs(notifyReset)
select {
case <-upc:
-case <-b.updateAddrsC:
-b.notifyAddrs()
+case msg := <-b.updateAddrsC:
+b.notifyAddrs(msg)
case <-b.stopc:
return
}
@@ -210,16 +255,24 @@ }
}
select {
case <-downc:
-case <-b.updateAddrsC:
+b.notifyAddrs(notifyReset)
+case msg := <-b.updateAddrsC:
+b.notifyAddrs(msg)
case <-b.stopc:
return
}
-b.notifyAddrs()
}
}
}

-func (b *simpleBalancer) notifyAddrs() {
+func (b *simpleBalancer) notifyAddrs(msg notifyMsg) {
if msg == notifyNext {
select {
case b.notifyCh <- []grpc.Address{}:
case <-b.stopc:
return
}
}
b.mu.RLock()
addrs := b.addrs
b.mu.RUnlock()
@@ -230,22 +283,27 @@ }
}

func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
f, _ := b.up(addr)
return f
}

func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
b.mu.Lock()
defer b.mu.Unlock()

// gRPC might call Up after it called Close. We add this check
// to "fix" it up at application layer. Otherwise, will panic
// if b.upc is already closed.
if b.closed {
-return func(err error) {}
+return func(err error) {}, false
}
// gRPC might call Up on a stale address.
// Prevent updating pinAddr with a stale address.
if !hasAddr(b.addrs, addr.Addr) {
-return func(err error) {}
+return func(err error) {}, false
}
if b.pinAddr != "" {
-return func(err error) {}
+return func(err error) {}, false
}
// notify waiting Get()s and pin first connected address
close(b.upc)
@@ -259,7 +317,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
close(b.downc)
b.pinAddr = ""
b.mu.Unlock()
-}
+}, true
}

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
@@ -354,3 +412,11 @@ func getHost(ep string) string {
}
return url.Host
}

func eps2addrs(eps []string) []grpc.Address {
addrs := make([]grpc.Address, len(eps))
for i := range eps {
addrs[i].Addr = getHost(eps[i])
}
return addrs
}
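
For illustration: since getHost returns only the host portion of an endpoint (see above), eps2addrs([]string{"https://10.0.0.1:2379"}) yields []grpc.Address{{Addr: "10.0.0.1:2379"}}; the original scheme://host endpoint is recovered later through the host2ep map.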
60 changes: 60 additions & 0 deletions clientv3/balancer_test.go
@@ -133,6 +133,66 @@ func TestBalancerGetBlocking(t *testing.T) {
}
}

// TestHealthBalancerGraylist checks one endpoint is tried after the other
// due to gray listing.
func TestHealthBalancerGraylist(t *testing.T) {
var wg sync.WaitGroup
// Use 3 endpoints so gray list doesn't fallback to all connections
// after failing on 2 endpoints.
lns, eps := make([]net.Listener, 3), make([]string, 3)
wg.Add(3)
connc := make(chan string, 2)
for i := range eps {
ln, err := net.Listen("tcp", ":0")
testutil.AssertNil(t, err)
lns[i], eps[i] = ln, ln.Addr().String()
go func() {
defer wg.Done()
for {
conn, err := ln.Accept()
if err != nil {
return
}
_, err = conn.Read(make([]byte, 512))
conn.Close()
if err == nil {
select {
case connc <- ln.Addr().String():
// sleep some so balancer catches up
// before attempted next reconnect.
time.Sleep(50 * time.Millisecond)
default:
}
}
}
}()
}

sb := newSimpleBalancer(eps)
tf := func(s string) (bool, error) { return false, nil }
hb := newHealthBalancer(sb, 5*time.Second, tf)

conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb))
testutil.AssertNil(t, err)
defer conn.Close()

kvc := pb.NewKVClient(conn)
<-hb.ready()

kvc.Range(context.TODO(), &pb.RangeRequest{})
ep1 := <-connc
kvc.Range(context.TODO(), &pb.RangeRequest{})
ep2 := <-connc
for _, ln := range lns {
ln.Close()
}
wg.Wait()

if ep1 == ep2 {
t.Fatalf("expected %q != %q", ep1, ep2)
}
}

// TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other
// due to rapid open/close conn. The deadlock causes balancer.Close() to block forever.
// See issue: https://github.com/coreos/etcd/issues/7283 for more detail.
16 changes: 11 additions & 5 deletions clientv3/client.go
@@ -55,7 +55,8 @@ type Client struct {

cfg Config
creds *credentials.TransportCredentials
-balancer *simpleBalancer
+balancer balancer
+mu sync.Mutex

ctx context.Context
cancel context.CancelFunc
@@ -116,8 +117,10 @@ func (c *Client) Endpoints() (eps []string) {

// SetEndpoints updates client's endpoints.
func (c *Client) SetEndpoints(eps ...string) {
+c.mu.Lock()
c.cfg.Endpoints = eps
-c.balancer.updateAddrs(eps)
+c.mu.Unlock()
+c.balancer.updateAddrs(eps...)
}
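
From the caller's side the variadic form reads naturally; a brief usage sketch (the endpoint addresses are placeholders):

	import "github.com/coreos/etcd/clientv3"

	func redirectClient() error {
		cli, err := clientv3.New(clientv3.Config{
			Endpoints: []string{"10.0.0.1:2379"},
		})
		if err != nil {
			return err
		}
		defer cli.Close()

		// Point the client at a reshaped cluster. updateAddrs only forces
		// a reconnect if the pinned address is absent from the new set.
		cli.SetEndpoints("10.0.0.2:2379", "10.0.0.3:2379")
		return nil
	}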

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@@ -227,7 +230,7 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
opts = append(opts, dopts...)

f := func(host string, t time.Duration) (net.Conn, error) {
-proto, host, _ := parseEndpoint(c.balancer.getEndpoint(host))
+proto, host, _ := parseEndpoint(c.balancer.endpoint(host))
if host == "" && endpoint != "" {
// dialing an endpoint not in the balancer; use
// endpoint passed into dial
@@ -375,7 +378,10 @@ func newClient(cfg *Config) (*Client, error) {
client.Password = cfg.Password
}

-client.balancer = newSimpleBalancer(cfg.Endpoints)
+sb := newSimpleBalancer(cfg.Endpoints)
+hc := func(ep string) (bool, error) { return grpcHealthCheck(client, ep) }
+client.balancer = newHealthBalancer(sb, cfg.DialTimeout, hc)

// use Endpoints[0] so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
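
grpcHealthCheck itself is added elsewhere in this PR; the sketch below only illustrates the general shape such a probe can take using the standard gRPC health-checking protocol (the package name, endpointHealthy, and the 2-second timeouts are assumptions, not the PR's code):

	package health

	import (
		"time"

		"golang.org/x/net/context"
		"google.golang.org/grpc"
		healthpb "google.golang.org/grpc/health/grpc_health_v1"
	)

	// endpointHealthy reports whether ep answers the standard health RPC.
	func endpointHealthy(ep string) (bool, error) {
		// Block so the dial timeout actually bounds connection establishment.
		conn, err := grpc.Dial(ep, grpc.WithInsecure(), grpc.WithBlock(),
			grpc.WithTimeout(2*time.Second))
		if err != nil {
			return false, err
		}
		defer conn.Close()

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{})
		if err != nil {
			return false, err
		}
		return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
	}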
@@ -391,7 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
hasConn := false
waitc := time.After(cfg.DialTimeout)
select {
-case <-client.balancer.readyc:
+case <-client.balancer.ready():
hasConn = true
case <-ctx.Done():
case <-waitc: