Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support capture evolve plan tasks #13199

Merged
merged 4 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions bindinfo/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ type hintProcessor struct {
*HintsSet
// bindHint2Ast indicates the behavior of the processor, `true` for bind hint to ast, `false` for extract hint from ast.
bindHint2Ast bool
tableCounter int64
indexCounter int64
tableCounter int
indexCounter int
}

func (hp *hintProcessor) Enter(in ast.Node) (ast.Node, bool) {
switch v := in.(type) {
case *ast.SelectStmt:
if hp.bindHint2Ast {
v.TableHints = hp.tableHints[hp.tableCounter]
if hp.tableCounter < len(hp.tableHints) {
v.TableHints = hp.tableHints[hp.tableCounter]
} else {
v.TableHints = nil
}
hp.tableCounter++
} else {
hp.tableHints = append(hp.tableHints, v.TableHints)
}
case *ast.TableName:
if hp.bindHint2Ast {
v.IndexHints = hp.indexHints[hp.indexCounter]
if hp.indexCounter < len(hp.indexHints) {
v.IndexHints = hp.indexHints[hp.indexCounter]
} else {
v.IndexHints = nil
}
hp.indexCounter++
} else {
hp.indexHints = append(hp.indexHints, v.IndexHints)
Expand Down
22 changes: 21 additions & 1 deletion bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s *testSuite) TestUseMultiplyBindings(c *C) {
tk.MustExec("analyze table t")
tk.MustExec("create binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustExec("create binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_b) where a >= 1 and b >= 1 and c = 0")
// It cannot choose `idx_c` although it has lowest cost.
// It cannot choose table path although it has lowest cost.
tk.MustQuery("select * from t where a >= 4 and b >= 1 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_a")
tk.MustQuery("select * from t where a >= 1 and b >= 4 and c = 0")
Expand Down Expand Up @@ -504,3 +504,23 @@ func (s *testSuite) TestDropSingleBindings(c *C) {
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 0)
}

func (s *testSuite) TestAddEvolveTasks(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b), index idx_c(c))")
tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
tk.MustExec("analyze table t")
tk.MustExec("create global binding for select * from t where a >= 1 and b >= 1 and c = 0 using select * from t use index(idx_a) where a >= 1 and b >= 1 and c = 0")
tk.MustExec("set @@tidb_evolve_plan_baselines=1")
// It cannot choose table path although it has lowest cost.
tk.MustQuery("select * from t where a >= 4 and b >= 1 and c = 0")
c.Assert(tk.Se.GetSessionVars().StmtCtx.IndexNames[0], Equals, "t:idx_a")
domain.GetDomain(tk.Se).BindHandle().SaveEvolveTasksToStore()
rows := tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(rows[1][1], Equals, "SELECT /*+ USE_INDEX(@`sel_1` `test`.`t` )*/ * FROM `test`.`t` WHERE `a`>=4 AND `b`>=1 AND `c`=0")
c.Assert(rows[1][3], Equals, "pending verify")
}
11 changes: 7 additions & 4 deletions bindinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
deleted = "deleted"
// Invalid is the bind info's invalid status.
Invalid = "invalid"
// PendingVerify means the bind info needs to be verified.
PendingVerify = "pending verify"
)

// Binding stores the basic bind hint info.
Expand All @@ -47,6 +49,7 @@ type Binding struct {
// Hint is the parsed hints, it is used to bind hints to stmt node.
Hint *HintsSet
// id is the string form of all hints. It is used to uniquely identify different hints.
// It would be non-empty only when the status is `Using` or `PendingVerify`.
id string
}

Expand All @@ -71,10 +74,10 @@ func (br *BindRecord) HasUsingBinding() bool {
return false
}

