Skip to content

Commit

Permalink
*: refactoring the code of batchChecker pingcap#12108
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Sep 10, 2019
1 parent cdf4566 commit d93a38f
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 20 deletions.
44 changes: 44 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,50 @@ func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows
return nil
}

// getOldRowNew gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRowNew(sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(t.RecordKey(handle))
if err != nil {
return nil, err
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
gIdx := 0
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
if err != nil {
return nil, err
}
}
}
if col.IsGenerated() {
// only the virtual column needs fill back.
if !col.GeneratedStored {
val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow())
if err != nil {
return nil, err
}
oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo())
if err != nil {
return nil, err
}
}
gIdx++
}
}
return oldRow, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
Expand Down
158 changes: 157 additions & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"context"
"encoding/hex"
"fmt"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
if len(e.OnDuplicate) > 0 {
err := e.batchUpdateDupRows(rows)
err := e.batchUpdateDupRowsNew(ctx, rows)
if err != nil {
return err
}
Expand All @@ -78,6 +79,161 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return nil
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchUniqueIndices", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

nKeys := 0
for _, r := range rows {
if r.handleKey != nil {
nKeys++
}
nKeys += len(r.uniqueKeys)
}
batchKeys := make([]kv.Key, 0, nKeys)
for _, r := range rows {
if r.handleKey != nil {
batchKeys = append(batchKeys, r.handleKey.newKV.key)
}
for _, k := range r.uniqueKeys {
batchKeys = append(batchKeys, k.newKV.key)
}
}
return txn.BatchGet(batchKeys)
}

func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchConflictedOldRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

batchKeys := make([]kv.Key, 0, len(rows))
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}
batchKeys = append(batchKeys, r.t.RecordKey(handle))
}
}
}
_, err := txn.BatchGet(batchKeys)
return err
}

func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
values, err := prefetchUniqueIndices(ctx, txn, rows)
if err != nil {
return err
}
return prefetchConflictedOldRows(ctx, txn, rows, values)
}

// updateDupRowNew updates a duplicate row to a new row.
func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRowNew(e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}

_, _, _, err = e.doDupRowUpdate(handle, oldRow, row.row, e.OnDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
return err
}

func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(e.ctx, e.Table, newRows)
if err != nil {
return err
}

txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
if !kv.IsErrNotFound(err) {
return err
}
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(uk.newKV.key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return err
}
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
// handle points to nothing.
logutil.Logger(ctx).Error("get old row failed when insert on dup",
zap.String("uniqueKey", hex.EncodeToString(uk.newKV.key)),
zap.Int64("handle", handle),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
}
return err
}

newRows[i] = nil
break
}

// If row was checked with no duplicate keys,
// we should do insert the row,
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
_, err := e.addRecord(newRows[i])
if err != nil {
return err
}
}
}
return nil
}

// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
Expand Down
40 changes: 26 additions & 14 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,20 +287,6 @@ func (s *testSuite3) TestAllowInvalidDates(c *C) {
runWithMode("ALLOW_INVALID_DATES")
}

func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`)
tk.MustExec(`insert into t1 set a=1, b=1`)
tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1"))

tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`)
tk.MustExec(`insert into t2 set a=1,b=1;`)
tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))
}

func (s *testSuite3) TestInsertWithAutoidSchema(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
Expand Down Expand Up @@ -561,3 +547,29 @@ func (s *testSuite3) TestInsertWithAutoidSchema(c *C) {
}

}

func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`)
tk.MustExec(`insert into t1 set a=1, b=1`)
tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1"))

tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`)
tk.MustExec(`insert into t2 set a=1,b=1;`)
tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))

tk.MustExec(`CREATE TABLE t3 (a int, b int, c int, d int, e int,
PRIMARY KEY (a,b),
UNIQUE KEY (b,c,d)
) PARTITION BY RANGE ( b ) (
PARTITION p0 VALUES LESS THAN (4),
PARTITION p1 VALUES LESS THAN (7),
PARTITION p2 VALUES LESS THAN (11)
)`)
tk.MustExec("insert into t3 values (1,2,3,4,5)")
tk.MustExec("insert into t3 values (1,2,3,4,5),(6,2,3,4,6) on duplicate key update e = e + values(e)")
tk.MustQuery("select * from t3").Check(testkit.Rows("1 2 3 4 16"))
}
33 changes: 33 additions & 0 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type tikvSnapshot struct {
syncLog bool
keyOnly bool
vars *kv.Variables

// Cache the result of BatchGet.
// The invariance is that calling BatchGet multiple times using the same start ts,
// the result should not change.
cached map[string][]byte
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
Expand All @@ -75,7 +80,20 @@ func (s *tikvSnapshot) SetPriority(priority int) {
// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
// The map will not contain nonexistent keys.
func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
// Check the cached value first.
m := make(map[string][]byte)
if s.cached != nil {
tmp := keys[:0]
for _, key := range keys {
if val, ok := s.cached[string(key)]; ok {
m[string(key)] = val
} else {
tmp = append(tmp, key)
}
}
keys = tmp
}

if len(keys) == 0 {
return m, nil
}
Expand Down Expand Up @@ -107,6 +125,14 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
return nil, errors.Trace(err)
}

// Update the cache.
if s.cached == nil {
s.cached = make(map[string][]byte, len(m))
}
for key, value := range m {
s.cached[key] = value
}

return m, nil
}

Expand Down Expand Up @@ -234,6 +260,13 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) {
}

func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
if s.cached != nil {
if value, ok := s.cached[string(k)]; ok {
return value, nil
}
}

sender := NewRegionRequestSender(s.store.regionCache, s.store.client)

req := &tikvrpc.Request{
Expand Down
6 changes: 1 addition & 5 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,11 +665,7 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h int64, co

// Row implements table.Table Row interface.
func (t *tableCommon) Row(ctx sessionctx.Context, h int64) ([]types.Datum, error) {
r, err := t.RowWithCols(ctx, h, t.Cols())
if err != nil {
return nil, err
}
return r, nil
return t.RowWithCols(ctx, h, t.Cols())
}

// RemoveRecord implements table.Table RemoveRecord interface.
Expand Down

0 comments on commit d93a38f

Please sign in to comment.