diff --git a/docs/tidb_http_api.md b/docs/tidb_http_api.md index b479062aa883b..3e126a3e1ab49 100644 --- a/docs/tidb_http_api.md +++ b/docs/tidb_http_api.md @@ -169,3 +169,22 @@ timezone.* - Go mutex pprof - Full goroutine - TiDB config and version + + Param: + + - seconds: profile time(s), default is 10s. + +1. Get statistics data of specified table. + + ```shell + curl http://{TiDBIP}:10080/stats/dump/{db}/{table} + ``` + +1. Get statistics data of specific table and timestamp. + + ```shell + curl http://{TiDBIP}:10080/stats/dump/{db}/{table}/{yyyyMMddHHmmss} + ``` + ```shell + curl http://{TiDBIP}:10080/stats/dump/{db}/{table}/{yyyy-MM-dd HH:mm:ss} + ``` diff --git a/executor/adapter.go b/executor/adapter.go index e4c9351b6e1f3..de882907923c0 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -401,7 +401,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) { copTaskInfo := sessVars.StmtCtx.CopTasksDetails() statsInfos := a.getStatsInfo() memMax := sessVars.StmtCtx.MemTracker.MaxConsumed() - joinVars := fmt.Sprintf("idxBatch %d hjCon %d", sessVars.IndexJoinBatchSize,sessVars.HashJoinConcurrency) + joinVars := fmt.Sprintf("idxBatch %d hjCon %d", sessVars.IndexJoinBatchSize, sessVars.HashJoinConcurrency) if costTime < threshold { _, digest := sessVars.StmtCtx.SQLDigest() logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql, planString, joinVars))
diff --git a/planner/core/stringer.go b/planner/core/stringer.go index 293c5654f6e91..3d153342a071d 100644 --- a/planner/core/stringer.go +++ b/planner/core/stringer.go @@ -319,7 +319,7 @@ func toStringWithCount(in Plan, strs []string, idxs []int) ([]string, []int) { str = "MaxOneRow" case *PhysicalLimit: str = "Limit" + fmt.Sprintf("%v\n", x.StatsCount()) - case *PhysicalSort: + case *PhysicalSort: str = "Sort" + fmt.Sprintf("%.2f", x.StatsCount()) case *PhysicalUnionAll: last := len(idxs) - 1 diff --git a/server/http_handler.go b/server/http_handler.go index 26f80189971a9..7192dfd048d24 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -69,6 +69,7 @@ const ( pColumnFlag = "colFlag" pColumnLen = "colLen" pRowBin = "rowBin" + pSnapshot = "snapshot" ) // For query string diff --git a/server/http_status.go b/server/http_status.go index c5a176608d049..c891dec2123ac 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -50,7 +50,8 @@ func (s *Server) startHTTPServer() { router.Handle("/metrics", prometheus.Handler()) // HTTP path for dump statistics.
- router.Handle("/stats/dump/{db}/{table}", s.newStatsHandler()) + router.Handle("/stats/dump/{db}/{table}", s.newStatsHandler()).Name("StatsDump") + router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()).Name("StatsHistoryDump") router.Handle("/settings", settingsHandler{}) router.Handle("/binlog/recover", binlogRecover{}) diff --git a/server/statistics_handler.go b/server/statistics_handler.go index 40761114073f1..45151e5b1d0a6 100644 --- a/server/statistics_handler.go +++ b/server/statistics_handler.go @@ -14,12 +14,20 @@ package server import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/sessionctx" "net/http" + "strings" + "time" "github.com/gorilla/mux" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" ) // StatsHandler is the handler for dumping statistics. @@ -51,7 +59,7 @@ func (sh StatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if err != nil { writeError(w, err) } else { - js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta()) + js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), nil) if err != nil { writeError(w, err) } else { @@ -59,3 +67,108 @@ func (sh StatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } } + +// StatsHistoryHandler is the handler for dumping statistics. 
+type StatsHistoryHandler struct { + do *domain.Domain +} + +func (s *Server) newStatsHistoryHandler() *StatsHistoryHandler { + store, ok := s.driver.(*TiDBDriver) + if !ok { + panic("Illegal driver") + } + + do, err := session.GetDomain(store.store) + if err != nil { + panic("Failed to get domain") + } + return &StatsHistoryHandler{do} +} + +func compatibleParseGCTime(format, value string) (time.Time, error) { + t, err := time.Parse(format, value) + + if err != nil { + // Remove the last field that is separated by a space + parts := strings.Split(value, " ") + prefix := strings.Join(parts[:len(parts)-1], " ") + t, err = time.Parse(format, prefix) + } + + if err != nil { + err = errors.Errorf("string \"%v\" doesn't have a prefix that matches format \"%v\"", value, format) + } + return t, err +} + +// validateSnapshot checks that the newly set snapshot time is after GC safe point time. +func validateSnapshot(ctx sessionctx.Context, snapshotTS uint64) error { + sql := "SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_safe_point'" + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) + if err != nil { + return errors.Trace(err) + } + if len(rows) != 1 { + return errors.New("can not get 'tikv_gc_safe_point'") + } + safePointString := rows[0].GetString(0) + const gcTimeFormat = "20060102-15:04:05 -0700" + safePointTime, err := compatibleParseGCTime(gcTimeFormat, safePointString) + if err != nil { + return errors.Trace(err) + } + safePointTS := variable.GoTimeToTS(safePointTime) + if safePointTS > snapshotTS { + return variable.ErrSnapshotTooOld.GenWithStackByArgs(safePointString) + } + return nil +} + +func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") + + params := mux.Vars(req) + se, err := session.CreateSession(sh.do.Store()) + if err != nil { + writeError(w, err) + return + } + se.GetSessionVars().StmtCtx.TimeZone = time.Local + t, err := 
types.ParseTime(se.GetSessionVars().StmtCtx, params[pSnapshot], mysql.TypeTimestamp, 6) + if err != nil { + writeError(w, err) + return + } + t1, err := t.Time.GoTime(time.Local) + if err != nil { + writeError(w, err) + return + } + snapshot := variable.GoTimeToTS(t1) + err = validateSnapshot(se, snapshot) + if err != nil { + writeError(w, err) + return + } + + is, err := sh.do.GetSnapshotInfoSchema(snapshot) + if err != nil { + writeError(w, err) + return + } + h := sh.do.StatsHandle() + tbl, err := is.TableByName(model.NewCIStr(params[pDBName]), model.NewCIStr(params[pTableName])) + if err != nil { + writeError(w, err) + return + } + se.GetSessionVars().SnapshotInfoschema, se.GetSessionVars().SnapshotTS = is, snapshot + historyStatsExec := se.(sqlexec.RestrictedSQLExecutor) + js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), historyStatsExec) + if err != nil { + writeError(w, err) + } else { + writeData(w, js) + } +} diff --git a/server/statistics_handler_test.go b/server/statistics_handler_test.go index ce67e3ac6f179..33200a46e070f 100644 --- a/server/statistics_handler_test.go +++ b/server/statistics_handler_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "net/http" "os" + "time" "github.com/go-sql-driver/mysql" "github.com/gorilla/mux" @@ -105,6 +106,37 @@ func (ds *testDumpStatsSuite) TestDumpStatsAPI(c *C) { c.Assert(err, IsNil) fp.Write(js) ds.checkData(c, path) + + // sleep for 1 seconds to ensure the existence of tidb.test + time.Sleep(time.Second) + timeBeforeDropStats := time.Now() + snapshot := timeBeforeDropStats.Format("20060102150405") + ds.prepare4DumpHistoryStats(c) + + // test dump history stats + resp1, err := http.Get("http://127.0.0.1:10090/stats/dump/tidb/test") + c.Assert(err, IsNil) + defer resp1.Body.Close() + js, err = ioutil.ReadAll(resp1.Body) + c.Assert(err, IsNil) + c.Assert(string(js), Equals, "null") + + path1 := "/tmp/stats_history.json" + fp1, err := os.Create(path1) + c.Assert(err, IsNil) + c.Assert(fp1, NotNil) + defer 
func() { + c.Assert(fp1.Close(), IsNil) + //c.Assert(os.Remove(path1), IsNil) + }() + + resp1, err = http.Get("http://127.0.0.1:10090/stats/dump/tidb/test/" + snapshot) + c.Assert(err, IsNil) + + js, err = ioutil.ReadAll(resp1.Body) + c.Assert(err, IsNil) + fp1.Write(js) + ds.checkData(c, path1) } func (ds *testDumpStatsSuite) prepareData(c *C) { @@ -128,6 +160,25 @@ func (ds *testDumpStatsSuite) prepareData(c *C) { c.Assert(h.Update(is), IsNil) } +func (ds *testDumpStatsSuite) prepare4DumpHistoryStats(c *C) { + db, err := sql.Open("mysql", getDSN()) + c.Assert(err, IsNil, Commentf("Error connecting")) + defer db.Close() + + dbt := &DBTest{c, db} + + safePointName := "tikv_gc_safe_point" + safePointValue := "20060102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + dbt.mustExec(updateSafePoint) + + dbt.mustExec("drop table tidb.test") + dbt.mustExec("create table tidb.test (a int, b varchar(20))") +} + func (ds *testDumpStatsSuite) checkData(c *C, path string) { db, err := sql.Open("mysql", getDSN(func(config *mysql.Config) { config.AllowAllFiles = true @@ -135,16 +186,11 @@ func (ds *testDumpStatsSuite) checkData(c *C, path string) { })) c.Assert(err, IsNil, Commentf("Error connecting")) dbt := &DBTest{c, db} - defer func() { - dbt.mustExec("drop database tidb") - dbt.mustExec("truncate table mysql.stats_meta") - dbt.mustExec("truncate table mysql.stats_histograms") - dbt.mustExec("truncate table mysql.stats_buckets") - db.Close() - }() + defer db.Close() dbt.mustExec("use tidb") dbt.mustExec("drop stats test") + fmt.Printf("%v\n", fmt.Sprintf("load stats '%s'", path)) _, err = dbt.db.Exec(fmt.Sprintf("load stats '%s'", path)) c.Assert(err, IsNil) @@ -160,3 +206,15 @@ func (ds *testDumpStatsSuite) 
checkData(c *C, path string) { dbt.Check(modifyCount, Equals, int64(3)) dbt.Check(count, Equals, int64(4)) } + +func (ds *testDumpStatsSuite) clearData(c *C, path string) { + db, err := sql.Open("mysql", getDSN()) + c.Assert(err, IsNil, Commentf("Error connecting")) + defer db.Close() + + dbt := &DBTest{c, db} + dbt.mustExec("drop database tidb") + dbt.mustExec("truncate table mysql.stats_meta") + dbt.mustExec("truncate table mysql.stats_histograms") + dbt.mustExec("truncate table mysql.stats_buckets") +} diff --git a/session/session.go b/session/session.go index f6c6e4f501d6f..73d6b3684126a 100644 --- a/session/session.go +++ b/session/session.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -1581,3 +1582,69 @@ func logQuery(query string, vars *variable.SessionVars) { zap.String("sql", query+vars.GetExecuteArgumentsInfo())) } } + +// ExecRestrictedSQLWithSnapshot implements RestrictedSQLExecutor interface. +// This is used for executing some restricted sql statements with snapshot. +// If current session sets the snapshot timestamp, then execute with this snapshot timestamp. +// Otherwise, execute with the current transaction start timestamp if the transaction is valid. +func (s *session) ExecRestrictedSQLWithSnapshot(sctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) { + ctx := context.TODO() + + // Use special session to execute the sql. + tmp, err := s.sysSessionPool().Get() + if err != nil { + return nil, nil, err + } + se := tmp.(*session) + defer s.sysSessionPool().Put(tmp) + metrics.SessionRestrictedSQLCounter.Inc() + var snapshot uint64 + txn, err := s.Txn(false) + if err != nil { + return nil, nil, err + } + if txn.Valid() { + snapshot = s.txn.StartTS() + } + if s.sessionVars.SnapshotTS != 0 { + snapshot = s.sessionVars.SnapshotTS + } + // Set snapshot. 
+ if snapshot != 0 { + if err := se.sessionVars.SetSystemVar(variable.TiDBSnapshot, strconv.FormatUint(snapshot, 10)); err != nil { + return nil, nil, err + } + defer func() { + if err := se.sessionVars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil { + logutil.Logger(context.Background()).Error("set tidbSnapshot error", zap.Error(err)) + } + }() + } + startTime := time.Now() + recordSets, err := se.Execute(ctx, sql) + if err != nil { + return nil, nil, errors.Trace(err) + } + + var ( + rows []chunk.Row + fields []*ast.ResultField + ) + // Execute all recordset, take out the first one as result. + for i, rs := range recordSets { + tmp, err := drainRecordSet(ctx, se, rs) + if err != nil { + return nil, nil, errors.Trace(err) + } + if err = rs.Close(); err != nil { + return nil, nil, errors.Trace(err) + } + + if i == 0 { + rows = tmp + fields = rs.Fields() + } + } + metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal).Observe(time.Since(startTime).Seconds()) + return rows, fields, nil +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 6e8b3620b2815..f6ebe91e29ef7 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -802,8 +802,10 @@ const ( SlowLogCopWaitMax = "Cop_wait_max" // SlowLogMemMax is the max number bytes of memory used in this statement. SlowLogMemMax = "Mem_max" + // SlowLogPlan is the plan of slow query. SlowLogPlan = "Plan" - SlowLogJoinVars = "Join_Var" + // SlowLogJoinVars is the join session vars. + SlowLogJoinVars = "Join_Vars" ) // SlowLogFormat uses for formatting slow log. 
@@ -884,10 +886,10 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe buf.WriteString(SlowLogPrefixStr + SlowLogMemMax + SlowLogSpaceMarkStr + strconv.FormatInt(memMax, 10) + "\n") } if len(planString) > 0 { - buf.WriteString(SlowLogPrefixStr + SlowLogPlan+ SlowLogSpaceMarkStr + planString + "\n") + buf.WriteString(SlowLogPrefixStr + SlowLogPlan + SlowLogSpaceMarkStr + planString + "\n") } if len(joinVars) > 0 { - buf.WriteString(SlowLogPrefixStr + SlowLogJoinVars+ SlowLogSpaceMarkStr + joinVars + "\n") + buf.WriteString(SlowLogPrefixStr + SlowLogJoinVars + SlowLogSpaceMarkStr + joinVars + "\n") } if len(sql) == 0 { sql = ";" diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 89d9223d96d5a..706112b03f50d 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -106,6 +106,6 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) { select * from t;` sql := "select * from t" digest := parser.DigestHash(sql) - logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, sql) + logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, sql, "", "") c.Assert(logString, Equals, resultString) } diff --git a/statistics/dump.go b/statistics/dump.go index 1a420c3a82351..59d6c1eab9fd4 100644 --- a/statistics/dump.go +++ b/statistics/dump.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tipb/go-tipb" ) @@ -58,10 +59,10 @@ func dumpJSONCol(hist *Histogram, CMSketch *CMSketch) *jsonColumn { } // DumpStatsToJSON dumps statistic to json. 
-func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo) (*JSONTable, error) { +func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) { pi := tableInfo.GetPartitionInfo() if pi == nil { - return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID) + return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, historyStatsExec) } jsonTbl := &JSONTable{ DatabaseName: dbName, @@ -69,7 +70,7 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo) (*JS Partitions: make(map[string]*JSONTable, len(pi.Definitions)), } for _, def := range pi.Definitions { - tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID) + tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, historyStatsExec) if err != nil { return nil, errors.Trace(err) } @@ -81,13 +82,14 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo) (*JS return jsonTbl, nil } -func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64) (*JSONTable, error) { - tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true) - if err != nil { - return nil, errors.Trace(err) +func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) { + tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, historyStatsExec) + if err != nil || tbl == nil { + return nil, err } - if tbl == nil { - return nil, nil + tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, historyStatsExec) + if err != nil { + return nil, err } jsonTbl := &JSONTable{ DatabaseName: dbName, diff --git a/statistics/dump_test.go b/statistics/dump_test.go index 5283855dc0a8d..06bbe4bde6fef 100644 --- a/statistics/dump_test.go +++ b/statistics/dump_test.go @@ -62,7 +62,7 @@ func (s *testDumpStatsSuite) TestConversion(c *C) { 
tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) c.Assert(err, IsNil) - jsonTbl, err := h.DumpStatsToJSON("test", tableInfo.Meta()) + jsonTbl, err := h.DumpStatsToJSON("test", tableInfo.Meta(), nil) c.Assert(err, IsNil) loadTbl, err := statistics.TableStatsFromJSON(tableInfo.Meta(), tableInfo.Meta().ID, jsonTbl) c.Assert(err, IsNil) @@ -101,7 +101,7 @@ PARTITION BY RANGE ( a ) ( table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) c.Assert(err, IsNil) tableInfo := table.Meta() - jsonTbl, err := h.DumpStatsToJSON("test", tableInfo) + jsonTbl, err := h.DumpStatsToJSON("test", tableInfo, nil) c.Assert(err, IsNil) pi := tableInfo.GetPartitionInfo() originTables := make([]*statistics.Table, 0, len(pi.Definitions)) @@ -136,6 +136,6 @@ func (s *testDumpStatsSuite) TestDumpAlteredTable(c *C) { tk.MustExec("alter table t drop column a") table, err := s.do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) c.Assert(err, IsNil) - _, err = h.DumpStatsToJSON("test", table.Meta()) + _, err = h.DumpStatsToJSON("test", table.Meta(), nil) c.Assert(err, IsNil) } diff --git a/statistics/handle.go b/statistics/handle.go index 2b8c6ac42d1d4..81e01af913c63 100644 --- a/statistics/handle.go +++ b/statistics/handle.go @@ -153,7 +153,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { continue } tableInfo := table.Meta() - tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, false) + tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, false, nil) // Error is not nil may mean that there are some ddl changes on this table, we will not update it. 
if err != nil { logutil.Logger(context.Background()).Debug("error occurred when read table stats", zap.String("table", tableInfo.Name.O), zap.Error(err)) @@ -257,11 +257,11 @@ func (h *Handle) LoadNeededHistograms() error { histogramNeededColumns.delete(col) continue } - hg, err := h.histogramFromStorage(col.tableID, c.ID, &c.Info.FieldType, c.NDV, 0, c.LastUpdateVersion, c.NullCount, c.TotColSize) + hg, err := h.histogramFromStorage(col.tableID, c.ID, &c.Info.FieldType, c.NDV, 0, c.LastUpdateVersion, c.NullCount, c.TotColSize, nil) if err != nil { return errors.Trace(err) } - cms, err := h.cmSketchFromStorage(col.tableID, 0, col.columnID) + cms, err := h.cmSketchFromStorage(col.tableID, 0, col.columnID, nil) if err != nil { return errors.Trace(err) } diff --git a/statistics/histogram.go b/statistics/histogram.go index 940868a74fe1b..965b55a9549dc 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -16,6 +16,7 @@ package statistics import ( "bytes" "fmt" + "github.com/pingcap/parser/ast" "math" "strings" "time" @@ -295,11 +296,19 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error return } -func (h *Handle) histogramFromStorage(tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64) (*Histogram, error) { +func (h *Handle) histogramFromStorage(tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *Histogram, err error) { selSQL := fmt.Sprintf("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d order by bucket_id", tableID, isIndex, colID) - rows, fields, err := h.restrictedExec.ExecRestrictedSQL(nil, selSQL) - if err != nil { - return nil, errors.Trace(err) + var ( + rows []chunk.Row + fields []*ast.ResultField + ) + if historyStatsExec != nil { + rows, 
fields, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(nil, selSQL) + } else { + rows, fields, err = h.restrictedExec.ExecRestrictedSQL(nil, selSQL) } + if err != nil { + return nil, errors.Trace(err) + } bucketSize := len(rows) hg := NewHistogram(colID, distinct, nullCount, ver, tp, bucketSize, totColSize) @@ -331,6 +340,23 @@ func (h *Handle) histogramFromStorage(tableID int64, colID int64, tp *types.Fiel return hg, nil } +func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (version uint64, modifyCount, count int64, err error) { + selSQL := fmt.Sprintf("SELECT version, modify_count, count from mysql.stats_meta where table_id = %d order by version", tableID) + var rows []chunk.Row + if historyStatsExec == nil { + rows, _, err = h.restrictedExec.ExecRestrictedSQL(nil, selSQL) + } else { + rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(nil, selSQL) + } + if err != nil || len(rows) == 0 { + return + } + version = rows[0].GetUint64(0) + modifyCount = rows[0].GetInt64(1) + count = rows[0].GetInt64(2) + return +} + func (h *Handle) columnCountFromStorage(tableID, colID int64) (int64, error) { selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, 0, colID) rows, _, err := h.restrictedExec.ExecRestrictedSQL(nil, selSQL) diff --git a/statistics/table.go b/statistics/table.go index 21e5ad83b9922..f3346733fae7c 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -16,6 +16,7 @@ package statistics import ( "context" "fmt" + "github.com/pingcap/tidb/util/sqlexec" "math" "strings" "sync" @@ -96,9 +97,14 @@ func (t *Table) copy() *Table { return nt } -func (h *Handle) cmSketchFromStorage(tblID int64, isIndex, histID int64) (*CMSketch, error) { +func (h *Handle) cmSketchFromStorage(tblID int64, isIndex, histID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *CMSketch, err error) { selSQL := fmt.Sprintf("select cm_sketch from mysql.stats_histograms where 
table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID) - rows, _, err := h.restrictedExec.ExecRestrictedSQL(nil, selSQL) + var rows []chunk.Row + if historyStatsExec != nil { + rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(nil, selSQL) + } else { + rows, _, err = h.restrictedExec.ExecRestrictedSQL(nil, selSQL) + } if err != nil { return nil, errors.Trace(err) } @@ -108,7 +114,7 @@ func (h *Handle) cmSketchFromStorage(tblID int64, isIndex, histID int64) (*CMSke return decodeCMSketch(rows[0].GetBytes(0)) } -func (h *Handle) indexStatsFromStorage(row chunk.Row, table *Table, tableInfo *model.TableInfo) error { +func (h *Handle) indexStatsFromStorage(row chunk.Row, table *Table, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) error { histID := row.GetInt64(2) distinct := row.GetInt64(3) histVer := row.GetUint64(4) @@ -127,11 +133,11 @@ func (h *Handle) indexStatsFromStorage(row chunk.Row, table *Table, tableInfo *m continue } if idx == nil || idx.LastUpdateVersion < histVer { - hg, err := h.histogramFromStorage(table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0) + hg, err := h.histogramFromStorage(table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, historyStatsExec) if err != nil { return errors.Trace(err) } - cms, err := h.cmSketchFromStorage(table.PhysicalID, 1, idxInfo.ID) + cms, err := h.cmSketchFromStorage(table.PhysicalID, 1, idxInfo.ID, historyStatsExec) if err != nil { return errors.Trace(err) } @@ -147,7 +153,7 @@ func (h *Handle) indexStatsFromStorage(row chunk.Row, table *Table, tableInfo *m return nil } -func (h *Handle) columnStatsFromStorage(row chunk.Row, table *Table, tableInfo *model.TableInfo, loadAll bool) error { +func (h *Handle) columnStatsFromStorage(row chunk.Row, table *Table, tableInfo *model.TableInfo, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) error { histID := 
row.GetInt64(2) distinct := row.GetInt64(3) histVer := row.GetUint64(4) @@ -191,11 +197,11 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *Table, tableInfo * break } if col == nil || col.LastUpdateVersion < histVer || loadAll { - hg, err := h.histogramFromStorage(table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize) + hg, err := h.histogramFromStorage(table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, historyStatsExec) if err != nil { return errors.Trace(err) } - cms, err := h.cmSketchFromStorage(table.PhysicalID, 0, colInfo.ID) + cms, err := h.cmSketchFromStorage(table.PhysicalID, 0, colInfo.ID, historyStatsExec) if err != nil { return errors.Trace(err) } @@ -228,7 +234,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *Table, tableInfo * } // tableStatsFromStorage loads table stats info from storage. -func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool) (*Table, error) { +func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *Table, err error) { table, ok := h.statsCache.Load().(statsCache)[physicalID] // If table stats is pseudo, we also need to copy it, since we will use the column stats when // the average error rate of it is small. 
@@ -248,7 +254,12 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in } table.Pseudo = false selSQL := fmt.Sprintf("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag from mysql.stats_histograms where table_id = %d", physicalID) - rows, _, err := h.restrictedExec.ExecRestrictedSQL(nil, selSQL) + var rows []chunk.Row + if historyStatsExec != nil { + rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(nil, selSQL) + } else { + rows, _, err = h.restrictedExec.ExecRestrictedSQL(nil, selSQL) + } if err != nil { return nil, errors.Trace(err) } @@ -258,11 +269,11 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in } for _, row := range rows { if row.GetInt64(1) > 0 { - if err := h.indexStatsFromStorage(row, table, tableInfo); err != nil { + if err := h.indexStatsFromStorage(row, table, tableInfo, historyStatsExec); err != nil { return nil, errors.Trace(err) } } else { - if err := h.columnStatsFromStorage(row, table, tableInfo, loadAll); err != nil { + if err := h.columnStatsFromStorage(row, table, tableInfo, loadAll, historyStatsExec); err != nil { return nil, errors.Trace(err) } } diff --git a/util/sqlexec/restricted_sql_executor.go b/util/sqlexec/restricted_sql_executor.go index 0d665f1567a73..aa9195edea863 100644 --- a/util/sqlexec/restricted_sql_executor.go +++ b/util/sqlexec/restricted_sql_executor.go @@ -36,6 +36,7 @@ import ( type RestrictedSQLExecutor interface { // ExecRestrictedSQL run sql statement in ctx with some restriction. ExecRestrictedSQL(ctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) + ExecRestrictedSQLWithSnapshot(ctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) } // SQLExecutor is an interface provides executing normal sql statement.