Skip to content

Commit

Permalink
*: fix a bug that the pessimistic lock doesn't work on a partition (#…
Browse files Browse the repository at this point in the history
…14921)

physicalID should be used if 'select for update' works on a
partitioned table.
  • Loading branch information
tiancaiamao authored Mar 3, 2020
1 parent e1597a6 commit b3469e7
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 20 deletions.
7 changes: 4 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
return src
}
e := &SelectLockExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
}
return e
}
Expand Down
39 changes: 33 additions & 6 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,11 @@ type SelectLockExec struct {
Lock ast.SelectLockType
keys []kv.Key

tblID2Handle map[int64][]*expression.Column
tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable
}

// Open implements the Executor Open interface.
Expand All @@ -821,6 +825,18 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
// This operation is only for schema validator check.
txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{})
}

if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
}
}
}
}

return nil
}

Expand All @@ -835,12 +851,23 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
return nil
}
if req.NumRows() != 0 {

if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for id, cols := range e.tblID2Handle {
for _, col := range cols {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index)))
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
if err != nil {
return err
}
physicalID = p.GetPhysicalID()
}

for _, col := range cols {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index)))
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ package executor

import (
"context"
"github.com/pingcap/tidb/meta/autoid"
"strings"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -135,7 +135,17 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(t.Meta().ID, h)

physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, oldData)
if err != nil {
return false, false, 0, err
}
physicalID = p.GetPhysicalID()
}

unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
Expand Down
5 changes: 3 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1703,8 +1703,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P
func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
childProp := prop.Clone()
lock := PhysicalLock{
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
PartitionedTable: p.partitionedTable,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
return []PhysicalPlan{lock}
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2565,6 +2565,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as

if tableInfo.GetPartitionInfo() != nil {
b.optFlag = b.optFlag | flagPartitionProcessor
b.partitionedTable = append(b.partitionedTable, tbl.(table.PartitionedTable))
// check partition by name.
for _, name := range tn.PartitionNames {
_, err = tables.FindPartitionByName(tableInfo, name.L)
Expand Down
5 changes: 3 additions & 2 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,9 @@ type LogicalLimit struct {
type LogicalLock struct {
baseLogicalPlan

Lock ast.SelectLockType
tblID2Handle map[int64][]*expression.Column
Lock ast.SelectLockType
tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable
}

// WindowFrame represents a window function frame.
Expand Down
4 changes: 3 additions & 1 deletion planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/ranger"
)
Expand Down Expand Up @@ -416,7 +417,8 @@ type PhysicalLock struct {

Lock ast.SelectLockType

TblID2Handle map[int64][]*expression.Column
TblID2Handle map[int64][]*expression.Column
PartitionedTable []table.PartitionedTable
}

// PhysicalLimit is the physical operator of Limit.
Expand Down
8 changes: 6 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ type PlanBuilder struct {
hintProcessor *BlockHintProcessor
// selectOffset is the offsets of current processing select stmts.
selectOffset []int

// SelectLock need this information to locate the lock on partitions.
partitionedTable []table.PartitionedTable
}

type handleColHelper struct {
Expand Down Expand Up @@ -801,8 +804,9 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T

func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock ast.SelectLockType) *LogicalLock {
selectLock := LogicalLock{
Lock: lock,
tblID2Handle: b.handleHelper.tailMap(),
Lock: lock,
tblID2Handle: b.handleHelper.tailMap(),
partitionedTable: b.partitionedTable,
}.Init(b.ctx)
selectLock.SetChildren(src)
return selectLock
Expand Down
6 changes: 6 additions & 0 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error {
return p.baseLogicalPlan.PruneColumns(parentUsedCols)
}

if len(p.partitionedTable) > 0 {
// If the children include partitioned tables, do not prune columns.
// Because the executor needs the partitioned columns to calculate the lock key.
return p.children[0].PruneColumns(p.Schema().Columns)
}

for _, cols := range p.tblID2Handle {
parentUsedCols = append(parentUsedCols, cols...)
}
Expand Down
66 changes: 66 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3022,3 +3022,69 @@ func (s *testSessionSuite2) TestStmtHints(c *C) {
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
}

func (s *testSessionSuite2) TestPessimisticLockOnPartition(c *C) {
// This test checks that 'select ... for update' locks the partition instead of the table.
// Cover a bug that table ID is used to encode the lock key mistakenly.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`create table if not exists forupdate_on_partition (
age int not null primary key,
nickname varchar(20) not null,
gender int not null default 0,
first_name varchar(30) not null default '',
last_name varchar(20) not null default '',
full_name varchar(60) as (concat(first_name, ' ', last_name)),
index idx_nickname (nickname)
) partition by range (age) (
partition child values less than (18),
partition young values less than (30),
partition middle values less than (50),
partition old values less than (123)
);`)
tk.MustExec("insert into forupdate_on_partition (`age`, `nickname`) values (25, 'cosven');")

tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from forupdate_on_partition where age=25 for update").Check(testkit.Rows("25 cosven 0 "))
tk1.MustExec("begin pessimistic")

ch := make(chan int32, 5)
go func() {
tk1.MustExec("update forupdate_on_partition set first_name='sw' where age=25")
ch <- 0
tk1.MustExec("commit")
}()

// Leave 50ms for tk1 to run, tk1 should be blocked at the update operation.
time.Sleep(50 * time.Millisecond)
ch <- 1

tk.MustExec("commit")
// tk1 should be blocked until tk commit, check the order.
c.Assert(<-ch, Equals, int32(1))
c.Assert(<-ch, Equals, int32(0))

// Once again...
// This time, test for the update-update conflict.
tk.MustExec("begin pessimistic")
tk.MustExec("update forupdate_on_partition set first_name='sw' where age=25")
tk1.MustExec("begin pessimistic")

go func() {
tk1.MustExec("update forupdate_on_partition set first_name = 'xxx' where age=25")
ch <- 0
tk1.MustExec("commit")
}()

// Leave 50ms for tk1 to run, tk1 should be blocked at the update operation.
time.Sleep(50 * time.Millisecond)
ch <- 1

tk.MustExec("commit")
// tk1 should be blocked until tk commit, check the order.
c.Assert(<-ch, Equals, int32(1))
c.Assert(<-ch, Equals, int32(0))
}
2 changes: 1 addition & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ type PhysicalTable interface {
type PartitionedTable interface {
Table
GetPartition(physicalID int64) PhysicalTable
GetPartitionByRow(sessionctx.Context, []types.Datum) (Table, error)
GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error)
}

// TableFromMeta builds a table.Table from *model.TableInfo.
Expand Down
2 changes: 1 addition & 1 deletion table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable {
}

// GetPartitionByRow returns a Table, which is actually a Partition.
func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.Table, error) {
func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) {
pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r)
if err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit b3469e7

Please sign in to comment.