From 045612a40c6708f6a481af18a2cd8dc1e1ba795f Mon Sep 17 00:00:00 2001 From: "yulai.li" Date: Thu, 10 Dec 2020 16:06:24 +0800 Subject: [PATCH 1/3] executor: Add projection step when calculate partition table's physical ID --- executor/executor.go | 65 +++++++++++++++++++++++++++++++++++++-- executor/executor_test.go | 12 ++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 706348f80ecb5..365d981079e4e 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -886,6 +886,9 @@ type SelectLockExec struct { // tblID2Table is cached to reduce cost. tblID2Table map[int64]table.PartitionedTable + + // ptcolMaps is partitioned table column map to row indexes + ptColMaps map[int64][]int } // Open implements the Executor Open interface. @@ -894,12 +897,18 @@ func (e *SelectLockExec) Open(ctx context.Context) error { return err } + is := domain.GetDomain(e.ctx).InfoSchema() if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 { e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable)) + e.ptColMaps = make(map[int64][]int, len(e.partitionedTable)) for id := range e.tblID2Handle { for _, p := range e.partitionedTable { if id == p.Meta().ID { e.tblID2Table[id] = p + err := e.generatePartitionedTableColumnMap(p, is) + if err != nil { + return err + } } } } @@ -908,6 +917,55 @@ func (e *SelectLockExec) Open(ctx context.Context) error { return nil } +func (e *SelectLockExec) generatePartitionedTableColumnMap(pt table.PartitionedTable, is infoschema.InfoSchema) error { + // Get Table Name and DB name + tblInfo := pt.Meta() + dbInfo, ok := is.SchemaByTable(tblInfo) + if !ok { + return errors.Trace(errors.Errorf("Cannot get schema info for table %s", tblInfo.Name.O)) + } + colNamePrefix := fmt.Sprintf("%s.%s.", dbInfo.Name.L, tblInfo.Name.L) + cols := pt.Cols() + matched := false + ret := make([]int, 0, len(cols)) + for _, colInfo := range cols { + colFullName := colNamePrefix + colInfo.Name.L + matched = false + for i, col := range e.schema.Columns { + if col.OrigName == colFullName { + ret = append(ret, i) + matched = true + break + } + } + if !matched { + return errors.Trace(errors.Errorf("Table %s column %s cannot find data with select result", tblInfo.Name.O, colInfo.Name.L)) + } + } + e.ptColMaps[tblInfo.ID] = ret + return nil +} + +func (e *SelectLockExec) projectRowToPartitionedTable(row chunk.Row, ptID int64) ([]types.Datum, error) { + rowDatums := row.GetDatumRow(e.base().retFieldTypes) + numDatums := len(rowDatums) + if len(e.schema.Columns) != numDatums { + return nil, errors.Trace(errors.Errorf("Columns length not match row fields length")) + } + proj, have := e.ptColMaps[ptID] + if !have { + return nil, errors.Trace(errors.Errorf("Cannot get column maps")) + } + ret := make([]types.Datum, 0, numDatums) + for _, idx := range proj { + if idx >= numDatums { + return nil, errors.Trace(errors.Errorf("Column maps index is overflow!")) + } + ret = append(ret, rowDatums[idx]) + } + return ret, nil +} + // Next implements the Executor Next interface. func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { req.GrowAndReset(e.maxChunkSize) @@ -926,8 +984,11 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { for id, cols := range e.tblID2Handle { physicalID := id if pt, ok := e.tblID2Table[id]; ok { - // On a partitioned table, we have to use physical ID to encode the lock key! - p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes)) + ptRowData, err := e.projectRowToPartitionedTable(row, id) + if err != nil { + return err + } + p, err := pt.GetPartitionByRow(e.ctx, ptRowData) if err != nil { return err } diff --git a/executor/executor_test.go b/executor/executor_test.go index 2affecc1728c7..279806e3bb174 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7083,3 +7083,15 @@ func (s *testSuite) Test13004(c *C) { // see https://dev.mysql.com/doc/refman/5.6/en/date-and-time-literals.html, timestamp here actually produces a datetime tk.MustQuery("SELECT TIMESTAMP '9999-01-01 00:00:00'").Check(testkit.Rows("9999-01-01 00:00:00")) } + +func (s *testSuite) Test21509(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t0") + tk.MustExec("create table t0 (c_int int, c_timestamp timestamp, primary key (c_int), key(c_timestamp)) partition by hash (c_int) partitions 4") + tk.MustExec("insert into t0 values (1, '2020-12-05 01:02:03')") + tk.MustExec("begin") + // the select for update should not got error + tk.MustQuery("select * from t0 where c_timestamp in (select c_timestamp from t0 where c_int = 1) for update") + tk.MustExec("commit") +} From ef9f7c9e12a4ff376db97b7319b3566200a636dc Mon Sep 17 00:00:00 2001 From: "yulai.li" Date: Thu, 10 Dec 2020 19:02:49 +0800 Subject: [PATCH 2/3] executor: rename function and add more test --- executor/executor.go | 16 ++++++++-------- executor/executor_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 671c0db4a9b0e..0af0b1682f331 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -888,8 +888,8 @@ type SelectLockExec struct { // tblID2Table is cached to reduce cost. tblID2Table map[int64]table.PartitionedTable - // ptcolMaps is partitioned table column map to row indexes - ptColMaps map[int64][]int + // ptCol2RowIndexes is partitioned table column map to row indexes + ptCol2RowIndexes map[int64][]int } // Open implements the Executor Open interface. @@ -901,7 +901,7 @@ func (e *SelectLockExec) Open(ctx context.Context) error { is := domain.GetDomain(e.ctx).InfoSchema() if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 { e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable)) - e.ptColMaps = make(map[int64][]int, len(e.partitionedTable)) + e.ptCol2RowIndexes = make(map[int64][]int, len(e.partitionedTable)) for id := range e.tblID2Handle { for _, p := range e.partitionedTable { if id == p.Meta().ID { @@ -926,7 +926,7 @@ func (e *SelectLockExec) generatePartitionedTableColumnMap(pt table.PartitionedT return errors.Trace(errors.Errorf("Cannot get schema info for table %s", tblInfo.Name.O)) } colNamePrefix := fmt.Sprintf("%s.%s.", dbInfo.Name.L, tblInfo.Name.L) - cols := pt.Cols() + cols := pt.VisibleCols() matched := false ret := make([]int, 0, len(cols)) for _, colInfo := range cols { @@ -943,17 +943,17 @@ func (e *SelectLockExec) generatePartitionedTableColumnMap(pt table.PartitionedT return errors.Trace(errors.Errorf("Table %s column %s cannot find data with select result", tblInfo.Name.O, colInfo.Name.L)) } } - e.ptColMaps[tblInfo.ID] = ret + e.ptCol2RowIndexes[tblInfo.ID] = ret return nil } -func (e *SelectLockExec) projectRowToPartitionedTable(row chunk.Row, ptID int64) ([]types.Datum, error) { +func (e *SelectLockExec) projectRowToPartitionedTableRow(row chunk.Row, ptID int64) ([]types.Datum, error) { rowDatums := row.GetDatumRow(e.base().retFieldTypes) numDatums := len(rowDatums) if len(e.schema.Columns) != numDatums { return nil, errors.Trace(errors.Errorf("Columns length not match row fields length")) } - proj, have := e.ptColMaps[ptID] + proj, have := e.ptCol2RowIndexes[ptID] if !have { return nil, errors.Trace(errors.Errorf("Cannot get column maps")) } @@ -985,7 +985,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { for id, cols := range e.tblID2Handle { physicalID := id if pt, ok := e.tblID2Table[id]; ok { - ptRowData, err := e.projectRowToPartitionedTable(row, id) + ptRowData, err := e.projectRowToPartitionedTableRow(row, id) if err != nil { return err } diff --git a/executor/executor_test.go b/executor/executor_test.go index 454362419a35f..bafa3ac037139 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7136,6 +7136,42 @@ func (s *testSuite) Test21509(c *C) { tk.MustExec("commit") } +func (s *testSuite) Test21618(c *C) { + tk1 := testkit.NewTestKit(c, s.store) + tk2 := testkit.NewTestKit(c, s.store) + // Prepare + tk1.MustExec("use test") + tk2.MustExec("use test") + tk1.MustExec("drop table if exists t") + tk1.MustExec("create table t (c_int int, d_int int, primary key (c_int), key(d_int)) partition by hash (c_int) partitions 4") + tk1.MustExec("insert into t values (1, 2)") + // Transaction 1 execute + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t where d_int in (select d_int from t where c_int = 1) for update") + fc := make(chan int) + go func() { + // Transaction 2 execute + tk2.MustExec("begin pessimistic") + tk2.MustExec("select * from t where d_int = 2 for update") + tk2.MustExec("commit") + fc <- 1 + }() + timer := time.NewTimer(1 * time.Second) + select { + case <-fc: + c.Assert(true, IsTrue, Commentf("Should not finish transaction 2")) + case <-timer.C: + } + tk1.MustExec("commit") + + timer = time.NewTimer(1 * time.Second) + select { + case <-fc: + case <-timer.C: + c.Assert(true, IsTrue, Commentf("Transaction 2 should be finished")) + } +} + func (s *testSuite) Test12178(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") From d00b61ba6fddb98a9cf958a607c0fd54378f9bdd Mon Sep 17 00:00:00 2001 From: "yulai.li" Date: Tue, 29 Dec 2020 17:47:34 +0800 Subject: [PATCH 3/3] Fix unit test bug --- executor/executor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 1dd7a951322ca..b17cd257a1aa6 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7259,7 +7259,7 @@ func (s *testSuite) Test21618(c *C) { timer := time.NewTimer(1 * time.Second) select { case <-fc: - c.Assert(true, IsTrue, Commentf("Should not finish transaction 2")) + c.Assert(false, IsTrue, Commentf("Should not finish transaction 2")) case <-timer.C: } tk1.MustExec("commit") @@ -7268,7 +7268,7 @@ func (s *testSuite) Test21618(c *C) { select { case <-fc: case <-timer.C: - c.Assert(true, IsTrue, Commentf("Transaction 2 should be finished")) + c.Assert(false, IsTrue, Commentf("Transaction 2 should be finished")) } }