Skip to content

Commit

Permalink
planner,executor: fix 'select ...(join on partition table) for update…
Browse files Browse the repository at this point in the history
…' panic (#21148) (#25502)
  • Loading branch information
ti-srebot authored Jun 17, 2021
1 parent 709b68e commit 4ca5d03
Show file tree
Hide file tree
Showing 13 changed files with 370 additions and 45 deletions.
37 changes: 37 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,16 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
}
if len(e.partitionedTable) > 0 {
schema := v.Schema()
e.tblID2PIDColumnIndex = make(map[int64]int)
for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ {
col := v.ExtraPIDInfo.Columns[i]
tblID := v.ExtraPIDInfo.TblIDs[i]
offset := schema.ColumnIndex(col)
e.tblID2PIDColumnIndex[tblID] = offset
}
}
return e
}

Expand Down Expand Up @@ -2673,6 +2683,9 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
storeType: v.StoreType,
batchCop: v.BatchCop,
}
if tbl.Meta().Partition != nil {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand Down Expand Up @@ -2703,6 +2716,15 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return e, nil
}

func extraPIDColumnIndex(schema *expression.Schema) offsetOptional {
for idx, col := range schema.Columns {
if col.ID == model.ExtraPidColID {
return newOffset(idx)
}
}
return 0
}

func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor {
startTs, err := b.getSnapshotTS()
if err != nil {
Expand Down Expand Up @@ -2748,6 +2770,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}
// When isPartition is set, it means the union rewriting is done, so a partition reader is prefered.
if ok, _ := ts.IsPartition(); ok {
return ret
}

pi := ts.Table.GetPartitionInfo()
if pi == nil {
Expand Down Expand Up @@ -2966,6 +2992,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E
if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}
// When isPartition is set, it means the union rewriting is done, so a partition reader is prefered.
if ok, _ := is.IsPartition(); ok {
return ret
}

pi := is.Table.GetPartitionInfo()
if pi == nil {
Expand Down Expand Up @@ -3067,6 +3097,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}
if ok, _ := ts.IsPartition(); ok {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}

if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
Expand Down Expand Up @@ -3126,6 +3159,10 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
if is.Index.Global {
return ret
}
if ok, _ := is.IsPartition(); ok {
// Already pruned when translated to logical union.
return ret
}

tmp, _ := b.is.TableByID(is.Table.ID)
tbl := tmp.(table.PartitionedTable)
Expand Down
22 changes: 13 additions & 9 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ type IndexLookUpExecutor struct {
PushedLimit *plannercore.PushedDownLimit

stats *IndexLookUpRunTimeStats

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1
extraPIDColumnIndex offsetOptional
}

type getHandleType int8
Expand Down Expand Up @@ -638,15 +641,16 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
table = task.partitionTable
}
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
extraPIDColumnIndex: e.extraPIDColumnIndex,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
Expand Down
42 changes: 16 additions & 26 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,31 +890,22 @@ type SelectLockExec struct {
Lock *ast.SelectLockInfo
keys []kv.Key

tblID2Handle map[int64][]plannercore.HandleCols
tblID2Handle map[int64][]plannercore.HandleCols

// All the partition tables in the children of this executor.
partitionedTable []table.PartitionedTable

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable
// When SelectLock work on the partition table, we need the partition ID
// instead of table ID to calculate the lock KV. In that case, partition ID is store as an
// extra column in the chunk row.
// tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join
// of multiple tables, so the map struct is used.
tblID2PIDColumnIndex map[int64]int
}

// Open implements the Executor Open interface.
func (e *SelectLockExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}

if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
}
}
}
}

return nil
return e.baseExecutor.Open(ctx)
}

// Next implements the Executor Next interface.
Expand All @@ -932,15 +923,14 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {

for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
if err != nil {
return err
}
physicalID = p.GetPhysicalID()
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
offset := e.tblID2PIDColumnIndex[id]
physicalID = row.GetInt64(offset)
}

for _, col := range cols {
Expand Down
5 changes: 5 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7591,6 +7591,11 @@ func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit,

func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)

// Set projection concurrency to avoid data race here.
// TODO: remove this line after fixing https://github.com/pingcap/tidb/issues/25496
tk1.Se.GetSessionVars().Concurrency.SetProjectionConcurrency(0)

tk1.MustExec("begin pessimistic")
tk1.MustExec("update t1 set c=c")
tk2.MustExec("create table t2(a int)")
Expand Down
159 changes: 159 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math/rand"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -2614,6 +2615,164 @@ partition p2 values less than (10))`)
tk.MustQuery("select * from p use index (idx)").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9"))
}

