diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 8f430ccce355f..5f699b3507e6f 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -76,6 +76,7 @@ type temporaryIndexRecord struct { delete bool unique bool distinct bool + rowKey kv.Key } type mergeIndexWorker struct { @@ -133,6 +134,14 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC if idxRecord.skip { continue } + + // Lock the corresponding row keys so that it doesn't modify the index KVs + // that are changing by a pessimistic transaction. + err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.rowKey) + if err != nil { + return errors.Trace(err) + } + if idxRecord.delete { if idxRecord.unique { err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked) @@ -149,6 +158,7 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC } return nil }) + logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000) return } @@ -166,6 +176,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor oprStartTime := startTime idxPrefix := w.table.IndexPrefix() var lastKey kv.Key + isCommonHandle := w.table.Meta().IsCommonHandle err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(), taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) { oprEndTime := time.Now() @@ -182,35 +193,37 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor return false, nil } - isDelete := false - unique := false - length := len(rawValue) - keyVer := rawValue[length-1] + originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete { // For 'm' version kvs, they are double-written. // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. return true, nil } - rawValue = rawValue[:length-1] - if bytes.Equal(rawValue, tables.DeleteMarker) { - isDelete = true - } else if bytes.Equal(rawValue, tables.DeleteMarkerUnique) { - isDelete = true - unique = true + + if handle == nil { + // If the handle is not found in the value of the temp index, it means + // 1) This is not a deletion marker, the handle is in the key or the origin value. + // 2) This is a deletion marker, but the handle is in the key of temp index. + handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns)) + if err != nil { + return false, err + } } + rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), handle) originIdxKey := make([]byte, len(indexKey)) copy(originIdxKey, indexKey) tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) idxRecord := &temporaryIndexRecord{ + rowKey: rowKey, delete: isDelete, unique: unique, skip: false, } if !isDelete { - idxRecord.vals = rawValue - idxRecord.distinct = tablecodec.IndexKVIsUnique(rawValue) + idxRecord.vals = originVal + idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal) } w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) w.originIdxKeys = append(w.originIdxKeys, originIdxKey) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 431e4791f4aeb..6027c5cee215f 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -16,6 +16,7 @@ package ddl_test import ( "testing" + "time" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" @@ -376,3 +377,65 @@ func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) { } tk.MustExec("admin check table t;") } + +func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk.MustExec(`CREATE TABLE t (id int primary key, a int);`) + tk.MustExec(`INSERT INTO t VALUES (1, 1);`) + + // Force onCreateIndex use the txn-merge process. + ingest.LitInitialized = false + tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;") + tk.MustExec("set @@global.tidb_enable_metadata_lock = 0;") + + originHook := dom.DDL().GetHook() + callback := &ddl.TestDDLCallback{Do: dom} + + runPessimisticTxn := false + callback.OnJobRunBeforeExported = func(job *model.Job) { + if t.Failed() { + return + } + if job.SchemaState == model.StateWriteOnly { + // Write a record to the temp index. + _, err := tk2.Exec("update t set a = 2 where id = 1;") + assert.NoError(t, err) + } + if !runPessimisticTxn && job.SchemaState == model.StateWriteReorganization { + idx := findIdxInfo(dom, "test", "t", "idx") + if idx == nil { + return + } + if idx.BackfillState != model.BackfillStateReadyToMerge { + return + } + runPessimisticTxn = true + _, err := tk2.Exec("begin pessimistic;") + assert.NoError(t, err) + _, err = tk2.Exec("update t set a = 3 where id = 1;") + assert.NoError(t, err) + } + } + dom.DDL().SetHook(callback) + afterCommit := make(chan struct{}, 1) + go func() { + tk.MustExec("alter table t add index idx(a);") + afterCommit <- struct{}{} + }() + timer := time.NewTimer(300 * time.Millisecond) + select { + case <-timer.C: + break + case <-afterCommit: + require.Fail(t, "should be blocked by the pessimistic txn") + } + tk2.MustExec("rollback;") + <-afterCommit + dom.DDL().SetHook(originHook) + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2")) +} diff --git a/executor/insert.go b/executor/insert.go index 36af152899bc3..d0b09e302860c 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -15,7 +15,6 @@ package executor import ( - "bytes" "context" "encoding/hex" "fmt" @@ -32,7 +31,6 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -160,6 +158,12 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t for _, r := range rows { for _, uk := range r.uniqueKeys { if val, found := values[string(uk.newKey)]; found { + if tablecodec.IsTempIndexKey(uk.newKey) { + if tablecodec.CheckTempIndexValueIsDelete(val) { + continue + } + val = tablecodec.DecodeTempIndexOriginValue(val) + } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { return err @@ -267,10 +271,10 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D // Since the temp index stores deleted key with marked 'deleteu' for unique key at the end // of value, So if return a key we check and skip deleted key. if tablecodec.IsTempIndexKey(uk.newKey) { - rowVal := val[:len(val)-1] - if bytes.Equal(rowVal, tables.DeleteMarkerUnique) { + if tablecodec.CheckTempIndexValueIsDelete(val) { continue } + val = tablecodec.DecodeTempIndexOriginValue(val) } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { diff --git a/executor/replace.go b/executor/replace.go index f028d5e6db32c..158a620fb300e 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -179,6 +179,12 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r } return false, false, err } + if tablecodec.IsTempIndexKey(uk.newKey) { + if tablecodec.CheckTempIndexValueIsDelete(val) { + continue + } + val = tablecodec.DecodeTempIndexOriginValue(val) + } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { return false, true, err diff --git a/go.mod b/go.mod index 9cca1a83ee954..8babb1b852f2a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( cloud.google.com/go/storage v1.21.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0 github.com/BurntSushi/toml v1.2.1 @@ -127,7 +128,6 @@ require ( cloud.google.com/go v0.100.2 // indirect cloud.google.com/go/compute v1.5.0 // indirect cloud.google.com/go/iam v0.1.1 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect diff --git a/table/tables/index.go b/table/tables/index.go index 1193f9e251758..446a2e7288595 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -15,7 +15,6 @@ package tables import ( - "bytes" "context" "errors" "sync" @@ -179,7 +178,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if !distinct || skipCheck || opt.Untouched { if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = append(idxVal, keyVer) + idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) } err = txn.GetMemBuffer().Set(key, idxVal) if err != nil { @@ -187,7 +186,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = append(idxVal, keyVer) + idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) } err = txn.GetMemBuffer().Set(tempKey, idxVal) if err != nil { @@ -226,11 +225,11 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil && !kv.IsErrNotFound(err) { return nil, err } - if err != nil || len(value) == 0 { + if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey tempIndexKeyState + var needPresumeKey TempIndexKeyState if keyIsTempIdxKey { - idxVal = append(idxVal, keyVer) + idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) if err != nil { return nil, err @@ -260,7 +259,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return nil, err } if len(tempKey) > 0 { - idxVal = append(idxVal, keyVer) + idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) } else { @@ -281,6 +280,9 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return nil, err } + if keyIsTempIdxKey { + value = tablecodec.DecodeTempIndexOriginValue(value) + } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) if err != nil { return nil, err @@ -288,13 +290,6 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return handle, kv.ErrKeyExists } -var ( - // DeleteMarker is a marker that the key is deleted. - DeleteMarker = []byte("delete") - // DeleteMarkerUnique is a marker that the unique index key is deleted. - DeleteMarkerUnique = []byte("deleteu") -) - // Delete removes the entry for handle h and indexedValues from KV index. func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error { key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil) @@ -312,10 +307,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - val := make([]byte, 0, len(DeleteMarkerUnique)+1) - val = append(val, DeleteMarkerUnique...) - val = append(val, tempKeyVer) - err = txn.GetMemBuffer().Set(tempKey, val) + tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer) + err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err } @@ -328,10 +321,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - val := make([]byte, 0, len(DeleteMarker)+1) - val = append(val, DeleteMarker...) - val = append(val, tempKeyVer) - err = txn.GetMemBuffer().Set(tempKey, val) + tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer) + err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err } @@ -508,11 +499,12 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * return colInfo } -type tempIndexKeyState byte +// TempIndexKeyState is the state of the temporary index key. +type TempIndexKeyState byte const ( // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown tempIndexKeyState = iota + KeyInTempIndexUnknown TempIndexKeyState = iota // KeyInTempIndexNotExist the key is not exist in temp index. KeyInTempIndexNotExist // KeyInTempIndexIsDeleted the key is marked deleted in temp index. @@ -524,7 +516,7 @@ const ( ) // KeyExistInTempIndex is used to check the unique key exist status in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (tempIndexKeyState, kv.Handle, error) { +func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) { // Only check temp index key. if !tablecodec.IsTempIndexKey(key) { return KeyInTempIndexUnknown, nil, nil @@ -541,24 +533,16 @@ func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, di if len(value) < 1 { return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") } - length := len(value) - // Firstly, we will remove the last byte of key version. - // It should be TempIndexKeyTypeBackfill or TempIndexKeyTypeMerge. - value = value[:length-1] - if distinct { - if bytes.Equal(value, DeleteMarkerUnique) { - return KeyInTempIndexIsDeleted, nil, nil - } - } else { - if bytes.Equal(value, DeleteMarker) { - return KeyInTempIndexIsDeleted, nil, nil - } + + if tablecodec.CheckTempIndexValueIsDelete(value) { + return KeyInTempIndexIsDeleted, nil, nil } // Check if handle equal. var handle kv.Handle if distinct { - handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle) + originVal := tablecodec.DecodeTempIndexOriginValue(value) + handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle) if err != nil { return KeyInTempIndexUnknown, nil, err } diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index bf6cc2ffb5853..e4513b8cae409 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -15,7 +15,6 @@ package tables import ( - "bytes" "fmt" "strings" @@ -153,11 +152,11 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in value []byte orgKey []byte indexHandle kv.Handle - err error ) if idxID != m.indexID { - value = append(value, m.value[:len(m.value)-1]...) - if len(value) == 0 || (bytes.Equal(value, []byte("delete")) || bytes.Equal(value, []byte("deleteu"))) { + value = tablecodec.DecodeTempIndexOriginValue(m.value) + if len(value) == 0 { + // Skip the deleted operation values. continue } orgKey = append(orgKey, m.key...) @@ -246,7 +245,7 @@ func checkIndexKeys( } // When it is in add index new backfill state. - if len(value) == 0 || (idxID != m.indexID && (bytes.Equal(value, []byte("deleteu")) || bytes.Equal(value, []byte("delete")))) { + if len(value) == 0 || (idxID != m.indexID && (tablecodec.CheckTempIndexValueIsDelete(value))) { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta()) } else { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta()) diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index c2d98f5a2b17e..c757bb4f82aa0 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1157,6 +1157,100 @@ func IsTempIndexKey(indexKey []byte) bool { return tempIndexID == indexID } +// TempIndexValueFlag is the flag of temporary index value. +type TempIndexValueFlag byte + +const ( + // TempIndexValueFlagNormal means the following value is the normal index value. + TempIndexValueFlagNormal TempIndexValueFlag = iota + // TempIndexValueFlagDeleted means this is a representation of a "delete" operation. + TempIndexValueFlagDeleted +) + +// EncodeTempIndexValue encodes the value of temporary index. +// Note: this function changes the input value. +func EncodeTempIndexValue(value []byte, keyVer byte) []byte { + value = append(value, 0) + copy(value[1:], value[:len(value)-1]) + value[0] = byte(TempIndexValueFlagNormal) // normal flag + value + tempKeyVer + value = append(value, keyVer) + return value +} + +// EncodeTempIndexValueDeletedUnique encodes the value of temporary index for unique index. +func EncodeTempIndexValueDeletedUnique(handle kv.Handle, keyVer byte) []byte { + var hEncoded []byte + var hLen int + if handle.IsInt() { + var data [8]byte + binary.BigEndian.PutUint64(data[:], uint64(handle.IntValue())) + hEncoded = data[:] + hLen = 8 + } else { + hEncoded = handle.Encoded() + hLen = len(hEncoded) + } + val := make([]byte, 0, 1+hLen+1) // deleted flag + handle + tempKeyVer + val = append(val, byte(TempIndexValueFlagDeleted)) + val = append(val, hEncoded...) + val = append(val, keyVer) + return val +} + +// EncodeTempIndexValueDeleted encodes the delete operation on origin index to a value for temporary index. +func EncodeTempIndexValueDeleted(keyVer byte) []byte { + // Handle is not needed because it is already in the key. + val := make([]byte, 0, 2) // deleted flag + tempKeyVer + val = append(val, byte(TempIndexValueFlagDeleted)) + val = append(val, keyVer) + return val +} + +// DecodeTempIndexValue decodes the value of temporary index. +func DecodeTempIndexValue(value []byte, isCommonHandle bool) (originVal []byte, handle kv.Handle, isDelete bool, isUnique bool, keyVer byte) { + if len(value) == 0 { + return nil, nil, false, false, 0 + } + switch TempIndexValueFlag(value[0]) { + case TempIndexValueFlagNormal: + originVal = value[1 : len(value)-1] + keyVer = value[len(value)-1] + case TempIndexValueFlagDeleted: + isDelete = true + if len(value) == 2 { + keyVer = value[1] + } else { + isUnique = true + if isCommonHandle { + handle, _ = kv.NewCommonHandle(value[1 : len(value)-1]) + } else { + handle = decodeIntHandleInIndexValue(value[1 : len(value)-1]) + } + keyVer = value[len(value)-1] + } + } + return +} + +// CheckTempIndexValueIsDelete checks whether the value is a delete operation. +func CheckTempIndexValueIsDelete(value []byte) bool { + if len(value) == 0 { + return false + } + return TempIndexValueFlag(value[0]) == TempIndexValueFlagDeleted +} + +// DecodeTempIndexOriginValue decodes the value of origin index from a temp index value. +func DecodeTempIndexOriginValue(value []byte) []byte { + if len(value) == 0 { + return nil + } + if TempIndexValueFlag(value[0]) == TempIndexValueFlagNormal { + return value[1 : len(value)-1] + } + return nil +} + // GenIndexValuePortal is the portal for generating index value. // Value layout: // diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index 6632aaf2c1727..231d58cf18bd3 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -610,3 +610,33 @@ func TestTempIndexKey(t *testing.T) { require.Equal(t, tid, tableID) require.Equal(t, indexID, iid) } + +func TestTempIndexValueCodec(t *testing.T) { + // Test encode temp index value. + encodedValue, err := codec.EncodeValue(&stmtctx.StatementContext{TimeZone: time.UTC}, nil, types.NewIntDatum(1)) + require.NoError(t, err) + encodedValueCopy := make([]byte, len(encodedValue)) + copy(encodedValueCopy, encodedValue) + tempIdxVal := EncodeTempIndexValue(encodedValue, 'b') + originVal, handle, isDelete, unique, keyVer := DecodeTempIndexValue(tempIdxVal, false) + require.Nil(t, handle) + require.False(t, isDelete || unique) + require.Equal(t, keyVer, byte('b')) + require.EqualValues(t, encodedValueCopy, originVal) + + tempIdxVal = EncodeTempIndexValueDeletedUnique(kv.IntHandle(100), 'm') + originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) + require.Equal(t, handle.IntValue(), int64(100)) + require.True(t, isDelete) + require.True(t, unique) + require.Equal(t, keyVer, byte('m')) + require.Empty(t, originVal) + + tempIdxVal = EncodeTempIndexValueDeleted('b') + originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) + require.Nil(t, handle) + require.True(t, isDelete) + require.False(t, unique) + require.Equal(t, keyVer, byte('b')) + require.Empty(t, originVal) +}