Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactoring the code of batchChecker #12108 #12141

Merged
merged 7 commits into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type AnalyzeExec struct {
var (
// MaxSampleSize is the size of samples for once analyze.
// It's public for test.
MaxSampleSize = 10000
MaxSampleSize = int64(10000)
// RandSeed is the seed for randing package.
// It's public for test.
RandSeed = int64(1)
Expand Down Expand Up @@ -464,7 +464,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range) (hists []*statis
collectors[i] = &statistics.SampleCollector{
IsMerger: true,
FMSketch: statistics.NewFMSketch(maxSketchSize),
MaxSampleSize: int64(MaxSampleSize),
MaxSampleSize: atomic.LoadInt64(&MaxSampleSize),
CMSketch: statistics.NewCMSketch(defaultCMSketchDepth, defaultCMSketchWidth),
}
}
Expand Down Expand Up @@ -1133,7 +1133,7 @@ func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*st
}

randPos := make([]uint64, 0, MaxSampleSize+1)
for i := 0; i < MaxSampleSize; i++ {
for i := 0; i < int(MaxSampleSize); i++ {
randPos = append(randPos, uint64(rander.Int63n(int64(e.rowCount))))
}
sort.Slice(randPos, func(i, j int) bool { return randPos[i] < randPos[j] })
Expand Down
9 changes: 5 additions & 4 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -158,8 +159,8 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
executor.MaxSampleSize = 20
executor.RandSeed = 123
atomic.StoreInt64(&executor.MaxSampleSize, 20)
atomic.StoreInt64(&executor.RandSeed, 123)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -229,8 +230,8 @@ func (s *testSuite1) TestFastAnalyze(c *C) {
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
executor.MaxSampleSize = 6
executor.RandSeed = 123
atomic.StoreInt64(&executor.MaxSampleSize, 6)
atomic.StoreInt64(&executor.RandSeed, 123)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down
44 changes: 44 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,50 @@ func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows
return nil
}

// getOldRowNew gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRowNew(sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(t.RecordKey(handle))
if err != nil {
return nil, err
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
gIdx := 0
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
if err != nil {
return nil, err
}
}
}
if col.IsGenerated() {
// only the virtual column needs fill back.
if !col.GeneratedStored {
val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow())
if err != nil {
return nil, err
}
oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo())
if err != nil {
return nil, err
}
}
gIdx++
}
}
return oldRow, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
Expand Down
146 changes: 145 additions & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"context"
"encoding/hex"
"fmt"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
if len(e.OnDuplicate) > 0 {
err := e.batchUpdateDupRows(rows)
err := e.batchUpdateDupRowsNew(ctx, rows)
if err != nil {
return err
}
Expand All @@ -78,6 +79,149 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return nil
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
nKeys := 0
for _, r := range rows {
if r.handleKey != nil {
nKeys++
}
nKeys += len(r.uniqueKeys)
}
batchKeys := make([]kv.Key, 0, nKeys)
for _, r := range rows {
if r.handleKey != nil {
batchKeys = append(batchKeys, r.handleKey.newKV.key)
}
for _, k := range r.uniqueKeys {
batchKeys = append(batchKeys, k.newKV.key)
}
}
return txn.BatchGet(batchKeys)
}

func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
batchKeys := make([]kv.Key, 0, len(rows))
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}
batchKeys = append(batchKeys, r.t.RecordKey(handle))
}
}
}
_, err := txn.BatchGet(batchKeys)
return err
}

func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
values, err := prefetchUniqueIndices(ctx, txn, rows)
if err != nil {
return err
}
return prefetchConflictedOldRows(ctx, txn, rows, values)
}

// updateDupRowNew updates a duplicate row to a new row.
func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRowNew(e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}

_, _, _, err = e.doDupRowUpdate(handle, oldRow, row.row, e.OnDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
return err
}

func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(e.ctx, e.Table, newRows)
if err != nil {
return err
}

txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
if !kv.IsErrNotFound(err) {
return err
}
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(uk.newKV.key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return err
}
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
// handle points to nothing.
logutil.Logger(ctx).Error("get old row failed when insert on dup",
zap.String("uniqueKey", hex.EncodeToString(uk.newKV.key)),
zap.Int64("handle", handle),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
}
return err
}

newRows[i] = nil
break
}

// If row was checked with no duplicate keys,
// we should do insert the row,
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
_, err := e.addRecord(newRows[i])
if err != nil {
return err
}
}
}
return nil
}

// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)
Expand Down
40 changes: 26 additions & 14 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,20 +287,6 @@ func (s *testSuite3) TestAllowInvalidDates(c *C) {
runWithMode("ALLOW_INVALID_DATES")
}

func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`)
tk.MustExec(`insert into t1 set a=1, b=1`)
tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1"))

tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`)
tk.MustExec(`insert into t2 set a=1,b=1;`)
tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))
}

func (s *testSuite3) TestInsertWithAutoidSchema(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
Expand Down Expand Up @@ -561,3 +547,29 @@ func (s *testSuite3) TestInsertWithAutoidSchema(c *C) {
}

}

func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`)
tk.MustExec(`insert into t1 set a=1, b=1`)
tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1"))

tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`)
tk.MustExec(`insert into t2 set a=1,b=1;`)
tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))

tk.MustExec(`CREATE TABLE t3 (a int, b int, c int, d int, e int,
PRIMARY KEY (a,b),
UNIQUE KEY (b,c,d)
) PARTITION BY RANGE ( b ) (
PARTITION p0 VALUES LESS THAN (4),
PARTITION p1 VALUES LESS THAN (7),
PARTITION p2 VALUES LESS THAN (11)
)`)
tk.MustExec("insert into t3 values (1,2,3,4,5)")
tk.MustExec("insert into t3 values (1,2,3,4,5),(6,2,3,4,6) on duplicate key update e = e + values(e)")
tk.MustQuery("select * from t3").Check(testkit.Rows("1 2 3 4 16"))
}
17 changes: 17 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,21 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk2.MustExec("commit")
_, err := tk.Exec("commit")
c.Check(err, NotNil)

// Update snapshotTS after a conflict, invalidate snapshot cache.
tk.MustExec("truncate table conflict")
tk.MustExec("insert into conflict values (1, 2)")
tk.MustExec("begin pessimistic")
// This SQL use BatchGet and cache data in the txn snapshot.
// It can be changed to other SQLs that use BatchGet.
tk.MustExec("insert ignore into conflict values (1, 2)")

tk2.MustExec("update conflict set c = c - 1")

// Make the txn update its forUpdateTS.
tk.MustQuery("select * from conflict where id = 1 for update").Check(testkit.Rows("1 1"))
// Cover a bug that the txn snapshot doesn't invalidate cache after ts change.
tk.MustExec("insert into conflict values (1, 999) on duplicate key update c = c + 2")
tk.MustExec("commit")
tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3"))
}
Loading