Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: skip adding index reorg work for empty tables #56406

Merged
merged 27 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ec282c9
ddl: skip adding index reorg work for empty tables
tangenta Sep 29, 2024
eac776f
refine test code
tangenta Sep 29, 2024
8ed23d3
Merge remote-tracking branch 'upstream/master' into add-index-skip-reorg
tangenta Sep 29, 2024
60b1a2d
fix linter
tangenta Sep 29, 2024
83b1490
address comment and fix TestAddIndexMultipleDelete
tangenta Sep 29, 2024
04b322d
fix TestAddIndexMergeVersionIndexValue
tangenta Sep 29, 2024
4836b7d
refine function name
tangenta Sep 29, 2024
a92a3e7
move empty check into doReorgWorkForCreateIndex
tangenta Sep 29, 2024
71cce20
refine naming
tangenta Sep 29, 2024
084c68f
fix code and linter
tangenta Sep 29, 2024
b416be3
consistent naming
tangenta Sep 29, 2024
4250966
return if job.SnapshotVersion is not zero
tangenta Sep 29, 2024
afc6035
fix linter
tangenta Sep 29, 2024
834d295
fix TestMultiSchemaAddIndexMerge
tangenta Sep 30, 2024
fbb2854
fix TestAddIndexMergeReplaceDelete
tangenta Sep 30, 2024
d25c52d
make naming consistent
tangenta Sep 30, 2024
7dbedd1
send asc scan request instead
tangenta Sep 30, 2024
9fe9991
dont rely on stat cache
tangenta Oct 8, 2024
477b41b
fix TestDumpPlanReplayerAPI
tangenta Oct 16, 2024
0c08e25
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Oct 16, 2024
63ec9cb
fix TestDumpPlanReplayerAPI
tangenta Oct 16, 2024
f3a9278
test: fix TestAddGlobalIndexInIngest
tangenta Oct 17, 2024
519180b
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Oct 21, 2024
0b6423c
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Oct 22, 2024
a4c44c4
Merge remote-tracking branch 'upstream/master' into HEAD
tangenta Oct 24, 2024
4a5e30a
set label for topsql
tangenta Oct 25, 2024
da86bd9
fix TestPauseFailedOnCommit
tangenta Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,12 +1401,14 @@ func processJobs(

failpoint.Inject("mockCommitFailedOnDDLCommand", func(val failpoint.Value) {
if val.(bool) {
ns.Rollback()
failpoint.Return(jobErrs, errors.New("mock commit failed on admin command on ddl jobs"))
}
})

// There may be some conflict during the update, try it again
if err = ns.Commit(ctx); err != nil {
ns.Rollback()
continue
}

Expand Down
179 changes: 161 additions & 18 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
"slices"
"strings"
Expand Down Expand Up @@ -1116,6 +1117,127 @@ SwitchIndexState:
return ver, errors.Trace(err)
}

func checkIfTableReorgWorkCanSkip(
store kv.Storage,
tbl table.Table,
job *model.Job,
) bool {
if job.SnapshotVer != 0 {
// Reorg work has begun.
return false
}
ctx := NewReorgContext()
ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
ctx.setDDLLabelForTopSQL(job.Query)
return checkIfTableIsEmpty(ctx, store, tbl)
}

func checkIfTableIsEmpty(
ctx *ReorgContext,
store kv.Storage,
tbl table.Table,
) bool {
if pTbl, ok := tbl.(table.PartitionedTable); ok {
for _, pid := range pTbl.GetAllPartitionIDs() {
pTbl := pTbl.GetPartition(pid)
if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl) {
return false
}
}
return true
}
//nolint:forcetypeassert
plainTbl := tbl.(table.PhysicalTable)
return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl)
}

func checkIfPhysicalTableIsEmpty(
ctx *ReorgContext,
store kv.Storage,
tbl table.PhysicalTable,
) bool {
hasRecord, err := ExistsTableRow(ctx, store, math.MaxInt64, tbl)
if err != nil {
logutil.DDLLogger().Info("check if table is empty failed", zap.Error(err))
return false
}
return !hasRecord
}

func checkIfTempIndexReorgWorkCanSkip(
store kv.Storage,
tbl table.Table,
allIndexInfos []*model.IndexInfo,
job *model.Job,
) bool {
failpoint.Inject("skipReorgWorkForTempIndex", func(val failpoint.Value) {
if v, ok := val.(bool); ok {
failpoint.Return(v)
}
})
if job.SnapshotVer != 0 {
// Reorg work has begun.
return false
}
ctx := NewReorgContext()
ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
ctx.setDDLLabelForTopSQL(job.Query)
firstIdxID := allIndexInfos[0].ID
lastIdxID := allIndexInfos[len(allIndexInfos)-1].ID
var globalIdxIDs []int64
for _, idxInfo := range allIndexInfos {
if idxInfo.Global {
globalIdxIDs = append(globalIdxIDs, idxInfo.ID)
}
}
return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID, globalIdxIDs)
}

