planner: prepared plan cache support cached plan with placeholder in limit clause (#40196) #41644

Closed
26 changes: 26 additions & 0 deletions planner/core/logical_plan_builder.go
@@ -1989,6 +1989,14 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node) (uVal uint64, isNull bo
if !v.InExecute {
return 0, false, true
}
<<<<<<< HEAD
=======
if mustInt64orUint64 {
if expected, _ := CheckParamTypeInt64orUint64(v); !expected {
return 0, false, false
}
}
>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
param, err := expression.ParamMarkerExpression(ctx, v, false)
if err != nil {
return 0, false, false
@@ -2022,6 +2030,24 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node) (uVal uint64, isNull bo
return 0, false, false
}

<<<<<<< HEAD
=======
// CheckParamTypeInt64orUint64 checks the parameter type for a plan-cache LIMIT clause; only int64 and uint64 are allowed for now,
// e.g. set @a = 1;
func CheckParamTypeInt64orUint64(param *driver.ParamMarkerExpr) (bool, uint64) {
val := param.GetValue()
switch v := val.(type) {
case int64:
if v >= 0 {
return true, uint64(v)
}
case uint64:
return true, v
}
return false, 0
}

>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
func extractLimitCountOffset(ctx sessionctx.Context, limit *ast.Limit) (count uint64,
offset uint64, err error) {
var isExpectedType bool
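For context, here is a minimal standalone Go sketch of the value check that CheckParamTypeInt64orUint64 performs on a LIMIT placeholder — checkLimitParam and the sample values are illustrative only, not TiDB code. Only a non-negative int64 or a uint64 passes; any other value makes getUintFromNode report it as unexpected (third return value false) so the caller falls back.

package main

import "fmt"

// checkLimitParam mirrors the intent of CheckParamTypeInt64orUint64: a LIMIT
// placeholder is only usable by the plan cache when its value is a
// non-negative int64 or a uint64.
func checkLimitParam(val interface{}) (ok bool, n uint64) {
	switch v := val.(type) {
	case int64:
		if v >= 0 {
			return true, uint64(v)
		}
	case uint64:
		return true, v
	}
	return false, 0
}

func main() {
	for _, val := range []interface{}{int64(10), uint64(3), int64(-1), "10", 2.5} {
		ok, n := checkLimitParam(val)
		fmt.Printf("%T(%v): ok=%v value=%d\n", val, val, ok, n)
	}
}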
34 changes: 32 additions & 2 deletions planner/core/plan_cache.go
@@ -158,15 +158,27 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
return plan, names, err
}
}

limitCountAndOffset, paramErr := ExtractLimitFromAst(stmt.PreparedAst.Stmt, sctx)
if paramErr != nil {
return nil, nil, paramErr
}
if stmtCtx.UseCache { // for non-point plans
<<<<<<< HEAD
if plan, names, ok, err := getGeneralPlan(sctx, isGeneralPlanCache, cacheKey, bindSQL, is, stmt,
paramTypes); err != nil || ok {
=======
if plan, names, ok, err := getCachedPlan(sctx, isNonPrepared, cacheKey, bindSQL, is, stmt,
paramTypes, limitCountAndOffset); err != nil || ok {
>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
return plan, names, err
}
}

<<<<<<< HEAD
return generateNewPlan(ctx, sctx, isGeneralPlanCache, is, stmt, cacheKey, latestSchemaVersion, paramTypes, bindSQL)
=======
return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramNum, paramTypes, bindSQL, limitCountAndOffset)
>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
}

// parseParamTypes get parameters' types in PREPARE statement
@@ -212,13 +224,22 @@ func getPointQueryPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmtC
return plan, names, true, nil
}

<<<<<<< HEAD
func getGeneralPlan(sctx sessionctx.Context, isGeneralPlanCache bool, cacheKey kvcache.Key, bindSQL string,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType) (Plan,
=======
func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache.Key, bindSQL string,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType, limitParams []uint64) (Plan,
>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
[]*types.FieldName, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx

<<<<<<< HEAD
candidate, exist := sctx.GetPlanCache(isGeneralPlanCache).Get(cacheKey, paramTypes)
=======
candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes, limitParams)
>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
if !exist {
return nil, nil, false, nil
}
@@ -256,9 +277,14 @@ func getGeneralPlan(sctx sessionctx.Context, isGeneralPlanCache bool, cacheKey k

// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
<<<<<<< HEAD
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isGeneralPlanCache bool, is infoschema.InfoSchema,
stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramTypes []*types.FieldType,
bindSQL string) (Plan, []*types.FieldName, error) {
=======
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramNum int,
paramTypes []*types.FieldType, bindSQL string, limitParams []uint64) (Plan, []*types.FieldName, error) {
>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
stmtAst := stmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
@@ -289,11 +315,15 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isGeneralPlan
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes)
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes, limitParams)
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
<<<<<<< HEAD
sctx.GetPlanCache(isGeneralPlanCache).Put(cacheKey, cached, paramTypes)
=======
sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes, limitParams)
>>>>>>> 17df596863 (planner: prepared plan cache support cached plan with placeholder in limit clause (#40196))
}
sessVars.FoundInPlanCache = false
return p, names, err
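The plan_cache.go changes above thread the LIMIT values extracted by ExtractLimitFromAst through every cache lookup and store, so two executions of the same prepared statement with different LIMIT values no longer share a cache entry. A rough standalone sketch of that matching behaviour, using toy types rather than TiDB's actual cache key or API:

package main

import "fmt"

// entry stands in for a cached plan plus the LIMIT values it was built with.
type entry struct {
	limitParams []uint64
	plan        string
}

// toyCache matches an entry only when the stored LIMIT values equal the ones
// supplied at lookup time, mimicking how getCachedPlan/generateNewPlan now
// thread limitCountAndOffset into the plan cache's Get and Put.
type toyCache struct {
	buckets map[string][]entry
}

func equalUint64(a, b []uint64) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func (c *toyCache) get(key string, limitParams []uint64) (string, bool) {
	for _, e := range c.buckets[key] {
		if equalUint64(e.limitParams, limitParams) {
			return e.plan, true
		}
	}
	return "", false
}

func (c *toyCache) put(key string, limitParams []uint64, plan string) {
	c.buckets[key] = append(c.buckets[key], entry{limitParams, plan})
}

func main() {
	c := &toyCache{buckets: map[string][]entry{}}
	c.put("select * from t limit ?", []uint64{10}, "plan built for limit 10")

	if _, ok := c.get("select * from t limit ?", []uint64{20}); !ok {
		fmt.Println("different LIMIT value: miss, the optimizer builds and caches a new plan")
	}
	if plan, ok := c.get("select * from t limit ?", []uint64{10}); ok {
		fmt.Println("same LIMIT value: hit ->", plan)
	}
}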
36 changes: 28 additions & 8 deletions planner/core/plan_cache_lru.go
@@ -53,7 +53,7 @@ type LRUPlanCache struct {
lock sync.Mutex

// pickFromBucket get one element from bucket. The LRUPlanCache can not work if it is nil
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool)
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool)
// onEvict will be called if any eviction happened, only for test use now
onEvict func(kvcache.Key, kvcache.Value)

@@ -68,7 +68,7 @@ type LRUPlanCache struct {
// NewLRUPlanCache creates a PCLRUCache object, whose capacity is "capacity".
// NOTE: "capacity" should be a positive value.
func NewLRUPlanCache(capacity uint, guard float64, quota uint64,
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
if capacity < 1 {
capacity = 100
logutil.BgLogger().Info("capacity of LRU cache is less than 1, will use default value(100) init cache")
@@ -94,13 +94,13 @@ func strHashKey(key kvcache.Key, deepCopy bool) string {
}

// Get tries to find the corresponding value according to the given key.
func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (value kvcache.Value, ok bool) {
func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType, limitParams []uint64) (value kvcache.Value, ok bool) {
l.lock.Lock()
defer l.lock.Unlock()

bucket, bucketExist := l.buckets[strHashKey(key, false)]
if bucketExist {
if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.lruList.MoveToFront(element)
return element.Value.(*planCacheEntry).PlanValue, true
}
@@ -109,14 +109,14 @@ func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (valu
}

// Put puts the (key, value) pair into the LRU Cache.
func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType) {
func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType, limitParams []uint64) {
l.lock.Lock()
defer l.lock.Unlock()

hash := strHashKey(key, true)
bucket, bucketExist := l.buckets[hash]
if bucketExist {
if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.updateInstanceMetric(&planCacheEntry{PlanKey: key, PlanValue: value}, element.Value.(*planCacheEntry))
element.Value.(*planCacheEntry).PlanValue = value
l.lruList.MoveToFront(element)
@@ -252,16 +252,36 @@ func (l *LRUPlanCache) memoryControl() {
}

// PickPlanFromBucket pick one plan from bucket
func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType) (*list.Element, bool) {
func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType, limitParams []uint64) (*list.Element, bool) {
for k := range bucket {
plan := k.Value.(*planCacheEntry).PlanValue.(*PlanCacheValue)
if plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes) {
ok1 := plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes)
if !ok1 {
continue
}
ok2 := checkUint64SliceIfEqual(plan.limitOffsetAndCount, limitParams)
if ok2 {
return k, true
}
}
return nil, false
}

// checkUint64SliceIfEqual reports whether a and b hold the same elements in the same order; a nil slice and a non-nil slice are never considered equal.
func checkUint64SliceIfEqual(a, b []uint64) bool {
if (a == nil && b != nil) || (a != nil && b == nil) {
return false
}
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

// updateInstanceMetric update the memory usage and plan num for show in grafana
func (l *LRUPlanCache) updateInstanceMetric(in, out *planCacheEntry) {
updateInstancePlanNum(in, out)
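One behavioural detail of checkUint64SliceIfEqual above: a nil slice and a non-nil empty slice never compare equal, because the explicit nil checks run before the length comparison (len(nil) == 0 would otherwise let them match). A standalone reproduction of the same logic, for verifying it in isolation:

package main

import "fmt"

// sliceEqual copies the logic of checkUint64SliceIfEqual so it can run standalone.
func sliceEqual(a, b []uint64) bool {
	if (a == nil && b != nil) || (a != nil && b == nil) {
		return false
	}
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(sliceEqual(nil, nil))                       // true
	fmt.Println(sliceEqual(nil, []uint64{}))                // false: nil vs non-nil empty
	fmt.Println(sliceEqual([]uint64{1, 2}, []uint64{1, 2})) // true
	fmt.Println(sliceEqual([]uint64{1}, []uint64{2}))       // false
}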
51 changes: 32 additions & 19 deletions planner/core/plan_cache_lru_test.go
@@ -65,14 +65,18 @@ func TestLRUPCPut(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
}
limitParams := [][]uint64{
{1}, {2}, {3}, {4}, {5},
}

// one key corresponding to multi values
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
vals[i] = &PlanCacheValue{
ParamTypes: pTypes[i],
ParamTypes: pTypes[i],
limitOffsetAndCount: limitParams[i],
}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(3), lru.size)
@@ -103,7 +107,7 @@ func TestLRUPCPut(t *testing.T) {

bucket, exist := lru.buckets[string(hack.String(keys[i].Hash()))]
require.True(t, exist)
element, exist := lru.pickFromBucket(bucket, pTypes[i])
element, exist := lru.pickFromBucket(bucket, pTypes[i], limitParams[i])
require.NotNil(t, element)
require.True(t, exist)
require.Equal(t, root, element)
@@ -131,22 +135,25 @@ func TestLRUPCGet(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
}
limitParams := [][]uint64{
{1}, {2}, {3}, {4}, {5},
}
// 5 bucket
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i%4), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i], limitOffsetAndCount: limitParams[i]}
lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
}

// test for non-existent elements
for i := 0; i < 2; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
require.False(t, exists)
require.Nil(t, value)
}

for i := 2; i < 5; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
require.True(t, exists)
require.NotNil(t, value)
require.Equal(t, vals[i], value)
@@ -175,23 +182,29 @@ func TestLRUPCDelete(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeEnum)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeDate)},
}
limitParams := [][]uint64{
{1}, {2}, {3},
}
for i := 0; i < 3; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
vals[i] = &PlanCacheValue{
ParamTypes: pTypes[i],
limitOffsetAndCount: limitParams[i],
}
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, 3, int(lru.size))

lru.Delete(keys[1])
value, exists := lru.Get(keys[1], pTypes[1])
value, exists := lru.Get(keys[1], pTypes[1], limitParams[1])
require.False(t, exists)
require.Nil(t, value)
require.Equal(t, 2, int(lru.size))

_, exists = lru.Get(keys[0], pTypes[0])
_, exists = lru.Get(keys[0], pTypes[0], limitParams[0])
require.True(t, exists)

_, exists = lru.Get(keys[2], pTypes[2])
_, exists = lru.Get(keys[2], pTypes[2], limitParams[2])
require.True(t, exists)
}

@@ -207,14 +220,14 @@ func TestLRUPCDeleteAll(t *testing.T) {
for i := 0; i < 3; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, 3, int(lru.size))

lru.DeleteAll()

for i := 0; i < 3; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], []uint64{})
require.False(t, exists)
require.Nil(t, value)
require.Equal(t, 0, int(lru.size))
@@ -242,7 +255,7 @@ func TestLRUPCSetCapacity(t *testing.T) {
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(5), lru.size)
@@ -292,7 +305,7 @@ func TestIssue37914(t *testing.T) {
val := &PlanCacheValue{ParamTypes: pTypes}

require.NotPanics(t, func() {
lru.Put(key, val, pTypes)
lru.Put(key, val, pTypes, []uint64{})
})
}

@@ -313,7 +326,7 @@ func TestIssue38244(t *testing.T) {
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(3), lru.size)
@@ -334,15 +347,15 @@ func TestLRUPlanCacheMemoryUsage(t *testing.T) {
for i := 0; i < 3; i++ {
k := randomPlanCacheKey()
v := randomPlanCacheValue(pTypes)
lru.Put(k, v, pTypes)
lru.Put(k, v, pTypes, []uint64{})
res += k.MemoryUsage() + v.MemoryUsage()
require.Equal(t, lru.MemoryUsage(), res)
}
// evict
p := &PhysicalTableScan{}
k := &planCacheKey{database: "3"}
v := &PlanCacheValue{Plan: p}
lru.Put(k, v, pTypes)
lru.Put(k, v, pTypes, []uint64{})
res += k.MemoryUsage() + v.MemoryUsage()
for kk, vv := range evict {
res -= kk.(*planCacheKey).MemoryUsage() + vv.(*PlanCacheValue).MemoryUsage()