ddl: skip adding index reorg work for empty tables #56406

Merged 27 commits on Oct 25, 2024. The view below shows changes from the first 12 commits.

Commits:
ec282c9  ddl: skip adding index reorg work for empty tables (tangenta, Sep 29, 2024)
eac776f  refine test code (tangenta, Sep 29, 2024)
8ed23d3  Merge remote-tracking branch 'upstream/master' into add-index-skip-reorg (tangenta, Sep 29, 2024)
60b1a2d  fix linter (tangenta, Sep 29, 2024)
83b1490  address comment and fix TestAddIndexMultipleDelete (tangenta, Sep 29, 2024)
04b322d  fix TestAddIndexMergeVersionIndexValue (tangenta, Sep 29, 2024)
4836b7d  refine function name (tangenta, Sep 29, 2024)
a92a3e7  move empty check into doReorgWorkForCreateIndex (tangenta, Sep 29, 2024)
71cce20  refine naming (tangenta, Sep 29, 2024)
084c68f  fix code and linter (tangenta, Sep 29, 2024)
b416be3  consistent naming (tangenta, Sep 29, 2024)
4250966  return if job.SnapshotVersion is not zero (tangenta, Sep 29, 2024)
afc6035  fix linter (tangenta, Sep 29, 2024)
834d295  fix TestMultiSchemaAddIndexMerge (tangenta, Sep 30, 2024)
fbb2854  fix TestAddIndexMergeReplaceDelete (tangenta, Sep 30, 2024)
d25c52d  make naming consistent (tangenta, Sep 30, 2024)
7dbedd1  send asc scan request instead (tangenta, Sep 30, 2024)
9fe9991  dont rely on stat cache (tangenta, Oct 8, 2024)
477b41b  fix TestDumpPlanReplayerAPI (tangenta, Oct 16, 2024)
0c08e25  Merge remote-tracking branch 'upstream/master' into HEAD (tangenta, Oct 16, 2024)
63ec9cb  fix TestDumpPlanReplayerAPI (tangenta, Oct 16, 2024)
f3a9278  test: fix TestAddGlobalIndexInIngest (tangenta, Oct 17, 2024)
519180b  Merge remote-tracking branch 'upstream/master' into HEAD (tangenta, Oct 21, 2024)
0b6423c  Merge remote-tracking branch 'upstream/master' into HEAD (tangenta, Oct 22, 2024)
a4c44c4  Merge remote-tracking branch 'upstream/master' into HEAD (tangenta, Oct 24, 2024)
4a5e30a  set label for topsql (tangenta, Oct 25, 2024)
da86bd9  fix TestPauseFailedOnCommit (tangenta, Oct 25, 2024)
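In brief, the change adds a fast path to the ADD INDEX state machine: when the job has not yet taken a reorg snapshot (job.SnapshotVer == 0) and the target table, or every one of its partitions, contains no rows, the backfill phase is skipped outright; the merge phase is likewise skipped when a probe of the temp index key range finds no keys. A condensed sketch of the backfill-side decision, assuming the TiDB-internal types used in the diff below (kv.Storage, table.Table, model.Job); the function name here is illustrative, the real helper is checkIfReorgTableWorkCanSkip:

	// Sketch only: condensed from checkIfReorgTableWorkCanSkip in the diff below.
	func canSkipBackfill(store kv.Storage, tbl table.Table, job *model.Job) bool {
		if job.SnapshotVer != 0 {
			return false // a snapshot exists, so reorg work has already started
		}
		ctx := NewReorgContext()
		ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
		// "Empty" means no row yields a max handle, checked per partition.
		return checkIfTableIsEmpty(ctx, store, tbl)
	}

Errors during the emptiness probe are deliberately treated as "not empty", so the worst case is falling back to the normal reorg path.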
pkg/ddl/index.go — 160 changes: 142 additions & 18 deletions
@@ -21,6 +21,7 @@ import (
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
+	"math"
 	"os"
 	"slices"
 	"strings"
@@ -830,6 +831,108 @@ SwitchIndexState:
 	return ver, errors.Trace(err)
 }
 
+func checkIfReorgTableWorkCanSkip(
+	store kv.Storage,
+	tbl table.Table,
+	job *model.Job,
+) bool {
+	if job.SnapshotVer != 0 {
+		// Reorg work has begun.
+		return false
+	}
+	ctx := NewReorgContext()
+	ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
+	return checkIfTableIsEmpty(ctx, store, tbl)
+}
+
+func checkIfTableIsEmpty(
+	ctx *ReorgContext,
+	store kv.Storage,
+	tbl table.Table,
+) bool {
+	if pTbl, ok := tbl.(table.PartitionedTable); ok {
+		for _, pid := range pTbl.GetAllPartitionIDs() {
+			pTbl := pTbl.GetPartition(pid)
+			if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl) {
+				return false
+			}
+		}
+		return true
+	}
+	plainTbl := tbl.(table.PhysicalTable)
+	return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl)
+}
+
+func checkIfPhysicalTableIsEmpty(
+	ctx *ReorgContext,
+	store kv.Storage,
+	tbl table.PhysicalTable,
+) bool {
+	_, isEmpty, err := GetTableMaxHandle(ctx, store, math.MaxInt64, tbl)
+	if err != nil {
+		logutil.DDLLogger().Info("check if table is empty failed", zap.Error(err))
+		return false
+	}
+	return isEmpty
+}
+
+func checkIfTempIndexReorgWorkCanSkip(
+	store kv.Storage,
+	tbl table.Table,
+	allIndexInfos []*model.IndexInfo,
+	job *model.Job,
+) bool {
+	if job.SnapshotVer != 0 {
+		// Reorg work has begun.
+		return false
+	}
+	ctx := NewReorgContext()
+	ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName
+	firstIdxID := allIndexInfos[0].ID
+	lastIdxID := allIndexInfos[len(allIndexInfos)-1].ID
+	return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID)
+}
+
+func checkIfTempIndexIsEmpty(
+	ctx *ReorgContext,
+	store kv.Storage,
+	tbl table.Table,
+	firstIdxID, lastIdxID int64,
+) bool {
+	if pTbl, ok := tbl.(table.PartitionedTable); ok {
+		for _, pid := range pTbl.GetAllPartitionIDs() {
+			pTbl := pTbl.GetPartition(pid)
+			if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pTbl, firstIdxID, lastIdxID) {
+				return false
+			}
+		}
+		return true
+	}
+	plainTbl := tbl.(table.PhysicalTable)
+	return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, plainTbl, firstIdxID, lastIdxID)
+}
+
+func checkIfTempIndexIsEmptyForPhysicalTable(
+	ctx *ReorgContext,
+	store kv.Storage,
+	tbl table.PhysicalTable,
+	firstIdxID, lastIdxID int64,
+) bool {
+	start, end := encodeTempIndexRange(tbl.Meta().ID, firstIdxID, lastIdxID)
+	foundKey := false
+	err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, tbl.IndexPrefix(), math.MaxUint64, start, end,
+		func(_ kv.Handle, _ kv.Key, _ []byte) (more bool, err error) {
+			foundKey = true
+			return false, nil
+		})
+	if err != nil {
+		logutil.DDLLogger().Info("check if temp index is empty failed", zap.Error(err))
+		return false
+	}
+	return !foundKey
+}
+
 // pickBackfillType determines which backfill process will be used. The result is
 // both stored in job.ReorgMeta.ReorgTp and returned.
 func pickBackfillType(job *model.Job) (model.ReorgType, error) {
@@ -898,26 +1001,40 @@ func doReorgWorkForCreateIndex(
 		return false, ver, err
 	}
 	if !reorgTp.NeedMergeProcess() {
+		skipReorg := checkIfReorgTableWorkCanSkip(w.store, tbl, job)
+		if skipReorg {
+			logutil.DDLLogger().Info("table is empty, skipping reorg work",
+				zap.Int64("jobID", job.ID),
+				zap.String("table", tbl.Meta().Name.O))
+			return true, ver, nil
+		}
 		return runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, false)
 	}
 	switch allIndexInfos[0].BackfillState {
 	case model.BackfillStateRunning:
-		logutil.DDLLogger().Info("index backfill state running",
-			zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
-			zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge),
-			zap.String("index", allIndexInfos[0].Name.O))
-		switch reorgTp {
-		case model.ReorgTypeLitMerge:
-			if job.ReorgMeta.IsDistReorg {
-				done, ver, err = runIngestReorgJobDist(w, jobCtx, job, tbl, allIndexInfos)
-			} else {
-				done, ver, err = runIngestReorgJob(w, jobCtx, job, tbl, allIndexInfos)
+		skipReorg := checkIfReorgTableWorkCanSkip(w.store, tbl, job)
+		if !skipReorg {
+			logutil.DDLLogger().Info("index backfill state running",
+				zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
+				zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge),
+				zap.String("index", allIndexInfos[0].Name.O))
+			switch reorgTp {
+			case model.ReorgTypeLitMerge:
+				if job.ReorgMeta.IsDistReorg {
+					done, ver, err = runIngestReorgJobDist(w, jobCtx, job, tbl, allIndexInfos)
+				} else {
+					done, ver, err = runIngestReorgJob(w, jobCtx, job, tbl, allIndexInfos)
+				}
+			case model.ReorgTypeTxnMerge:
+				done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, false)
 			}
-		case model.ReorgTypeTxnMerge:
-			done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, false)
-		}
-		if err != nil || !done {
-			return false, ver, errors.Trace(err)
+			if err != nil || !done {
+				return false, ver, errors.Trace(err)
+			}
+		} else {
+			logutil.DDLLogger().Info("table is empty, skipping reorg work",
+				zap.Int64("jobID", job.ID),
+				zap.String("table", tbl.Meta().Name.O))
 		}
 		for _, indexInfo := range allIndexInfos {
 			indexInfo.BackfillState = model.BackfillStateReadyToMerge
@@ -944,9 +1061,16 @@ func doReorgWorkForCreateIndex(
 		ver, err = updateVersionAndTableInfo(jobCtx, job, tbl.Meta(), true)
 		return false, ver, errors.Trace(err)
 	case model.BackfillStateMerging:
-		done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, true)
-		if !done {
-			return false, ver, err
+		skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, tbl, allIndexInfos, job)
+		if !skipReorg {
+			done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, true)
+			if !done {
+				return false, ver, err
+			}
+		} else {
+			logutil.DDLLogger().Info("temp index is empty, skipping reorg work",
+				zap.Int64("jobID", job.ID),
+				zap.String("table", tbl.Meta().Name.O))
 		}
 		for _, indexInfo := range allIndexInfos {
 			indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index.
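The observable effect is that adding an index to an empty table completes without running backfill or merge reorg work. A hypothetical testkit-style check (not part of this diff; testkit.NewTestKit and MustExec are the standard helpers also used in the test file below):

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int, b int)")
	// The table has no rows, so both the backfill and merge phases of the
	// resulting ADD INDEX job should be skipped.
	tk.MustExec("alter table t add index idx_b (b)")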
pkg/ddl/primary_key_handle_test.go — 63 changes: 28 additions & 35 deletions
@@ -35,16 +35,16 @@ import (
 	"github.com/tikv/client-go/v2/testutils"
 )
 
-func getTableMaxHandle(t *testing.T, d ddl.DDL, tbl table.Table, store kv.Storage) (kv.Handle, bool) {
+func getTableMaxHandle(t *testing.T, tbl table.Table, store kv.Storage) (kv.Handle, bool) {
 	ver, err := store.CurrentVersion(kv.GlobalTxnScope)
 	require.NoError(t, err)
 	maxHandle, emptyTable, err := ddl.GetTableMaxHandle(ddl.NewReorgContext(), store, ver.Ver, tbl.(table.PhysicalTable))
 	require.NoError(t, err)
 	return maxHandle, emptyTable
 }
 
-func checkTableMaxHandle(t *testing.T, d ddl.DDL, tbl table.Table, store kv.Storage, expectedEmpty bool, expectedMaxHandle kv.Handle) {
-	maxHandle, emptyHandle := getTableMaxHandle(t, d, tbl, store)
+func checkTableMaxHandle(t *testing.T, tbl table.Table, store kv.Storage, expectedEmpty bool, expectedMaxHandle kv.Handle) {
+	maxHandle, emptyHandle := getTableMaxHandle(t, tbl, store)
 	require.Equal(t, expectedEmpty, emptyHandle)
 	if expectedEmpty {
 		require.True(t, emptyHandle)
@@ -78,18 +78,16 @@ func TestMultiRegionGetTableEndHandle(t *testing.T) {
 	tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
 	require.NoError(t, err)
 
-	d := dom.DDL()
-
 	// Split the table.
 	tableStart := tablecodec.GenTableRecordPrefix(tbl.Meta().ID)
 	cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 100)
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(999))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(999))
 
 	tk.MustExec("insert into t values(10000, 1000)")
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(10000))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(10000))
 
 	tk.MustExec("insert into t values(-1, 1000)")
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(10000))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(10000))
 }
 
 func TestGetTableEndHandle(t *testing.T) {
@@ -102,25 +100,24 @@ func TestGetTableEndHandle(t *testing.T) {
 	tk.MustExec("create table t(a bigint PRIMARY KEY, b int)")
 
 	is := dom.InfoSchema()
-	d := dom.DDL()
 	tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
 	require.NoError(t, err)
 
 	// test empty table
-	checkTableMaxHandle(t, d, tbl, store, true, nil)
+	checkTableMaxHandle(t, tbl, store, true, nil)
 
 	tk.MustExec("insert into t values(-1, 1)")
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(-1))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(-1))
 
 	tk.MustExec("insert into t values(9223372036854775806, 1)")
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(9223372036854775806))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(9223372036854775806))
 
 	tk.MustExec("insert into t values(9223372036854775807, 1)")
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(9223372036854775807))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(9223372036854775807))
 
 	tk.MustExec("insert into t values(10, 1)")
 	tk.MustExec("insert into t values(102149142, 1)")
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(9223372036854775807))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(9223372036854775807))
 
 	tk.MustExec("create table t1(a bigint PRIMARY KEY, b int)")
 
@@ -135,15 +132,15 @@ func TestGetTableEndHandle(t *testing.T) {
 	is = dom.InfoSchema()
 	tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
 	require.NoError(t, err)
-	checkTableMaxHandle(t, d, tbl, store, false, kv.IntHandle(999))
+	checkTableMaxHandle(t, tbl, store, false, kv.IntHandle(999))
 
 	// Test PK is not handle
 	tk.MustExec("create table t2(a varchar(255))")
 
 	is = dom.InfoSchema()
 	tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t2"))
 	require.NoError(t, err)
-	checkTableMaxHandle(t, d, tbl, store, true, nil)
+	checkTableMaxHandle(t, tbl, store, true, nil)
 
 	builder.Reset()
 	_, _ = fmt.Fprintf(&builder, "insert into t2 values ")
@@ -154,31 +151,31 @@ func TestGetTableEndHandle(t *testing.T) {
 	tk.MustExec(sql[:len(sql)-1])
 
 	result := tk.MustQuery("select MAX(_tidb_rowid) from t2")
-	maxHandle, emptyTable := getTableMaxHandle(t, d, tbl, store)
+	maxHandle, emptyTable := getTableMaxHandle(t, tbl, store)
 	result.Check(testkit.Rows(fmt.Sprintf("%v", maxHandle.IntValue())))
 	require.False(t, emptyTable)
 
 	tk.MustExec("insert into t2 values(100000)")
 	result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
-	maxHandle, emptyTable = getTableMaxHandle(t, d, tbl, store)
+	maxHandle, emptyTable = getTableMaxHandle(t, tbl, store)
 	result.Check(testkit.Rows(fmt.Sprintf("%v", maxHandle.IntValue())))
 	require.False(t, emptyTable)
 
 	tk.MustExec(fmt.Sprintf("insert into t2 values(%v)", math.MaxInt64-1))
 	result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
-	maxHandle, emptyTable = getTableMaxHandle(t, d, tbl, store)
+	maxHandle, emptyTable = getTableMaxHandle(t, tbl, store)
 	result.Check(testkit.Rows(fmt.Sprintf("%v", maxHandle.IntValue())))
 	require.False(t, emptyTable)
 
 	tk.MustExec(fmt.Sprintf("insert into t2 values(%v)", math.MaxInt64))
 	result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
-	maxHandle, emptyTable = getTableMaxHandle(t, d, tbl, store)
+	maxHandle, emptyTable = getTableMaxHandle(t, tbl, store)
 	result.Check(testkit.Rows(fmt.Sprintf("%v", maxHandle.IntValue())))
 	require.False(t, emptyTable)
 
 	tk.MustExec("insert into t2 values(100)")
 	result = tk.MustQuery("select MAX(_tidb_rowid) from t2")
-	maxHandle, emptyTable = getTableMaxHandle(t, d, tbl, store)
+	maxHandle, emptyTable = getTableMaxHandle(t, tbl, store)
 	result.Check(testkit.Rows(fmt.Sprintf("%v", maxHandle.IntValue())))
 	require.False(t, emptyTable)
 }
Expand Down Expand Up @@ -208,18 +205,16 @@ func TestMultiRegionGetTableEndCommonHandle(t *testing.T) {
tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

d := dom.DDL()

// Split the table.
tableStart := tablecodec.GenTableRecordPrefix(tbl.Meta().ID)
cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 100)
checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "999", 999, 999))
checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "999", 999, 999))

tk.MustExec("insert into t values('a', 1, 1, 1)")
checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "a", 1, 1))
checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "a", 1, 1))

tk.MustExec("insert into t values('0000', 1, 1, 1)")
checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "a", 1, 1))
checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "a", 1, 1))
}

func TestGetTableEndCommonHandle(t *testing.T) {
@@ -232,31 +227,29 @@ func TestGetTableEndCommonHandle(t *testing.T) {
 	tk.MustExec("create table t1(a varchar(15), b bigint, c int, primary key (a(2), b))")
 
 	is := dom.InfoSchema()
-	d := dom.DDL()
 	tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
 	require.NoError(t, err)
 
 	// test empty table
-	checkTableMaxHandle(t, d, tbl, store, true, nil)
+	checkTableMaxHandle(t, tbl, store, true, nil)
 
 	tk.MustExec("insert into t values('abc', 1, 10)")
-	checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "abc", 1))
+	checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "abc", 1))
 
 	tk.MustExec("insert into t values('abchzzzzzzzz', 1, 10)")
-	checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "abchzzzzzzzz", 1))
+	checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "abchzzzzzzzz", 1))
 	tk.MustExec("insert into t values('a', 1, 10)")
 	tk.MustExec("insert into t values('ab', 1, 10)")
-	checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "abchzzzzzzzz", 1))
+	checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "abchzzzzzzzz", 1))
 
 	// Test MaxTableRowID with prefixed primary key.
 	tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
 	require.NoError(t, err)
-	d = dom.DDL()
-	checkTableMaxHandle(t, d, tbl, store, true, nil)
+	checkTableMaxHandle(t, tbl, store, true, nil)
 	tk.MustExec("insert into t1 values('abccccc', 1, 10)")
-	checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "ab", 1))
+	checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "ab", 1))
 	tk.MustExec("insert into t1 values('azzzz', 1, 10)")
-	checkTableMaxHandle(t, d, tbl, store, false, testutil.MustNewCommonHandle(t, "az", 1))
+	checkTableMaxHandle(t, tbl, store, false, testutil.MustNewCommonHandle(t, "az", 1))
 }
 
 func TestCreateClusteredIndex(t *testing.T) {