Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support capture evolve plan tasks #13199

Merged
merged 4 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions bindinfo/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ type hintProcessor struct {
*HintsSet
// bindHint2Ast indicates the behavior of the processor, `true` for bind hint to ast, `false` for extract hint from ast.
bindHint2Ast bool
tableCounter int64
indexCounter int64
tableCounter int
indexCounter int
}

func (hp *hintProcessor) Enter(in ast.Node) (ast.Node, bool) {
switch v := in.(type) {
case *ast.SelectStmt:
if hp.bindHint2Ast {
v.TableHints = hp.tableHints[hp.tableCounter]
if hp.tableCounter < len(hp.tableHints) {
v.TableHints = hp.tableHints[hp.tableCounter]
} else {
v.TableHints = nil
}
hp.tableCounter++
} else {
hp.tableHints = append(hp.tableHints, v.TableHints)
}
case *ast.TableName:
if hp.bindHint2Ast {
v.IndexHints = hp.indexHints[hp.indexCounter]
if hp.indexCounter < len(hp.indexHints) {
v.IndexHints = hp.indexHints[hp.indexCounter]
} else {
v.IndexHints = nil
}
hp.indexCounter++
} else {
hp.indexHints = append(hp.indexHints, v.IndexHints)
Expand Down
22 changes: 21 additions & 1 deletion bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,29 @@ func (s *testSuite) TestUseMultiplyBindings(c *C) {
tk.MustExec("analyze table t")
tk.MustExec("create binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustExec("create binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_b) where a >= 1 and b >= 1 and c = 0")
// It cannot choose `idx_c` although it has lowest cost.
// It cannot choose table path although it has lowest cost.
tk.MustQuery("select * from t where a >= 4 and b >= 1 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_a")
tk.MustQuery("select * from t where a >= 1 and b >= 4 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_b")
}

func (s *testSuite) TestAddEvolveTasks(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b), index idx_c(c))")
tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
tk.MustExec("analyze table t")
tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustExec("set @@tidb_evolve_plan_baselines=1")
// It cannot choose table path although it has lowest cost.
tk.MustQuery("select * from t where a >= 4 and b >= 1 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_a")
domain.GetDomain(tk.Se).BindHandle().SaveEvolveTasksToStore()
rows := tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0")
c.Assert(rows[1][3], Equals, "pending verify")
}
9 changes: 6 additions & 3 deletions bindinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
deleted = "deleted"
// Invalid is the bind info's invalid status.
Invalid = "invalid"
// PendingVerify means the bind info need to verified.
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
PendingVerify = "pending verify"
)

// Binding stores the basic bind hint info.
Expand All @@ -47,6 +49,7 @@ type Binding struct {
// Hint is the parsed hints, it is used to bind hints to stmt node.
Hint *HintsSet
// id is the string form of all hints. It is used to uniquely identify different hints.
// It would be non-empty only when the status is `Using` or `PendingVerify`.
id string
}

Expand All @@ -71,10 +74,10 @@ func (br *BindRecord) HasUsingBinding() bool {
return false
}

