
ddl: support create index with ingest method #37847

Merged
merged 15 commits on Sep 16, 2022
5 changes: 4 additions & 1 deletion br/pkg/lightning/backend/local/localhelper.go
@@ -177,7 +177,7 @@ func (local *local) SplitAndScatterRegionByRanges(
if err != nil {
log.FromContext(ctx).Warn("fetch table region size statistics failed",
zap.String("table", tableInfo.Name), zap.Error(err))
-			tableRegionStats = make(map[uint64]int64)
+			tableRegionStats, err = make(map[uint64]int64), nil
}
}

@@ -348,6 +348,9 @@ }
}

func fetchTableRegionSizeStats(ctx context.Context, db *sql.DB, tableID int64) (map[uint64]int64, error) {
if db == nil {
return nil, errors.Errorf("db is nil")
}
exec := &common.SQLWithRetry{
DB: db,
Logger: log.FromContext(ctx),
13 changes: 12 additions & 1 deletion ddl/backfilling.go
@@ -26,6 +26,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
@@ -655,7 +656,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic

switch bfWorkerType {
case typeAddIndexWorker:
-		idxWorker, err := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
+		idxWorker, err := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc, job)
if err != nil {
return errors.Trace(err)
}
@@ -712,6 +713,16 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
zap.Int("regionCnt", len(kvRanges)),
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)))
if bfWorkerType == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
err := bc.Flush(reorgInfo.currElement.ID)
if err != nil {
return errors.Trace(err)
}
} else {
return errors.New(ingest.LitErrGetBackendFail)
}
}
remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
3 changes: 3 additions & 0 deletions ddl/ddl.go
@@ -36,6 +36,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/ddl/syncer"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
@@ -681,6 +682,8 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
// Start some background routine to manage TiFlash replica.
d.wg.Run(d.PollTiFlashRoutine)

ingest.InitGlobalLightningEnv()
Contributor:

So after this PR, we will use Lightning as default, right?

Contributor Author (@tangenta, Sep 15, 2022):

If tidb_ddl_enable_fast_reorg is true, the lightning backfill is used.
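For context, a minimal sketch of exercising the new path end to end from a test. It is not part of this PR; the testkit helpers are assumed from the rest of the repository, and the exact CreateMockStore signature has varied across TiDB versions.

package ddl_test

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

func TestAddIndexWithFastReorg(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set global tidb_ddl_enable_fast_reorg = 1")
	tk.MustExec("create table t (a int, b int)")
	tk.MustExec("insert into t values (1, 1), (2, 2)")
	// With fast reorg enabled (and PiTR log backup disabled), pickBackfillType
	// chooses model.ReorgTypeLitMerge and the backfill runs on the ingest path.
	tk.MustExec("alter table t add index idx_a(a)")
}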


return nil
}

155 changes: 135 additions & 20 deletions ddl/index.go
@@ -25,7 +25,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
@@ -598,7 +600,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
switch indexInfo.State {
case model.StateNone:
// none -> delete only
-		reorgTp := pickBackfillType(job)
+		reorgTp := pickBackfillType(w, job)
if reorgTp.NeedMergeProcess() {
indexInfo.BackfillState = model.BackfillStateRunning
}
@@ -675,20 +677,53 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

// pickBackfillType determines which backfill process will be used.
-func pickBackfillType(job *model.Job) model.ReorgType {
+func pickBackfillType(w *worker, job *model.Job) model.ReorgType {
if job.ReorgMeta.ReorgTp != model.ReorgTypeNone {
// The backfill task has been started.
// Don't switch the backfill process.
return job.ReorgMeta.ReorgTp
}
if IsEnableFastReorg() {
canUseIngest := canUseIngest(w)
if ingest.LitInitialized && canUseIngest {
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge
}
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
zap.Bool("lightning env initialized", ingest.LitInitialized),
zap.Bool("can use ingest", canUseIngest))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
return model.ReorgTypeTxnMerge
}
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
return model.ReorgTypeTxn
}

// canUseIngest indicates whether the ingest method can be used to backfill the index.
func canUseIngest(w *worker) bool {
ctx, err := w.sessPool.get()
if err != nil {
return false
}
defer w.sessPool.put(ctx)
failpoint.Inject("EnablePiTR", func() {
logutil.BgLogger().Info("lightning: mock enable PiTR")
failpoint.Return(true)
})
// The ingest method is not compatible with PiTR (log backup).
return !utils.CheckLogBackupEnabled(ctx)
}
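The EnablePiTR failpoint above lets tests force this check to pass. Below is a sketch of activating it, assuming the conventional failpoint path derived from the package import path (the path itself is not shown in this diff):

package ddl_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestCanUseIngestWithMockedPiTR(t *testing.T) {
	fp := "github.com/pingcap/tidb/ddl/EnablePiTR"
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() { require.NoError(t, failpoint.Disable(fp)) }()
	// While the failpoint is active, canUseIngest returns true without
	// consulting the log-backup (PiTR) status.
}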

// tryFallbackToTxnMerge switches the reorg type to txn-merge when the lightning backfill encounters an error.
func tryFallbackToTxnMerge(job *model.Job, err error) {
if job.State != model.JobStateRollingback {
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
job.SnapshotVer = 0
}
}
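To make the fallback concrete, here is a small illustrative sketch (not part of the PR) of what tryFallbackToTxnMerge does to a job whose lightning backfill failed; the field names are the ones used in this diff:

// demoFallback is a hypothetical helper for illustration only.
func demoFallback(job *model.Job, backfillErr error) {
	// Before: job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge.
	tryFallbackToTxnMerge(job, backfillErr)
	// After (unless the job is rolling back):
	//   job.ReorgMeta.ReorgTp == model.ReorgTypeTxnMerge, which is why the
	//   logged message says "fallback to txn-merge backfill process";
	//   job.SnapshotVer == 0, so the backfill restarts from the beginning.
}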

func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
@@ -704,25 +739,68 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
-	bfProcess := pickBackfillType(job)
+	bfProcess := pickBackfillType(w, job)
if !bfProcess.NeedMergeProcess() {
return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
}
switch indexInfo.BackfillState {
case model.BackfillStateRunning:
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
return false, ver, errors.Trace(err)
}
if done {
indexInfo.BackfillState = model.BackfillStateReadyToMerge
logutil.BgLogger().Info("[ddl] index backfill state running", zap.Int64("job ID", job.ID),
zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O))
switch bfProcess {
case model.ReorgTypeLitMerge:
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if ok && bc.Done() {
break
}
if !ok && job.SnapshotVer != 0 {
// The owner has crashed or changed; we need to restart the backfill.
job.SnapshotVer = 0
return false, ver, nil
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
if err != nil {
tryFallbackToTxnMerge(job, err)
Contributor:

This function logs "txn-merge", but this call site is on the "lit-merge" path. Do we need to update this log?

Contributor Author:

If there is an error for 'lit-merge', we will fall back to 'txn-merge'.

Contributor:

Got it.

return false, ver, errors.Trace(err)
}
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
}
if !done {
return false, ver, nil
}
err = bc.FinishImport(indexInfo.ID, indexInfo.Unique, tbl)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
logutil.BgLogger().Warn("[ddl] import index duplicate key, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
} else {
logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
tryFallbackToTxnMerge(job, err)
}
ingest.LitBackCtxMgr.Unregister(job.ID)
return false, ver, errors.Trace(err)
}
bc.SetDone()
case model.ReorgTypeTxnMerge:
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil || !done {
return false, ver, errors.Trace(err)
}
}
indexInfo.BackfillState = model.BackfillStateReadyToMerge
ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
return false, ver, errors.Trace(err)
case model.BackfillStateReadyToMerge:
logutil.BgLogger().Info("[ddl] index backfill state merge sync", zap.Int64("job ID", job.ID),
logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID),
zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O))
indexInfo.BackfillState = model.BackfillStateMerging
if bfProcess == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg.
ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
return false, ver, errors.Trace(err)
@@ -1061,7 +1139,8 @@ type baseIndexWorker struct {

type addIndexWorker struct {
baseIndexWorker
-	index table.Index
+	index     table.Index
+	writerCtx *ingest.WriterContext

// The following attributes are used to reduce memory allocation.
idxKeyBufs [][]byte
@@ -1070,7 +1149,7 @@ type addIndexWorker struct {
}

func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column,
-	reorgInfo *reorgInfo, jc *JobContext) (*addIndexWorker, error) {
+	reorgInfo *reorgInfo, jc *JobContext, job *model.Job) (*addIndexWorker, error) {
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) {
logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.String("reorgInfo", reorgInfo.String()))
@@ -1079,6 +1158,23 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID)
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)

var lwCtx *ingest.WriterContext
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if !ok {
return nil, errors.Trace(errors.New(ingest.LitErrGetBackendFail))
}
ei, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID)
if err != nil {
return nil, errors.Trace(errors.New(ingest.LitErrCreateEngineFail))
}
lwCtx, err = ei.NewWriterCtx(id)
if err != nil {
return nil, err
}
}

return &addIndexWorker{
baseIndexWorker: baseIndexWorker{
backfillWorker: newBackfillWorker(sessCtx, id, t, reorgInfo, typeAddIndexWorker),
@@ -1090,7 +1186,8 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
sqlMode: reorgInfo.ReorgMeta.SQLMode,
jobContext: jc,
},
-		index: index,
+		index:     index,
+		writerCtx: lwCtx,
}, nil
}

@@ -1384,14 +1481,32 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
}

// Create the index.
-			handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill)
-			if err != nil {
-				if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) {
-					// Index already exists, skip it.
-					continue
-				}
-
-				return errors.Trace(err)
-			}
+			if w.writerCtx == nil {
+				handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill)
+				if err != nil {
+					if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) {
+						// Index already exists, skip it.
+						continue
+					}
+					return errors.Trace(err)
+				}
+			} else { // The lightning environment is ready.
+				vars := w.sessCtx.GetSessionVars()
+				sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
+				key, distinct, err := w.index.GenIndexKey(sCtx, idxRecord.vals, idxRecord.handle, writeBufs.IndexKeyBuf)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				idxVal, err := w.index.GenIndexValue(sCtx, distinct, idxRecord.vals, idxRecord.handle, idxRecord.rsData)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				err = w.writerCtx.WriteRow(key, idxVal)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				writeBufs.IndexKeyBuf = key
+			}
taskCtx.addedCount++
}
24 changes: 16 additions & 8 deletions ddl/ingest/engine.go
@@ -65,7 +65,8 @@ func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin
func (ei *engineInfo) Flush() error {
err := ei.openedEngine.Flush(ei.ctx)
if err != nil {
-		logutil.BgLogger().Error(LitErrFlushEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		logutil.BgLogger().Error(LitErrFlushEngineErr, zap.Error(err),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return err
}
return nil
Expand All @@ -78,13 +79,15 @@ func (ei *engineInfo) Clean() {
indexEngine := ei.openedEngine
closedEngine, err := indexEngine.Close(ei.ctx, ei.cfg)
if err != nil {
-		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
ei.openedEngine = nil
// Here the local intermediate files will be removed.
err = closedEngine.Cleanup(ei.ctx)
if err != nil {
-		logutil.BgLogger().Error(LitErrCleanEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		logutil.BgLogger().Error(LitErrCleanEngineErr, zap.Error(err),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
}

@@ -94,14 +97,16 @@ func (ei *engineInfo) ImportAndClean() error {
indexEngine := ei.openedEngine
closeEngine, err1 := indexEngine.Close(ei.ctx, ei.cfg)
if err1 != nil {
-		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err1),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrCloseEngineErr)
}
ei.openedEngine = nil

err := ei.diskRoot.UpdateUsageAndQuota()
if err != nil {
-		logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return err
}

@@ -111,14 +116,16 @@
zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10)))
err = closeEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
if err != nil {
-		logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		logutil.BgLogger().Error(LitErrIngestDataErr, zap.Error(err),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrIngestDataErr)
}

// Clean up the engine local workspace.
err = closeEngine.Cleanup(ei.ctx)
if err != nil {
-		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		logutil.BgLogger().Error(LitErrCloseEngineErr, zap.Error(err),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return errors.New(LitErrCloseEngineErr)
}
return nil
@@ -139,7 +146,8 @@ func (ei *engineInfo) NewWriterCtx(id int) (*WriterContext, error) {

wCtx, err := ei.newWriterContext(id)
if err != nil {
-		logutil.BgLogger().Error(LitErrCreateContextFail, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID),
+		logutil.BgLogger().Error(LitErrCreateContextFail, zap.Error(err),
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID),
zap.Int("worker ID", id))
return nil, err
}