func (s *partitionTableSuite) TestIssue20028(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("set @@tidb_partition_prune_mode='static-only'")
tk.MustExec(`create table t1 (c_datetime datetime, primary key (c_datetime))
partition by range (to_days(c_datetime)) ( partition p0 values less than (to_days('2020-02-01')),
partition p1 values less than (to_days('2020-04-01')),
partition p2 values less than (to_days('2020-06-01')),
partition p3 values less than maxvalue)`)
tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime))")
tk.MustExec("insert into t1 values ('2020-06-26 03:24:00'), ('2020-02-21 07:15:33'), ('2020-04-27 13:50:58')")
tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18')")
tk.MustExec("begin")
tk.MustQuery("select * from t1 join t2 on t1.c_datetime >= t2.c_datetime for update").
Sort().
Check(testkit.Rows(
"2020-02-21 07:15:33 2020-01-10 09:36:00",
"2020-02-21 07:15:33 2020-02-04 06:00:00",
"2020-04-27 13:50:58 2020-01-10 09:36:00",
"2020-04-27 13:50:58 2020-02-04 06:00:00",
"2020-06-26 03:24:00 2020-01-10 09:36:00",
"2020-06-26 03:24:00 2020-02-04 06:00:00",
"2020-06-26 03:24:00 2020-06-12 03:45:18"))
tk.MustExec("rollback")
}

func (s *partitionTableSuite) TestSelectLockOnPartitionTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists pt")
tk.MustExec(`create table pt (id int primary key, k int, c int, index(k))
partition by range (id) (
partition p0 values less than (4),
partition p1 values less than (7),
partition p2 values less than (11))`)

tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec("use test")

optimisticTableReader := func() {
tk.MustExec("set @@tidb_txn_mode = 'optimistic'")
tk2.MustExec("set @@tidb_txn_mode = 'optimistic'")
tk.MustExec("begin")
tk.MustQuery("select id, k from pt ignore index (k) where k = 5 for update").Check(testkit.Rows("5 5"))
tk2.MustExec("update pt set c = c + 1 where k = 5")
_, err := tk.Exec("commit")
c.Assert(err, NotNil) // Write conflict
}

optimisticIndexReader := func() {
tk.MustExec("set @@tidb_txn_mode = 'optimistic'")
tk2.MustExec("set @@tidb_txn_mode = 'optimistic'")
tk.MustExec("begin")
// This is not index reader actually.
tk.MustQuery("select k from pt where k = 5 for update").Check(testkit.Rows("5"))
tk2.MustExec("update pt set c = c + 1 where k = 5")
_, err := tk.Exec("commit")
c.Assert(err, NotNil)
}

optimisticIndexLookUp := func() {
tk.MustExec("set @@tidb_txn_mode = 'optimistic'")
tk2.MustExec("set @@tidb_txn_mode = 'optimistic'")
tk.MustExec("begin")
tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5"))
tk2.MustExec("update pt set c = c + 1 where k = 5")
_, err := tk.Exec("commit")
c.Assert(err, NotNil)
}

pessimisticTableReader := func() {
tk.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk.MustExec("begin")
tk.MustQuery("select id, k from pt ignore index (k) where k = 5 for update").Check(testkit.Rows("5 5"))
ch := make(chan int, 2)
go func() {
tk2.MustExec("update pt set c = c + 1 where k = 5")
ch <- 1
}()
time.Sleep(100 * time.Millisecond)
ch <- 2

// Check the operation in the goroutine is blocked, if not the first result in
// the channel should be 1.
c.Assert(<-ch, Equals, 2)

tk.MustExec("commit")
<-ch
tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6"))
}

pessimisticIndexReader := func() {
tk.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk.MustExec("begin")
// This is not index reader actually.
tk.MustQuery("select k from pt where k = 5 for update").Check(testkit.Rows("5"))
ch := make(chan int, 2)
go func() {
tk2.MustExec("update pt set c = c + 1 where k = 5")
ch <- 1
}()
time.Sleep(100 * time.Millisecond)
ch <- 2

// Check the operation in the goroutine is blocked,
c.Assert(<-ch, Equals, 2)

tk.MustExec("commit")
<-ch
tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6"))
}

pessimisticIndexLookUp := func() {
tk.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk.MustExec("begin")
tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5"))
ch := make(chan int, 2)
go func() {
tk2.MustExec("update pt set c = c + 1 where k = 5")
ch <- 1
}()
time.Sleep(100 * time.Millisecond)
ch <- 2

// Check the operation in the goroutine is blocked,
c.Assert(<-ch, Equals, 2)

tk.MustExec("commit")
<-ch
tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6"))
}

partitionModes := []string{
"'dynamic-only'",
"'static-only'",
}
testCases := []func(){
optimisticTableReader,
optimisticIndexLookUp,
optimisticIndexReader,
pessimisticTableReader,
pessimisticIndexReader,
pessimisticIndexLookUp,
}

for _, mode := range partitionModes {
tk.MustExec("set @@tidb_partition_prune_mode=" + mode)
for _, c := range testCases {
tk.MustExec("replace into pt values (5, 5, 5)")
c()
}
}
}

func (s *globalIndexSuite) TestIssue21731(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists p, t")
Expand Down
Loading

0 comments on commit 4ca5d03

Please sign in to comment.