From 3b5bea5cc3d5e26146d6cc1ff5a4a6223c8a5222 Mon Sep 17 00:00:00 2001 From: Rustin170506 <29879298+Rustin170506@users.noreply.github.com> Date: Thu, 5 Sep 2024 17:55:05 +0800 Subject: [PATCH] statistics: add AnalysisPriorityQueueV2 Signed-off-by: Rustin170506 <29879298+Rustin170506@users.noreply.github.com> --- .../handle/autoanalyze/internal/heap/heap.go | 70 +-- .../autoanalyze/priorityqueue/BUILD.bazel | 12 + .../handle/autoanalyze/priorityqueue/build.go | 372 +++++++++++++++ .../dynamic_partitioned_table_analysis_job.go | 10 + .../handle/autoanalyze/priorityqueue/job.go | 5 + .../non_partitioned_table_analysis_job.go | 10 + .../autoanalyze/priorityqueue/queue2.go | 448 ++++++++++++++++++ .../static_partitioned_table_analysis_job.go | 10 + 8 files changed, 902 insertions(+), 35 deletions(-) create mode 100644 pkg/statistics/handle/autoanalyze/priorityqueue/build.go create mode 100644 pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go index f3d677167814b..76b497cedddc0 100644 --- a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go +++ b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go @@ -34,23 +34,23 @@ const ( type LessFunc[T any] func(T, T) bool // KeyFunc is used to generate a key for an object. -type KeyFunc[T any, K comparable] func(T) (K, error) +type KeyFunc[K comparable, T any] func(T) (K, error) -type heapItem[T any] struct { +type heapItem[K comparable, T any] struct { obj T // The object which is stored in the heap. index int // The index of the object's key in the Heap.queue. } -type itemKeyValue[T any, K comparable] struct { - obj T +type itemKeyValue[K comparable, T any] struct { key K + obj T } // heapData is an internal struct that implements the standard heap interface // and keeps the data stored in the heap. -type heapData[T any, K comparable] struct { - items map[K]*heapItem[T] - keyFunc KeyFunc[T, K] +type heapData[K comparable, T any] struct { + items map[K]*heapItem[K, T] + keyFunc KeyFunc[K, T] lessFunc LessFunc[T] queue []K } @@ -60,7 +60,7 @@ var ( ) // Less is a standard heap interface function. -func (h *heapData[T, K]) Less(i, j int) bool { +func (h *heapData[K, T]) Less(i, j int) bool { if i >= len(h.queue) || j >= len(h.queue) { return false } @@ -76,10 +76,10 @@ func (h *heapData[T, K]) Less(i, j int) bool { } // Len is a standard heap interface function. -func (h *heapData[T, K]) Len() int { return len(h.queue) } +func (h *heapData[K, T]) Len() int { return len(h.queue) } // Swap is a standard heap interface function. -func (h *heapData[T, K]) Swap(i, j int) { +func (h *heapData[K, T]) Swap(i, j int) { h.queue[i], h.queue[j] = h.queue[j], h.queue[i] item := h.items[h.queue[i]] item.index = i @@ -88,15 +88,15 @@ func (h *heapData[T, K]) Swap(i, j int) { } // Push is a standard heap interface function. -func (h *heapData[T, K]) Push(kv any) { - keyValue := kv.(*itemKeyValue[T, K]) +func (h *heapData[K, T]) Push(kv any) { + keyValue := kv.(*itemKeyValue[K, T]) n := len(h.queue) - h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n} + h.items[keyValue.key] = &heapItem[K, T]{keyValue.obj, n} h.queue = append(h.queue, keyValue.key) } // Pop is a standard heap interface function. 
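+// It is meant to be invoked via container/heap rather than called directly;
+// external callers should use Heap.Pop, which takes the lock and blocks
+// until an item is available.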
-func (h *heapData[T, K]) Pop() any { +func (h *heapData[K, T]) Pop() any { key := h.queue[len(h.queue)-1] h.queue = h.queue[:len(h.queue)-1] item, ok := h.items[key] @@ -108,15 +108,15 @@ func (h *heapData[T, K]) Pop() any { } // Heap is a thread-safe producer/consumer queue that implements a heap data structure. -type Heap[T any, K comparable] struct { - data *heapData[T, K] +type Heap[K comparable, T any] struct { + data *heapData[K, T] cond sync.Cond lock sync.RWMutex closed bool } // Close closes the heap. -func (h *Heap[T, K]) Close() { +func (h *Heap[K, T]) Close() { h.lock.Lock() defer h.lock.Unlock() h.closed = true @@ -124,7 +124,7 @@ func (h *Heap[T, K]) Close() { } // Add adds an object or updates it if it already exists. -func (h *Heap[T, K]) Add(obj T) error { +func (h *Heap[K, T]) Add(obj T) error { key, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -145,7 +145,7 @@ func (h *Heap[T, K]) Add(obj T) error { } // BulkAdd adds a list of objects to the heap. -func (h *Heap[T, K]) BulkAdd(list []T) error { +func (h *Heap[K, T]) BulkAdd(list []T) error { h.lock.Lock() defer h.lock.Unlock() if h.closed { @@ -168,7 +168,7 @@ func (h *Heap[T, K]) BulkAdd(list []T) error { } // AddIfNotPresent adds an object if it does not already exist. -func (h *Heap[T, K]) AddIfNotPresent(obj T) error { +func (h *Heap[K, T]) AddIfNotPresent(obj T) error { id, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -183,20 +183,20 @@ func (h *Heap[T, K]) AddIfNotPresent(obj T) error { return nil } -func (h *Heap[T, K]) addIfNotPresentLocked(key K, obj T) { +func (h *Heap[K, T]) addIfNotPresentLocked(key K, obj T) { if _, exists := h.data.items[key]; exists { return } - heap.Push(h.data, &itemKeyValue[T, K]{obj, key}) + heap.Push(h.data, &itemKeyValue[K, T]{key, obj}) } // Update is an alias for Add. -func (h *Heap[T, K]) Update(obj T) error { +func (h *Heap[K, T]) Update(obj T) error { return h.Add(obj) } // Delete removes an object from the heap. -func (h *Heap[T, K]) Delete(obj T) error { +func (h *Heap[K, T]) Delete(obj T) error { key, err := h.data.keyFunc(obj) if err != nil { return errors.Errorf("key error: %v", err) @@ -211,7 +211,7 @@ func (h *Heap[T, K]) Delete(obj T) error { } // Peek returns the top object from the heap without removing it. -func (h *Heap[T, K]) Peek() (T, error) { +func (h *Heap[K, T]) Peek() (T, error) { h.lock.RLock() defer h.lock.RUnlock() if len(h.data.queue) == 0 { @@ -222,7 +222,7 @@ func (h *Heap[T, K]) Peek() (T, error) { } // Pop removes the top object from the heap and returns it. -func (h *Heap[T, K]) Pop() (T, error) { +func (h *Heap[K, T]) Pop() (T, error) { h.lock.Lock() defer h.lock.Unlock() for len(h.data.queue) == 0 { @@ -241,7 +241,7 @@ func (h *Heap[T, K]) Pop() (T, error) { } // List returns a list of all objects in the heap. -func (h *Heap[T, K]) List() []T { +func (h *Heap[K, T]) List() []T { h.lock.RLock() defer h.lock.RUnlock() list := make([]T, 0, len(h.data.items)) @@ -252,7 +252,7 @@ func (h *Heap[T, K]) List() []T { } // ListKeys returns a list of all keys in the heap. -func (h *Heap[T, K]) ListKeys() []K { +func (h *Heap[K, T]) ListKeys() []K { h.lock.RLock() defer h.lock.RUnlock() list := make([]K, 0, len(h.data.items)) @@ -263,7 +263,7 @@ func (h *Heap[T, K]) ListKeys() []K { } // Get returns an object from the heap. 
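+// If no object with the same key exists, the zero value of T and false are returned.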
-func (h *Heap[T, K]) Get(obj T) (T, bool, error) { +func (h *Heap[K, T]) Get(obj T) (T, bool, error) { key, err := h.data.keyFunc(obj) if err != nil { var zero T @@ -273,7 +273,7 @@ func (h *Heap[T, K]) Get(obj T) (T, bool, error) { } // GetByKey returns an object from the heap by key. -func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) { +func (h *Heap[K, T]) GetByKey(key K) (T, bool, error) { h.lock.RLock() defer h.lock.RUnlock() item, exists := h.data.items[key] @@ -285,17 +285,17 @@ func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) { } // IsClosed returns true if the heap is closed. -func (h *Heap[T, K]) IsClosed() bool { +func (h *Heap[K, T]) IsClosed() bool { h.lock.RLock() defer h.lock.RUnlock() return h.closed } // NewHeap returns a Heap which can be used to queue up items to process. -func NewHeap[T any, K comparable](keyFn KeyFunc[T, K], lessFn LessFunc[T]) *Heap[T, K] { - h := &Heap[T, K]{ - data: &heapData[T, K]{ - items: map[K]*heapItem[T]{}, +func NewHeap[K comparable, T any](keyFn KeyFunc[K, T], lessFn LessFunc[T]) *Heap[K, T] { + h := &Heap[K, T]{ + data: &heapData[K, T]{ + items: map[K]*heapItem[K, T]{}, queue: []K{}, keyFunc: keyFn, lessFunc: lessFn, diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel index 927b1f0b50522..4855c201da0ae 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel @@ -3,24 +3,36 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "priorityqueue", srcs = [ + "build.go", "calculator.go", "dynamic_partitioned_table_analysis_job.go", "interval.go", "job.go", "non_partitioned_table_analysis_job.go", "queue.go", + "queue2.go", "static_partitioned_table_analysis_job.go", ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue", visibility = ["//visibility:public"], deps = [ + "//pkg/infoschema", + "//pkg/meta/model", "//pkg/sessionctx", "//pkg/sessionctx/sysproctrack", "//pkg/sessionctx/variable", + "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/exec", + "//pkg/statistics/handle/autoanalyze/internal/heap", + "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", + "//pkg/util", + "//pkg/util/logutil", + "//pkg/util/timeutil", + "@com_github_pkg_errors//:errors", + "@com_github_tikv_client_go_v2//oracle", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/build.go b/pkg/statistics/handle/autoanalyze/priorityqueue/build.go new file mode 100644 index 0000000000000..a0fc79b93bb9f --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/build.go @@ -0,0 +1,372 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package priorityqueue
+
+import (
+	"time"
+
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/statistics"
+	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
+	"github.com/tikv/client-go/v2/oracle"
+)
+
+// CreateTableAnalysisJob creates a TableAnalysisJob for the physical table.
+func CreateTableAnalysisJob(
+	sctx sessionctx.Context,
+	tableSchema string,
+	tblInfo *model.TableInfo,
+	tblStats *statistics.Table,
+	autoAnalyzeRatio float64,
+	currentTs uint64,
+) AnalysisJob {
+	if !tblStats.IsEligibleForAnalysis() {
+		return nil
+	}
+
+	tableStatsVer := sctx.GetSessionVars().AnalyzeVersion
+	statistics.CheckAnalyzeVerOnTable(tblStats, &tableStatsVer)
+
+	changePercentage := CalculateChangePercentage(tblStats, autoAnalyzeRatio)
+	tableSize := calculateTableSize(tblInfo, tblStats)
+	lastAnalysisDuration := GetTableLastAnalyzeDuration(tblStats, currentTs)
+	indexes := CheckIndexesNeedAnalyze(tblInfo, tblStats)
+
+	// No need to analyze.
+	// We perform a separate check because users may set the auto analyze ratio to 0,
+	// yet still wish to analyze newly added indexes and tables that have not been analyzed.
+	if changePercentage == 0 && len(indexes) == 0 {
+		return nil
+	}
+
+	job := NewNonPartitionedTableAnalysisJob(
+		tableSchema,
+		tblInfo.Name.O,
+		tblInfo.ID,
+		indexes,
+		tableStatsVer,
+		changePercentage,
+		tableSize,
+		lastAnalysisDuration,
+	)
+
+	return job
+}
+
+func calculateTableSize(
+	tblInfo *model.TableInfo,
+	tblStats *statistics.Table,
+) float64 {
+	tblCnt := float64(tblStats.RealtimeCount)
+	// TODO: Ignore unanalyzable columns.
+	colCnt := float64(len(tblInfo.Columns))
+
+	return tblCnt * colCnt
+}
+
+// GetTableLastAnalyzeDuration gets the duration since the last analysis of the table.
+func GetTableLastAnalyzeDuration(
+	tblStats *statistics.Table,
+	currentTs uint64,
+) time.Duration {
+	lastTime := findLastAnalyzeTime(tblStats, currentTs)
+	currentTime := oracle.GetTimeFromTS(currentTs)
+
+	// Calculate the duration since the last analysis.
+	return currentTime.Sub(lastTime)
+}
+
+// CheckIndexesNeedAnalyze checks if the indexes of the table need to be analyzed.
+func CheckIndexesNeedAnalyze(
+	tblInfo *model.TableInfo,
+	tblStats *statistics.Table,
+) []string {
+	// If the table has never been analyzed, we need to analyze the whole table,
+	// so there is no need to check the indexes separately.
+	if !tblStats.IsAnalyzed() {
+		return nil
+	}
+
+	indexes := make([]string, 0, len(tblInfo.Indices))
+	// Check for missing index stats.
+	for _, idx := range tblInfo.Indices {
+		if idxStats := tblStats.GetIdx(idx.ID); idxStats == nil && !tblStats.ColAndIdxExistenceMap.HasAnalyzed(idx.ID, true) && idx.State == model.StatePublic {
+			indexes = append(indexes, idx.Name.O)
+		}
+	}
+
+	return indexes
+}
+
+// findLastAnalyzeTime finds the last analyze time of the table.
+// It uses `LastAnalyzeVersion` to find the last analyze time.
+// `LastAnalyzeVersion` is the version of the transaction that last updated the statistics.
+// It is never null (it defaults to 0), so we can use it to find the last analyze time.
+func findLastAnalyzeTime(
+	tblStats *statistics.Table,
+	currentTs uint64,
+) time.Time {
+	// The table has not been analyzed, so compose a fake version.
+	if !tblStats.IsAnalyzed() {
+		phy := oracle.GetTimeFromTS(currentTs)
+		return phy.Add(unanalyzedTableDefaultLastUpdateDuration)
+	}
+	return oracle.GetTimeFromTS(tblStats.LastAnalyzeVersion)
+}
+
+// PartitionIDAndName is a struct that contains the ID and name of a partition.
+// Exported for testing purposes. Do not use it in other packages. +type PartitionIDAndName struct { + Name string + ID int64 +} + +func createTableAnalysisJobForPartitions( + sctx sessionctx.Context, + tableSchema string, + tblInfo *model.TableInfo, + tblStats *statistics.Table, + partitionStats map[PartitionIDAndName]*statistics.Table, + autoAnalyzeRatio float64, + currentTs uint64, +) AnalysisJob { + if !tblStats.IsEligibleForAnalysis() { + return nil + } + + // TODO: figure out how to check the table stats version correctly for partitioned tables. + tableStatsVer := sctx.GetSessionVars().AnalyzeVersion + statistics.CheckAnalyzeVerOnTable(tblStats, &tableStatsVer) + + averageChangePercentage, avgSize, minLastAnalyzeDuration, partitionNames := CalculateIndicatorsForPartitions( + tblInfo, + partitionStats, + autoAnalyzeRatio, + currentTs, + ) + partitionIndexes := CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable( + tblInfo, + partitionStats, + ) + // No need to analyze. + // We perform a separate check because users may set the auto analyze ratio to 0, + // yet still wish to analyze newly added indexes and tables that have not been analyzed. + if len(partitionNames) == 0 && len(partitionIndexes) == 0 { + return nil + } + + job := NewDynamicPartitionedTableAnalysisJob( + tableSchema, + tblInfo.Name.O, + tblInfo.ID, + partitionNames, + partitionIndexes, + tableStatsVer, + averageChangePercentage, + avgSize, + minLastAnalyzeDuration, + ) + + return job +} + +// CalculateChangePercentage calculates the change percentage of the table +// based on the change count and the analysis count. +// TODO: DO NOT COPY THIS FUNCTION +func CalculateChangePercentage( + tblStats *statistics.Table, + autoAnalyzeRatio float64, +) float64 { + if !tblStats.IsAnalyzed() { + return 1 + } + + // Auto analyze based on the change percentage is disabled. + // However, this check should not affect the analysis of indexes, + // as index analysis is still needed for query performance. + if autoAnalyzeRatio == 0 { + return 0 + } + + tblCnt := float64(tblStats.RealtimeCount) + if histCnt := tblStats.GetAnalyzeRowCount(); histCnt > 0 { + tblCnt = histCnt + } + res := float64(tblStats.ModifyCount) / tblCnt + if res > autoAnalyzeRatio { + return res + } + + return 0 +} + +// CalculateIndicatorsForPartitions calculates the average change percentage, +// average size and average last analyze duration for the partitions that meet the threshold. +// Change percentage is the ratio of the number of modified rows to the total number of rows. +// Size is the product of the number of rows and the number of columns. +// Last analyze duration is the duration since the last analyze. +func CalculateIndicatorsForPartitions( + tblInfo *model.TableInfo, + partitionStats map[PartitionIDAndName]*statistics.Table, + autoAnalyzeRatio float64, + currentTs uint64, +) ( + avgChange float64, + avgSize float64, + avgLastAnalyzeDuration time.Duration, + partitionNames []string, +) { + totalChangePercent := 0.0 + totalSize := 0.0 + count := 0.0 + partitionNames = make([]string, 0, len(partitionStats)) + cols := float64(len(tblInfo.Columns)) + totalLastAnalyzeDuration := time.Duration(0) + + for pIDAndName, tblStats := range partitionStats { + changePercent := CalculateChangePercentage(tblStats, autoAnalyzeRatio) + // Skip partition analysis if it doesn't meet the threshold, stats are not yet loaded, + // or the auto analyze ratio is set to 0 by the user. 
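+		// For example, with autoAnalyzeRatio = 0.5, a partition with 1,000 rows of
+		// which 400 were modified has a change percentage of 400/1000 = 0.4, so
+		// CalculateChangePercentage returns 0 for it and the partition is skipped here.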
+ if changePercent == 0 { + continue + } + + totalChangePercent += changePercent + // size = count * cols + totalSize += float64(tblStats.RealtimeCount) * cols + lastAnalyzeDuration := GetTableLastAnalyzeDuration(tblStats, currentTs) + totalLastAnalyzeDuration += lastAnalyzeDuration + partitionNames = append(partitionNames, pIDAndName.Name) + count++ + } + if len(partitionNames) == 0 { + return 0, 0, 0, partitionNames + } + + avgChange = totalChangePercent / count + avgSize = totalSize / count + avgLastAnalyzeDuration = totalLastAnalyzeDuration / time.Duration(count) + + return avgChange, avgSize, avgLastAnalyzeDuration, partitionNames +} + +// CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable checks if the indexes of the partitioned table need to be analyzed. +// It returns a map from index name to the names of the partitions that need to be analyzed. +// NOTE: This is only for newly added indexes. +func CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable( + tblInfo *model.TableInfo, + partitionStats map[PartitionIDAndName]*statistics.Table, +) map[string][]string { + partitionIndexes := make(map[string][]string, len(tblInfo.Indices)) + + for _, idx := range tblInfo.Indices { + // No need to analyze the index if it's not public. + if idx.State != model.StatePublic { + continue + } + + // Find all the partitions that need to analyze this index. + names := make([]string, 0, len(partitionStats)) + for pIDAndName, tblStats := range partitionStats { + if idxStats := tblStats.GetIdx(idx.ID); idxStats == nil && !tblStats.ColAndIdxExistenceMap.HasAnalyzed(idx.ID, true) { + names = append(names, pIDAndName.Name) + } + } + + if len(names) > 0 { + partitionIndexes[idx.Name.O] = names + } + } + + return partitionIndexes +} + +func getPartitionStats( + statsHandle statstypes.StatsHandle, + tblInfo *model.TableInfo, + defs []model.PartitionDefinition, +) map[PartitionIDAndName]*statistics.Table { + partitionStats := make(map[PartitionIDAndName]*statistics.Table, len(defs)) + + for _, def := range defs { + stats := statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, def.ID) + // Ignore the partition if it's not ready to analyze. + if !stats.IsEligibleForAnalysis() { + continue + } + d := PartitionIDAndName{ + ID: def.ID, + Name: def.Name.O, + } + partitionStats[d] = stats + } + + return partitionStats +} + +func getStartTs(sctx sessionctx.Context) (uint64, error) { + txn, err := sctx.Txn(true) + if err != nil { + return 0, err + } + return txn.StartTS(), nil +} + +// CreateStaticPartitionAnalysisJob creates a TableAnalysisJob for the static partition. +func CreateStaticPartitionAnalysisJob( + sctx sessionctx.Context, + tableSchema string, + globalTblInfo *model.TableInfo, + partitionID int64, + partitionName string, + partitionStats *statistics.Table, + autoAnalyzeRatio float64, + currentTs uint64, +) AnalysisJob { + if !partitionStats.IsEligibleForAnalysis() { + return nil + } + + tableStatsVer := sctx.GetSessionVars().AnalyzeVersion + statistics.CheckAnalyzeVerOnTable(partitionStats, &tableStatsVer) + + changePercentage := CalculateChangePercentage(partitionStats, autoAnalyzeRatio) + tableSize := calculateTableSize(globalTblInfo, partitionStats) + lastAnalysisDuration := GetTableLastAnalyzeDuration(partitionStats, currentTs) + indexes := CheckIndexesNeedAnalyze(globalTblInfo, partitionStats) + + // No need to analyze. + // We perform a separate check because users may set the auto analyze ratio to 0, + // yet still wish to analyze newly added indexes and tables that have not been analyzed. 
+ if changePercentage == 0 && len(indexes) == 0 { + return nil + } + + job := NewStaticPartitionTableAnalysisJob( + tableSchema, + globalTblInfo.Name.O, + globalTblInfo.ID, + partitionName, + partitionID, + indexes, + tableStatsVer, + changePercentage, + tableSize, + lastAnalysisDuration, + ) + + return job +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go index 8d8ab745eb3cb..982c86acf474a 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go @@ -84,11 +84,21 @@ func NewDynamicPartitionedTableAnalysisJob( } } +// GetAnalyzeType gets the analyze type of the job. +func (j *DynamicPartitionedTableAnalysisJob) GetAnalyzeType() analyzeType { + return j.getAnalyzeType() +} + // GetTableID gets the table ID of the job. func (j *DynamicPartitionedTableAnalysisJob) GetTableID() int64 { return j.GlobalTableID } +// SetIndicators sets the indicators of the job. +func (j *DynamicPartitionedTableAnalysisJob) SetIndicators(indicators Indicators) { + j.Indicators = indicators +} + // Analyze analyzes the partitions or partition indexes. func (j *DynamicPartitionedTableAnalysisJob) Analyze( statsHandle statstypes.StatsHandle, diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index 537e09918dff3..341bae8af5623 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -46,6 +46,8 @@ type Indicators struct { // AnalysisJob is the interface for the analysis job. type AnalysisJob interface { + // GetAnalyzeType gets the analyze type of the job. + GetAnalyzeType() analyzeType // IsValidToAnalyze checks whether the table is valid to analyze. // It checks the last failed analysis duration and the average analysis duration. // If the last failed analysis duration is less than 2 times the average analysis duration, @@ -72,6 +74,9 @@ type AnalysisJob interface { // GetIndicators gets the indicators of the job. GetIndicators() Indicators + // SetIndicators sets the indicators of the job. + SetIndicators(indicators Indicators) + // GetTableID gets the table ID of the job. GetTableID() int64 diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go index 0a4276c16f837..7cc6df72b95ba 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go @@ -69,6 +69,11 @@ func NewNonPartitionedTableAnalysisJob( } } +// GetAnalyzeType gets the analyze type of the job. +func (j *NonPartitionedTableAnalysisJob) GetAnalyzeType() analyzeType { + return analyzeTable +} + // GetTableID gets the table ID of the job. func (j *NonPartitionedTableAnalysisJob) GetTableID() int64 { return j.TableID @@ -126,6 +131,11 @@ func (j *NonPartitionedTableAnalysisJob) GetIndicators() Indicators { return j.Indicators } +// SetIndicators sets the indicators of the job. +func (j *NonPartitionedTableAnalysisJob) SetIndicators(indicators Indicators) { + j.Indicators = indicators +} + // String implements fmt.Stringer interface. 
 func (j *NonPartitionedTableAnalysisJob) String() string {
 	return fmt.Sprintf(
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go
new file mode 100644
index 0000000000000..1925d509552cd
--- /dev/null
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go
@@ -0,0 +1,448 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package priorityqueue
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/tidb/pkg/infoschema"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
+	"github.com/pingcap/tidb/pkg/statistics"
+	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
+	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap"
+	"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
+	statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
+	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
+	statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
+	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/logutil"
+	"github.com/pingcap/tidb/pkg/util/timeutil"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+)
+
+const (
+	// unanalyzedTableDefaultChangePercentage is the default change percentage of an unanalyzed table.
+	unanalyzedTableDefaultChangePercentage = 1
+	// unanalyzedTableDefaultLastUpdateDuration is the default last update duration of an unanalyzed table.
+	unanalyzedTableDefaultLastUpdateDuration = -30 * time.Minute
+)
+
+// AnalysisPriorityQueueV2 is a priority queue for TableAnalysisJobs.
+type AnalysisPriorityQueueV2 struct {
+	inner                  *heap.Heap[int64, AnalysisJob]
+	statsHandle            statstypes.StatsHandle
+	calculator             *PriorityCalculator
+	autoAnalysisTimeWindow autoAnalysisTimeWindow
+
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     util.WaitGroupWrapper
+	// lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch.
+	lastDMLUpdateFetchTimestamp atomic.Uint64
+}
+
+// NewAnalysisPriorityQueue2 creates a new AnalysisPriorityQueueV2.
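+//
+// A rough usage sketch (hypothetical caller; this patch adds the queue itself but
+// does not wire it into the auto-analyze loop yet):
+//
+//	pq, err := NewAnalysisPriorityQueue2(statsHandle)
+//	if err != nil {
+//		return err
+//	}
+//	defer pq.Close()
+//
+// The queue initializes itself from the current schema and then keeps itself
+// fresh in a background goroutine until Close is called.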
+func NewAnalysisPriorityQueue2(handle statstypes.StatsHandle) (*AnalysisPriorityQueueV2, error) {
+	keyFunc := func(job AnalysisJob) (int64, error) {
+		return job.GetTableID(), nil
+	}
+	lessFunc := func(a, b AnalysisJob) bool {
+		return a.GetWeight() > b.GetWeight()
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+
+	pq := &AnalysisPriorityQueueV2{
+		inner:       heap.NewHeap(keyFunc, lessFunc),
+		statsHandle: handle,
+		calculator:  NewPriorityCalculator(),
+
+		ctx:    ctx,
+		cancel: cancel,
+	}
+	if err := pq.init(); err != nil {
+		pq.Close()
+		return nil, err
+	}
+	pq.wg.Run(pq.run)
+	return pq, nil
+}
+
+func (pq *AnalysisPriorityQueueV2) init() error {
+	if err := statsutil.CallWithSCtx(
+		pq.statsHandle.SPool(),
+		func(sctx sessionctx.Context) error {
+			parameters := exec.GetAutoAnalyzeParameters(sctx)
+			autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
+			// Get the available time period for auto analyze and check if the current time is in the period.
+			start, end, err := exec.ParseAutoAnalysisWindow(
+				parameters[variable.TiDBAutoAnalyzeStartTime],
+				parameters[variable.TiDBAutoAnalyzeEndTime],
+			)
+			if err != nil {
+				statslogutil.StatsLogger().Error(
+					"parse auto analyze period failed",
+					zap.Error(err),
+				)
+				return err
+			}
+			// We will check the time window again when we try to execute the job,
+			// so store it for later use.
+			pq.autoAnalysisTimeWindow = autoAnalysisTimeWindow{
+				start: start,
+				end:   end,
+			}
+			if !pq.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
+				return nil
+			}
+			pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
+			is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+			// Query locked tables once to minimize overhead.
+			// Outdated lock info is acceptable as we verify table lock status pre-analysis.
+			lockedTables, err := lockstats.QueryLockedTables(sctx)
+			if err != nil {
+				return err
+			}
+			// Get the current timestamp from the session context.
+			currentTs, err := getStartTs(sctx)
+			if err != nil {
+				return err
+			}
+
+			dbs := is.AllSchemaNames()
+			for _, db := range dbs {
+				// There can be many tables, and auto-analyze may take a long time to
+				// walk all of them, so re-check the available time window for every schema.
+				if !pq.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
+					return nil
+				}
+				// Ignore the memory and system databases.
+				if util.IsMemOrSysDB(db.L) {
+					continue
+				}
+
+				tbls, err := is.SchemaTableInfos(context.Background(), db)
+				if err != nil {
+					return err
+				}
+				// We need to check every partition of every table to see if it needs to be analyzed.
+				for _, tblInfo := range tbls {
+					// If the table is locked, skip analyzing all partitions of the table.
+					if _, ok := lockedTables[tblInfo.ID]; ok {
+						continue
+					}
+
+					if tblInfo.IsView() {
+						continue
+					}
+					pi := tblInfo.GetPartitionInfo()
+					pushJobFunc := func(job AnalysisJob) {
+						if job == nil {
+							return
+						}
+						// Calculate the weight of the job.
+						weight := pq.calculator.CalculateWeight(job)
+						// We apply a penalty to larger tables, which can potentially result in a negative weight.
+						// Under normal circumstances, table sizes should not produce one, so we log a warning
+						// whenever a job ends up with a non-positive weight.
+						if weight <= 0 {
+							statslogutil.SingletonStatsSamplerLogger().Warn(
+								"Table gets a negative weight",
+								zap.Float64("weight", weight),
+								zap.Stringer("job", job),
+							)
+						}
+						job.SetWeight(weight)
+						// Push the job onto the queue.
+						pq.inner.Add(job)
+					}
+					// No partitions, so analyze the whole table.
+					if pi == nil {
+						job := CreateTableAnalysisJob(
+							sctx,
+							db.O,
+							tblInfo,
+							pq.statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
+							autoAnalyzeRatio,
+							currentTs,
+						)
+						pushJobFunc(job)
+						// Skip the rest of the loop.
+						continue
+					}
+
+					// Only analyze partitions that have not been locked.
+					partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
+					for _, def := range pi.Definitions {
+						if _, ok := lockedTables[def.ID]; !ok {
+							partitionDefs = append(partitionDefs, def)
+						}
+					}
+					partitionStats := getPartitionStats(pq.statsHandle, tblInfo, partitionDefs)
+					// If the prune mode is static, we need to analyze every partition as a separate table.
+					if pruneMode == variable.Static {
+						for pIDAndName, stats := range partitionStats {
+							job := CreateStaticPartitionAnalysisJob(
+								sctx,
+								db.O,
+								tblInfo,
+								pIDAndName.ID,
+								pIDAndName.Name,
+								stats,
+								autoAnalyzeRatio,
+								currentTs,
+							)
+							pushJobFunc(job)
+						}
+					} else {
+						job := createTableAnalysisJobForPartitions(
+							sctx,
+							db.O,
+							tblInfo,
+							pq.statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
+							partitionStats,
+							autoAnalyzeRatio,
+							currentTs,
+						)
+						pushJobFunc(job)
+					}
+				}
+			}
+
+			return nil
+		},
+		statsutil.FlagWrapTxn,
+	); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (pq *AnalysisPriorityQueueV2) run() {
+	dmlFetchInterval := time.NewTicker(time.Minute * 5)
+	defer dmlFetchInterval.Stop()
+	timeRefreshInterval := time.NewTicker(time.Minute * 10)
+	defer timeRefreshInterval.Stop()
+
+	for {
+		select {
+		case <-pq.ctx.Done():
+			return
+		case <-dmlFetchInterval.C:
+			pq.fetchDMLUpdate()
+		case <-timeRefreshInterval.C:
+			pq.refreshTime()
+		}
+	}
+}
+
+func (pq *AnalysisPriorityQueueV2) fetchDMLUpdate() {
+	if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+		parameters := exec.GetAutoAnalyzeParameters(sctx)
+		// Get the available time period for auto analyze and check if the current time is in the period.
+		start, end, err := exec.ParseAutoAnalysisWindow(
+			parameters[variable.TiDBAutoAnalyzeStartTime],
+			parameters[variable.TiDBAutoAnalyzeEndTime],
+		)
+		if err != nil {
+			statslogutil.StatsLogger().Error(
+				"parse auto analyze period failed",
+				zap.Error(err),
+			)
+			return err
+		}
+		// We will check the time window again when we try to execute the job,
+		// so store it for later use.
+		pq.autoAnalysisTimeWindow = autoAnalysisTimeWindow{
+			start: start,
+			end:   end,
+		}
+		if !pq.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
+			return nil
+		}
+		values := pq.statsHandle.Values()
+		lastFetchTimestamp := pq.lastDMLUpdateFetchTimestamp.Load()
+		var newMaxVersion uint64
+
+		for _, value := range values {
+			if value.Version > lastFetchTimestamp {
+				// Handle the table stats.
+				pq.handleTableStats(value)
+			}
+			newMaxVersion = max(newMaxVersion, value.Version)
+		}
+
+		// Only update if we have seen a newer version.
+		if newMaxVersion > lastFetchTimestamp {
+			pq.lastDMLUpdateFetchTimestamp.Store(newMaxVersion)
+		}
+		return nil
+	}); err != nil {
+		logutil.BgLogger().Error("failed to fetch dml update", zap.Error(err))
+	}
+}
+
+func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
+	if !stats.IsEligibleForAnalysis() {
+		return
+	}
+
+	if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+		parameters := exec.GetAutoAnalyzeParameters(sctx)
+		autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
+		changePercent := CalculateChangePercentage(stats, autoAnalyzeRatio)
+		if changePercent == 0 {
+			return nil
+		}
+
+		var job AnalysisJob
+		pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
+		// Get the current timestamp from the session context.
+		currentTs, err := getStartTs(sctx)
+		if err != nil {
+			return err
+		}
+		is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+		job, ok, _ := pq.inner.GetByKey(stats.PhysicalID)
+		if !ok {
+			tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
+			if !ok {
+				return errors.Errorf("table info not found for table id %d", stats.PhysicalID)
+			}
+			tableMeta := tableInfo.Meta()
+			partitionedTable := tableMeta.GetPartitionInfo()
+			if partitionedTable == nil {
+				job = CreateTableAnalysisJob(sctx, tableMeta.Name.O, tableMeta, stats, autoAnalyzeRatio, currentTs)
+			} else {
+				partitionDefs := partitionedTable.Definitions
+				if pruneMode == variable.Static {
+					var partitionDef model.PartitionDefinition
+					for _, def := range partitionDefs {
+						if def.ID == stats.PhysicalID {
+							partitionDef = def
+							break
+						}
+					}
+					job = CreateStaticPartitionAnalysisJob(
+						sctx,
+						tableMeta.Name.O,
+						tableMeta,
+						partitionDef.ID,
+						partitionDef.Name.O,
+						stats,
+						autoAnalyzeRatio,
+						currentTs,
+					)
+				} else {
+					partitionStats := getPartitionStats(pq.statsHandle, tableMeta, partitionDefs)
+					job = createTableAnalysisJobForPartitions(
+						sctx,
+						tableMeta.Name.O,
+						tableMeta,
+						stats,
+						partitionStats,
+						autoAnalyzeRatio,
+						currentTs,
+					)
+				}
+			}
+		} else {
+			indicators := job.GetIndicators()
+			// For a dynamic partitioned table, there is no way to update only the partition that has changed,
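+			// so the job for the whole table is rebuilt below from the latest partition stats.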
+			if job.GetAnalyzeType() == analyzeDynamicPartition || job.GetAnalyzeType() == analyzeDynamicPartitionIndex {
+				tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
+				if !ok {
+					return errors.Errorf("table info not found for table id %d", stats.PhysicalID)
+				}
+				tableMeta := tableInfo.Meta()
+				partitionedTable := tableMeta.GetPartitionInfo()
+				partitionDefs := partitionedTable.Definitions
+				partitionStats := getPartitionStats(pq.statsHandle, tableMeta, partitionDefs)
+				job = createTableAnalysisJobForPartitions(
+					sctx,
+					tableMeta.Name.O,
+					tableMeta,
+					stats,
+					partitionStats,
+					autoAnalyzeRatio,
+					currentTs,
+				)
+			} else {
+				indicators.ChangePercentage = CalculateChangePercentage(stats, autoAnalyzeRatio)
+				// TODO: check if this is correct.
+				indicators.TableSize = float64(stats.ColNum() * int(stats.RealtimeCount))
+				job.SetIndicators(indicators)
+			}
+		}
+
+		job.SetWeight(pq.calculator.CalculateWeight(job))
+		return pq.inner.Add(job)
+	}); err != nil {
+		logutil.BgLogger().Error("failed to create table analysis job", zap.Error(err))
+	}
+}
+
+func (pq *AnalysisPriorityQueueV2) refreshTime() {
+	if !pq.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
+		return
+	}
+	if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+		jobs := pq.inner.List()
+		for _, job := range jobs {
+			indicators := job.GetIndicators()
+			currentTs, err := getStartTs(sctx)
+			if err != nil {
+				return err
+			}
+			tableStats, ok := pq.statsHandle.Get(job.GetTableID())
+			if !ok {
+				// TODO: Handle this case.
+				continue
+			}
+			indicators.LastAnalysisDuration = GetTableLastAnalyzeDuration(tableStats, currentTs)
+			job.SetIndicators(indicators)
+			job.SetWeight(pq.calculator.CalculateWeight(job))
+		}
+		return nil
+	}); err != nil {
+		logutil.BgLogger().Error("failed to refresh time", zap.Error(err))
+	}
+}
+
+// Close closes the priority queue.
+func (pq *AnalysisPriorityQueueV2) Close() {
+	pq.cancel()
+	pq.wg.Wait()
+	pq.inner.Close()
+}
+
+// autoAnalysisTimeWindow is a struct that contains the start and end time of the auto analyze time window.
+type autoAnalysisTimeWindow struct {
+	start time.Time
+	end   time.Time
+}
+
+// isWithinTimeWindow checks if the current time is within the time window.
+// If the auto analyze time window is not set or the current time is not in the window, it returns false.
+func (a autoAnalysisTimeWindow) isWithinTimeWindow(currentTime time.Time) bool {
+	if a.start == (time.Time{}) || a.end == (time.Time{}) {
+		return false
+	}
+	return timeutil.WithinDayTimePeriod(a.start, a.end, currentTime)
+}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go
index 2378bc28c061f..e910c8907c44b 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go
@@ -77,6 +77,11 @@ func NewStaticPartitionTableAnalysisJob(
 	}
 }
 
+// GetAnalyzeType gets the analyze type of the job.
+func (j *StaticPartitionedTableAnalysisJob) GetAnalyzeType() analyzeType {
+	return analyzeStaticPartition
+}
+
 // GetTableID gets the table ID of the job.
 func (j *StaticPartitionedTableAnalysisJob) GetTableID() int64 {
 	// Because we only analyze the specified static partition, the table ID is the static partition ID.
@@ -104,6 +109,11 @@ func (j *StaticPartitionedTableAnalysisJob) GetIndicators() Indicators { return j.Indicators } +// SetIndicators implements AnalysisJob. +func (j *StaticPartitionedTableAnalysisJob) SetIndicators(indicators Indicators) { + j.Indicators = indicators +} + // HasNewlyAddedIndex implements AnalysisJob. func (j *StaticPartitionedTableAnalysisJob) HasNewlyAddedIndex() bool { return len(j.Indexes) > 0