Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Fix pessimistic lock doesn't work on the partition table for subquery/joins #21641

Closed
wants to merge 13 commits into from
Closed
65 changes: 63 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,9 @@ type SelectLockExec struct {

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable

// ptcolMaps is partitioned table column map to row indexes
ptColMaps map[int64][]int
}

// Open implements the Executor Open interface.
Expand All @@ -895,12 +898,18 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
return err
}

is := domain.GetDomain(e.ctx).InfoSchema()
if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
e.ptColMaps = make(map[int64][]int, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
err := e.generatePartitionedTableColumnMap(p, is)
if err != nil {
return err
}
}
}
}
Expand All @@ -909,6 +918,55 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
return nil
}

func (e *SelectLockExec) generatePartitionedTableColumnMap(pt table.PartitionedTable, is infoschema.InfoSchema) error {
// Get Table Name and DB name
tblInfo := pt.Meta()
dbInfo, ok := is.SchemaByTable(tblInfo)
if !ok {
return errors.Trace(errors.Errorf("Cannot get schema info for table %s", tblInfo.Name.O))
}
colNamePrefix := fmt.Sprintf("%s.%s.", dbInfo.Name.L, tblInfo.Name.L)
cols := pt.Cols()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use VisibleCols instead of Cols? @bb7133

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so.

matched := false
ret := make([]int, 0, len(cols))
for _, colInfo := range cols {
colFullName := colNamePrefix + colInfo.Name.L
matched = false
for i, col := range e.schema.Columns {
if col.OrigName == colFullName {
ret = append(ret, i)
matched = true
break
}
}
if !matched {
return errors.Trace(errors.Errorf("Table %s column %s cannot find data with select result", tblInfo.Name.O, colInfo.Name.L))
}
}
e.ptColMaps[tblInfo.ID] = ret
return nil
}

func (e *SelectLockExec) projectRowToPartitionedTable(row chunk.Row, ptID int64) ([]types.Datum, error) {
rowDatums := row.GetDatumRow(e.base().retFieldTypes)
numDatums := len(rowDatums)
if len(e.schema.Columns) != numDatums {
return nil, errors.Trace(errors.Errorf("Columns length not match row fields length"))
}
proj, have := e.ptColMaps[ptID]
if !have {
return nil, errors.Trace(errors.Errorf("Cannot get column maps"))
}
ret := make([]types.Datum, 0, numDatums)
for _, idx := range proj {
if idx >= numDatums {
return nil, errors.Trace(errors.Errorf("Column maps index is overflow!"))
}
ret = append(ret, rowDatums[idx])
}
return ret, nil
}

// Next implements the Executor Next interface.
func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
Expand All @@ -927,8 +985,11 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
ptRowData, err := e.projectRowToPartitionedTable(row, id)
if err != nil {
return err
}
p, err := pt.GetPartitionByRow(e.ctx, ptRowData)
if err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7124,6 +7124,18 @@ func (s *testSuite) Test13004(c *C) {
tk.MustQuery("SELECT TIMESTAMP '9999-01-01 00:00:00'").Check(testkit.Rows("9999-01-01 00:00:00"))
}

func (s *testSuite) Test21509(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t0")
tk.MustExec("create table t0 (c_int int, c_timestamp timestamp, primary key (c_int), key(c_timestamp)) partition by hash (c_int) partitions 4")
tk.MustExec("insert into t0 values (1, '2020-12-05 01:02:03')")
tk.MustExec("begin")
// the select for update should not got error
tk.MustQuery("select * from t0 where c_timestamp in (select c_timestamp from t0 where c_int = 1) for update")
tk.MustExec("commit")
}

func (s *testSuite) Test12178(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down