Skip to content

Commit

Permalink
store/tikv: fix CheckStreamTimeoutLoop goroutine leak (#13812)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and sre-bot committed Dec 5, 2019
1 parent 448af25 commit b1391ec
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,23 @@ type connArray struct {
streamTimeout chan *tikvrpc.Lease
// batchConn is not null when batch is enabled.
*batchConn
done chan struct{}
}

func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) (*connArray, error) {
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
streamTimeout: make(chan *tikvrpc.Lease, 1024),
done: make(chan struct{}),
}
if err := a.Init(addr, security, idleNotify, done); err != nil {
if err := a.Init(addr, security, idleNotify); err != nil {
return nil, err
}
return a, nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) error {
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32) error {
a.target = addr

opt := grpc.WithInsecure()
Expand Down Expand Up @@ -162,7 +164,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done)
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, a.done)
if allowBatch {
go a.batchSendLoop(cfg.TiKVClient)
}
Expand All @@ -187,6 +189,8 @@ func (a *connArray) Close() {
a.v[i] = nil
}
}

close(a.done)
}

// rpcClient is RPC client struct.
Expand All @@ -195,7 +199,6 @@ func (a *connArray) Close() {
// that there are too many concurrent requests which overload the service of TiKV.
type rpcClient struct {
sync.RWMutex
done chan struct{}

conns map[string]*connArray
security config.Security
Expand All @@ -208,7 +211,6 @@ type rpcClient struct {

func newRPCClient(security config.Security) *rpcClient {
return &rpcClient{
done: make(chan struct{}, 1),
conns: make(map[string]*connArray),
security: security,
}
Expand Down Expand Up @@ -244,7 +246,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
if !ok {
var err error
connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify, c.done)
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,7 +354,6 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R

func (c *rpcClient) Close() error {
// TODO: add a unit test for SendRequest After Closed
close(c.done)
c.closeConns()
return nil
}

0 comments on commit b1391ec

Please sign in to comment.