diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go index d289ecef06b..4a39e54d43c 100644 --- a/tools/pd-api-bench/cases/cases.go +++ b/tools/pd-api-bench/cases/cases.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "math/rand" + "strconv" + "time" "github.com/pingcap/log" pd "github.com/tikv/pd/client" @@ -142,12 +144,14 @@ type GRPCCraeteFn func() GRPCCase // GRPCCaseFnMap is the map for all gRPC case creation function. var GRPCCaseFnMap = map[string]GRPCCraeteFn{ - "GetRegion": newGetRegion(), - "GetRegionEnableFollower": newGetRegionEnableFollower(), - "GetStore": newGetStore(), - "GetStores": newGetStores(), - "ScanRegions": newScanRegions(), - "Tso": newTso(), + "GetRegion": newGetRegion(), + "GetRegionEnableFollower": newGetRegionEnableFollower(), + "GetStore": newGetStore(), + "GetStores": newGetStores(), + "ScanRegions": newScanRegions(), + "Tso": newTso(), + "UpdateGCSafePoint": newUpdateGCSafePoint(), + "UpdateServiceGCSafePoint": newUpdateServiceGCSafePoint(), } // HTTPCase is the interface for all HTTP cases. @@ -227,6 +231,55 @@ func (c *regionsStats) Do(ctx context.Context, cli pdHttp.Client) error { return nil } +type updateGCSafePoint struct { + *baseCase +} + +func newUpdateGCSafePoint() func() GRPCCase { + return func() GRPCCase { + return &updateGCSafePoint{ + baseCase: &baseCase{ + name: "UpdateGCSafePoint", + cfg: newConfig(), + }, + } + } +} + +func (c *updateGCSafePoint) Unary(ctx context.Context, cli pd.Client) error { + s := time.Now().Unix() + _, err := cli.UpdateGCSafePoint(ctx, uint64(s)) + if err != nil { + return err + } + return nil +} + +type updateServiceGCSafePoint struct { + *baseCase +} + +func newUpdateServiceGCSafePoint() func() GRPCCase { + return func() GRPCCase { + return &updateServiceGCSafePoint{ + baseCase: &baseCase{ + name: "UpdateServiceGCSafePoint", + cfg: newConfig(), + }, + } + } +} + +func (c *updateServiceGCSafePoint) Unary(ctx context.Context, cli pd.Client) error { + s := time.Now().Unix() + id := rand.Int63n(100) + 1 + _, err := cli.UpdateServiceGCSafePoint(ctx, strconv.FormatInt(id, 10), id, uint64(s)) + if err != nil { + return err + } + return nil +} + type getRegion struct { *baseCase } diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 5879cd307f0..cd8365cb984 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -316,7 +316,7 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options) { rs.awakenRegions.Store(awakenRegions) } -func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_RegionHeartbeatClient { +func createHeartbeatStream(ctx context.Context, cfg *config.Config) (pdpb.PDClient, pdpb.PD_RegionHeartbeatClient) { cli, err := newClient(ctx, cfg) if err != nil { log.Fatal("create client error", zap.Error(err)) @@ -332,7 +332,7 @@ func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_Regi stream.Recv() } }() - return stream + return cli, stream } func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_RegionHeartbeatClient, storeID uint64, rep report.Report) { @@ -507,14 +507,20 @@ func main() { bootstrap(ctx, cli) putStores(ctx, cfg, cli, stores) log.Info("finish put stores") + clis := make(map[uint64]pdpb.PDClient, cfg.StoreCount) httpCli := pdHttp.NewClient("tools-heartbeat-bench", []string{cfg.PDAddr}, pdHttp.WithTLSConfig(loadTLSConfig(cfg))) go deleteOperators(ctx, httpCli) streams := make(map[uint64]pdpb.PD_RegionHeartbeatClient, cfg.StoreCount) for i := 1; i <= cfg.StoreCount; i++ { - streams[uint64(i)] = createHeartbeatStream(ctx, cfg) + clis[uint64(i)], streams[uint64(i)] = createHeartbeatStream(ctx, cfg) + } + header := &pdpb.RequestHeader{ + ClusterId: clusterID, } var heartbeatTicker = time.NewTicker(regionReportInterval * time.Second) defer heartbeatTicker.Stop() + var resolvedTSTicker = time.NewTicker(time.Second) + defer resolvedTSTicker.Stop() for { select { case <-heartbeatTicker.C: @@ -547,6 +553,26 @@ func main() { log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since))) regions.update(cfg, options) go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update. + case <-resolvedTSTicker.C: + wg := &sync.WaitGroup{} + for i := 1; i <= cfg.StoreCount; i++ { + id := uint64(i) + wg.Add(1) + go func(wg *sync.WaitGroup, id uint64) { + defer wg.Done() + cli := clis[id] + _, err := cli.ReportMinResolvedTS(ctx, &pdpb.ReportMinResolvedTsRequest{ + Header: header, + StoreId: id, + MinResolvedTs: uint64(time.Now().Unix()), + }) + if err != nil { + log.Error("send resolved TS error", zap.Uint64("store-id", id), zap.Error(err)) + return + } + }(wg, id) + } + wg.Wait() case <-ctx.Done(): log.Info("got signal to exit") switch sig {