diff --git a/executor/builder.go b/executor/builder.go index cd4a12f9c1c2f..e0a02014371f1 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1047,6 +1047,8 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco us.columns = x.columns us.table = x.table us.virtualColumnIndex = x.virtualColumnIndex + us.extraPIDColumn.colIdx = x.extraPIDColumnIndex + us.extraPIDColumn.partitionID = getPhysicalTableID(x.table) case *IndexReaderExecutor: us.desc = x.desc for _, ic := range x.index.Columns { diff --git a/executor/executor.go b/executor/executor.go index 3a1fa3beb9c48..b4031eb58881f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -931,6 +931,15 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { // The partition ID is returned as an extra column from the table reader. if offset, ok := e.tblID2PIDColumnIndex[id]; ok { physicalID = row.GetInt64(offset) + + if physicalID == 0 { + // Left join will fill null for the rows that don't match. + // In that case, physicalID is 0, the partition doesn't need to be locked. + // For example: + // select * from t1 left join t2 on t1.col = t2.col for update + // t2 is a empty partition table, t1 is not empty. + continue + } } } diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 6c61797db5a3d..b2a67e3da0bbb 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -3089,3 +3089,83 @@ func (s *partitionTableSuite) TestIssue26251(c *C) { c.Fail() } } + +func (s *partitionTableSuite) TestLeftJoinForUpdate(c *C) { + tk1 := testkit.NewTestKit(c, s.store) + tk1.MustExec("use test") + tk2 := testkit.NewTestKit(c, s.store) + tk2.MustExec("use test") + tk3 := testkit.NewTestKit(c, s.store) + tk3.MustExec("use test") + + tk1.MustExec("drop table if exists nt, pt") + tk1.MustExec("create table nt (id int, col varchar(32), primary key (id))") + tk1.MustExec("create table pt (id int, col varchar(32), primary key (id)) partition by hash(id) partitions 4") + + resetData := func() { + tk1.MustExec("truncate table nt") + tk1.MustExec("truncate table pt") + tk1.MustExec("insert into nt values (1, 'hello')") + tk1.MustExec("insert into pt values (2, 'test')") + } + + // ========================== First round of test ================== + // partition table left join normal table. + // ================================================================= + resetData() + ch := make(chan int, 10) + tk1.MustExec("begin pessimistic") + // No union scan + tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Check(testkit.Rows("2 test ")) + go func() { + // Check the key is locked. + tk2.MustExec("update pt set col = 'xxx' where id = 2") + ch <- 2 + }() + + // Union scan + tk1.MustExec("insert into pt values (1, 'world')") + tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Sort().Check(testkit.Rows("1 world 1 hello", "2 test ")) + go func() { + // Check the key is locked. + tk3.MustExec("update nt set col = 'yyy' where id = 1") + ch <- 3 + }() + + // Give chance for the goroutines to run first. + time.Sleep(80 * time.Millisecond) + ch <- 1 + tk1.MustExec("rollback") + + checkOrder := func() { + c.Assert(<-ch, Equals, 1) + v1 := <-ch + v2 := <-ch + c.Assert((v1 == 2 && v2 == 3) || (v1 == 3 && v2 == 2), IsTrue) + } + checkOrder() + + // ========================== Another round of test ================== + // normal table left join partition table. + // =================================================================== + resetData() + tk1.MustExec("begin pessimistic") + // No union scan + tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello ")) + + // Union scan + tk1.MustExec("insert into pt values (1, 'world')") + tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello 1 world")) + go func() { + tk2.MustExec("replace into pt values (1, 'aaa')") + ch <- 2 + }() + go func() { + tk3.MustExec("update nt set col = 'bbb' where id = 1") + ch <- 3 + }() + time.Sleep(80 * time.Millisecond) + ch <- 1 + tk1.MustExec("rollback") + checkOrder() +} diff --git a/executor/union_scan.go b/executor/union_scan.go index 6289f23015b61..ecb3927fdd1fc 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -56,6 +56,11 @@ type UnionScanExec struct { // virtualColumnIndex records all the indices of virtual columns and sort them in definition // to make sure we can compute the virtual column in right order. virtualColumnIndex []int + + extraPIDColumn struct { + colIdx offsetOptional + partitionID int64 + } } // Open implements the Executor Open interface. @@ -121,7 +126,7 @@ func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error { } // no more data. if row == nil { - return nil + break } mutableRow.SetDatums(row...) @@ -151,6 +156,14 @@ func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error { req.AppendRow(mutableRow.ToRow()) } } + + // Fill the extra partition ID column. + // The row may come from the membuffer instead of the transaction snapshot. + // For example: begin; insert into pt values (...); select * from pt for update; + // In that case, the extra partition ID column isn't filled by the TableReader executor, so the UnionScan executor should do it here. + if idx := us.extraPIDColumn.colIdx; idx.valid() { + fillExtraPIDColumn(req, idx.value(), us.extraPIDColumn.partitionID) + } return nil } diff --git a/executor/union_scan_test.go b/executor/union_scan_test.go index 5e0727458230e..1f6e748f8bc3f 100644 --- a/executor/union_scan_test.go +++ b/executor/union_scan_test.go @@ -18,6 +18,7 @@ import ( "fmt" . "github.com/pingcap/check" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/testkit" ) @@ -403,3 +404,38 @@ func (s *testSuite7) TestForApplyAndUnionScan(c *C) { tk.MustQuery("select c_int, c_str from t where (select count(*) from t1 where t1.c_int in (t.c_int, t.c_int + 2, t.c_int + 10)) > 2").Check(testkit.Rows()) tk.MustExec("rollback") } + +func (s *testSuite7) TestIssue28073(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str) , key(c_int)) partition by hash (c_int) partitions 4") + tk.MustExec("create table t2 like t1") + tk.MustExec("insert into t1 values (1, 'flamboyant mcclintock')") + tk.MustExec("insert into t2 select * from t1") + + tk.MustExec("begin") + tk.MustExec("insert into t2 (c_int, c_str) values (2, 'romantic grothendieck')") + tk.MustQuery("select * from t2 left join t1 on t1.c_int = t2.c_int for update").Sort().Check( + testkit.Rows( + "1 flamboyant mcclintock 1 flamboyant mcclintock", + "2 romantic grothendieck ", + )) + tk.MustExec("commit") + + // Check no key is written to table ID 0 + txn, err := s.store.Begin() + c.Assert(err, IsNil) + start := tablecodec.EncodeTablePrefix(0) + end := tablecodec.EncodeTablePrefix(1) + iter, err := txn.Iter(start, end) + c.Assert(err, IsNil) + + exist := false + for iter.Valid() { + c.Assert(iter.Next(), IsNil) + exist = true + break + } + c.Assert(exist, IsFalse) +}