Skip to content

Commit

Permalink
cherry pick pingcap#21148 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
tiancaiamao authored and ti-srebot committed Jun 16, 2021
1 parent 57fa70a commit 2d1a805
Show file tree
Hide file tree
Showing 13 changed files with 3,639 additions and 28 deletions.
137 changes: 137 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,16 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
}
if len(e.partitionedTable) > 0 {
schema := v.Schema()
e.tblID2PIDColumnIndex = make(map[int64]int)
for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ {
col := v.ExtraPIDInfo.Columns[i]
tblID := v.ExtraPIDInfo.TblIDs[i]
offset := schema.ColumnIndex(col)
e.tblID2PIDColumnIndex[tblID] = offset
}
}
return e
}

Expand Down Expand Up @@ -2342,6 +2352,9 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
storeType: v.StoreType,
batchCop: v.BatchCop,
}
if tbl.Meta().Partition != nil {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand All @@ -2368,6 +2381,33 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return e, nil
}

<<<<<<< HEAD
=======
func extraPIDColumnIndex(schema *expression.Schema) offsetOptional {
for idx, col := range schema.Columns {
if col.ID == model.ExtraPidColID {
return newOffset(idx)
}
}
return 0
}

func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor {
startTs, err := b.getSnapshotTS()
if err != nil {
b.err = err
return nil
}
gather := &MPPGather{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
originalPlan: v.GetTablePlan(),
startTS: startTs,
}
return gather
}

>>>>>>> 0490590b0... planner,executor: fix 'select ...(join on partition table) for update' panic (#21148)
// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *TableReaderExecutor {
Expand All @@ -2381,6 +2421,42 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *
ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
<<<<<<< HEAD
=======

if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}
// When isPartition is set, it means the union rewriting is done, so a partition reader is prefered.
if ok, _ := ts.IsPartition(); ok {
return ret
}

pi := ts.Table.GetPartitionInfo()
if pi == nil {
return ret
}

tmp, _ := b.is.TableByID(ts.Table.ID)
tbl := tmp.(table.PartitionedTable)
partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
if err != nil {
b.err = err
return nil
}
if v.StoreType == kv.TiFlash {
sctx.IsTiFlash.Store(true)
}

if len(partitions) == 0 {
return &TableDualExec{baseExecutor: *ret.base()}
}
ret.kvRangeBuilder = kvRangeBuilderFromRangeAndPartition{
sctx: b.ctx,
partitions: partitions,
}

>>>>>>> 0490590b0... planner,executor: fix 'select ...(join on partition table) for update' panic (#21148)
return ret
}

Expand Down Expand Up @@ -2449,6 +2525,35 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) *
ret.ranges = is.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
<<<<<<< HEAD
=======

if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}
// When isPartition is set, it means the union rewriting is done, so a partition reader is prefered.
if ok, _ := is.IsPartition(); ok {
return ret
}

pi := is.Table.GetPartitionInfo()
if pi == nil {
return ret
}

if is.Index.Global {
return ret
}

tmp, _ := b.is.TableByID(is.Table.ID)
tbl := tmp.(table.PartitionedTable)
partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
if err != nil {
b.err = err
return nil
}
ret.partitions = partitions
>>>>>>> 0490590b0... planner,executor: fix 'select ...(join on partition table) for update' panic (#21148)
return ret
}

Expand Down Expand Up @@ -2516,6 +2621,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}
if ok, _ := ts.IsPartition(); ok {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}

if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
Expand Down Expand Up @@ -2552,6 +2660,35 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
<<<<<<< HEAD
=======

if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}

if pi := is.Table.GetPartitionInfo(); pi == nil {
return ret
}

if is.Index.Global {
return ret
}
if ok, _ := is.IsPartition(); ok {
// Already pruned when translated to logical union.
return ret
}

tmp, _ := b.is.TableByID(is.Table.ID)
tbl := tmp.(table.PartitionedTable)
partitions, err := partitionPruning(b.ctx, tbl, v.PartitionInfo.PruningConds, v.PartitionInfo.PartitionNames, v.PartitionInfo.Columns, v.PartitionInfo.ColumnNames)
if err != nil {
b.err = err
return nil
}
ret.partitionTableMode = true
ret.prunedPartitions = partitions
>>>>>>> 0490590b0... planner,executor: fix 'select ...(join on partition table) for update' panic (#21148)
return ret
}

Expand Down
16 changes: 16 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ type IndexLookUpExecutor struct {
PushedLimit *plannercore.PushedDownLimit

stats *IndexLookUpRunTimeStats

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1
extraPIDColumnIndex offsetOptional
}

type checkIndexValue struct {
Expand Down Expand Up @@ -533,6 +536,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
tableReaderExec := &TableReaderExecutor{
<<<<<<< HEAD
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: e.table,
dagPB: e.tableRequest,
Expand All @@ -542,6 +546,18 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
=======
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
extraPIDColumnIndex: e.extraPIDColumnIndex,
>>>>>>> 0490590b0... planner,executor: fix 'select ...(join on partition table) for update' panic (#21148)
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
Expand Down
44 changes: 21 additions & 23 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,32 +866,31 @@ type SelectLockExec struct {
Lock ast.SelectLockType
keys []kv.Key

<<<<<<< HEAD
tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable
inited bool
=======
tblID2Handle map[int64][]plannercore.HandleCols

// All the partition tables in the children of this executor.
partitionedTable []table.PartitionedTable

// When SelectLock work on the partition table, we need the partition ID
// instead of table ID to calculate the lock KV. In that case, partition ID is store as an
// extra column in the chunk row.
// tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join
// of multiple tables, so the map struct is used.
tblID2PIDColumnIndex map[int64]int
>>>>>>> 0490590b0... planner,executor: fix 'select ...(join on partition table) for update' panic (#21148)
}

// Open implements the Executor Open interface.
func (e *SelectLockExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}

if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
}
}
}
}

return nil
return e.baseExecutor.Open(ctx)
}

// Next implements the Executor Next interface.
Expand All @@ -909,15 +908,14 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {

for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
if err != nil {
return err
}
physicalID = p.GetPhysicalID()
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
offset := e.tblID2PIDColumnIndex[id]
physicalID = row.GetInt64(offset)
}

for _, col := range cols {
Expand Down
Loading

0 comments on commit 2d1a805

Please sign in to comment.