Skip to content

Commit

Permalink
planner,executor: fix 'select ...(join on partition table) for update…
Browse files Browse the repository at this point in the history
…' panic (#21148) (#25501)
  • Loading branch information
ti-srebot authored Jul 8, 2021
1 parent 3be56ed commit 7dbfdc0
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 41 deletions.
27 changes: 27 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,16 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
}
if len(e.partitionedTable) > 0 {
schema := v.Schema()
e.tblID2PIDColumnIndex = make(map[int64]int)
for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ {
col := v.ExtraPIDInfo.Columns[i]
tblID := v.ExtraPIDInfo.TblIDs[i]
offset := schema.ColumnIndex(col)
e.tblID2PIDColumnIndex[tblID] = offset
}
}
return e
}

Expand Down Expand Up @@ -2342,6 +2352,9 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
storeType: v.StoreType,
batchCop: v.BatchCop,
}
if tbl.Meta().Partition != nil {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand All @@ -2368,6 +2381,17 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return e, nil
}

const modelExtraPidColID = -2

func extraPIDColumnIndex(schema *expression.Schema) offsetOptional {
for idx, col := range schema.Columns {
if col.ID == modelExtraPidColID {
return newOffset(idx)
}
}
return 0
}

// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *TableReaderExecutor {
Expand Down Expand Up @@ -2516,6 +2540,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}
if ok, _ := ts.IsPartition(); ok {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}

if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
Expand Down
22 changes: 13 additions & 9 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ type IndexLookUpExecutor struct {
PushedLimit *plannercore.PushedDownLimit

stats *IndexLookUpRunTimeStats

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1
extraPIDColumnIndex offsetOptional
}

type checkIndexValue struct {
Expand Down Expand Up @@ -533,15 +536,16 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: e.table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: e.table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
extraPIDColumnIndex: e.extraPIDColumnIndex,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
Expand Down
36 changes: 13 additions & 23 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,12 @@ type SelectLockExec struct {

tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable
// When SelectLock work on the partition table, we need the partition ID
// instead of table ID to calculate the lock KV. In that case, partition ID is store as an
// extra column in the chunk row.
// tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join
// of multiple tables, so the map struct is used.
tblID2PIDColumnIndex map[int64]int

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable
Expand All @@ -876,22 +882,7 @@ type SelectLockExec struct {

// Open implements the Executor Open interface.
func (e *SelectLockExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}

if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
}
}
}
}

return nil
return e.baseExecutor.Open(ctx)
}

// Next implements the Executor Next interface.
Expand All @@ -909,15 +900,14 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {

for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
if err != nil {
return err
}
physicalID = p.GetPhysicalID()
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
offset := e.tblID2PIDColumnIndex[id]
physicalID = row.GetInt64(offset)
}

for _, col := range cols {
Expand Down
141 changes: 141 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6558,6 +6558,147 @@ func (s *testSuite) TestIssue19667(c *C) {
tk.MustQuery(`SELECT DATE_ADD(a, INTERVAL 1 SECOND) FROM t`).Check(testkit.Rows("1988-04-17 02:00:00"))
}

func issue20975Prepare(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) {
tk1 := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists t1, t2")
tk2.MustExec("use test")
tk1.MustExec("create table t1(id int primary key, c int)")
tk1.MustExec("insert into t1 values(1, 10), (2, 20)")
return tk1, tk2
}

func (s *testSuite) TestIssue20975UpdateNoChange(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin pessimistic")
tk1.MustExec("update t1 set c=c")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdate(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdatePointGet(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGet(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) {
tk1 := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists t1, t2")
tk2.MustExec("use test")
tk1.MustExec(`create table t1(id int primary key, c int) partition by range (id) (
partition p1 values less than (10),
partition p2 values less than (20)
)`)
tk1.MustExec("insert into t1 values(1, 10), (2, 20), (11, 30), (12, 40)")
return tk1, tk2
}

func (s *testSuite) TestIssue20975SelectForUpdateWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdatePointGetWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=12 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=12 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGetWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (11, 12) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 11) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (11, 12) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 11) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20305(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
36 changes: 36 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -90,6 +91,24 @@ type TableReaderExecutor struct {
virtualColumnRetFieldTypes []*types.FieldType
// batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine.
batchCop bool

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column.
extraPIDColumnIndex offsetOptional
}

// offsetOptional may be a positive integer, or invalid.
type offsetOptional int

func newOffset(i int) offsetOptional {
return offsetOptional(i + 1)
}

func (i offsetOptional) valid() bool {
return i != 0
}

func (i offsetOptional) value() int {
return int(i - 1)
}

// Open initialzes necessary variables for using this executor.
Expand Down Expand Up @@ -181,9 +200,26 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error
return err
}

// When 'select ... for update' work on a partitioned table, the table reader should
// add the partition ID as an extra column. The SelectLockExec need this information
// to construct the lock key.
physicalID := getPhysicalTableID(e.table)
if e.extraPIDColumnIndex.valid() {
fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID)
}

return nil
}

func fillExtraPIDColumn(req *chunk.Chunk, extraPIDColumnIndex int, physicalID int64) {
numRows := req.NumRows()
pidColumn := chunk.NewColumn(types.NewFieldType(mysql.TypeLonglong), numRows)
for i := 0; i < numRows; i++ {
pidColumn.AppendInt64(physicalID)
}
req.SetCol(extraPIDColumnIndex, pidColumn)
}

// Close implements the Executor Close interface.
func (e *TableReaderExecutor) Close() error {
var err error
Expand Down
1 change: 1 addition & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2039,6 +2039,7 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
PartitionedTable: p.partitionedTable,
ExtraPIDInfo: p.extraPIDInfo,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
return []PhysicalPlan{lock}, true
}
Expand Down
10 changes: 10 additions & 0 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,16 @@ func (ds *DataSource) canConvertToPointGet(candidate *candidatePath) bool {
}
}

if tblInfo := ds.table.Meta(); tblInfo.GetPartitionInfo() != nil {
// If the schema contains ExtraPidColID, do not convert to point get.
// Because the point get executor can not handle the extra partition ID column now.
for _, col := range ds.schema.Columns {
if col.ID == modelExtraPidColID {
return false
}
}
}

return true
}

Expand Down
Loading

0 comments on commit 7dbfdc0

Please sign in to comment.