Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, model: support for dist-reorg on partitioned tables #41145

Merged
merged 18 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 64 additions & 51 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@ const (
typeAddIndexMergeTmpWorker backfillerType = 3

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
updateInstanceLease = 25 * time.Second
genTaskBatch = 4096
minGenTaskBatch = 1024
minDistTaskCnt = 32
retrySQLTimes = 10
InstanceLease = 1 * time.Minute
updateInstanceLease = 25 * time.Second
genTaskBatch = 4096
genPhysicalTableTaskBatch = 256
minGenTaskBatch = 1024
minGenPhysicalTableTaskBatch = 64
minDistTaskCnt = 64
retrySQLTimes = 10
retrySQLInterval = 300 * time.Millisecond
zimulala marked this conversation as resolved.
Show resolved Hide resolved
)

// RetrySQLInterval is export for test.
Expand All @@ -89,15 +92,15 @@ func (bT backfillerType) String() string {

// BackfillJob is for a tidb_ddl_backfill table's record.
type BackfillJob struct {
ID int64
JobID int64
EleID int64
EleKey []byte
Tp backfillerType
State model.JobState
StoreID int64
InstanceID string
InstanceLease types.Time
ID int64
JobID int64
EleID int64
EleKey []byte
Tp backfillerType
State model.JobState
PhysicalTableID int64
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
Expand Down Expand Up @@ -250,6 +253,7 @@ type backfillResult struct {
type reorgBackfillTask struct {
bfJob *BackfillJob
physicalTable table.PhysicalTable
index table.Index

// TODO: Remove the following fields after remove the function of run.
id int
Expand Down Expand Up @@ -437,6 +441,14 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
return result
}

// updatePartitionIndexInfo swaps in the per-task index handle when backfilling
// a partitioned table: each reorg task may target a different partition, so an
// add-index backfiller must use the index bound to that task's partition.
// It is a no-op for non-partitioned tables and for non-add-index backfillers.
func (w *backfillWorker) updatePartitionIndexInfo(task *reorgBackfillTask) {
	_, isPartitioned := w.GetCtx().table.(table.PartitionedTable)
	if !isPartitioned {
		return
	}
	if idxWorker, isAddIdx := w.backfiller.(*addIndexWorker); isAddIdx {
		idxWorker.index = task.index
	}
}

func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResult) {
logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w), zap.String("task", task.String()))
defer util.Recover(metrics.LabelDDL, "backfillWorker.runTask", func() {
Expand All @@ -458,6 +470,7 @@ func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResul
time.Sleep(100 * time.Millisecond)
})

w.updatePartitionIndexInfo(task)
// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
Expand Down Expand Up @@ -666,9 +679,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return nil
}

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
func getBatchTasks(t table.Table, reorgInfo *reorgInfo, pID int64, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, batch)
physicalTableID := reorgInfo.PhysicalTableID
var prefix kv.Key
if reorgInfo.mergingTmpIdx {
prefix = t.IndexPrefix()
Expand All @@ -679,7 +691,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID)
jobCtx := reorgInfo.d.jobContext(job.ID)
for i, keyRange := range kvRanges {
startKey := keyRange.StartKey
endKey := keyRange.EndKey
Expand All @@ -688,6 +700,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] get backfill range task, change end key",
zap.Int64("pID", pID), zap.Int64("pTbl", phyTbl.GetPhysicalID()), zap.Bool("mergeTIdx", reorgInfo.mergingTmpIdx),
zimulala marked this conversation as resolved.
Show resolved Hide resolved
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}
Expand All @@ -700,8 +713,8 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,

task := &reorgBackfillTask{
id: i,
jobID: reorgInfo.Job.ID,
physicalTableID: physicalTableID,
jobID: job.ID,
physicalTableID: pID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
Expand All @@ -720,7 +733,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize)
batchTasks := getBatchTasks(t, scheduler.reorgInfo, scheduler.reorgInfo.PhysicalTableID, kvRanges, backfillTaskChanSize)
if len(batchTasks) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -1105,8 +1118,8 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
return nil
}

