Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Aug 12, 2024
1 parent 7a3b67f commit 9e95940
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 33 deletions.
16 changes: 4 additions & 12 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
}

exprCtx := sessCtx.GetExprCtx()
batchCnt := rInfo.ReorgMeta.BatchSize
if batchCnt == 0 {
batchCnt = int(variable.GetDDLReorgBatchSize())
}
batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
return &backfillCtx{
id: id,
ddlCtx: rInfo.d,
Expand Down Expand Up @@ -419,10 +416,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
})

// Change the batch size dynamically.
newBatchCnt := job.ReorgMeta.BatchSize
if newBatchCnt == 0 {
newBatchCnt = int(variable.GetDDLReorgBatchSize())
}
newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
w.GetCtx().batchCnt = newBatchCnt
result := w.handleBackfillTask(d, task, bf)
w.sendResult(result)
Expand Down Expand Up @@ -683,7 +677,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(

//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency)
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc)
if err != nil {
Expand Down Expand Up @@ -732,8 +726,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}

concurrency := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency)
pipe, err := NewAddIndexIngestPipeline(
opCtx,
dc.store,
Expand All @@ -747,7 +739,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
reorgInfo.EndKey,
job.ReorgMeta,
avgRowSize,
concurrency,
importConc,
cpMgr,
rowCntListener,
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -153,7 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
ddlObj.etcdCli,
discovery,
job.ReorgMeta.ResourceGroupName,
ingest.ResolveConcurrency(job.ReorgMeta.Concurrency),
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
)
}

Expand Down
10 changes: 2 additions & 8 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
if err != nil {
return nil, err
}
workerCnt := info.ReorgMeta.Concurrency
if workerCnt == 0 {
workerCnt = int(variable.GetDDLReorgWorkerCounter())
}
workerCnt := info.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
return &txnBackfillScheduler{
ctx: ctx,
reorgInfo: info,
Expand Down Expand Up @@ -235,10 +232,7 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context)
}

func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := b.reorgInfo.ReorgMeta.Concurrency
if workerCnt == 0 {
workerCnt = int(variable.GetDDLReorgWorkerCounter())
}
workerCnt := b.reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
return min(workerCnt, maxBackfillWorkerSize)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2029,7 +2029,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
})
} else {
job := reorgInfo.Job
workerCntLimit := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency)
workerCntLimit := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
cpuCount, err := handle.GetCPUCountOfNode(ctx)
if err != nil {
return err
Expand Down
13 changes: 4 additions & 9 deletions pkg/ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ func genConfig(
return c, nil
}

// ResolveConcurrency gets the concurrency used for ingest mode of adding index.
func ResolveConcurrency(hintConc int) int {
if hintConc > 0 {
return hintConc
}
return int(variable.GetDDLReorgWorkerCounter())
}

// CopReadBatchSize is the batch size of coprocessor read.
// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
// sending too many cop requests for the same handle range.
Expand All @@ -113,7 +105,10 @@ func CopReadBatchSize(hintSize int) int {
// represents the max concurrent ongoing coprocessor requests.
// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
func CopReadChunkPoolSize(hintConc int) int {
return ResolveConcurrency(hintConc) * 10
if hintConc > 0 {
return 10 * hintConc
}
return 10 * int(variable.GetDDLReorgWorkerCounter())
}

// NewDDLTLS creates a common.TLS from the tidb config for DDL.
Expand Down
22 changes: 20 additions & 2 deletions pkg/parser/model/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,30 @@ type DDLReorgMeta struct {
ReorgTp ReorgType `json:"reorg_tp"`
IsFastReorg bool `json:"is_fast_reorg"`
IsDistReorg bool `json:"is_dist_reorg"`
Concurrency int `json:"concurrency"`
BatchSize int `json:"batch_size"`
UseCloudStorage bool `json:"use_cloud_storage"`
ResourceGroupName string `json:"resource_group_name"`
Version int64 `json:"version"`
TargetScope string `json:"target_scope"`
// These two variables are set when corresponding session variables are set explicitly. When they are set,
// user cannot change it by setting the global one. Otherwise, they can be adjusted dynamically through global var.
Concurrency int `json:"concurrency"`
BatchSize int `json:"batch_size"`
}

// GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta or returns the default value.
func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int {
if dm == nil || dm.Concurrency == 0 {
return defaultVal
}
return dm.Concurrency
}

// GetBatchSizeOrDefault gets the batch size from DDLReorgMeta or returns the default value.
func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int {
if dm == nil || dm.BatchSize == 0 {
return defaultVal
}
return dm.BatchSize
}

const (
Expand Down

0 comments on commit 9e95940

Please sign in to comment.