diff --git a/go.mod b/go.mod index 6f6be36d883..bb72c0d5dba 100644 --- a/go.mod +++ b/go.mod @@ -16,21 +16,24 @@ require ( github.com/docker/go-units v0.4.0 github.com/elliotchance/pie/v2 v2.1.0 github.com/gin-contrib/cors v1.4.0 + github.com/gin-contrib/gzip v0.0.1 github.com/gin-contrib/pprof v1.4.0 github.com/gin-gonic/gin v1.8.1 github.com/go-echarts/go-echarts v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/google/btree v1.1.2 + github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.7.4 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/joho/godotenv v1.4.0 + github.com/mailru/easyjson v0.7.6 github.com/mattn/go-shellwords v1.0.12 github.com/mgechev/revive v1.0.2 github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 + github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 @@ -39,6 +42,7 @@ require ( github.com/sasha-s/go-deadlock v0.2.0 github.com/shirou/gopsutil/v3 v3.23.3 github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072 + github.com/soheilhy/cmux v0.1.4 github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.2 @@ -48,6 +52,7 @@ require ( github.com/unrolled/render v1.0.1 github.com/urfave/negroni v0.3.0 go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793 + go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.12 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a @@ -92,7 +97,6 @@ require ( github.com/fogleman/gg v1.3.0 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/ghodss/yaml v1.0.0 // indirect - github.com/gin-contrib/gzip v0.0.1 github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect @@ -112,7 +116,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect @@ -128,7 +131,6 @@ require ( github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect - github.com/mailru/easyjson v0.7.6 github.com/mattn/go-colorable v0.1.8 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-runewidth v0.0.8 // indirect @@ -158,7 +160,6 @@ require ( github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect github.com/sirupsen/logrus v1.6.0 // indirect - github.com/soheilhy/cmux v0.1.4 github.com/stretchr/objx v0.5.0 // indirect github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 // indirect github.com/tidwall/gjson v1.9.3 // indirect @@ -173,7 +174,6 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect // Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201 go.etcd.io/bbolt v1.3.6 // indirect - go.uber.org/atomic v1.10.0 go.uber.org/dig v1.9.0 // indirect go.uber.org/fx v1.12.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 8640a98a6b3..a007acf869e 100644 --- a/go.sum +++ b/go.sum @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 19bd891196f..28a04c5640a 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -2,11 +2,13 @@ package server import ( "context" - "errors" "sync/atomic" "time" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/schedule" @@ -18,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" + "go.uber.org/zap" ) // Cluster is used to manage all information for scheduling purpose. @@ -28,6 +31,7 @@ type Cluster struct { ruleManager *placement.RuleManager labelerManager *labeler.RegionLabeler regionStats *statistics.RegionStatistics + labelLevelStats *statistics.LabelStatistics hotStat *statistics.HotStat storage storage.Storage coordinator *schedule.Coordinator @@ -53,6 +57,7 @@ func NewCluster(ctx context.Context, persistConfig *config.PersistConfig, storag persistConfig: persistConfig, hotStat: statistics.NewHotStat(ctx), regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), + labelLevelStats: statistics.NewLabelStatistics(), storage: storage, clusterID: clusterID, checkMembershipCh: checkMembershipCh, @@ -174,4 +179,74 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { // UpdateRegionsLabelLevelStats updates the status of the region label level by types. func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + } +} + +func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// HandleStoreHeartbeat updates the store status. +func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error { + stats := heartbeat.GetStats() + storeID := stats.GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("store %v not found", storeID) + } + + nowTime := time.Now() + newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + + if store := c.GetStore(storeID); store != nil { + statistics.UpdateStoreHeartbeatMetrics(store) + } + c.PutStore(newStore) + c.hotStat.Observe(storeID, newStore.GetStoreStats()) + c.hotStat.FilterUnhealthyStore(c) + reportInterval := stats.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) + for _, peerStat := range stats.GetPeerStats() { + regionID := peerStat.GetRegionId() + region := c.GetRegion(regionID) + regions[regionID] = region + if region == nil { + log.Warn("discard hot peer stat for unknown region", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + peer := region.GetStorePeer(storeID) + if peer == nil { + log.Warn("discard hot peer stat for unknown region peer", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) + loads := []float64{ + utils.RegionReadBytes: float64(peerStat.GetReadBytes()), + utils.RegionReadKeys: float64(peerStat.GetReadKeys()), + utils.RegionReadQueryNum: float64(readQueryNum), + utils.RegionWriteBytes: 0, + utils.RegionWriteKeys: 0, + utils.RegionWriteQueryNum: 0, + } + peerInfo := core.NewPeerInfo(peer, loads, interval) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + + // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + return nil } diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index e75c78eb415..f615e0c37c0 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -15,12 +15,15 @@ package server import ( + "context" "net/http" + "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -64,8 +67,29 @@ func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { } } +// StoreHeartbeat implements gRPC PDServer. +func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) { + c := s.GetCluster() + if c == nil { + // TODO: add metrics + log.Info("cluster isn't initialized") + return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil + } + + if c.GetStore(request.GetStats().GetStoreId()) == nil { + s.metaWatcher.GetStoreWatcher().ForceLoad() + } + + // TODO: add metrics + if err := c.HandleStoreHeartbeat(request); err != nil { + log.Error("handle store heartbeat failed", zap.Error(err)) + } + return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil +} + // RegisterGRPCService registers the service to gRPC server. func (s *Service) RegisterGRPCService(g *grpc.Server) { + schedulingpb.RegisterSchedulingServer(g, s) } // RegisterRESTHandler registers the service to REST server. diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 1e58bbd845f..3dbd0fc8c92 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -115,3 +115,8 @@ func (w *Watcher) Close() { w.cancel() w.wg.Wait() } + +// GetStoreWatcher returns the store watcher. +func (w *Watcher) GetStoreWatcher() *etcdutil.LoopWatcher { + return w.storeWatcher +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 1d20bb22d4d..55b265e32a5 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" @@ -76,9 +77,29 @@ var ( // GrpcServer wraps Server to provide grpc service. type GrpcServer struct { *Server + schedulingClient atomic.Value concurrentTSOProxyStreamings atomic.Int32 } +type schedulingClient struct { + client schedulingpb.SchedulingClient + lastPrimary string +} + +func (s *schedulingClient) getClient() schedulingpb.SchedulingClient { + if s == nil { + return nil + } + return s.client +} + +func (s *schedulingClient) getPrimaryAddr() string { + if s == nil { + return "" + } + return s.lastPrimary +} + type request interface { GetHeader() *pdpb.RequestHeader } @@ -978,8 +999,23 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear } s.handleDamagedStore(request.GetStats()) - storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) + if s.IsAPIServiceMode() { + s.updateSchedulingClient(ctx) + if s.schedulingClient.Load() != nil { + req := &schedulingpb.StoreHeartbeatRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + Stats: request.GetStats(), + } + if _, err := s.schedulingClient.Load().(*schedulingClient).getClient().StoreHeartbeat(ctx, req); err != nil { + // reset to let it be updated in the next request + s.schedulingClient.Store(&schedulingClient{}) + } + } + } } if status := request.GetDrAutosyncStatus(); status != nil { @@ -993,6 +1029,21 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear return resp, nil } +func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { + forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) + pre := s.schedulingClient.Load() + if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) { + client, err := s.getDelegateClient(ctx, forwardedHost) + if err != nil { + log.Error("get delegate client failed", zap.Error(err)) + } + s.schedulingClient.Store(&schedulingClient{ + client: schedulingpb.NewSchedulingClient(client), + lastPrimary: forwardedHost, + }) + } +} + // bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error // occurs on SendAndClose() or Recv(), both endpoints will be closed. type bucketHeartbeatServer struct { diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index 70006a085e9..cbb2c2d5f46 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 + github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 04eb8e8647f..f18313cb3bb 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 98c8a420238..50e816cd3ad 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -12,7 +12,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 + github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 801d9b926cc..8afbfcdc70e 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -407,8 +407,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 54994bbc34b..f4785a013d9 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -20,9 +20,12 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/suite" mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server" "github.com/tikv/pd/tests" "go.uber.org/goleak" ) @@ -93,8 +96,10 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { re.NoError(err) re.NotEqual(uint64(0), id) suite.cluster.ResignLeader() - suite.cluster.WaitLeader() - time.Sleep(200 * time.Millisecond) + leaderName := suite.cluster.WaitLeader() + suite.pdLeader = suite.cluster.GetServer(leaderName) + suite.backendEndpoints = suite.pdLeader.GetAddr() + time.Sleep(time.Second) id1, err := cluster.AllocID() re.NoError(err) re.Greater(id1, id) @@ -126,3 +131,54 @@ func (suite *serverTestSuite) TestPrimaryChange() { return ok && newPrimaryAddr == watchedAddr }) } + +func (suite *serverTestSuite) TestForwardStoreHeartbeat() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: 1, + Address: "tikv1", + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + + resp1, err := s.StoreHeartbeat( + context.Background(), &pdpb.StoreHeartbeatRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Stats: &pdpb.StoreStats{ + StoreId: 1, + Capacity: 1798985089024, + Available: 1709868695552, + UsedSize: 85150956358, + KeysWritten: 20000, + BytesWritten: 199, + KeysRead: 10000, + BytesRead: 99, + }, + }, + ) + re.NoError(err) + re.Empty(resp1.GetHeader().GetError()) + testutil.Eventually(re, func() bool { + store := tc.GetPrimaryServer().GetCluster().GetStore(1) + return store.GetStoreStats().GetCapacity() == uint64(1798985089024) && + store.GetStoreStats().GetAvailable() == uint64(1709868695552) && + store.GetStoreStats().GetUsedSize() == uint64(85150956358) && + store.GetStoreStats().GetKeysWritten() == uint64(20000) && + store.GetStoreStats().GetBytesWritten() == uint64(199) && + store.GetStoreStats().GetKeysRead() == uint64(10000) && + store.GetStoreStats().GetBytesRead() == uint64(99) + }) +} diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index a9788e01ed2..cea17c73141 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 + github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 github.com/stretchr/testify v1.8.4 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 891bc42d24f..5ccf67f8b13 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -401,8 +401,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-api-bench/go.mod b/tools/pd-api-bench/go.mod index 1cef635e20b..bb3d5be530f 100644 --- a/tools/pd-api-bench/go.mod +++ b/tools/pd-api-bench/go.mod @@ -70,7 +70,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect - github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 // indirect + github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tools/pd-api-bench/go.sum b/tools/pd-api-bench/go.sum index 2cdd7ca7a38..29d39f504df 100644 --- a/tools/pd-api-bench/go.sum +++ b/tools/pd-api-bench/go.sum @@ -263,8 +263,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw= -github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM= +github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=