diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 8809a706936..916200bfa3e 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -55,7 +55,7 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
 }
 
 // Collect collects the cluster information.
-func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
+func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) {
 	if hasRegionStats {
 		c.GetRegionStats().Observe(region, stores)
 	}
diff --git a/pkg/core/region.go b/pkg/core/region.go
index b141e8478da..0595e75f198 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -98,6 +98,12 @@ func (r *RegionInfo) LoadedFromStorage() bool {
 	return r.source == Storage
 }
 
+// LoadedFromSync means this region's meta info was loaded from the region syncer.
+// Only used for tests.
+func (r *RegionInfo) LoadedFromSync() bool {
+	return r.source == Sync
+}
+
 // NewRegionInfo creates RegionInfo with region's meta and leader peer.
 func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
 	regionInfo := &RegionInfo{
@@ -705,7 +711,7 @@ func (r *RegionInfo) isRegionRecreated() bool {
 
 // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
 // and new region information.
-type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
+type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
 
 // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
 // nil means do not print the log.
@@ -718,19 +724,15 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 	}
 	// Save to storage if meta is updated.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
-	// Mark isNew if the region in cache does not have leader.
-	return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
+	return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
 		if origin == nil {
 			if log.GetLevel() <= zap.DebugLevel {
 				debug("insert new region",
 					zap.Uint64("region-id", region.GetID()),
 					logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
 			}
-			saveKV, saveCache, isNew = true, true, true
+			saveKV, saveCache = true, true
 		} else {
-			if origin.LoadedFromStorage() {
-				isNew = true
-			}
 			r := region.GetRegionEpoch()
 			o := origin.GetRegionEpoch()
 			if r.GetVersion() > o.GetVersion() {
@@ -756,9 +758,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 				saveKV, saveCache = true, true
 			}
 			if region.GetLeader().GetId() != origin.GetLeader().GetId() {
-				if origin.GetLeader().GetId() == 0 {
-					isNew = true
-				} else if log.GetLevel() <= zap.InfoLevel {
+				if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
 					info("leader changed",
 						zap.Uint64("region-id", region.GetID()),
 						zap.Uint64("from", origin.GetLeader().GetStoreId()),
diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go
index 508e7aa59aa..1e3b6073dda 100644
--- a/pkg/core/region_test.go
+++ b/pkg/core/region_test.go
@@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
 	for _, testCase := range testCases {
 		regionA := region.Clone(testCase.optionsA...)
 		regionB := region.Clone(testCase.optionsB...)
-		_, _, _, needSync := RegionGuide(regionA, regionB)
+		_, _, needSync := RegionGuide(regionA, regionB)
 		re.Equal(testCase.needSync, needSync)
 	}
 }
diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go
index 2d02fd00434..1fac592f791 100644
--- a/pkg/mcs/resourcemanager/server/server.go
+++ b/pkg/mcs/resourcemanager/server/server.go
@@ -217,6 +217,7 @@ func (s *Server) Close() {
 	utils.StopHTTPServer(s)
 	utils.StopGRPCServer(s)
 	s.GetListener().Close()
+	s.CloseClientConns()
 	s.serverLoopCancel()
 	s.serverLoopWg.Wait()
 
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 9752a9160f8..9e75057621e 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -557,9 +557,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	hasRegionStats := c.regionStats != nil
 	// Save to storage if meta is updated, except for flashback.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
-	// Mark isNew if the region in cache does not have leader.
-	isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
-	if !saveCache && !isNew {
+	_, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
+	if !saveCache {
 		// Due to some config changes need to update the region stats as well,
 		// so we do some extra checks here.
 		if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -581,7 +580,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		cluster.HandleOverlaps(c, overlaps)
 	}
 
-	cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
+	cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
 
 	return nil
 }
diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go
index cda8faf9f44..ebce73e3303 100644
--- a/pkg/mcs/scheduling/server/grpc_service.go
+++ b/pkg/mcs/scheduling/server/grpc_service.go
@@ -158,7 +158,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
 			s.hbStreams.BindStream(storeID, server)
 			lastBind = time.Now()
 		}
-		region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
+		region := core.RegionFromHeartbeat(request)
 		err = c.HandleRegionHeartbeat(region)
 		if err != nil {
 			// TODO: if we need to send the error back to API server.
diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go
index 8013f1d0e7b..0ee6003e2f9 100644
--- a/pkg/mcs/scheduling/server/server.go
+++ b/pkg/mcs/scheduling/server/server.go
@@ -319,6 +319,7 @@ func (s *Server) Close() {
 	utils.StopHTTPServer(s)
 	utils.StopGRPCServer(s)
 	s.GetListener().Close()
+	s.CloseClientConns()
 	s.serverLoopCancel()
 	s.serverLoopWg.Wait()
 
diff --git a/pkg/mcs/server/server.go b/pkg/mcs/server/server.go
index a8dedd8ad91..2c008e8f5e8 100644
--- a/pkg/mcs/server/server.go
+++ b/pkg/mcs/server/server.go
@@ -23,6 +23,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
 	"go.etcd.io/etcd/clientv3"
@@ -167,3 +168,14 @@ func (bs *BaseServer) IsSecure() bool {
 func (bs *BaseServer) StartTimestamp() int64 {
 	return bs.startTimestamp
 }
+
+// CloseClientConns closes all client connections.
+func (bs *BaseServer) CloseClientConns() {
+	bs.clientConns.Range(func(key, value any) bool {
+		conn := value.(*grpc.ClientConn)
+		if err := conn.Close(); err != nil {
+			log.Error("close client connection meet error")
+		}
+		return true
+	})
+}
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 55473efc8bb..92ffc6603c3 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -177,6 +177,7 @@ func (s *Server) Close() {
 	utils.StopHTTPServer(s)
 	utils.StopGRPCServer(s)
 	s.GetListener().Close()
+	s.CloseClientConns()
 	s.serverLoopCancel()
 	s.serverLoopWg.Wait()
 
diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go
index 21af8e152fd..52655b093a2 100644
--- a/pkg/statistics/region_collection.go
+++ b/pkg/statistics/region_collection.go
@@ -33,6 +33,8 @@ type RegionInfoProvider interface {
 // RegionStatisticType represents the type of the region's status.
 type RegionStatisticType uint32
 
+const emptyStatistic = RegionStatisticType(0)
+
 // region status type
 const (
 	MissPeer RegionStatisticType = 1 << iota
@@ -148,6 +150,9 @@ func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID
 // due to some special state types.
 func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
 	regionID := region.GetID()
+	if !r.isObserved(regionID) {
+		return true
+	}
 	if r.IsRegionStatsType(regionID, OversizedRegion) !=
 		region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) {
 		return true
@@ -156,6 +161,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
 		region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys()))
 }
 
+// isObserved returns whether the region is observed. It also indicates whether PD has received a heartbeat for this region.
+func (r *RegionStatistics) isObserved(id uint64) bool {
+	r.RLock()
+	defer r.RUnlock()
+	_, ok := r.index[id]
+	return ok
+}
+
 // Observe records the current regions' status.
 func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) {
 	r.Lock()
@@ -164,7 +177,6 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
 		desiredReplicas = r.conf.GetMaxReplicas()
 		desiredVoters   = desiredReplicas
 		peerTypeIndex   RegionStatisticType
-		deleteIndex     RegionStatisticType
 	)
 	// Check if the region meets count requirements of its rules.
 	if r.conf.IsPlacementRulesEnabled() {
@@ -240,10 +252,10 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
 		}
 	}
 	// Remove the info if any of the conditions are not met any more.
-	if oldIndex, ok := r.index[regionID]; ok {
-		deleteIndex = oldIndex &^ peerTypeIndex
+	if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic {
+		deleteIndex := oldIndex &^ peerTypeIndex
+		r.deleteEntry(deleteIndex, regionID)
 	}
-	r.deleteEntry(deleteIndex, regionID)
 	r.index[regionID] = peerTypeIndex
 }
 
@@ -252,7 +264,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) {
 	r.Lock()
 	defer r.Unlock()
 	if oldIndex, ok := r.index[regionID]; ok {
-		r.deleteEntry(oldIndex, regionID)
+		delete(r.index, regionID)
+		if oldIndex > emptyStatistic {
+			r.deleteEntry(oldIndex, regionID)
+		}
 	}
 }
 
diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go
index 00dd8c5107d..cd9a87aaf54 100644
--- a/pkg/syncer/client.go
+++ b/pkg/syncer/client.go
@@ -205,7 +205,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 					log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
 					continue
 				}
-				_, saveKV, _, _ := regionGuide(region, origin)
+				saveKV, _, _ := regionGuide(region, origin)
 				overlaps := bc.PutRegion(region)
 
 				if hasBuckets {
diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go
index 4055ec12a7d..f1683de1352 100644
--- a/pkg/tso/allocator_manager.go
+++ b/pkg/tso/allocator_manager.go
@@ -325,6 +325,12 @@ func (am *AllocatorManager) close() {
 		allocatorGroup.allocator.(*GlobalTSOAllocator).close()
 	}
 
+	for _, cc := range am.localAllocatorConn.clientConns {
+		if err := cc.Close(); err != nil {
+			log.Error("failed to close allocator manager grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
+		}
+	}
+
 	am.cancel()
 
 	am.svcLoopWG.Wait()
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index 8fb8bc59f88..1e5c8a3316f 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -285,9 +285,11 @@ func checkEtcdWithHangLeader(t *testing.T) error {
 	// Create a proxy to etcd1.
 	proxyAddr := tempurl.Alloc()
 	var enableDiscard atomic.Bool
-	go proxyWithDiscard(re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go proxyWithDiscard(ctx, re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)
 
-	// Create a etcd client with etcd1 as endpoint.
+	// Create an etcd client with etcd1 as endpoint.
 	urls, err := types.NewURLs([]string{proxyAddr})
 	re.NoError(err)
 	client1, err := CreateEtcdClient(nil, urls)
@@ -307,30 +309,48 @@ func checkEtcdWithHangLeader(t *testing.T) error {
 	return err
 }
 
-func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
+func proxyWithDiscard(ctx context.Context, re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
 	server = strings.TrimPrefix(server, "http://")
 	proxy = strings.TrimPrefix(proxy, "http://")
 	l, err := net.Listen("tcp", proxy)
 	re.NoError(err)
+	defer l.Close()
 	for {
-		connect, err := l.Accept()
-		re.NoError(err)
-		go func(connect net.Conn) {
-			serverConnect, err := net.Dial("tcp", server)
-			re.NoError(err)
-			pipe(connect, serverConnect, enableDiscard)
-		}(connect)
+		type accepted struct {
+			conn net.Conn
+			err  error
+		}
+		accept := make(chan accepted, 1)
+		go func() {
+			// closed by `l.Close()`
+			conn, err := l.Accept()
+			accept <- accepted{conn, err}
+		}()
+
+		select {
+		case <-ctx.Done():
+			return
+		case a := <-accept:
+			if a.err != nil {
+				return
+			}
+			go func(connect net.Conn) {
+				serverConnect, err := net.DialTimeout("tcp", server, 3*time.Second)
+				re.NoError(err)
+				pipe(ctx, connect, serverConnect, enableDiscard)
+			}(a.conn)
+		}
 	}
 }
 
-func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
+func pipe(ctx context.Context, src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
 	errChan := make(chan error, 1)
 	go func() {
-		err := ioCopy(src, dst, enableDiscard)
+		err := ioCopy(ctx, src, dst, enableDiscard)
 		errChan <- err
 	}()
 	go func() {
-		err := ioCopy(dst, src, enableDiscard)
+		err := ioCopy(ctx, dst, src, enableDiscard)
 		errChan <- err
 	}()
 	<-errChan
@@ -338,28 +358,31 @@ func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
 	src.Close()
 }
 
-func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
+func ioCopy(ctx context.Context, dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) error {
 	buffer := make([]byte, 32*1024)
 	for {
-		if enableDiscard.Load() {
-			io.Copy(io.Discard, src)
-		}
-		readNum, errRead := src.Read(buffer)
-		if readNum > 0 {
-			writeNum, errWrite := dst.Write(buffer[:readNum])
-			if errWrite != nil {
-				return errWrite
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+			if enableDiscard.Load() {
+				io.Copy(io.Discard, src)
 			}
-			if readNum != writeNum {
-				return io.ErrShortWrite
+			readNum, errRead := src.Read(buffer)
+			if readNum > 0 {
+				writeNum, errWrite := dst.Write(buffer[:readNum])
+				if errWrite != nil {
+					return errWrite
+				}
+				if readNum != writeNum {
+					return io.ErrShortWrite
+				}
+			}
+			if errRead != nil {
+				return errRead
 			}
-		}
-		if errRead != nil {
-			err = errRead
-			break
 		}
 	}
-	return err
 }
 
 type loopWatcherTestSuite struct {
diff --git a/server/api/region_test.go b/server/api/region_test.go
index 5d4c3d51605..8c0d78abd4a 100644
--- a/server/api/region_test.go
+++ b/server/api/region_test.go
@@ -166,6 +166,8 @@ func (suite *regionTestSuite) TestRegionCheck() {
 	histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
 	re.Equal(histKeys, r7)
 
+	// Ref https://github.com/tikv/pd/issues/3558: we need to change the region's size/keys so that it passes `NeedUpdate` and is observed again.
+	r = r.Clone(core.SetApproximateKeys(0))
 	mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{})
 	mustRegionHeartbeat(re, suite.svr, r)
 	url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer")
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 350a7fd5e91..075403b815a 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1004,9 +1004,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	hasRegionStats := c.regionStats != nil
 	// Save to storage if meta is updated, except for flashback.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
-	// Mark isNew if the region in cache does not have leader.
-	isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
-	if !saveKV && !saveCache && !isNew {
+	saveKV, saveCache, needSync := regionGuide(region, origin)
+	if !saveKV && !saveCache {
 		// Due to some config changes need to update the region stats as well,
 		// so we do some extra checks here.
 		if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1037,11 +1036,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		regionUpdateCacheEventCounter.Inc()
 	}
 
-	isPrepared := true
-	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
-		isPrepared = c.IsPrepared()
-	}
-	cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, isPrepared)
+	cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
 
 	if c.storage != nil {
 		// If there are concurrent heartbeats from the same region, the last write will win even if
diff --git a/server/server.go b/server/server.go
index 4d26b0bd079..ab69c2a3ad7 100644
--- a/server/server.go
+++ b/server/server.go
@@ -582,6 +582,14 @@ func (s *Server) Close() {
 		cb()
 	}
 
+	s.clientConns.Range(func(key, value any) bool {
+		conn := value.(*grpc.ClientConn)
+		if err := conn.Close(); err != nil {
+			log.Error("close grpc client meet error", errs.ZapError(err))
+		}
+		return true
+	})
+
 	log.Info("close server")
 }
 
diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
index ff9769cd1a5..ccec0a7cdc0 100644
--- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
+++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
@@ -81,6 +81,7 @@ func (suite *keyspaceGroupTestSuite) TearDownTest() {
 	re := suite.Require()
 	suite.cleanupFunc()
 	suite.cluster.Destroy()
+	suite.dialClient.CloseIdleConnections()
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
 }
 
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 525b211b1fc..3415c22a77b 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -38,6 +38,7 @@ import (
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
+	"github.com/tikv/pd/pkg/statistics"
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/syncer"
 	"github.com/tikv/pd/pkg/tso"
@@ -181,6 +182,99 @@ func TestDamagedRegion(t *testing.T) {
 	re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin))
 }
+func TestRegionStatistics(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	re.NoError(err)
+
+	err = tc.RunInitialServers()
+	re.NoError(err)
+
+	leaderName := tc.WaitLeader()
+	leaderServer := tc.GetLeaderServer()
+	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
+	clusterID := leaderServer.GetClusterID()
+	bootstrapCluster(re, clusterID, grpcPDClient)
+	rc := leaderServer.GetRaftCluster()
+
+	region := &metapb.Region{
+		Id:       10,
+		StartKey: []byte("abc"),
+		EndKey:   []byte("xyz"),
+		Peers: []*metapb.Peer{
+			{Id: 101, StoreId: 1},
+			{Id: 102, StoreId: 2},
+			{Id: 103, StoreId: 3},
+			{Id: 104, StoreId: 4, Role: metapb.PeerRole_Learner},
+		},
+	}
+
+	// Put the region.
+	regionInfo := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(0))
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	regions := rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Len(regions, 1)
+
+	// Wait for the region to be synced.
+	time.Sleep(1000 * time.Millisecond)
+
+	leaderServer.ResignLeader()
+	newLeaderName := tc.WaitLeader()
+	re.NotEqual(newLeaderName, leaderName)
+	leaderServer = tc.GetLeaderServer()
+	rc = leaderServer.GetRaftCluster()
+	r := rc.GetRegion(region.Id)
+	re.NotNil(r)
+	re.True(r.LoadedFromSync())
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Empty(regions)
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Len(regions, 1)
+
+	leaderServer.ResignLeader()
+	newLeaderName = tc.WaitLeader()
+	re.Equal(newLeaderName, leaderName)
+	leaderServer = tc.GetLeaderServer()
+	rc = leaderServer.GetRaftCluster()
+	re.NotNil(r)
+	re.True(r.LoadedFromStorage() || r.LoadedFromSync())
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Empty(regions)
+	regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	rc = leaderServer.GetRaftCluster()
+	r = rc.GetRegion(region.Id)
+	re.NotNil(r)
+	re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+
+	leaderServer.ResignLeader()
+	newLeaderName = tc.WaitLeader()
+	re.NotEqual(newLeaderName, leaderName)
+	leaderServer.ResignLeader()
+	newLeaderName = tc.WaitLeader()
+	re.Equal(newLeaderName, leaderName)
+	leaderServer = tc.GetLeaderServer()
+	rc = leaderServer.GetRaftCluster()
+	r = rc.GetRegion(region.Id)
+	re.NotNil(r)
+	re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Empty(regions)
+
+	regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+	err = tc.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+	re.Len(regions, 1)
+}
+
 func TestStaleRegion(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go
index d87e8a1a5c0..6d4cbb3a6a4 100644
--- a/tests/server/member/member_test.go
+++ b/tests/server/member/member_test.go
@@ -83,7 +83,8 @@ func TestMemberDelete(t *testing.T) {
 		{path: fmt.Sprintf("id/%d", members[1].GetServerID()), members: []*config.Config{leader.GetConfig()}},
 	}
 
-	httpClient := &http.Client{Timeout: 15 * time.Second}
+	httpClient := &http.Client{Timeout: 15 * time.Second, Transport: &http.Transport{DisableKeepAlives: true}}
+	defer httpClient.CloseIdleConnections()
 	for _, table := range tables {
 		t.Log(time.Now(), "try to delete:", table.path)
 		testutil.Eventually(re, func() bool {
@@ -103,7 +104,7 @@ func TestMemberDelete(t *testing.T) {
 			}
 			// Check by member list.
 			cluster.WaitLeader()
-			if err = checkMemberList(re, leader.GetConfig().ClientUrls, table.members); err != nil {
+			if err = checkMemberList(re, *httpClient, leader.GetConfig().ClientUrls, table.members); err != nil {
 				t.Logf("check member fail: %v", err)
 				time.Sleep(time.Second)
 				return false
@@ -120,8 +121,7 @@ func TestMemberDelete(t *testing.T) {
 	}
 }
 
-func checkMemberList(re *require.Assertions, clientURL string, configs []*config.Config) error {
-	httpClient := &http.Client{Timeout: 15 * time.Second}
+func checkMemberList(re *require.Assertions, httpClient http.Client, clientURL string, configs []*config.Config) error {
 	addr := clientURL + "/pd/api/v1/members"
 	res, err := httpClient.Get(addr)
 	re.NoError(err)
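
Note on the RegionStatistics bookkeeping in pkg/statistics/region_collection.go above: the index map keeps a bitmask of statistic types per region, and `oldIndex &^ peerTypeIndex` yields exactly the conditions that stopped holding and therefore need `deleteEntry`; the `oldIndex > emptyStatistic` guard just skips the work when the previous mask was empty. A standalone sketch of that bit-clear idiom (illustrative only, not PD code; the type and constant names here are made up):

// Sketch of the bitmask bookkeeping used by RegionStatistics.Observe:
// each statistic type is one bit, and oldIndex &^ newIndex is the set of
// conditions that were recorded before but no longer apply.
package main

import "fmt"

type statType uint32

const (
	missPeer statType = 1 << iota
	extraPeer
	downPeer
	learnerPeer
)

func main() {
	oldIndex := missPeer | downPeer | learnerPeer // conditions seen last time
	newIndex := downPeer | extraPeer              // conditions seen this time

	deleteIndex := oldIndex &^ newIndex // bits set before but not now
	fmt.Printf("delete:   %04b\n", deleteIndex) // missPeer and learnerPeer
	fmt.Printf("keep/add: %04b\n", newIndex)
}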
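
Note on the CloseClientConns/Close changes: the servers cache gRPC client connections in a sync.Map and now drain it on shutdown so connections do not leak between test runs. A minimal, self-contained sketch of that pattern (illustrative only; the server type, field name, log message, and address are assumptions, not the PD implementation):

// Sketch: cache gRPC client connections in a sync.Map and close them all on shutdown.
package main

import (
	"log"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

type server struct {
	clientConns sync.Map // addr (string) -> *grpc.ClientConn
}

func (s *server) getConn(addr string) (*grpc.ClientConn, error) {
	if conn, ok := s.clientConns.Load(addr); ok {
		return conn.(*grpc.ClientConn), nil
	}
	// grpc.Dial without WithBlock does not connect eagerly, so this runs even without a live server.
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	s.clientConns.Store(addr, conn)
	return conn, nil
}

func (s *server) close() {
	// Mirror of the Range-and-Close loop added in this PR.
	s.clientConns.Range(func(_, value any) bool {
		if err := value.(*grpc.ClientConn).Close(); err != nil {
			log.Printf("close client connection meet error: %v", err)
		}
		return true
	})
}

func main() {
	s := &server{}
	if _, err := s.getConn("127.0.0.1:12379"); err != nil {
		log.Fatal(err)
	}
	s.close()
}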
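
Note on the etcdutil test proxy: the rewritten proxyWithDiscard/ioCopy take a context so the listener goroutine and both copy loops stop when the test ends. The key trick is running the blocking Accept in its own goroutine, delivering the result over a channel, and selecting between that result and ctx.Done(); the deferred listener Close then unblocks any pending Accept. A self-contained sketch of the same pattern (illustrative only, not the PD test code):

// Sketch: a cancellable accept loop. Accept blocks, so it runs in a goroutine
// and the caller selects between a new connection and context cancellation.
package main

import (
	"context"
	"log"
	"net"
	"time"
)

func serve(ctx context.Context, l net.Listener) {
	defer l.Close() // unblocks the pending Accept when ctx is done
	for {
		type accepted struct {
			conn net.Conn
			err  error
		}
		ch := make(chan accepted, 1)
		go func() {
			conn, err := l.Accept() // returns with an error once l is closed
			ch <- accepted{conn, err}
		}()

		select {
		case <-ctx.Done():
			return
		case a := <-ch:
			if a.err != nil {
				return
			}
			go func(c net.Conn) {
				defer c.Close()
				// Handle the connection here (the real proxy pipes it to the backend).
			}(a.conn)
		}
	}
}

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	serve(ctx, l) // returns once the context expires
}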