// FindUsingBinding find bindings with status `Using` in BindRecord.
func (br *BindRecord) FindUsingBinding(hint string) *Binding {
// FindBinding find bindings in BindRecord.
func (br *BindRecord) FindBinding(hint string) *Binding {
for _, binding := range br.Bindings {
if binding.Status == Using && binding.id == hint {
if binding.id == hint {
return &binding
}
}
Expand Down
142 changes: 99 additions & 43 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ type BindHandle struct {

// invalidBindRecordMap indicates the invalid bind records found during querying.
// A record will be deleted from this map, after 2 bind-lease, after it is dropped from the kv.
invalidBindRecordMap struct {
sync.Mutex
atomic.Value
}
invalidBindRecordMap tmpBindRecordMap

// pendingVerifyBindRecordMap indicates the pending verify bind records that found during query.
pendingVerifyBindRecordMap tmpBindRecordMap

lastUpdateTime types.Time

Expand All @@ -83,9 +83,9 @@ type BindHandle struct {
// Lease influences the duration of loading bind info and handling invalid bind.
var Lease = 3 * time.Second

type invalidBindRecordMap struct {
bindRecord *BindRecord
droppedTime time.Time
type bindRecordUpdate struct {
bindRecord *BindRecord
updateTime time.Time
}

// NewBindHandle creates a new BindHandle.
Expand All @@ -95,7 +95,14 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
handle.bindInfo.Value.Store(make(cache, 32))
handle.bindInfo.parser = parser.New()
handle.parser4Baseline = parser.New()
handle.invalidBindRecordMap.Value.Store(make(map[string]*invalidBindRecordMap))
handle.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.invalidBindRecordMap.flushFunc = handle.DropBindRecord
handle.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
// We do not need the first two parameters because they are only use to generate hint,
// and we already have the hint.
return handle.AddBindRecord(nil, nil, record)
}
return handle
}

Expand Down Expand Up @@ -150,6 +157,21 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
if err != nil {
return err
}

br := h.GetBindRecord(parser.DigestHash(record.OriginalSQL), record.OriginalSQL, record.Db)
var duplicateBinding string
if br != nil {
binding := br.FindBinding(record.Bindings[0].id)
if binding != nil {
// There is already a binding with status `Using` or `PendingVerify`, we could directly cancel the job.
if record.Bindings[0].Status == PendingVerify {
return nil
}
// Otherwise, we need to remove it before insert.
duplicateBinding = binding.BindSQL
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
}
}

exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.sctx.Lock()
_, err = exec.Execute(context.TODO(), "BEGIN")
Expand Down Expand Up @@ -179,10 +201,11 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
h.bindInfo.Unlock()
}()

// remove all the unused sql binds.
_, err = exec.Execute(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db))
if err != nil {
return err
if duplicateBinding != "" {
_, err = exec.Execute(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding))
if err != nil {
return err
}
}

txn, err1 := h.sctx.Context.Txn(true)
Expand All @@ -196,7 +219,6 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
Fsp: 3,
}
record.Bindings[i].UpdateTime = record.Bindings[0].CreateTime
record.Bindings[i].Status = Using

// insert the BindRecord to the storage.
_, err = exec.Execute(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
Expand Down Expand Up @@ -254,44 +276,60 @@ func (h *BindHandle) DropBindRecord(record *BindRecord) (err error) {
return err
}

// DropInvalidBindRecord execute the drop bindRecord task.
func (h *BindHandle) DropInvalidBindRecord() {
invalidBindRecordMap := copyInvalidBindRecordMap(h.invalidBindRecordMap.Load().(map[string]*invalidBindRecordMap))
for key, invalidBindRecord := range invalidBindRecordMap {
if invalidBindRecord.droppedTime.IsZero() {
err := h.DropBindRecord(invalidBindRecord.bindRecord)
// tmpBindRecordMap is used to temporarily save bind record changes.
// Those changes will be flushed into store periodically.
type tmpBindRecordMap struct {
sync.Mutex
atomic.Value
flushFunc func(record *BindRecord) error
}

func (tmpMap *tmpBindRecordMap) flushToStore() {
newMap := copyBindRecordUpdateMap(tmpMap.Load().(map[string]*bindRecordUpdate))
for key, bindRecord := range newMap {
if bindRecord.updateTime.IsZero() {
err := tmpMap.flushFunc(bindRecord.bindRecord)
if err != nil {
logutil.BgLogger().Error("DropInvalidBindRecord failed", zap.Error(err))
logutil.BgLogger().Error("flush bind record failed", zap.Error(err))
}
invalidBindRecord.droppedTime = time.Now()
bindRecord.updateTime = time.Now()
continue
}

if time.Since(invalidBindRecord.droppedTime) > 6*time.Second {
delete(invalidBindRecordMap, key)
updateMetrics(metrics.ScopeGlobal, invalidBindRecord.bindRecord, nil, false)
if time.Since(bindRecord.updateTime) > 6*time.Second {
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
delete(newMap, key)
updateMetrics(metrics.ScopeGlobal, bindRecord.bindRecord, nil, false)
}
}
h.invalidBindRecordMap.Store(invalidBindRecordMap)
tmpMap.Store(newMap)
}

// AddDropInvalidBindTask add bindRecord to invalidBindRecordMap when the bindRecord need to be deleted.
func (h *BindHandle) AddDropInvalidBindTask(invalidBindRecord *BindRecord) {
key := invalidBindRecord.OriginalSQL + ":" + invalidBindRecord.Db
if _, ok := h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap)[key]; ok {
func (tmpMap *tmpBindRecordMap) saveToCache(bindRecord *BindRecord) {
key := bindRecord.OriginalSQL + ":" + bindRecord.Db + ":" + bindRecord.Bindings[0].id
if _, ok := tmpMap.Load().(map[string]*bindRecordUpdate)[key]; ok {
return
}
h.invalidBindRecordMap.Lock()
defer h.invalidBindRecordMap.Unlock()
if _, ok := h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap)[key]; ok {
tmpMap.Lock()
defer tmpMap.Unlock()
if _, ok := tmpMap.Load().(map[string]*bindRecordUpdate)[key]; ok {
return
}
newMap := copyInvalidBindRecordMap(h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap))
newMap[key] = &invalidBindRecordMap{
bindRecord: invalidBindRecord,
newMap := copyBindRecordUpdateMap(tmpMap.Load().(map[string]*bindRecordUpdate))
newMap[key] = &bindRecordUpdate{
bindRecord: bindRecord,
}
h.invalidBindRecordMap.Store(newMap)
updateMetrics(metrics.ScopeGlobal, nil, invalidBindRecord, false)
tmpMap.Store(newMap)
updateMetrics(metrics.ScopeGlobal, nil, bindRecord, false)
}

// DropInvalidBindRecord execute the drop bindRecord task.
func (h *BindHandle) DropInvalidBindRecord() {
h.invalidBindRecordMap.flushToStore()
}

// AddDropInvalidBindTask add bindRecord to invalidBindRecordMap when the bindRecord need to be deleted.
func (h *BindHandle) AddDropInvalidBindTask(invalidBindRecord *BindRecord) {
h.invalidBindRecordMap.saveToCache(invalidBindRecord)
}

// Size return the size of bind info cache.
Expand Down Expand Up @@ -338,6 +376,7 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
if err != nil {
return "", nil, err
}
h.sctx.GetSessionVars().StmtCtx.TimeZone = h.sctx.GetSessionVars().TimeZone
h.sctx.GetSessionVars().CurrentDB = bindRecord.Db
err = bindRecord.prepareHintsForUsing(h.sctx.Context, h.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema))
return hash, bindRecord, err
Expand All @@ -349,7 +388,7 @@ func (h *BindHandle) appendBindRecord(hash string, meta *BindRecord) {
newCache := h.bindInfo.Value.Load().(cache).copy()
oldRecord := newCache.getBindRecord(hash, meta.OriginalSQL, meta.Db)
newRecord := merge(oldRecord, meta)
newCache.setBindRecord(hash, meta)
newCache.setBindRecord(hash, newRecord)
h.bindInfo.Value.Store(newCache)
updateMetrics(metrics.ScopeGlobal, oldRecord, newRecord, false)
}
Expand Down Expand Up @@ -409,8 +448,8 @@ func (c cache) copy() cache {
return newCache
}

func copyInvalidBindRecordMap(oldMap map[string]*invalidBindRecordMap) map[string]*invalidBindRecordMap {
newMap := make(map[string]*invalidBindRecordMap, len(oldMap))
func copyBindRecordUpdateMap(oldMap map[string]*bindRecordUpdate) map[string]*bindRecordUpdate {
newMap := make(map[string]*bindRecordUpdate, len(oldMap))
for k, v := range oldMap {
newMap[k] = v
}
Expand All @@ -429,11 +468,12 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return nil
}

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db string) string {
func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
return fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s`,
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql=%s`,
expression.Quote(normdOrigSQL),
expression.Quote(db),
expression.Quote(bindSQL),
)
}

Expand Down Expand Up @@ -514,9 +554,25 @@ func (h *BindHandle) CaptureBaselines() {
}
}

// AddEvolvePlanTask adds the evolve plan task into memory cache. It would be flushed to store periodically.
func (h *BindHandle) AddEvolvePlanTask(originalSQL, DB string, binding Binding, planHint string) {
binding.id = planHint
br := &BindRecord{
OriginalSQL: originalSQL,
Db: DB,
Bindings: []Binding{binding},
}
h.pendingVerifyBindRecordMap.saveToCache(br)
}

// SaveEvolveTasksToStore saves the evolve task into store.
func (h *BindHandle) SaveEvolveTasksToStore() {
h.pendingVerifyBindRecordMap.flushToStore()
}

// Clear resets the bind handle. It is used for test.
func (h *BindHandle) Clear() {
h.bindInfo.Store(make(cache))
h.invalidBindRecordMap.Store(make(map[string]*invalidBindRecordMap))
h.invalidBindRecordMap.Store(make(map[string]*bindRecordUpdate))
h.lastUpdateTime = types.ZeroTimestamp
}
1 change: 1 addition & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ func (do *Domain) globalBindHandleWorkerLoop() {
continue
}
do.bindHandle.CaptureBaselines()
do.bindHandle.SaveEvolveTasksToStore()
}
}
}()
Expand Down
Loading