From 0ffa6e541fbb17bc64d022e2735762392854865e Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 26 Dec 2021 16:19:16 +0800 Subject: [PATCH 1/4] fix bug that pessimistic lock doesn't work on a partition table when index merge is used Signed-off-by: guo-shaoge --- executor/builder.go | 3 ++ executor/index_merge_reader.go | 24 ++++++++------- executor/index_merge_reader_test.go | 46 +++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 10 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 78c94fd02f1aa..1751bc1e9e601 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3586,6 +3586,9 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd } collectTable := false e.tableRequest.CollectRangeCounts = &collectTable + if tblInfo.Meta().Partition != nil { + e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema()) + } return e, nil } diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 7fe89de2e3c36..20fe2806ffaef 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -110,6 +110,9 @@ type IndexMergeReaderExecutor struct { handleCols plannercore.HandleCols stats *IndexMergeRuntimeStat + + // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. + extraPIDColumnIndex offsetOptional } // Open implements the Executor Open interface @@ -543,16 +546,17 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tbl table.Table, handles []kv.Handle) (Executor, error) { tableReaderExec := &TableReaderExecutor{ - baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTablePlanRootID()), - table: tbl, - dagPB: e.tableRequest, - startTS: e.startTS, - readReplicaScope: e.readReplicaScope, - isStaleness: e.isStaleness, - streaming: e.tableStreaming, - columns: e.columns, - feedback: statistics.NewQueryFeedback(0, nil, 0, false), - plans: e.tblPlans, + baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTablePlanRootID()), + table: tbl, + dagPB: e.tableRequest, + startTS: e.startTS, + readReplicaScope: e.readReplicaScope, + isStaleness: e.isStaleness, + streaming: e.tableStreaming, + columns: e.columns, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + plans: e.tblPlans, + extraPIDColumnIndex: e.extraPIDColumnIndex, } tableReaderExec.buildVirtualColumnInfo() tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, false) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 5c91ffd435273..0364aba02b8e0 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -20,6 +20,7 @@ import ( "regexp" "strconv" "strings" + "time" . "github.com/pingcap/check" "github.com/pingcap/tidb/util" @@ -454,3 +455,48 @@ func (test *testSerialSuite2) TestIndexMergeReaderMemTracker(c *C) { c.Assert(err, IsNil) c.Assert(bytes, Greater, 0.0) } + +func (s *testSerialSuite2) TestPessimisticLockOnPartitionForIndexMerge(c *C) { + // Same purpose with TestPessimisticLockOnPartition, but test IndexMergeReader. + + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec(`create table t1 (c_datetime datetime, c1 int, c2 int, primary key (c_datetime), key(c1), key(c2)) + partition by range (to_days(c_datetime)) ( + partition p0 values less than (to_days('2020-02-01')), + partition p1 values less than (to_days('2020-04-01')), + partition p2 values less than (to_days('2020-06-01')), + partition p3 values less than maxvalue) ;`) + tk.MustExec("create table t2 (c_datetime datetime, unique key(c_datetime));") + tk.MustExec("insert into t1 values ('2020-06-26 03:24:00', 1, 1), ('2020-02-21 07:15:33', 2, 2), ('2020-04-27 13:50:58', 3, 3);") + tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18');") + + tk1 := testkit.NewTestKit(c, s.store) + tk1.MustExec("use test") + + tk.MustExec("begin pessimistic") + tk.MustQuery(`select /*+ use_index_merge(t1) */ c1 from t1 join t2 + on t1.c_datetime >= t2.c_datetime + where t1.c1 < 10 or t1.c2 < 10 for update;`).Sort().Check(testkit.Rows("1", "1", "1", "2", "2", "3", "3")) + tk1.MustExec("begin pessimistic") + + ch := make(chan int32, 5) + go func() { + tk1.MustExec("update t1 set c_datetime = '2020-06-26 03:24:00' where c1 = 1;") + ch <- 0 + tk1.MustExec("commit") + ch <- 0 + }() + + // Leave 50ms for tk1 to run, tk1 should be blocked at the update operation. + time.Sleep(50 * time.Millisecond) + ch <- 1 + + tk.MustExec("commit") + // tk1 should be blocked until tk commit, check the order. + c.Assert(<-ch, Equals, int32(1)) + c.Assert(<-ch, Equals, int32(0)) + <-ch // wait for goroutine to quit. +} From 9f615e85d8e0ac171e39e0d320fb3cf09ee00bd8 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 26 Dec 2021 16:31:39 +0800 Subject: [PATCH 2/4] fix lint Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 0364aba02b8e0..bdc282743ca6a 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -456,10 +456,9 @@ func (test *testSerialSuite2) TestIndexMergeReaderMemTracker(c *C) { c.Assert(bytes, Greater, 0.0) } -func (s *testSerialSuite2) TestPessimisticLockOnPartitionForIndexMerge(c *C) { +func (test *testSerialSuite2) TestPessimisticLockOnPartitionForIndexMerge(c *C) { // Same purpose with TestPessimisticLockOnPartition, but test IndexMergeReader. - - tk := testkit.NewTestKit(c, s.store) + tk := testkit.NewTestKit(c, test.store) tk.MustExec("use test") tk.MustExec("drop table if exists t1, t2;") @@ -473,7 +472,7 @@ func (s *testSerialSuite2) TestPessimisticLockOnPartitionForIndexMerge(c *C) { tk.MustExec("insert into t1 values ('2020-06-26 03:24:00', 1, 1), ('2020-02-21 07:15:33', 2, 2), ('2020-04-27 13:50:58', 3, 3);") tk.MustExec("insert into t2 values ('2020-01-10 09:36:00'), ('2020-02-04 06:00:00'), ('2020-06-12 03:45:18');") - tk1 := testkit.NewTestKit(c, s.store) + tk1 := testkit.NewTestKit(c, test.store) tk1.MustExec("use test") tk.MustExec("begin pessimistic") From aefb04488c8bc6f19a0a8ca823b05a6b673d2b6b Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 26 Dec 2021 16:36:44 +0800 Subject: [PATCH 3/4] executor: fix bug that pessimistic lock doesn't work on a partition table when index merge is used Signed-off-by: guo-shaoge From ee7d6e774c2a99f965e68f3af2b79fef07355974 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 28 Dec 2021 16:47:56 +0800 Subject: [PATCH 4/4] fix fmt Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 0f104c328528d..841469e19e949 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -117,7 +117,7 @@ type IndexMergeReaderExecutor struct { isCorColInTableFilter bool isCorColInPartialAccess []bool - // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. + // extraPIDColumnIndex is used for partition reader to add an extra partition ID column. extraPIDColumnIndex offsetOptional }