diff --git a/executor/builder.go b/executor/builder.go index 529a8e7b6a739..d65cb39fdb64f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -621,6 +621,16 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor tblID2Handle: v.TblID2Handle, partitionedTable: v.PartitionedTable, } + if len(e.partitionedTable) > 0 { + schema := v.Schema() + e.tblID2PIDColumnIndex = make(map[int64]int) + for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ { + col := v.ExtraPIDInfo.Columns[i] + tblID := v.ExtraPIDInfo.TblIDs[i] + offset := schema.ColumnIndex(col) + e.tblID2PIDColumnIndex[tblID] = offset + } + } return e } @@ -2673,6 +2683,9 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea storeType: v.StoreType, batchCop: v.BatchCop, } + if tbl.Meta().Partition != nil { + e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) + } e.buildVirtualColumnInfo() if containsLimit(dagReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc) @@ -2703,6 +2716,15 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return e, nil } +func extraPIDColumnIndex(schema *expression.Schema) offsetOptional { + for idx, col := range schema.Columns { + if col.ID == model.ExtraPidColID { + return newOffset(idx) + } + } + return 0 +} + func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor { startTs, err := b.getSnapshotTS() if err != nil { @@ -2748,6 +2770,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() { return ret } + // When isPartition is set, it means the union rewriting is done, so a partition reader is prefered. + if ok, _ := ts.IsPartition(); ok { + return ret + } pi := ts.Table.GetPartitionInfo() if pi == nil { @@ -2966,6 +2992,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() { return ret } + // When isPartition is set, it means the union rewriting is done, so a partition reader is prefered. + if ok, _ := is.IsPartition(); ok { + return ret + } pi := is.Table.GetPartitionInfo() if pi == nil { @@ -3067,6 +3097,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn tblPlans: v.TablePlans, PushedLimit: v.PushedLimit, } + if ok, _ := ts.IsPartition(); ok { + e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) + } if containsLimit(indexReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc) @@ -3126,6 +3159,10 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo if is.Index.Global { return ret } + if ok, _ := is.IsPartition(); ok { + // Already pruned when translated to logical union. + return ret + } tmp, _ := b.is.TableByID(is.Table.ID) tbl := tmp.(table.PartitionedTable) diff --git a/executor/distsql.go b/executor/distsql.go index 759ec6cb35e85..b27a62c320c71 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -366,6 +366,9 @@ type IndexLookUpExecutor struct { PushedLimit *plannercore.PushedDownLimit stats *IndexLookUpRunTimeStats + + // extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1 + extraPIDColumnIndex offsetOptional } type getHandleType int8 @@ -638,15 +641,16 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup table = task.partitionTable } tableReaderExec := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), - table: table, - dagPB: e.tableRequest, - startTS: e.startTS, - columns: e.columns, - streaming: e.tableStreaming, - feedback: statistics.NewQueryFeedback(0, nil, 0, false), - corColInFilter: e.corColInTblSide, - plans: e.tblPlans, + baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), + table: table, + dagPB: e.tableRequest, + startTS: e.startTS, + columns: e.columns, + streaming: e.tableStreaming, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + corColInFilter: e.corColInTblSide, + plans: e.tblPlans, + extraPIDColumnIndex: e.extraPIDColumnIndex, } tableReaderExec.buildVirtualColumnInfo() tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true) diff --git a/executor/executor.go b/executor/executor.go index 6aa191558d258..2b59dd0c44b04 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -890,31 +890,22 @@ type SelectLockExec struct { Lock *ast.SelectLockInfo keys []kv.Key - tblID2Handle map[int64][]plannercore.HandleCols + tblID2Handle map[int64][]plannercore.HandleCols + + // All the partition tables in the children of this executor. partitionedTable []table.PartitionedTable - // tblID2Table is cached to reduce cost. - tblID2Table map[int64]table.PartitionedTable + // When SelectLock work on the partition table, we need the partition ID + // instead of table ID to calculate the lock KV. In that case, partition ID is store as an + // extra column in the chunk row. + // tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join + // of multiple tables, so the map struct is used. + tblID2PIDColumnIndex map[int64]int } // Open implements the Executor Open interface. func (e *SelectLockExec) Open(ctx context.Context) error { - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } - - if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 { - e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable)) - for id := range e.tblID2Handle { - for _, p := range e.partitionedTable { - if id == p.Meta().ID { - e.tblID2Table[id] = p - } - } - } - } - - return nil + return e.baseExecutor.Open(ctx) } // Next implements the Executor Next interface. @@ -932,15 +923,14 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { if req.NumRows() > 0 { iter := chunk.NewIterator4Chunk(req) for row := iter.Begin(); row != iter.End(); row = iter.Next() { + for id, cols := range e.tblID2Handle { physicalID := id - if pt, ok := e.tblID2Table[id]; ok { - // On a partitioned table, we have to use physical ID to encode the lock key! - p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes)) - if err != nil { - return err - } - physicalID = p.GetPhysicalID() + if len(e.partitionedTable) > 0 { + // Replace the table ID with partition ID. + // The partition ID is returned as an extra column from the table reader. + offset := e.tblID2PIDColumnIndex[id] + physicalID = row.GetInt64(offset) } for _, col := range cols { diff --git a/executor/executor_test.go b/executor/executor_test.go index bb2baa2da3b32..0ca710c4a8456 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7591,6 +7591,11 @@ func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit, func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) { tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + + // Set projection concurrency to avoid data race here. + // TODO: remove this line after fixing https://github.com/pingcap/tidb/issues/25496 + tk1.Se.GetSessionVars().Concurrency.SetProjectionConcurrency(0) + tk1.MustExec("begin pessimistic") tk1.MustExec("update t1 set c=c") tk2.MustExec("create table t2(a int)") diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 35778fe15a00a..b78f6239d6898 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -17,6 +17,7 @@ import ( "fmt" "math/rand" "strings" + "time" . "github.com/pingcap/check" "github.com/pingcap/parser/model" @@ -2614,6 +2615,164 @@ partition p2 values less than (10))`) tk.MustQuery("select * from p use index (idx)").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9")) } +func (s *partitionTableSuite) TestIssue20028(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("set @@tidb_partition_prune_mode='static-only'") + tk.MustExec(`create table t1 (c_datetime datetime, primary key (c_datetime)) +partition by range (to_days(c_datetime)) ( partition p0 values less than (to_days('2020-02-01')), +partition p1 values less than (to_days('2020-04-01')), +partition p2 values less than (to_days('2020-06-01')), +partition p3 values less than maxvalue)`) + tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime))") + tk.MustExec("insert into t1 values ('2020-06-26 03:24:00'), ('2020-02-21 07:15:33'), ('2020-04-27 13:50:58')") + tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18')") + tk.MustExec("begin") + tk.MustQuery("select * from t1 join t2 on t1.c_datetime >= t2.c_datetime for update"). + Sort(). + Check(testkit.Rows( + "2020-02-21 07:15:33 2020-01-10 09:36:00", + "2020-02-21 07:15:33 2020-02-04 06:00:00", + "2020-04-27 13:50:58 2020-01-10 09:36:00", + "2020-04-27 13:50:58 2020-02-04 06:00:00", + "2020-06-26 03:24:00 2020-01-10 09:36:00", + "2020-06-26 03:24:00 2020-02-04 06:00:00", + "2020-06-26 03:24:00 2020-06-12 03:45:18")) + tk.MustExec("rollback") +} + +func (s *partitionTableSuite) TestSelectLockOnPartitionTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists pt") + tk.MustExec(`create table pt (id int primary key, k int, c int, index(k)) +partition by range (id) ( +partition p0 values less than (4), +partition p1 values less than (7), +partition p2 values less than (11))`) + + tk2 := testkit.NewTestKit(c, s.store) + tk2.MustExec("use test") + + optimisticTableReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("begin") + tk.MustQuery("select id, k from pt ignore index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + tk2.MustExec("update pt set c = c + 1 where k = 5") + _, err := tk.Exec("commit") + c.Assert(err, NotNil) // Write conflict + } + + optimisticIndexReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("begin") + // This is not index reader actually. + tk.MustQuery("select k from pt where k = 5 for update").Check(testkit.Rows("5")) + tk2.MustExec("update pt set c = c + 1 where k = 5") + _, err := tk.Exec("commit") + c.Assert(err, NotNil) + } + + optimisticIndexLookUp := func() { + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("begin") + tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + tk2.MustExec("update pt set c = c + 1 where k = 5") + _, err := tk.Exec("commit") + c.Assert(err, NotNil) + } + + pessimisticTableReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk.MustExec("begin") + tk.MustQuery("select id, k from pt ignore index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + ch := make(chan int, 2) + go func() { + tk2.MustExec("update pt set c = c + 1 where k = 5") + ch <- 1 + }() + time.Sleep(100 * time.Millisecond) + ch <- 2 + + // Check the operation in the goroutine is blocked, if not the first result in + // the channel should be 1. + c.Assert(<-ch, Equals, 2) + + tk.MustExec("commit") + <-ch + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6")) + } + + pessimisticIndexReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk.MustExec("begin") + // This is not index reader actually. + tk.MustQuery("select k from pt where k = 5 for update").Check(testkit.Rows("5")) + ch := make(chan int, 2) + go func() { + tk2.MustExec("update pt set c = c + 1 where k = 5") + ch <- 1 + }() + time.Sleep(100 * time.Millisecond) + ch <- 2 + + // Check the operation in the goroutine is blocked, + c.Assert(<-ch, Equals, 2) + + tk.MustExec("commit") + <-ch + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6")) + } + + pessimisticIndexLookUp := func() { + tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk.MustExec("begin") + tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + ch := make(chan int, 2) + go func() { + tk2.MustExec("update pt set c = c + 1 where k = 5") + ch <- 1 + }() + time.Sleep(100 * time.Millisecond) + ch <- 2 + + // Check the operation in the goroutine is blocked, + c.Assert(<-ch, Equals, 2) + + tk.MustExec("commit") + <-ch + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6")) + } + + partitionModes := []string{ + "'dynamic-only'", + "'static-only'", + } + testCases := []func(){ + optimisticTableReader, + optimisticIndexLookUp, + optimisticIndexReader, + pessimisticTableReader, + pessimisticIndexReader, + pessimisticIndexLookUp, + } + + for _, mode := range partitionModes { + tk.MustExec("set @@tidb_partition_prune_mode=" + mode) + for _, c := range testCases { + tk.MustExec("replace into pt values (5, 5, 5)") + c() + } + } +} + func (s *globalIndexSuite) TestIssue21731(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists p, t") diff --git a/executor/table_reader.go b/executor/table_reader.go index 88531e9c86363..6c8f417a7f02e 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -19,6 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -104,6 +105,24 @@ type TableReaderExecutor struct { virtualColumnRetFieldTypes []*types.FieldType // batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine. batchCop bool + + // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. + extraPIDColumnIndex offsetOptional +} + +// offsetOptional may be a positive integer, or invalid. +type offsetOptional int + +func newOffset(i int) offsetOptional { + return offsetOptional(i + 1) +} + +func (i offsetOptional) valid() bool { + return i != 0 +} + +func (i offsetOptional) value() int { + return int(i - 1) } // Open initializes necessary variables for using this executor. @@ -218,9 +237,26 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error return err } + // When 'select ... for update' work on a partitioned table, the table reader should + // add the partition ID as an extra column. The SelectLockExec need this information + // to construct the lock key. + physicalID := getPhysicalTableID(e.table) + if e.extraPIDColumnIndex.valid() { + fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID) + } + return nil } +func fillExtraPIDColumn(req *chunk.Chunk, extraPIDColumnIndex int, physicalID int64) { + numRows := req.NumRows() + pidColumn := chunk.NewColumn(types.NewFieldType(mysql.TypeLonglong), numRows) + for i := 0; i < numRows; i++ { + pidColumn.AppendInt64(physicalID) + } + req.SetCol(extraPIDColumnIndex, pidColumn) +} + // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { if e.table.Meta() != nil && e.table.Meta().TempTableType != model.TempTableNone { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 6ee891bfce2a0..37dbd04c10792 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2580,6 +2580,7 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P Lock: p.Lock, TblID2Handle: p.tblID2Handle, PartitionedTable: p.partitionedTable, + ExtraPIDInfo: p.extraPIDInfo, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) return []PhysicalPlan{lock}, true, nil } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 8389f0be4cade..1be83d87786ab 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3471,7 +3471,10 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("LOCK IN SHARE MODE") return nil, err } - p = b.buildSelectLock(p, sel.LockInfo) + p, err = b.buildSelectLock(p, sel.LockInfo) + if err != nil { + return nil, err + } } b.handleHelper.popMap() b.handleHelper.pushMap(nil) @@ -3607,6 +3610,33 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column { } } +// addExtraPIDColumn add an extra PID column for partition table. +// 'select ... for update' on a partition table need to know the partition ID +// to construct the lock key, so this column is added to the chunk row. +func (ds *DataSource) addExtraPIDColumn(info *extraPIDInfo) { + pidCol := &expression.Column{ + RetType: types.NewFieldType(mysql.TypeLonglong), + UniqueID: ds.ctx.GetSessionVars().AllocPlanColumnID(), + ID: model.ExtraPidColID, + OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPartitionIdName), + } + + ds.Columns = append(ds.Columns, model.NewExtraPartitionIDColInfo()) + schema := ds.Schema() + schema.Append(pidCol) + ds.names = append(ds.names, &types.FieldName{ + DBName: ds.DBName, + TblName: ds.TableInfo().Name, + ColName: model.ExtraPartitionIdName, + OrigColName: model.ExtraPartitionIdName, + }) + ds.TblCols = append(ds.TblCols, pidCol) + + info.Columns = append(info.Columns, pidCol) + info.TblIDs = append(info.TblIDs, ds.TableInfo().ID) + return +} + var ( pseudoEstimationNotAvailable = metrics.PseudoEstimation.WithLabelValues("nodata") pseudoEstimationOutdate = metrics.PseudoEstimation.WithLabelValues("outdate") @@ -4445,9 +4475,12 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( // buildSelectLock is an optimization that can reduce RPC call. // We only need do this optimization for single table update which is the most common case. // When TableRefs.Right is nil, it is single table update. - p = b.buildSelectLock(p, &ast.SelectLockInfo{ + p, err = b.buildSelectLock(p, &ast.SelectLockInfo{ LockType: ast.SelectLockForUpdate, }) + if err != nil { + return nil, err + } } } @@ -4792,9 +4825,12 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) ( } if b.ctx.GetSessionVars().TxnCtx.IsPessimistic { if !delete.IsMultiTable { - p = b.buildSelectLock(p, &ast.SelectLockInfo{ + p, err = b.buildSelectLock(p, &ast.SelectLockInfo{ LockType: ast.SelectLockForUpdate, }) + if err != nil { + return nil, err + } } } diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 7186813a0fe04..c50d155520d8e 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1021,6 +1021,16 @@ type LogicalLimit struct { limitHints limitHintInfo } +// extraPIDInfo is used by SelectLock on partitioned table, the TableReader need +// to return the partition id column. +// Because SelectLock has to used that partition id to encode the lock key. +// the child of SelectLock may be Join, so that table can be multiple extra PID columns. +// fields are for each of the table, and TblIDs are the corresponding table IDs. +type extraPIDInfo struct { + Columns []*expression.Column + TblIDs []int64 +} + // LogicalLock represents a select lock plan. type LogicalLock struct { baseLogicalPlan @@ -1028,6 +1038,9 @@ type LogicalLock struct { Lock *ast.SelectLockInfo tblID2Handle map[int64][]HandleCols partitionedTable []table.PartitionedTable + // extraPIDInfo is used when it works on partition table, the child executor + // need to return an extra partition ID column in the chunk row. + extraPIDInfo } // WindowFrame represents a window function frame. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index d3169c865e07b..4e5c6cbfd58e5 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -472,6 +472,8 @@ type PhysicalTableScan struct { IsGlobalRead bool // The table scan may be a partition, rather than a real table. + // TODO: clean up this field. After we support dynamic partitioning, table scan + // works on the whole partition table, and `isPartition` is not used. isPartition bool // KeepOrder is true, if sort data by scanning pkcol, KeepOrder bool @@ -953,6 +955,7 @@ type PhysicalLock struct { TblID2Handle map[int64][]HandleCols PartitionedTable []table.PartitionedTable + ExtraPIDInfo extraPIDInfo } // PhysicalLimit is the physical operator of Limit. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 71e660079dca7..1b66c909ec085 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1141,14 +1141,47 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T return remainedPaths } -func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock *ast.SelectLockInfo) *LogicalLock { +func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock *ast.SelectLockInfo) (*LogicalLock, error) { selectLock := LogicalLock{ Lock: lock, tblID2Handle: b.handleHelper.tailMap(), partitionedTable: b.partitionedTable, }.Init(b.ctx) selectLock.SetChildren(src) - return selectLock + + if len(b.partitionedTable) > 0 { + // If a chunk row is read from a partitioned table, which partition the row + // comes from is unknown. With the existence of Join, the situation could be + // even worse: SelectLock have to know the `pid` to construct the lock key. + // To solve the problem, an extra `pid` column is add to the schema, and the + // DataSource need to return the `pid` information in the chunk row. + err := addExtraPIDColumnToDataSource(src, &selectLock.extraPIDInfo) + if err != nil { + return nil, err + } + // TODO: Dynamic partition mode does not support adding extra pid column to the data source. + // (Because one table reader can read from multiple partitions, which partition a chunk row comes from is unknown) + // So we have to use the old "rewrite to union" way here, set `flagPartitionProcessor` flag for that. + b.optFlag = b.optFlag | flagPartitionProcessor + } + return selectLock, nil +} + +func addExtraPIDColumnToDataSource(p LogicalPlan, info *extraPIDInfo) error { + switch raw := p.(type) { + case *DataSource: + raw.addExtraPIDColumn(info) + return nil + default: + var err error + for _, child := range p.Children() { + err = addExtraPIDColumnToDataSource(child, info) + if err != nil { + return err + } + } + } + return nil } func (b *PlanBuilder) buildPrepare(x *ast.PrepareStmt) Plan { diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 8a627792ecc7f..993ae9ef6b497 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -376,9 +376,8 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error { } if len(p.partitionedTable) > 0 { - // If the children include partitioned tables, do not prune columns. - // Because the executor needs the partitioned columns to calculate the lock key. - return p.children[0].PruneColumns(p.Schema().Columns) + // If the children include partitioned tables, there is an extra partition ID column. + parentUsedCols = append(parentUsedCols, p.extraPIDInfo.Columns...) } for _, cols := range p.tblID2Handle { diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 90a03991864ab..db48360c25c58 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -14,11 +14,11 @@ package core import ( "context" - "errors" "fmt" "sort" "strings" + "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -289,6 +289,15 @@ func (s *partitionProcessor) reconstructTableColNames(ds *DataSource) ([]*types. }) continue } + if colExpr.ID == model.ExtraPidColID { + names = append(names, &types.FieldName{ + DBName: ds.DBName, + TblName: ds.tableInfo.Name, + ColName: model.ExtraPartitionIdName, + OrigColName: model.ExtraPartitionIdName, + }) + continue + } if colInfo, found := colsInfoMap[colExpr.ID]; found { names = append(names, &types.FieldName{ DBName: ds.DBName, @@ -300,7 +309,7 @@ func (s *partitionProcessor) reconstructTableColNames(ds *DataSource) ([]*types. }) continue } - return nil, fmt.Errorf("information of column %v is not found", colExpr.String()) + return nil, errors.Trace(fmt.Errorf("information of column %v is not found", colExpr.String())) } return names, nil }