Skip to content

Commit

Permalink
fix: make new pq workable
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <29879298+Rustin170506@users.noreply.github.com>
  • Loading branch information
Rustin170506 committed Sep 18, 2024
1 parent e46cfd2 commit 650c10c
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 15 deletions.
18 changes: 18 additions & 0 deletions pkg/infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,24 @@ func (is *infoSchema) TableByID(_ stdctx.Context, id int64) (val table.Table, ok
return slice[idx], true
}

func (is *infoSchema) SchemaNameByTableID(id int64) (val pmodel.CIStr, ok bool) {
if !tableIDIsValid(id) {
return
}

slice := is.sortedTablesBuckets[tableBucketIdx(id)]
idx := slice.searchTable(id)
if idx == -1 {
return
}
dbId := slice[idx].Meta().DBID
db, ok := is.SchemaByID(dbId)
if !ok {
return
}
return db.Name, true
}

// TableInfoByID implements InfoSchema.TableInfoByID
func (is *infoSchema) TableInfoByID(id int64) (*model.TableInfo, bool) {
tbl, ok := is.TableByID(stdctx.Background(), id)
Expand Down
14 changes: 14 additions & 0 deletions pkg/infoschema/infoschema_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,20 @@ func (is *infoschemaV2) TableByID(ctx context.Context, id int64) (val table.Tabl
return ret, true
}

func (is *infoschemaV2) SchemaNameByTableID(id int64) (val pmodel.CIStr, ok bool) {
if !tableIDIsValid(id) {
return
}

eq := func(a, b *tableItem) bool { return a.tableID == b.tableID }
itm, ok := search(is.byID, is.infoSchema.schemaMetaVersion, tableItem{tableID: id, schemaVersion: math.MaxInt64}, eq)
if !ok {
return pmodel.CIStr{}, false
}

return itm.dbName, true
}

// TableItem is exported from tableItem.
type TableItem struct {
DBName pmodel.CIStr
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type InfoSchema interface {
context.MetaOnlyInfoSchema
TableByName(ctx stdctx.Context, schema, table pmodel.CIStr) (table.Table, error)
TableByID(ctx stdctx.Context, id int64) (table.Table, bool)
SchemaNameByTableID(id int64) (pmodel.CIStr, bool)
FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo, *model.PartitionDefinition)
ListTablesWithSpecialAttribute(filter specialAttributeFilter) []tableInfoResult
base() *infoSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/timeutil",
"@com_github_pkg_errors//:errors",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
27 changes: 16 additions & 11 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue2.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (pq *AnalysisPriorityQueueV2) init() error {
if err := pq.setAutoAnalysisTimeWindow(sctx); err != nil {
return err
}
if !pq.isWithinTimeWindow() {
if !pq.IsWithinTimeWindow() {
return nil
}
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
Expand All @@ -108,7 +108,7 @@ func (pq *AnalysisPriorityQueueV2) init() error {
for _, db := range dbs {
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
// so we need to check the available time.
if !pq.isWithinTimeWindow() {
if !pq.IsWithinTimeWindow() {
return nil
}
// Ignore the memory and system database.
Expand Down Expand Up @@ -208,7 +208,7 @@ func (pq *AnalysisPriorityQueueV2) init() error {
}

func (pq *AnalysisPriorityQueueV2) run() {
dmlFetchInterval := time.NewTicker(time.Minute * 5)
dmlFetchInterval := time.NewTicker(time.Second * 10)
defer dmlFetchInterval.Stop()
timeRefreshInterval := time.NewTicker(time.Minute * 10)
defer timeRefreshInterval.Stop()
Expand All @@ -232,7 +232,7 @@ func (pq *AnalysisPriorityQueueV2) fetchDMLUpdate() {
if err := pq.setAutoAnalysisTimeWindow(sctx); err != nil {
return err
}
if !pq.isWithinTimeWindow() {
if !pq.IsWithinTimeWindow() {
return nil
}
values := pq.statsHandle.Values()
Expand Down Expand Up @@ -283,7 +283,12 @@ func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())

is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
job, ok, _ := pq.inner.GetByKey(stats.PhysicalID)
schemaName, ok := is.SchemaNameByTableID(stats.PhysicalID)
if !ok {
statslogutil.StatsLogger().Warn("schema not found for table id", zap.Int64("table_id", stats.PhysicalID))
return nil
}
job, ok, _ = pq.inner.GetByKey(stats.PhysicalID)
if !ok {
tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
tableMeta := tableInfo.Meta()
Expand All @@ -294,7 +299,7 @@ func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
partitionedTable := tableMeta.GetPartitionInfo()
if partitionedTable == nil {
job = jobFactory.CreateNonPartitionedTableAnalysisJob(
tableMeta.Name.O,
schemaName.O,
tableMeta,
stats,
)
Expand All @@ -309,7 +314,7 @@ func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
}
}
job = jobFactory.CreateStaticPartitionAnalysisJob(
tableMeta.Name.O,
schemaName.O,
tableMeta,
partitionDef.ID,
partitionDef.Name.O,
Expand All @@ -318,7 +323,7 @@ func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
} else {
partitionStats := GetPartitionStats(pq.statsHandle, tableMeta, partitionDefs)
job = jobFactory.CreateDynamicPartitionedTableAnalysisJob(
tableMeta.Name.O,
schemaName.O,
tableMeta,
stats,
partitionStats,
Expand All @@ -339,7 +344,7 @@ func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
partitionDefs := partitionedTable.Definitions
partitionStats := GetPartitionStats(pq.statsHandle, tableMeta, partitionDefs)
job = jobFactory.CreateDynamicPartitionedTableAnalysisJob(
tableMeta.Name.O,
schemaName.O,
tableMeta,
stats,
partitionStats,
Expand All @@ -359,7 +364,7 @@ func (pq *AnalysisPriorityQueueV2) handleTableStats(stats *statistics.Table) {
}

func (pq *AnalysisPriorityQueueV2) refreshTime() {
if !pq.isWithinTimeWindow() {
if !pq.IsWithinTimeWindow() {
return
}
if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
Expand Down Expand Up @@ -428,7 +433,7 @@ func (pq *AnalysisPriorityQueueV2) setAutoAnalysisTimeWindow(sctx sessionctx.Con
return nil
}

func (pq *AnalysisPriorityQueueV2) isWithinTimeWindow() bool {
func (pq *AnalysisPriorityQueueV2) IsWithinTimeWindow() bool {
window := pq.autoAnalysisTimeWindow.Load().(*AutoAnalysisTimeWindow)
return window.IsWithinTimeWindow(time.Now())
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

// useAnalysisPriorityQueueV2 is a constant that controls whether to use the new priority queue implementation.
const useAnalysisPriorityQueueV2 = true
const useAnalysisPriorityQueueV2 = false

// Refresher provides methods to refresh stats info.
// NOTE: Refresher is not thread-safe.
Expand Down Expand Up @@ -90,8 +90,14 @@ func (r *Refresher) UpdateConcurrency() {

// AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them.
func (r *Refresher) AnalyzeHighestPriorityTables() bool {
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return false
if useAnalysisPriorityQueueV2 {
if !r.jobsV2.IsWithinTimeWindow() {
return false
}
} else {
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return false
}
}

se, err := r.statsHandle.SPool().Get()
Expand Down Expand Up @@ -129,6 +135,7 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool {
statslogutil.StatsLogger().Debug("Job already running, skipping", zap.Int64("tableID", job.GetTableID()))
continue
}
statslogutil.StatsLogger().Info("job is valid to analyze", zap.Stringer("job", job))
if valid, failReason := job.IsValidToAnalyze(sctx); !valid {
statslogutil.SingletonStatsSamplerLogger().Info(
"Table not ready for analysis",
Expand Down

0 comments on commit 650c10c

Please sign in to comment.