diff --git a/executor/analyzetest/analyze_test.go b/executor/analyzetest/analyze_test.go index 48da055637d99..0c8f4c6ca9772 100644 --- a/executor/analyzetest/analyze_test.go +++ b/executor/analyzetest/analyze_test.go @@ -1721,9 +1721,9 @@ func TestAnalyzeColumnsWithDynamicPartitionTable(t *testing.T) { tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't' and is_index = 0").Sort().Check( // db, tbl, part, col, is_index, bucket_id, count, repeats, lower, upper, ndv testkit.Rows("test t global a 0 0 5 2 1 4 0", - "test t global a 0 1 12 2 17 17 0", + "test t global a 0 1 12 2 11 17 0", "test t global c 0 0 6 1 2 6 0", - "test t global c 0 1 14 2 13 13 0", + "test t global c 0 1 14 2 7 13 0", "test t p0 a 0 0 2 1 1 2 0", "test t p0 a 0 1 3 1 3 3 0", "test t p0 c 0 0 3 1 3 5 0", @@ -1736,7 +1736,7 @@ func TestAnalyzeColumnsWithDynamicPartitionTable(t *testing.T) { tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't' and is_index = 1").Sort().Check( // db, tbl, part, col, is_index, bucket_id, count, repeats, lower, upper, ndv testkit.Rows("test t global idx 1 0 6 1 2 6 0", - "test t global idx 1 1 14 2 13 13 0", + "test t global idx 1 1 14 2 7 13 0", "test t p0 idx 1 0 3 1 3 5 0", "test t p0 idx 1 1 4 1 6 6 0", "test t p1 idx 1 0 4 1 7 10 0", @@ -2992,8 +2992,8 @@ PARTITION BY RANGE ( a ) ( require.NoError(t, h.LoadNeededHistograms()) tbl := h.GetTableStats(tableInfo) lastVersion := tbl.Version - require.Equal(t, 3, len(tbl.Columns[tableInfo.Columns[2].ID].Buckets)) + require.Equal(t, 2, len(tbl.Columns[tableInfo.Columns[2].ID].Buckets)) require.Equal(t, 3, len(tbl.Columns[tableInfo.Columns[3].ID].Buckets)) tk.MustExec("analyze table t partition p1 index idx with 1 topn, 2 buckets") tk.MustQuery("show warnings").Sort().Check(testkit.Rows()) diff --git a/pkg/statistics/handle/globalstats/global_stats.go b/pkg/statistics/handle/globalstats/global_stats.go new file mode 100644 index 0000000000000..a311d47db28de --- /dev/null +++ b/pkg/statistics/handle/globalstats/global_stats.go @@ -0,0 +1,399 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package globalstats + +import ( + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/tiancaiamao/gp" + "go.uber.org/zap" +) + +const ( + // MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats + MaxPartitionMergeBatchSize = 256 +) + +// statsGlobalImpl implements util.StatsGlobal +type statsGlobalImpl struct { + statsHandler statstypes.StatsHandle +} + +// NewStatsGlobal creates a new StatsGlobal. +func NewStatsGlobal(statsHandler statstypes.StatsHandle) statstypes.StatsGlobal { + return &statsGlobalImpl{statsHandler: statsHandler} +} + +// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. +func (sg *statsGlobalImpl) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, + opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, + info *statstypes.GlobalStatsInfo, + physicalID int64, +) (err error) { + globalStats, err := MergePartitionStats2GlobalStatsByTableID(sc, sg.statsHandler, opts, is, physicalID, info.IsIndex == 1, info.HistIDs) + if err != nil { + if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) { + // When we find some partition-level stats are missing, we need to report warning. + sc.GetSessionVars().StmtCtx.AppendWarning(err) + } + return err + } + return WriteGlobalStatsToStorage(sg.statsHandler, globalStats, info, physicalID) +} + +// GlobalStats is used to store the statistics contained in the global-level stats +// which is generated by the merge of partition-level stats. +// It will both store the column stats and index stats. +// In the column statistics, the variable `num` is equal to the number of columns in the partition table. +// In the index statistics, the variable `num` is always equal to one. +type GlobalStats struct { + Hg []*statistics.Histogram + Cms []*statistics.CMSketch + TopN []*statistics.TopN + Fms []*statistics.FMSketch + MissingPartitionStats []string + Num int + Count int64 + ModifyCount int64 +} + +func newGlobalStats(histCount int) *GlobalStats { + globalStats := new(GlobalStats) + globalStats.Num = histCount + globalStats.Count = 0 + globalStats.Hg = make([]*statistics.Histogram, globalStats.Num) + globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num) + globalStats.TopN = make([]*statistics.TopN, globalStats.Num) + globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num) + + return globalStats +} + +// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo. 
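+// Whether the merge runs asynchronously is decided by the session variable
+// tidb_enable_async_merge_global_stats: when it is enabled, the worker-based
+// implementation in global_stats_async.go is used; otherwise the older
+// blocking implementation below loads every partition's stats and merges them
+// in one pass.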
+func MergePartitionStats2GlobalStats( + sc sessionctx.Context, + statsHandle statstypes.StatsHandle, + opts map[ast.AnalyzeOptionType]uint64, + is infoschema.InfoSchema, + globalTableInfo *model.TableInfo, + isIndex bool, + histIDs []int64, +) (globalStats *GlobalStats, err error) { + if sc.GetSessionVars().EnableAsyncMergeGlobalStats { + statslogutil.SingletonStatsSamplerLogger().Info("use async merge global stats", + zap.Int64("tableID", globalTableInfo.ID), + zap.String("table", globalTableInfo.Name.L), + ) + worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is) + if err != nil { + return nil, errors.Trace(err) + } + err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex) + if err != nil { + return nil, errors.Trace(err) + } + return worker.Result(), nil + } + statslogutil.SingletonStatsSamplerLogger().Info("use blocking merge global stats", + zap.Int64("tableID", globalTableInfo.ID), + zap.String("table", globalTableInfo.Name.L), + ) + return blockingMergePartitionStats2GlobalStats(sc, statsHandle.GPool(), opts, is, globalTableInfo, isIndex, histIDs, nil, statsHandle) +} + +// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. +func MergePartitionStats2GlobalStatsByTableID( + sc sessionctx.Context, + statsHandle statstypes.StatsHandle, + opts map[ast.AnalyzeOptionType]uint64, + is infoschema.InfoSchema, + tableID int64, + isIndex bool, + histIDs []int64, +) (globalStats *GlobalStats, err error) { + // Get the partition table IDs. + globalTable, ok := statsHandle.TableInfoByID(is, tableID) + if !ok { + err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", tableID) + return + } + + globalTableInfo := globalTable.Meta() + globalStats, err = MergePartitionStats2GlobalStats(sc, statsHandle, opts, is, globalTableInfo, isIndex, histIDs) + if err != nil { + return nil, errors.Trace(err) + } + if len(globalStats.MissingPartitionStats) > 0 { + var item string + if !isIndex { + item = "columns" + } else { + item = "index" + if len(histIDs) > 0 { + item += " " + globalTableInfo.FindIndexNameByID(histIDs[0]) + } + } + + logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", globalTableInfo.Name.L), + zap.String("item", item), zap.Strings("missing", globalStats.MissingPartitionStats)) + } + return +} + +// analyzeOptionDefault saves the default values of NumBuckets and NumTopN. +// These values will be used in dynamic mode when we drop table partition and then need to merge global-stats. +// These values originally came from the analyzeOptionDefault structure in the planner/core/planbuilder.go file. +var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ + ast.AnalyzeOptNumBuckets: 256, + ast.AnalyzeOptNumTopN: 20, +} + +// blockingMergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo. +// It is the old algorithm to merge partition-level stats to global-level stats. It will happen the OOM. because it will load all the partition-level stats into memory. 
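+// For every column (or for the single index) it merges, in order: the
+// FMSketches (to derive the global NDV), the CMSketches, the TopN values, and
+// finally the histograms, feeding the values popped out of the TopN merge into
+// the histogram merge.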
+func blockingMergePartitionStats2GlobalStats( + sc sessionctx.Context, + gpool *gp.Pool, + opts map[ast.AnalyzeOptionType]uint64, + is infoschema.InfoSchema, + globalTableInfo *model.TableInfo, + isIndex bool, + histIDs []int64, + allPartitionStats map[int64]*statistics.Table, + statsHandle statstypes.StatsHandle, +) (globalStats *GlobalStats, err error) { + externalCache := false + if allPartitionStats != nil { + externalCache = true + } + + partitionNum := len(globalTableInfo.Partition.Definitions) + if len(histIDs) == 0 { + for _, col := range globalTableInfo.Columns { + // The virtual generated column stats can not be merged to the global stats. + if col.IsVirtualGenerated() { + continue + } + histIDs = append(histIDs, col.ID) + } + } + + // Initialized the globalStats. + globalStats = newGlobalStats(len(histIDs)) + + // Slice Dimensions Explanation + // First dimension: Column or Index Stats + // Second dimension: Partition Tables + // Because all topN and histograms need to be collected before they can be merged. + // So we should store all the partition-level stats first, and merge them together. + allHg := make([][]*statistics.Histogram, globalStats.Num) + allCms := make([][]*statistics.CMSketch, globalStats.Num) + allTopN := make([][]*statistics.TopN, globalStats.Num) + allFms := make([][]*statistics.FMSketch, globalStats.Num) + for i := 0; i < globalStats.Num; i++ { + allHg[i] = make([]*statistics.Histogram, 0, partitionNum) + allCms[i] = make([]*statistics.CMSketch, 0, partitionNum) + allTopN[i] = make([]*statistics.TopN, 0, partitionNum) + allFms[i] = make([]*statistics.FMSketch, 0, partitionNum) + } + + skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats + for _, def := range globalTableInfo.Partition.Definitions { + partitionID := def.ID + partitionTable, ok := statsHandle.TableInfoByID(is, partitionID) + if !ok { + err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) + return + } + tableInfo := partitionTable.Meta() + var partitionStats *statistics.Table + var okLoad bool + if allPartitionStats != nil { + partitionStats, okLoad = allPartitionStats[partitionID] + } else { + okLoad = false + } + // If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats + if !okLoad { + var err1 error + partitionStats, err1 = statsHandle.LoadTablePartitionStats(tableInfo, &def) + if err1 != nil { + if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err1) { + globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L)) + continue + } + err = err1 + return + } + if externalCache { + allPartitionStats[partitionID] = partitionStats + } + } + + for i := 0; i < globalStats.Num; i++ { + // GetStatsInfo will return the copy of the statsInfo, so we don't need to worry about the data race. + // partitionStats will be released after the for loop. 
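+			// If `analyzed` is false below, this column/index has never been
+			// analyzed on the current partition; depending on
+			// skipMissingPartitionStats it is either recorded as missing or
+			// reported as ErrPartitionStatsMissing.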
+ hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex, externalCache) + skipPartition := false + if !analyzed { + var missingPart string + if !isIndex { + missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) + } else { + missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) + } + if !skipMissingPartitionStats { + err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) + return + } + globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart) + skipPartition = true + } + + // Partition stats is not empty but column stats(hist, topN) is missing. + if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) { + var missingPart string + if !isIndex { + missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i])) + } else { + missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i])) + } + if !skipMissingPartitionStats { + err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart)) + return + } + globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN") + skipPartition = true + } + + if i == 0 { + // In a partition, we will only update globalStats.Count once. + globalStats.Count += partitionStats.RealtimeCount + globalStats.ModifyCount += partitionStats.ModifyCount + } + + if !skipPartition { + allHg[i] = append(allHg[i], hg) + allCms[i] = append(allCms[i], cms) + allTopN[i] = append(allTopN[i], topN) + allFms[i] = append(allFms[i], fms) + } + } + } + + // After collect all the statistics from the partition-level stats, + // we should merge them together. + for i := 0; i < globalStats.Num; i++ { + if len(allHg[i]) == 0 { + // If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0` + // correctly. It can avoid unexpected behaviors such as nil pointer panic. + continue + } + // FMSketch use many memory, so we first deal with it and then destroy it. + // Merge FMSketch. + // NOTE: allFms maybe contain empty. + globalStats.Fms[i] = allFms[i][0] + for j := 1; j < len(allFms[i]); j++ { + if globalStats.Fms[i] == nil { + globalStats.Fms[i] = allFms[i][j] + } else { + globalStats.Fms[i].MergeFMSketch(allFms[i][j]) + allFms[i][j].DestroyAndPutToPool() + } + } + + // Update the global NDV. + globalStatsNDV := globalStats.Fms[i].NDV() + if globalStatsNDV > globalStats.Count { + globalStatsNDV = globalStats.Count + } + globalStats.Fms[i].DestroyAndPutToPool() + + // Merge CMSketch. + globalStats.Cms[i] = allCms[i][0] + for j := 1; j < len(allCms[i]); j++ { + err = globalStats.Cms[i].MergeCMSketch(allCms[i][j]) + if err != nil { + return + } + } + + // Merge topN. + // Note: We need to merge TopN before merging the histogram. + // Because after merging TopN, some numbers will be left. + // These remaining topN numbers will be used as a separate bucket for later histogram merging. 
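+		// For example, a value that is in the TopN of one partition but is
+		// evicted from the merged global TopN is returned in poppedTopN, and
+		// its count is then folded into the buckets built by
+		// MergePartitionHist2GlobalHist instead of being dropped.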
+ var poppedTopN []statistics.TopNMeta + wrapper := NewStatsWrapper(allHg[i], allTopN[i]) + globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(gpool, sc, wrapper, + sc.GetSessionVars().StmtCtx.TimeZone(), sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex) + if err != nil { + return + } + + // Merge histogram. + globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN, + int64(opts[ast.AnalyzeOptNumBuckets]), isIndex, sc.GetSessionVars().AnalyzeVersion) + if err != nil { + return + } + + // NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them. + for j := range globalStats.Hg[i].Buckets { + globalStats.Hg[i].Buckets[j].NDV = 0 + } + + globalStats.Hg[i].NDV = globalStatsNDV + } + return +} + +// WriteGlobalStatsToStorage is to write global stats to storage +func WriteGlobalStatsToStorage(statsHandle statstypes.StatsHandle, globalStats *GlobalStats, info *statstypes.GlobalStatsInfo, gid int64) (err error) { + // Dump global-level stats to kv. + for i := 0; i < globalStats.Num; i++ { + hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i] + if hg == nil { + // All partitions have no stats so global stats are not created. + continue + } + // fms for global stats doesn't need to dump to kv. + err = statsHandle.SaveStatsToStorage(gid, + globalStats.Count, + globalStats.ModifyCount, + info.IsIndex, + hg, + cms, + topN, + info.StatsVersion, + true, + util.StatsMetaHistorySourceAnalyze, + ) + if err != nil { + statslogutil.StatsLogger().Error("save global-level stats to storage failed", + zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", gid)) + } + } + return err +} diff --git a/pkg/statistics/handle/globalstats/global_stats_async.go b/pkg/statistics/handle/globalstats/global_stats_async.go new file mode 100644 index 0000000000000..13208a15cbb0c --- /dev/null +++ b/pkg/statistics/handle/globalstats/global_stats_async.go @@ -0,0 +1,546 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package globalstats + +import ( + "context" + stderrors "errors" + "fmt" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/statistics" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + "github.com/pingcap/tidb/pkg/statistics/handle/storage" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type mergeItem[T any] struct { + item T + idx int +} + +type skipItem struct { + histID int64 + partitionID int64 +} + +// toSQLIndex is used to convert bool to int64. +func toSQLIndex(isIndex bool) int { + var index = int(0) + if isIndex { + index = 1 + } + return index +} + +// AsyncMergePartitionStats2GlobalStats is used to merge partition stats to global stats. +// it divides the merge task into two parts. +// - IOWorker: load stats from storage. it will load fmsketch, cmsketch, histogram and topn. and send them to cpuWorker. +// - CPUWorker: merge the stats from IOWorker and generate global stats. +// +// ┌────────────────────────┐ ┌───────────────────────┐ +// │ │ │ │ +// │ │ │ │ +// │ │ │ │ +// │ IOWorker │ │ CPUWorker │ +// │ │ ────► │ │ +// │ │ │ │ +// │ │ │ │ +// │ │ │ │ +// └────────────────────────┘ └───────────────────────┘ +type AsyncMergePartitionStats2GlobalStats struct { + is infoschema.InfoSchema + statsHandle statstypes.StatsHandle + globalStats *GlobalStats + cmsketch chan mergeItem[*statistics.CMSketch] + fmsketch chan mergeItem[*statistics.FMSketch] + histogramAndTopn chan mergeItem[*StatsWrapper] + allPartitionStats map[int64]*statistics.Table + PartitionDefinition map[int64]model.PartitionDefinition + tableInfo map[int64]*model.TableInfo + // key is partition id and histID + skipPartition map[skipItem]struct{} + // ioWorker meet error, it will close this channel to notify cpuWorker. + ioWorkerExitWhenErrChan chan struct{} + // cpuWorker exit, it will close this channel to notify ioWorker. + cpuWorkerExitChan chan struct{} + globalTableInfo *model.TableInfo + histIDs []int64 + globalStatsNDV []int64 + partitionIDs []int64 + partitionNum int + skipMissingPartitionStats bool +} + +// NewAsyncMergePartitionStats2GlobalStats creates a new AsyncMergePartitionStats2GlobalStats. 
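+// The cmsketch and fmsketch channels are buffered (capacity 5) while the
+// histogramAndTopn channel is unbuffered, so only a bounded number of
+// partition-level sketches are held in memory at any point in time.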
+func NewAsyncMergePartitionStats2GlobalStats( + statsHandle statstypes.StatsHandle, + globalTableInfo *model.TableInfo, + histIDs []int64, + is infoschema.InfoSchema) (*AsyncMergePartitionStats2GlobalStats, error) { + partitionNum := len(globalTableInfo.Partition.Definitions) + return &AsyncMergePartitionStats2GlobalStats{ + statsHandle: statsHandle, + cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), + fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), + histogramAndTopn: make(chan mergeItem[*StatsWrapper]), + PartitionDefinition: make(map[int64]model.PartitionDefinition), + tableInfo: make(map[int64]*model.TableInfo), + partitionIDs: make([]int64, 0, partitionNum), + ioWorkerExitWhenErrChan: make(chan struct{}), + cpuWorkerExitChan: make(chan struct{}), + skipPartition: make(map[skipItem]struct{}), + allPartitionStats: make(map[int64]*statistics.Table), + globalTableInfo: globalTableInfo, + histIDs: histIDs, + is: is, + partitionNum: partitionNum, + }, nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) prepare(sctx sessionctx.Context, isIndex bool) (err error) { + if len(a.histIDs) == 0 { + for _, col := range a.globalTableInfo.Columns { + // The virtual generated column stats can not be merged to the global stats. + if col.IsVirtualGenerated() { + continue + } + a.histIDs = append(a.histIDs, col.ID) + } + } + a.globalStats = newGlobalStats(len(a.histIDs)) + a.globalStats.Num = len(a.histIDs) + a.globalStatsNDV = make([]int64, 0, a.globalStats.Num) + // get all partition stats + for _, def := range a.globalTableInfo.Partition.Definitions { + partitionID := def.ID + a.partitionIDs = append(a.partitionIDs, partitionID) + a.PartitionDefinition[partitionID] = def + partitionTable, ok := a.statsHandle.TableInfoByID(a.is, partitionID) + if !ok { + return errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) + } + tableInfo := partitionTable.Meta() + a.tableInfo[partitionID] = tableInfo + realtimeCount, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCount( + util.StatsCtx, + sctx, + partitionID, + ) + if err != nil { + return err + } + if !isNull { + // In a partition, we will only update globalStats.Count once. 
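+			// The counts come from stats_meta via StatsMetaCountAndModifyCount
+			// above, so prepare only reads metadata and does not load the full
+			// partition statistics.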
+ a.globalStats.Count += realtimeCount + a.globalStats.ModifyCount += modifyCount + } + err1 := skipPartition(sctx, partitionID, isIndex) + if err1 != nil { + // no idx so idx = 0 + err := a.dealWithSkipPartition(partitionID, isIndex, 0, err1) + if err != nil { + return err + } + if types.ErrPartitionStatsMissing.Equal(err1) { + continue + } + } + for idx, hist := range a.histIDs { + err1 := skipColumnPartition(sctx, partitionID, isIndex, hist) + if err1 != nil { + err := a.dealWithSkipPartition(partitionID, isIndex, idx, err1) + if err != nil { + return err + } + if types.ErrPartitionStatsMissing.Equal(err1) { + break + } + } + } + } + return nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) dealWithSkipPartition(partitionID int64, isIndex bool, idx int, err error) error { + switch { + case types.ErrPartitionStatsMissing.Equal(err): + return a.dealErrPartitionStatsMissing(partitionID) + case types.ErrPartitionColumnStatsMissing.Equal(err): + return a.dealErrPartitionColumnStatsMissing(isIndex, partitionID, idx) + default: + return err + } +} + +func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionStatsMissing(partitionID int64) error { + missingPart := fmt.Sprintf("partition `%s`", a.PartitionDefinition[partitionID].Name.L) + a.globalStats.MissingPartitionStats = append(a.globalStats.MissingPartitionStats, missingPart) + for _, histID := range a.histIDs { + a.skipPartition[skipItem{ + histID: histID, + partitionID: partitionID, + }] = struct{}{} + } + return nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionColumnStatsMissing(isIndex bool, partitionID int64, idx int) error { + var missingPart string + if isIndex { + missingPart = fmt.Sprintf("partition `%s` index `%s`", a.PartitionDefinition[partitionID].Name.L, a.tableInfo[partitionID].FindIndexNameByID(a.histIDs[idx])) + } else { + missingPart = fmt.Sprintf("partition `%s` column `%s`", a.PartitionDefinition[partitionID].Name.L, a.tableInfo[partitionID].FindColumnNameByID(a.histIDs[idx])) + } + if !a.skipMissingPartitionStats { + return types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", a.tableInfo[partitionID].Name.L, missingPart)) + } + a.globalStats.MissingPartitionStats = append(a.globalStats.MissingPartitionStats, missingPart) + a.skipPartition[skipItem{ + histID: a.histIDs[idx], + partitionID: partitionID, + }] = struct{}{} + return nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, isIndex bool) (err error) { + defer func() { + if r := recover(); r != nil { + statslogutil.StatsLogger().Warn("ioWorker panic", zap.Stack("stack"), zap.Any("error", r)) + close(a.ioWorkerExitWhenErrChan) + err = errors.New(fmt.Sprint(r)) + } + }() + err = a.loadFmsketch(sctx, isIndex) + if err != nil { + close(a.ioWorkerExitWhenErrChan) + return err + } + close(a.fmsketch) + err = a.loadCMsketch(sctx, isIndex) + if err != nil { + close(a.ioWorkerExitWhenErrChan) + return err + } + close(a.cmsketch) + failpoint.Inject("PanicSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + panic("test for PanicSameTime") + } + }) + err = a.loadHistogramAndTopN(sctx, a.globalTableInfo, isIndex) + if err != nil { + close(a.ioWorkerExitWhenErrChan) + return err + } + close(a.histogramAndTopn) + return nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, 
analyzeVersion int) (err error) { + defer func() { + if r := recover(); r != nil { + statslogutil.StatsLogger().Warn("cpuWorker panic", zap.Stack("stack"), zap.Any("error", r)) + err = errors.New(fmt.Sprint(r)) + } + close(a.cpuWorkerExitChan) + }() + a.dealFMSketch() + select { + case <-a.ioWorkerExitWhenErrChan: + return nil + default: + for i := 0; i < a.globalStats.Num; i++ { + // Update the global NDV. + globalStatsNDV := a.globalStats.Fms[i].NDV() + if globalStatsNDV > a.globalStats.Count { + globalStatsNDV = a.globalStats.Count + } + a.globalStatsNDV = append(a.globalStatsNDV, globalStatsNDV) + a.globalStats.Fms[i].DestroyAndPutToPool() + } + } + err = a.dealCMSketch() + if err != nil { + statslogutil.StatsLogger().Warn("dealCMSketch failed", zap.Error(err)) + return err + } + failpoint.Inject("PanicSameTime", func(val failpoint.Value) { + if val, _ := val.(bool); val { + time.Sleep(1 * time.Second) + panic("test for PanicSameTime") + } + }) + err = a.dealHistogramAndTopN(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion) + if err != nil { + statslogutil.StatsLogger().Warn("dealHistogramAndTopN failed", zap.Error(err)) + return err + } + return nil +} + +// Result returns the global stats. +func (a *AsyncMergePartitionStats2GlobalStats) Result() *GlobalStats { + return a.globalStats +} + +// MergePartitionStats2GlobalStats merges partition stats to global stats. +func (a *AsyncMergePartitionStats2GlobalStats) MergePartitionStats2GlobalStats( + sctx sessionctx.Context, + opts map[ast.AnalyzeOptionType]uint64, + isIndex bool, +) error { + a.skipMissingPartitionStats = sctx.GetSessionVars().SkipMissingPartitionStats + tz := sctx.GetSessionVars().StmtCtx.TimeZone() + analyzeVersion := sctx.GetSessionVars().AnalyzeVersion + stmtCtx := sctx.GetSessionVars().StmtCtx + return util.CallWithSCtx(a.statsHandle.SPool(), + func(sctx sessionctx.Context) error { + err := a.prepare(sctx, isIndex) + if err != nil { + return err + } + ctx := context.Background() + metawg, _ := errgroup.WithContext(ctx) + mergeWg, _ := errgroup.WithContext(ctx) + metawg.Go(func() error { + return a.ioWorker(sctx, isIndex) + }) + mergeWg.Go(func() error { + return a.cpuWorker(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion) + }) + err = metawg.Wait() + if err != nil { + if err1 := mergeWg.Wait(); err1 != nil { + err = stderrors.Join(err, err1) + } + return err + } + return mergeWg.Wait() + }, + ) +} + +func (a *AsyncMergePartitionStats2GlobalStats) loadFmsketch(sctx sessionctx.Context, isIndex bool) error { + for i := 0; i < a.globalStats.Num; i++ { + // load fmsketch from tikv + for _, partitionID := range a.partitionIDs { + _, ok := a.skipPartition[skipItem{ + histID: a.histIDs[i], + partitionID: partitionID, + }] + if ok { + continue + } + fmsketch, err := storage.FMSketchFromStorage(sctx, partitionID, int64(toSQLIndex(isIndex)), a.histIDs[i]) + if err != nil { + return err + } + select { + case a.fmsketch <- mergeItem[*statistics.FMSketch]{ + fmsketch, i, + }: + case <-a.cpuWorkerExitChan: + statslogutil.StatsLogger().Warn("ioWorker detects CPUWorker has exited") + return nil + } + } + } + return nil +} + +func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Context, isIndex bool) error { + failpoint.Inject("PanicInIOWorker", nil) + for i := 0; i < a.globalStats.Num; i++ { + for _, partitionID := range a.partitionIDs { + _, ok := a.skipPartition[skipItem{ + histID: a.histIDs[i], + partitionID: partitionID, + }] + if ok { + continue + } + cmsketch, err := storage.CMSketchFromStorage(sctx, 
partitionID, toSQLIndex(isIndex), a.histIDs[i])
+			if err != nil {
+				return err
+			}
+			select {
+			case a.cmsketch <- mergeItem[*statistics.CMSketch]{
+				cmsketch, i,
+			}:
+			case <-a.cpuWorkerExitChan:
+				statslogutil.StatsLogger().Warn("ioWorker detects CPUWorker has exited")
+				return nil
+			}
+		}
+	}
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx sessionctx.Context, tableInfo *model.TableInfo, isIndex bool) error {
+	failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
+		if val, _ := val.(bool); val {
+			time.Sleep(1 * time.Second)
+			failpoint.Return(errors.New("ErrorSameTime returned error"))
+		}
+	})
+	for i := 0; i < a.globalStats.Num; i++ {
+		hists := make([]*statistics.Histogram, 0, a.partitionNum)
+		topn := make([]*statistics.TopN, 0, a.partitionNum)
+		for _, partitionID := range a.partitionIDs {
+			_, ok := a.skipPartition[skipItem{
+				histID:      a.histIDs[i],
+				partitionID: partitionID,
+			}]
+			if ok {
+				continue
+			}
+			h, err := storage.LoadHistogram(sctx, partitionID, toSQLIndex(isIndex), a.histIDs[i], tableInfo)
+			if err != nil {
+				return err
+			}
+			t, err := storage.TopNFromStorage(sctx, partitionID, toSQLIndex(isIndex), a.histIDs[i])
+			if err != nil {
+				return err
+			}
+			hists = append(hists, h)
+			topn = append(topn, t)
+		}
+		select {
+		case a.histogramAndTopn <- mergeItem[*StatsWrapper]{
+			NewStatsWrapper(hists, topn), i,
+		}:
+		case <-a.cpuWorkerExitChan:
+			statslogutil.StatsLogger().Warn("ioWorker detects CPUWorker has exited")
+			return nil
+		}
+	}
+	return nil
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) dealFMSketch() {
+	failpoint.Inject("PanicInCPUWorker", nil)
+	for {
+		select {
+		case fms, ok := <-a.fmsketch:
+			if !ok {
+				return
+			}
+			if a.globalStats.Fms[fms.idx] == nil {
+				a.globalStats.Fms[fms.idx] = fms.item
+			} else {
+				a.globalStats.Fms[fms.idx].MergeFMSketch(fms.item)
+			}
+		case <-a.ioWorkerExitWhenErrChan:
+			return
+		}
+	}
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error {
+	failpoint.Inject("dealCMSketchErr", func(val failpoint.Value) {
+		if val, _ := val.(bool); val {
+			failpoint.Return(errors.New("dealCMSketch returned error"))
+		}
+	})
+	for {
+		select {
+		case cms, ok := <-a.cmsketch:
+			if !ok {
+				return nil
+			}
+			if a.globalStats.Cms[cms.idx] == nil {
+				a.globalStats.Cms[cms.idx] = cms.item
+			} else {
+				err := a.globalStats.Cms[cms.idx].MergeCMSketch(cms.item)
+				if err != nil {
+					return err
+				}
+			}
+		case <-a.ioWorkerExitWhenErrChan:
+			return nil
+		}
+	}
+}
+
+func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
+	failpoint.Inject("dealHistogramAndTopNErr", func(val failpoint.Value) {
+		if val, _ := val.(bool); val {
+			failpoint.Return(errors.New("dealHistogramAndTopNErr returned error"))
+		}
+	})
+	failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
+		if val, _ := val.(bool); val {
+			time.Sleep(1 * time.Second)
+			failpoint.Return(errors.New("ErrorSameTime returned error"))
+		}
+	})
+	for {
+		select {
+		case item, ok := <-a.histogramAndTopn:
+			if !ok {
+				return nil
+			}
+			var err error
+			var poppedTopN []statistics.TopNMeta
+			var allhg []*statistics.Histogram
+			wrapper := item.item
+			a.globalStats.TopN[item.idx], poppedTopN, allhg, err = mergeGlobalStatsTopN(a.statsHandle.GPool(), sctx, wrapper,
+				tz, analyzeVersion, 
uint32(opts[ast.AnalyzeOptNumTopN]), isIndex) + if err != nil { + return err + } + + // Merge histogram. + globalHg := &(a.globalStats.Hg[item.idx]) + *globalHg, err = statistics.MergePartitionHist2GlobalHist(stmtCtx, allhg, poppedTopN, + int64(opts[ast.AnalyzeOptNumBuckets]), isIndex, analyzeVersion) + if err != nil { + return err + } + + // NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them. + for j := range (*globalHg).Buckets { + (*globalHg).Buckets[j].NDV = 0 + } + (*globalHg).NDV = a.globalStatsNDV[item.idx] + case <-a.ioWorkerExitWhenErrChan: + return nil + } + } +} + +func skipPartition(sctx sessionctx.Context, partitionID int64, isIndex bool) error { + return storage.CheckSkipPartition(sctx, partitionID, toSQLIndex(isIndex)) +} + +func skipColumnPartition(sctx sessionctx.Context, partitionID int64, isIndex bool, histsID int64) error { + return storage.CheckSkipColumnPartiion(sctx, partitionID, toSQLIndex(isIndex), histsID) +} diff --git a/pkg/statistics/handle/globalstats/global_stats_test.go b/pkg/statistics/handle/globalstats/global_stats_test.go new file mode 100644 index 0000000000000..405b9403fbc99 --- /dev/null +++ b/pkg/statistics/handle/globalstats/global_stats_test.go @@ -0,0 +1,977 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package globalstats_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/parser/ast"
+	"github.com/pingcap/tidb/pkg/session"
+	statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil"
+	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/stretchr/testify/require"
+)
+
+func TestShowGlobalStatsWithAsyncMergeGlobal(t *testing.T) {
+	testShowGlobalStats(t, true)
+}
+
+func TestShowGlobalStatsWithoutAsyncMergeGlobal(t *testing.T) {
+	testShowGlobalStats(t, false)
+}
+
+func testShowGlobalStats(t *testing.T, isAsync bool) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("set @@session.tidb_analyze_version = 0")
+	if isAsync {
+		tk.MustExec("set @@global.tidb_enable_async_merge_global_stats = 1")
+	} else {
+		tk.MustExec("set @@global.tidb_enable_async_merge_global_stats = 0")
+	}
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
+	tk.MustExec("create table t (a int, key(a)) partition by hash(a) partitions 2")
+	tk.MustExec("insert into t values (1), (2), (3), (4)")
+	tk.MustExec("analyze table t with 1 buckets")
+	require.Len(t, tk.MustQuery("show stats_meta").Rows(), 2)
+	require.Len(t, tk.MustQuery("show stats_meta where partition_name='global'").Rows(), 0)
+	require.Len(t, tk.MustQuery("show stats_buckets").Rows(), 4) // 2 partitions * (1 for the column_a and 1 for the index_a)
+	require.Len(t, tk.MustQuery("show stats_buckets where partition_name='global'").Rows(), 0)
+	require.Len(t, tk.MustQuery("show stats_histograms").Rows(), 4)
+	require.Len(t, tk.MustQuery("show stats_histograms where partition_name='global'").Rows(), 0)
+	require.Len(t, tk.MustQuery("show stats_healthy").Rows(), 2)
+	require.Len(t, tk.MustQuery("show stats_healthy where partition_name='global'").Rows(), 0)
+
+	tk.MustExec("set @@tidb_analyze_version = 2")
+	tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
+	tk.MustExec("analyze table t with 0 topn, 1 buckets")
+	require.Len(t, tk.MustQuery("show stats_meta").Rows(), 3)
+	require.Len(t, tk.MustQuery("show stats_meta where partition_name='global'").Rows(), 1)
+	require.Len(t, tk.MustQuery("show stats_buckets").Rows(), 6)
+	require.Len(t, tk.MustQuery("show stats_buckets where partition_name='global'").Rows(), 2)
+	require.Len(t, tk.MustQuery("show stats_histograms").Rows(), 6)
+	require.Len(t, tk.MustQuery("show stats_histograms where partition_name='global'").Rows(), 2)
+	require.Len(t, tk.MustQuery("show stats_healthy").Rows(), 3)
+	require.Len(t, tk.MustQuery("show stats_healthy where partition_name='global'").Rows(), 1)
+}
+
+func simpleTest(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t (a int, key(a)) partition by hash(a) partitions 10")
+	tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (8), (10), (20), (30)")
+	tk.MustExec("analyze table t with 0 topn, 1 buckets")
+}
+
+func TestGlobalStatsPanicInIOWorker(t *testing.T) {
+	fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicInIOWorker"
+	require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")"))
+	defer func() {
+		require.NoError(t, failpoint.Disable(fpName))
+	}()
+	simpleTest(t)
+}
+
+func TestGlobalStatsWithCMSketchErr(t *testing.T) {
+	fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealCMSketchErr"
+	require.NoError(t, 
failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsWithHistogramAndTopNErr(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealHistogramAndTopNErr" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsPanicInCPUWorker(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicInCPUWorker" + require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")")) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsPanicSametime(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicSameTime" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestGlobalStatsErrorSametime(t *testing.T) { + fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/ErrorSameTime" + require.NoError(t, failpoint.Enable(fpName, `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable(fpName)) + }() + simpleTest(t) +} + +func TestBuildGlobalLevelStats(t *testing.T) { + store := testkit.CreateMockStore(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t, t1;") + testKit.MustExec("set @@tidb_analyze_version = 2") + testKit.MustExec("set @@tidb_partition_prune_mode = 'static';") + testKit.MustExec("create table t(a int, b int, c int) PARTITION BY HASH(a) PARTITIONS 3;") + testKit.MustExec("create table t1(a int);") + testKit.MustExec("insert into t values(1,1,1),(3,12,3),(4,20,4),(2,7,2),(5,21,5);") + testKit.MustExec("insert into t1 values(1),(3),(4),(2),(5);") + testKit.MustExec("create index idx_t_ab on t(a, b);") + testKit.MustExec("create index idx_t_b on t(b);") + testKit.MustExec("select * from t where c = 0") + testKit.MustExec("select * from t1 where a = 0") + do, err := session.GetDomain(store) + require.NoError(t, err) + statsHandle := do.StatsHandle() + require.NoError(t, statsHandle.DumpColStatsUsageToKV()) + testKit.MustExec("analyze table t, t1;") + result := testKit.MustQuery("show stats_meta where table_name = 't';").Sort() + require.Len(t, result.Rows(), 3) + require.Equal(t, "1", result.Rows()[0][5]) + require.Equal(t, "2", result.Rows()[1][5]) + require.Equal(t, "2", result.Rows()[2][5]) + result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort() + require.Len(t, result.Rows(), 15) + + result = testKit.MustQuery("show stats_meta where table_name = 't1';").Sort() + require.Len(t, result.Rows(), 1) + require.Equal(t, "5", result.Rows()[0][5]) + result = testKit.MustQuery("show stats_histograms where table_name = 't1';").Sort() + require.Len(t, result.Rows(), 1) + + // Test the 'dynamic' mode + testKit.MustExec("set @@tidb_partition_prune_mode = 'dynamic';") + testKit.MustExec("analyze table t, t1;") + result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort() + require.Len(t, result.Rows(), 4) + require.Equal(t, "5", result.Rows()[0][5]) + require.Equal(t, "1", result.Rows()[1][5]) + require.Equal(t, "2", result.Rows()[2][5]) + require.Equal(t, "2", result.Rows()[3][5]) + result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort() + require.Len(t, 
result.Rows(), 20) + + result = testKit.MustQuery("show stats_meta where table_name = 't1';").Sort() + require.Len(t, result.Rows(), 1) + require.Equal(t, "5", result.Rows()[0][5]) + result = testKit.MustQuery("show stats_histograms where table_name = 't1';").Sort() + require.Len(t, result.Rows(), 1) + + testKit.MustExec("analyze table t index idx_t_ab, idx_t_b;") + result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort() + require.Len(t, result.Rows(), 4) + require.Equal(t, "5", result.Rows()[0][5]) + require.Equal(t, "1", result.Rows()[1][5]) + require.Equal(t, "2", result.Rows()[2][5]) + require.Equal(t, "2", result.Rows()[3][5]) + result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort() + require.Len(t, result.Rows(), 20) +} + +func TestGlobalStatsHealthy(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(` +create table t ( + a int, + key(a) +) +partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20) +)`) + + checkModifyAndCount := func(gModify, gCount, p0Modify, p0Count, p1Modify, p1Count int) { + rs := tk.MustQuery("show stats_meta").Rows() + require.Equal(t, fmt.Sprintf("%v", gModify), rs[0][4].(string)) // global.modify_count + require.Equal(t, fmt.Sprintf("%v", gCount), rs[0][5].(string)) // global.row_count + require.Equal(t, fmt.Sprintf("%v", p0Modify), rs[1][4].(string)) // p0.modify_count + require.Equal(t, fmt.Sprintf("%v", p0Count), rs[1][5].(string)) // p0.row_count + require.Equal(t, fmt.Sprintf("%v", p1Modify), rs[2][4].(string)) // p1.modify_count + require.Equal(t, fmt.Sprintf("%v", p1Count), rs[2][5].(string)) // p1.row_count + } + checkHealthy := func(gH, p0H, p1H int) { + tk.MustQuery("show stats_healthy").Check(testkit.Rows( + fmt.Sprintf("test t global %v", gH), + fmt.Sprintf("test t p0 %v", p0H), + fmt.Sprintf("test t p1 %v", p1H))) + } + + tk.MustExec("set @@tidb_analyze_version=2") + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk.MustExec("analyze table t") + checkModifyAndCount(0, 0, 0, 0, 0, 0) + checkHealthy(100, 100, 100) + + tk.MustExec("insert into t values (1), (2)") // update p0 + require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) + require.NoError(t, dom.StatsHandle().Update(context.Background(), dom.InfoSchema())) + checkModifyAndCount(2, 2, 2, 2, 0, 0) + checkHealthy(0, 0, 100) + + tk.MustExec("insert into t values (11), (12), (13), (14)") // update p1 + require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) + require.NoError(t, dom.StatsHandle().Update(context.Background(), dom.InfoSchema())) + checkModifyAndCount(6, 6, 2, 2, 4, 4) + checkHealthy(0, 0, 0) + + tk.MustExec("analyze table t") + checkModifyAndCount(0, 6, 0, 2, 0, 4) + checkHealthy(100, 100, 100) + + tk.MustExec("insert into t values (4), (5), (15), (16)") // update p0 and p1 together + require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) + require.NoError(t, dom.StatsHandle().Update(context.Background(), dom.InfoSchema())) + checkModifyAndCount(4, 10, 2, 4, 2, 6) + checkHealthy(33, 0, 50) +} + +func TestGlobalStatsData(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(` +create table t ( + a int, + key(a) +) +partition by range (a) ( + partition p0 values less than (10), + partition p1 
values less than (20) +)`) + tk.MustExec("set @@tidb_analyze_version=2") + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (6), (null), (11), (12), (13), (14), (15), (16), (17), (18), (19), (19)") + require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t with 0 topn, 2 buckets") + + tk.MustQuery("select modify_count, count from mysql.stats_meta order by table_id asc").Check( + testkit.Rows("0 18", "0 8", "0 10")) // global row-count = sum(partition row-count) + + // distinct, null_count, tot_col_size should be the sum of their values in partition-stats, and correlation should be 0 + tk.MustQuery("select distinct_count, null_count, tot_col_size, correlation=0 from mysql.stats_histograms where is_index=0 order by table_id asc").Check( + testkit.Rows("15 1 17 1", "6 1 7 0", "9 0 10 0")) + tk.MustQuery("select distinct_count, null_count, tot_col_size, correlation=0 from mysql.stats_histograms where is_index=1 order by table_id asc").Check( + testkit.Rows("15 1 0 1", "6 1 7 1", "9 0 10 1")) + + tk.MustQuery("show stats_buckets where is_index=0").Check( + // db table partition col is_idx bucket_id count repeats lower upper ndv + testkit.Rows("test t global a 0 0 7 2 1 6 0", + "test t global a 0 1 17 2 11 19 0", + "test t p0 a 0 0 4 1 1 4 0", + "test t p0 a 0 1 7 2 5 6 0", + "test t p1 a 0 0 6 1 11 16 0", + "test t p1 a 0 1 10 2 17 19 0")) + tk.MustQuery("show stats_buckets where is_index=1").Check( + testkit.Rows("test t global a 1 0 7 2 1 6 0", + "test t global a 1 1 17 2 11 19 0", + "test t p0 a 1 0 4 1 1 4 0", + "test t p0 a 1 1 7 2 5 6 0", + "test t p1 a 1 0 6 1 11 16 0", + "test t p1 a 1 1 10 2 17 19 0")) +} + +func TestGlobalStatsData2(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + testGlobalStats2(t, tk, dom) +} + +func TestGlobalStatsData2WithConcurrency(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_merge_partition_stats_concurrency=2") + defer func() { + tk.MustExec("set global tidb_merge_partition_stats_concurrency=1") + }() + testGlobalStats2(t, tk, dom) +} + +func TestGlobalStatsData3(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk.MustExec("set @@tidb_analyze_version=2") + + // index(int, int) + tk.MustExec("drop table if exists tintint") + tk.MustExec("create table tintint (a int, b int, key(a, b)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec(`insert into tintint values ` + + `(1, 1), (1, 2), (2, 1), (2, 2), (2, 3), (2, 3), (3, 1), (3, 1), (3, 1),` + // values in p0 + `(11, 1), (12, 1), (12, 2), (13, 1), (13, 1), (13, 2), (13, 2), (13, 2)`) // values in p1 + tk.MustExec("analyze table tintint with 2 topn, 2 buckets") + + rs := tk.MustQuery("show stats_meta where table_name='tintint'").Rows() + require.Equal(t, "17", rs[0][5].(string)) // g.total = p0.total + p1.total + require.Equal(t, "9", rs[1][5].(string)) + require.Equal(t, "8", rs[2][5].(string)) + + tk.MustQuery("show stats_topn where table_name='tintint' and is_index=1").Check(testkit.Rows( + "test tintint global a 1 (3, 1) 3", + "test tintint global a 1 (13, 2) 3", + "test tintint p0 a 1 (2, 3) 2", + "test tintint p0 a 1 (3, 1) 3", + "test tintint p1 a 1 (13, 1) 2", + 
"test tintint p1 a 1 (13, 2) 3")) + + tk.MustQuery("show stats_buckets where table_name='tintint' and is_index=1").Check(testkit.Rows( + "test tintint global a 1 0 6 2 (1, 1) (2, 3) 0", // (2, 3) is popped into it + "test tintint global a 1 1 11 2 (11, 1) (13, 1) 0", // (13, 1) is popped into it + "test tintint p0 a 1 0 3 1 (1, 1) (2, 1) 0", + "test tintint p0 a 1 1 4 1 (2, 2) (2, 2) 0", + "test tintint p1 a 1 0 2 1 (11, 1) (12, 1) 0", + "test tintint p1 a 1 1 3 1 (12, 2) (12, 2) 0")) + + rs = tk.MustQuery("show stats_histograms where table_name='tintint' and is_index=1").Rows() + require.Equal(t, "11", rs[0][6].(string)) // g.ndv = p0.ndv + p1.ndv + require.Equal(t, "6", rs[1][6].(string)) + require.Equal(t, "5", rs[2][6].(string)) + + // index(int, string) + tk.MustExec("drop table if exists tintstr") + tk.MustExec("create table tintstr (a int, b varchar(32), key(a, b)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec(`insert into tintstr values ` + + `(1, '1'), (1, '2'), (2, '1'), (2, '2'), (2, '3'), (2, '3'), (3, '1'), (3, '1'), (3, '1'),` + // values in p0 + `(11, '1'), (12, '1'), (12, '2'), (13, '1'), (13, '1'), (13, '2'), (13, '2'), (13, '2')`) // values in p1 + tk.MustExec("analyze table tintstr with 2 topn, 2 buckets") + + rs = tk.MustQuery("show stats_meta where table_name='tintstr'").Rows() + require.Equal(t, "17", rs[0][5].(string)) // g.total = p0.total + p1.total + require.Equal(t, "9", rs[1][5].(string)) + require.Equal(t, "8", rs[2][5].(string)) + + tk.MustQuery("show stats_topn where table_name='tintstr' and is_index=1").Check(testkit.Rows( + "test tintstr global a 1 (3, 1) 3", + "test tintstr global a 1 (13, 2) 3", + "test tintstr p0 a 1 (2, 3) 2", + "test tintstr p0 a 1 (3, 1) 3", + "test tintstr p1 a 1 (13, 1) 2", + "test tintstr p1 a 1 (13, 2) 3")) + + tk.MustQuery("show stats_buckets where table_name='tintstr' and is_index=1").Check(testkit.Rows( + "test tintstr global a 1 0 6 2 (1, 1) (2, 3) 0", // (2, 3) is popped into it + "test tintstr global a 1 1 11 2 (11, 1) (13, 1) 0", // (13, 1) is popped into it + "test tintstr p0 a 1 0 3 1 (1, 1) (2, 1) 0", + "test tintstr p0 a 1 1 4 1 (2, 2) (2, 2) 0", + "test tintstr p1 a 1 0 2 1 (11, 1) (12, 1) 0", + "test tintstr p1 a 1 1 3 1 (12, 2) (12, 2) 0")) + + rs = tk.MustQuery("show stats_histograms where table_name='tintstr' and is_index=1").Rows() + require.Equal(t, "11", rs[0][6].(string)) // g.ndv = p0.ndv + p1.ndv + require.Equal(t, "6", rs[1][6].(string)) + require.Equal(t, "5", rs[2][6].(string)) + + // index(int, double) + tk.MustExec("drop table if exists tintdouble") + tk.MustExec("create table tintdouble (a int, b double, key(a, b)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec(`insert into tintdouble values ` + + `(1, 1), (1, 2), (2, 1), (2, 2), (2, 3), (2, 3), (3, 1), (3, 1), (3, 1),` + // values in p0 + `(11, 1), (12, 1), (12, 2), (13, 1), (13, 1), (13, 2), (13, 2), (13, 2)`) // values in p1 + tk.MustExec("analyze table tintdouble with 2 topn, 2 buckets") + + rs = tk.MustQuery("show stats_meta where table_name='tintdouble'").Rows() + require.Equal(t, "17", rs[0][5].(string)) // g.total = p0.total + p1.total + require.Equal(t, "9", rs[1][5].(string)) + require.Equal(t, "8", rs[2][5].(string)) + + tk.MustQuery("show stats_topn where table_name='tintdouble' and is_index=1").Check(testkit.Rows( + "test tintdouble global a 1 (3, 1) 3", + "test tintdouble global a 1 (13, 2) 3", + "test 
tintdouble p0 a 1 (2, 3) 2", + "test tintdouble p0 a 1 (3, 1) 3", + "test tintdouble p1 a 1 (13, 1) 2", + "test tintdouble p1 a 1 (13, 2) 3")) + + tk.MustQuery("show stats_buckets where table_name='tintdouble' and is_index=1").Check(testkit.Rows( + "test tintdouble global a 1 0 6 2 (1, 1) (2, 3) 0", // (2, 3) is popped into it + "test tintdouble global a 1 1 11 2 (11, 1) (13, 1) 0", // (13, 1) is popped into it + "test tintdouble p0 a 1 0 3 1 (1, 1) (2, 1) 0", + "test tintdouble p0 a 1 1 4 1 (2, 2) (2, 2) 0", + "test tintdouble p1 a 1 0 2 1 (11, 1) (12, 1) 0", + "test tintdouble p1 a 1 1 3 1 (12, 2) (12, 2) 0")) + + rs = tk.MustQuery("show stats_histograms where table_name='tintdouble' and is_index=1").Rows() + require.Equal(t, "11", rs[0][6].(string)) // g.ndv = p0.ndv + p1.ndv + require.Equal(t, "6", rs[1][6].(string)) + require.Equal(t, "5", rs[2][6].(string)) + + // index(double, decimal) + tk.MustExec("drop table if exists tdoubledecimal") + tk.MustExec("create table tdoubledecimal (a int, b decimal(30, 2), key(a, b)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec(`insert into tdoubledecimal values ` + + `(1, 1), (1, 2), (2, 1), (2, 2), (2, 3), (2, 3), (3, 1), (3, 1), (3, 1),` + // values in p0 + `(11, 1), (12, 1), (12, 2), (13, 1), (13, 1), (13, 2), (13, 2), (13, 2)`) // values in p1 + tk.MustExec("analyze table tdoubledecimal with 2 topn, 2 buckets") + + rs = tk.MustQuery("show stats_meta where table_name='tdoubledecimal'").Rows() + require.Equal(t, "17", rs[0][5].(string)) // g.total = p0.total + p1.total + require.Equal(t, "9", rs[1][5].(string)) + require.Equal(t, "8", rs[2][5].(string)) + + tk.MustQuery("show stats_topn where table_name='tdoubledecimal' and is_index=1").Check(testkit.Rows( + "test tdoubledecimal global a 1 (3, 1.00) 3", + "test tdoubledecimal global a 1 (13, 2.00) 3", + "test tdoubledecimal p0 a 1 (2, 3.00) 2", + "test tdoubledecimal p0 a 1 (3, 1.00) 3", + "test tdoubledecimal p1 a 1 (13, 1.00) 2", + "test tdoubledecimal p1 a 1 (13, 2.00) 3")) + + tk.MustQuery("show stats_buckets where table_name='tdoubledecimal' and is_index=1").Check(testkit.Rows( + "test tdoubledecimal global a 1 0 6 2 (1, 1.00) (2, 3.00) 0", // (2, 3) is popped into it + "test tdoubledecimal global a 1 1 11 2 (11, 1.00) (13, 1.00) 0", // (13, 1) is popped into it + "test tdoubledecimal p0 a 1 0 3 1 (1, 1.00) (2, 1.00) 0", + "test tdoubledecimal p0 a 1 1 4 1 (2, 2.00) (2, 2.00) 0", + "test tdoubledecimal p1 a 1 0 2 1 (11, 1.00) (12, 1.00) 0", + "test tdoubledecimal p1 a 1 1 3 1 (12, 2.00) (12, 2.00) 0")) + + rs = tk.MustQuery("show stats_histograms where table_name='tdoubledecimal' and is_index=1").Rows() + require.Equal(t, "11", rs[0][6].(string)) // g.ndv = p0.ndv + p1.ndv + require.Equal(t, "6", rs[1][6].(string)) + require.Equal(t, "5", rs[2][6].(string)) + + // index(string, datetime) + tk.MustExec("drop table if exists tstrdt") + tk.MustExec("create table tstrdt (a int, b datetime, key(a, b)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec(`insert into tstrdt values ` + + `(1, '2000-01-01'), (1, '2000-01-02'), (2, '2000-01-01'), (2, '2000-01-02'), (2, '2000-01-03'), (2, '2000-01-03'), (3, '2000-01-01'), (3, '2000-01-01'), (3, '2000-01-01'),` + // values in p0 + `(11, '2000-01-01'), (12, '2000-01-01'), (12, '2000-01-02'), (13, '2000-01-01'), (13, '2000-01-01'), (13, '2000-01-02'), (13, '2000-01-02'), (13, '2000-01-02')`) // values in p1 + tk.MustExec("analyze 
table tstrdt with 2 topn, 2 buckets") + + rs = tk.MustQuery("show stats_meta where table_name='tstrdt'").Rows() + require.Equal(t, "17", rs[0][5].(string)) // g.total = p0.total + p1.total + require.Equal(t, "9", rs[1][5].(string)) + require.Equal(t, "8", rs[2][5].(string)) + + tk.MustQuery("show stats_topn where table_name='tstrdt' and is_index=1").Check(testkit.Rows( + "test tstrdt global a 1 (3, 2000-01-01 00:00:00) 3", + "test tstrdt global a 1 (13, 2000-01-02 00:00:00) 3", + "test tstrdt p0 a 1 (2, 2000-01-03 00:00:00) 2", + "test tstrdt p0 a 1 (3, 2000-01-01 00:00:00) 3", + "test tstrdt p1 a 1 (13, 2000-01-01 00:00:00) 2", + "test tstrdt p1 a 1 (13, 2000-01-02 00:00:00) 3")) + + tk.MustQuery("show stats_buckets where table_name='tstrdt' and is_index=1").Check(testkit.Rows( + "test tstrdt global a 1 0 6 2 (1, 2000-01-01 00:00:00) (2, 2000-01-03 00:00:00) 0", // (2, 3) is popped into it + "test tstrdt global a 1 1 11 2 (11, 2000-01-01 00:00:00) (13, 2000-01-01 00:00:00) 0", // (13, 1) is popped into it + "test tstrdt p0 a 1 0 3 1 (1, 2000-01-01 00:00:00) (2, 2000-01-01 00:00:00) 0", + "test tstrdt p0 a 1 1 4 1 (2, 2000-01-02 00:00:00) (2, 2000-01-02 00:00:00) 0", + "test tstrdt p1 a 1 0 2 1 (11, 2000-01-01 00:00:00) (12, 2000-01-01 00:00:00) 0", + "test tstrdt p1 a 1 1 3 1 (12, 2000-01-02 00:00:00) (12, 2000-01-02 00:00:00) 0")) + + rs = tk.MustQuery("show stats_histograms where table_name='tstrdt' and is_index=1").Rows() + require.Equal(t, "11", rs[0][6].(string)) // g.ndv = p0.ndv + p1.ndv + require.Equal(t, "6", rs[1][6].(string)) + require.Equal(t, "5", rs[2][6].(string)) +} + +func TestGlobalStatsVersion(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(` +create table t ( + a int +) +partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20) +)`) + err := statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) + require.NoError(t, err) + tk.MustExec("insert into t values (1), (5), (null), (11), (15)") + require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) + + tk.MustExec("set @@tidb_partition_prune_mode='static'") + tk.MustExec("set @@session.tidb_analyze_version=1") + tk.MustExec("analyze table t") // both p0 and p1 are in ver1 + require.Len(t, tk.MustQuery("show stats_meta").Rows(), 2) + + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk.MustExec("set @@session.tidb_analyze_version=1") + err = tk.ExecToErr("analyze table t") // try to build global-stats on ver1 + require.NoError(t, err) + + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk.MustExec("set @@session.tidb_analyze_version=2") + err = tk.ExecToErr("analyze table t partition p1") // only analyze p1 to let it in ver2 while p0 is in ver1 + require.NoError(t, err) + + tk.MustExec("analyze table t") // both p0 and p1 are in ver2 + require.Len(t, tk.MustQuery("show stats_meta").Rows(), 3) + + // If we already have global-stats, we can get the latest global-stats by analyzing the newly added partition. 
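+	// Analyzing a single partition under dynamic prune mode goes through the
+	// partition-to-global merge path, so the global row count below covers all
+	// three partitions.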
+	tk.MustExec("alter table t add partition (partition p2 values less than (30))")
+	tk.MustExec("insert t values (13), (14), (22), (23)")
+	require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true))
+	tk.MustExec("analyze table t partition p2") // it will succeed since p0 and p1 are both in ver2
+	require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true))
+	do := dom
+	is := do.InfoSchema()
+	h := do.StatsHandle()
+	require.NoError(t, h.Update(context.Background(), is))
+	tbl, err := is.TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
+	require.NoError(t, err)
+	tableInfo := tbl.Meta()
+	globalStats := h.GetTableStats(tableInfo)
+	// global.count = p0.count(3) + p1.count(4) + p2.count(2)
+	// modify count is 2 because we didn't analyze p1 after the second insert
+	require.Equal(t, int64(9), globalStats.RealtimeCount)
+	require.Equal(t, int64(2), globalStats.ModifyCount)
+
+	tk.MustExec("analyze table t partition p1;")
+	globalStats = h.GetTableStats(tableInfo)
+	// global.count = p0.count(3) + p1.count(4) + p2.count(2)
+	// The value of modify count is 0 now.
+	require.Equal(t, int64(9), globalStats.RealtimeCount)
+	require.Equal(t, int64(0), globalStats.ModifyCount)
+
+	tk.MustExec("alter table t drop partition p2;")
+	require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true))
+	tk.MustExec("analyze table t;")
+	globalStats = h.GetTableStats(tableInfo)
+	// global.count = p0.count(3) + p1.count(4)
+	require.Equal(t, int64(7), globalStats.RealtimeCount)
+}
+
+func TestDDLPartition4GlobalStats(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("set @@session.tidb_analyze_version=2")
+	tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
+	tk.MustExec(`create table t (a int) partition by range (a) (
+		partition p0 values less than (10),
+		partition p1 values less than (20),
+		partition p2 values less than (30),
+		partition p3 values less than (40),
+		partition p4 values less than (50),
+		partition p5 values less than (60)
+	)`)
+	do := dom
+	is := do.InfoSchema()
+	h := do.StatsHandle()
+	err := statstestutil.HandleNextDDLEventWithTxn(h)
+	require.NoError(t, err)
+	require.NoError(t, h.Update(context.Background(), is))
+	tk.MustExec("insert into t values (1), (2), (3), (4), (5), " + "(11), (21), (31), (41), (51)," + "(12), (22), (32), (42), (52);")
+	require.NoError(t, h.DumpStatsDeltaToKV(true))
+	require.NoError(t, h.Update(context.Background(), is))
+	tk.MustExec("analyze table t")
+	result := tk.MustQuery("show stats_meta where table_name = 't';").Rows()
+	require.Len(t, result, 7)
+	tbl, err := is.TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
+	require.NoError(t, err)
+	tableInfo := tbl.Meta()
+	globalStats := h.GetTableStats(tableInfo)
+	require.Equal(t, int64(15), globalStats.RealtimeCount)
+
+	tk.MustExec("alter table t truncate partition p2, p4;")
+	require.NoError(t, h.DumpStatsDeltaToKV(true))
+	err = statstestutil.HandleNextDDLEventWithTxn(h)
+	require.NoError(t, err)
+	require.NoError(t, h.Update(context.Background(), is))
+	// We will update the global-stats after the truncate operation.
+	globalStats = h.GetTableStats(tableInfo)
+	require.Equal(t, int64(11), globalStats.RealtimeCount)
+
+	tk.MustExec("analyze table t;")
+	result = tk.MustQuery("show stats_meta where table_name = 't';").Rows()
+	// The truncate operation only deletes the data from partitions p2 and p4.
It will not delete the partition-stats. + require.Len(t, result, 7) + // The result for the globalStats.count will be right now + globalStats = h.GetTableStats(tableInfo) + require.Equal(t, int64(11), globalStats.RealtimeCount) +} + +func TestGlobalStatsNDV(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec(`CREATE TABLE t ( a int, key(a) ) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20), + PARTITION p2 VALUES LESS THAN (30), + PARTITION p3 VALUES LESS THAN (40))`) + + checkNDV := func(ndvs ...int) { // g, p0, ..., p3 + tk.MustExec("analyze table t") + rs := tk.MustQuery(`show stats_histograms where is_index=1`).Rows() + require.Len(t, rs, 5) + for i, ndv := range ndvs { + require.Equal(t, fmt.Sprintf("%v", ndv), rs[i][6].(string)) + } + } + + // all partitions are empty + checkNDV(0, 0, 0, 0, 0) + + // p0 has data while others are empty + tk.MustExec("insert into t values (1), (2), (3)") + checkNDV(3, 3, 0, 0, 0) + + // p0, p1, p2 have data while p3 is empty + tk.MustExec("insert into t values (11), (12), (13), (21), (22), (23)") + checkNDV(9, 3, 3, 3, 0) + + // all partitions are not empty + tk.MustExec("insert into t values (31), (32), (33), (34)") + checkNDV(13, 3, 3, 3, 4) + + // insert some duplicated records + tk.MustExec("insert into t values (31), (33), (34)") + tk.MustExec("insert into t values (1), (2), (3)") + checkNDV(13, 3, 3, 3, 4) +} + +func TestGlobalStatsIndexNDV(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + + checkNDV := func(tbl string, g int, ps ...int) { // g, p0, ..., p3 + tk.MustExec("analyze table " + tbl) + rs := tk.MustQuery(fmt.Sprintf(`show stats_histograms where is_index=1 and table_name='%v'`, tbl)).Rows() + require.Len(t, rs, 1+len(ps)) // 1(global) + number of partitions + require.Equal(t, fmt.Sprintf("%v", g), rs[0][6].(string)) // global + for i, ndv := range ps { + require.Equal(t, fmt.Sprintf("%v", ndv), rs[i+1][6].(string)) + } + } + + // int + tk.MustExec("drop table if exists tint") + tk.MustExec(`CREATE TABLE tint ( a int, b int, key(b) ) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20))`) + tk.MustExec("insert into tint values (1, 1), (1, 2), (1, 3)") // p0.b: [1, 2, 3], p1.b: [] + checkNDV("tint", 3, 3, 0) + tk.MustExec("insert into tint values (11, 1), (11, 2), (11, 3)") // p0.b: [1, 2, 3], p1.b: [1, 2, 3] + checkNDV("tint", 3, 3, 3) + tk.MustExec("insert into tint values (11, 4), (11, 5), (11, 6)") // p0.b: [1, 2, 3], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tint", 6, 3, 6) + tk.MustExec("insert into tint values (1, 4), (1, 5), (1, 6), (1, 7), (1, 8)") // p0.b: [1, 2, 3, 4, 5, 6, 7, 8], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tint", 8, 8, 6) + + // double + tk.MustExec("drop table if exists tdouble") + tk.MustExec(`CREATE TABLE tdouble ( a int, b double, key(b) ) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20))`) + tk.MustExec("insert into tdouble values (1, 1.1), (1, 2.2), (1, 3.3)") // p0.b: [1, 2, 3], p1.b: [] + checkNDV("tdouble", 3, 3, 0) + tk.MustExec("insert into tdouble values (11, 1.1), (11, 2.2), (11, 3.3)") // p0.b: [1, 2, 3], p1.b: [1, 2, 3] + checkNDV("tdouble", 3, 3, 3) + tk.MustExec("insert into tdouble values 
(11, 4.4), (11, 5.5), (11, 6.6)") // p0.b: [1, 2, 3], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tdouble", 6, 3, 6) + tk.MustExec("insert into tdouble values (1, 4.4), (1, 5.5), (1, 6.6), (1, 7.7), (1, 8.8)") // p0.b: [1, 2, 3, 4, 5, 6, 7, 8], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tdouble", 8, 8, 6) + + // decimal + tk.MustExec("drop table if exists tdecimal") + tk.MustExec(`CREATE TABLE tdecimal ( a int, b decimal(30, 15), key(b) ) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20))`) + tk.MustExec("insert into tdecimal values (1, 1.1), (1, 2.2), (1, 3.3)") // p0.b: [1, 2, 3], p1.b: [] + checkNDV("tdecimal", 3, 3, 0) + tk.MustExec("insert into tdecimal values (11, 1.1), (11, 2.2), (11, 3.3)") // p0.b: [1, 2, 3], p1.b: [1, 2, 3] + checkNDV("tdecimal", 3, 3, 3) + tk.MustExec("insert into tdecimal values (11, 4.4), (11, 5.5), (11, 6.6)") // p0.b: [1, 2, 3], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tdecimal", 6, 3, 6) + tk.MustExec("insert into tdecimal values (1, 4.4), (1, 5.5), (1, 6.6), (1, 7.7), (1, 8.8)") // p0.b: [1, 2, 3, 4, 5, 6, 7, 8], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tdecimal", 8, 8, 6) + + // string + tk.MustExec("drop table if exists tstring") + tk.MustExec(`CREATE TABLE tstring ( a int, b varchar(30), key(b) ) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20))`) + tk.MustExec("insert into tstring values (1, '111'), (1, '222'), (1, '333')") // p0.b: [1, 2, 3], p1.b: [] + checkNDV("tstring", 3, 3, 0) + tk.MustExec("insert into tstring values (11, '111'), (11, '222'), (11, '333')") // p0.b: [1, 2, 3], p1.b: [1, 2, 3] + checkNDV("tstring", 3, 3, 3) + tk.MustExec("insert into tstring values (11, '444'), (11, '555'), (11, '666')") // p0.b: [1, 2, 3], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tstring", 6, 3, 6) + tk.MustExec("insert into tstring values (1, '444'), (1, '555'), (1, '666'), (1, '777'), (1, '888')") // p0.b: [1, 2, 3, 4, 5, 6, 7, 8], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tstring", 8, 8, 6) + + // datetime + tk.MustExec("drop table if exists tdatetime") + tk.MustExec(`CREATE TABLE tdatetime ( a int, b datetime, key(b) ) + PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20))`) + tk.MustExec("insert into tdatetime values (1, '2001-01-01'), (1, '2002-01-01'), (1, '2003-01-01')") // p0.b: [1, 2, 3], p1.b: [] + checkNDV("tdatetime", 3, 3, 0) + tk.MustExec("insert into tdatetime values (11, '2001-01-01'), (11, '2002-01-01'), (11, '2003-01-01')") // p0.b: [1, 2, 3], p1.b: [1, 2, 3] + checkNDV("tdatetime", 3, 3, 3) + tk.MustExec("insert into tdatetime values (11, '2004-01-01'), (11, '2005-01-01'), (11, '2006-01-01')") // p0.b: [1, 2, 3], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tdatetime", 6, 3, 6) + tk.MustExec("insert into tdatetime values (1, '2004-01-01'), (1, '2005-01-01'), (1, '2006-01-01'), (1, '2007-01-01'), (1, '2008-01-01')") // p0.b: [1, 2, 3, 4, 5, 6, 7, 8], p1.b: [1, 2, 3, 4, 5, 6] + checkNDV("tdatetime", 8, 8, 6) +} + +func TestGlobalStats(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/pkg/planner/core/forceDynamicPrune", `return(true)`) + defer failpoint.Disable("github.com/pingcap/tidb/pkg/planner/core/forceDynamicPrune") + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("set @@session.tidb_analyze_version = 2;") + tk.MustExec(`create table t (a int, key(a)) partition by range (a) ( + partition p0 values less than 
(10), + partition p1 values less than (20), + partition p2 values less than (30) + );`) + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic';") + tk.MustExec("insert into t values (1), (5), (null), (11), (15), (21), (25);") + tk.MustExec("analyze table t;") + // On the table with global-stats, we use explain to query a multi-partition query. + // And we should get the result that global-stats is used instead of pseudo-stats. + tk.MustQuery("explain format = 'brief' select a from t where a > 5").Check(testkit.Rows( + "IndexReader 4.00 root partition:all index:IndexRangeScan", + "└─IndexRangeScan 4.00 cop[tikv] table:t, index:a(a) range:(5,+inf], keep order:false")) + // On the table with global-stats, we use explain to query a single-partition query. + // And we should get the result that global-stats is used instead of pseudo-stats. + tk.MustQuery("explain format = 'brief' select * from t partition(p1) where a > 15;").Check(testkit.Rows( + "IndexReader 2.00 root partition:p1 index:IndexRangeScan", + "└─IndexRangeScan 2.00 cop[tikv] table:t, index:a(a) range:(15,+inf], keep order:false")) + + // Even if we have global-stats, we will not use it when the switch is set to `static`. + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustQuery("explain format = 'brief' select a from t where a > 5").Check(testkit.Rows( + "PartitionUnion 5.00 root ", + "├─IndexReader 1.00 root index:IndexRangeScan", + "│ └─IndexRangeScan 1.00 cop[tikv] table:t, partition:p0, index:a(a) range:(5,+inf], keep order:false", + "├─IndexReader 2.00 root index:IndexRangeScan", + "│ └─IndexRangeScan 2.00 cop[tikv] table:t, partition:p1, index:a(a) range:(5,+inf], keep order:false", + "└─IndexReader 2.00 root index:IndexRangeScan", + " └─IndexRangeScan 2.00 cop[tikv] table:t, partition:p2, index:a(a) range:(5,+inf], keep order:false")) + + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustExec("drop table t;") + tk.MustExec("create table t(a int, b int, key(a)) PARTITION BY HASH(a) PARTITIONS 2;") + tk.MustExec("insert into t values(1,1),(3,3),(4,4),(2,2),(5,5);") + // When we set the mode to `static`, using analyze will not report an error and will not generate global-stats. + // In addition, when using explain to view the plan of the related query, it was found that `Union` was used. + tk.MustExec("analyze table t;") + result := tk.MustQuery("show stats_meta where table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "2", result.Rows()[0][5]) + require.Equal(t, "3", result.Rows()[1][5]) + tk.MustQuery("explain format = 'brief' select a from t where a > 3;").Check(testkit.Rows( + "PartitionUnion 2.00 root ", + "├─IndexReader 1.00 root index:IndexRangeScan", + "│ └─IndexRangeScan 1.00 cop[tikv] table:t, partition:p0, index:a(a) range:(3,+inf], keep order:false", + "└─IndexReader 1.00 root index:IndexRangeScan", + " └─IndexRangeScan 1.00 cop[tikv] table:t, partition:p1, index:a(a) range:(3,+inf], keep order:false")) + + // When we turned on the switch, we found that pseudo-stats will be used in the plan instead of `Union`. + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic';") + tk.MustQuery("explain format = 'brief' select a from t where a > 3;").Check(testkit.Rows( + "IndexReader 3333.33 root partition:all index:IndexRangeScan", + "└─IndexRangeScan 3333.33 cop[tikv] table:t, index:a(a) range:(3,+inf], keep order:false, stats:pseudo")) + + // Execute analyze again without error and can generate global-stats. 
+ // And when executing related queries, neither Union nor pseudo-stats are used. + tk.MustExec("analyze table t;") + result = tk.MustQuery("show stats_meta where table_name = 't'").Sort() + require.Len(t, result.Rows(), 3) + require.Equal(t, "5", result.Rows()[0][5]) + require.Equal(t, "2", result.Rows()[1][5]) + require.Equal(t, "3", result.Rows()[2][5]) + tk.MustQuery("explain format = 'brief' select a from t where a > 3;").Check(testkit.Rows( + "IndexReader 2.00 root partition:all index:IndexRangeScan", + "└─IndexRangeScan 2.00 cop[tikv] table:t, index:a(a) range:(3,+inf], keep order:false")) + + tk.MustExec("drop table t;") + tk.MustExec("create table t (a int, b int, c int) PARTITION BY HASH(a) PARTITIONS 2;") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic';") + tk.MustExec("create index idx_ab on t(a, b);") + tk.MustExec("insert into t values (1, 1, 1), (5, 5, 5), (11, 11, 11), (15, 15, 15), (21, 21, 21), (25, 25, 25);") + tk.MustExec("analyze table t;") + // test the indexScan + tk.MustQuery("explain format = 'brief' select b from t where a > 5 and b > 10;").Check(testkit.Rows( + "IndexReader 2.67 root partition:all index:Projection", + "└─Projection 2.67 cop[tikv] test.t.b", + " └─Selection 2.67 cop[tikv] gt(test.t.b, 10)", + " └─IndexRangeScan 4.00 cop[tikv] table:t, index:idx_ab(a, b) range:(5,+inf], keep order:false")) + // test the indexLookUp + tk.MustQuery("explain format = 'brief' select * from t use index(idx_ab) where a > 1;").Check(testkit.Rows( + "IndexLookUp 5.00 root partition:all ", + "├─IndexRangeScan(Build) 5.00 cop[tikv] table:t, index:idx_ab(a, b) range:(1,+inf], keep order:false", + "└─TableRowIDScan(Probe) 5.00 cop[tikv] table:t keep order:false")) + // test the tableScan + tk.MustQuery("explain format = 'brief' select * from t;").Check(testkit.Rows( + "TableReader 6.00 root partition:all data:TableFullScan", + "└─TableFullScan 6.00 cop[tikv] table:t keep order:false")) +} + +func TestGlobalIndexStatistics(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + h := dom.StatsHandle() + originLease := h.Lease() + defer h.SetLease(originLease) + h.SetLease(time.Millisecond) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + for i, version := range []string{"1", "2"} { + tk.MustExec("set @@session.tidb_analyze_version = " + version) + + // analyze table t + tk.MustExec("drop table if exists t") + if i != 0 { + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) + } + tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, key(a) )" + + "PARTITION BY RANGE (a) (" + + "PARTITION p0 VALUES LESS THAN (10)," + + "PARTITION p1 VALUES LESS THAN (20)," + + "PARTITION p2 VALUES LESS THAN (30)," + + "PARTITION p3 VALUES LESS THAN (40))") + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) + tk.MustExec("insert into t(a,b) values (1,1), (2,2), (3,3), (15,15), (25,25), (35,35)") + tk.MustExec("ALTER TABLE t ADD UNIQUE INDEX idx(b) GLOBAL") + <-h.DDLEventCh() + require.Nil(t, h.DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t") + require.Nil(t, h.Update(context.Background(), dom.InfoSchema())) + tk.MustQuery("SELECT b FROM t use index(idx) WHERE b < 16 ORDER BY b"). + Check(testkit.Rows("1", "2", "3", "15")) + tk.MustQuery("EXPLAIN format='brief' SELECT b FROM t use index(idx) WHERE b < 16 ORDER BY b"). 
+ Check(testkit.Rows("IndexReader 4.00 root partition:all index:IndexRangeScan", + "└─IndexRangeScan 4.00 cop[tikv] table:t, index:idx(b) range:[-inf,16), keep order:true")) + + // analyze table t index idx + tk.MustExec("drop table if exists t") + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) + tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, primary key(b, a) clustered)" + + "PARTITION BY RANGE (a) (" + + "PARTITION p0 VALUES LESS THAN (10)," + + "PARTITION p1 VALUES LESS THAN (20)," + + "PARTITION p2 VALUES LESS THAN (30)," + + "PARTITION p3 VALUES LESS THAN (40));") + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) + tk.MustExec("insert into t(a,b) values (1,1), (2,2), (3,3), (15,15), (25,25), (35,35)") + tk.MustExec("ALTER TABLE t ADD UNIQUE INDEX idx(b) GLOBAL") + <-h.DDLEventCh() + require.Nil(t, h.DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t index idx") + require.Nil(t, h.Update(context.Background(), dom.InfoSchema())) + rows := tk.MustQuery("EXPLAIN SELECT b FROM t use index(idx) WHERE b < 16 ORDER BY b;").Rows() + require.Equal(t, "4.00", rows[0][1]) + + // analyze table t index + tk.MustExec("drop table if exists t") + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) + tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, primary key(b, a) clustered )" + + "PARTITION BY RANGE (a) (" + + "PARTITION p0 VALUES LESS THAN (10)," + + "PARTITION p1 VALUES LESS THAN (20)," + + "PARTITION p2 VALUES LESS THAN (30)," + + "PARTITION p3 VALUES LESS THAN (40));") + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) + tk.MustExec("insert into t(a,b) values (1,1), (2,2), (3,3), (15,15), (25,25), (35,35)") + tk.MustExec("ALTER TABLE t ADD UNIQUE INDEX idx(b) GLOBAL") + <-h.DDLEventCh() + require.Nil(t, h.DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t index") + require.Nil(t, h.Update(context.Background(), dom.InfoSchema())) + tk.MustQuery("EXPLAIN format='brief' SELECT b FROM t use index(idx) WHERE b < 16 ORDER BY b;"). 
+ Check(testkit.Rows("IndexReader 4.00 root partition:all index:IndexRangeScan", + "└─IndexRangeScan 4.00 cop[tikv] table:t, index:idx(b) range:[-inf,16), keep order:true")) + } +} + +func TestIssues24349(t *testing.T) { + store := testkit.CreateMockStore(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("set @@tidb_partition_prune_mode='dynamic'") + testKit.MustExec("set @@tidb_analyze_version=2") + defer testKit.MustExec("set @@tidb_analyze_version=1") + defer testKit.MustExec("set @@tidb_partition_prune_mode='static'") + testIssues24349(t, testKit, store) +} + +func TestIssues24349WithConcurrency(t *testing.T) { + store := testkit.CreateMockStore(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("set @@tidb_partition_prune_mode='dynamic'") + testKit.MustExec("set @@tidb_analyze_version=2") + testKit.MustExec("set global tidb_merge_partition_stats_concurrency=2") + defer testKit.MustExec("set @@tidb_analyze_version=1") + defer testKit.MustExec("set @@tidb_partition_prune_mode='static'") + defer testKit.MustExec("set global tidb_merge_partition_stats_concurrency=1") + testIssues24349(t, testKit, store) +} + +func TestGlobalStatsAndSQLBinding(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_merge_partition_stats_concurrency=1") + testGlobalStatsAndSQLBinding(tk) +} + +func TestGlobalStatsAndSQLBindingWithConcurrency(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_merge_partition_stats_concurrency=2") + testGlobalStatsAndSQLBinding(tk) +} diff --git a/pkg/statistics/histogram_test.go b/pkg/statistics/histogram_test.go new file mode 100644 index 0000000000000..7e6a1c811a17a --- /dev/null +++ b/pkg/statistics/histogram_test.go @@ -0,0 +1,732 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/mock" + "github.com/stretchr/testify/require" +) + +func TestTruncateHistogram(t *testing.T) { + hist := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLonglong), 1, 0) + low, high := types.NewIntDatum(0), types.NewIntDatum(1) + hist.AppendBucket(&low, &high, 0, 1) + newHist := hist.TruncateHistogram(1) + require.True(t, HistogramEqual(hist, newHist, true)) + newHist = hist.TruncateHistogram(0) + require.Equal(t, 0, newHist.Len()) +} + +func TestValueToString4InvalidKey(t *testing.T) { + bytes, err := codec.EncodeKey(time.UTC, nil, types.NewDatum(1), types.NewDatum(0.5)) + require.NoError(t, err) + // Append invalid flag. 
+ bytes = append(bytes, 20) + datum := types.NewDatum(bytes) + res, err := ValueToString(nil, &datum, 3, nil) + require.NoError(t, err) + require.Equal(t, "(1, 0.5, \x14)", res) +} + +type bucket4Test struct { + lower int64 + upper int64 + count int64 + repeat int64 + ndv int64 +} + +type topN4Test struct { + data int64 + count int64 +} + +func genHist4Test(t *testing.T, buckets []*bucket4Test, totColSize int64) *Histogram { + h := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeBlob), len(buckets), totColSize) + for _, bucket := range buckets { + lower, err := codec.EncodeKey(time.UTC, nil, types.NewIntDatum(bucket.lower)) + require.NoError(t, err) + upper, err := codec.EncodeKey(time.UTC, nil, types.NewIntDatum(bucket.upper)) + require.NoError(t, err) + di, du := types.NewBytesDatum(lower), types.NewBytesDatum(upper) + h.AppendBucketWithNDV(&di, &du, bucket.count, bucket.repeat, bucket.ndv) + } + return h +} + +func TestMergePartitionLevelHist(t *testing.T) { + type testCase struct { + partitionHists [][]*bucket4Test + totColSize []int64 + popedTopN []topN4Test + expHist []*bucket4Test + expBucketNumber int + } + tests := []testCase{ + { + partitionHists: [][]*bucket4Test{ + { + // Col(1) = [1, 4,|| 6, 9, 9,|| 12, 12, 12,|| 13, 14, 15] + { + lower: 1, + upper: 4, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 9, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 12, + upper: 12, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 15, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(2) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + }, + totColSize: []int64{11, 11}, + popedTopN: []topN4Test{}, + expHist: []*bucket4Test{ + { + lower: 1, + upper: 9, + count: 10, + repeat: 2, + ndv: 7, + }, + { + lower: 11, + upper: 17, + count: 22, + repeat: 1, + ndv: 8, + }, + }, + expBucketNumber: 2, + }, + { + partitionHists: [][]*bucket4Test{ + { + // Col(1) = [1, 4,|| 6, 9, 9,|| 12, 12, 12,|| 13, 14, 15] + { + lower: 1, + upper: 4, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 9, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 12, + upper: 12, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 15, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(2) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + }, + totColSize: []int64{11, 11}, + popedTopN: []topN4Test{ + { + data: 18, + count: 5, + }, + { + data: 4, + count: 6, + }, + }, + expHist: []*bucket4Test{ + { + lower: 1, + upper: 5, + count: 10, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 12, + count: 22, + repeat: 3, + ndv: 6, + }, + { + lower: 13, + upper: 18, + count: 33, + repeat: 5, + ndv: 5, + }, + }, + expBucketNumber: 3, + }, + { + // issue#49023 + partitionHists: [][]*bucket4Test{ + { + // Col(1) = [1, 4,|| 6, 9, 9,|| 12, 12, 12,|| 13, 14, 15] + { + lower: 1, + upper: 4, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 9, + count: 5, + repeat: 2, + ndv: 2, + }, + { + 
lower: 12, + upper: 12, + count: 5, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 15, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(2) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 2, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(3) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 2, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(4) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 2, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + }, + totColSize: []int64{11, 11, 11, 11}, + popedTopN: []topN4Test{ + { + data: 18, + count: 5, + }, + { + data: 4, + count: 6, + }, + }, + expHist: []*bucket4Test{ + { + lower: 1, + upper: 9, + count: 17, + repeat: 2, + ndv: 8, + }, + { + lower: 11, + upper: 11, + count: 35, + repeat: 9, + ndv: 1, + }, + { + lower: 13, + upper: 18, + count: 55, + repeat: 5, + ndv: 6, + }, + }, + expBucketNumber: 3, + }, + } + failpoint.Enable("github.com/pingcap/pkg/statistics/enableTopNNDV", `return(true)`) + + for ii, tt := range tests { + var expTotColSize int64 + hists := make([]*Histogram, 0, len(tt.partitionHists)) + for i := range tt.partitionHists { + hists = append(hists, genHist4Test(t, tt.partitionHists[i], tt.totColSize[i])) + expTotColSize += tt.totColSize[i] + } + ctx := mock.NewContext() + sc := ctx.GetSessionVars().StmtCtx + poped := make([]TopNMeta, 0, len(tt.popedTopN)) + for _, top := range tt.popedTopN { + b, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(top.data)) + require.NoError(t, err) + tmp := TopNMeta{ + Encoded: b, + Count: uint64(top.count), + } + poped = append(poped, tmp) + } + globalHist, err := MergePartitionHist2GlobalHist(sc, hists, poped, int64(tt.expBucketNumber), true, Version2) + require.NoError(t, err) + require.Equal(t, tt.expBucketNumber, len(globalHist.Buckets)) + for i, b := range tt.expHist { + lo, err := ValueToString(ctx.GetSessionVars(), globalHist.GetLower(i), 1, []byte{types.KindInt64}) + require.NoError(t, err, "failed at #%d case, %d bucket", ii, i) + up, err := ValueToString(ctx.GetSessionVars(), globalHist.GetUpper(i), 1, []byte{types.KindInt64}) + require.NoError(t, err, "failed at #%d case, %d bucket", ii, i) + require.Equal(t, fmt.Sprintf("%v", b.lower), lo, "failed at #%d case, %d bucket", ii, i) + require.Equal(t, fmt.Sprintf("%v", b.upper), up, "failed at #%d case, %d bucket", ii, i) + require.Equal(t, b.count, globalHist.Buckets[i].Count, "failed at #%d case, %d bucket", ii, i) + require.Equal(t, b.repeat, globalHist.Buckets[i].Repeat, "failed at #%d case, %d bucket", ii, i) + require.Equal(t, b.ndv, globalHist.Buckets[i].NDV, "failed at #%d case, %d bucket", ii, i) + } + require.Equal(t, expTotColSize, globalHist.TotColSize, "failed at #%d case", ii) + } + failpoint.Disable("github.com/pingcap/pkg/statistics/enableTopNNDV") +} + +func genBucket4Merging4Test(lower, upper, 
ndv, disjointNDV int64) bucket4Merging {
+	l := types.NewIntDatum(lower)
+	r := types.NewIntDatum(upper)
+	return bucket4Merging{
+		lower: &l,
+		upper: &r,
+		Bucket: Bucket{
+			NDV:   ndv,
+			Count: ndv,
+		},
+		disjointNDV: disjointNDV,
+	}
+}
+
+func TestMergeBucketNDV(t *testing.T) {
+	type testData struct {
+		left   bucket4Merging
+		right  bucket4Merging
+		result bucket4Merging
+	}
+	tests := []testData{
+		{
+			left:   genBucket4Merging4Test(1, 2, 2, 0),
+			right:  genBucket4Merging4Test(1, 2, 3, 0),
+			result: genBucket4Merging4Test(1, 2, 3, 0),
+		},
+		{
+			left:   genBucket4Merging4Test(1, 3, 2, 0),
+			right:  genBucket4Merging4Test(2, 3, 2, 0),
+			result: genBucket4Merging4Test(1, 3, 3, 0),
+		},
+		{
+			left:   genBucket4Merging4Test(1, 3, 2, 0),
+			right:  genBucket4Merging4Test(4, 6, 2, 2),
+			result: genBucket4Merging4Test(1, 3, 2, 4),
+		},
+		{
+			left:   genBucket4Merging4Test(1, 5, 5, 0),
+			right:  genBucket4Merging4Test(2, 6, 5, 0),
+			result: genBucket4Merging4Test(1, 6, 6, 0),
+		},
+		{
+			left:   genBucket4Merging4Test(3, 5, 3, 0),
+			right:  genBucket4Merging4Test(2, 6, 4, 0),
+			result: genBucket4Merging4Test(2, 6, 5, 0),
+		},
+	}
+	sc := mock.NewContext().GetSessionVars().StmtCtx
+	for i, tt := range tests {
+		res, err := mergeBucketNDV(sc, &tt.left, &tt.right)
+		require.NoError(t, err, "failed at #%d case", i)
+		require.Equal(t, res.lower.GetInt64(), tt.result.lower.GetInt64(), "failed at #%d case", i)
+		require.Equal(t, res.upper.GetInt64(), tt.result.upper.GetInt64(), "failed at #%d case", i)
+		require.Equal(t, res.NDV, tt.result.NDV, "failed at #%d case", i)
+		require.Equal(t, res.disjointNDV, tt.result.disjointNDV, "failed at #%d case", i)
+	}
+}
+
+func TestIndexQueryBytes(t *testing.T) {
+	ctx := mock.NewContext()
+	sc := ctx.GetSessionVars().StmtCtx
+	idx := &Index{Info: &model.IndexInfo{Columns: []*model.IndexColumn{{Name: ast.NewCIStr("a"), Offset: 0}}}}
+	idx.Histogram = *NewHistogram(0, 15, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0)
+	low, err1 := codec.EncodeKey(sc.TimeZone(), nil, types.NewBytesDatum([]byte("0")))
+	require.NoError(t, err1)
+	high, err2 := codec.EncodeKey(sc.TimeZone(), nil, types.NewBytesDatum([]byte("3")))
+	require.NoError(t, err2)
+	idx.Bounds.AppendBytes(0, low)
+	idx.Bounds.AppendBytes(0, high)
+	idx.Buckets = append(idx.Buckets, Bucket{Repeat: 10, Count: 20, NDV: 20})
+	idx.PreCalculateScalar()
+	idx.CMSketch = nil
+	// Count / NDV
+	require.Equal(t, idx.QueryBytes(nil, low), uint64(1))
+	// Repeat
+	require.Equal(t, idx.QueryBytes(nil, high), uint64(10))
+}
+
+type histogramInputAndOutput struct {
+	inputHist       *Histogram
+	inputHistToStr  string
+	outputHistToStr string
+}
+
+func TestStandardizeForV2AnalyzeIndex(t *testing.T) {
+	// 1.
prepare expected input and output histograms (in string) + testData := []*histogramInputAndOutput{ + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 2", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 34567 upper_bound: 5 repeats: 0 ndv: 0", + outputHistToStr: "index:0 ndv:6", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 876 upper_bound: 876 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 990 upper_bound: 990 repeats: 0 ndv: 0", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 111 upper_bound: 111 repeats: 10 ndv: 1\n" + + "num: 12 lower_bound: 123 upper_bound: 34567 repeats: 4 ndv: 20\n" + + "num: 10 lower_bound: 5 upper_bound: 990 repeats: 6 ndv: 2", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 111 upper_bound: 111 repeats: 10 ndv: 0\n" + + "num: 12 lower_bound: 123 upper_bound: 34567 repeats: 4 ndv: 0\n" + + "num: 10 lower_bound: 5 upper_bound: 990 repeats: 6 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 5 upper_bound: 5 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 876 upper_bound: 876 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 2\n" + + "num: 10 lower_bound: 95 upper_bound: 95 repeats: 3 ndv: 2", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 95 upper_bound: 95 repeats: 3 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 5 upper_bound: 5 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 876 upper_bound: 876 repeats: 3 ndv: 2\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 95 upper_bound: 95 repeats: 0 ndv: 0", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 876 upper_bound: 876 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 0", + }, + } + // 2. 
prepare the actual Histogram input + ctx := mock.NewContext() + sc := ctx.GetSessionVars().StmtCtx + val0, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(111)) + require.NoError(t, err) + val1, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(123)) + require.NoError(t, err) + val2, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(34567)) + require.NoError(t, err) + val3, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(5)) + require.NoError(t, err) + val4, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(876)) + require.NoError(t, err) + val5, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(990)) + require.NoError(t, err) + val6, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(95)) + require.NoError(t, err) + val0Bytes := types.NewBytesDatum(val0) + val1Bytes := types.NewBytesDatum(val1) + val2Bytes := types.NewBytesDatum(val2) + val3Bytes := types.NewBytesDatum(val3) + val4Bytes := types.NewBytesDatum(val4) + val5Bytes := types.NewBytesDatum(val5) + val6Bytes := types.NewBytesDatum(val6) + hist0 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist0.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist0.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist0.AppendBucketWithNDV(&val2Bytes, &val3Bytes, 10, 3, 2) + testData[0].inputHist = hist0 + hist1 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist1.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist1.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist1.AppendBucketWithNDV(&val2Bytes, &val3Bytes, 0, 0, 0) + testData[1].inputHist = hist1 + hist2 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist2.AppendBucketWithNDV(&val2Bytes, &val3Bytes, 10, 3, 2) + hist2.AppendBucketWithNDV(&val4Bytes, &val4Bytes, 10, 0, 0) + hist2.AppendBucketWithNDV(&val5Bytes, &val5Bytes, 10, 0, 0) + testData[2].inputHist = hist2 + hist3 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist3.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 10, 10, 1) + hist3.AppendBucketWithNDV(&val1Bytes, &val2Bytes, 22, 4, 20) + hist3.AppendBucketWithNDV(&val3Bytes, &val5Bytes, 32, 6, 2) + testData[3].inputHist = hist3 + hist4 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist4.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist4.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist4.AppendBucketWithNDV(&val2Bytes, &val2Bytes, 10, 3, 2) + hist4.AppendBucketWithNDV(&val3Bytes, &val3Bytes, 10, 0, 0) + hist4.AppendBucketWithNDV(&val4Bytes, &val4Bytes, 10, 0, 0) + hist4.AppendBucketWithNDV(&val5Bytes, &val5Bytes, 20, 3, 2) + hist4.AppendBucketWithNDV(&val6Bytes, &val6Bytes, 30, 3, 2) + testData[4].inputHist = hist4 + hist5 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist5.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist5.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist5.AppendBucketWithNDV(&val2Bytes, &val2Bytes, 10, 3, 2) + hist5.AppendBucketWithNDV(&val3Bytes, &val3Bytes, 10, 0, 0) + hist5.AppendBucketWithNDV(&val4Bytes, &val4Bytes, 20, 3, 2) + hist5.AppendBucketWithNDV(&val5Bytes, &val5Bytes, 30, 3, 2) + hist5.AppendBucketWithNDV(&val6Bytes, &val6Bytes, 30, 0, 0) + testData[5].inputHist = hist5 + + // 3. 
the actual test + for i, test := range testData { + require.Equal(t, test.inputHistToStr, test.inputHist.ToString(1)) + test.inputHist.StandardizeForV2AnalyzeIndex() + require.Equal(t, test.outputHistToStr, test.inputHist.ToString(1), + fmt.Sprintf("testData[%d].inputHist:%s", i, test.inputHistToStr)) + } +} + +func generateData(t *testing.T) *Histogram { + var data []*bucket4Test + sumCount := int64(0) + for n := 100; n < 10000; n = n + 100 { + sumCount += 100 + data = append(data, &bucket4Test{ + lower: int64(n), + upper: int64(n + 100), + count: sumCount, + repeat: 10, + ndv: 10, + }) + } + return genHist4Test(t, data, 0) +} + +func TestVerifyHistsBinarySearchRemoveValAndRemoveVals(t *testing.T) { + data1 := generateData(t) + data2 := generateData(t) + + require.Equal(t, data1, data2) + ctx := mock.NewContext() + sc := ctx.GetSessionVars().StmtCtx + b, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(150)) + require.NoError(t, err) + tmp := TopNMeta{ + Encoded: b, + Count: 2, + } + data1.RemoveVals([]TopNMeta{tmp}) + data2.BinarySearchRemoveVal(tmp) + require.Equal(t, data1, data2) +} diff --git a/statistics/handle/globalstats/globalstats_internal_test.go b/statistics/handle/globalstats/globalstats_internal_test.go index ab88e12e48a68..8581a4c36942e 100644 --- a/statistics/handle/globalstats/globalstats_internal_test.go +++ b/statistics/handle/globalstats/globalstats_internal_test.go @@ -61,7 +61,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where is_index=0").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tint global c 0 0 5 2 1 4 0", // bucket.ndv is not maintained for column histograms - "test tint global c 0 1 12 2 17 17 0", + "test tint global c 0 1 12 2 11 17 0", "test tint p0 c 0 0 2 1 1 2 0", "test tint p0 c 0 1 3 1 3 3 0", "test tint p1 c 0 0 3 1 11 13 0", @@ -75,7 +75,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where is_index=1").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tint global c 1 0 5 2 1 4 0", // 4 is popped from p0.TopN, so g.ndv = p0.ndv+1 - "test tint global c 1 1 12 2 17 17 0", // same with the column's + "test tint global c 1 1 12 2 11 17 0", // same with the column's "test tint p0 c 1 0 2 1 1 2 0", "test tint p0 c 1 1 3 1 3 3 0", "test tint p1 c 1 0 3 1 11 13 0", @@ -120,7 +120,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tdouble' and is_index=0 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tdouble global c 0 0 5 2 1 4 0", // bucket.ndv is not maintained for column histograms - "test tdouble global c 0 1 12 2 17 17 0", + "test tdouble global c 0 1 12 2 11 17 0", "test tdouble p0 c 0 0 2 1 1 2 0", "test tdouble p0 c 0 1 3 1 3 3 0", "test tdouble p1 c 0 0 3 1 11 13 0", @@ -137,7 +137,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tdouble' and is_index=1 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tdouble global c 1 0 5 2 1 4 0", // 4 is popped from p0.TopN, so g.ndv = p0.ndv+1 - "test tdouble global c 1 1 12 2 17 17 0", + "test tdouble global c 1 1 12 2 11 17 0", "test tdouble p0 c 
1 0 2 1 1 2 0", "test tdouble p0 c 1 1 3 1 3 3 0", "test tdouble p1 c 1 0 3 1 11 13 0", @@ -185,7 +185,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tdecimal' and is_index=0 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tdecimal global c 0 0 5 2 1.00 4.00 0", // bucket.ndv is not maintained for column histograms - "test tdecimal global c 0 1 12 2 17.00 17.00 0", + "test tdecimal global c 0 1 12 2 11.00 17.00 0", "test tdecimal p0 c 0 0 2 1 1.00 2.00 0", "test tdecimal p0 c 0 1 3 1 3.00 3.00 0", "test tdecimal p1 c 0 0 3 1 11.00 13.00 0", @@ -202,7 +202,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tdecimal' and is_index=1 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tdecimal global c 1 0 5 2 1.00 4.00 0", // 4 is popped from p0.TopN, so g.ndv = p0.ndv+1 - "test tdecimal global c 1 1 12 2 17.00 17.00 0", + "test tdecimal global c 1 1 12 2 11.00 17.00 0", "test tdecimal p0 c 1 0 2 1 1.00 2.00 0", "test tdecimal p0 c 1 1 3 1 3.00 3.00 0", "test tdecimal p1 c 1 0 3 1 11.00 13.00 0", @@ -250,7 +250,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tdatetime' and is_index=0 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tdatetime global c 0 0 5 2 2000-01-01 00:00:00 2000-01-04 00:00:00 0", // bucket.ndv is not maintained for column histograms - "test tdatetime global c 0 1 12 2 2000-01-17 00:00:00 2000-01-17 00:00:00 0", + "test tdatetime global c 0 1 12 2 2000-01-11 00:00:00 2000-01-17 00:00:00 0", "test tdatetime p0 c 0 0 2 1 2000-01-01 00:00:00 2000-01-02 00:00:00 0", "test tdatetime p0 c 0 1 3 1 2000-01-03 00:00:00 2000-01-03 00:00:00 0", "test tdatetime p1 c 0 0 3 1 2000-01-11 00:00:00 2000-01-13 00:00:00 0", @@ -267,7 +267,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tdatetime' and is_index=1 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tdatetime global c 1 0 5 2 2000-01-01 00:00:00 2000-01-04 00:00:00 0", // 4 is popped from p0.TopN, so g.ndv = p0.ndv+1 - "test tdatetime global c 1 1 12 2 2000-01-17 00:00:00 2000-01-17 00:00:00 0", + "test tdatetime global c 1 1 12 2 2000-01-11 00:00:00 2000-01-17 00:00:00 0", "test tdatetime p0 c 1 0 2 1 2000-01-01 00:00:00 2000-01-02 00:00:00 0", "test tdatetime p0 c 1 1 3 1 2000-01-03 00:00:00 2000-01-03 00:00:00 0", "test tdatetime p1 c 1 0 3 1 2000-01-11 00:00:00 2000-01-13 00:00:00 0", @@ -315,7 +315,7 @@ func testGlobalStats2(t *testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tstring' and is_index=0 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tstring global c 0 0 5 2 a1 a4 0", // bucket.ndv is not maintained for column histograms - "test tstring global c 0 1 12 2 b17 b17 0", + "test tstring global c 0 1 12 2 b11 b17 0", "test tstring p0 c 0 0 2 1 a1 a2 0", "test tstring p0 c 0 1 3 1 a3 a3 0", "test tstring p1 c 0 0 3 1 b11 b13 0", @@ -332,7 +332,7 @@ func testGlobalStats2(t 
*testing.T, tk *testkit.TestKit, dom *domain.Domain) { tk.MustQuery("show stats_buckets where table_name='tstring' and is_index=1 and column_name='c'").Check(testkit.Rows( // db, tbl, part, col, isIdx, bucketID, count, repeat, lower, upper, ndv "test tstring global c 1 0 5 2 a1 a4 0", // 4 is popped from p0.TopN, so g.ndv = p0.ndv+1 - "test tstring global c 1 1 12 2 b17 b17 0", + "test tstring global c 1 1 12 2 b11 b17 0", "test tstring p0 c 1 0 2 1 a1 a2 0", "test tstring p0 c 1 1 3 1 a3 a3 0", "test tstring p1 c 1 0 3 1 b11 b13 0", @@ -351,10 +351,18 @@ func testIssues24349(testKit *testkit.TestKit) { testKit.MustExec("create table t (a int, b int) partition by hash(a) partitions 3") testKit.MustExec("insert into t values (0, 3), (0, 3), (0, 3), (0, 2), (1, 1), (1, 2), (1, 2), (1, 2), (1, 3), (1, 4), (2, 1), (2, 1)") testKit.MustExec("analyze table t with 1 topn, 3 buckets") +<<<<<<< HEAD:statistics/handle/globalstats/globalstats_internal_test.go +======= + testKit.MustQuery("show stats_topn where partition_name = 'global'").Sort().Check(testkit.Rows( + "test t global a 0 1 6", + "test t global b 0 2 4", + )) + testKit.MustExec("explain select * from t where a > 0 and b > 0") +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/handle/globalstats/global_stats_internal_test.go testKit.MustQuery("show stats_buckets where partition_name='global'").Check(testkit.Rows( - "test t global a 0 0 2 2 0 2 0", - "test t global b 0 0 3 1 1 2 0", - "test t global b 0 1 10 1 4 4 0", + "test t global a 0 0 4 4 0 0 0", + "test t global a 0 1 6 2 2 2 0", + "test t global b 0 0 10 1 1 4 0", )) } diff --git a/statistics/histogram.go b/statistics/histogram.go index f1c50774ae578..b2fd743545dec 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -24,6 +24,7 @@ import ( "unsafe" "github.com/pingcap/errors" +<<<<<<< HEAD:statistics/histogram.go "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/mysql" @@ -38,6 +39,25 @@ import ( "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/ranger" +======= + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/charset" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/planner/planctx" + "github.com/pingcap/tidb/pkg/planner/util/debugtrace" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/collate" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/ranger" +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go "github.com/pingcap/tipb/go-tipb" "github.com/twmb/murmur3" "go.uber.org/zap" @@ -1232,12 +1252,18 @@ func (b *bucket4Merging) Clone() bucket4Merging { // Before merging, you need to make sure that when using (upper, lower) as the comparison key, `right` is greater than `left` func mergeBucketNDV(sc *stmtctx.StatementContext, left *bucket4Merging, right *bucket4Merging) (*bucket4Merging, error) { res := right.Clone() - if left.NDV == 0 { + if left.Count 
== 0 { return &res, nil } +<<<<<<< HEAD:statistics/histogram.go if right.NDV == 0 { res.lower = left.lower.Clone() res.upper = left.upper.Clone() +======= + if right.Count == 0 { + left.lower.Copy(res.lower) + left.upper.Copy(res.upper) +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go res.NDV = left.NDV return &res, nil } @@ -1384,30 +1410,44 @@ func mergePartitionBuckets(sc *stmtctx.StatementContext, buckets []*bucket4Mergi return &res, nil } +<<<<<<< HEAD:statistics/histogram.go func (t *TopNMeta) buildBucket4Merging(d *types.Datum) *bucket4Merging { res := newBucket4Meging() res.lower = d.Clone() res.upper = d.Clone() +======= +func (t *TopNMeta) buildBucket4Merging(d *types.Datum, analyzeVer int) *bucket4Merging { + res := newbucket4MergingForRecycle() + d.Copy(res.lower) + d.Copy(res.upper) +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go res.Count = int64(t.Count) res.Repeat = int64(t.Count) - res.NDV = int64(1) + if analyzeVer <= Version2 { + res.NDV = 0 + } + failpoint.Inject("github.com/pingcap/pkg/statistics/enableTopNNDV", func(_ failpoint.Value) { + res.NDV = 1 + }) + intest.Assert(analyzeVer <= Version2) return res } // MergePartitionHist2GlobalHist merges hists (partition-level Histogram) to a global-level Histogram -func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histogram, popedTopN []TopNMeta, expBucketNumber int64, isIndex bool) (*Histogram, error) { - var totCount, totNull, bucketNumber, totColSize int64 +func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histogram, popedTopN []TopNMeta, expBucketNumber int64, isIndex bool, analyzeVer int) (*Histogram, error) { + var totCount, totNull, totColSize int64 + var bucketNumber int if expBucketNumber == 0 { return nil, errors.Errorf("expBucketNumber can not be zero") } - // minValue is used to calc the bucket lower. - var minValue *types.Datum for _, hist := range hists { totColSize += hist.TotColSize totNull += hist.NullCount - bucketNumber += int64(hist.Len()) - if hist.Len() > 0 { + histLen := hist.Len() + if histLen > 0 { + bucketNumber += histLen totCount += hist.Buckets[hist.Len()-1].Count +<<<<<<< HEAD:statistics/histogram.go if minValue == nil { minValue = hist.GetLower(0).Clone() continue @@ -1419,10 +1459,17 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog if res < 0 { minValue = hist.GetLower(0).Clone() } +======= +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go } } - bucketNumber += int64(len(popedTopN)) + // If all the hist and the topn is empty, return a empty hist. 
+ if bucketNumber+len(popedTopN) == 0 { + return NewHistogram(hists[0].ID, 0, totNull, hists[0].LastUpdateVersion, hists[0].Tp, 0, totColSize), nil + } + + bucketNumber += len(popedTopN) buckets := make([]*bucket4Merging, 0, bucketNumber) globalBuckets := make([]*bucket4Merging, 0, expBucketNumber) @@ -1450,6 +1497,7 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog return nil, err } } +<<<<<<< HEAD:statistics/histogram.go if minValue == nil { minValue = d.Clone() continue @@ -1462,6 +1510,9 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog minValue = d.Clone() } buckets = append(buckets, meta.buildBucket4Merging(&d)) +======= + buckets = append(buckets, meta.buildBucket4Merging(&d, analyzeVer)) +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go } // Remove empty buckets @@ -1474,6 +1525,7 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog } buckets = buckets[:tail] +<<<<<<< HEAD:statistics/histogram.go var sortError error slices.SortFunc(buckets, func(i, j *bucket4Merging) int { res, err := i.upper.Compare(sc, j.upper, collate.GetBinaryCollator()) @@ -1491,19 +1543,45 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog }) if sortError != nil { return nil, sortError +======= + err := sortBucketsByUpperBound(sc.TypeCtx(), buckets) + if err != nil { + return nil, err +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go } var sum, prevSum int64 - r, prevR := len(buckets), 0 + r := len(buckets) bucketCount := int64(1) gBucketCountThreshold := (totCount / expBucketNumber) * 80 / 100 // expectedBucketSize * 0.8 - var bucketNDV int64 + mergeBuffer := make([]*bucket4Merging, 0, (len(buckets)+int(expBucketNumber)-1)/int(expBucketNumber)) + cutAndFixBuffer := make([]*bucket4Merging, 0, (len(buckets)+int(expBucketNumber))/int(expBucketNumber)) + var currentLeftMost *types.Datum for i := len(buckets) - 1; i >= 0; i-- { + if currentLeftMost == nil { + currentLeftMost = buckets[i].lower + } else { + res, err := currentLeftMost.Compare(sc.TypeCtx(), buckets[i].lower, collate.GetBinaryCollator()) + if err != nil { + return nil, err + } + if res > 0 { + currentLeftMost = buckets[i].lower + } + } sum += buckets[i].Count - bucketNDV += buckets[i].NDV if sum >= totCount*bucketCount/expBucketNumber && sum-prevSum >= gBucketCountThreshold { +<<<<<<< HEAD:statistics/histogram.go for ; i > 0; i-- { // if the buckets have the same upper, we merge them into the same new buckets. res, err := buckets[i-1].upper.Compare(sc, buckets[i].upper, collate.GetBinaryCollator()) +======= + // If the buckets have the same upper, we merge them into the same new buckets. + // We don't need to update the currentLeftMost in the for loop because the leftmost bucket's lower + // will be the smallest when their upper is the same. + // We just need to update it after the for loop. 
+ for ; i > 0; i-- { + res, err := buckets[i-1].upper.Compare(sc.TypeCtx(), buckets[i].upper, collate.GetBinaryCollator()) +>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go if err != nil { return nil, err } @@ -1511,42 +1589,154 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog break } sum += buckets[i-1].Count - bucketNDV += buckets[i-1].NDV } - merged, err := mergePartitionBuckets(sc, buckets[i:r]) + res, err := currentLeftMost.Compare(sc.TypeCtx(), buckets[i].lower, collate.GetBinaryCollator()) if err != nil { return nil, err } + if res > 0 { + currentLeftMost = buckets[i].lower + } + + // Iterate possible overlapped ones. + // We need to re-sort this part. + mergeBuffer = mergeBuffer[:0] + cutAndFixBuffer = cutAndFixBuffer[:0] + leftMostValidPosForNonOverlapping := i + for ; i > 0; i-- { + res, err := buckets[i-1].upper.Compare(sc.TypeCtx(), currentLeftMost, collate.GetBinaryCollator()) + if err != nil { + return nil, err + } + // If buckets[i-1].upper < currentLeftMost, this bucket has no overlap with current merging one. Break it. + if res < 0 { + break + } + // Now the bucket[i-1].upper >= currentLeftMost, they are overlapped. + res, err = buckets[i-1].lower.Compare(sc.TypeCtx(), currentLeftMost, collate.GetBinaryCollator()) + if err != nil { + return nil, err + } + // If buckets[i-1].lower >= currentLeftMost, this bucket is totally inside. So it can be totally merged. + if res >= 0 { + sum += buckets[i-1].Count + mergeBuffer = append(mergeBuffer, buckets[i-1]) + continue + } + // Now buckets[i-1].lower < currentLeftMost < buckets[i-1].upper + // calcFraction4Datums calc the value: (currentLeftMost - lower_bound) / (upper_bound - lower_bound) + overlapping := 1 - calcFraction4Datums(buckets[i-1].lower, buckets[i-1].upper, currentLeftMost) + overlappedCount := int64(float64(buckets[i-1].Count) * overlapping) + overlappedNDV := int64(float64(buckets[i-1].NDV) * overlapping) + sum += overlappedCount + buckets[i-1].Count -= overlappedCount + buckets[i-1].NDV -= overlappedNDV + buckets[i-1].Repeat = 0 + if buckets[i-1].NDV < 0 { + buckets[i-1].NDV = 0 + } + if buckets[i-1].Count < 0 { + buckets[i-1].Count = 0 + } + + // Cut it. + cutBkt := newbucket4MergingForRecycle() + buckets[i-1].upper.Copy(cutBkt.upper) + currentLeftMost.Copy(cutBkt.lower) + currentLeftMost.Copy(buckets[i-1].upper) + cutBkt.Count = overlappedCount + cutBkt.NDV = overlappedNDV + mergeBuffer = append(mergeBuffer, cutBkt) + cutAndFixBuffer = append(cutAndFixBuffer, cutBkt) + } + var merged *bucket4Merging + if len(cutAndFixBuffer) == 0 { + merged, err = mergePartitionBuckets(sc, buckets[i:r]) + if err != nil { + return nil, err + } + } else { + // The content in the merge buffer don't need a re-sort since we just fix some lower bound for them. + mergeBuffer = append(mergeBuffer, buckets[leftMostValidPosForNonOverlapping:r]...) + merged, err = mergePartitionBuckets(sc, mergeBuffer) + if err != nil { + return nil, err + } + for _, bkt := range cutAndFixBuffer { + releasebucket4MergingForRecycle(bkt) + } + // The buckets in buckets[i:origI] needs a re-sort. + err = sortBucketsByUpperBound(sc.TypeCtx(), buckets[i:leftMostValidPosForNonOverlapping]) + if err != nil { + return nil, err + } + // After the operation, the buckets in buckets[i:origI] contains two kinds of buckets: + // 1. The buckets that are totally inside the merged bucket. => lower_bound >= currentLeftMost + // It's not changed. 
+				//  2. The buckets that are overlapped with the merged bucket. lower_bound < currentLeftMost < upper_bound
+				//     After cutting, the remained part is [lower_bound_i, currentLeftMost]
+				// To do the next round of merging, we need to kick out the 1st kind of buckets.
+				// And after the re-sort, the 2nd kind of buckets will be in the front.
+				leftMostInvalidPosForNextRound := leftMostValidPosForNonOverlapping
+				for ; leftMostInvalidPosForNextRound > i; leftMostInvalidPosForNextRound-- {
+					res, err := buckets[leftMostInvalidPosForNextRound-1].lower.Compare(sc.TypeCtx(), currentLeftMost, collate.GetBinaryCollator())
+					if err != nil {
+						return nil, err
+					}
+					// Once the lower bound < currentLeftMost, we've skipped all the 1st kind of bucket.
+					// We can break here.
+					if res < 0 {
+						break
+					}
+				}
+				intest.AssertFunc(func() bool {
+					for j := i; j < leftMostInvalidPosForNextRound; j++ {
+						res, err := buckets[j].upper.Compare(sc.TypeCtx(), currentLeftMost, collate.GetBinaryCollator())
+						if err != nil {
+							return false
+						}
+						if res != 0 {
+							return false
+						}
+					}
+					return true
+				}, "the buckets are not sorted actually")
+				i = leftMostInvalidPosForNextRound
+			}
+			currentLeftMost.Copy(merged.lower)
+			currentLeftMost = nil
 			globalBuckets = append(globalBuckets, merged)
-			prevR = r
 			r = i
 			bucketCount++
 			prevSum = sum
-			bucketNDV = 0
 		}
 	}

 	if r > 0 {
-		bucketSum := int64(0)
-		for _, b := range buckets[:r] {
-			bucketSum += b.Count
-		}
-
-		if len(globalBuckets) > 0 && bucketSum < gBucketCountThreshold { // merge them into the previous global bucket
-			r = prevR
-			globalBuckets = globalBuckets[:len(globalBuckets)-1]
+		leftMost := buckets[0].lower
+		for i, b := range buckets[:r] {
+			if i == 0 {
+				continue
+			}
+			res, err := leftMost.Compare(sc.TypeCtx(), b.lower, collate.GetBinaryCollator())
+			if err != nil {
+				return nil, err
+			}
+			if res > 0 {
+				leftMost = b.lower
+			}
 		}
 		merged, err := mergePartitionBuckets(sc, buckets[:r])
 		if err != nil {
 			return nil, err
 		}
+		leftMost.Copy(merged.lower)
 		globalBuckets = append(globalBuckets, merged)
 	}

 	// Because we merge backwards, we need to flip the slices.
-	for i, j := 0, len(globalBuckets)-1; i < j; i, j = i+1, j-1 {
-		globalBuckets[i], globalBuckets[j] = globalBuckets[j], globalBuckets[i]
-	}
+	slices.Reverse(globalBuckets)
+<<<<<<< HEAD:statistics/histogram.go
 	// Calc the bucket lower.
 	if minValue == nil || len(globalBuckets) == 0 { // both hists and popedTopN are empty, returns an empty hist in this case
 		return NewHistogram(hists[0].ID, 0, totNull, hists[0].LastUpdateVersion, hists[0].Tp, len(globalBuckets), totColSize), nil
@@ -1558,6 +1748,9 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog
 	} else {
 		globalBuckets[i].lower = globalBuckets[i-1].upper.Clone()
 	}
+=======
+	for i := 1; i < len(globalBuckets); i++ {
+>>>>>>> a5d2d28d017 (statistics: fix the potential error when merging global stats (#52218)):pkg/statistics/histogram.go
 		globalBuckets[i].Count = globalBuckets[i].Count + globalBuckets[i-1].Count
 	}

@@ -1584,6 +1777,27 @@ func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histog
 	return globalHist, nil
 }

+// sortBucketsByUpperBound the bucket by upper bound first, then by lower bound.
+// If bkt[i].upper = bkt[i+1].upper, then we'll get bkt[i].lower < bkt[i+1].lower.
+func sortBucketsByUpperBound(ctx types.Context, buckets []*bucket4Merging) error {
+	var sortError error
+	slices.SortFunc(buckets, func(i, j *bucket4Merging) int {
+		res, err := i.upper.Compare(ctx, j.upper, collate.GetBinaryCollator())
+		if err != nil {
+			sortError = err
+		}
+		if res != 0 {
+			return res
+		}
+		res, err = i.lower.Compare(ctx, j.lower, collate.GetBinaryCollator())
+		if err != nil {
+			sortError = err
+		}
+		return res
+	})
+	return sortError
+}
+
 const (
 	allLoaded = iota
 	onlyCmsEvicted
diff --git a/statistics/histogram_bench_test.go b/statistics/histogram_bench_test.go
index 1a4b8e3db9746..b7bfd98785a83 100644
--- a/statistics/histogram_bench_test.go
+++ b/statistics/histogram_bench_test.go
@@ -89,7 +89,7 @@ func benchmarkMergePartitionHist2GlobalHist(b *testing.B, partition int) {
 		poped = append(poped, tmp)
 	}
 	b.StartTimer()
-	MergePartitionHist2GlobalHist(sc, hists, poped, expBucketNumber, true)
+	MergePartitionHist2GlobalHist(sc, hists, poped, expBucketNumber, true, Version2)
 	b.StopTimer()
 }
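Editor's note: the sketch below is not part of the diff above. It is a minimal, self-contained illustration of the overlap handling that the patch introduces in MergePartitionHist2GlobalHist: when a partition bucket straddles the left-most lower bound (currentLeftMost) of the group of buckets being merged into one global bucket, the patch splits that bucket's Count and NDV proportionally to the overlapped fraction (via calcFraction4Datums) and keeps the remainder for the next merge round. The sketch uses plain float64 bounds and a simplified bucket struct instead of the real types.Datum and bucket4Merging, so the names and types here are illustrative assumptions, not TiDB APIs.

package main

import "fmt"

// bucket is a simplified stand-in for the patch's bucket4Merging:
// numeric bounds, a row count, and an NDV estimate.
type bucket struct {
	lower, upper float64
	count, ndv   int64
}

// cutAtLeftMost splits b at leftMost (assuming lower < leftMost < upper) into
// the part that overlaps the global bucket being merged (values >= leftMost)
// and the remainder. Count and NDV are split proportionally to the covered
// fraction, mirroring the calcFraction4Datums-based cut in the patch.
func cutAtLeftMost(b bucket, leftMost float64) (overlapped, remainder bucket) {
	frac := (b.upper - leftMost) / (b.upper - b.lower) // fraction of b that falls into the merged range
	overlapped = bucket{
		lower: leftMost,
		upper: b.upper,
		count: int64(float64(b.count) * frac),
		ndv:   int64(float64(b.ndv) * frac),
	}
	remainder = bucket{
		lower: b.lower,
		upper: leftMost,
		count: b.count - overlapped.count,
		ndv:   b.ndv - overlapped.ndv,
	}
	return overlapped, remainder
}

func main() {
	// A partition bucket spanning 0 to 8 with 100 rows and 40 distinct values,
	// cut at a hypothetical currentLeftMost of 6: a quarter of it overlaps the
	// global bucket currently being merged.
	over, rest := cutAtLeftMost(bucket{lower: 0, upper: 8, count: 100, ndv: 40}, 6)
	fmt.Printf("overlapped: %+v\n", over) // counted into the current global bucket
	fmt.Printf("remainder:  %+v\n", rest) // kept for the next round of merging
}

With these numbers, 25 of the 100 rows and 10 of the 40 distinct values are attributed to the global bucket being built, while the remaining part stays behind as a 0-to-6 bucket for the next round; this is the same bookkeeping the patched loop performs on bucket4Merging values before re-sorting the leftover buckets.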