Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: API bench supports etcd #7715

Merged
merged 4 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 118 additions & 6 deletions tools/pd-api-bench/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

pd "github.com/tikv/pd/client"
pdHttp "github.com/tikv/pd/client/http"
"go.etcd.io/etcd/clientv3"
)

var (
Expand Down Expand Up @@ -111,6 +112,24 @@ func (c *baseCase) GetConfig() *Config {
return c.cfg.Clone()
}

// ETCDCase is the interface for all etcd api cases.
type ETCDCase interface {
Case
Init(context.Context, *clientv3.Client) error
Unary(context.Context, *clientv3.Client) error
}

// ETCDCraeteFn is function type to create ETCDCase.
type ETCDCraeteFn func() ETCDCase

// ETCDCaseFnMap is the map for all ETCD case creation function.
var ETCDCaseFnMap = map[string]ETCDCraeteFn{
"Get": newGetKV(),
"Put": newPutKV(),
"Delete": newDeleteKV(),
"Txn": newTxnKV(),
}

// GRPCCase is the interface for all gRPC cases.
type GRPCCase interface {
Case
Expand All @@ -130,9 +149,6 @@ var GRPCCaseFnMap = map[string]GRPCCraeteFn{
"Tso": newTso(),
}

// GRPCCaseMap is the map for all gRPC case creation function.
var GRPCCaseMap = map[string]GRPCCase{}

// HTTPCase is the interface for all HTTP cases.
type HTTPCase interface {
Case
Expand All @@ -148,9 +164,6 @@ var HTTPCaseFnMap = map[string]HTTPCraeteFn{
"GetMinResolvedTS": newMinResolvedTS(),
}

// HTTPCaseMap is the map for all HTTP cases.
var HTTPCaseMap = map[string]HTTPCase{}

type minResolvedTS struct {
*baseCase
}
Expand Down Expand Up @@ -366,3 +379,102 @@ func generateKeyForSimulator(id int, keyLen int) []byte {
copy(k, fmt.Sprintf("%010d", id))
return k
}

type getKV struct {
*baseCase
}

func newGetKV() func() ETCDCase {
return func() ETCDCase {
return &getKV{
baseCase: &baseCase{
name: "Get",
cfg: newConfig(),
},
}
}
}

func (c *getKV) Init(ctx context.Context, cli *clientv3.Client) error {
for i := 0; i < 100; i++ {
_, err := cli.Put(ctx, fmt.Sprintf("/test/0001/%4d", i), fmt.Sprintf("%4d", i))
if err != nil {
return err
}
}
return nil
}

func (c *getKV) Unary(ctx context.Context, cli *clientv3.Client) error {
_, err := cli.Get(ctx, "/test/0001", clientv3.WithPrefix())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the key can be cluster id or others that must exist in etcd.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's okay. But I also have an idea, which is to initialize some key values

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good to me

return err
}

type putKV struct {
*baseCase
}

func newPutKV() func() ETCDCase {
return func() ETCDCase {
return &putKV{
baseCase: &baseCase{
name: "Put",
cfg: newConfig(),
},
}
}
}

func (c *putKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil }

func (c *putKV) Unary(ctx context.Context, cli *clientv3.Client) error {
_, err := cli.Put(ctx, "/test/0001/0000", "test")
return err
}

type deleteKV struct {
*baseCase
}

func newDeleteKV() func() ETCDCase {
return func() ETCDCase {
return &deleteKV{
baseCase: &baseCase{
name: "Put",
cfg: newConfig(),
},
}
}
}

func (c *deleteKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil }

func (c *deleteKV) Unary(ctx context.Context, cli *clientv3.Client) error {
_, err := cli.Delete(ctx, "/test/0001/0000")
return err
}

type txnKV struct {
*baseCase
}

func newTxnKV() func() ETCDCase {
return func() ETCDCase {
return &txnKV{
baseCase: &baseCase{
name: "Put",
cfg: newConfig(),
},
}
}
}

func (c *txnKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil }

func (c *txnKV) Unary(ctx context.Context, cli *clientv3.Client) error {
txn := cli.Txn(ctx)
txn = txn.If(clientv3.Compare(clientv3.Value("/test/0001/0000"), "=", "test"))
txn = txn.Then(clientv3.OpPut("/test/0001/0000", "test2"))
_, err := txn.Commit()
return err
}
122 changes: 120 additions & 2 deletions tools/pd-api-bench/cases/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
pd "github.com/tikv/pd/client"
pdHttp "github.com/tikv/pd/client/http"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand All @@ -34,21 +35,25 @@ type Coordinator struct {

httpClients []pdHttp.Client
gRPCClients []pd.Client
etcdClients []*clientv3.Client

http map[string]*httpController
grpc map[string]*gRPCController
etcd map[string]*etcdController

mu sync.RWMutex
}

// NewCoordinator returns a new coordinator.
func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client) *Coordinator {
func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client, etcdClients []*clientv3.Client) *Coordinator {
return &Coordinator{
ctx: ctx,
httpClients: httpClients,
gRPCClients: gRPCClients,
etcdClients: etcdClients,
http: make(map[string]*httpController),
grpc: make(map[string]*gRPCController),
etcd: make(map[string]*etcdController),
}
}

Expand All @@ -72,6 +77,16 @@ func (c *Coordinator) GetGRPCCase(name string) (*Config, error) {
return nil, errors.Errorf("case %v does not exist.", name)
}

// GetETCDCase returns the etcd case config.
func (c *Coordinator) GetETCDCase(name string) (*Config, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if controller, ok := c.etcd[name]; ok {
return controller.GetConfig(), nil
}
return nil, errors.Errorf("case %v does not exist.", name)
}

// GetAllHTTPCases returns the all HTTP case configs.
func (c *Coordinator) GetAllHTTPCases() map[string]*Config {
c.mu.RLock()
Expand All @@ -94,6 +109,17 @@ func (c *Coordinator) GetAllGRPCCases() map[string]*Config {
return ret
}

// GetAllETCDCases returns the all etcd case configs.
func (c *Coordinator) GetAllETCDCases() map[string]*Config {
c.mu.RLock()
defer c.mu.RUnlock()
ret := make(map[string]*Config)
for name, c := range c.etcd {
ret[name] = c.GetConfig()
}
return ret
}

// SetHTTPCase sets the config for the specific case.
func (c *Coordinator) SetHTTPCase(name string, cfg *Config) error {
c.mu.Lock()
Expand Down Expand Up @@ -133,7 +159,29 @@ func (c *Coordinator) SetGRPCCase(name string, cfg *Config) error {
}
controller.run()
} else {
return errors.Errorf("HTTP case %s not implemented", name)
return errors.Errorf("gRPC case %s not implemented", name)
}
return nil
}

// SetETCDCase sets the config for the specific case.
func (c *Coordinator) SetETCDCase(name string, cfg *Config) error {
c.mu.Lock()
defer c.mu.Unlock()
if fn, ok := ETCDCaseFnMap[name]; ok {
var controller *etcdController
if controller, ok = c.etcd[name]; !ok {
controller = newEtcdController(c.ctx, c.etcdClients, fn)
c.etcd[name] = controller
}
controller.stop()
controller.SetQPS(cfg.QPS)
if cfg.Burst > 0 {
controller.SetBurst(cfg.Burst)
}
controller.run()
} else {
return errors.Errorf("etcd case %s not implemented", name)
}
return nil
}
Expand Down Expand Up @@ -266,3 +314,73 @@ func (c *gRPCController) stop() {
c.cancel = nil
c.wg.Wait()
}

type etcdController struct {
ETCDCase
clients []*clientv3.Client
pctx context.Context

ctx context.Context
cancel context.CancelFunc

wg sync.WaitGroup
}

func newEtcdController(ctx context.Context, clis []*clientv3.Client, fn ETCDCraeteFn) *etcdController {
c := &etcdController{
pctx: ctx,
clients: clis,
ETCDCase: fn(),
}
return c
}

// run tries to run the gRPC api bench.
func (c *etcdController) run() {
if c.GetQPS() <= 0 || c.cancel != nil {
return
}
c.ctx, c.cancel = context.WithCancel(c.pctx)
qps := c.GetQPS()
burst := c.GetBurst()
cliNum := int64(len(c.clients))
tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond
log.Info("begin to run etcd case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt))
err := c.Init(c.ctx, c.clients[0])
if err != nil {
log.Error("init error", zap.String("case", c.Name()), zap.Error(err))
return
}
for _, cli := range c.clients {
c.wg.Add(1)
go func(cli *clientv3.Client) {
defer c.wg.Done()
var ticker = time.NewTicker(tt)
defer ticker.Stop()
for {
select {
case <-ticker.C:
for i := int64(0); i < burst; i++ {
err := c.Unary(c.ctx, cli)
if err != nil {
log.Error("meet erorr when doing etcd request", zap.String("case", c.Name()), zap.Error(err))
}
}
case <-c.ctx.Done():
log.Info("Got signal to exit running etcd case")
return
}
}
}(cli)
}
}

// stop stops the etcd api bench.
func (c *etcdController) stop() {
if c.cancel == nil {
return
}
c.cancel()
c.cancel = nil
c.wg.Wait()
}
7 changes: 7 additions & 0 deletions tools/pd-api-bench/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Config struct {
// only for init
HTTP map[string]cases.Config `toml:"http" json:"http"`
GRPC map[string]cases.Config `toml:"grpc" json:"grpc"`
ETCD map[string]cases.Config `toml:"etcd" json:"etcd"`
}

// NewConfig return a set of settings.
Expand Down Expand Up @@ -108,6 +109,12 @@ func (c *Config) InitCoordinator(co *cases.Coordinator) {
log.Error("create gRPC case failed", zap.Error(err))
}
}
for name, cfg := range c.ETCD {
err := co.SetETCDCase(name, &cfg)
if err != nil {
log.Error("create etcd case failed", zap.Error(err))
}
}
}

// Adjust is used to adjust configurations
Expand Down
4 changes: 4 additions & 0 deletions tools/pd-api-bench/config/simconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ pd = "127.0.0.1:2379"
burst = 1
[grpc.GetStores]
qps = 1000
burst = 1
[etcd]
[etcd.Get]
qps = 1
burst = 1
Loading
Loading