func checkIfTempIndexIsEmpty(
ctx *ReorgContext,
store kv.Storage,
tbl table.Table,
firstIdxID, lastIdxID int64,
globalIdxIDs []int64,
) bool {
tblMetaID := tbl.Meta().ID
if pTbl, ok := tbl.(table.PartitionedTable); ok {
for _, pid := range pTbl.GetAllPartitionIDs() {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pid, firstIdxID, lastIdxID) {
return false
}
}
for _, globalIdxID := range globalIdxIDs {
if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, globalIdxID, globalIdxID) {
return false
}
}
return true
}
return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, firstIdxID, lastIdxID)
}

func checkIfTempIndexIsEmptyForPhysicalTable(
ctx *ReorgContext,
store kv.Storage,
pid int64,
firstIdxID, lastIdxID int64,
) bool {
start, end := encodeTempIndexRange(pid, firstIdxID, lastIdxID)
foundKey := false
idxPrefix := tablecodec.GenTableIndexPrefix(pid)
err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, idxPrefix, math.MaxUint64, start, end,
func(_ kv.Handle, _ kv.Key, _ []byte) (more bool, err error) {
foundKey = true
return false, nil
})
if err != nil {
logutil.DDLLogger().Info("check if temp index is empty failed", zap.Error(err))
return false
}
return !foundKey
}

// pickBackfillType determines which backfill process will be used. The result is
// both stored in job.ReorgMeta.ReorgTp and returned.
func pickBackfillType(job *model.Job) (model.ReorgType, error) {
Expand Down Expand Up @@ -1184,26 +1306,40 @@ func doReorgWorkForCreateIndex(
return false, ver, err
}
if !reorgTp.NeedMergeProcess() {
skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job)
if skipReorg {
logutil.DDLLogger().Info("table is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
zap.String("table", tbl.Meta().Name.O))
return true, ver, nil
}
return runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, false)
}
switch allIndexInfos[0].BackfillState {
case model.BackfillStateRunning:
logutil.DDLLogger().Info("index backfill state running",
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge),
zap.String("index", allIndexInfos[0].Name.O))
switch reorgTp {
case model.ReorgTypeLitMerge:
if job.ReorgMeta.IsDistReorg {
done, ver, err = runIngestReorgJobDist(w, jobCtx, job, tbl, allIndexInfos)
} else {
done, ver, err = runIngestReorgJob(w, jobCtx, job, tbl, allIndexInfos)
skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job)
if !skipReorg {
logutil.DDLLogger().Info("index backfill state running",
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge),
zap.String("index", allIndexInfos[0].Name.O))
switch reorgTp {
case model.ReorgTypeLitMerge:
if job.ReorgMeta.IsDistReorg {
done, ver, err = runIngestReorgJobDist(w, jobCtx, job, tbl, allIndexInfos)
} else {
done, ver, err = runIngestReorgJob(w, jobCtx, job, tbl, allIndexInfos)
}
case model.ReorgTypeTxnMerge:
done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, false)
}
case model.ReorgTypeTxnMerge:
done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, false)
}
if err != nil || !done {
return false, ver, errors.Trace(err)
if err != nil || !done {
return false, ver, errors.Trace(err)
}
} else {
logutil.DDLLogger().Info("table is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
zap.String("table", tbl.Meta().Name.O))
}
for _, indexInfo := range allIndexInfos {
indexInfo.BackfillState = model.BackfillStateReadyToMerge
Expand All @@ -1230,9 +1366,16 @@ func doReorgWorkForCreateIndex(
ver, err = updateVersionAndTableInfo(jobCtx, job, tbl.Meta(), true)
return false, ver, errors.Trace(err)
case model.BackfillStateMerging:
done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, true)
if !done {
return false, ver, err
skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, tbl, allIndexInfos, job)
if !skipReorg {
done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, true)
if !done {
return false, ver, err
}
} else {
logutil.DDLLogger().Info("temp index is empty, skipping reorg work",
zap.Int64("jobID", job.ID),
zap.String("table", tbl.Meta().Name.O))
}
for _, indexInfo := range allIndexInfos {
indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index.
Expand Down
Loading