From b3469e7964f07e11ddd53bad64d55d996c33d793 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 3 Mar 2020 15:06:45 +0800 Subject: [PATCH] *: fix a bug that the pessimistic lock doesn't work on a partition (#14921) physicalID should be used if 'select for update' works on a partitioned table. --- executor/builder.go | 7 +-- executor/executor.go | 39 ++++++++++++--- executor/write.go | 14 +++++- planner/core/exhaust_physical_plans.go | 5 +- planner/core/logical_plan_builder.go | 1 + planner/core/logical_plans.go | 5 +- planner/core/physical_plans.go | 4 +- planner/core/planbuilder.go | 8 +++- planner/core/rule_column_pruning.go | 6 +++ session/session_test.go | 66 ++++++++++++++++++++++++++ table/table.go | 2 +- table/tables/partition.go | 2 +- 12 files changed, 139 insertions(+), 20 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 25db3e0e3002c..777df5d3f2d90 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -579,9 +579,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor return src } e := &SelectLockExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), - Lock: v.Lock, - tblID2Handle: v.TblID2Handle, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), + Lock: v.Lock, + tblID2Handle: v.TblID2Handle, + partitionedTable: v.PartitionedTable, } return e } diff --git a/executor/executor.go b/executor/executor.go index a6f3b8e0bb8fa..71310280f7095 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -807,7 +807,11 @@ type SelectLockExec struct { Lock ast.SelectLockType keys []kv.Key - tblID2Handle map[int64][]*expression.Column + tblID2Handle map[int64][]*expression.Column + partitionedTable []table.PartitionedTable + + // tblID2Table is cached to reduce cost. + tblID2Table map[int64]table.PartitionedTable } // Open implements the Executor Open interface. @@ -821,6 +825,18 @@ func (e *SelectLockExec) Open(ctx context.Context) error { // This operation is only for schema validator check. txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{}) } + + if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 { + e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable)) + for id := range e.tblID2Handle { + for _, p := range e.partitionedTable { + if id == p.Meta().ID { + e.tblID2Table[id] = p + } + } + } + } + return nil } @@ -835,12 +851,23 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) { return nil } - if req.NumRows() != 0 { + + if req.NumRows() > 0 { iter := chunk.NewIterator4Chunk(req) - for id, cols := range e.tblID2Handle { - for _, col := range cols { - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index))) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + for id, cols := range e.tblID2Handle { + physicalID := id + if pt, ok := e.tblID2Table[id]; ok { + // On a partitioned table, we have to use physical ID to encode the lock key! + p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes)) + if err != nil { + return err + } + physicalID = p.GetPhysicalID() + } + + for _, col := range cols { + e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index))) } } } diff --git a/executor/write.go b/executor/write.go index f5dc79e8f2afa..8575794d8d7ac 100644 --- a/executor/write.go +++ b/executor/write.go @@ -15,13 +15,13 @@ package executor import ( "context" - "github.com/pingcap/tidb/meta/autoid" "strings" "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -135,7 +135,17 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 { sc.AddAffectedRows(1) } - unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(t.Meta().ID, h) + + physicalID := t.Meta().ID + if pt, ok := t.(table.PartitionedTable); ok { + p, err := pt.GetPartitionByRow(sctx, oldData) + if err != nil { + return false, false, 0, err + } + physicalID = p.GetPhysicalID() + } + + unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h) txnCtx := sctx.GetSessionVars().TxnCtx if txnCtx.IsPessimistic { txnCtx.AddUnchangedRowKey(unchangedRowKey) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 3fca7ebd2407b..80a4bc441eae8 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1703,8 +1703,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan { childProp := prop.Clone() lock := PhysicalLock{ - Lock: p.Lock, - TblID2Handle: p.tblID2Handle, + Lock: p.Lock, + TblID2Handle: p.tblID2Handle, + PartitionedTable: p.partitionedTable, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) return []PhysicalPlan{lock} } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index e26cc76d6da90..9500ea8759d1b 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2565,6 +2565,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as if tableInfo.GetPartitionInfo() != nil { b.optFlag = b.optFlag | flagPartitionProcessor + b.partitionedTable = append(b.partitionedTable, tbl.(table.PartitionedTable)) // check partition by name. for _, name := range tn.PartitionNames { _, err = tables.FindPartitionByName(tableInfo, name.L) diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 691768e3555b4..eb4299027b02f 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -878,8 +878,9 @@ type LogicalLimit struct { type LogicalLock struct { baseLogicalPlan - Lock ast.SelectLockType - tblID2Handle map[int64][]*expression.Column + Lock ast.SelectLockType + tblID2Handle map[int64][]*expression.Column + partitionedTable []table.PartitionedTable } // WindowFrame represents a window function frame. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 099cf18a4551a..c77713f103b35 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/ranger" ) @@ -416,7 +417,8 @@ type PhysicalLock struct { Lock ast.SelectLockType - TblID2Handle map[int64][]*expression.Column + TblID2Handle map[int64][]*expression.Column + PartitionedTable []table.PartitionedTable } // PhysicalLimit is the physical operator of Limit. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 4ff9211cf2ad2..fc8b4d51e23c9 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -281,6 +281,9 @@ type PlanBuilder struct { hintProcessor *BlockHintProcessor // selectOffset is the offsets of current processing select stmts. selectOffset []int + + // SelectLock need this information to locate the lock on partitions. + partitionedTable []table.PartitionedTable } type handleColHelper struct { @@ -801,8 +804,9 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock ast.SelectLockType) *LogicalLock { selectLock := LogicalLock{ - Lock: lock, - tblID2Handle: b.handleHelper.tailMap(), + Lock: lock, + tblID2Handle: b.handleHelper.tailMap(), + partitionedTable: b.partitionedTable, }.Init(b.ctx) selectLock.SetChildren(src) return selectLock diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 8afa2424881b4..c91ca8ce4482b 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -364,6 +364,12 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error { return p.baseLogicalPlan.PruneColumns(parentUsedCols) } + if len(p.partitionedTable) > 0 { + // If the children include partitioned tables, do not prune columns. + // Because the executor needs the partitioned columns to calculate the lock key. + return p.children[0].PruneColumns(p.Schema().Columns) + } + for _, cols := range p.tblID2Handle { parentUsedCols = append(parentUsedCols, cols...) } diff --git a/session/session_test.go b/session/session_test.go index 1255f03a79196..d1439556e6ab2 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3022,3 +3022,69 @@ func (s *testSessionSuite2) TestStmtHints(c *C) { c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) } + +func (s *testSessionSuite2) TestPessimisticLockOnPartition(c *C) { + // This test checks that 'select ... for update' locks the partition instead of the table. + // Cover a bug that table ID is used to encode the lock key mistakenly. + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec(`create table if not exists forupdate_on_partition ( + age int not null primary key, + nickname varchar(20) not null, + gender int not null default 0, + first_name varchar(30) not null default '', + last_name varchar(20) not null default '', + full_name varchar(60) as (concat(first_name, ' ', last_name)), + index idx_nickname (nickname) +) partition by range (age) ( + partition child values less than (18), + partition young values less than (30), + partition middle values less than (50), + partition old values less than (123) +);`) + tk.MustExec("insert into forupdate_on_partition (`age`, `nickname`) values (25, 'cosven');") + + tk1 := testkit.NewTestKit(c, s.store) + tk1.MustExec("use test") + + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from forupdate_on_partition where age=25 for update").Check(testkit.Rows("25 cosven 0 ")) + tk1.MustExec("begin pessimistic") + + ch := make(chan int32, 5) + go func() { + tk1.MustExec("update forupdate_on_partition set first_name='sw' where age=25") + ch <- 0 + tk1.MustExec("commit") + }() + + // Leave 50ms for tk1 to run, tk1 should be blocked at the update operation. + time.Sleep(50 * time.Millisecond) + ch <- 1 + + tk.MustExec("commit") + // tk1 should be blocked until tk commit, check the order. + c.Assert(<-ch, Equals, int32(1)) + c.Assert(<-ch, Equals, int32(0)) + + // Once again... + // This time, test for the update-update conflict. + tk.MustExec("begin pessimistic") + tk.MustExec("update forupdate_on_partition set first_name='sw' where age=25") + tk1.MustExec("begin pessimistic") + + go func() { + tk1.MustExec("update forupdate_on_partition set first_name = 'xxx' where age=25") + ch <- 0 + tk1.MustExec("commit") + }() + + // Leave 50ms for tk1 to run, tk1 should be blocked at the update operation. + time.Sleep(50 * time.Millisecond) + ch <- 1 + + tk.MustExec("commit") + // tk1 should be blocked until tk commit, check the order. + c.Assert(<-ch, Equals, int32(1)) + c.Assert(<-ch, Equals, int32(0)) +} diff --git a/table/table.go b/table/table.go index 9e5ebfb0e337a..89551ad9880fb 100644 --- a/table/table.go +++ b/table/table.go @@ -265,7 +265,7 @@ type PhysicalTable interface { type PartitionedTable interface { Table GetPartition(physicalID int64) PhysicalTable - GetPartitionByRow(sessionctx.Context, []types.Datum) (Table, error) + GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error) } // TableFromMeta builds a table.Table from *model.TableInfo. diff --git a/table/tables/partition.go b/table/tables/partition.go index eb436a3fb160c..6be609365b19a 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -303,7 +303,7 @@ func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable { } // GetPartitionByRow returns a Table, which is actually a Partition. -func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.Table, error) { +func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r) if err != nil { return nil, errors.Trace(err)