Skip to content

Commit

Permalink
Reduce default concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Aylei <rayingecho@gmail.com>
  • Loading branch information
aylei committed Apr 17, 2019
1 parent f392195 commit 7e3e4fa
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 38 deletions.
24 changes: 10 additions & 14 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ type OperatorActions interface {
CheckTidbClusterStatusOrDie(info *TidbClusterConfig)
BeginInsertDataTo(info *TidbClusterConfig) error
BeginInsertDataToOrDie(info *TidbClusterConfig)
StopInsertDataTo(info *TidbClusterConfig) error
StopInsertDataToOrDie(info *TidbClusterConfig)
StopInsertDataTo(info *TidbClusterConfig)
ScaleTidbCluster(info *TidbClusterConfig) error
ScaleTidbClusterOrDie(info *TidbClusterConfig)
CheckScaleInSafely(info *TidbClusterConfig) error
Expand Down Expand Up @@ -153,6 +152,8 @@ type TidbClusterConfig struct {
UserName string
InitSecretName string
BackupSecretName string

BlockWriteConfig blockwriter.Config
}

func (tc *TidbClusterConfig) BackupHelmSetString(m map[string]string) string {
Expand Down Expand Up @@ -318,7 +319,7 @@ func (oa *operatorActions) DeployTidbCluster(info *TidbClusterConfig) error {
}

// init blockWriter case
info.blockWriter = blockwriter.NewBlockWriterCase(oa.cfg.BlockWriter)
info.blockWriter = blockwriter.NewBlockWriterCase(info.BlockWriteConfig)
info.blockWriter.ClusterName = info.ClusterName

return nil
Expand Down Expand Up @@ -475,9 +476,12 @@ func (oa *operatorActions) CheckTidbClusterStatusOrDie(info *TidbClusterConfig)

func (oa *operatorActions) BeginInsertDataTo(info *TidbClusterConfig) error {
dsn := getDSN(info.Namespace, info.ClusterName, "test", info.Password)
if info.blockWriter == nil {
return fmt.Errorf("block writer not initialized for cluster: %s", info.ClusterName)
}
glog.Infof("[%s] [%s] open TiDB connections, concurrency: %d",
info.blockWriter, info.ClusterName, oa.cfg.BlockWriter.Concurrency)
db, err := util.OpenDB(dsn, oa.cfg.BlockWriter.Concurrency)
info.blockWriter, info.ClusterName, info.blockWriter.GetConcurrency())
db, err := util.OpenDB(dsn, info.blockWriter.GetConcurrency())
if err != nil {
return err
}
Expand All @@ -492,16 +496,8 @@ func (oa *operatorActions) BeginInsertDataToOrDie(info *TidbClusterConfig) {
}
}

func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) error {
func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) {
info.blockWriter.Stop()
return nil
}

func (oa *operatorActions) StopInsertDataToOrDie(info *TidbClusterConfig) {
err := oa.StopInsertDataTo(info)
if err != nil {
panic(err)
}
}

func chartPath(name string, tag string) string {
Expand Down
22 changes: 10 additions & 12 deletions tests/backup/backupcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ func NewBackupCase(operator tests.OperatorActions, srcCluster *tests.TidbCluster

func (bc *BackupCase) Run() error {

err := bc.operator.StopInsertDataTo(bc.srcCluster)
if err != nil {
glog.Errorf("cluster:[%s] stop insert data failed,error: %v", bc.srcCluster.ClusterName, err)
return err
}
// pause write pressure during backup
defer func() {
go func() {
if err := bc.operator.BeginInsertDataTo(bc.srcCluster); err != nil {
glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err)
}
}()
}()
bc.operator.StopInsertDataTo(bc.srcCluster)

err = bc.operator.DeployAdHocBackup(bc.srcCluster)
err := bc.operator.DeployAdHocBackup(bc.srcCluster)
if err != nil {
glog.Errorf("cluster:[%s] deploy happen error: %v", bc.srcCluster.ClusterName, err)
return err
Expand Down Expand Up @@ -120,12 +124,6 @@ func (bc *BackupCase) Run() error {
return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount)
}

err = bc.operator.BeginInsertDataTo(bc.srcCluster)
if err != nil {
glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err)
return err
}

return nil
}

Expand Down
18 changes: 8 additions & 10 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func main() {
"tidb.resources.requests.memory": "1Gi",
"monitor.persistent": "true",
},
Args: map[string]string{},
Monitor: true,
Args: map[string]string{},
Monitor: true,
BlockWriteConfig: conf.BlockWriter,
}
cluster2 := &tests.TidbClusterConfig{
Namespace: clusterName2,
Expand Down Expand Up @@ -123,8 +124,9 @@ func main() {
// TODO assert the the monitor's pvc exist and clean it when bootstrapping
"monitor.persistent": "true",
},
Args: map[string]string{},
Monitor: true,
Args: map[string]string{},
Monitor: true,
BlockWriteConfig: conf.BlockWriter,
}

// cluster backup and restore
Expand Down Expand Up @@ -155,12 +157,8 @@ func main() {
oa.CheckTidbClusterStatusOrDie(cluster2)

defer func() {
if err := oa.StopInsertDataTo(cluster1); err != nil {
glog.Errorf("cluster:[%s] stop insert data failed,error: %v", cluster1.ClusterName, err)
}
if err := oa.StopInsertDataTo(cluster2); err != nil {
glog.Errorf("cluster:[%s] stop insert data failed,error: %v", cluster2.ClusterName, err)
}
oa.StopInsertDataTo(cluster1)
oa.StopInsertDataTo(cluster2)
}()
go oa.BeginInsertDataToOrDie(cluster1)
go oa.BeginInsertDataToOrDie(cluster2)
Expand Down
2 changes: 1 addition & 1 deletion tests/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

const (
defaultTableNum int = 64
defaultConcurrency = 512
defaultConcurrency = 128
defaultBatchSize = 100
defaultRawSize = 100
)
Expand Down
6 changes: 5 additions & 1 deletion tests/pkg/blockwriter/blockwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ const (

// BlockWriterCase is for concurrent writing blocks.
type BlockWriterCase struct {
cfg Config
bws []*blockWriter

isRunning uint32
isInit uint32
stopChan chan struct{}

cfg Config
ClusterName string

sync.RWMutex
Expand Down Expand Up @@ -75,6 +75,10 @@ func NewBlockWriterCase(cfg Config) *BlockWriterCase {
return c
}

func (c *BlockWriterCase) GetConcurrency() int {
return c.cfg.Concurrency
}

func (c *BlockWriterCase) initBlocks() {
c.bws = make([]*blockWriter, c.cfg.Concurrency)
for i := 0; i < c.cfg.Concurrency; i++ {
Expand Down

0 comments on commit 7e3e4fa

Please sign in to comment.