
cherry pick pingcap#18818 to release-3.0
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Lingyu Song authored and ti-srebot committed Sep 1, 2020
1 parent 2ca62ad commit ba443b3
Showing 3 changed files with 320 additions and 6 deletions.
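
The change being cherry-picked (#18818) stops locatePartition from building a fresh chunk.MutRow for every row: the partitioned table now carries a sync.Pool of evaluation buffers sized once from the table's column types (initEvalBufferType / initEvalBuffer in the partition.go diff below), and each call borrows a buffer, fills it with SetDatums, and returns it to the pool. A minimal, self-contained sketch of that pooling pattern, using stand-in types and invented names rather than the TiDB chunk API:

package main

import (
	"fmt"
	"sync"
)

// rowBuffer stands in for the reusable evaluation buffer (chunk.MutRow in the
// real code); it is sized once from the table's column count.
type rowBuffer struct {
	vals []int64
}

// set copies the incoming row into the buffer, mirroring MutRow.SetDatums.
func (b *rowBuffer) set(row []int64) { copy(b.vals, row) }

// partitionedTable keeps a pool of buffers so that locating a partition does
// not allocate a new scratch row per inserted row.
type partitionedTable struct {
	numParts int64
	bufPool  sync.Pool // holds *rowBuffer
}

func newPartitionedTable(numCols int, numParts int64) *partitionedTable {
	return &partitionedTable{
		numParts: numParts,
		bufPool: sync.Pool{
			New: func() interface{} { return &rowBuffer{vals: make([]int64, numCols)} },
		},
	}
}

// locate borrows a pooled buffer, copies the row into it, and evaluates a toy
// "partition expression" (here: first column modulo the partition count).
func (t *partitionedTable) locate(row []int64) int64 {
	buf := t.bufPool.Get().(*rowBuffer)
	defer t.bufPool.Put(buf)
	buf.set(row)
	v := buf.vals[0] % t.numParts
	if v < 0 {
		v = -v
	}
	return v
}

func main() {
	t := newPartitionedTable(3, 4)
	fmt.Println(t.locate([]int64{7, 0, 0})) // 3
}

Using sync.Pool rather than a single shared buffer lets concurrent writers each get their own scratch row without locking, which is where the locatePartition speed-up in the commit title comes from.
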
23 changes: 23 additions & 0 deletions planner/core/common_plans.go
@@ -301,13 +301,36 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
<<<<<<< HEAD
=======
x.Handle = kv.IntHandle(iv)
if x.PartitionInfo != nil {
if x.TblInfo.Partition.Type != model.PartitionTypeHash {
return errors.New("range partition table can not use plan cache")
}
num := x.TblInfo.Partition.Num
pos := math.Abs(iv) % int64(num)
x.PartitionInfo = &x.TblInfo.Partition.Definitions[pos]
}
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)
return nil
}
for i, param := range x.IndexValueParams {
if param != nil {
x.IndexValues[i] = param.Datum
}
}
<<<<<<< HEAD
=======
if x.PartitionInfo != nil {
if x.TblInfo.Partition.Type != model.PartitionTypeHash {
return errors.New("range partition table can not use plan cache")
}
val := x.IndexValues[x.partitionColumnPos].GetInt64()
partitionID := val % int64(x.TblInfo.Partition.Num)
x.PartitionInfo = &x.TblInfo.Partition.Definitions[partitionID]
}
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)
return nil
case PhysicalPlan:
for _, child := range x.Children() {
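
Both conflicted hunks in common_plans.go resolve the cached plan's PartitionInfo for a hash-partitioned table: they take the integer partitioning value (the rebuilt handle for a point get, or the partition column's value among the index values for an index lookup), reduce it modulo the number of partitions, and select that partition definition, while rejecting non-hash partitioning for the plan cache. A tiny sketch of just that index computation, with an invented helper name (not TiDB's math package):

package main

import "fmt"

// hashPartitionIdx maps an integer partitioning value to a hash partition
// index in [0, num); negative values are folded to their absolute value,
// like the `pos := math.Abs(iv) % int64(num)` resolution in the hunk above.
func hashPartitionIdx(v, num int64) int64 {
	idx := v % num
	if idx < 0 {
		idx = -idx
	}
	return idx
}

func main() {
	fmt.Println(hashPartitionIdx(-7, 4), hashPartitionIdx(7, 4)) // 3 3
}
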
219 changes: 214 additions & 5 deletions table/tables/partition.go
@@ -21,9 +21,11 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
@@ -61,9 +63,17 @@ func (p *partition) GetPhysicalID() int64 {
// partitionedTable implements the table.PartitionedTable interface.
// partitionedTable is a table, it contains many Partitions.
type partitionedTable struct {
<<<<<<< HEAD
Table
partitionExpr *PartitionExpr
partitions map[int64]*partition
=======
TableCommon
partitionExpr *PartitionExpr
partitions map[int64]*partition
evalBufferTypes []*types.FieldType
evalBufferPool sync.Pool
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)
}

func newPartitionedTable(tbl *Table, tblInfo *model.TableInfo) (table.Table, error) {
@@ -73,8 +83,18 @@ func newPartitionedTable(tbl *Table, tblInfo *model.TableInfo) (table.Table, err
return nil, errors.Trace(err)
}
ret.partitionExpr = partitionExpr
<<<<<<< HEAD

if err := initTableIndices(&ret.tableCommon); err != nil {
=======
initEvalBufferType(ret)
ret.evalBufferPool = sync.Pool{
New: func() interface{} {
return initEvalBuffer(ret)
},
}
if err := initTableIndices(&ret.TableCommon); err != nil {
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)
return nil, errors.Trace(err)
}
partitions := make(map[int64]*partition)
@@ -127,6 +147,60 @@ type PartitionExpr struct {
UpperBounds []expression.Expression
// Expr is the hash partition expression.
Expr expression.Expression
<<<<<<< HEAD
=======
// Used in the range pruning process.
*ForRangePruning
// Used in the range column pruning process.
*ForRangeColumnsPruning
}

func initEvalBufferType(t *partitionedTable) {
hasExtraHandle := false
numCols := len(t.Cols())
if !t.Meta().PKIsHandle {
hasExtraHandle = true
numCols++
}
t.evalBufferTypes = make([]*types.FieldType, numCols)
for i, col := range t.Cols() {
t.evalBufferTypes[i] = &col.FieldType
}

if hasExtraHandle {
t.evalBufferTypes[len(t.evalBufferTypes)-1] = types.NewFieldType(mysql.TypeLonglong)
}
}

func initEvalBuffer(t *partitionedTable) *chunk.MutRow {
evalBuffer := chunk.MutRowFromTypes(t.evalBufferTypes)
return &evalBuffer
}

// ForRangeColumnsPruning is used for range partition pruning.
type ForRangeColumnsPruning struct {
LessThan []expression.Expression
MaxValue bool
}

func dataForRangeColumnsPruning(ctx sessionctx.Context, pi *model.PartitionInfo, schema *expression.Schema, names []*types.FieldName, p *parser.Parser) (*ForRangeColumnsPruning, error) {
var res ForRangeColumnsPruning
res.LessThan = make([]expression.Expression, len(pi.Definitions))
for i := 0; i < len(pi.Definitions); i++ {
if strings.EqualFold(pi.Definitions[i].LessThan[0], "MAXVALUE") {
// Use a bool flag instead of math.MaxInt64 to avoid the corner cases.
res.MaxValue = true
} else {
tmp, err := parseSimpleExprWithNames(p, ctx, pi.Definitions[i].LessThan[0], schema, names)
if err != nil {
return nil, err
}
res.LessThan[i] = tmp
}
}
return &res, nil
}
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)

// The new range partition pruning
*ForRangePruning
@@ -188,6 +262,7 @@ func generatePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo,
partitionPruneExprs := make([]expression.Expression, 0, len(pi.Definitions))
locateExprs := make([]expression.Expression, 0, len(pi.Definitions))
var buf bytes.Buffer
p := parser.New()
schema := expression.NewSchema(columns...)
partStr := rangePartitionString(pi)
for i := 0; i < len(pi.Definitions); i++ {
@@ -207,6 +282,7 @@ }
}
locateExprs = append(locateExprs, exprs[0])

<<<<<<< HEAD
if i > 0 {
fmt.Fprintf(&buf, " and ((%s) >= (%s))", partStr, pi.Definitions[i-1].LessThan[0])
} else {
@@ -225,11 +301,21 @@ func generatePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo,
}

exprs, err = expression.ParseSimpleExprsWithSchema(ctx, buf.String(), schema)
=======
switch len(pi.Columns) {
case 0:
exprs, err := parseSimpleExprWithNames(p, ctx, pi.Expr, schema, names)
if err != nil {
return nil, err
}
tmp, err := dataForRangePruning(ctx, pi)
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)
if err != nil {
// If it got an error here, ddl may hang forever, so this error log is important.
logutil.Logger(context.Background()).Error("wrong table partition expression", zap.String("expression", buf.String()), zap.Error(err))
return nil, errors.Trace(err)
}
<<<<<<< HEAD
// Get a hash code in advance to prevent data race afterwards.
exprs[0].HashCode(ctx.GetSessionVars().StmtCtx)
partitionPruneExprs = append(partitionPruneExprs, exprs[0])
@@ -240,6 +326,12 @@ func generatePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo,
if len(pi.Columns) == 0 {
rangePruning = &ForRangePruning{}
err := makeLessThanData(pi, rangePruning)
=======
ret.Expr = exprs
ret.ForRangePruning = tmp
case 1:
tmp, err := dataForRangeColumnsPruning(ctx, pi, schema, names, p)
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)
if err != nil {
return nil, errors.Trace(err)
}
@@ -313,7 +405,11 @@ func (t *partitionedTable) locatePartition(ctx sessionctx.Context, pi *model.Par
var idx int
switch t.meta.Partition.Type {
case model.PartitionTypeRange:
idx, err = t.locateRangePartition(ctx, pi, r)
if len(pi.Columns) == 0 {
idx, err = t.locateRangePartition(ctx, pi, r)
} else {
idx, err = t.locateRangeColumnPartition(ctx, pi, r)
}
case model.PartitionTypeHash:
idx, err = t.locateHashPartition(ctx, pi, r)
}
@@ -323,13 +419,15 @@ func (t *partitionedTable) locatePartition(ctx sessionctx.Context, pi *model.Par
return pi.Definitions[idx].ID, nil
}

func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) {
func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) {
var err error
var isNull bool
partitionExprs := t.partitionExpr.UpperBounds
evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow)
defer t.evalBufferPool.Put(evalBuffer)
idx := sort.Search(len(partitionExprs), func(i int) bool {
var ret int64
ret, isNull, err = partitionExprs[i].EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow())
evalBuffer.SetDatums(r...)
ret, isNull, err := partitionExprs[i].EvalInt(ctx, evalBuffer.ToRow())
if err != nil {
return true // Break the search.
}
@@ -366,9 +464,74 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *mode
return idx, nil
}

func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) {
var (
ret int64
val int64
isNull bool
err error
)
if col, ok := t.partitionExpr.Expr.(*expression.Column); ok {
if r[col.Index].IsNull() {
isNull = true
}
ret = r[col.Index].GetInt64()
} else {
evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow)
defer t.evalBufferPool.Put(evalBuffer)
evalBuffer.SetDatums(r...)
val, isNull, err = t.partitionExpr.Expr.EvalInt(ctx, evalBuffer.ToRow())
if err != nil {
return 0, err
}
ret = val
}
unsigned := mysql.HasUnsignedFlag(t.partitionExpr.Expr.GetType().Flag)
ranges := t.partitionExpr.ForRangePruning
length := len(ranges.LessThan)
pos := sort.Search(length, func(i int) bool {
if isNull {
return true
}
return ranges.compare(i, ret, unsigned) > 0
})
if isNull {
pos = 0
}
if pos < 0 || pos >= length {
// The data does not belong to any of the partition returns `table has no partition for value %s`.
var valueMsg string
if pi.Expr != "" {
e, err := expression.ParseSimpleExprWithTableInfo(ctx, pi.Expr, t.meta)
if err == nil {
val, _, err := e.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow())
if err == nil {
valueMsg = fmt.Sprintf("%d", val)
}
}
} else {
// When the table is partitioned by range columns.
valueMsg = "from column_list"
}
return 0, table.ErrNoPartitionForGivenValue.GenWithStackByArgs(valueMsg)
}
return pos, nil
}

// TODO: supports linear hashing
func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) {
ret, isNull, err := t.partitionExpr.Expr.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow())
if col, ok := t.partitionExpr.Expr.(*expression.Column); ok {
ret := r[col.Index].GetInt64()
ret = ret % int64(t.meta.Partition.Num)
if ret < 0 {
ret = -ret
}
return int(ret), nil
}
evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow)
defer t.evalBufferPool.Put(evalBuffer)
evalBuffer.SetDatums(r...)
ret, isNull, err := t.partitionExpr.Expr.EvalInt(ctx, evalBuffer.ToRow())
if err != nil {
return 0, err
}
@@ -475,3 +638,49 @@ func FindPartitionByName(meta *model.TableInfo, parName string) (int64, error) {
}
return -1, errors.Trace(table.ErrUnknownPartition.GenWithStackByArgs(parName, meta.Name.O))
}
<<<<<<< HEAD
=======

func parseExpr(p *parser.Parser, exprStr string) (ast.ExprNode, error) {
exprStr = "select " + exprStr
stmts, _, err := p.Parse(exprStr, "", "")
if err != nil {
return nil, util.SyntaxWarn(err)
}
fields := stmts[0].(*ast.SelectStmt).Fields.Fields
return fields[0].Expr, nil
}

func rewritePartitionExpr(ctx sessionctx.Context, field ast.ExprNode, schema *expression.Schema, names types.NameSlice) (expression.Expression, error) {
expr, err := expression.RewriteSimpleExprWithNames(ctx, field, schema, names)
return expr, err
}

func compareUnsigned(v1, v2 int64) int {
switch {
case uint64(v1) > uint64(v2):
return 1
case uint64(v1) == uint64(v2):
return 0
}
return -1
}

func (lt *ForRangePruning) compare(ith int, v int64, unsigned bool) int {
if ith == len(lt.LessThan)-1 {
if lt.MaxValue {
return 1
}
}
if unsigned {
return compareUnsigned(lt.LessThan[ith], v)
}
switch {
case lt.LessThan[ith] > v:
return 1
case lt.LessThan[ith] == v:
return 0
}
return -1
}
>>>>>>> 349adf8... table: use evalBuffer to improve performance of locatePartition (#18818)
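
The added locateRangePartition and ForRangePruning.compare locate a range partition by binary searching the ordered LessThan bounds: sort.Search finds the first bound strictly greater than the value, a trailing MAXVALUE compares greater than everything, NULL values fall back to the first partition, and the comparison switches to unsigned when the partition expression is unsigned. A compact sketch of the bound search only, with the bounds hard-coded and the type names invented:

package main

import (
	"fmt"
	"sort"
)

// rangeBounds mirrors the shape of ForRangePruning: ascending LessThan bounds,
// with maxValue marking that the last bound is MAXVALUE.
type rangeBounds struct {
	lessThan []int64
	maxValue bool
}

// greaterThan reports whether bound i is strictly greater than v; this is the
// predicate sort.Search needs to find the first partition that can hold v.
func (b rangeBounds) greaterThan(i int, v int64, unsigned bool) bool {
	if b.maxValue && i == len(b.lessThan)-1 {
		return true // MAXVALUE compares greater than any value.
	}
	if unsigned {
		return uint64(b.lessThan[i]) > uint64(v)
	}
	return b.lessThan[i] > v
}

// locate returns the index of the partition holding v, or -1 when no bound is
// large enough (the "table has no partition for value" case in the diff).
func (b rangeBounds) locate(v int64, unsigned bool) int {
	pos := sort.Search(len(b.lessThan), func(i int) bool {
		return b.greaterThan(i, v, unsigned)
	})
	if pos >= len(b.lessThan) {
		return -1
	}
	return pos
}

func main() {
	// Bounds for p0 VALUES LESS THAN (10), p1 LESS THAN (100),
	// p2 LESS THAN MAXVALUE; the stored number for the MAXVALUE slot is unused.
	b := rangeBounds{lessThan: []int64{10, 100, 0}, maxValue: true}
	fmt.Println(b.locate(5, false), b.locate(250, false)) // 0 2
}
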

