// Patch overview (unified git diff touching four files; the hunks below are unchanged).
//
// pkg/ddl/index.go
//   - Drops the "math" import and adds "github.com/pingcap/tidb/pkg/util/intest".
//   - checkIfTableReorgWorkCanSkip and checkIfTempIndexReorgWorkCanSkip gain a
//     sessCtx sessionctx.Context parameter. Each calls sessCtx.Txn(false) and treats the
//     transaction as usable only when err == nil, txn != nil and txn.Valid(). If it is not
//     usable they call intest.Assert(validTxn), log a Warn and return false (reorg is NOT
//     skipped). Otherwise txn.StartTS() is passed down as startTS.
//     NOTE(review): intest.Assert is presumably only active in test builds — confirm.
//     NOTE(review): when txn is nil/invalid but err == nil, zap.Error(err) logs a nil error,
//     so the Warn line does not say why the check was abandoned.
//   - checkIfTableIsEmpty, checkIfPhysicalTableIsEmpty, checkIfTempIndexIsEmpty and
//     checkIfTempIndexIsEmptyForPhysicalTable gain a trailing startTS uint64 parameter and
//     forward it. The scans now read at startTS instead of math.MaxInt64 (table rows) or
//     math.MaxUint64 (temp index keys).
//   - checkIfPhysicalTableIsEmpty calls existsTableRow instead of ExistsTableRow; both it
//     and checkIfTempIndexIsEmptyForPhysicalTable add intest.Assert(err == nil) before the
//     existing "log and return false" error branch.
//   - doReorgWorkForCreateIndex passes w.sess.Session() to the skip checks and adds two
//     failpoint hooks in the skip branches: "afterCheckTableReorgCanSkip" and
//     "afterCheckTempIndexReorgCanSkip".
//
// pkg/ddl/reorg.go
//   - Exported ExistsTableRow(ctx, store, startTS, tbl) is replaced by unexported
//     existsTableRow(ctx, store, tbl, startTS); note the changed parameter order.
//   - The new body no longer builds a one-row table scan/chunk. It calls iterateSnapshotKeys
//     over tbl.RecordPrefix() with nil start/end keys at kv.PriorityLow; the callback sets
//     found = true and returns false, nil. On error it returns false and the traced error.
//
// pkg/ddl/tests/indexmerge/BUILD.bazel
//   - shard_count goes from 21 to 23 (two tests are added below).
//
// pkg/ddl/tests/indexmerge/merge_test.go
//   - TestAddIndexSkipReorgCheck uses the two failpoints above to record which skip branch
//     ran for: an empty table (both skipped), a table with one row (only temp-index reorg
//     skipped), and a table that gets an insert when the job reaches
//     model.StateWriteReorganization (neither skipped).
//   - TestAddIndexInsertAfterReorgSkipCheck inserts a row from inside each failpoint, i.e.
//     right after the corresponding skip decision, then checks the table contents and runs
//     "admin check table t".
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 824cedfc2a56f..83f3a1ebc272c 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -21,7 +21,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - "math" "os" "slices" "strings" @@ -70,6 +69,7 @@ import ( "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/generatedexpr" + "github.com/pingcap/tidb/pkg/util/intest" tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" "github.com/pingcap/tidb/pkg/util/size" @@ -1119,6 +1119,7 @@ SwitchIndexState: func checkIfTableReorgWorkCanSkip( store kv.Storage, + sessCtx sessionctx.Context, tbl table.Table, job *model.Job, ) bool { @@ -1126,21 +1127,30 @@ func checkIfTableReorgWorkCanSkip( // Reorg work has begun. return false } + txn, err := sessCtx.Txn(false) + validTxn := err == nil && txn != nil && txn.Valid() + intest.Assert(validTxn) + if !validTxn { + logutil.DDLLogger().Warn("check if table is empty failed", zap.Error(err)) + return false + } + startTS := txn.StartTS() ctx := NewReorgContext() ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName ctx.setDDLLabelForTopSQL(job.Query) - return checkIfTableIsEmpty(ctx, store, tbl) + return checkIfTableIsEmpty(ctx, store, tbl, startTS) } func checkIfTableIsEmpty( ctx *ReorgContext, store kv.Storage, tbl table.Table, + startTS uint64, ) bool { if pTbl, ok := tbl.(table.PartitionedTable); ok { for _, pid := range pTbl.GetAllPartitionIDs() { pTbl := pTbl.GetPartition(pid) - if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl) { + if !checkIfPhysicalTableIsEmpty(ctx, store, pTbl, startTS) { return false } } @@ -1148,15 +1158,17 @@ func checkIfTableIsEmpty( } //nolint:forcetypeassert plainTbl := tbl.(table.PhysicalTable) - return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl) + return checkIfPhysicalTableIsEmpty(ctx, store, plainTbl, startTS) } func checkIfPhysicalTableIsEmpty( ctx *ReorgContext, store 
kv.Storage, tbl table.PhysicalTable, + startTS uint64, ) bool { - hasRecord, err := ExistsTableRow(ctx, store, math.MaxInt64, tbl) + hasRecord, err := existsTableRow(ctx, store, tbl, startTS) + intest.Assert(err == nil) if err != nil { logutil.DDLLogger().Info("check if table is empty failed", zap.Error(err)) return false @@ -1166,6 +1178,7 @@ func checkIfPhysicalTableIsEmpty( func checkIfTempIndexReorgWorkCanSkip( store kv.Storage, + sessCtx sessionctx.Context, tbl table.Table, allIndexInfos []*model.IndexInfo, job *model.Job, @@ -1179,6 +1192,14 @@ func checkIfTempIndexReorgWorkCanSkip( // Reorg work has begun. return false } + txn, err := sessCtx.Txn(false) + validTxn := err == nil && txn != nil && txn.Valid() + intest.Assert(validTxn) + if !validTxn { + logutil.DDLLogger().Warn("check if temp index is empty failed", zap.Error(err)) + return false + } + startTS := txn.StartTS() ctx := NewReorgContext() ctx.resourceGroupName = job.ReorgMeta.ResourceGroupName ctx.setDDLLabelForTopSQL(job.Query) @@ -1190,7 +1211,7 @@ func checkIfTempIndexReorgWorkCanSkip( globalIdxIDs = append(globalIdxIDs, idxInfo.ID) } } - return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID, globalIdxIDs) + return checkIfTempIndexIsEmpty(ctx, store, tbl, firstIdxID, lastIdxID, globalIdxIDs, startTS) } func checkIfTempIndexIsEmpty( @@ -1199,22 +1220,23 @@ func checkIfTempIndexIsEmpty( tbl table.Table, firstIdxID, lastIdxID int64, globalIdxIDs []int64, + startTS uint64, ) bool { tblMetaID := tbl.Meta().ID if pTbl, ok := tbl.(table.PartitionedTable); ok { for _, pid := range pTbl.GetAllPartitionIDs() { - if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pid, firstIdxID, lastIdxID) { + if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, pid, firstIdxID, lastIdxID, startTS) { return false } } for _, globalIdxID := range globalIdxIDs { - if !checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, globalIdxID, globalIdxID) { + if 
!checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, globalIdxID, globalIdxID, startTS) { return false } } return true } - return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, firstIdxID, lastIdxID) + return checkIfTempIndexIsEmptyForPhysicalTable(ctx, store, tblMetaID, firstIdxID, lastIdxID, startTS) } func checkIfTempIndexIsEmptyForPhysicalTable( @@ -1222,15 +1244,17 @@ func checkIfTempIndexIsEmptyForPhysicalTable( store kv.Storage, pid int64, firstIdxID, lastIdxID int64, + startTS uint64, ) bool { start, end := encodeTempIndexRange(pid, firstIdxID, lastIdxID) foundKey := false idxPrefix := tablecodec.GenTableIndexPrefix(pid) - err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, idxPrefix, math.MaxUint64, start, end, + err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, idxPrefix, startTS, start, end, func(_ kv.Handle, _ kv.Key, _ []byte) (more bool, err error) { foundKey = true return false, nil }) + intest.Assert(err == nil) if err != nil { logutil.DDLLogger().Info("check if temp index is empty failed", zap.Error(err)) return false @@ -1306,7 +1330,7 @@ func doReorgWorkForCreateIndex( return false, ver, err } if !reorgTp.NeedMergeProcess() { - skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job) + skipReorg := checkIfTableReorgWorkCanSkip(w.store, w.sess.Session(), tbl, job) if skipReorg { logutil.DDLLogger().Info("table is empty, skipping reorg work", zap.Int64("jobID", job.ID), @@ -1317,7 +1341,7 @@ func doReorgWorkForCreateIndex( } switch allIndexInfos[0].BackfillState { case model.BackfillStateRunning: - skipReorg := checkIfTableReorgWorkCanSkip(w.store, tbl, job) + skipReorg := checkIfTableReorgWorkCanSkip(w.store, w.sess.Session(), tbl, job) if !skipReorg { logutil.DDLLogger().Info("index backfill state running", zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), @@ -1337,6 +1361,7 @@ func doReorgWorkForCreateIndex( return false, ver, errors.Trace(err) } } else { + 
failpoint.InjectCall("afterCheckTableReorgCanSkip") logutil.DDLLogger().Info("table is empty, skipping reorg work", zap.Int64("jobID", job.ID), zap.String("table", tbl.Meta().Name.O)) @@ -1367,13 +1392,14 @@ func doReorgWorkForCreateIndex( ver, err = updateVersionAndTableInfo(jobCtx, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateMerging: - skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, tbl, allIndexInfos, job) + skipReorg := checkIfTempIndexReorgWorkCanSkip(w.store, w.sess.Session(), tbl, allIndexInfos, job) if !skipReorg { done, ver, err = runReorgJobAndHandleErr(w, jobCtx, job, tbl, allIndexInfos, true) if !done { return false, ver, err } } else { + failpoint.InjectCall("afterCheckTempIndexReorgCanSkip") logutil.DDLLogger().Info("temp index is empty, skipping reorg work", zap.Int64("jobID", job.ID), zap.String("table", tbl.Meta().Name.O)) diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 40201d2e4b040..e4f388cd9d28f 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -767,22 +767,19 @@ func GetTableMaxHandle(ctx *ReorgContext, store kv.Storage, startTS uint64, tbl return kv.IntHandle(row.GetInt64(0)), false, nil } -// ExistsTableRow checks if there is at least one row in the specified table. +// existsTableRow checks if there is at least one row in the specified table. // In case of an error during the operation, it returns false along with the error. 
-func ExistsTableRow(ctx *ReorgContext, store kv.Storage, startTS uint64, tbl table.PhysicalTable) (bool, error) { - handleCols := buildHandleCols(tbl) - result, err := buildOneRowTableScan(ctx, store, startTS, tbl, handleCols, 1, false) - if err != nil { - return false, errors.Trace(err) - } - defer terror.Call(result.Close) - - chk := chunk.New(getColumnsTypes(handleCols), 1, 1) - err = result.Next(ctx.ddlJobCtx, chk) +func existsTableRow(ctx *ReorgContext, store kv.Storage, tbl table.PhysicalTable, startTS uint64) (bool, error) { + found := false + err := iterateSnapshotKeys(ctx, store, kv.PriorityLow, tbl.RecordPrefix(), startTS, nil, nil, + func(_ kv.Handle, _ kv.Key, _ []byte) (bool, error) { + found = true + return false, nil + }) if err != nil { return false, errors.Trace(err) } - return chk.NumRows() != 0, nil + return found, nil } func buildHandleCols(tbl table.PhysicalTable) []*model.ColumnInfo { diff --git a/pkg/ddl/tests/indexmerge/BUILD.bazel b/pkg/ddl/tests/indexmerge/BUILD.bazel index ad5186396e2ca..9b9580df9f293 100644 --- a/pkg/ddl/tests/indexmerge/BUILD.bazel +++ b/pkg/ddl/tests/indexmerge/BUILD.bazel @@ -9,7 +9,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 21, + shard_count = 23, deps = [ "//pkg/config", "//pkg/ddl", diff --git a/pkg/ddl/tests/indexmerge/merge_test.go b/pkg/ddl/tests/indexmerge/merge_test.go index c808dd79cb18b..5055cce81652b 100644 --- a/pkg/ddl/tests/indexmerge/merge_test.go +++ b/pkg/ddl/tests/indexmerge/merge_test.go @@ -857,3 +857,77 @@ func TestAddUniqueIndexFalsePositiveDuplicate(t *testing.T) { tk.MustExec("alter table t add unique index idx(b);") tk.MustExec("admin check table t;") } + +func TestAddIndexSkipReorgCheck(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int);") + + skipTableReorg := false + skipTempIdxReorg := false + testfailpoint.EnableCall(t, 
"github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip", func() { + skipTableReorg = true + }) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTempIndexReorgCanSkip", func() { + skipTempIdxReorg = true + }) + tk.MustExec("alter table t add index idx1(a);") + require.True(t, skipTableReorg) + require.True(t, skipTempIdxReorg) + + skipTableReorg = false + skipTempIdxReorg = false + tk.MustExec("insert into t values (1);") + tk.MustExec("alter table t add index idx2(a);") + require.False(t, skipTableReorg) + require.True(t, skipTempIdxReorg) + + skipTableReorg = false + skipTempIdxReorg = false + var runDML bool + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunAfter", func(job *model.Job) { + if t.Failed() || runDML { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("insert into t values (2);") + runDML = true + } + }) + tk.MustExec("alter table t add index idx3(a);") + require.False(t, skipTableReorg) + require.False(t, skipTempIdxReorg) + tk.MustQuery("select * from t;").Check(testkit.Rows("1", "2")) + tk.MustExec("admin check table t;") +} + +func TestAddIndexInsertAfterReorgSkipCheck(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int);") + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip", func() { + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("insert into t values (1);") + }) + tk.MustExec("alter table t add index idx(a);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1")) + tk.MustExec("admin check table t;") + err := failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/afterCheckTableReorgCanSkip") + require.NoError(t, err) + + tk.MustExec("truncate table t;") + testfailpoint.EnableCall(t, 
"github.com/pingcap/tidb/pkg/ddl/afterCheckTempIndexReorgCanSkip", func() { + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("insert into t values (2);") + }) + tk.MustExec("alter table t add index idx2(a);") + tk.MustQuery("select * from t;").Check(testkit.Rows("2")) + tk.MustExec("admin check table t;") +}