diff --git a/executor/replace.go b/executor/replace.go index 9fc306ddbf923..0865a9b61da71 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -17,7 +17,9 @@ import ( "context" "fmt" + "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -52,13 +54,19 @@ func (e *ReplaceExec) Open(ctx context.Context) error { // removeRow removes the duplicate row and cleanup its keys in the key-value map, // but if the to-be-removed row equals to the to-be-added row, no remove or add things to do. -func (e *ReplaceExec) removeRow(ctx context.Context, handle int64, r toBeCheckedRow) (bool, error) { +func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle int64, r toBeCheckedRow) (bool, error) { newRow := r.row - oldRow, err := e.batchChecker.getOldRow(e.ctx, r.t, handle, e.GenExprs) + oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs) if err != nil { - logutil.BgLogger().Error("get old row failed when replace", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row))) + logutil.BgLogger().Error("get old row failed when replace", + zap.Int64("handle", handle), + zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row))) + if kv.IsErrNotFound(err) { + err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle) + } return false, err } + rowUnchanged, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, oldRow, newRow) if err != nil { return false, err @@ -73,36 +81,40 @@ func (e *ReplaceExec) removeRow(ctx context.Context, handle int64, r toBeChecked return false, err } e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) - - // Cleanup keys map, because the record was removed. - err = e.deleteDupKeys(ctx, e.ctx, r.t, [][]types.Datum{oldRow}) - if err != nil { - return false, err - } return false, nil } // replaceRow removes all duplicate rows for one row, then inserts it. func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { + txn, err := e.ctx.Txn(true) + if err != nil { + return err + } + if r.handleKey != nil { - if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found { - handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) - if err != nil { - return err - } - rowUnchanged, err := e.removeRow(ctx, handle, r) + handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key) + if err != nil { + return err + } + + if _, err := txn.Get(ctx, r.handleKey.newKV.key); err == nil { + rowUnchanged, err := e.removeRow(ctx, txn, handle, r) if err != nil { return err } if rowUnchanged { return nil } + } else { + if !kv.IsErrNotFound(err) { + return err + } } } // Keep on removing duplicated rows. for { - rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, r) + rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txn, r) if err != nil { return err } @@ -116,11 +128,10 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { } // No duplicated rows now, insert the row. - newHandle, err := e.addRecord(ctx, r.row) + _, err = e.addRecord(ctx, r.row) if err != nil { return err } - e.fillBackKeys(r.t, r, newHandle) return nil } @@ -130,19 +141,25 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 2. bool: true when found the duplicated key. This only means that duplicated key was found, // and the row was removed. // 3. error: the error. -func (e *ReplaceExec) removeIndexRow(ctx context.Context, r toBeCheckedRow) (bool, bool, error) { +func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - if val, found := e.dupKVs[string(uk.newKV.key)]; found { - handle, err := tables.DecodeHandle(val) - if err != nil { - return false, found, err - } - rowUnchanged, err := e.removeRow(ctx, handle, r) - if err != nil { - return false, found, err + val, err := txn.Get(ctx, uk.newKV.key) + if err != nil { + if kv.IsErrNotFound(err) { + continue } - return rowUnchanged, found, nil + return false, false, err + } + + handle, err := tables.DecodeHandle(val) + if err != nil { + return false, true, err + } + rowUnchanged, err := e.removeRow(ctx, txn, handle, r) + if err != nil { + return false, true, err } + return rowUnchanged, true, nil } return false, false, nil } @@ -160,18 +177,26 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { * because in this case, one row was inserted after the duplicate was deleted. * See http://dev.mysql.com/doc/refman/5.7/en/mysql-affected-rows.html */ - err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, newRows) + + // Get keys need to be checked. + toBeCheckedRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, newRows) if err != nil { return err } - // Batch get the to-be-replaced rows in storage. - err = e.initDupOldRowValue(ctx, e.ctx, e.Table, newRows) + txn, err := e.ctx.Txn(true) if err != nil { return err } + + // Use BatchGet to fill cache. + // It's an optimization and could be removed without affecting correctness. + if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil { + return err + } + e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows))) - for _, r := range e.toBeCheckedRows { + for _, r := range toBeCheckedRows { err = e.replaceRow(ctx, r) if err != nil { return err