func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool,
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error {
func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool,
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error {
bJobs = bJobs[:0]
instanceID := ""
if notDistTask {
Expand All @@ -1116,12 +1129,11 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
// TODO: Adjust the number of ranges(region) for each task.
for _, task := range batchTasks {
bm := &model.BackfillMeta{
PhysicalTableID: reorgInfo.PhysicalTableID,
IsUnique: isUnique,
EndInclude: task.endInclude,
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
SQLMode: reorgInfo.ReorgMeta.SQLMode,
Location: reorgInfo.ReorgMeta.Location,
IsUnique: sJobCtx.isUnique,
EndInclude: task.endInclude,
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
SQLMode: reorgInfo.ReorgMeta.SQLMode,
Location: reorgInfo.ReorgMeta.Location,
JobMeta: &model.JobMeta{
SchemaID: reorgInfo.Job.SchemaID,
TableID: reorgInfo.Job.TableID,
Expand All @@ -1130,19 +1142,19 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
},
}
bj := &BackfillJob{
ID: *id,
JobID: reorgInfo.Job.ID,
EleID: reorgInfo.currElement.ID,
EleKey: reorgInfo.currElement.TypeKey,
Tp: bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
ID: sJobCtx.currBackfillJobID.Add(1),
JobID: reorgInfo.Job.ID,
EleID: reorgInfo.currElement.ID,
EleKey: reorgInfo.currElement.TypeKey,
PhysicalTableID: phyTblID,
Tp: sJobCtx.bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
}
*id++
bJobs = append(bJobs, bj)
}
if err := AddBackfillJobs(sess, bJobs); err != nil {
Expand All @@ -1151,29 +1163,30 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
return nil
}

func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool,
bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error {
endKey := reorgInfo.EndKey
isFirstOps := true
bJobs := make([]*BackfillJob, 0, genTaskBatch)
func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error {
isFirstOps := !sJobCtx.isMultiPhyTbl
batchSize := sJobCtx.batchSize
startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey
bJobs := make([]*BackfillJob, 0, batchSize)
for {
kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch)
kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize)
if err != nil {
return errors.Trace(err)
}
batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch)
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, pTblMeta.PhyTblID, kvRanges, batchSize)
if len(batchTasks) == 0 {
break
}
notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt)
if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil {
if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil {
return errors.Trace(err)
}
isFirstOps = false

remains := kvRanges[len(batchTasks):]
dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job")
logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table",
zap.Int64("physicalID", pTblMeta.PhyTblID),
zap.Int("batchTasksCnt", len(batchTasks)),
zap.Int("totalRegionCnt", len(kvRanges)),
zap.Int("remainRegionCnt", len(remains)),
Expand All @@ -1185,11 +1198,11 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo,
}

for {
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, pTblMeta.PhyTblID)
if err != nil {
return errors.Trace(err)
}
if bJobCnt < minGenTaskBatch {
if bJobCnt < sJobCtx.minBatchSize {
break
}
time.Sleep(RetrySQLInterval)
Expand Down
4 changes: 2 additions & 2 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const (
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
ddl_physical_tid bigint,
zimulala marked this conversation as resolved.
Show resolved Hide resolved
type int,
exec_id blob default null,
exec_lease timestamp,
Expand All @@ -74,7 +74,7 @@ const (
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
Expand Down
55 changes: 25 additions & 30 deletions ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package ddl

import (
"encoding/hex"
"sync"
"time"

Expand All @@ -27,13 +26,16 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gpool/spmc"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

const GetJobWithoutPartition = -1

type backfillWorkerContext struct {
currID int
mu sync.Mutex
Expand Down Expand Up @@ -107,19 +109,12 @@ func (bwCtx *backfillWorkerContext) GetContext() *backfillWorker {
return bw
}

func runBackfillJobs(d *ddl, ingestBackendCtx *ingest.BackendContext, bJob *BackfillJob, jobCtx *JobContext) (table.Table, error) {
func runBackfillJobs(d *ddl, sess *session, ingestBackendCtx *ingest.BackendContext, bJob *BackfillJob, jobCtx *JobContext) (table.Table, error) {
dbInfo, tbl, err := d.getTableByTxn(d.store, bJob.Meta.SchemaID, bJob.Meta.TableID)
if err != nil {
logutil.BgLogger().Warn("[ddl] runBackfillJobs gets table failed", zap.String("bfJob", bJob.AbbrStr()), zap.Error(err))
return nil, err
}
se, err := d.sessPool.get()
if err != nil {
logutil.BgLogger().Warn("[ddl] run backfill jobs get session failed", zap.Error(err))
return nil, err
}
defer d.sessPool.put(se)
sess := newSession(se)

workerCnt := int(variable.GetDDLReorgWorkerCounter())
// TODO: Different worker using different newBackfillerFunc.
Expand All @@ -134,9 +129,14 @@ func runBackfillJobs(d *ddl, ingestBackendCtx *ingest.BackendContext, bJob *Back
return bfWorker.runTask(task)
})

runningPID := int64(0)
// If txn-merge we needn't to claim the backfill job through the partition table
if ingestBackendCtx == nil {
runningPID = GetJobWithoutPartition
}
proFunc := func() ([]*reorgBackfillTask, error) {
// TODO: After BackfillJob replaces reorgBackfillTask, use backfiller's GetTasks instead of it.
return GetTasks(d.ddlCtx, sess, tbl, bJob.JobID, workerCnt+5)
return GetTasks(d.ddlCtx, sess, tbl, bJob.JobID, &runningPID, workerCnt+5)
}
// add new task
resultCh, control := d.backfillWorkerPool.AddProduceBySlice(proFunc, 0, workerCtx, spmc.WithConcurrency(workerCnt))
Expand Down Expand Up @@ -218,42 +218,34 @@ func (bwm *backfilWorkerManager) close(d *ddl) error {
// backfillJob2Task builds reorg task.
func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBackfillTask, error) {
pt := t.(table.PhysicalTable)
var idx table.Index
if tbl, ok := t.(table.PartitionedTable); ok {
pt = tbl.GetPartition(bfJob.Meta.PhysicalTableID)
pt = tbl.GetPartition(bfJob.PhysicalTableID)
if pt == nil {
return nil, dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", bfJob.Meta.PhysicalTableID, t.Meta().ID)
return nil, dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", bfJob.PhysicalTableID, t.Meta().ID)
}
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, bfJob.EleID)
idx = tables.NewIndex(bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
endKey := bfJob.EndKey
// TODO: Check reorgInfo.mergingTmpIdx
endK, err := getRangeEndKey(dc.jobContext(bfJob.JobID), dc.store, bfJob.Meta.Priority, pt.RecordPrefix(), bfJob.StartKey, endKey)
if err != nil {
logutil.BgLogger().Info("[ddl] convert backfill job to task, get reverse key failed", zap.String("backfill job", bfJob.AbbrStr()), zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] convert backfill job to task, change end key", zap.String("backfill job",
bfJob.AbbrStr()), zap.String("current key", hex.EncodeToString(bfJob.StartKey)), zap.Bool("end include", bfJob.Meta.EndInclude),
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}

return &reorgBackfillTask{
bfJob: bfJob,
physicalTable: pt,
bfJob: bfJob,
index: idx,
physicalTable: pt,
physicalTableID: bfJob.PhysicalTableID,
// TODO: Remove these fields after remove the old logic.
sqlQuery: bfJob.Meta.Query,
startKey: bfJob.StartKey,
endKey: endKey,
endKey: bfJob.EndKey,
endInclude: bfJob.Meta.EndInclude,
priority: bfJob.Meta.Priority}, nil
}

// GetTasks gets the backfill tasks associated with the non-runningJobID.
func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, concurrency int) ([]*reorgBackfillTask, error) {
func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, runningPID *int64, concurrency int) ([]*reorgBackfillTask, error) {
// TODO: At present, only add index is processed. In the future, different elements need to be distinguished.
var err error
var bJobs []*BackfillJob
for i := 0; i < retrySQLTimes; i++ {
bJobs, err = GetAndMarkBackfillJobsForOneEle(sess, concurrency, runningJobID, d.uuid, InstanceLease)
bJobs, err := GetAndMarkBackfillJobsForOneEle(sess, concurrency, runningJobID, d.uuid, *runningPID, InstanceLease)
if err != nil {
// TODO: add test: if all tidbs can't get the unmark backfill job(a tidb mark a backfill job, other tidbs returned, then the tidb can't handle this job.)
if dbterror.ErrDDLJobNotFound.Equal(err) {
Expand All @@ -267,6 +259,9 @@ func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, con
}
}

if *runningPID != GetJobWithoutPartition {
*runningPID = bJobs[0].PhysicalTableID
}
tasks := make([]*reorgBackfillTask, 0, len(bJobs))
for _, bJ := range bJobs {
task, err := d.backfillJob2Task(tbl, bJ)
Expand Down
Loading