Skip to content

Commit

Permalink
Merge branch 'master' into api_bench/more_api
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Feb 6, 2024
2 parents ac6218a + b2f40b6 commit 67d6450
Show file tree
Hide file tree
Showing 19 changed files with 222 additions and 64 deletions.
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
}

// Collect collects the cluster information.
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) {
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ func (r *RegionInfo) LoadedFromStorage() bool {
return r.source == Storage
}

// LoadedFromSync reports whether this region's meta info was received from
// the region syncer (r.source == Sync) rather than loaded from storage.
// Only used for tests.
func (r *RegionInfo) LoadedFromSync() bool {
	return r.source == Sync
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
regionInfo := &RegionInfo{
Expand Down Expand Up @@ -705,7 +711,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -718,19 +724,15 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache, isNew = true, true, true
saveKV, saveCache = true, true
} else {
if origin.LoadedFromStorage() {
isNew = true
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
if r.GetVersion() > o.GetVersion() {
Expand All @@ -756,9 +758,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else if log.GetLevel() <= zap.InfoLevel {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, _, needSync := RegionGuide(regionA, regionB)
_, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.CloseClientConns()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down
7 changes: 3 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache && !isNew {
_, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand All @@ -581,7 +580,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
cluster.HandleOverlaps(c, overlaps)
}

cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
region := core.RegionFromHeartbeat(request)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.CloseClientConns()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down
12 changes: 12 additions & 0 deletions pkg/mcs/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/grpcutil"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -167,3 +168,14 @@ func (bs *BaseServer) IsSecure() bool {
// StartTimestamp returns the timestamp recorded when this server was created.
func (bs *BaseServer) StartTimestamp() int64 {
	return bs.startTimestamp
}

// CloseClientConns closes every cached gRPC client connection held by the
// server. Close failures are logged but do not stop the iteration, so one
// bad connection cannot leave the remaining ones open.
func (bs *BaseServer) CloseClientConns() {
	bs.clientConns.Range(func(key, value any) bool {
		conn := value.(*grpc.ClientConn)
		if err := conn.Close(); err != nil {
			// Include the error itself; logging the bare message would
			// discard the only diagnostic information available.
			log.Error("close client connection meet error: " + err.Error())
		}
		return true
	})
}
1 change: 1 addition & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (s *Server) Close() {
utils.StopHTTPServer(s)
utils.StopGRPCServer(s)
s.GetListener().Close()
s.CloseClientConns()
s.serverLoopCancel()
s.serverLoopWg.Wait()

Expand Down
25 changes: 20 additions & 5 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type RegionInfoProvider interface {
// RegionStatisticType represents the type of the region's status.
type RegionStatisticType uint32

const emptyStatistic = RegionStatisticType(0)

// region status type
const (
MissPeer RegionStatisticType = 1 << iota
Expand Down Expand Up @@ -148,6 +150,9 @@ func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID
// due to some special state types.
func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
regionID := region.GetID()
if !r.isObserved(regionID) {
return true
}
if r.IsRegionStatsType(regionID, OversizedRegion) !=
region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) {
return true
Expand All @@ -156,6 +161,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys()))
}

// isObserved reports whether the region with the given ID is present in the
// statistics index — i.e. whether PD has already observed a heartbeat for it.
func (r *RegionStatistics) isObserved(id uint64) bool {
	r.RLock()
	_, observed := r.index[id]
	r.RUnlock()
	return observed
}

// Observe records the current regions' status.
func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) {
r.Lock()
Expand All @@ -164,7 +177,6 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
desiredReplicas = r.conf.GetMaxReplicas()
desiredVoters = desiredReplicas
peerTypeIndex RegionStatisticType
deleteIndex RegionStatisticType
)
// Check if the region meets count requirements of its rules.
if r.conf.IsPlacementRulesEnabled() {
Expand Down Expand Up @@ -240,10 +252,10 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
}
}
// Remove the info if any of the conditions are not met any more.
if oldIndex, ok := r.index[regionID]; ok {
deleteIndex = oldIndex &^ peerTypeIndex
if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic {
deleteIndex := oldIndex &^ peerTypeIndex
r.deleteEntry(deleteIndex, regionID)
}
r.deleteEntry(deleteIndex, regionID)
r.index[regionID] = peerTypeIndex
}

Expand All @@ -252,7 +264,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) {
r.Lock()
defer r.Unlock()
if oldIndex, ok := r.index[regionID]; ok {
r.deleteEntry(oldIndex, regionID)
delete(r.index, regionID)
if oldIndex > emptyStatistic {
r.deleteEntry(oldIndex, regionID)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
_, saveKV, _, _ := regionGuide(region, origin)
saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

if hasBuckets {
Expand Down
6 changes: 6 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func (am *AllocatorManager) close() {
allocatorGroup.allocator.(*GlobalTSOAllocator).close()
}

for _, cc := range am.localAllocatorConn.clientConns {
if err := cc.Close(); err != nil {
log.Error("failed to close allocator manager grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
}
}

am.cancel()
am.svcLoopWG.Wait()

Expand Down
81 changes: 52 additions & 29 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,11 @@ func checkEtcdWithHangLeader(t *testing.T) error {
// Create a proxy to etcd1.
proxyAddr := tempurl.Alloc()
var enableDiscard atomic.Bool
go proxyWithDiscard(re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go proxyWithDiscard(ctx, re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)

// Create a etcd client with etcd1 as endpoint.
// Create an etcd client with etcd1 as endpoint.
urls, err := types.NewURLs([]string{proxyAddr})
re.NoError(err)
client1, err := CreateEtcdClient(nil, urls)
Expand All @@ -307,59 +309,80 @@ func checkEtcdWithHangLeader(t *testing.T) error {
return err
}

func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
func proxyWithDiscard(ctx context.Context, re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
server = strings.TrimPrefix(server, "http://")
proxy = strings.TrimPrefix(proxy, "http://")
l, err := net.Listen("tcp", proxy)
re.NoError(err)
defer l.Close()
for {
connect, err := l.Accept()
re.NoError(err)
go func(connect net.Conn) {
serverConnect, err := net.Dial("tcp", server)
re.NoError(err)
pipe(connect, serverConnect, enableDiscard)
}(connect)
type accepted struct {
conn net.Conn
err error
}
accept := make(chan accepted, 1)
go func() {
// closed by `l.Close()`
conn, err := l.Accept()
accept <- accepted{conn, err}
}()

select {
case <-ctx.Done():
return
case a := <-accept:
if a.err != nil {
return
}
go func(connect net.Conn) {
serverConnect, err := net.DialTimeout("tcp", server, 3*time.Second)
re.NoError(err)
pipe(ctx, connect, serverConnect, enableDiscard)
}(a.conn)
}
}
}

func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
func pipe(ctx context.Context, src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
errChan := make(chan error, 1)
go func() {
err := ioCopy(src, dst, enableDiscard)
err := ioCopy(ctx, src, dst, enableDiscard)
errChan <- err
}()
go func() {
err := ioCopy(dst, src, enableDiscard)
err := ioCopy(ctx, dst, src, enableDiscard)
errChan <- err
}()
<-errChan
dst.Close()
src.Close()
}

func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
func ioCopy(ctx context.Context, dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) error {
buffer := make([]byte, 32*1024)
for {
if enableDiscard.Load() {
io.Copy(io.Discard, src)
}
readNum, errRead := src.Read(buffer)
if readNum > 0 {
writeNum, errWrite := dst.Write(buffer[:readNum])
if errWrite != nil {
return errWrite
select {
case <-ctx.Done():
return nil
default:
if enableDiscard.Load() {
io.Copy(io.Discard, src)
}
if readNum != writeNum {
return io.ErrShortWrite
readNum, errRead := src.Read(buffer)
if readNum > 0 {
writeNum, errWrite := dst.Write(buffer[:readNum])
if errWrite != nil {
return errWrite
}
if readNum != writeNum {
return io.ErrShortWrite
}
}
if errRead != nil {
return errRead
}
}
if errRead != nil {
err = errRead
break
}
}
return err
}

type loopWatcherTestSuite struct {
Expand Down
2 changes: 2 additions & 0 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func (suite *regionTestSuite) TestRegionCheck() {
histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
re.Equal(histKeys, r7)

// ref https://github.com/tikv/pd/issues/3558, we should change the approximate keys to pass `RegionStatsNeedUpdate` for observing.
r = r.Clone(core.SetApproximateKeys(0))
mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{})
mustRegionHeartbeat(re, suite.svr, r)
url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer")
Expand Down
11 changes: 3 additions & 8 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,9 +1004,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand Down Expand Up @@ -1037,11 +1036,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionUpdateCacheEventCounter.Inc()
}

isPrepared := true
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
isPrepared = c.IsPrepared()
}
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, isPrepared)
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down
Loading

0 comments on commit 67d6450

Please sign in to comment.