Skip to content

Commit

Permalink
planner,executor,parser: support the prepared plan cache for insert/d…
Browse files Browse the repository at this point in the history
…elete/update statements (#8107)
  • Loading branch information
dbjoa authored and ngaut committed Nov 5, 2018
1 parent 6efbd17 commit 34c3d4c
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 40 deletions.
178 changes: 178 additions & 0 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/testkit"
dto "github.com/prometheus/client_model/go"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -389,3 +391,179 @@ func (s *testSuite) TestPreparedIssue7579(c *C) {
r.Check(nil)
}
}

func (s *testSuite) TestPreparedInsert(c *C) {
orgEnable := plannercore.PreparedPlanCacheEnabled()
orgCapacity := plannercore.PreparedPlanCacheCapacity
defer func() {
plannercore.SetPreparedPlanCache(orgEnable)
plannercore.PreparedPlanCacheCapacity = orgCapacity
}()
metrics.PlanCacheCounter.Reset()
counter := metrics.PlanCacheCounter.WithLabelValues("prepare")
pb := &dto.Metric{}
flags := []bool{false, true}
for _, flag := range flags {
plannercore.SetPreparedPlanCache(flag)
plannercore.PreparedPlanCacheCapacity = 100
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists prepare_test")
tk.MustExec("create table prepare_test (id int PRIMARY KEY, c1 int)")
tk.MustExec(`prepare stmt_insert from 'insert into prepare_test values (?, ?)'`)
tk.MustExec(`set @a=1,@b=1; execute stmt_insert using @a, @b;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(0))
}
tk.MustExec(`set @a=2,@b=2; execute stmt_insert using @a, @b;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(1))
}
tk.MustExec(`set @a=3,@b=3; execute stmt_insert using @a, @b;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(2))
}

result := tk.MustQuery("select id, c1 from prepare_test where id = ?", 1)
result.Check(testkit.Rows("1 1"))
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 2)
result.Check(testkit.Rows("2 2"))
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 3)
result.Check(testkit.Rows("3 3"))

tk.MustExec(`prepare stmt_insert_select from 'insert into prepare_test (id, c1) select id + 100, c1 + 100 from prepare_test where id = ?'`)
tk.MustExec(`set @a=1; execute stmt_insert_select using @a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(2))
}
tk.MustExec(`set @a=2; execute stmt_insert_select using @a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(3))
}
tk.MustExec(`set @a=3; execute stmt_insert_select using @a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(4))
}

result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 101)
result.Check(testkit.Rows("101 101"))
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 102)
result.Check(testkit.Rows("102 102"))
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 103)
result.Check(testkit.Rows("103 103"))
}
}

