Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix a bug that the pessimistic lock doesn't work on a partition (#14921) #15086

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
return src
}
e := &SelectLockExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
}
return e
}
Expand Down
39 changes: 33 additions & 6 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,11 @@ type SelectLockExec struct {
Lock ast.SelectLockType
keys []kv.Key

tblID2Handle map[int64][]*expression.Column
tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable
}

// Open implements the Executor Open interface.
Expand All @@ -831,6 +835,18 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
// This operation is only for schema validator check.
txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{})
}

if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
}
}
}
}

return nil
}

Expand All @@ -845,12 +861,23 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
return nil
}
if req.NumRows() != 0 {

if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for id, cols := range e.tblID2Handle {
for _, col := range cols {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index)))
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
if err != nil {
return err
}
physicalID = p.GetPhysicalID()
}

for _, col := range cols {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index)))
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ package executor

import (
"context"
"github.com/pingcap/tidb/meta/autoid"
"strings"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -135,7 +135,17 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(t.Meta().ID, h)

physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, oldData)
if err != nil {
return false, false, 0, err
}
physicalID = p.GetPhysicalID()
}

unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
Expand Down
5 changes: 3 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1699,8 +1699,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P
func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
childProp := prop.Clone()
lock := PhysicalLock{
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
PartitionedTable: p.partitionedTable,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
return []PhysicalPlan{lock}
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2565,6 +2565,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as

if tableInfo.GetPartitionInfo() != nil {
b.optFlag = b.optFlag | flagPartitionProcessor
b.partitionedTable = append(b.partitionedTable, tbl.(table.PartitionedTable))
// check partition by name.
for _, name := range tn.PartitionNames {
_, err = tables.FindPartitionByName(tableInfo, name.L)
Expand Down
5 changes: 3 additions & 2 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,9 @@ type LogicalLimit struct {
type LogicalLock struct {
baseLogicalPlan

Lock ast.SelectLockType
tblID2Handle map[int64][]*expression.Column
Lock ast.SelectLockType
tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable
}

// WindowFrame represents a window function frame.
Expand Down
4 changes: 3 additions & 1 deletion planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/ranger"
)
Expand Down Expand Up @@ -409,7 +410,8 @@ type PhysicalLock struct {

Lock ast.SelectLockType

TblID2Handle map[int64][]*expression.Column
TblID2Handle map[int64][]*expression.Column
PartitionedTable []table.PartitionedTable
}

// PhysicalLimit is the physical operator of Limit.
Expand Down
8 changes: 6 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ type PlanBuilder struct {
hintProcessor *BlockHintProcessor
// selectOffset is the offsets of current processing select stmts.
selectOffset []int

// SelectLock need this information to locate the lock on partitions.
partitionedTable []table.PartitionedTable
}

type handleColHelper struct {
Expand Down Expand Up @@ -795,8 +798,9 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T

func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock ast.SelectLockType) *LogicalLock {
selectLock := LogicalLock{
Lock: lock,
tblID2Handle: b.handleHelper.tailMap(),
Lock: lock,
tblID2Handle: b.handleHelper.tailMap(),
partitionedTable: b.partitionedTable,
}.Init(b.ctx)
selectLock.SetChildren(src)
return selectLock
Expand Down
6 changes: 6 additions & 0 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,12 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error {
return p.baseLogicalPlan.PruneColumns(parentUsedCols)
}

if len(p.partitionedTable) > 0 {
// If the children include partitioned tables, do not prune columns.
// Because the executor needs the partitioned columns to calculate the lock key.
return p.children[0].PruneColumns(p.Schema().Columns)
}

for _, cols := range p.tblID2Handle {
parentUsedCols = append(parentUsedCols, cols...)
}
Expand Down
66 changes: 66 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3024,3 +3024,69 @@ func (s *testSessionSuite2) TestStmtHints(c *C) {
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
}

func (s *testSessionSuite2) TestPessimisticLockOnPartition(c *C) {
// This test checks that 'select ... for update' locks the partition instead of the table.
// Cover a bug that table ID is used to encode the lock key mistakenly.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`create table if not exists forupdate_on_partition (
age int not null primary key,
nickname varchar(20) not null,
gender int not null default 0,
first_name varchar(30) not null default '',
last_name varchar(20) not null default '',
full_name varchar(60) as (concat(first_name, ' ', last_name)),
index idx_nickname (nickname)
) partition by range (age) (
partition child values less than (18),
partition young values less than (30),
partition middle values less than (50),
partition old values less than (123)
);`)
tk.MustExec("insert into forupdate_on_partition (`age`, `nickname`) values (25, 'cosven');")

tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from forupdate_on_partition where age=25 for update").Check(testkit.Rows("25 cosven 0 "))
tk1.MustExec("begin pessimistic")

ch := make(chan int32, 5)
go func() {
tk1.MustExec("update forupdate_on_partition set first_name='sw' where age=25")
ch <- 0
tk1.MustExec("commit")
}()

// Leave 50ms for tk1 to run, tk1 should be blocked at the update operation.
time.Sleep(50 * time.Millisecond)
ch <- 1

tk.MustExec("commit")
// tk1 should be blocked until tk commit, check the order.
c.Assert(<-ch, Equals, int32(1))
c.Assert(<-ch, Equals, int32(0))

// Once again...
// This time, test for the update-update conflict.
tk.MustExec("begin pessimistic")
tk.MustExec("update forupdate_on_partition set first_name='sw' where age=25")
tk1.MustExec("begin pessimistic")

go func() {
tk1.MustExec("update forupdate_on_partition set first_name = 'xxx' where age=25")
ch <- 0
tk1.MustExec("commit")
}()

// Leave 50ms for tk1 to run, tk1 should be blocked at the update operation.
time.Sleep(50 * time.Millisecond)
ch <- 1

tk.MustExec("commit")
// tk1 should be blocked until tk commit, check the order.
c.Assert(<-ch, Equals, int32(1))
c.Assert(<-ch, Equals, int32(0))
}
2 changes: 1 addition & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ type PhysicalTable interface {
type PartitionedTable interface {
Table
GetPartition(physicalID int64) PhysicalTable
GetPartitionByRow(sessionctx.Context, []types.Datum) (Table, error)
GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error)
}

// TableFromMeta builds a table.Table from *model.TableInfo.
Expand Down
2 changes: 1 addition & 1 deletion table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable {
}

// GetPartitionByRow returns a Table, which is actually a Partition.
func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.Table, error) {
func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) {
pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r)
if err != nil {
return nil, errors.Trace(err)
Expand Down