statistics: split table analysis job into three different types of jobs (#51531)

ref #50132
Rustin170506 authored Mar 7, 2024
1 parent 50eedac commit f94a6ba
Showing 15 changed files with 1,431 additions and 1,016 deletions.
7 changes: 7 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
@@ -4,9 +4,12 @@ go_library(
     name = "priorityqueue",
     srcs = [
         "calculator.go",
+        "dynamic_partitioned_table_analysis_job.go",
         "interval.go",
         "job.go",
+        "non_partitioned_table_analysis_job.go",
         "queue.go",
+        "static_partitioned_table_analysis_job.go",
     ],
     importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
     visibility = ["//visibility:public"],
@@ -17,6 +20,7 @@ go_library(
         "//pkg/statistics/handle/logutil",
         "//pkg/statistics/handle/types",
         "//pkg/statistics/handle/util",
+        "//pkg/util/intest",
         "@org_uber_go_zap//:zap",
     ],
 )
@@ -26,10 +30,13 @@ go_test(
     timeout = "short",
     srcs = [
         "calculator_test.go",
+        "dynamic_partitioned_table_analysis_job_test.go",
         "interval_test.go",
         "job_test.go",
         "main_test.go",
+        "non_partitioned_table_analysis_job_test.go",
         "queue_test.go",
+        "static_partitioned_table_analysis_job_test.go",
     ],
     flaky = True,
     shard_count = 22,
11 changes: 6 additions & 5 deletions pkg/statistics/handle/autoanalyze/priorityqueue/calculator.go
@@ -51,19 +51,20 @@ func NewPriorityCalculator() *PriorityCalculator {
 //	0.1 * (1 - math.Log10(1 + TableSize)) +
 //	0.3 * math.Log10(1 + math.Sqrt(AnalysisInterval)) +
 //	special_event[event])
-func (pc *PriorityCalculator) CalculateWeight(job *TableAnalysisJob) float64 {
+func (pc *PriorityCalculator) CalculateWeight(job AnalysisJob) float64 {
 	// We multiply the priority_score by 100 to increase its magnitude. This ensures that
 	// when we apply the log10 function, the resulting value is more meaningful and reasonable.
-	changeRatio := 100 * job.ChangePercentage
+	indicators := job.GetIndicators()
+	changeRatio := 100 * indicators.ChangePercentage
 	return changeRatioWeight*math.Log10(1+changeRatio) +
-		sizeWeight*(1-math.Log10(1+job.TableSize)) +
-		analysisInterval*math.Log10(1+math.Sqrt(job.LastAnalysisDuration.Seconds())) +
+		sizeWeight*(1-math.Log10(1+indicators.TableSize)) +
+		analysisInterval*math.Log10(1+math.Sqrt(indicators.LastAnalysisDuration.Seconds())) +
 		pc.GetSpecialEvent(job)
 }
 
 // GetSpecialEvent returns the special event weight.
 // Exported for testing purposes.
-func (*PriorityCalculator) GetSpecialEvent(job *TableAnalysisJob) float64 {
+func (*PriorityCalculator) GetSpecialEvent(job AnalysisJob) float64 {
 	if job.HasNewlyAddedIndex() {
 		return EventNewIndex
 	}
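As a sanity check on the refactor, here is a minimal, self-contained sketch of the weight formula above, inlining hypothetical indicator values instead of going through the new AnalysisJob interface. The change-ratio coefficient is not visible in this hunk, so 0.6 is an assumption; only the 0.1 and 0.3 coefficients come from the doc comment.

package main

import (
	"fmt"
	"math"
	"time"
)

// Coefficients assumed from the doc comment above; changeRatioWeight = 0.6 is
// an assumption, not taken from this diff.
const (
	changeRatioWeight = 0.6
	sizeWeight        = 0.1
	analysisInterval  = 0.3
)

func main() {
	// Hypothetical indicator values for one table.
	changePercentage := 0.5 // 50% of rows changed since the last analysis
	tableSize := 10000.0
	lastAnalysisDuration := 2 * time.Hour

	// Mirrors CalculateWeight: scale the change ratio by 100 before log10.
	changeRatio := 100 * changePercentage
	weight := changeRatioWeight*math.Log10(1+changeRatio) +
		sizeWeight*(1-math.Log10(1+tableSize)) +
		analysisInterval*math.Log10(1+math.Sqrt(lastAnalysisDuration.Seconds())) +
		0 // no special event (no newly added index)
	fmt.Printf("weight = %.4f\n", weight)
}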
21 changes: 11 additions & 10 deletions pkg/statistics/handle/autoanalyze/priorityqueue/calculator_test.go
@@ -110,10 +110,12 @@ func TestCalculateWeight(t *testing.T) {
 func testWeightCalculation(t *testing.T, pc *priorityqueue.PriorityCalculator, group []testData) {
 	prevWeight := -1.0
 	for _, tc := range group {
-		job := &priorityqueue.TableAnalysisJob{
-			ChangePercentage:     tc.ChangePercentage,
-			TableSize:            tc.TableSize,
-			LastAnalysisDuration: tc.LastAnalysisDuration,
+		job := &priorityqueue.NonPartitionedTableAnalysisJob{
+			Indicators: priorityqueue.Indicators{
+				ChangePercentage:     tc.ChangePercentage,
+				TableSize:            tc.TableSize,
+				LastAnalysisDuration: tc.LastAnalysisDuration,
+			},
 		}
 		weight := pc.CalculateWeight(job)
 		require.Greater(t, weight, 0.0)
@@ -125,21 +127,20 @@ func testWeightCalculation(t *testing.T, pc *priorityqueue.PriorityCalculator, group []testData) {
 func TestGetSpecialEvent(t *testing.T) {
 	pc := priorityqueue.NewPriorityCalculator()
 
-	jobWithIndex := &priorityqueue.TableAnalysisJob{
+	jobWithIndex1 := &priorityqueue.DynamicPartitionedTableAnalysisJob{
 		PartitionIndexes: map[string][]string{
 			"index1": {"p1", "p2"},
 		},
 	}
-	require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex))
+	require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex1))
 
-	jobWithIndex = &priorityqueue.TableAnalysisJob{
+	jobWithIndex2 := &priorityqueue.NonPartitionedTableAnalysisJob{
 		Indexes: []string{"index1"},
 	}
-	require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex))
+	require.Equal(t, priorityqueue.EventNewIndex, pc.GetSpecialEvent(jobWithIndex2))
 
-	jobWithoutIndex := &priorityqueue.TableAnalysisJob{
+	jobWithoutIndex := &priorityqueue.DynamicPartitionedTableAnalysisJob{
 		PartitionIndexes: map[string][]string{},
-		Indexes:          []string{},
 	}
 	require.Equal(t, priorityqueue.EventNone, pc.GetSpecialEvent(jobWithoutIndex))
 }
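The three concrete job types plug into the calculator through a shared interface. Its definition lives in job.go, which this excerpt does not show; reconstructed from the call sites in this diff and the methods implemented below, it presumably looks roughly like this:

// Sketch of the AnalysisJob interface as inferred from CalculateWeight,
// GetSpecialEvent, and the DynamicPartitionedTableAnalysisJob methods in the
// new file below; the actual definition in job.go may differ.
type AnalysisJob interface {
	// Analyze executes the ANALYZE statement(s) for this job.
	Analyze(statsHandle statstypes.StatsHandle, sysProcTracker sessionctx.SysProcTracker) error
	// IsValidToAnalyze reports whether the job may run, with a failure reason if not.
	IsValidToAnalyze(sctx sessionctx.Context) (bool, string)
	// GetIndicators returns the change/size/last-analysis indicators used for weighting.
	GetIndicators() Indicators
	// HasNewlyAddedIndex reports whether the job was triggered by a newly added index.
	HasNewlyAddedIndex() bool
	// SetWeight and GetWeight store and read the priority computed by PriorityCalculator.
	SetWeight(weight float64)
	GetWeight() float64
	fmt.Stringer
}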
253 changes: 253 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go
@@ -0,0 +1,253 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package priorityqueue

import (
"fmt"
"strings"
"time"

"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
)

var _ AnalysisJob = &DynamicPartitionedTableAnalysisJob{}

const (
analyzeDynamicPartition analyzeType = "analyzeDynamicPartition"
analyzeDynamicPartitionIndex analyzeType = "analyzeDynamicPartitionIndex"
)

// DynamicPartitionedTableAnalysisJob is an AnalysisJob for analyzing a dynamically pruned partitioned table.
type DynamicPartitionedTableAnalysisJob struct {
// Only set when the partitions' indexes need to be analyzed.
// It looks like: {"indexName": ["partitionName1", "partitionName2"]}
// This is only for newly added indexes.
// The reason why we need to record the partition names is that we need to analyze partitions in batch mode
// and we don't want to analyze the same partition multiple times.
// For example, the user may analyze some partitions manually, and we don't want to analyze them again.
PartitionIndexes map[string][]string

TableSchema string
GlobalTableName string
// This will analyze all indexes and columns of the specified partitions.
Partitions []string
// Some indicators to help us decide whether we need to analyze this table.
Indicators
GlobalTableID int64

// Analyze table with this version of statistics.
TableStatsVer int
// Weight is used to calculate the priority of the job.
Weight float64
}

// NewDynamicPartitionedTableAnalysisJob creates a new job for analyzing a dynamic partitioned table's partitions.
func NewDynamicPartitionedTableAnalysisJob(
schema, tableName string,
tableID int64,
partitions []string,
partitionIndexes map[string][]string,
tableStatsVer int,
changePercentage float64,
tableSize float64,
lastAnalysisDuration time.Duration,
) *DynamicPartitionedTableAnalysisJob {
return &DynamicPartitionedTableAnalysisJob{
GlobalTableID: tableID,
TableSchema: schema,
GlobalTableName: tableName,
Partitions: partitions,
PartitionIndexes: partitionIndexes,
TableStatsVer: tableStatsVer,
Indicators: Indicators{
ChangePercentage: changePercentage,
TableSize: tableSize,
LastAnalysisDuration: lastAnalysisDuration,
},
}
}

// Analyze analyzes the partitions or partition indexes.
func (j *DynamicPartitionedTableAnalysisJob) Analyze(
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) error {
return statsutil.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error {
switch j.getAnalyzeType() {
case analyzeDynamicPartition:
j.analyzePartitions(sctx, statsHandle, sysProcTracker)
case analyzeDynamicPartitionIndex:
j.analyzePartitionIndexes(sctx, statsHandle, sysProcTracker)
}
return nil
})
}

// GetIndicators returns the indicators of the table.
func (j *DynamicPartitionedTableAnalysisJob) GetIndicators() Indicators {
return j.Indicators
}

// HasNewlyAddedIndex checks whether the job has a newly added index.
func (j *DynamicPartitionedTableAnalysisJob) HasNewlyAddedIndex() bool {
return len(j.PartitionIndexes) > 0
}

// IsValidToAnalyze checks whether the table or partition is valid to analyze.
// We need to check each partition to determine whether the table is valid to analyze.
func (j *DynamicPartitionedTableAnalysisJob) IsValidToAnalyze(sctx sessionctx.Context) (bool, string) {
if valid, failReason := isValidWeight(j.Weight); !valid {
return false, failReason
}
// Check whether the table or partition is valid to analyze.
if len(j.Partitions) > 0 || len(j.PartitionIndexes) > 0 {
// If any partition is invalid to analyze, the whole table is invalid to analyze,
// because we need to analyze partitions in batch mode.
partitions := append(j.Partitions, getPartitionNames(j.PartitionIndexes)...)
if valid, failReason := isValidToAnalyze(
sctx,
j.TableSchema,
j.GlobalTableName,
partitions...,
); !valid {
return false, failReason
}
}

return true, ""
}

// SetWeight sets the weight of the job.
func (j *DynamicPartitionedTableAnalysisJob) SetWeight(weight float64) {
j.Weight = weight
}

// GetWeight gets the weight of the job.
func (j *DynamicPartitionedTableAnalysisJob) GetWeight() float64 {
return j.Weight
}

// String implements fmt.Stringer interface.
func (j *DynamicPartitionedTableAnalysisJob) String() string {
return fmt.Sprintf(
"DynamicPartitionedTableAnalysisJob:\n"+
"\tAnalyzeType: %s\n"+
"\tPartitions: %s\n"+
"\tPartitionIndexes: %v\n"+
"\tSchema: %s\n"+
"\tGlobal Table: %s\n"+
"\tGlobal TableID: %d\n"+
"\tTableStatsVer: %d\n"+
"\tChangePercentage: %.2f\n"+
"\tTableSize: %.2f\n"+
"\tLastAnalysisDuration: %s\n"+
"\tWeight: %.4f\n",
j.getAnalyzeType(),
strings.Join(j.Partitions, ", "),
j.PartitionIndexes,
j.TableSchema, j.GlobalTableName,
j.GlobalTableID, j.TableStatsVer, j.ChangePercentage,
j.TableSize, j.LastAnalysisDuration, j.Weight,
)
}

// analyzePartitions performs analysis on the specified partitions.
// This function uses a batch mode for efficiency. After analyzing the partitions,
// it's necessary to merge their statistics. By analyzing them in batches,
// we can reduce the overhead of this merging process.
func (j *DynamicPartitionedTableAnalysisJob) analyzePartitions(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) {
analyzePartitionBatchSize := int(variable.AutoAnalyzePartitionBatchSize.Load())
needAnalyzePartitionNames := make([]any, 0, len(j.Partitions))
for _, partition := range j.Partitions {
needAnalyzePartitionNames = append(needAnalyzePartitionNames, partition)
}
for i := 0; i < len(needAnalyzePartitionNames); i += analyzePartitionBatchSize {
start := i
end := start + analyzePartitionBatchSize
if end >= len(needAnalyzePartitionNames) {
end = len(needAnalyzePartitionNames)
}

sql := getPartitionSQL("analyze table %n.%n partition", "", end-start)
params := append([]any{j.TableSchema, j.GlobalTableName}, needAnalyzePartitionNames[start:end]...)
exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...)
}
}

// analyzePartitionIndexes performs analysis on the specified partition indexes.
func (j *DynamicPartitionedTableAnalysisJob) analyzePartitionIndexes(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
) {
analyzePartitionBatchSize := int(variable.AutoAnalyzePartitionBatchSize.Load())

for indexName, partitionNames := range j.PartitionIndexes {
needAnalyzePartitionNames := make([]any, 0, len(partitionNames))
for _, partition := range partitionNames {
needAnalyzePartitionNames = append(needAnalyzePartitionNames, partition)
}
for i := 0; i < len(needAnalyzePartitionNames); i += analyzePartitionBatchSize {
start := i
end := start + analyzePartitionBatchSize
if end >= len(needAnalyzePartitionNames) {
end = len(needAnalyzePartitionNames)
}

sql := getPartitionSQL("analyze table %n.%n partition", " index %n", end-start)
params := append([]any{j.TableSchema, j.GlobalTableName}, needAnalyzePartitionNames[start:end]...)
params = append(params, indexName)
exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...)
}
}
}

func (j *DynamicPartitionedTableAnalysisJob) getAnalyzeType() analyzeType {
switch {
case j.HasNewlyAddedIndex():
return analyzeDynamicPartitionIndex
default:
return analyzeDynamicPartition
}
}

func getPartitionSQL(prefix, suffix string, numPartitions int) string {
var sqlBuilder strings.Builder
sqlBuilder.WriteString(prefix)
for i := 0; i < numPartitions; i++ {
if i != 0 {
sqlBuilder.WriteString(",")
}
sqlBuilder.WriteString(" %n")
}
sqlBuilder.WriteString(suffix)
return sqlBuilder.String()
}

func getPartitionNames(partitionIndexes map[string][]string) []string {
names := make([]string, 0, len(partitionIndexes))
for _, partitionNames := range partitionIndexes {
names = append(names, partitionNames...)
}
return names
}
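To make the batching concrete, the following standalone sketch reproduces getPartitionSQL and the batch loop from analyzePartitions with a hypothetical schema, table, and partition list, printing the SQL template and parameters that would be handed to exec.AutoAnalyze (here replaced by a Println):

package main

import (
	"fmt"
	"strings"
)

// Same helper as in the diff above.
func getPartitionSQL(prefix, suffix string, numPartitions int) string {
	var sqlBuilder strings.Builder
	sqlBuilder.WriteString(prefix)
	for i := 0; i < numPartitions; i++ {
		if i != 0 {
			sqlBuilder.WriteString(",")
		}
		sqlBuilder.WriteString(" %n")
	}
	sqlBuilder.WriteString(suffix)
	return sqlBuilder.String()
}

func main() {
	// Hypothetical job data; in the real job the batch size comes from
	// variable.AutoAnalyzePartitionBatchSize.
	schema, table := "test", "t1"
	partitions := []any{"p0", "p1", "p2"}
	batchSize := 2

	for start := 0; start < len(partitions); start += batchSize {
		end := start + batchSize
		if end > len(partitions) {
			end = len(partitions)
		}
		sql := getPartitionSQL("analyze table %n.%n partition", "", end-start)
		params := append([]any{schema, table}, partitions[start:end]...)
		// In the real job this pair goes to exec.AutoAnalyze, which binds each
		// %n placeholder to the corresponding identifier in params.
		fmt.Println(sql, params)
	}
}

For three partitions and a batch size of 2, this emits one statement analyzing p0 and p1 and a second analyzing p2, which is the batching behavior the comment on analyzePartitions describes: fewer, larger ANALYZE statements reduce the cost of merging partition statistics afterwards.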
