From 42f3ce82a9309fff1b00c9c15cfb1457f73d9096 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 19 Nov 2020 17:49:01 +0800 Subject: [PATCH 01/12] planner,executor: fix 'select ...(join on partition table) for update' panic Let the table reader return an extra column with partition ID, so the SelectLockExec can use that partition ID to construct the lock key. --- executor/builder.go | 50 +++++++++++++++++++------- executor/distsql.go | 19 +++++----- executor/executor.go | 36 ++++++++----------- executor/executor_test.go | 27 ++++++++++++++ executor/table_reader.go | 21 +++++++++++ planner/core/exhaust_physical_plans.go | 1 + planner/core/logical_plan_builder.go | 42 ++++++++++++++++++++-- planner/core/logical_plans.go | 8 +++++ planner/core/physical_plans.go | 1 + planner/core/planbuilder.go | 33 +++++++++++++++-- planner/core/rule_column_pruning.go | 5 ++- 11 files changed, 192 insertions(+), 51 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index ad128c559b484..d1e2db4fc1a46 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -600,6 +600,16 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor tblID2Handle: v.TblID2Handle, partitionedTable: v.PartitionedTable, } + if len(e.partitionedTable) > 0 { + schema := v.Schema() + e.tblID2PIDColumnIndex = make(map[int64]int) + for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ { + col := v.ExtraPIDInfo.Columns[i] + tblID := v.ExtraPIDInfo.TblIDs[i] + offset := schema.ColumnIndex(col) + e.tblID2PIDColumnIndex[tblID] = offset + } + } return e } @@ -2482,21 +2492,26 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } e := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - dagPB: dagReq, - startTS: startTS, - table: tbl, - keepOrder: ts.KeepOrder, - desc: ts.Desc, - columns: ts.Columns, - streaming: streaming, - corColInFilter: b.corColInDistPlan(v.TablePlans), - corColInAccess: b.corColInAccess(v.TablePlans[0]), - plans: v.TablePlans, - tablePlan: v.GetTablePlan(), - storeType: v.StoreType, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + dagPB: dagReq, + startTS: startTS, + table: tbl, + keepOrder: ts.KeepOrder, + desc: ts.Desc, + columns: ts.Columns, + streaming: streaming, + corColInFilter: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.TablePlans[0]), + plans: v.TablePlans, + tablePlan: v.GetTablePlan(), + storeType: v.StoreType, + extraPIDColumnIndex: -1, } e.setBatchCop(v) + + if isPartition { + e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) + } e.buildVirtualColumnInfo() if containsLimit(dagReq.Executors) { e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc) @@ -2523,6 +2538,15 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return e, nil } +func extraPIDColumnIndex(schema *expression.Schema) int { + for idx, col := range schema.Columns { + if col.ID == model.ExtraPidColID { + return idx + } + } + return -1 +} + func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor { startTs, err := b.getSnapshotTS() if err != nil { diff --git a/executor/distsql.go b/executor/distsql.go index 669bb404c6c64..bdbdb7ee0964b 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -600,15 +600,16 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []kv.Handle) (Executor, error) { tableReaderExec := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), - table: e.table, - dagPB: e.tableRequest, - startTS: e.startTS, - columns: e.columns, - streaming: e.tableStreaming, - feedback: statistics.NewQueryFeedback(0, nil, 0, false), - corColInFilter: e.corColInTblSide, - plans: e.tblPlans, + baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), + table: e.table, + dagPB: e.tableRequest, + startTS: e.startTS, + columns: e.columns, + streaming: e.tableStreaming, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + corColInFilter: e.corColInTblSide, + plans: e.tblPlans, + extraPIDColumnIndex: -1, } tableReaderExec.buildVirtualColumnInfo() tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true) diff --git a/executor/executor.go b/executor/executor.go index 2102f3ca1fabf..b49dffd94f474 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -872,11 +872,17 @@ type SelectLockExec struct { Lock *ast.SelectLockInfo keys []kv.Key - tblID2Handle map[int64][]plannercore.HandleCols + tblID2Handle map[int64][]plannercore.HandleCols + + // All the partition tables in the children of this executor. partitionedTable []table.PartitionedTable - // tblID2Table is cached to reduce cost. - tblID2Table map[int64]table.PartitionedTable + // When SelectLock work on the partition table, we need the partition ID + // instead of table ID to calculate the lock KV. In that case, partition ID is store as an + // extra column in the chunk row. + // tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join + // of multiple tables, so the map strcut is used. + tblID2PIDColumnIndex map[int64]int } // Open implements the Executor Open interface. @@ -885,17 +891,6 @@ func (e *SelectLockExec) Open(ctx context.Context) error { return err } - if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 { - e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable)) - for id := range e.tblID2Handle { - for _, p := range e.partitionedTable { - if id == p.Meta().ID { - e.tblID2Table[id] = p - } - } - } - } - return nil } @@ -914,15 +909,14 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { if req.NumRows() > 0 { iter := chunk.NewIterator4Chunk(req) for row := iter.Begin(); row != iter.End(); row = iter.Next() { + for id, cols := range e.tblID2Handle { physicalID := id - if pt, ok := e.tblID2Table[id]; ok { - // On a partitioned table, we have to use physical ID to encode the lock key! - p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes)) - if err != nil { - return err - } - physicalID = p.GetPhysicalID() + if len(e.partitionedTable) > 0 { + // Replace the table ID with partition ID. + // The partition ID is returned as an extra column from the table reader. + offset := e.tblID2PIDColumnIndex[id] + physicalID = row.GetInt64(offset) } for _, col := range cols { diff --git a/executor/executor_test.go b/executor/executor_test.go index 5338d9dfbdeca..4f475160bff90 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6853,3 +6853,30 @@ func (s *testSuite) TestIssue19667(c *C) { tk.MustExec("INSERT INTO t VALUES('1988-04-17 01:59:59')") tk.MustQuery(`SELECT DATE_ADD(a, INTERVAL 1 SECOND) FROM t`).Check(testkit.Rows("1988-04-17 02:00:00")) } + +func (s *testSuite) TestIssue20028(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("set @@tidb_partition_prune_mode='static-only'") + tk.MustExec(`create table t1 (c_datetime datetime, primary key (c_datetime)) +partition by range (to_days(c_datetime)) ( partition p0 values less than (to_days('2020-02-01')), +partition p1 values less than (to_days('2020-04-01')), +partition p2 values less than (to_days('2020-06-01')), +partition p3 values less than maxvalue)`) + tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime))") + tk.MustExec("insert into t1 values ('2020-06-26 03:24:00'), ('2020-02-21 07:15:33'), ('2020-04-27 13:50:58')") + tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18')") + tk.MustExec("begin") + tk.MustQuery("select * from t1 join t2 on t1.c_datetime >= t2.c_datetime for update"). + Sort(). + Check(testkit.Rows( + "2020-02-21 07:15:33 2020-01-10 09:36:00", + "2020-02-21 07:15:33 2020-02-04 06:00:00", + "2020-04-27 13:50:58 2020-01-10 09:36:00", + "2020-04-27 13:50:58 2020-02-04 06:00:00", + "2020-06-26 03:24:00 2020-01-10 09:36:00", + "2020-06-26 03:24:00 2020-02-04 06:00:00", + "2020-06-26 03:24:00 2020-06-12 03:45:18")) + tk.MustExec("rollback") +} diff --git a/executor/table_reader.go b/executor/table_reader.go index 9167993c8fc50..de711adffcbb7 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -19,6 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -103,6 +104,9 @@ type TableReaderExecutor struct { virtualColumnRetFieldTypes []*types.FieldType // batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine. batchCop bool + + // extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1 + extraPIDColumnIndex int } // Open initialzes necessary variables for using this executor. @@ -194,9 +198,26 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error return err } + // When 'select ... for update' work on a partitioned table, the table reader should + // add the partition ID as an extra column. The SelectLockExec need this information + // to construct the lock key. + _, isPartition := e.table.(table.PartitionedTable) + if isPartition && e.extraPIDColumnIndex >= 0 { + fillExtraPIDColumn(req, e.extraPIDColumnIndex, getPhysicalTableID(e.table)) + } + return nil } +func fillExtraPIDColumn(req *chunk.Chunk, extraPIDColumnIndex int, physicalID int64) { + numRows := req.NumRows() + pidColumn := chunk.NewColumn(types.NewFieldType(mysql.TypeLonglong), numRows) + for i := 0; i < numRows; i++ { + pidColumn.AppendInt64(physicalID) + } + req.SetCol(extraPIDColumnIndex, pidColumn) +} + // Close implements the Executor Close interface. func (e *TableReaderExecutor) Close() error { var err error diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index b3ca49f4b0820..18f1ab21afad2 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2208,6 +2208,7 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P Lock: p.Lock, TblID2Handle: p.tblID2Handle, PartitionedTable: p.partitionedTable, + ExtraPIDInfo: p.extraPIDInfo, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) return []PhysicalPlan{lock}, true } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index c9c8ebf1100bc..8dd0e047a3ccd 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2810,7 +2810,10 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("LOCK IN SHARE MODE") return nil, err } - p = b.buildSelectLock(p, sel.LockInfo) + p, err = b.buildSelectLock(p, sel.LockInfo) + if err != nil { + return nil, err + } } b.handleHelper.popMap() b.handleHelper.pushMap(nil) @@ -2923,6 +2926,33 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column { } } +// addExtraPIDColumn add an extra PID column for partition table. +// 'select ... for update' on a partition table need to know the partition ID +// to construct the lock key, so this column is added to the chunk row. +func (ds *DataSource) addExtraPIDColumn(info *extraPIDInfo) { + pidCol := &expression.Column{ + RetType: types.NewFieldType(mysql.TypeLonglong), + UniqueID: ds.ctx.GetSessionVars().AllocPlanColumnID(), + ID: model.ExtraPidColID, + OrigName: fmt.Sprintf("%v.%v.%v", ds.DBName, ds.tableInfo.Name, model.ExtraPartitionIdName), + } + + ds.Columns = append(ds.Columns, model.NewExtraPartitionIDColInfo()) + schema := ds.Schema() + schema.Append(pidCol) + ds.names = append(ds.names, &types.FieldName{ + DBName: ds.DBName, + TblName: ds.TableInfo().Name, + ColName: model.ExtraHandleName, + OrigColName: model.ExtraHandleName, + }) + ds.TblCols = append(ds.TblCols, pidCol) + + info.Columns = append(info.Columns, pidCol) + info.TblIDs = append(info.TblIDs, ds.TableInfo().ID) + return +} + // getStatsTable gets statistics information for a table specified by "tableID". // A pseudo statistics table is returned in any of the following scenario: // 1. tidb-server started and statistics handle has not been initialized. @@ -3667,9 +3697,12 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( // buildSelectLock is an optimization that can reduce RPC call. // We only need do this optimization for single table update which is the most common case. // When TableRefs.Right is nil, it is single table update. - p = b.buildSelectLock(p, &ast.SelectLockInfo{ + p, err = b.buildSelectLock(p, &ast.SelectLockInfo{ LockType: ast.SelectLockForUpdate, }) + if err != nil { + return nil, err + } } } @@ -3943,9 +3976,12 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) ( } if b.ctx.GetSessionVars().TxnCtx.IsPessimistic { if !delete.IsMultiTable { - p = b.buildSelectLock(p, &ast.SelectLockInfo{ + p, err = b.buildSelectLock(p, &ast.SelectLockInfo{ LockType: ast.SelectLockForUpdate, }) + if err != nil { + return nil, err + } } } diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 8114c82631e35..ff56c77a51333 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1014,6 +1014,11 @@ type LogicalLimit struct { limitHints limitHintInfo } +type extraPIDInfo struct { + Columns []*expression.Column + TblIDs []int64 +} + // LogicalLock represents a select lock plan. type LogicalLock struct { baseLogicalPlan @@ -1021,6 +1026,9 @@ type LogicalLock struct { Lock *ast.SelectLockInfo tblID2Handle map[int64][]HandleCols partitionedTable []table.PartitionedTable + // extraPIDInfo is used when it works on partition table, the child executor + // need to return an extra partition ID column in the chunk row. + extraPIDInfo } // WindowFrame represents a window function frame. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 1e68984b6e4f7..10500f6622040 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -947,6 +947,7 @@ type PhysicalLock struct { TblID2Handle map[int64][]HandleCols PartitionedTable []table.PartitionedTable + ExtraPIDInfo extraPIDInfo } // PhysicalLimit is the physical operator of Limit. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 7156a6f562d53..1bb8b0b0d9336 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -986,14 +986,43 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T return remainedPaths } -func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock *ast.SelectLockInfo) *LogicalLock { +func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock *ast.SelectLockInfo) (*LogicalLock, error) { selectLock := LogicalLock{ Lock: lock, tblID2Handle: b.handleHelper.tailMap(), partitionedTable: b.partitionedTable, }.Init(b.ctx) selectLock.SetChildren(src) - return selectLock + + if len(b.partitionedTable) > 0 { + // If a chunk row is read from a partitioned table, which partition the row + // comes from is unknown. With the existence of Join, the situation could be + // even worse: SelectLock have to know the `pid` to construct the lock key. + // To solve the problem, an extra `pid` column is add to the schema, and the + // DataSource need to return the `pid` information in the chunk row. + err := addExtraPIDColumnToDataSource(src, &selectLock.extraPIDInfo) + if err != nil { + return nil, err + } + } + return selectLock, nil +} + +func addExtraPIDColumnToDataSource(p LogicalPlan, info *extraPIDInfo) error { + switch raw := p.(type) { + case *DataSource: + raw.addExtraPIDColumn(info) + return nil + default: + var err error + for _, child := range p.Children() { + err = addExtraPIDColumnToDataSource(child, info) + if err != nil { + return err + } + } + } + return nil } func (b *PlanBuilder) buildPrepare(x *ast.PrepareStmt) Plan { diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 9b2a49c0292a3..7b02da4e4aa9e 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -371,9 +371,8 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error { } if len(p.partitionedTable) > 0 { - // If the children include partitioned tables, do not prune columns. - // Because the executor needs the partitioned columns to calculate the lock key. - return p.children[0].PruneColumns(p.Schema().Columns) + // If the children include partitioned tables, there is an extra partition ID column. + parentUsedCols = append(parentUsedCols, p.Columns...) } for _, cols := range p.tblID2Handle { From 4fdb91658a6870887fa1e03535aabcfdff5f29bc Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 20 Nov 2020 18:16:13 +0800 Subject: [PATCH 02/12] fix index lookup reader and add test --- executor/builder.go | 45 +++++----- executor/distsql.go | 5 +- executor/executor_test.go | 27 ------ executor/partition_table_test.go | 145 +++++++++++++++++++++++++++++++ executor/table_reader.go | 6 +- 5 files changed, 176 insertions(+), 52 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index d1e2db4fc1a46..10efef0328209 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2508,7 +2508,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea extraPIDColumnIndex: -1, } e.setBatchCop(v) - if isPartition { e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) } @@ -2907,26 +2906,30 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn return nil, err } e := &IndexLookUpExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - dagPB: indexReq, - startTS: startTS, - table: tbl, - index: is.Index, - keepOrder: is.KeepOrder, - desc: is.Desc, - tableRequest: tableReq, - columns: ts.Columns, - indexStreaming: indexStreaming, - tableStreaming: tableStreaming, - dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, - corColInIdxSide: b.corColInDistPlan(v.IndexPlans), - corColInTblSide: b.corColInDistPlan(v.TablePlans), - corColInAccess: b.corColInAccess(v.IndexPlans[0]), - idxCols: is.IdxCols, - colLens: is.IdxColLens, - idxPlans: v.IndexPlans, - tblPlans: v.TablePlans, - PushedLimit: v.PushedLimit, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + dagPB: indexReq, + startTS: startTS, + table: tbl, + index: is.Index, + keepOrder: is.KeepOrder, + desc: is.Desc, + tableRequest: tableReq, + columns: ts.Columns, + indexStreaming: indexStreaming, + tableStreaming: tableStreaming, + dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, + corColInIdxSide: b.corColInDistPlan(v.IndexPlans), + corColInTblSide: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + idxPlans: v.IndexPlans, + tblPlans: v.TablePlans, + PushedLimit: v.PushedLimit, + extraPIDColumnIndex: -1, + } + if ok, _ := ts.IsPartition(); ok { + e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) } if containsLimit(indexReq.Executors) { diff --git a/executor/distsql.go b/executor/distsql.go index bdbdb7ee0964b..545fd677a5993 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -399,6 +399,9 @@ type IndexLookUpExecutor struct { PushedLimit *plannercore.PushedDownLimit stats *IndexLookUpRunTimeStats + + // extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1 + extraPIDColumnIndex int } type getHandleType int8 @@ -609,7 +612,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []kv feedback: statistics.NewQueryFeedback(0, nil, 0, false), corColInFilter: e.corColInTblSide, plans: e.tblPlans, - extraPIDColumnIndex: -1, + extraPIDColumnIndex: e.extraPIDColumnIndex, } tableReaderExec.buildVirtualColumnInfo() tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true) diff --git a/executor/executor_test.go b/executor/executor_test.go index fd3c712194a2d..a6036bfb07952 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7053,30 +7053,3 @@ func (s *testSuite) TestIssue20305(c *C) { tk.MustExec("INSERT INTO `t3` VALUES (2069, 70), (2010, 11), (2155, 2156), (2069, 69)") tk.MustQuery("SELECT * FROM `t3` where y <= a").Check(testkit.Rows("2155 2156")) } - -func (s *testSuite) TestIssue20028(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t1, t2") - tk.MustExec("set @@tidb_partition_prune_mode='static-only'") - tk.MustExec(`create table t1 (c_datetime datetime, primary key (c_datetime)) -partition by range (to_days(c_datetime)) ( partition p0 values less than (to_days('2020-02-01')), -partition p1 values less than (to_days('2020-04-01')), -partition p2 values less than (to_days('2020-06-01')), -partition p3 values less than maxvalue)`) - tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime))") - tk.MustExec("insert into t1 values ('2020-06-26 03:24:00'), ('2020-02-21 07:15:33'), ('2020-04-27 13:50:58')") - tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18')") - tk.MustExec("begin") - tk.MustQuery("select * from t1 join t2 on t1.c_datetime >= t2.c_datetime for update"). - Sort(). - Check(testkit.Rows( - "2020-02-21 07:15:33 2020-01-10 09:36:00", - "2020-02-21 07:15:33 2020-02-04 06:00:00", - "2020-04-27 13:50:58 2020-01-10 09:36:00", - "2020-04-27 13:50:58 2020-02-04 06:00:00", - "2020-06-26 03:24:00 2020-01-10 09:36:00", - "2020-06-26 03:24:00 2020-02-04 06:00:00", - "2020-06-26 03:24:00 2020-06-12 03:45:18")) - tk.MustExec("rollback") -} diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index e358ee49a55e3..9fee307877eca 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -182,3 +182,148 @@ partition p2 values less than (10))`) tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)") tk.MustQuery("select * from p use index (idx)").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9")) } + +func (s *partitionTableSuite) TestIssue20028(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("set @@tidb_partition_prune_mode='static-only'") + tk.MustExec(`create table t1 (c_datetime datetime, primary key (c_datetime)) +partition by range (to_days(c_datetime)) ( partition p0 values less than (to_days('2020-02-01')), +partition p1 values less than (to_days('2020-04-01')), +partition p2 values less than (to_days('2020-06-01')), +partition p3 values less than maxvalue)`) + tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime))") + tk.MustExec("insert into t1 values ('2020-06-26 03:24:00'), ('2020-02-21 07:15:33'), ('2020-04-27 13:50:58')") + tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18')") + tk.MustExec("begin") + tk.MustQuery("select * from t1 join t2 on t1.c_datetime >= t2.c_datetime for update"). + Sort(). + Check(testkit.Rows( + "2020-02-21 07:15:33 2020-01-10 09:36:00", + "2020-02-21 07:15:33 2020-02-04 06:00:00", + "2020-04-27 13:50:58 2020-01-10 09:36:00", + "2020-04-27 13:50:58 2020-02-04 06:00:00", + "2020-06-26 03:24:00 2020-01-10 09:36:00", + "2020-06-26 03:24:00 2020-02-04 06:00:00", + "2020-06-26 03:24:00 2020-06-12 03:45:18")) + tk.MustExec("rollback") +} + +func (s *partitionTableSuite) TestSelectLockOnPartitionTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec(`create table pt (id int primary key, k int, c int, index(k)) +partition by range (id) ( +partition p0 values less than (4), +partition p1 values less than (7), +partition p2 values less than (11))`) + tk.MustExec("insert into pt values (5, 5, 5)") + // TODO: Fix bug when @@tidb_partition_prune_mode is 'dynamic-only', pay special + // attention to index join as it supported in that mode. + tk.MustExec("set tidb_partition_prune_mode='static-only'") + tk2 := testkit.NewTestKit(c, s.store) + tk2.MustExec("use test") + tk2.MustExec("set tidb_partition_prune_mode='static-only'") + + optimisticTableReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("begin") + tk.MustQuery("select id, k from pt ignore index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + tk2.MustExec("update pt set c = c + 1 where k = 5") + _, err := tk.Exec("commit") + c.Assert(err, NotNil) // Write conflict + } + + optimisticIndexReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("begin") + // This is not index reader actually. + tk.MustQuery("select k from pt where k = 5 for update").Check(testkit.Rows("5")) + tk2.MustExec("update pt set c = c + 1 where k = 5") + _, err := tk.Exec("commit") + c.Assert(err, NotNil) + } + + optimisticIndexLookUp := func() { + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("begin") + tk.MustQuery("select id, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + tk2.MustExec("update pt set c = c + 1 where k = 5") + _, err := tk.Exec("commit") + c.Assert(err, NotNil) + } + + pessimisticTableReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk.MustExec("begin") + tk.MustQuery("select id, k from pt ignore index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + ch := make(chan int, 2) + go func() { + tk2.MustExec("update pt set c = c + 1 where k = 5") + ch <- 1 + }() + time.Sleep(100 * time.Millisecond) + ch <- 2 + + // Check the operation in the goroutine is blocked, if not the first result in + // the channel should be 1. + c.Assert(<-ch, Equals, 2) + + tk.MustExec("commit") + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("8")) + } + + pessimisticIndexReader := func() { + tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk.MustExec("begin") + // This is not index reader actually. + tk.MustQuery("select k from pt where k = 5 for update").Check(testkit.Rows("5")) + ch := make(chan int, 2) + go func() { + tk2.MustExec("update pt set c = c + 1 where k = 5") + ch <- 1 + }() + time.Sleep(100 * time.Millisecond) + ch <- 2 + + // Check the operation in the goroutine is blocked, + c.Assert(<-ch, Equals, 2) + + tk.MustExec("commit") + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("9")) + } + + pessimisticIndexLookUp := func() { + tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") + tk.MustExec("begin") + tk.MustQuery("select id, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + ch := make(chan int, 2) + go func() { + tk2.MustExec("update pt set c = c + 1 where k = 5") + ch <- 1 + }() + time.Sleep(100 * time.Millisecond) + ch <- 2 + + // Check the operation in the goroutine is blocked, + c.Assert(<-ch, Equals, 2) + + tk.MustExec("commit") + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("10")) + } + + testCases := []func(){ + optimisticTableReader, optimisticIndexLookUp, optimisticIndexReader, + pessimisticTableReader, pessimisticIndexReader, pessimisticIndexLookUp, + } + for _, c := range testCases { + c() + } +} diff --git a/executor/table_reader.go b/executor/table_reader.go index de711adffcbb7..db8e7620321d3 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -201,9 +201,9 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // When 'select ... for update' work on a partitioned table, the table reader should // add the partition ID as an extra column. The SelectLockExec need this information // to construct the lock key. - _, isPartition := e.table.(table.PartitionedTable) - if isPartition && e.extraPIDColumnIndex >= 0 { - fillExtraPIDColumn(req, e.extraPIDColumnIndex, getPhysicalTableID(e.table)) + physicalID := getPhysicalTableID(e.table) + if physicalID != e.table.Meta().ID && e.extraPIDColumnIndex >= 0 { + fillExtraPIDColumn(req, e.extraPIDColumnIndex, physicalID) } return nil From 37bfeadfaa8823b38bb5fdd2028ca4d5d96cd0e4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 23 Nov 2020 14:11:00 +0800 Subject: [PATCH 03/12] make vet --- executor/partition_table_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 9fee307877eca..40ffb17d745fa 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -14,6 +14,8 @@ package executor_test import ( + "time" + . "github.com/pingcap/check" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/testkit" From 88ecff4a235f5090fa4c22ba6a29d26b410790ca Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 23 Nov 2020 15:00:36 +0800 Subject: [PATCH 04/12] fix CI --- executor/builder.go | 74 +++++++++++++++++++--------------------- executor/distsql.go | 2 +- executor/table_reader.go | 23 ++++++++++--- 3 files changed, 56 insertions(+), 43 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 10efef0328209..733edbbad1541 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2492,20 +2492,19 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } e := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - dagPB: dagReq, - startTS: startTS, - table: tbl, - keepOrder: ts.KeepOrder, - desc: ts.Desc, - columns: ts.Columns, - streaming: streaming, - corColInFilter: b.corColInDistPlan(v.TablePlans), - corColInAccess: b.corColInAccess(v.TablePlans[0]), - plans: v.TablePlans, - tablePlan: v.GetTablePlan(), - storeType: v.StoreType, - extraPIDColumnIndex: -1, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + dagPB: dagReq, + startTS: startTS, + table: tbl, + keepOrder: ts.KeepOrder, + desc: ts.Desc, + columns: ts.Columns, + streaming: streaming, + corColInFilter: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.TablePlans[0]), + plans: v.TablePlans, + tablePlan: v.GetTablePlan(), + storeType: v.StoreType, } e.setBatchCop(v) if isPartition { @@ -2537,13 +2536,13 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return e, nil } -func extraPIDColumnIndex(schema *expression.Schema) int { +func extraPIDColumnIndex(schema *expression.Schema) offsetOptional { for idx, col := range schema.Columns { if col.ID == model.ExtraPidColID { - return idx + return newOffset(idx) } } - return -1 + return 0 } func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor { @@ -2906,27 +2905,26 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn return nil, err } e := &IndexLookUpExecutor{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - dagPB: indexReq, - startTS: startTS, - table: tbl, - index: is.Index, - keepOrder: is.KeepOrder, - desc: is.Desc, - tableRequest: tableReq, - columns: ts.Columns, - indexStreaming: indexStreaming, - tableStreaming: tableStreaming, - dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, - corColInIdxSide: b.corColInDistPlan(v.IndexPlans), - corColInTblSide: b.corColInDistPlan(v.TablePlans), - corColInAccess: b.corColInAccess(v.IndexPlans[0]), - idxCols: is.IdxCols, - colLens: is.IdxColLens, - idxPlans: v.IndexPlans, - tblPlans: v.TablePlans, - PushedLimit: v.PushedLimit, - extraPIDColumnIndex: -1, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + dagPB: indexReq, + startTS: startTS, + table: tbl, + index: is.Index, + keepOrder: is.KeepOrder, + desc: is.Desc, + tableRequest: tableReq, + columns: ts.Columns, + indexStreaming: indexStreaming, + tableStreaming: tableStreaming, + dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, + corColInIdxSide: b.corColInDistPlan(v.IndexPlans), + corColInTblSide: b.corColInDistPlan(v.TablePlans), + corColInAccess: b.corColInAccess(v.IndexPlans[0]), + idxCols: is.IdxCols, + colLens: is.IdxColLens, + idxPlans: v.IndexPlans, + tblPlans: v.TablePlans, + PushedLimit: v.PushedLimit, } if ok, _ := ts.IsPartition(); ok { e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) diff --git a/executor/distsql.go b/executor/distsql.go index 545fd677a5993..6ce46be9f53a6 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -401,7 +401,7 @@ type IndexLookUpExecutor struct { stats *IndexLookUpRunTimeStats // extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1 - extraPIDColumnIndex int + extraPIDColumnIndex offsetOptional } type getHandleType int8 diff --git a/executor/table_reader.go b/executor/table_reader.go index db8e7620321d3..2938c3321b9ad 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -105,8 +105,23 @@ type TableReaderExecutor struct { // batchCop indicates whether use super batch coprocessor request, only works for TiFlash engine. batchCop bool - // extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1 - extraPIDColumnIndex int + // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. + extraPIDColumnIndex offsetOptional +} + +// offsetOptional may be a positive integer, or invalid. +type offsetOptional int + +func newOffset(i int) offsetOptional { + return offsetOptional(i + 1) +} + +func (i offsetOptional) valid() bool { + return i != 0 +} + +func (i offsetOptional) value() int { + return int(i - 1) } // Open initialzes necessary variables for using this executor. @@ -202,8 +217,8 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // add the partition ID as an extra column. The SelectLockExec need this information // to construct the lock key. physicalID := getPhysicalTableID(e.table) - if physicalID != e.table.Meta().ID && e.extraPIDColumnIndex >= 0 { - fillExtraPIDColumn(req, e.extraPIDColumnIndex, physicalID) + if physicalID != e.table.Meta().ID && e.extraPIDColumnIndex.valid() { + fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID) } return nil From e1bbb74ca3723f6f05047674502cb410997e05ac Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 25 Nov 2020 20:34:12 +0800 Subject: [PATCH 05/12] revert changes --- executor/partition_table_test.go | 1 + executor/table_reader.go | 2 +- go.sum | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 40ffb17d745fa..5fbabc07e110d 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -215,6 +215,7 @@ partition p3 values less than maxvalue)`) func (s *partitionTableSuite) TestSelectLockOnPartitionTable(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") + tk.MustExec("drop table if exists pt") tk.MustExec(`create table pt (id int primary key, k int, c int, index(k)) partition by range (id) ( partition p0 values less than (4), diff --git a/executor/table_reader.go b/executor/table_reader.go index 2938c3321b9ad..243386d0b505e 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -217,7 +217,7 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error // add the partition ID as an extra column. The SelectLockExec need this information // to construct the lock key. physicalID := getPhysicalTableID(e.table) - if physicalID != e.table.Meta().ID && e.extraPIDColumnIndex.valid() { + if e.extraPIDColumnIndex.valid() { fillExtraPIDColumn(req, e.extraPIDColumnIndex.value(), physicalID) } diff --git a/go.sum b/go.sum index 67d3c5ee10c1b..6dcae737f5e0a 100644 --- a/go.sum +++ b/go.sum @@ -775,6 +775,7 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrY github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v2.20.3+incompatible h1:0JVooMPsT7A7HqEYdydp/OfjSOYSjhXV7w1hkKj/NPQ= From 4c113eea55b2232893228aac203019cc92ecd3c1 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 26 Nov 2020 11:06:36 +0800 Subject: [PATCH 06/12] fix a typo --- planner/core/logical_plan_builder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index a75210b6b2f4d..4cd2662b6e859 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2991,8 +2991,8 @@ func (ds *DataSource) addExtraPIDColumn(info *extraPIDInfo) { ds.names = append(ds.names, &types.FieldName{ DBName: ds.DBName, TblName: ds.TableInfo().Name, - ColName: model.ExtraHandleName, - OrigColName: model.ExtraHandleName, + ColName: model.ExtraPartitionIdName, + OrigColName: model.ExtraPartitionIdName, }) ds.TblCols = append(ds.TblCols, pidCol) From df0078f59f412e063d266d51f5dacc4e9c85d420 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 27 Nov 2020 19:00:40 +0800 Subject: [PATCH 07/12] address comment --- executor/executor.go | 2 +- executor/partition_table_test.go | 4 ++-- planner/core/rule_column_pruning.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 8f785257c4d00..6c21f3ecb8ebb 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -890,7 +890,7 @@ type SelectLockExec struct { // instead of table ID to calculate the lock KV. In that case, partition ID is store as an // extra column in the chunk row. // tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join - // of multiple tables, so the map strcut is used. + // of multiple tables, so the map struct is used. tblID2PIDColumnIndex map[int64]int } diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 5fbabc07e110d..50a1028710eb0 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -254,7 +254,7 @@ partition p2 values less than (11))`) tk.MustExec("set @@tidb_txn_mode = 'optimistic'") tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") tk.MustExec("begin") - tk.MustQuery("select id, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("6 5")) tk2.MustExec("update pt set c = c + 1 where k = 5") _, err := tk.Exec("commit") c.Assert(err, NotNil) @@ -306,7 +306,7 @@ partition p2 values less than (11))`) tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") tk.MustExec("begin") - tk.MustQuery("select id, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) + tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("10 5")) ch := make(chan int, 2) go func() { tk2.MustExec("update pt set c = c + 1 where k = 5") diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 7b02da4e4aa9e..eca7922ca400a 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -372,7 +372,7 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error { if len(p.partitionedTable) > 0 { // If the children include partitioned tables, there is an extra partition ID column. - parentUsedCols = append(parentUsedCols, p.Columns...) + parentUsedCols = append(parentUsedCols, p.extraPIDInfo.Columns...) } for _, cols := range p.tblID2Handle { From df7f17c5adbc4dd2d1dd0ad3200ee6300196dba0 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 7 Jun 2021 23:20:34 +0800 Subject: [PATCH 08/12] rewrite to union for 'select lock on join' even in partition pruning mode --- executor/builder.go | 14 +++++++- executor/distsql.go | 18 +++++----- executor/partition_table_test.go | 44 +++++++++++++++--------- planner/core/physical_plans.go | 2 ++ planner/core/planbuilder.go | 4 +++ planner/core/rule_partition_processor.go | 4 +-- 6 files changed, 58 insertions(+), 28 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 5e403daab40dc..44bc99d663f68 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2676,7 +2676,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea storeType: v.StoreType, batchCop: v.BatchCop, } - if isPartition { + if tbl.Meta().Partition != nil { e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) } e.buildVirtualColumnInfo() @@ -2763,6 +2763,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() { return ret } + // When isPartition is set, it means the union rewriting is done, so a partition reader is prefered. + if ok, _ := ts.IsPartition(); ok { + return ret + } pi := ts.Table.GetPartitionInfo() if pi == nil { @@ -2978,6 +2982,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() { return ret } + // When isPartition is set, it means the union rewriting is done, so a partition reader is prefered. + if ok, _ := is.IsPartition(); ok { + return ret + } pi := is.Table.GetPartitionInfo() if pi == nil { @@ -3141,6 +3149,10 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo if is.Index.Global { return ret } + if ok, _ := is.IsPartition(); ok { + // Already pruned when translated to logical union. + return ret + } tmp, _ := b.is.TableByID(is.Table.ID) tbl := tmp.(table.PartitionedTable) diff --git a/executor/distsql.go b/executor/distsql.go index 11deed8d79bde..b27a62c320c71 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -641,15 +641,15 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup table = task.partitionTable } tableReaderExec := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), - table: table, - dagPB: e.tableRequest, - startTS: e.startTS, - columns: e.columns, - streaming: e.tableStreaming, - feedback: statistics.NewQueryFeedback(0, nil, 0, false), - corColInFilter: e.corColInTblSide, - plans: e.tblPlans, + baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()), + table: table, + dagPB: e.tableRequest, + startTS: e.startTS, + columns: e.columns, + streaming: e.tableStreaming, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + corColInFilter: e.corColInTblSide, + plans: e.tblPlans, extraPIDColumnIndex: e.extraPIDColumnIndex, } tableReaderExec.buildVirtualColumnInfo() diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index bbe8da6955526..c6a9f46be6d62 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -16,8 +16,8 @@ package executor_test import ( "fmt" "math/rand" - "time" "strings" + "time" . "github.com/pingcap/check" "github.com/pingcap/parser/model" @@ -2595,13 +2595,9 @@ partition by range (id) ( partition p0 values less than (4), partition p1 values less than (7), partition p2 values less than (11))`) - tk.MustExec("insert into pt values (5, 5, 5)") - // TODO: Fix bug when @@tidb_partition_prune_mode is 'dynamic-only', pay special - // attention to index join as it supported in that mode. - tk.MustExec("set tidb_partition_prune_mode='static-only'") + tk2 := testkit.NewTestKit(c, s.store) tk2.MustExec("use test") - tk2.MustExec("set tidb_partition_prune_mode='static-only'") optimisticTableReader := func() { tk.MustExec("set @@tidb_txn_mode = 'optimistic'") @@ -2628,7 +2624,7 @@ partition p2 values less than (11))`) tk.MustExec("set @@tidb_txn_mode = 'optimistic'") tk2.MustExec("set @@tidb_txn_mode = 'optimistic'") tk.MustExec("begin") - tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("6 5")) + tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) tk2.MustExec("update pt set c = c + 1 where k = 5") _, err := tk.Exec("commit") c.Assert(err, NotNil) @@ -2652,7 +2648,8 @@ partition p2 values less than (11))`) c.Assert(<-ch, Equals, 2) tk.MustExec("commit") - tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("8")) + <-ch + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6")) } pessimisticIndexReader := func() { @@ -2673,14 +2670,15 @@ partition p2 values less than (11))`) c.Assert(<-ch, Equals, 2) tk.MustExec("commit") - tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("9")) + <-ch + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6")) } pessimisticIndexLookUp := func() { tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") tk.MustExec("begin") - tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("10 5")) + tk.MustQuery("select c, k from pt use index (k) where k = 5 for update").Check(testkit.Rows("5 5")) ch := make(chan int, 2) go func() { tk2.MustExec("update pt set c = c + 1 where k = 5") @@ -2693,15 +2691,29 @@ partition p2 values less than (11))`) c.Assert(<-ch, Equals, 2) tk.MustExec("commit") - tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("10")) + <-ch + tk.MustQuery("select c from pt where k = 5").Check(testkit.Rows("6")) } - testCases := []func(){ - optimisticTableReader, optimisticIndexLookUp, optimisticIndexReader, - pessimisticTableReader, pessimisticIndexReader, pessimisticIndexLookUp, + partitionModes := []string{ + "'dynamic-only'", + "'static-only'", } - for _, c := range testCases { - c() + testCases := []func(){ + optimisticTableReader, + optimisticIndexLookUp, + optimisticIndexReader, + pessimisticTableReader, + pessimisticIndexReader, + pessimisticIndexLookUp, + } + + for _, mode := range partitionModes { + tk.MustExec("set @@tidb_partition_prune_mode=" + mode) + for i, c := range testCases { + tk.MustExec("replace into pt values (5, 5, 5)") + c() + } } } diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index f7398962fcf47..0c06f1f448888 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -472,6 +472,8 @@ type PhysicalTableScan struct { IsGlobalRead bool // The table scan may be a partition, rather than a real table. + // TODO: clean up this field. After we support dynamic partitioning, table scan + // works on the whole partition table, and `isPartition` is not used. isPartition bool // KeepOrder is true, if sort data by scanning pkcol, KeepOrder bool diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 3599ad9cfec50..fabf89c02fc53 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1149,6 +1149,10 @@ func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock *ast.SelectLockInfo) if err != nil { return nil, err } + // TODO: Dynamic partition mode does not support adding extra pid column to the data source. + // (Because one table reader can read from multiple partitions, which partition a chunk row comes from is unknown) + // So we have to use the old "rewrite to union" way here, set `flagPartitionProcessor` flag for that. + b.optFlag = b.optFlag | flagPartitionProcessor } return selectLock, nil } diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 17f8c5b2a88b1..39b1199bd60f9 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -14,11 +14,11 @@ package core import ( "context" - "errors" "fmt" "sort" "strings" + "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -297,7 +297,7 @@ func (s *partitionProcessor) reconstructTableColNames(ds *DataSource) ([]*types. }) continue } - return nil, fmt.Errorf("information of column %v is not found", colExpr.String()) + return nil, errors.Trace(fmt.Errorf("information of column %v is not found", colExpr.String())) } return names, nil } From b6cd4dc5f03de1f1a62f9c084114f0b00d3d64b6 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 7 Jun 2021 23:30:48 +0800 Subject: [PATCH 09/12] make golint happy --- executor/partition_table_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index c6a9f46be6d62..57b92a195540a 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -2710,7 +2710,7 @@ partition p2 values less than (11))`) for _, mode := range partitionModes { tk.MustExec("set @@tidb_partition_prune_mode=" + mode) - for i, c := range testCases { + for _, c := range testCases { tk.MustExec("replace into pt values (5, 5, 5)") c() } From 2462d109c3658644fa613a47548c76ecc1e4f188 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 10 Jun 2021 11:29:00 +0800 Subject: [PATCH 10/12] address comment --- planner/core/logical_plans.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 909d9c058e3d4..c50d155520d8e 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1021,6 +1021,11 @@ type LogicalLimit struct { limitHints limitHintInfo } +// extraPIDInfo is used by SelectLock on partitioned table, the TableReader need +// to return the partition id column. +// Because SelectLock has to used that partition id to encode the lock key. +// the child of SelectLock may be Join, so that table can be multiple extra PID columns. +// fields are for each of the table, and TblIDs are the corresponding table IDs. type extraPIDInfo struct { Columns []*expression.Column TblIDs []int64 From 8f835047d4b8f9cecd20ead15e5d1cf5b92b09e6 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 16 Jun 2021 20:01:57 +0800 Subject: [PATCH 11/12] avoid DATA RACE, fix CI --- executor/executor_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/executor/executor_test.go b/executor/executor_test.go index 3fefd7eaf84b9..fb89eeb670559 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7589,6 +7589,11 @@ func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit, func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) { tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + + // Set projection concurrency to avoid data race here. + // TODO: remove this line after fixing https://github.com/pingcap/tidb/issues/25496 + tk1.Se.GetSessionVars().Concurrency.SetProjectionConcurrency(0) + tk1.MustExec("begin pessimistic") tk1.MustExec("update t1 set c=c") tk2.MustExec("create table t2(a int)") From 1973b6daf2d97d5e495159d9794c7661dd99dfc1 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 16 Jun 2021 22:42:02 +0800 Subject: [PATCH 12/12] fix integration test --- planner/core/rule_partition_processor.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 0a0aa6b73db83..db48360c25c58 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -289,6 +289,15 @@ func (s *partitionProcessor) reconstructTableColNames(ds *DataSource) ([]*types. }) continue } + if colExpr.ID == model.ExtraPidColID { + names = append(names, &types.FieldName{ + DBName: ds.DBName, + TblName: ds.tableInfo.Name, + ColName: model.ExtraPartitionIdName, + OrigColName: model.ExtraPartitionIdName, + }) + continue + } if colInfo, found := colsInfoMap[colExpr.ID]; found { names = append(names, &types.FieldName{ DBName: ds.DBName,