Commit

statistics: handle the prune mode correctly in the refresher (#57096)
ti-chi-bot authored Nov 7, 2024
1 parent 803b0ff commit df1eedb
Showing 10 changed files with 360 additions and 78 deletions.
13 changes: 13 additions & 0 deletions pkg/executor/analyze.go
@@ -42,6 +42,7 @@ import (
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tipb/go-tipb"
@@ -168,6 +169,18 @@ TASKLOOP:
}
}

if intest.InTest {
for {
stop := true
failpoint.Inject("mockStuckAnalyze", func() {
stop = false
})
if stop {
break
}
}
}

// Update analyze options to mysql.analyze_options for auto analyze.
err = e.saveV2AnalyzeOpts()
if err != nil {
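
The intest-only loop above lets a test freeze an analyze job at this point by enabling the mockStuckAnalyze failpoint. A minimal sketch of how a test might do that follows; the full failpoint path is an assumption derived from the package's import path, and the surrounding test setup is omitted:

// Sketch only: enable the mockStuckAnalyze failpoint so the analyze executor
// spins in the intest-only loop above, keeping the job "running".
// The failpoint path is assumed from the package import path and may differ.
package executor_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestStuckAnalyze(t *testing.T) {
	const fp = "github.com/pingcap/tidb/pkg/executor/mockStuckAnalyze"
	require.NoError(t, failpoint.Enable(fp, "return(true)")) // the injected callback keeps stop == false
	defer func() { require.NoError(t, failpoint.Disable(fp)) }()

	// ... start ANALYZE TABLE in a separate goroutine and assert that the
	// priority queue treats the table as a running job ...
}
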
4 changes: 2 additions & 2 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
@@ -331,9 +331,9 @@ func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
// During the test, we need to fetch all DML changes before analyzing the highest priority tables.
if intest.InTest {
sa.refresher.ProcessDMLChangesForTest()
sa.refresher.RequeueFailedJobsForTest()
sa.refresher.RequeueMustRetryJobsForTest()
}
analyzed := sa.refresher.AnalyzeHighestPriorityTables()
analyzed := sa.refresher.AnalyzeHighestPriorityTables(sctx)
// During the test, we need to wait for the auto analyze job to be finished.
if intest.InTest {
sa.refresher.WaitAutoAnalyzeFinishedForTest()
2 changes: 2 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/timeutil",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//oracle",
@@ -73,6 +74,7 @@ go_test(
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_goleak//:goleak",
109 changes: 73 additions & 36 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
Expand Up @@ -32,6 +32,7 @@ import (
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

@@ -40,9 +41,15 @@ const notInitializedErrMsg = "priority queue not initialized"
const (
lastAnalysisDurationRefreshInterval = time.Minute * 10
dmlChangesFetchInterval = time.Minute * 2
failedJobRequeueInterval = time.Minute * 5
mustRetryJobRequeueInterval = time.Minute * 5
)

// If the process takes longer than this threshold, we log it as a slow operation.
const slowLogThreshold = 150 * time.Millisecond

// Every 15 minutes, at most 1 log will be output.
var queueSamplerLogger = logutil.SampleLoggerFactory(15*time.Minute, 1, zap.String(logutil.LogFieldCategory, "stats"))

// pqHeap is an interface that wraps the methods of a priority queue heap.
type pqHeap interface {
// getByKey returns the job by the given table ID.
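
The two knobs above work together: an operation is only logged when it exceeds slowLogThreshold, and even then the sampler caps output to one entry per 15 minutes. Below is a rough, self-contained sketch of the same pattern built on plain zap; it is not TiDB's SampleLoggerFactory implementation, only an approximation of the behavior it provides:

// Approximation only: a zap logger that keeps at most `first` entries per
// `tick`, combined with a duration threshold so that only slow runs are logged.
package main

import (
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

const slowLogThreshold = 150 * time.Millisecond

func newSampledLogger(tick time.Duration, first int) *zap.Logger {
	base, _ := zap.NewProduction()
	// Drop every entry after the first `first` entries within each tick.
	return zap.New(zapcore.NewSamplerWithOptions(base.Core(), tick, first, 0))
}

func main() {
	logger := newSampledLogger(15*time.Minute, 1)

	start := time.Now()
	// ... process DML changes ...
	if d := time.Since(start); d > slowLogThreshold {
		logger.Info("DML changes processed", zap.Duration("duration", d))
	}
}
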
@@ -90,8 +97,13 @@ type AnalysisPriorityQueue struct {
runningJobs map[int64]struct{}
// lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch.
lastDMLUpdateFetchTimestamp uint64
// failedJobs is a slice to store the failed jobs.
failedJobs map[int64]struct{}
// mustRetryJobs is a set of table IDs whose jobs must be retried.
// For now, there are two kinds of such jobs:
// 1. Jobs that failed to execute. We have to retry them later.
// 2. Jobs that failed to enqueue because an analysis of the table was already ongoing,
// particularly for tables that had new indexes created during that analysis.
// We requeue the must retry jobs periodically.
mustRetryJobs map[int64]struct{}
// initialized is a flag to check if the queue is initialized.
initialized bool
}
@@ -142,7 +154,7 @@ func (pq *AnalysisPriorityQueue) Initialize() error {
pq.ctx = ctx
pq.syncFields.cancel = cancel
pq.syncFields.runningJobs = make(map[int64]struct{})
pq.syncFields.failedJobs = make(map[int64]struct{})
pq.syncFields.mustRetryJobs = make(map[int64]struct{})
pq.syncFields.initialized = true
pq.syncFields.mu.Unlock()

@@ -298,23 +310,23 @@ func (pq *AnalysisPriorityQueue) run() {
defer dmlChangesFetchInterval.Stop()
timeRefreshInterval := time.NewTicker(lastAnalysisDurationRefreshInterval)
defer timeRefreshInterval.Stop()
failedJobRequeueInterval := time.NewTicker(failedJobRequeueInterval)
defer failedJobRequeueInterval.Stop()
mustRetryJobRequeueInterval := time.NewTicker(mustRetryJobRequeueInterval)
defer mustRetryJobRequeueInterval.Stop()

for {
select {
case <-pq.ctx.Done():
statslogutil.StatsLogger().Info("Priority queue stopped")
return
case <-dmlChangesFetchInterval.C:
statslogutil.StatsLogger().Info("Start to fetch DML changes of jobs")
queueSamplerLogger().Info("Start to fetch DML changes of tables")
pq.ProcessDMLChanges()
case <-timeRefreshInterval.C:
statslogutil.StatsLogger().Info("Start to refresh last analysis durations of jobs")
queueSamplerLogger().Info("Start to refresh last analysis durations of jobs")
pq.RefreshLastAnalysisDuration()
case <-failedJobRequeueInterval.C:
statslogutil.StatsLogger().Info("Start to request failed jobs")
pq.RequeueFailedJobs()
case <-mustRetryJobRequeueInterval.C:
queueSamplerLogger().Info("Start to requeue must retry jobs")
pq.RequeueMustRetryJobs()
}
}
}
@@ -329,7 +341,10 @@ func (pq *AnalysisPriorityQueue) ProcessDMLChanges() {
if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
start := time.Now()
defer func() {
statslogutil.StatsLogger().Info("DML changes processed", zap.Duration("duration", time.Since(start)))
duration := time.Since(start)
if duration > slowLogThreshold {
queueSamplerLogger().Info("DML changes processed", zap.Duration("duration", duration))
}
}()

parameters := exec.GetAutoAnalyzeParameters(sctx)
@@ -596,23 +611,27 @@ func (pq *AnalysisPriorityQueue) GetLastFetchTimestamp() uint64 {
return pq.syncFields.lastDMLUpdateFetchTimestamp
}

// RequeueFailedJobs requeues the failed jobs.
func (pq *AnalysisPriorityQueue) RequeueFailedJobs() {
// RequeueMustRetryJobs requeues the must retry jobs.
func (pq *AnalysisPriorityQueue) RequeueMustRetryJobs() {
pq.syncFields.mu.Lock()
defer pq.syncFields.mu.Unlock()

if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
start := time.Now()
defer func() {
statslogutil.StatsLogger().Info("Failed jobs requeued", zap.Duration("duration", time.Since(start)))
duration := time.Since(start)
if duration > slowLogThreshold {
queueSamplerLogger().Info("Must retry jobs requeued", zap.Duration("duration", duration))
}
}()

is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
for tableID := range pq.syncFields.failedJobs {
delete(pq.syncFields.failedJobs, tableID)
for tableID := range pq.syncFields.mustRetryJobs {
// Note: Delete the job first to ensure it can be added back to the queue
delete(pq.syncFields.mustRetryJobs, tableID)
tblInfo, ok := pq.statsHandle.TableInfoByID(is, tableID)
if !ok {
statslogutil.StatsLogger().Warn("Table info not found during requeueing failed jobs", zap.Int64("tableID", tableID))
statslogutil.StatsLogger().Warn("Table info not found during requeueing must retry jobs", zap.Int64("tableID", tableID))
continue
}
err := pq.recreateAndPushJobForTable(sctx, tblInfo.Meta())
Expand All @@ -623,7 +642,7 @@ func (pq *AnalysisPriorityQueue) RequeueFailedJobs() {
}
return nil
}, statsutil.FlagWrapTxn); err != nil {
statslogutil.StatsLogger().Error("Failed to requeue failed jobs", zap.Error(err))
statslogutil.StatsLogger().Error("Failed to requeue must retry jobs", zap.Error(err))
}
}

@@ -636,7 +655,10 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
start := time.Now()
defer func() {
statslogutil.StatsLogger().Info("Last analysis duration refreshed", zap.Duration("duration", time.Since(start)))
duration := time.Since(start)
if duration > slowLogThreshold {
queueSamplerLogger().Info("Last analysis duration refreshed", zap.Duration("duration", duration))
}
}()
jobs := pq.syncFields.inner.list()
currentTs, err := statsutil.GetStartTS(sctx)
@@ -707,17 +729,13 @@ func (pq *AnalysisPriorityQueue) pushWithoutLock(job AnalysisJob) error {
if job == nil {
return nil
}
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
weight := pq.calculator.CalculateWeight(job)
if weight <= 0 {
statslogutil.SingletonStatsSamplerLogger().Warn(
"Table gets a negative weight",
zap.Float64("weight", weight),
zap.Stringer("job", job),
)
// Skip the must retry jobs.
// This avoids requeueing them before the next requeue interval;
// otherwise, we may requeue the same job multiple times in a short time.
if _, ok := pq.syncFields.mustRetryJobs[job.GetTableID()]; ok {
return nil
}
job.SetWeight(weight)

// Skip the current running jobs.
// Safety:
// Let's say we have a job in the priority queue, and it is already running.
Expand All @@ -727,14 +745,23 @@ func (pq *AnalysisPriorityQueue) pushWithoutLock(job AnalysisJob) error {
// In this process, we will not miss any DML changes of the table. Because when we try to delete the table from the current running jobs,
// we guarantee that the job is finished and the stats cache is updated.(The last step of the analysis job is to update the stats cache).
if _, ok := pq.syncFields.runningJobs[job.GetTableID()]; ok {
// Mark the job as must retry, because the table may become analyzable again soon.
// For example, new indexes may be added to the table while the job is running.
pq.syncFields.mustRetryJobs[job.GetTableID()] = struct{}{}
return nil
}
// Skip the failed jobs.
// Avoiding requeueing the failed jobs before the next failed job requeue interval.
// Otherwise, we may requeue the same job multiple times in a short time.
if _, ok := pq.syncFields.failedJobs[job.GetTableID()]; ok {
return nil
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
weight := pq.calculator.CalculateWeight(job)
if weight <= 0 {
statslogutil.SingletonStatsSamplerLogger().Warn(
"Table gets a negative weight",
zap.Float64("weight", weight),
zap.Stringer("job", job),
)
}
job.SetWeight(weight)
return pq.syncFields.inner.addOrUpdate(job)
}

@@ -763,7 +790,7 @@ func (pq *AnalysisPriorityQueue) Pop() (AnalysisJob, error) {
defer pq.syncFields.mu.Unlock()
// Mark the job as failed and remove it from the running jobs.
delete(pq.syncFields.runningJobs, j.GetTableID())
pq.syncFields.failedJobs[j.GetTableID()] = struct{}{}
pq.syncFields.mustRetryJobs[j.GetTableID()] = struct{}{}
})
return job, nil
}
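
Taken together, pushWithoutLock, the failure hook in Pop, and RequeueMustRetryJobs form a simple park-and-retry scheme: a job whose table is already being analyzed (or whose run failed) is parked in mustRetryJobs and pushed again on the next requeue tick. A stripped-down, self-contained sketch of that scheme with hypothetical types follows; it is not the actual priorityqueue API:

// Hypothetical stand-in types; the real implementation uses a heap of
// AnalysisJob values protected by a mutex.
package main

import "fmt"

type queue struct {
	jobs      map[int64]string   // tableID -> queued job (stand-in for the heap)
	running   map[int64]struct{} // tables currently being analyzed
	mustRetry map[int64]struct{} // tables to push again on the next requeue tick
}

func (q *queue) push(tableID int64, job string) {
	if _, ok := q.mustRetry[tableID]; ok {
		return // already parked; wait for the requeue interval
	}
	if _, ok := q.running[tableID]; ok {
		q.mustRetry[tableID] = struct{}{} // park: it may become analyzable again soon
		return
	}
	q.jobs[tableID] = job
}

func (q *queue) requeueMustRetryJobs() {
	for tableID := range q.mustRetry {
		delete(q.mustRetry, tableID) // delete first so push accepts the table again
		q.push(tableID, "recreated job")
	}
}

func main() {
	q := &queue{
		jobs:      map[int64]string{},
		running:   map[int64]struct{}{},
		mustRetry: map[int64]struct{}{},
	}
	q.running[42] = struct{}{}
	q.push(42, "analyze table t") // parked: table 42 is already running
	delete(q.running, 42)         // the running analysis finishes
	q.requeueMustRetryJobs()      // table 42 is queued again
	fmt.Println(q.jobs)           // map[42:recreated job]
}
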
@@ -817,4 +844,14 @@ func (pq *AnalysisPriorityQueue) Close() {
pq.syncFields.cancel()
}
pq.wg.Wait()

// Reset the initialized flag to allow the priority queue to be closed and re-initialized.
pq.syncFields.initialized = false
// The rest of the fields will be reset when the priority queue is initialized again,
// but we also reset them here for extra safety.
pq.syncFields.inner = nil
pq.syncFields.runningJobs = nil
pq.syncFields.mustRetryJobs = nil
pq.syncFields.lastDMLUpdateFetchTimestamp = 0
pq.syncFields.cancel = nil
}
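
The reset in Close is what makes a close-then-Initialize cycle valid. A small sketch of that lifecycle follows; only Initialize and Close are taken from the diff above, and construction of the queue, as well as the trigger for restarting it, is assumed rather than shown here:

package example

import (
	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
)

// restartQueue stops a running queue and brings it back up.
func restartQueue(pq *priorityqueue.AnalysisPriorityQueue) error {
	pq.Close() // stops the background worker and resets syncFields
	// Safe to call again because Close cleared the initialized flag.
	return pq.Initialize()
}
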