Skip to content

Commit

Permalink
Merge pull request #7842 from heyitsanthony/fix-switch-race
Browse files Browse the repository at this point in the history
clientv3: don't race on upc/downc/switch endpoints in balancer
  • Loading branch information
Anthony Romano committed May 3, 2017
2 parents 61c5a0c + 61abf25 commit 9fee35b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 38 deletions.
86 changes: 49 additions & 37 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
for i := range eps {
addrs[i].Addr = getHost(eps[i])
}
notifyCh <- addrs
sb := &simpleBalancer{
addrs: addrs,
notifyCh: notifyCh,
Expand All @@ -89,6 +88,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
updateAddrsC: make(chan struct{}, 1),
host2ep: getHost2ep(eps),
}
close(sb.downc)
go sb.updateNotifyLoop()
return sb
}
Expand Down Expand Up @@ -170,38 +170,51 @@ func (b *simpleBalancer) updateNotifyLoop() {

for {
b.mu.RLock()
upc := b.upc
upc, downc, addr := b.upc, b.downc, b.pinAddr
b.mu.RUnlock()
var downc chan struct{}
// downc or upc should be closed
select {
case <-downc:
downc = nil
default:
}
select {
case <-upc:
var addr string
b.mu.RLock()
addr = b.pinAddr
// Up() sets pinAddr and downc as a pair under b.mu
downc = b.downc
b.mu.RUnlock()
if addr == "" {
break
upc = nil
default:
}
switch {
case downc == nil && upc == nil:
// stale
select {
case <-b.stopc:
return
default:
}
// close opened connections that are not pinAddr
// this ensures only one connection is open per client
case downc == nil:
b.notifyAddrs()
select {
case <-upc:
case <-b.updateAddrsC:
b.notifyAddrs()
case <-b.stopc:
return
}
case upc == nil:
select {
// close connections that are not the pinned address
case b.notifyCh <- []grpc.Address{{Addr: addr}}:
case <-downc:
case <-b.stopc:
return
}
select {
case <-downc:
case <-b.updateAddrsC:
case <-b.stopc:
return
}
case <-b.updateAddrsC:
b.notifyAddrs()
continue
}
select {
case <-downc:
b.notifyAddrs()
case <-b.updateAddrsC:
b.notifyAddrs()
case <-b.stopc:
return
}
}
}
Expand Down Expand Up @@ -231,23 +244,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
if !hasAddr(b.addrs, addr.Addr) {
return func(err error) {}
}

if b.pinAddr == "" {
// notify waiting Get()s and pin first connected address
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
if b.pinAddr != "" {
return func(err error) {}
}

// notify waiting Get()s and pin first connected address
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
if b.pinAddr == addr.Addr {
b.upc = make(chan struct{})
close(b.downc)
b.pinAddr = ""
}
b.upc = make(chan struct{})
close(b.downc)
b.pinAddr = ""
b.mu.Unlock()
}
}
Expand Down Expand Up @@ -280,6 +290,8 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
b.mu.RUnlock()
select {
case <-ch:
case <-b.donec:
return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
case <-ctx.Done():
return grpc.Address{Addr: ""}, nil, ctx.Err()
}
Expand Down
9 changes: 8 additions & 1 deletion integration/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (b *bridge) serveListen() {
b.mu.Unlock()
select {
case <-b.stopc:
inc.Close()
return
case <-pausec:
}
Expand Down Expand Up @@ -152,10 +153,12 @@ func (b *bridge) serveConn(bc *bridgeConn) {
wg.Add(2)
go func() {
io.Copy(bc.out, bc.in)
bc.close()
wg.Done()
}()
go func() {
io.Copy(bc.in, bc.out)
bc.close()
wg.Done()
}()
wg.Wait()
Expand All @@ -168,7 +171,11 @@ type bridgeConn struct {
}

func (bc *bridgeConn) Close() {
bc.close()
<-bc.donec
}

func (bc *bridgeConn) close() {
bc.in.Close()
bc.out.Close()
<-bc.donec
}

0 comments on commit 9fee35b

Please sign in to comment.