Skip to content

Commit

Permalink
statistics: fix some problem related to stats async load (#57723) (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Dec 5, 2024
1 parent 238b0d8 commit 6aaadc9
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ go_test(
data = glob(["testdata/**"]),
embed = [":statistics"],
flaky = True,
shard_count = 37,
shard_count = 38,
deps = [
"//pkg/config",
"//pkg/parser/ast",
Expand Down
10 changes: 10 additions & 0 deletions pkg/statistics/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,13 @@ func (c *Column) StatsAvailable() bool {
// StatsVer, so we check NDV > 0 || NullCount > 0 for the case.
return c.IsAnalyzed() || c.NDV > 0 || c.NullCount > 0
}

// EmptyColumn builds a placeholder Column that carries no statistics.
// It can back pseudo estimation, or mark a column whose stats do not exist so that loading is not retried.
// tid is stored as the PhysicalID. IsHandle is set only when pkIsHandle is true and colInfo carries the
// primary-key flag. colInfo must be non-nil because its ID, field type and flag are read here.
func EmptyColumn(tid int64, pkIsHandle bool, colInfo *model.ColumnInfo) *Column {
	// A histogram with every numeric argument zero, bound to the column's ID and field type.
	emptyHist := NewHistogram(colInfo.ID, 0, 0, 0, &colInfo.FieldType, 0, 0)
	col := &Column{
		PhysicalID: tid,
		Info:       colInfo,
		Histogram:  *emptyHist,
	}
	col.IsHandle = pkIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag())
	return col
}
52 changes: 38 additions & 14 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -515,7 +516,6 @@ func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo *
table.RealtimeCount = realtimeCount

rows, _, err := util.ExecRows(sctx, "select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", tableID)
// Check deleted table.
if err != nil || len(rows) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -563,7 +563,7 @@ func LoadHistogram(sctx sessionctx.Context, tableID int64, isIndex int, histID i
}

// LoadNeededHistograms will load histograms for those needed columns/indices.
func LoadNeededHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCache, loadFMSketch bool) (err error) {
func LoadNeededHistograms(sctx sessionctx.Context, statsCache statstypes.StatsHandle, loadFMSketch bool) (err error) {
items := statistics.HistogramNeededItems.AllItems()
for _, item := range items {
if !item.IsIndex {
Expand Down Expand Up @@ -606,18 +606,42 @@ func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
return reallyNeeded
}

func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCache, col model.TableItemID, loadFMSketch bool, fullLoad bool) (err error) {
tbl, ok := statsCache.Get(col.TableID)
func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.StatsHandle, col model.TableItemID, loadFMSketch bool, fullLoad bool) (err error) {
statsTbl, ok := statsCache.Get(col.TableID)
if !ok {
return nil
}
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
tbl, ok := statsCache.TableInfoByID(is, col.TableID)
if !ok {
return nil
}
tblInfo := tbl.Meta()
var colInfo *model.ColumnInfo
_, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(col.ID, true)
_, loadNeeded, analyzed := statsTbl.ColumnIsLoadNeeded(col.ID, true)
for _, ci := range tblInfo.Columns {
if col.ID == ci.ID {
colInfo = ci
break
}
}
if colInfo == nil {
statistics.HistogramNeededItems.Delete(col)
return nil
}
if !loadNeeded || !analyzed {
// If this column is not analyzed yet and we don't have it in memory.
// We create a fake one for the pseudo estimation.
// Otherwise, it will trigger the sync/async load again, even if the column has not been analyzed.
if loadNeeded && !analyzed {
fakeCol := statistics.EmptyColumn(colInfo.ID, tblInfo.PKIsHandle, colInfo)
statsTbl.Columns[col.ID] = fakeCol
statsCache.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
}
statistics.HistogramNeededItems.Delete(col)
return nil
}
colInfo = tbl.ColAndIdxExistenceMap.GetCol(col.ID)

hg, _, statsVer, _, err := HistMetaFromStorageWithHighPriority(sctx, &col, colInfo)
if hg == nil || err != nil {
statistics.HistogramNeededItems.Delete(col)
Expand Down Expand Up @@ -651,29 +675,29 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
CMSketch: cms,
TopN: topN,
FMSketch: fms,
IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
IsHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
StatsVer: statsVer,
}
// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
tbl, ok = statsCache.Get(col.TableID)
statsTbl, ok = statsCache.Get(col.TableID)
if !ok {
return nil
}
tbl = tbl.Copy()
statsTbl = statsTbl.Copy()
if colHist.StatsAvailable() {
if fullLoad {
colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
} else {
colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, colHist.LastUpdateVersion)
if statsVer != statistics.Version0 {
tbl.StatsVer = int(statsVer)
statsTbl.LastAnalyzeVersion = max(statsTbl.LastAnalyzeVersion, colHist.LastUpdateVersion)
statsTbl.StatsVer = int(statsVer)
}
}
tbl.Columns[col.ID] = colHist
statsCache.UpdateStatsCache([]*statistics.Table{tbl}, nil)
statsTbl.Columns[col.ID] = colHist
statsCache.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
statistics.HistogramNeededItems.Delete(col)
if col.IsSyncLoadFailed {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
Expand Down Expand Up @@ -728,9 +752,9 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, statsCache statstypes.St
tbl = tbl.Copy()
if idxHist.StatsVer != statistics.Version0 {
tbl.StatsVer = int(idxHist.StatsVer)
tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, idxHist.LastUpdateVersion)
}
tbl.Indices[idx.ID] = idxHist
tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, idxHist.LastUpdateVersion)
statsCache.UpdateStatsCache([]*statistics.Table{tbl}, nil)
if idx.IsSyncLoadFailed {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
Expand Down
8 changes: 2 additions & 6 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,9 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
}
// If this column is not analyzed yet and we don't have it in memory.
// We create a fake one for the pseudo estimation.
// Otherwise, it will trigger the sync/async load again, even if the column has not been analyzed.
if loadNeeded && !analyzed {
wrapper.col = &statistics.Column{
PhysicalID: item.TableID,
Info: wrapper.colInfo,
Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0),
IsHandle: tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()),
}
wrapper.col = statistics.EmptyColumn(item.TableID, tbl.IsPkIsHandle, wrapper.colInfo)
s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/statistics/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,3 +574,25 @@ func TestTableLastAnalyzeVersion(t *testing.T) {
require.True(t, found)
require.NotEqual(t, uint64(0), statsTbl.LastAnalyzeVersion)
}

// TestLastAnalyzeVersionNotChangedWithAsyncStatsLoad checks that loading the needed histograms for a table
// that has never been analyzed does not make `show stats_meta` report a last analyze time.
// It creates a table, inserts a row, dumps the stats delta, adds a column, runs a query that touches
// every column, calls LoadNeededHistograms, and then expects the last analyze time to still be "<nil>".
func TestLastAnalyzeVersionNotChangedWithAsyncStatsLoad(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)

	// NOTE(review): a zero sync wait presumably routes the load through the async path — confirm.
	tk.MustExec("set @@tidb_stats_load_sync_wait = 0;")
	tk.MustExec("use test")
	tk.MustExec("create table t(a int, b int);")
	require.NoError(t, dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh()))
	require.NoError(t, dom.StatsHandle().Update(dom.InfoSchema()))
	tk.MustExec("insert into t values (1, 1);")
	err := dom.StatsHandle().DumpStatsDeltaToKV(true)
	require.NoError(t, err)
	tk.MustExec("alter table t add column c int default 1;")
	// Check the DDL event error here as well, so a failed event handling cannot silently skew the assertion below.
	require.NoError(t, dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh()))
	tk.MustExec("select * from t where a = 1 or b = 1 or c = 1;")
	require.NoError(t, dom.StatsHandle().LoadNeededHistograms())
	result := tk.MustQuery("show stats_meta where table_name = 't'")
	require.Len(t, result.Rows(), 1)
	// The last analyze time.
	require.Equal(t, "<nil>", result.Rows()[0][6])
}
2 changes: 1 addition & 1 deletion pkg/statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func (t *Table) GetStatsHealthy() (int64, bool) {
}

// ColumnIsLoadNeeded checks whether the column needs trigger the async/sync load.
// The Column should be visible in the table and really has analyzed statistics in the stroage.
// The Column should be visible in the table and really has analyzed statistics in the storage.
// Also, if the stats has been loaded into the memory, we also don't need to load it.
// We return the Column together with the checking result, to avoid accessing the map multiple times.
// The first bool is whether we have it in memory. The second bool is whether this column has stats in the system table or not.
Expand Down

0 comments on commit 6aaadc9

Please sign in to comment.