From f75100c36e22e9dd56c8de45c60dfc15cd02395c Mon Sep 17 00:00:00 2001 From: Rustin <29879298+Rustin170506@users.noreply.github.com> Date: Thu, 26 Sep 2024 05:12:46 -0700 Subject: [PATCH] statistics: extract the common function to rebuild the queue (#56251) ref pingcap/tidb#55906 --- .../autoanalyze/priorityqueue/BUILD.bazel | 3 + .../handle/autoanalyze/priorityqueue/queue.go | 160 +++++++++++++++++- .../handle/autoanalyze/refresher/BUILD.bazel | 3 - .../handle/autoanalyze/refresher/refresher.go | 121 +------------ 4 files changed, 162 insertions(+), 125 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel index 29a5535235cb4..d6b5dd6b0bbbd 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel @@ -15,15 +15,18 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue", visibility = ["//visibility:public"], deps = [ + "//pkg/infoschema", "//pkg/meta/model", "//pkg/sessionctx", "//pkg/sessionctx/sysproctrack", "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/exec", + "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", + "//pkg/util", "//pkg/util/intest", "//pkg/util/timeutil", "@com_github_tikv_client_go_v2//oracle", diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index 14b87efd201b8..c2753a6e44a7e 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -14,7 +14,162 @@ package priorityqueue -import "container/heap" +import ( + "container/heap" + "context" + "time" + + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" + "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/util" + "go.uber.org/zap" +) + +// PushJobFunc is a function that pushes an AnalysisJob to a queue. +type PushJobFunc func(job AnalysisJob) error + +// FetchAllTablesAndBuildAnalysisJobs builds analysis jobs for all eligible tables and partitions. +func FetchAllTablesAndBuildAnalysisJobs( + sctx sessionctx.Context, + parameters map[string]string, + autoAnalysisTimeWindow AutoAnalysisTimeWindow, + statsHandle statstypes.StatsHandle, + jobFunc PushJobFunc, +) error { + autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) + pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) + is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + // Query locked tables once to minimize overhead. + // Outdated lock info is acceptable as we verify table lock status pre-analysis. + lockedTables, err := lockstats.QueryLockedTables(sctx) + if err != nil { + return err + } + // Get current timestamp from the session context. 
+	currentTs, err := getStartTs(sctx)
+	if err != nil {
+		return err
+	}
+
+	jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
+	calculator := NewPriorityCalculator()
+
+	dbs := is.AllSchemaNames()
+	for _, db := range dbs {
+		// Sometimes there are too many tables, and auto-analyze would take too long on them,
+		// so we need to check whether we are still within the available time window.
+		if !autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
+			return nil
+		}
+
+		// Ignore the memory and system databases.
+		if util.IsMemOrSysDB(db.L) {
+			continue
+		}
+
+		tbls, err := is.SchemaTableInfos(context.Background(), db)
+		if err != nil {
+			return err
+		}
+
+		// We need to check every partition of every table to see if it needs to be analyzed.
+		for _, tblInfo := range tbls {
+			// If the table is locked, skip analyzing all of its partitions.
+			if _, ok := lockedTables[tblInfo.ID]; ok {
+				continue
+			}
+
+			if tblInfo.IsView() {
+				continue
+			}
+
+			pi := tblInfo.GetPartitionInfo()
+			if pi == nil {
+				job := jobFactory.CreateNonPartitionedTableAnalysisJob(
+					db.O,
+					tblInfo,
+					statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
+				)
+				err := setWeightAndPushJob(jobFunc, job, calculator)
+				if err != nil {
+					return err
+				}
+				continue
+			}
+
+			// Only analyze the partitions that have not been locked.
+			partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
+			for _, def := range pi.Definitions {
+				if _, ok := lockedTables[def.ID]; !ok {
+					partitionDefs = append(partitionDefs, def)
+				}
+			}
+			partitionStats := GetPartitionStats(statsHandle, tblInfo, partitionDefs)
+			// If the prune mode is static, we need to analyze every partition as a separate table.
+			if pruneMode == variable.Static {
+				for pIDAndName, stats := range partitionStats {
+					job := jobFactory.CreateStaticPartitionAnalysisJob(
+						db.O,
+						tblInfo,
+						pIDAndName.ID,
+						pIDAndName.Name,
+						stats,
+					)
+					err := setWeightAndPushJob(jobFunc, job, calculator)
+					if err != nil {
+						return err
+					}
+				}
+			} else {
+				job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
+					db.O,
+					tblInfo,
+					statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
+					partitionStats,
+				)
+				err := setWeightAndPushJob(jobFunc, job, calculator)
+				if err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+func setWeightAndPushJob(pushFunc PushJobFunc, job AnalysisJob, calculator *PriorityCalculator) error {
+	if job == nil {
+		return nil
+	}
+	// We apply a penalty to larger tables, which can potentially result in a negative weight.
+	// Under normal circumstances table sizes should not be negative, so we log a warning whenever the weight is not positive.
+	weight := calculator.CalculateWeight(job)
+	if weight <= 0 {
+		statslogutil.SingletonStatsSamplerLogger().Warn(
+			"Table gets a negative weight",
+			zap.Float64("weight", weight),
+			zap.Stringer("job", job),
+		)
+	}
+	job.SetWeight(weight)
+	// Push the job onto the queue.
+	return pushFunc(job)
+}
+
+func getStartTs(sctx sessionctx.Context) (uint64, error) {
+	txn, err := sctx.Txn(true)
+	if err != nil {
+		return 0, err
+	}
+	return txn.StartTS(), nil
+}

 // AnalysisPriorityQueue is a priority queue for TableAnalysisJobs.
 type AnalysisPriorityQueue struct {
@@ -31,8 +186,9 @@ func NewAnalysisPriorityQueue() *AnalysisPriorityQueue {
 }

 // Push adds a job to the priority queue with the given weight.
-func (apq *AnalysisPriorityQueue) Push(job AnalysisJob) {
+func (apq *AnalysisPriorityQueue) Push(job AnalysisJob) error {
 	heap.Push(apq.inner, job)
+	return nil
 }

 // Pop removes the highest priority job from the queue.
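Note on the design in the hunk above: FetchAllTablesAndBuildAnalysisJobs deliberately takes a PushJobFunc rather than a concrete queue, which is also why AnalysisPriorityQueue.Push now returns an error, so that its method value satisfies the function type. A minimal, self-contained sketch of the pattern follows; all names in it are hypothetical stand-ins, not the real TiDB types.

package main

import "fmt"

// analysisJob is a hypothetical stand-in for priorityqueue.AnalysisJob.
type analysisJob struct {
	table  string
	weight float64
}

// pushJobFunc mirrors the role of PushJobFunc: the code that builds jobs
// does not need to know which queue implementation consumes them.
type pushJobFunc func(job analysisJob) error

// buildJobs plays the role of FetchAllTablesAndBuildAnalysisJobs: it
// constructs jobs and hands each one to the sink supplied by the caller.
func buildJobs(tables []string, push pushJobFunc) error {
	for _, t := range tables {
		if err := push(analysisJob{table: t, weight: 1.0}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// A test (or an alternative queue) can supply a closure that simply
	// records the jobs instead of pushing them onto a real heap.
	var captured []analysisJob
	record := func(job analysisJob) error {
		captured = append(captured, job)
		return nil
	}
	if err := buildJobs([]string{"t1", "t2"}, record); err != nil {
		panic(err)
	}
	fmt.Printf("captured %d jobs\n", len(captured))
}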
diff --git a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel index 97c8f62b369f7..9c1da5fda5b65 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel @@ -9,14 +9,11 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher", visibility = ["//visibility:public"], deps = [ - "//pkg/infoschema", - "//pkg/meta/model", "//pkg/sessionctx", "//pkg/sessionctx/sysproctrack", "//pkg/sessionctx/variable", "//pkg/statistics/handle/autoanalyze/exec", "//pkg/statistics/handle/autoanalyze/priorityqueue", - "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index 5a991cb354924..bacb7d2eb9702 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -15,21 +15,16 @@ package refresher import ( - "context" "time" - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/sysproctrack" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" - "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" "go.uber.org/zap" ) @@ -157,7 +152,6 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error { r.statsHandle.SPool(), func(sctx sessionctx.Context) error { parameters := exec.GetAutoAnalyzeParameters(sctx) - autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) // Get the available time period for auto analyze and check if the current time is in the period. start, end, err := exec.ParseAutoAnalysisWindow( parameters[variable.TiDBAutoAnalyzeStartTime], @@ -176,93 +170,7 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error { if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) { return nil } - calculator := priorityqueue.NewPriorityCalculator() - pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) - is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) - // Query locked tables once to minimize overhead. - // Outdated lock info is acceptable as we verify table lock status pre-analysis. - lockedTables, err := lockstats.QueryLockedTables(sctx) - if err != nil { - return err - } - // Get current timestamp from the session context. - currentTs, err := getStartTs(sctx) - if err != nil { - return err - } - - jobFactory := priorityqueue.NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs) - - dbs := is.AllSchemaNames() - for _, db := range dbs { - // Sometimes the tables are too many. Auto-analyze will take too much time on it. - // so we need to check the available time. - if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) { - return nil - } - // Ignore the memory and system database. 
- if util.IsMemOrSysDB(db.L) { - continue - } - - tbls, err := is.SchemaTableInfos(context.Background(), db) - if err != nil { - return err - } - // We need to check every partition of every table to see if it needs to be analyzed. - for _, tblInfo := range tbls { - // If table locked, skip analyze all partitions of the table. - if _, ok := lockedTables[tblInfo.ID]; ok { - continue - } - - if tblInfo.IsView() { - continue - } - pi := tblInfo.GetPartitionInfo() - if pi == nil { - job := jobFactory.CreateNonPartitionedTableAnalysisJob( - db.O, - tblInfo, - r.statsHandle.GetTableStatsForAutoAnalyze(tblInfo), - ) - r.pushJob(job, calculator) - continue - } - - // Only analyze the partition that has not been locked. - partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions)) - for _, def := range pi.Definitions { - if _, ok := lockedTables[def.ID]; !ok { - partitionDefs = append(partitionDefs, def) - } - } - partitionStats := priorityqueue.GetPartitionStats(r.statsHandle, tblInfo, partitionDefs) - // If the prune mode is static, we need to analyze every partition as a separate table. - if pruneMode == variable.Static { - for pIDAndName, stats := range partitionStats { - job := jobFactory.CreateStaticPartitionAnalysisJob( - db.O, - tblInfo, - pIDAndName.ID, - pIDAndName.Name, - stats, - ) - r.pushJob(job, calculator) - } - } else { - job := jobFactory.CreateDynamicPartitionedTableAnalysisJob( - db.O, - tblInfo, - r.statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID), - partitionStats, - ) - r.pushJob(job, calculator) - } - } - } - - return nil + return priorityqueue.FetchAllTablesAndBuildAnalysisJobs(sctx, parameters, r.autoAnalysisTimeWindow, r.statsHandle, r.Jobs.Push) }, statsutil.FlagWrapTxn, ); err != nil { @@ -272,25 +180,6 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error { return nil } -func (r *Refresher) pushJob(job priorityqueue.AnalysisJob, calculator *priorityqueue.PriorityCalculator) { - if job == nil { - return - } - // We apply a penalty to larger tables, which can potentially result in a negative weight. - // To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative. - weight := calculator.CalculateWeight(job) - if weight <= 0 { - statslogutil.SingletonStatsSamplerLogger().Warn( - "Table gets a negative weight", - zap.Float64("weight", weight), - zap.Stringer("job", job), - ) - } - job.SetWeight(weight) - // Push the job onto the queue. - r.Jobs.Push(job) -} - // WaitAutoAnalyzeFinishedForTest waits for the auto analyze job to be finished. // Only used in the test. func (r *Refresher) WaitAutoAnalyzeFinishedForTest() { @@ -307,11 +196,3 @@ func (r *Refresher) GetRunningJobs() map[int64]struct{} { func (r *Refresher) Close() { r.worker.Stop() } - -func getStartTs(sctx sessionctx.Context) (uint64, error) { - txn, err := sctx.Txn(true) - if err != nil { - return 0, err - } - return txn.StartTS(), nil -}
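With the traversal extracted, any component that wants to rebuild an analysis queue can reuse it by supplying its own PushJobFunc; the refresher above simply passes r.Jobs.Push. A hedged sketch of what another caller might look like, using the import paths and signatures from the hunks above; the wrapper itself (rebuildQueue and its package) is hypothetical.

package example

import (
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
)

// rebuildQueue is a hypothetical second consumer of the extracted helper;
// the parameter types and the call below come from the patch itself.
func rebuildQueue(
	sctx sessionctx.Context,
	parameters map[string]string,
	window priorityqueue.AutoAnalysisTimeWindow,
	statsHandle statstypes.StatsHandle,
	queue *priorityqueue.AnalysisPriorityQueue,
) error {
	// queue.Push satisfies priorityqueue.PushJobFunc now that Push
	// returns an error.
	return priorityqueue.FetchAllTablesAndBuildAnalysisJobs(
		sctx, parameters, window, statsHandle, queue.Push,
	)
}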