func (s *testSuite) TestPreparedUpdate(c *C) {
orgEnable := plannercore.PreparedPlanCacheEnabled()
orgCapacity := plannercore.PreparedPlanCacheCapacity
defer func() {
plannercore.SetPreparedPlanCache(orgEnable)
plannercore.PreparedPlanCacheCapacity = orgCapacity
}()
metrics.PlanCacheCounter.Reset()
counter := metrics.PlanCacheCounter.WithLabelValues("prepare")
pb := &dto.Metric{}
flags := []bool{false, true}
for _, flag := range flags {
plannercore.SetPreparedPlanCache(flag)
plannercore.PreparedPlanCacheCapacity = 100
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists prepare_test")
tk.MustExec("create table prepare_test (id int PRIMARY KEY, c1 int)")
tk.MustExec(`insert into prepare_test values (1, 1)`)
tk.MustExec(`insert into prepare_test values (2, 2)`)
tk.MustExec(`insert into prepare_test values (3, 3)`)

tk.MustExec(`prepare stmt_update from 'update prepare_test set c1 = c1 + ? where id = ?'`)
tk.MustExec(`set @a=1,@b=100; execute stmt_update using @b,@a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(0))
}
tk.MustExec(`set @a=2,@b=200; execute stmt_update using @b,@a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(1))
}
tk.MustExec(`set @a=3,@b=300; execute stmt_update using @b,@a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(2))
}

result := tk.MustQuery("select id, c1 from prepare_test where id = ?", 1)
result.Check(testkit.Rows("1 101"))
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 2)
result.Check(testkit.Rows("2 202"))
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 3)
result.Check(testkit.Rows("3 303"))
}
}

func (s *testSuite) TestPreparedDelete(c *C) {
orgEnable := plannercore.PreparedPlanCacheEnabled()
orgCapacity := plannercore.PreparedPlanCacheCapacity
defer func() {
plannercore.SetPreparedPlanCache(orgEnable)
plannercore.PreparedPlanCacheCapacity = orgCapacity
}()
metrics.PlanCacheCounter.Reset()
counter := metrics.PlanCacheCounter.WithLabelValues("prepare")
pb := &dto.Metric{}
flags := []bool{false, true}
for _, flag := range flags {
plannercore.SetPreparedPlanCache(flag)
plannercore.PreparedPlanCacheCapacity = 100
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists prepare_test")
tk.MustExec("create table prepare_test (id int PRIMARY KEY, c1 int)")
tk.MustExec(`insert into prepare_test values (1, 1)`)
tk.MustExec(`insert into prepare_test values (2, 2)`)
tk.MustExec(`insert into prepare_test values (3, 3)`)

tk.MustExec(`prepare stmt_delete from 'delete from prepare_test where id = ?'`)
tk.MustExec(`set @a=1; execute stmt_delete using @a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(0))
}
tk.MustExec(`set @a=2; execute stmt_delete using @a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(1))
}
tk.MustExec(`set @a=3; execute stmt_delete using @a;`)
if flag {
counter.Write(pb)
hit := pb.GetCounter().GetValue()
c.Check(hit, Equals, float64(2))
}

result := tk.MustQuery("select id, c1 from prepare_test where id = ?", 1)
result.Check(nil)
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 2)
result.Check(nil)
result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 3)
result.Check(nil)
}
}
2 changes: 1 addition & 1 deletion expression/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *Constant) Eval(_ chunk.Row) (types.Datum, error) {
}
val, err := dt.ConvertTo(sf.GetCtx().GetSessionVars().StmtCtx, retType)
if err != nil {
return c.Value, err
return dt, err
}
c.Value.SetValue(val.GetValue())
}
Expand Down
12 changes: 9 additions & 3 deletions expression/simple_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ func (sr *simpleRewriter) Leave(originInNode ast.Node) (retNode ast.Node, ok boo
sr.inToExpression(len(v.List), v.Not, &v.Type)
}
case *driver.ParamMarkerExpr:
tp := types.NewFieldType(mysql.TypeUnspecified)
types.DefaultParamTypeForValue(v.GetValue(), tp)
value := &Constant{Value: v.ValueExpr.Datum, RetType: tp}
var value Expression
value, sr.err = GetParamExpression(sr.ctx, v, sr.useCache())
if sr.err != nil {
return retNode, false
}
sr.push(value)
case *ast.RowExpr:
sr.rowToScalarFunc(v)
Expand All @@ -168,6 +170,10 @@ func (sr *simpleRewriter) Leave(originInNode ast.Node) (retNode ast.Node, ok boo
return originInNode, true
}

func (sr *simpleRewriter) useCache() bool {
return sr.ctx.GetSessionVars().StmtCtx.UseCache
}

func (sr *simpleRewriter) binaryOpToExpression(v *ast.BinaryOperationExpr) {
right := sr.pop()
left := sr.pop()
Expand Down
23 changes: 23 additions & 0 deletions expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
)
Expand Down Expand Up @@ -503,3 +504,25 @@ func ColumnSliceIsIntersect(s1, s2 []*Column) bool {
}
return false
}

// DatumToConstant generates a Constant expression from a Datum.
func DatumToConstant(d types.Datum, tp byte) *Constant {
return &Constant{Value: d, RetType: types.NewFieldType(tp)}
}

// GetParamExpression generate a getparam function expression.
func GetParamExpression(ctx sessionctx.Context, v *driver.ParamMarkerExpr, useCache bool) (Expression, error) {
tp := types.NewFieldType(mysql.TypeUnspecified)
types.DefaultParamTypeForValue(v.GetValue(), tp)
value := &Constant{Value: v.Datum, RetType: tp}
if useCache {
f, err := NewFunctionBase(ctx, ast.GetParam, &v.Type,
DatumToConstant(types.NewIntDatum(int64(v.Order)), mysql.TypeLonglong))
if err != nil {
return nil, errors.Trace(err)
}
f.GetType().Tp = v.Type.Tp
value.DeferredExpr = f
}
return value, nil
}
6 changes: 5 additions & 1 deletion planner/core/cacheable_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (

// Cacheable checks whether the input ast is cacheable.
func Cacheable(node ast.Node) bool {
if _, isSelect := node.(*ast.SelectStmt); !isSelect {
_, isSelect := node.(*ast.SelectStmt)
_, isUpdate := node.(*ast.UpdateStmt)
_, isInsert := node.(*ast.InsertStmt)
_, isDelete := node.(*ast.DeleteStmt)
if !(isSelect || isUpdate || isInsert || isDelete) {
return false
}
checker := cacheableChecker{
Expand Down
Loading

0 comments on commit 34c3d4c

Please sign in to comment.