Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix bug that pessimistic lock doesn't work on a partition table when index merge is used #31025

Closed
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3600,6 +3600,9 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
}
collectTable := false
e.tableRequest.CollectRangeCounts = &collectTable
if tblInfo.Meta().Partition != nil {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}
return e, nil
}

Expand Down
24 changes: 14 additions & 10 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type IndexMergeReaderExecutor struct {
isCorColInPartialFilters []bool
isCorColInTableFilter bool
isCorColInPartialAccess []bool

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column.
extraPIDColumnIndex offsetOptional
}

// Open implements the Executor Open interface
Expand Down Expand Up @@ -595,16 +598,17 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co

func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tbl table.Table, handles []kv.Handle) (_ Executor, err error) {
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTablePlanRootID()),
table: tbl,
dagPB: e.tableRequest,
startTS: e.startTS,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
streaming: e.tableStreaming,
columns: e.columns,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
plans: e.tblPlans,
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTablePlanRootID()),
table: tbl,
dagPB: e.tableRequest,
startTS: e.startTS,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
streaming: e.tableStreaming,
columns: e.columns,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
plans: e.tblPlans,
extraPIDColumnIndex: e.extraPIDColumnIndex,
}
if e.isCorColInTableFilter {
if tableReaderExec.dagPB.Executors, _, err = constructDistExec(e.ctx, e.tblPlans); err != nil {
Expand Down
45 changes: 45 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -454,3 +455,47 @@ func (test *testSerialSuite2) TestIndexMergeReaderMemTracker(c *C) {
c.Assert(err, IsNil)
c.Assert(bytes, Greater, 0.0)
}

func (test *testSerialSuite2) TestPessimisticLockOnPartitionForIndexMerge(c *C) {
// Same purpose with TestPessimisticLockOnPartition, but test IndexMergeReader.
tk := testkit.NewTestKit(c, test.store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t1, t2;")
tk.MustExec(`create table t1 (c_datetime datetime, c1 int, c2 int, primary key (c_datetime), key(c1), key(c2))
partition by range (to_days(c_datetime)) (
partition p0 values less than (to_days('2020-02-01')),
partition p1 values less than (to_days('2020-04-01')),
partition p2 values less than (to_days('2020-06-01')),
partition p3 values less than maxvalue) ;`)
tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime));")
tk.MustExec("insert into t1 values ('2020-06-26 03:24:00', 1, 1), ('2020-02-21 07:15:33', 2, 2), ('2020-04-27 13:50:58', 3, 3);")
tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18');")

tk1 := testkit.NewTestKit(c, test.store)
tk1.MustExec("use test")

tk.MustExec("begin pessimistic")
tk.MustQuery(`select /*+ use_index_merge(t1) */ c1 from t1 join t2
on t1.c_datetime >= t2.c_datetime
where t1.c1 < 10 or t1.c2 < 10 for update;`).Sort().Check(testkit.Rows("1", "1", "1", "2", "2", "3", "3"))
tk1.MustExec("begin pessimistic")

ch := make(chan int32, 5)
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
go func() {
tk1.MustExec("update t1 set c_datetime = '2020-06-26 03:24:00' where c1 = 1;")
ch <- 0
tk1.MustExec("commit")
ch <- 0
}()

// Leave 50ms for tk1 to run, tk1 should be blocked at the update operation.
time.Sleep(50 * time.Millisecond)
ch <- 1

tk.MustExec("commit")
// tk1 should be blocked until tk commit, check the order.
c.Assert(<-ch, Equals, int32(1))
c.Assert(<-ch, Equals, int32(0))
<-ch // wait for goroutine to quit.
}