diff --git a/executor/builder.go b/executor/builder.go index 6d30b3ec2e8cf..47859de655c45 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -575,9 +575,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor return src } e := &SelectLockExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), - Lock: v.Lock, - tblID2Handle: v.TblID2Handle, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), + Lock: v.Lock, + tblID2Handle: v.TblID2Handle, + partitionedTable: v.PartitionedTable, } return e } diff --git a/executor/executor.go b/executor/executor.go index 9e094af097c01..66780f1ba406e 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -817,7 +817,11 @@ type SelectLockExec struct { Lock ast.SelectLockType keys []kv.Key - tblID2Handle map[int64][]*expression.Column + tblID2Handle map[int64][]*expression.Column + partitionedTable []table.PartitionedTable + + // tblID2Table is cached to reduce cost. + tblID2Table map[int64]table.PartitionedTable } // Open implements the Executor Open interface. @@ -831,6 +835,18 @@ func (e *SelectLockExec) Open(ctx context.Context) error { // This operation is only for schema validator check. txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{}) } + + if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 { + e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable)) + for id := range e.tblID2Handle { + for _, p := range e.partitionedTable { + if id == p.Meta().ID { + e.tblID2Table[id] = p + } + } + } + } + return nil } @@ -845,12 +861,23 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) { return nil } - if req.NumRows() != 0 { + + if req.NumRows() > 0 { iter := chunk.NewIterator4Chunk(req) - for id, cols := range e.tblID2Handle { - for _, col := range cols { - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index))) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + for id, cols := range e.tblID2Handle { + physicalID := id + if pt, ok := e.tblID2Table[id]; ok { + // On a partitioned table, we have to use physical ID to encode the lock key! + p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes)) + if err != nil { + return err + } + physicalID = p.GetPhysicalID() + } + + for _, col := range cols { + e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index))) } } } diff --git a/executor/write.go b/executor/write.go index f5dc79e8f2afa..8575794d8d7ac 100644 --- a/executor/write.go +++ b/executor/write.go @@ -15,13 +15,13 @@ package executor import ( "context" - "github.com/pingcap/tidb/meta/autoid" "strings" "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -135,7 +135,17 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 { sc.AddAffectedRows(1) } - unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(t.Meta().ID, h) + + physicalID := t.Meta().ID + if pt, ok := t.(table.PartitionedTable); ok { + p, err := pt.GetPartitionByRow(sctx, oldData) + if err != nil { + return false, false, 0, err + } + physicalID = p.GetPhysicalID() + } + + unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h) txnCtx := sctx.GetSessionVars().TxnCtx if txnCtx.IsPessimistic { txnCtx.AddUnchangedRowKey(unchangedRowKey) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 24273bb96806d..8a175d0e1531c 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1699,8 +1699,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan { childProp := prop.Clone() lock := PhysicalLock{ - Lock: p.Lock, - TblID2Handle: p.tblID2Handle, + Lock: p.Lock, + TblID2Handle: p.tblID2Handle, + PartitionedTable: p.partitionedTable, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) return []PhysicalPlan{lock} } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 03efedf93f107..ff67ec1c5c4ef 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2565,6 +2565,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as if tableInfo.GetPartitionInfo() != nil { b.optFlag = b.optFlag | flagPartitionProcessor + b.partitionedTable = append(b.partitionedTable, tbl.(table.PartitionedTable)) // check partition by name. for _, name := range tn.PartitionNames { _, err = tables.FindPartitionByName(tableInfo, name.L) diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 691768e3555b4..eb4299027b02f 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -878,8 +878,9 @@ type LogicalLimit struct { type LogicalLock struct { baseLogicalPlan - Lock ast.SelectLockType - tblID2Handle map[int64][]*expression.Column + Lock ast.SelectLockType + tblID2Handle map[int64][]*expression.Column + partitionedTable []table.PartitionedTable } // WindowFrame represents a window function frame. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 8e7f853af51f5..82801e791ec8d 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/ranger" ) @@ -409,7 +410,8 @@ type PhysicalLock struct { Lock ast.SelectLockType - TblID2Handle map[int64][]*expression.Column + TblID2Handle map[int64][]*expression.Column + PartitionedTable []table.PartitionedTable } // PhysicalLimit is the physical operator of Limit. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 85f63016b4447..77cd898510052 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -279,6 +279,9 @@ type PlanBuilder struct { hintProcessor *BlockHintProcessor // selectOffset is the offsets of current processing select stmts. selectOffset []int + + // SelectLock need this information to locate the lock on partitions. + partitionedTable []table.PartitionedTable } type handleColHelper struct { @@ -795,8 +798,9 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock ast.SelectLockType) *LogicalLock { selectLock := LogicalLock{ - Lock: lock, - tblID2Handle: b.handleHelper.tailMap(), + Lock: lock, + tblID2Handle: b.handleHelper.tailMap(), + partitionedTable: b.partitionedTable, }.Init(b.ctx) selectLock.SetChildren(src) return selectLock diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 03acf725e5751..0185752f8b6f5 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -336,6 +336,12 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error { return p.baseLogicalPlan.PruneColumns(parentUsedCols) } + if len(p.partitionedTable) > 0 { + // If the children include partitioned tables, do not prune columns. + // Because the executor needs the partitioned columns to calculate the lock key. + return p.children[0].PruneColumns(p.Schema().Columns) + } + for _, cols := range p.tblID2Handle { parentUsedCols = append(parentUsedCols, cols...) } diff --git a/session/session_test.go b/session/session_test.go index 2544631541fdf..f5b498c548f28 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3024,3 +3024,69 @@ func (s *testSessionSuite2) TestStmtHints(c *C) { c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) } + +func (s *testSessionSuite2) TestPessimisticLockOnPartition(c *C) { + // This test checks that 'select ... for update' locks the partition instead of the table. + // Cover a bug that table ID is used to encode the lock key mistakenly. + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec(`create table if not exists forupdate_on_partition ( + age int not null primary key, + nickname varchar(20) not null, + gender int not null default 0, + first_name varchar(30) not null default '', + last_name varchar(20) not null default '', + full_name varchar(60) as (concat(first_name, ' ', last_name)), + index idx_nickname (nickname) +) partition by range (age) ( + partition child values less than (18), + partition young values less than (30), + partition middle values less than (50), + partition old values less than (123) +);`) + tk.MustExec("insert into forupdate_on_partition (`age`, `nickname`) values (25, 'cosven');") + + tk1 := testkit.NewTestKit(c, s.store) + tk1.MustExec("use test") + + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from forupdate_on_partition where age=25 for update").Check(testkit.Rows("25 cosven 0 ")) + tk1.MustExec("begin pessimistic") + + ch := make(chan int32, 5) + go func() { + tk1.MustExec("update forupdate_on_partition set first_name='sw' where age=25") + ch <- 0 + tk1.MustExec("commit") + }() + + // Leave 50ms for tk1 to run, tk1 should be blocked at the update operation. + time.Sleep(50 * time.Millisecond) + ch <- 1 + + tk.MustExec("commit") + // tk1 should be blocked until tk commit, check the order. + c.Assert(<-ch, Equals, int32(1)) + c.Assert(<-ch, Equals, int32(0)) + + // Once again... + // This time, test for the update-update conflict. + tk.MustExec("begin pessimistic") + tk.MustExec("update forupdate_on_partition set first_name='sw' where age=25") + tk1.MustExec("begin pessimistic") + + go func() { + tk1.MustExec("update forupdate_on_partition set first_name = 'xxx' where age=25") + ch <- 0 + tk1.MustExec("commit") + }() + + // Leave 50ms for tk1 to run, tk1 should be blocked at the update operation. + time.Sleep(50 * time.Millisecond) + ch <- 1 + + tk.MustExec("commit") + // tk1 should be blocked until tk commit, check the order. + c.Assert(<-ch, Equals, int32(1)) + c.Assert(<-ch, Equals, int32(0)) +} diff --git a/table/table.go b/table/table.go index b33b7a336b4f0..25da187dc03a6 100644 --- a/table/table.go +++ b/table/table.go @@ -263,7 +263,7 @@ type PhysicalTable interface { type PartitionedTable interface { Table GetPartition(physicalID int64) PhysicalTable - GetPartitionByRow(sessionctx.Context, []types.Datum) (Table, error) + GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error) } // TableFromMeta builds a table.Table from *model.TableInfo. diff --git a/table/tables/partition.go b/table/tables/partition.go index 8bc999bfb1e5e..a72a7117e91a3 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -340,7 +340,7 @@ func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable { } // GetPartitionByRow returns a Table, which is actually a Partition. -func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.Table, error) { +func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r) if err != nil { return nil, errors.Trace(err)