// FindUsingBinding find bindings with status `Using` in BindRecord.
func (br *BindRecord) FindUsingBinding(hint string) *Binding {
// FindBinding find bindings in BindRecord.
func (br *BindRecord) FindBinding(hint string) *Binding {
for _, binding := range br.Bindings {
if binding.Status == Using && binding.id == hint {
if binding.id == hint {
return &binding
}
}
Expand All @@ -84,7 +87,7 @@ func (br *BindRecord) FindUsingBinding(hint string) *Binding {
func (br *BindRecord) prepareHints(sctx sessionctx.Context, is infoschema.InfoSchema) error {
p := parser.New()
for i, bind := range br.Bindings {
if bind.Hint != nil {
if bind.Hint != nil || bind.id != "" {
continue
}
stmtNode, err := p.ParseOneStmt(bind.BindSQL, bind.Charset, bind.Collation)
Expand Down
150 changes: 100 additions & 50 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ type BindHandle struct {

// invalidBindRecordMap indicates the invalid bind records found during querying.
// A record will be deleted from this map, after 2 bind-lease, after it is dropped from the kv.
invalidBindRecordMap struct {
sync.Mutex
atomic.Value
}
invalidBindRecordMap tmpBindRecordMap

// pendingVerifyBindRecordMap indicates the pending verify bind records that found during query.
pendingVerifyBindRecordMap tmpBindRecordMap

lastUpdateTime types.Time

Expand All @@ -83,9 +83,9 @@ type BindHandle struct {
// Lease influences the duration of loading bind info and handling invalid bind.
var Lease = 3 * time.Second

type invalidBindRecordMap struct {
bindRecord *BindRecord
droppedTime time.Time
type bindRecordUpdate struct {
bindRecord *BindRecord
updateTime time.Time
}

// NewBindHandle creates a new BindHandle.
Expand All @@ -95,7 +95,18 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
handle.bindInfo.Value.Store(make(cache, 32))
handle.bindInfo.parser = parser.New()
handle.parser4Baseline = parser.New()
handle.invalidBindRecordMap.Value.Store(make(map[string]*invalidBindRecordMap))
handle.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
// We do not need the first two parameters because they are only use to generate hint,
// and we already have the hint.
return handle.DropBindRecord(nil, nil, record)
}
handle.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
handle.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
// We do not need the first two parameters because they are only use to generate hint,
// and we already have the hint.
return handle.AddBindRecord(nil, nil, record)
}
return handle
}

Expand Down Expand Up @@ -150,6 +161,21 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
if err != nil {
return err
}

br := h.GetBindRecord(parser.DigestHash(record.OriginalSQL), record.OriginalSQL, record.Db)
var duplicateBinding string
if br != nil {
binding := br.FindBinding(record.Bindings[0].id)
if binding != nil {
// There is already a binding with status `Using` or `PendingVerify`, we could directly cancel the job.
if record.Bindings[0].Status == PendingVerify {
return nil
}
// Otherwise, we need to remove it before insert.
duplicateBinding = binding.BindSQL
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
}
}

exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.sctx.Lock()
_, err = exec.Execute(context.TODO(), "BEGIN")
Expand Down Expand Up @@ -179,18 +205,10 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
h.bindInfo.Unlock()
}()

oldBindRecord := h.GetBindRecord(parser.DigestHash(record.OriginalSQL), record.OriginalSQL, record.Db)
if oldBindRecord != nil {
for _, newBinding := range record.Bindings {
binding := oldBindRecord.FindUsingBinding(newBinding.id)
if binding == nil {
continue
}
// Remove duplicates before insert.
_, err = exec.Execute(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, binding.BindSQL))
if err != nil {
return err
}
if duplicateBinding != "" {
_, err = exec.Execute(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding))
if err != nil {
return err
}
}

Expand All @@ -205,7 +223,6 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, is infoschema.InfoSc
Fsp: 3,
}
record.Bindings[i].UpdateTime = record.Bindings[0].CreateTime
record.Bindings[i].Status = Using

// insert the BindRecord to the storage.
_, err = exec.Execute(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
Expand Down Expand Up @@ -266,7 +283,7 @@ func (h *BindHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoS
if oldBindRecord == nil {
continue
}
binding := oldBindRecord.FindUsingBinding(record.Bindings[i].id)
binding := oldBindRecord.FindBinding(record.Bindings[i].id)
if binding != nil {
bindingSQLs = append(bindingSQLs, binding.BindSQL)
}
Expand All @@ -276,44 +293,60 @@ func (h *BindHandle) DropBindRecord(sctx sessionctx.Context, is infoschema.InfoS
return err
}

// DropInvalidBindRecord execute the drop bindRecord task.
func (h *BindHandle) DropInvalidBindRecord() {
invalidBindRecordMap := copyInvalidBindRecordMap(h.invalidBindRecordMap.Load().(map[string]*invalidBindRecordMap))
for key, invalidBindRecord := range invalidBindRecordMap {
if invalidBindRecord.droppedTime.IsZero() {
err := h.DropBindRecord(nil, nil, invalidBindRecord.bindRecord)
// tmpBindRecordMap is used to temporarily save bind record changes.
// Those changes will be flushed into store periodically.
type tmpBindRecordMap struct {
sync.Mutex
atomic.Value
flushFunc func(record *BindRecord) error
}

func (tmpMap *tmpBindRecordMap) flushToStore() {
newMap := copyBindRecordUpdateMap(tmpMap.Load().(map[string]*bindRecordUpdate))
for key, bindRecord := range newMap {
if bindRecord.updateTime.IsZero() {
err := tmpMap.flushFunc(bindRecord.bindRecord)
if err != nil {
logutil.BgLogger().Error("DropInvalidBindRecord failed", zap.Error(err))
logutil.BgLogger().Error("flush bind record failed", zap.Error(err))
}
invalidBindRecord.droppedTime = time.Now()
bindRecord.updateTime = time.Now()
continue
}

if time.Since(invalidBindRecord.droppedTime) > 6*time.Second {
delete(invalidBindRecordMap, key)
updateMetrics(metrics.ScopeGlobal, invalidBindRecord.bindRecord, nil, false)
if time.Since(bindRecord.updateTime) > 6*time.Second {
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
delete(newMap, key)
updateMetrics(metrics.ScopeGlobal, bindRecord.bindRecord, nil, false)
}
}
h.invalidBindRecordMap.Store(invalidBindRecordMap)
tmpMap.Store(newMap)
}

// AddDropInvalidBindTask add bindRecord to invalidBindRecordMap when the bindRecord need to be deleted.
func (h *BindHandle) AddDropInvalidBindTask(invalidBindRecord *BindRecord) {
key := invalidBindRecord.OriginalSQL + ":" + invalidBindRecord.Db
if _, ok := h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap)[key]; ok {
func (tmpMap *tmpBindRecordMap) saveToCache(bindRecord *BindRecord) {
key := bindRecord.OriginalSQL + ":" + bindRecord.Db + ":" + bindRecord.Bindings[0].id
if _, ok := tmpMap.Load().(map[string]*bindRecordUpdate)[key]; ok {
return
}
h.invalidBindRecordMap.Lock()
defer h.invalidBindRecordMap.Unlock()
if _, ok := h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap)[key]; ok {
tmpMap.Lock()
defer tmpMap.Unlock()
if _, ok := tmpMap.Load().(map[string]*bindRecordUpdate)[key]; ok {
return
}
newMap := copyInvalidBindRecordMap(h.invalidBindRecordMap.Value.Load().(map[string]*invalidBindRecordMap))
newMap[key] = &invalidBindRecordMap{
bindRecord: invalidBindRecord,
newMap := copyBindRecordUpdateMap(tmpMap.Load().(map[string]*bindRecordUpdate))
newMap[key] = &bindRecordUpdate{
bindRecord: bindRecord,
}
h.invalidBindRecordMap.Store(newMap)
updateMetrics(metrics.ScopeGlobal, nil, invalidBindRecord, false)
tmpMap.Store(newMap)
updateMetrics(metrics.ScopeGlobal, nil, bindRecord, false)
}

// DropInvalidBindRecord execute the drop bindRecord task.
func (h *BindHandle) DropInvalidBindRecord() {
h.invalidBindRecordMap.flushToStore()
}

// AddDropInvalidBindTask add bindRecord to invalidBindRecordMap when the bindRecord need to be deleted.
func (h *BindHandle) AddDropInvalidBindTask(invalidBindRecord *BindRecord) {
h.invalidBindRecordMap.saveToCache(invalidBindRecord)
}

// Size return the size of bind info cache.
Expand Down Expand Up @@ -360,6 +393,7 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
if err != nil {
return "", nil, err
}
h.sctx.GetSessionVars().StmtCtx.TimeZone = h.sctx.GetSessionVars().TimeZone
h.sctx.GetSessionVars().CurrentDB = bindRecord.Db
err = bindRecord.prepareHints(h.sctx.Context, h.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema))
return hash, bindRecord, err
Expand Down Expand Up @@ -431,8 +465,8 @@ func (c cache) copy() cache {
return newCache
}

func copyInvalidBindRecordMap(oldMap map[string]*invalidBindRecordMap) map[string]*invalidBindRecordMap {
newMap := make(map[string]*invalidBindRecordMap, len(oldMap))
func copyBindRecordUpdateMap(oldMap map[string]*bindRecordUpdate) map[string]*bindRecordUpdate {
newMap := make(map[string]*bindRecordUpdate, len(oldMap))
for k, v := range oldMap {
newMap[k] = v
}
Expand All @@ -453,7 +487,7 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
return fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql = %s`,
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql=%s`,
expression.Quote(normdOrigSQL),
expression.Quote(db),
expression.Quote(bindSQL),
Expand Down Expand Up @@ -536,9 +570,25 @@ func (h *BindHandle) CaptureBaselines() {
}
}

// AddEvolvePlanTask adds the evolve plan task into memory cache. It would be flushed to store periodically.
func (h *BindHandle) AddEvolvePlanTask(originalSQL, DB string, binding Binding, planHint string) {
binding.id = planHint
br := &BindRecord{
OriginalSQL: originalSQL,
Db: DB,
Bindings: []Binding{binding},
}
h.pendingVerifyBindRecordMap.saveToCache(br)
}

// SaveEvolveTasksToStore saves the evolve task into store.
func (h *BindHandle) SaveEvolveTasksToStore() {
h.pendingVerifyBindRecordMap.flushToStore()
}

// Clear resets the bind handle. It is used for test.
func (h *BindHandle) Clear() {
h.bindInfo.Store(make(cache))
h.invalidBindRecordMap.Store(make(map[string]*invalidBindRecordMap))
h.invalidBindRecordMap.Store(make(map[string]*bindRecordUpdate))
h.lastUpdateTime = types.ZeroTimestamp
}
1 change: 1 addition & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ func (do *Domain) globalBindHandleWorkerLoop() {
continue
}
do.bindHandle.CaptureBaselines()
do.bindHandle.SaveEvolveTasksToStore()
}
}
}()
Expand Down
Loading