executor: fill extra partition ID column in UnionScan executor #28666

Closed
2 changes: 2 additions & 0 deletions executor/builder.go
@@ -1047,6 +1047,8 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = x.virtualColumnIndex
us.extraPIDColumn.colIdx = x.extraPIDColumnIndex
us.extraPIDColumn.partitionID = getPhysicalTableID(x.table)
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
9 changes: 9 additions & 0 deletions executor/executor.go
@@ -931,6 +931,15 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
// The partition ID is returned as an extra column from the table reader.
if offset, ok := e.tblID2PIDColumnIndex[id]; ok {
physicalID = row.GetInt64(offset)

if physicalID == 0 {
Contributor

There could be unexpected errors if physicalID is zero, and this condition is a bit confusing. Is there another way to detect the left join result case?

Contributor Author

Yes, I agree it's confusing here. physicalID == 0 may be caused by a left join, or it may be caused by a bug.

Distinguishing those cases is unrealistic, because left join has several implementations (hash join, merge join, nested loop join, index join, and so on), and left join is only one of the cases we found that generate an empty or null row. There might be other cases that fill an empty row, and it's hard to find them all.

So, let's take a step back.
In the past, we had a bug in locking on partitions (that's bad).
Then we fixed it in #14921.
Then we found more bugs (that's bad).
Then we tried to fix them in #21148.
And that solution caused more serious problems and introduced more critical bugs (wow, worse).

After this change, we come back from worse to bad, which is big progress!
I mean, we fix some problems and make the solution (at least) no worse than before.

Contributor

@qw4990
Do you have any ideas about this?

Contributor

What worries me is that although the former bugs make the query panic, they have no further impact on the data in storage. If we cannot verify which case is expected in a write statement, wrong data could be written into storage, just like the issue listed above, where an invalid key is locked and the lock record is persisted.

Contributor Author

Yes, I have the same worry that bugs could corrupt the data.
If there were a better way to fix this problem, I'd choose it, but I can't come up with one.
So we have to fix the current problem and add tests to cover more scenarios.

Contributor

It's quite necessary to add more tests now, since there could be more unknown issues. BTW, do we or our QA team have bandwidth for the coverage enhancement?

Contributor

Can we let all OuterJoins explicitly set this column to a specified value (e.g. -1) for unmatched rows?
Then we can define pid=0 as the uninitialized state, and we know it must be caused by some bug.
We can then return an error like "pid is uninitialized" in that case.
We don't have to find all OuterJoins at once; we can find as many as we can this time, and then wait for the uninitialized error to surface the rest and fix them.
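
The proposal above can be sketched roughly as follows. This is only an illustration, not code from the PR: the sentinel constants, the error value, and `shouldLockPartition` are hypothetical names, and the PR as submitted only checks `physicalID == 0`.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel values for the extra partition ID column.
const (
	pidUninitialized int64 = 0  // column never filled: treated as a bug
	pidOuterJoinNull int64 = -1 // written explicitly by an outer join for unmatched rows
)

// errPIDUninitialized is a hypothetical error for the "must be a bug" case.
var errPIDUninitialized = errors.New("partition ID column is uninitialized")

// shouldLockPartition reports whether the row's partition needs a lock.
func shouldLockPartition(pid int64) (bool, error) {
	switch pid {
	case pidUninitialized:
		return false, errPIDUninitialized
	case pidOuterJoinNull:
		return false, nil // unmatched outer-join row: nothing to lock
	default:
		return true, nil
	}
}

func main() {
	for _, pid := range []int64{57, -1, 0} {
		lock, err := shouldLockPartition(pid)
		fmt.Println(pid, lock, err)
	}
}
```

With this split, a zero partition ID would no longer be silently skipped, so an executor that forgets to fill the column would surface as an error instead of a missing lock.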

// Left join will fill null for the rows that don't match.
// In that case, physicalID is 0, the partition doesn't need to be locked.
// For example:
// select * from t1 left join t2 on t1.col = t2.col for update
// t2 is an empty partitioned table, t1 is not empty.
continue
}
}
}

80 changes: 80 additions & 0 deletions executor/partition_table_test.go
@@ -3089,3 +3089,83 @@ func (s *partitionTableSuite) TestIssue26251(c *C) {
c.Fail()
}
}

func (s *partitionTableSuite) TestLeftJoinForUpdate(c *C) {
tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")
tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec("use test")
tk3 := testkit.NewTestKit(c, s.store)
tk3.MustExec("use test")

tk1.MustExec("drop table if exists nt, pt")
tk1.MustExec("create table nt (id int, col varchar(32), primary key (id))")
tk1.MustExec("create table pt (id int, col varchar(32), primary key (id)) partition by hash(id) partitions 4")

resetData := func() {
tk1.MustExec("truncate table nt")
tk1.MustExec("truncate table pt")
tk1.MustExec("insert into nt values (1, 'hello')")
tk1.MustExec("insert into pt values (2, 'test')")
}

// ========================== First round of test ==================
// partition table left join normal table.
// =================================================================
resetData()
ch := make(chan int, 10)
tk1.MustExec("begin pessimistic")
// No union scan
tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Check(testkit.Rows("2 test <nil> <nil>"))
go func() {
// Check the key is locked.
tk2.MustExec("update pt set col = 'xxx' where id = 2")
ch <- 2
}()

// Union scan
tk1.MustExec("insert into pt values (1, 'world')")
tk1.MustQuery("select * from pt left join nt on pt.id = nt.id for update").Sort().Check(testkit.Rows("1 world 1 hello", "2 test <nil> <nil>"))
go func() {
// Check the key is locked.
tk3.MustExec("update nt set col = 'yyy' where id = 1")
ch <- 3
}()

// Give chance for the goroutines to run first.
time.Sleep(80 * time.Millisecond)
Contributor

This may be unstable in the CI environment.

Contributor Author

It should be stable.
The test checks that 2 and 3 are blocked by 1.
Here we give 2 and 3 a chance to run first by letting 1 sleep for a while.
The purpose is to verify that 2 and 3 are blocked and can't run; then we check that the final order is
1 2 3 or 1 3 2, which achieves the test goal: 2 and 3 are blocked by 1.

2 and 3 being blocked by 1 means the partition pessimistic lock works as expected and the partition key is constructed correctly.
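
The ordering argument can be reproduced outside TiDB with a small sketch. It is an illustration under stated assumptions: a `sync.Mutex` stands in for the pessimistic lock held by the first session, and `runBlockedOrder` is a hypothetical helper, not part of the test suite.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBlockedOrder returns the order in which the lock holder (1) and
// two waiters (2 and 3) report in on a shared channel.
func runBlockedOrder() []int {
	var lock sync.Mutex // stand-in for the lock taken by "select ... for update"
	ch := make(chan int, 10)

	lock.Lock()
	for _, id := range []int{2, 3} {
		go func(id int) {
			lock.Lock() // blocks until the holder releases the lock
			lock.Unlock()
			ch <- id
		}(id)
	}

	// Give the waiters a chance to run first; they should still be blocked.
	time.Sleep(80 * time.Millisecond)
	ch <- 1
	lock.Unlock() // plays the role of "rollback"

	return []int{<-ch, <-ch, <-ch}
}

func main() {
	fmt.Println(runBlockedOrder())
}
```

In this sketch the sleep cannot make a correct lock fail, because the waiters cannot send until the holder has already sent 1 and released the lock. What a slow machine could do is hide a broken lock: if the waiters never get scheduled within the sleep, the order still comes out starting with 1.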

ch <- 1
tk1.MustExec("rollback")

checkOrder := func() {
c.Assert(<-ch, Equals, 1)
v1 := <-ch
v2 := <-ch
c.Assert((v1 == 2 && v2 == 3) || (v1 == 3 && v2 == 2), IsTrue)
}
checkOrder()

// ========================== Another round of test ==================
// normal table left join partition table.
// ===================================================================
resetData()
tk1.MustExec("begin pessimistic")
// No union scan
tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello <nil> <nil>"))

// Union scan
tk1.MustExec("insert into pt values (1, 'world')")
tk1.MustQuery("select * from nt left join pt on pt.id = nt.id for update").Check(testkit.Rows("1 hello 1 world"))
go func() {
tk2.MustExec("replace into pt values (1, 'aaa')")
ch <- 2
}()
go func() {
tk3.MustExec("update nt set col = 'bbb' where id = 1")
ch <- 3
}()
time.Sleep(80 * time.Millisecond)
ch <- 1
tk1.MustExec("rollback")
checkOrder()
}
15 changes: 14 additions & 1 deletion executor/union_scan.go
@@ -56,6 +56,11 @@ type UnionScanExec struct {
// virtualColumnIndex records all the indices of virtual columns and sort them in definition
// to make sure we can compute the virtual column in right order.
virtualColumnIndex []int

extraPIDColumn struct {
colIdx offsetOptional
partitionID int64
}
}

// Open implements the Executor Open interface.
@@ -121,7 +126,7 @@ func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
// no more data.
if row == nil {
return nil
break
}
mutableRow.SetDatums(row...)

@@ -151,6 +156,14 @@ func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.AppendRow(mutableRow.ToRow())
}
}

// Fill the extra partition ID column.
// The row may come from the membuffer instead of the transaction snapshot.
// For example: begin; insert into pt values (...); select * from pt for update;
// In that case, the extra partition ID column isn't filled by the TableReader executor, so the UnionScan executor should do it here.
if idx := us.extraPIDColumn.colIdx; idx.valid() {
fillExtraPIDColumn(req, idx.value(), us.extraPIDColumn.partitionID)
}
return nil
}
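
The diff does not show `offsetOptional` or `fillExtraPIDColumn`, so the following is only a sketch of how such helpers could work. It assumes the optional offset is stored as offset+1, so the zero value means "absent", and it uses plain `[]int64` rows in place of chunk rows. `fillPID` is a hypothetical name.

```go
package main

import "fmt"

// offsetOptional holds an optional column offset.
// Assumption for this sketch: the stored value is offset+1, so 0 means "not set".
type offsetOptional int

func newOffset(i int) offsetOptional { return offsetOptional(i + 1) }
func (o offsetOptional) valid() bool { return o != 0 }
func (o offsetOptional) value() int  { return int(o - 1) }

// fillPID overwrites column colIdx of every row with partitionID.
// Rows are plain slices here, standing in for rows of a chunk.
func fillPID(rows [][]int64, colIdx int, partitionID int64) {
	for _, row := range rows {
		row[colIdx] = partitionID
	}
}

func main() {
	// Two rows whose last column is the not-yet-filled partition ID.
	rows := [][]int64{{1, 0}, {2, 0}}
	idx := newOffset(1)
	if idx.valid() {
		fillPID(rows, idx.value(), 57)
	}
	fmt.Println(rows)
}
```

Encoding "absent" as the zero value means a struct that embeds the offset needs no explicit initialization when the plan has no extra partition ID column.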

36 changes: 36 additions & 0 deletions executor/union_scan_test.go
@@ -18,6 +18,7 @@ import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/testkit"
)

@@ -403,3 +404,38 @@ func (s *testSuite7) TestForApplyAndUnionScan(c *C) {
tk.MustQuery("select c_int, c_str from t where (select count(*) from t1 where t1.c_int in (t.c_int, t.c_int + 2, t.c_int + 10)) > 2").Check(testkit.Rows())
tk.MustExec("rollback")
}

func (s *testSuite7) TestIssue28073(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int, c_str) , key(c_int)) partition by hash (c_int) partitions 4")
tk.MustExec("create table t2 like t1")
tk.MustExec("insert into t1 values (1, 'flamboyant mcclintock')")
tk.MustExec("insert into t2 select * from t1")

tk.MustExec("begin")
tk.MustExec("insert into t2 (c_int, c_str) values (2, 'romantic grothendieck')")
tk.MustQuery("select * from t2 left join t1 on t1.c_int = t2.c_int for update").Sort().Check(
testkit.Rows(
"1 flamboyant mcclintock 1 flamboyant mcclintock",
"2 romantic grothendieck <nil> <nil>",
))
tk.MustExec("commit")

// Check no key is written to table ID 0
txn, err := s.store.Begin()
c.Assert(err, IsNil)
start := tablecodec.EncodeTablePrefix(0)
end := tablecodec.EncodeTablePrefix(1)
iter, err := txn.Iter(start, end)
c.Assert(err, IsNil)

exist := false
for iter.Valid() {
c.Assert(iter.Next(), IsNil)
exist = true
break
}
c.Assert(exist, IsFalse)
}