diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 86e535ee03cfb..579d71d526052 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -35,7 +35,10 @@ import ( ) type batchConn struct { - index uint32 + // An atomic flag indicates whether the batch is idle or not. + // 0 for busy, others for idle. + idle uint32 + // batchCommandsCh used for batch commands. batchCommandsCh chan *batchCommandsEntry batchCommandsClients []*batchCommandsClient @@ -44,10 +47,11 @@ type batchConn struct { // Notify rpcClient to check the idle flag idleNotify *uint32 - idle bool idleDetect *time.Timer pendingRequests prometheus.Gauge + + index uint32 } func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn { @@ -62,6 +66,10 @@ func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn { } } +func (a *batchConn) isIdle() bool { + return atomic.LoadUint32(&a.idle) != 0 +} + // fetchAllPendingRequests fetches all pending requests from the channel. func (a *batchConn) fetchAllPendingRequests( maxBatchSize int, @@ -78,7 +86,7 @@ func (a *batchConn) fetchAllPendingRequests( a.idleDetect.Reset(idleTimeout) case <-a.idleDetect.C: a.idleDetect.Reset(idleTimeout) - a.idle = true + atomic.AddUint32(&a.idle, 1) atomic.CompareAndSwapUint32(a.idleNotify, 0, 1) // This batchConn to be recycled return @@ -540,7 +548,7 @@ func (c *rpcClient) recycleIdleConnArray() { var addrs []string c.RLock() for _, conn := range c.conns { - if conn.idle { + if conn.isIdle() { addrs = append(addrs, conn.target) } }