diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go index 59ea6337115..f6f11c4e0f5 100644 --- a/tools/pd-api-bench/cases/cases.go +++ b/tools/pd-api-bench/cases/cases.go @@ -22,6 +22,7 @@ import ( pd "github.com/tikv/pd/client" pdHttp "github.com/tikv/pd/client/http" + "go.etcd.io/etcd/clientv3" ) var ( @@ -111,6 +112,24 @@ func (c *baseCase) GetConfig() *Config { return c.cfg.Clone() } +// ETCDCase is the interface for all etcd api cases. +type ETCDCase interface { + Case + Init(context.Context, *clientv3.Client) error + Unary(context.Context, *clientv3.Client) error +} + +// ETCDCraeteFn is function type to create ETCDCase. +type ETCDCraeteFn func() ETCDCase + +// ETCDCaseFnMap is the map for all ETCD case creation function. +var ETCDCaseFnMap = map[string]ETCDCraeteFn{ + "Get": newGetKV(), + "Put": newPutKV(), + "Delete": newDeleteKV(), + "Txn": newTxnKV(), +} + // GRPCCase is the interface for all gRPC cases. type GRPCCase interface { Case @@ -130,9 +149,6 @@ var GRPCCaseFnMap = map[string]GRPCCraeteFn{ "Tso": newTso(), } -// GRPCCaseMap is the map for all gRPC case creation function. -var GRPCCaseMap = map[string]GRPCCase{} - // HTTPCase is the interface for all HTTP cases. type HTTPCase interface { Case @@ -148,9 +164,6 @@ var HTTPCaseFnMap = map[string]HTTPCraeteFn{ "GetMinResolvedTS": newMinResolvedTS(), } -// HTTPCaseMap is the map for all HTTP cases. -var HTTPCaseMap = map[string]HTTPCase{} - type minResolvedTS struct { *baseCase } @@ -366,3 +379,102 @@ func generateKeyForSimulator(id int, keyLen int) []byte { copy(k, fmt.Sprintf("%010d", id)) return k } + +type getKV struct { + *baseCase +} + +func newGetKV() func() ETCDCase { + return func() ETCDCase { + return &getKV{ + baseCase: &baseCase{ + name: "Get", + cfg: newConfig(), + }, + } + } +} + +func (c *getKV) Init(ctx context.Context, cli *clientv3.Client) error { + for i := 0; i < 100; i++ { + _, err := cli.Put(ctx, fmt.Sprintf("/test/0001/%4d", i), fmt.Sprintf("%4d", i)) + if err != nil { + return err + } + } + return nil +} + +func (c *getKV) Unary(ctx context.Context, cli *clientv3.Client) error { + _, err := cli.Get(ctx, "/test/0001", clientv3.WithPrefix()) + return err +} + +type putKV struct { + *baseCase +} + +func newPutKV() func() ETCDCase { + return func() ETCDCase { + return &putKV{ + baseCase: &baseCase{ + name: "Put", + cfg: newConfig(), + }, + } + } +} + +func (c *putKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil } + +func (c *putKV) Unary(ctx context.Context, cli *clientv3.Client) error { + _, err := cli.Put(ctx, "/test/0001/0000", "test") + return err +} + +type deleteKV struct { + *baseCase +} + +func newDeleteKV() func() ETCDCase { + return func() ETCDCase { + return &deleteKV{ + baseCase: &baseCase{ + name: "Put", + cfg: newConfig(), + }, + } + } +} + +func (c *deleteKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil } + +func (c *deleteKV) Unary(ctx context.Context, cli *clientv3.Client) error { + _, err := cli.Delete(ctx, "/test/0001/0000") + return err +} + +type txnKV struct { + *baseCase +} + +func newTxnKV() func() ETCDCase { + return func() ETCDCase { + return &txnKV{ + baseCase: &baseCase{ + name: "Put", + cfg: newConfig(), + }, + } + } +} + +func (c *txnKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil } + +func (c *txnKV) Unary(ctx context.Context, cli *clientv3.Client) error { + txn := cli.Txn(ctx) + txn = txn.If(clientv3.Compare(clientv3.Value("/test/0001/0000"), "=", "test")) + txn = txn.Then(clientv3.OpPut("/test/0001/0000", "test2")) + _, err := txn.Commit() + return err +} diff --git a/tools/pd-api-bench/cases/controller.go b/tools/pd-api-bench/cases/controller.go index 2a4561a3d2a..db42c469843 100644 --- a/tools/pd-api-bench/cases/controller.go +++ b/tools/pd-api-bench/cases/controller.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" pd "github.com/tikv/pd/client" pdHttp "github.com/tikv/pd/client/http" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -34,21 +35,25 @@ type Coordinator struct { httpClients []pdHttp.Client gRPCClients []pd.Client + etcdClients []*clientv3.Client http map[string]*httpController grpc map[string]*gRPCController + etcd map[string]*etcdController mu sync.RWMutex } // NewCoordinator returns a new coordinator. -func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client) *Coordinator { +func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client, etcdClients []*clientv3.Client) *Coordinator { return &Coordinator{ ctx: ctx, httpClients: httpClients, gRPCClients: gRPCClients, + etcdClients: etcdClients, http: make(map[string]*httpController), grpc: make(map[string]*gRPCController), + etcd: make(map[string]*etcdController), } } @@ -72,6 +77,16 @@ func (c *Coordinator) GetGRPCCase(name string) (*Config, error) { return nil, errors.Errorf("case %v does not exist.", name) } +// GetETCDCase returns the etcd case config. +func (c *Coordinator) GetETCDCase(name string) (*Config, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if controller, ok := c.etcd[name]; ok { + return controller.GetConfig(), nil + } + return nil, errors.Errorf("case %v does not exist.", name) +} + // GetAllHTTPCases returns the all HTTP case configs. func (c *Coordinator) GetAllHTTPCases() map[string]*Config { c.mu.RLock() @@ -94,6 +109,17 @@ func (c *Coordinator) GetAllGRPCCases() map[string]*Config { return ret } +// GetAllETCDCases returns the all etcd case configs. +func (c *Coordinator) GetAllETCDCases() map[string]*Config { + c.mu.RLock() + defer c.mu.RUnlock() + ret := make(map[string]*Config) + for name, c := range c.etcd { + ret[name] = c.GetConfig() + } + return ret +} + // SetHTTPCase sets the config for the specific case. func (c *Coordinator) SetHTTPCase(name string, cfg *Config) error { c.mu.Lock() @@ -133,7 +159,29 @@ func (c *Coordinator) SetGRPCCase(name string, cfg *Config) error { } controller.run() } else { - return errors.Errorf("HTTP case %s not implemented", name) + return errors.Errorf("gRPC case %s not implemented", name) + } + return nil +} + +// SetETCDCase sets the config for the specific case. +func (c *Coordinator) SetETCDCase(name string, cfg *Config) error { + c.mu.Lock() + defer c.mu.Unlock() + if fn, ok := ETCDCaseFnMap[name]; ok { + var controller *etcdController + if controller, ok = c.etcd[name]; !ok { + controller = newEtcdController(c.ctx, c.etcdClients, fn) + c.etcd[name] = controller + } + controller.stop() + controller.SetQPS(cfg.QPS) + if cfg.Burst > 0 { + controller.SetBurst(cfg.Burst) + } + controller.run() + } else { + return errors.Errorf("etcd case %s not implemented", name) } return nil } @@ -266,3 +314,73 @@ func (c *gRPCController) stop() { c.cancel = nil c.wg.Wait() } + +type etcdController struct { + ETCDCase + clients []*clientv3.Client + pctx context.Context + + ctx context.Context + cancel context.CancelFunc + + wg sync.WaitGroup +} + +func newEtcdController(ctx context.Context, clis []*clientv3.Client, fn ETCDCraeteFn) *etcdController { + c := &etcdController{ + pctx: ctx, + clients: clis, + ETCDCase: fn(), + } + return c +} + +// run tries to run the gRPC api bench. +func (c *etcdController) run() { + if c.GetQPS() <= 0 || c.cancel != nil { + return + } + c.ctx, c.cancel = context.WithCancel(c.pctx) + qps := c.GetQPS() + burst := c.GetBurst() + cliNum := int64(len(c.clients)) + tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond + log.Info("begin to run etcd case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt)) + err := c.Init(c.ctx, c.clients[0]) + if err != nil { + log.Error("init error", zap.String("case", c.Name()), zap.Error(err)) + return + } + for _, cli := range c.clients { + c.wg.Add(1) + go func(cli *clientv3.Client) { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + for i := int64(0); i < burst; i++ { + err := c.Unary(c.ctx, cli) + if err != nil { + log.Error("meet erorr when doing etcd request", zap.String("case", c.Name()), zap.Error(err)) + } + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running etcd case") + return + } + } + }(cli) + } +} + +// stop stops the etcd api bench. +func (c *etcdController) stop() { + if c.cancel == nil { + return + } + c.cancel() + c.cancel = nil + c.wg.Wait() +} diff --git a/tools/pd-api-bench/config/config.go b/tools/pd-api-bench/config/config.go index 8898c0e3083..675e665ab0a 100644 --- a/tools/pd-api-bench/config/config.go +++ b/tools/pd-api-bench/config/config.go @@ -45,6 +45,7 @@ type Config struct { // only for init HTTP map[string]cases.Config `toml:"http" json:"http"` GRPC map[string]cases.Config `toml:"grpc" json:"grpc"` + ETCD map[string]cases.Config `toml:"etcd" json:"etcd"` } // NewConfig return a set of settings. @@ -108,6 +109,12 @@ func (c *Config) InitCoordinator(co *cases.Coordinator) { log.Error("create gRPC case failed", zap.Error(err)) } } + for name, cfg := range c.ETCD { + err := co.SetETCDCase(name, &cfg) + if err != nil { + log.Error("create etcd case failed", zap.Error(err)) + } + } } // Adjust is used to adjust configurations diff --git a/tools/pd-api-bench/config/simconfig.toml b/tools/pd-api-bench/config/simconfig.toml index 9a05001973b..48e5a2595ba 100644 --- a/tools/pd-api-bench/config/simconfig.toml +++ b/tools/pd-api-bench/config/simconfig.toml @@ -9,4 +9,8 @@ pd = "127.0.0.1:2379" burst = 1 [grpc.GetStores] qps = 1000 + burst = 1 +[etcd] + [etcd.Get] + qps = 1 burst = 1 \ No newline at end of file diff --git a/tools/pd-api-bench/main.go b/tools/pd-api-bench/main.go index 681c3579012..b64bec40104 100644 --- a/tools/pd-api-bench/main.go +++ b/tools/pd-api-bench/main.go @@ -40,6 +40,7 @@ import ( "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-api-bench/cases" "github.com/tikv/pd/tools/pd-api-bench/config" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -119,6 +120,10 @@ func main() { pdClis[i] = newPDClient(ctx, cfg) pdClis[i].UpdateOption(pd.EnableFollowerHandle, true) } + etcdClis := make([]*clientv3.Client, cfg.Client) + for i := int64(0); i < cfg.Client; i++ { + etcdClis[i] = newEtcdClient(cfg) + } httpClis := make([]pdHttp.Client, cfg.Client) for i := int64(0); i < cfg.Client; i++ { sd := pdClis[i].GetServiceDiscovery() @@ -129,7 +134,7 @@ func main() { log.Fatal("InitCluster error", zap.Error(err)) } - coordinator := cases.NewCoordinator(ctx, httpClis, pdClis) + coordinator := cases.NewCoordinator(ctx, httpClis, pdClis, etcdClis) hcaseStr := strings.Split(httpCases, ",") for _, str := range hcaseStr { @@ -158,6 +163,9 @@ func main() { for _, cli := range httpClis { cli.Close() } + for _, cli := range etcdClis { + cli.Close() + } log.Info("Exit") switch sig { case syscall.SIGTERM: @@ -276,6 +284,23 @@ func runHTTPServer(cfg *config.Config, co *cases.Coordinator) { co.SetGRPCCase(name, cfg) c.String(http.StatusOK, "") }) + engine.POST("config/etcd/all", func(c *gin.Context) { + var input map[string]cases.Config + if err := c.ShouldBindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for name, cfg := range input { + co.SetETCDCase(name, &cfg) + } + c.String(http.StatusOK, "") + }) + engine.POST("config/etcd/:name", func(c *gin.Context) { + name := c.Param("name") + cfg := getCfg(c) + co.SetETCDCase(name, cfg) + c.String(http.StatusOK, "") + }) engine.GET("config/http/all", func(c *gin.Context) { all := co.GetAllHTTPCases() @@ -303,6 +328,19 @@ func runHTTPServer(cfg *config.Config, co *cases.Coordinator) { } c.IndentedJSON(http.StatusOK, cfg) }) + engine.GET("config/etcd/all", func(c *gin.Context) { + all := co.GetAllETCDCases() + c.IndentedJSON(http.StatusOK, all) + }) + engine.GET("config/etcd/:name", func(c *gin.Context) { + name := c.Param("name") + cfg, err := co.GetETCDCase(name) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, cfg) + }) // nolint engine.Run(cfg.StatusAddr) } @@ -313,13 +351,38 @@ func trimHTTPPrefix(str string) string { return str } +const ( + keepaliveTime = 10 * time.Second + keepaliveTimeout = 3 * time.Second +) + +func newEtcdClient(cfg *config.Config) *clientv3.Client { + lgc := zap.NewProductionConfig() + lgc.Encoding = log.ZapEncodingName + tlsCfg, err := tlsutil.TLSConfig{ + CAPath: cfg.CaPath, + CertPath: cfg.CertPath, + KeyPath: cfg.KeyPath, + }.ToTLSConfig() + if err != nil { + log.Fatal("fail to create etcd client", zap.Error(err)) + return nil + } + clientConfig := clientv3.Config{ + Endpoints: []string{trimHTTPPrefix(cfg.PDAddr)}, + DialTimeout: keepaliveTimeout, + TLS: tlsCfg, + LogConfig: &lgc, + } + client, err := clientv3.New(clientConfig) + if err != nil { + log.Fatal("fail to create pd client", zap.Error(err)) + } + return client +} + // newPDClient returns a pd client. func newPDClient(ctx context.Context, cfg *config.Config) pd.Client { - const ( - keepaliveTime = 10 * time.Second - keepaliveTimeout = 3 * time.Second - ) - addrs := []string{trimHTTPPrefix(cfg.PDAddr)} pdCli, err := pd.NewClientWithContext(ctx, addrs, pd.SecurityOption{ CAPath: cfg.CaPath,