Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: update the underlying binding cache structure #58481

Merged
merged 21 commits into from
Dec 24, 2024
4 changes: 1 addition & 3 deletions pkg/bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ go_library(
"//pkg/util/hack",
"//pkg/util/hint",
"//pkg/util/intest",
"//pkg/util/kvcache",
"//pkg/util/mathutil",
"//pkg/util/memory",
"//pkg/util/parser",
"//pkg/util/sqlexec",
"//pkg/util/stringutil",
"@com_github_dgraph_io_ristretto//:ristretto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pkg_errors//:errors",
Expand Down
199 changes: 59 additions & 140 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ import (
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto"
"github.com/pingcap/tidb/pkg/bindinfo/internal/logutil"
"github.com/pingcap/tidb/pkg/bindinfo/norm"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/kvcache"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/stringutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -59,14 +56,14 @@ type digestBiMap interface {
// Del deletes the pair of noDBDigest and sqlDigest.
Del(sqlDigest string)

// All returns all the sqlDigests.
All() (sqlDigests []string)

// NoDBDigest2SQLDigest converts noDBDigest to sqlDigest.
NoDBDigest2SQLDigest(noDBDigest string) []string

// SQLDigest2NoDBDigest converts sqlDigest to noDBDigest.
SQLDigest2NoDBDigest(sqlDigest string) string

// Copy copies this digestBiMap.
Copy() digestBiMap
}

type digestBiMapImpl struct {
Expand Down Expand Up @@ -116,6 +113,17 @@ func (b *digestBiMapImpl) Del(sqlDigest string) {
delete(b.sqlDigest2noDBDigest, sqlDigest)
}

// All returns all the sqlDigests.
func (b *digestBiMapImpl) All() []string {
b.mu.RLock()
defer b.mu.RUnlock()
sqlDigests := make([]string, 0, len(b.sqlDigest2noDBDigest))
for sqlDigest := range b.sqlDigest2noDBDigest {
sqlDigests = append(sqlDigests, sqlDigest)
}
return sqlDigests
}

// NoDBDigest2SQLDigest converts noDBDigest to sqlDigest.
func (b *digestBiMapImpl) NoDBDigest2SQLDigest(noDBDigest string) []string {
b.mu.RLock()
Expand All @@ -130,26 +138,6 @@ func (b *digestBiMapImpl) SQLDigest2NoDBDigest(sqlDigest string) string {
return b.sqlDigest2noDBDigest[sqlDigest]
}

// Copy copies this digestBiMap.
func (b *digestBiMapImpl) Copy() digestBiMap {
b.mu.RLock()
defer b.mu.RUnlock()
noDBDigest2SQLDigest := make(map[string][]string, len(b.noDBDigest2SQLDigest))
for k, list := range b.noDBDigest2SQLDigest {
newList := make([]string, len(list))
copy(newList, list)
noDBDigest2SQLDigest[k] = newList
}
sqlDigest2noDBDigest := make(map[string]string, len(b.sqlDigest2noDBDigest))
for k, v := range b.sqlDigest2noDBDigest {
sqlDigest2noDBDigest[k] = v
}
return &digestBiMapImpl{
noDBDigest2SQLDigest: noDBDigest2SQLDigest,
sqlDigest2noDBDigest: sqlDigest2noDBDigest,
}
}

// BindingCache is the interface for the cache of the SQL plan bindings.
type BindingCache interface {
// MatchingBinding supports cross-db matching on bindings.
Expand All @@ -170,104 +158,40 @@ type BindingCache interface {
GetMemCapacity() int64
// Size returns the number of items in the cache.
Size() int
// Close closes the cache.
Close()
}

// bindingCache uses the LRU cache to store the bindings.
// The key of the LRU cache is original sql, the value is a slice of Bindings.
// Note: The bindingCache should be accessed with lock.
type bindingCache struct {
lock sync.RWMutex
digestBiMap digestBiMap // mapping between noDBDigest and sqlDigest, used to support cross-db binding.
cache *kvcache.SimpleLRUCache // the underlying cache to store the bindings.
memCapacity int64
memTracker *memory.Tracker // track memory usage.
// TODO: use the underlying cache structure to track and control the memory usage, and remove
// memCapacity and memTracker to simplify the code.
digestBiMap digestBiMap // mapping between noDBDigest and sqlDigest, used to support cross-db binding.
cache *ristretto.Cache // the underlying cache to store the bindings.

// loadBindingFromStorageFunc is used to load binding from storage if cache miss.
loadBindingFromStorageFunc func(sctx sessionctx.Context, sqlDigest string) (Bindings, error)
}

type bindingCacheKey string

func (key bindingCacheKey) Hash() []byte {
return hack.Slice(string(key))
}

func calcBindCacheKVMem(key bindingCacheKey, value Bindings) int64 {
var valMem int64
valMem += int64(value.size())
return int64(len(key.Hash())) + valMem
}

func newBindCache(bindingLoad func(sctx sessionctx.Context, sqlDigest string) (Bindings, error)) BindingCache {
// since bindingCache controls the memory usage by itself, set the capacity of
// the underlying LRUCache to max to close its memory control
cache := kvcache.NewSimpleLRUCache(mathutil.MaxUint, 0, 0)
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e6,
MaxCost: variable.MemQuotaBindingCache.Load(),
BufferItems: 64,
Cost: func(value any) int64 {
return int64(value.(Bindings).size())
},
Metrics: true,
IgnoreInternalCost: true,
})
c := bindingCache{
cache: cache,
digestBiMap: newDigestBiMap(),
memCapacity: variable.MemQuotaBindingCache.Load(),
memTracker: memory.NewTracker(memory.LabelForBindCache, -1),
loadBindingFromStorageFunc: bindingLoad,
}
return &c
}

// get gets a cache item according to cache key. It's not thread-safe.
// Note: Only other functions of the bindingCache file can use this function.
// Don't use this function directly in other files in bindinfo package.
// The return value is not read-only, but it is only can be used in other functions which are also in the bind_cache.go.
func (c *bindingCache) get(key bindingCacheKey) Bindings {
value, hit := c.cache.Get(key)
if !hit {
return nil
}
typedValue := value.(Bindings)
return typedValue
}

// set inserts an item to the cache. It's not thread-safe.
// Only other functions of the bindingCache can use this function.
// The set operation will return error message when the memory usage of binding_cache exceeds its capacity.
func (c *bindingCache) set(key bindingCacheKey, value Bindings) (ok bool, err error) {
mem := calcBindCacheKVMem(key, value)
if mem > c.memCapacity { // ignore this kv pair if its size is too large
err = errors.New("The memory usage of all available bindings exceeds the cache's mem quota. As a result, all available bindings cannot be held on the cache. Please increase the value of the system variable 'tidb_mem_quota_binding_cache' and execute 'admin reload bindings' to ensure that all bindings exist in the cache and can be used normally")
return
}
bindings := c.get(key)
if bindings != nil {
// Remove the origin key-value pair.
mem -= calcBindCacheKVMem(key, bindings)
}
for mem+c.memTracker.BytesConsumed() > c.memCapacity {
err = errors.New("The memory usage of all available bindings exceeds the cache's mem quota. As a result, all available bindings cannot be held on the cache. Please increase the value of the system variable 'tidb_mem_quota_binding_cache' and execute 'admin reload bindings' to ensure that all bindings exist in the cache and can be used normally")
evictedKey, evictedValue, evicted := c.cache.RemoveOldest()
if !evicted {
return
}
c.memTracker.Consume(-calcBindCacheKVMem(evictedKey.(bindingCacheKey), evictedValue.(Bindings)))
}
c.memTracker.Consume(mem)
c.cache.Put(key, value)
ok = true
return
}

// delete remove an item from the cache. It's not thread-safe.
// Only other functions of the bindingCache can use this function.
func (c *bindingCache) delete(key bindingCacheKey) bool {
bindings := c.get(key)
if bindings != nil {
mem := calcBindCacheKVMem(key, bindings)
c.cache.Delete(key)
c.memTracker.Consume(-mem)
return true
}
return false
}

func (c *bindingCache) shouldMetric() bool {
return c.loadBindingFromStorageFunc != nil // only metric for GlobalBindingCache, whose loadBindingFromStorageFunc is not nil.
}
Expand Down Expand Up @@ -367,31 +291,28 @@ func (c *bindingCache) loadFromStore(sctx sessionctx.Context, missingSQLDigest [
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindingCache) GetBinding(sqlDigest string) Bindings {
c.lock.RLock()
defer c.lock.RUnlock()
return c.get(bindingCacheKey(sqlDigest))
v, ok := c.cache.Get(sqlDigest)
if !ok {
return nil
}
return v.(Bindings)
}

// GetAllBindings return all the bindings from the bindingCache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindingCache) GetAllBindings() Bindings {
c.lock.RLock()
defer c.lock.RUnlock()
values := c.cache.Values()
bindings := make(Bindings, 0, len(values))
for _, vals := range values {
bindings = append(bindings, vals.(Bindings)...)
sqlDigests := c.digestBiMap.All()
bindings := make(Bindings, 0, len(sqlDigests))
for _, sqlDigest := range sqlDigests {
bindings = append(bindings, c.GetBinding(sqlDigest)...)
}
return bindings
}

// SetBinding sets the Bindings to the cache.
// The function is thread-safe.
func (c *bindingCache) SetBinding(sqlDigest string, bindings Bindings) (err error) {
c.lock.Lock()
defer c.lock.Unlock()

// prepare noDBDigests for all bindings
noDBDigests := make([]string, 0, len(bindings))
p := parser.New()
Expand All @@ -404,53 +325,51 @@ func (c *bindingCache) SetBinding(sqlDigest string, bindings Bindings) (err erro
noDBDigests = append(noDBDigests, noDBDigest)
}

for i, binding := range bindings {
c.digestBiMap.Add(noDBDigests[i], binding.SQLDigest)
for i := range bindings {
c.digestBiMap.Add(noDBDigests[i], sqlDigest)
}

// NOTE: due to LRU eviction, the underlying BindingCache state might be inconsistent with noDBDigest2SQLDigest and
// sqlDigest2noDBDigest, but it's acceptable, just return cache-miss in that case.
cacheKey := bindingCacheKey(sqlDigest)
_, err = c.set(cacheKey, bindings)
// NOTE: due to LRU eviction, the underlying BindingCache state might be inconsistent with digestBiMap,
// but it's acceptable, the optimizer will load the binding when cache-miss.
// NOTE: the Set might fail if the operation is too frequent, but binding update is a low-frequently operation, so
// this risk seems acceptable.
// TODO: handle the Set failure more gracefully.
c.cache.Set(sqlDigest, bindings, 0)
c.cache.Wait()
return
}

// RemoveBinding removes the Bindings which has same originSQL with specified Bindings.
// The function is thread-safe.
func (c *bindingCache) RemoveBinding(sqlDigest string) {
c.lock.Lock()
defer c.lock.Unlock()
c.digestBiMap.Del(sqlDigest)
c.delete(bindingCacheKey(sqlDigest))
c.cache.Del(sqlDigest)
}

// SetMemCapacity sets the memory capacity for the cache.
// The function is thread-safe.
func (c *bindingCache) SetMemCapacity(capacity int64) {
c.lock.Lock()
defer c.lock.Unlock()
// Only change the capacity size without affecting the cached bindings
c.memCapacity = capacity
c.cache.UpdateMaxCost(capacity)
}

// GetMemUsage get the memory Usage for the cache.
// The function is thread-safe.
func (c *bindingCache) GetMemUsage() int64 {
c.lock.Lock()
defer c.lock.Unlock()
return c.memTracker.BytesConsumed()
return int64(c.cache.Metrics.CostAdded() - c.cache.Metrics.CostEvicted())
}

// GetMemCapacity get the memory capacity for the cache.
// The function is thread-safe.
func (c *bindingCache) GetMemCapacity() int64 {
c.lock.Lock()
defer c.lock.Unlock()
return c.memCapacity
return c.cache.MaxCost()
}

func (c *bindingCache) Size() int {
c.lock.Lock()
defer c.lock.Unlock()
return c.cache.Size()
return int(c.cache.Metrics.KeysAdded() - c.cache.Metrics.KeysEvicted())
}

// Close closes the cache.
func (c *bindingCache) Close() {
c.cache.Clear()
c.cache.Close()
c.cache.Wait()
}
22 changes: 17 additions & 5 deletions pkg/bindinfo/binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bindinfo

import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/bindinfo/norm"
"github.com/pingcap/tidb/pkg/parser"
Expand Down Expand Up @@ -70,9 +71,13 @@ func TestCrossDBBindingCache(t *testing.T) {

func TestBindCache(t *testing.T) {
bindings := Bindings{{BindSQL: "SELECT * FROM t1"}}
kvSize := len("digest1") + int(bindings.size())
kvSize := int(bindings.size())
defer func(v int64) {
variable.MemQuotaBindingCache.Store(v)
}(variable.MemQuotaBindingCache.Load())
variable.MemQuotaBindingCache.Store(int64(kvSize*3) - 1)
bindCache := newBindCache(nil)
defer bindCache.Close()

err := bindCache.SetBinding("digest1", bindings)
require.Nil(t, err)
Expand All @@ -83,9 +88,16 @@ func TestBindCache(t *testing.T) {
require.NotNil(t, bindCache.GetBinding("digest2"))

err = bindCache.SetBinding("digest3", bindings)
require.NotNil(t, err) // exceed the memory limit
require.NotNil(t, bindCache.GetBinding("digest2"))
require.Nil(t, err)
require.NotNil(t, bindCache.GetBinding("digest3"))

require.Nil(t, bindCache.GetBinding("digest1")) // digest1 is evicted
require.NotNil(t, bindCache.GetBinding("digest2")) // digest2 is still in the cache
require.Eventually(t, func() bool {
hit := 0
for _, digest := range []string{"digest1", "digest2", "digest3"} {
if bindCache.GetBinding(digest) != nil {
hit++
}
}
return hit == 2
}, time.Second*5, time.Millisecond*100)
}
8 changes: 8 additions & 0 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type GlobalBindingHandle interface {
// GetMemCapacity returns the memory capacity for the bind cache.
GetMemCapacity() (memCapacity int64)

// Close closes the binding cache.
CloseCache()

variable.Statistics
}

Expand Down Expand Up @@ -606,6 +609,11 @@ func (h *globalBindingHandle) Stats(_ *variable.SessionVars) (map[string]any, er
return m, nil
}

// Close closes the binding cache.
func (h *globalBindingHandle) CloseCache() {
h.bindingCache.Close()
}

// LoadBindingsFromStorageToCache loads global bindings from storage to the memory cache.
func (h *globalBindingHandle) LoadBindingsFromStorage(sctx sessionctx.Context, sqlDigest string) (Bindings, error) {
if sqlDigest == "" {
Expand Down
Loading