From b71f0c079679bc08f2cfed4d5e69dd5a5eae8993 Mon Sep 17 00:00:00 2001 From: Rustin Date: Tue, 5 Nov 2024 19:16:06 +0800 Subject: [PATCH] statistics: handle the prune mode correctly in the refresher (#57096) ref pingcap/tidb#55906 --- .../handle/autoanalyze/autoanalyze.go | 2 +- .../autoanalyze/priorityqueue/queue_test.go | 44 ++++++-- .../handle/autoanalyze/refresher/BUILD.bazel | 3 +- .../handle/autoanalyze/refresher/refresher.go | 20 ++-- .../autoanalyze/refresher/refresher_test.go | 105 ++++++++++++++++-- .../handle/updatetest/update_test.go | 4 +- 6 files changed, 143 insertions(+), 35 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index e6f4daf2f99a1..45d541640b366 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -333,7 +333,7 @@ func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool { sa.refresher.ProcessDMLChangesForTest() sa.refresher.RequeueFailedJobsForTest() } - analyzed := sa.refresher.AnalyzeHighestPriorityTables() + analyzed := sa.refresher.AnalyzeHighestPriorityTables(sctx) // During the test, we need to wait for the auto analyze job to be finished. if intest.InTest { sa.refresher.WaitAutoAnalyzeFinishedForTest() diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go index ad2f6ceb872c1..0b667fb19d92b 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go @@ -167,21 +167,37 @@ func TestRefreshLastAnalysisDuration(t *testing.T) { require.Len(t, runningJobs, 2) } -func TestProcessDMLChanges(t *testing.T) { +func testProcessDMLChanges(t *testing.T, partitioned bool) { store, dom := testkit.CreateMockStoreAndDomain(t) handle := dom.StatsHandle() tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table t1 (a int)") - tk.MustExec("create table t2 (a int)") + ctx := context.Background() + if partitioned { + tk.MustExec("use test") + tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec("create table t2 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + // Because we don't handle the DDL events in unit tests by default, + // we need to use this way to make sure the stats record for the global table is created. + // Insert some rows into the tables. + tk.MustExec("insert into t1 values (11)") + tk.MustExec("insert into t2 values (12)") + require.NoError(t, handle.DumpStatsDeltaToKV(true)) + // Analyze the tables. + tk.MustExec("analyze table t1") + tk.MustExec("analyze table t2") + require.NoError(t, handle.Update(ctx, dom.InfoSchema())) + } else { + tk.MustExec("use test") + tk.MustExec("create table t1 (a int)") + tk.MustExec("create table t2 (a int)") + } tk.MustExec("insert into t1 values (1)") - tk.MustExec("insert into t2 values (1)") + tk.MustExec("insert into t2 values (1), (2)") statistics.AutoAnalyzeMinCnt = 0 defer func() { statistics.AutoAnalyzeMinCnt = 1000 }() - ctx := context.Background() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(ctx, dom.InfoSchema())) schema := pmodel.NewCIStr("test") @@ -205,10 +221,10 @@ func TestProcessDMLChanges(t *testing.T) { require.NoError(t, job2.Analyze(handle, dom.SysProcTracker())) require.NoError(t, handle.Update(ctx, dom.InfoSchema())) - // Insert 10 rows into t1. - tk.MustExec("insert into t1 values (2), (3), (4), (5), (6), (7), (8), (9), (10), (11)") - // Insert 2 rows into t2. - tk.MustExec("insert into t2 values (2), (3)") + // Insert 9 rows into t1. + tk.MustExec("insert into t1 values (3), (4), (5), (6), (7), (8), (9), (10), (11)") + // Insert 1 row into t2. + tk.MustExec("insert into t2 values (3)") // Dump the stats to kv. require.NoError(t, handle.DumpStatsDeltaToKV(true)) @@ -242,6 +258,14 @@ func TestProcessDMLChanges(t *testing.T) { require.Equal(t, tbl2.Meta().ID, updatedJob2.GetTableID(), "t2 should have higher weight due to smaller table size") } +func TestProcessDMLChanges(t *testing.T) { + testProcessDMLChanges(t, false) +} + +func TestProcessDMLChangesPartitioned(t *testing.T) { + testProcessDMLChanges(t, true) +} + func TestProcessDMLChangesWithRunningJobs(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) handle := dom.StatsHandle() diff --git a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel index bd3dd2a443fb2..b50565d9e3a77 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel @@ -33,7 +33,7 @@ go_test( "worker_test.go", ], flaky = True, - shard_count = 8, + shard_count = 9, deps = [ ":refresher", "//pkg/parser/model", @@ -42,6 +42,7 @@ go_test( "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/priorityqueue", "//pkg/statistics/handle/types", + "//pkg/statistics/handle/util", "//pkg/testkit", "//pkg/testkit/testsetup", "@com_github_stretchr_testify//require", diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index ab90523f6c6b7..efd55146f73b6 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -82,17 +82,11 @@ func (r *Refresher) UpdateConcurrency() { } // AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them. -func (r *Refresher) AnalyzeHighestPriorityTables() bool { - se, err := r.statsHandle.SPool().Get() - if err != nil { - statslogutil.StatsLogger().Error("Failed to get session context", zap.Error(err)) - return false - } - defer r.statsHandle.SPool().Put(se) - - sctx := se.(sessionctx.Context) +// Note: Make sure the session has the latest variable values. +// Usually, this is done by the caller through `util.CallWithSCtx`. +func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool { parameters := exec.GetAutoAnalyzeParameters(sctx) - err = r.setAutoAnalysisTimeWindow(parameters) + err := r.setAutoAnalysisTimeWindow(parameters) if err != nil { statslogutil.StatsLogger().Error("Set auto analyze time window failed", zap.Error(err)) return false @@ -100,15 +94,17 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool { if !r.isWithinTimeWindow() { return false } + currentAutoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) + currentPruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) if !r.jobs.IsInitialized() { if err := r.jobs.Initialize(); err != nil { statslogutil.StatsLogger().Error("Failed to initialize the queue", zap.Error(err)) return false } + r.lastSeenAutoAnalyzeRatio = currentAutoAnalyzeRatio + r.lastSeenPruneMode = currentPruneMode } else { // Only do this if the queue is already initialized. - currentAutoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) - currentPruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) if currentAutoAnalyzeRatio != r.lastSeenAutoAnalyzeRatio || currentPruneMode != r.lastSeenPruneMode { r.lastSeenAutoAnalyzeRatio = currentAutoAnalyzeRatio r.lastSeenPruneMode = currentPruneMode diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go index 05418795d80e5..4061fa9ef8c4b 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go @@ -19,12 +19,66 @@ import ( "testing" pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher" + "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) +func TestChangePruneMode(t *testing.T) { + statistics.AutoAnalyzeMinCnt = 0 + defer func() { + statistics.AutoAnalyzeMinCnt = 1000 + }() + + store, dom := testkit.CreateMockStoreAndDomain(t) + handle := dom.StatsHandle() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (140))") + tk.MustExec("insert into t1 values (0, 0)") + require.NoError(t, handle.DumpStatsDeltaToKV(true)) + require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) + tk.MustExec("analyze table t1") + r := refresher.NewRefresher(handle, dom.SysProcTracker(), dom.DDLNotifier()) + defer r.Close() + + // Insert more data to each partition. + tk.MustExec("insert into t1 values (1, 1), (11, 11)") + require.NoError(t, handle.DumpStatsDeltaToKV(true)) + require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) + + // Two jobs are added because the prune mode is static. + tk.MustExec("set global tidb_partition_prune_mode = 'static'") + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, handle.HandleAutoAnalyze()) + return nil + })) + r.WaitAutoAnalyzeFinishedForTest() + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) + r.WaitAutoAnalyzeFinishedForTest() + require.Equal(t, 0, r.Len()) + + // Insert more data to each partition. + tk.MustExec("insert into t1 values (2, 2), (3, 3), (4, 4), (12, 12), (13, 13), (14, 14)") + require.NoError(t, handle.DumpStatsDeltaToKV(true)) + require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) + + // One job is added because the prune mode is dynamic. + tk.MustExec("set global tidb_partition_prune_mode = 'dynamic'") + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) + r.WaitAutoAnalyzeFinishedForTest() + require.Equal(t, 0, r.Len()) +} + func TestSkipAnalyzeTableWhenAutoAnalyzeRatioIsZero(t *testing.T) { statistics.AutoAnalyzeMinCnt = 0 defer func() { @@ -67,12 +121,19 @@ func TestSkipAnalyzeTableWhenAutoAnalyzeRatioIsZero(t *testing.T) { r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier()) defer r.Close() // No jobs are added. - require.False(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.False(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) require.Equal(t, 0, r.Len()) // Enable the auto analyze. tk.MustExec("set global tidb_auto_analyze_ratio = 0.2") // Jobs are added. - require.True(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) + require.Equal(t, 0, r.Len()) } func TestIgnoreNilOrPseudoStatsOfPartitionedTable(t *testing.T) { @@ -92,7 +153,10 @@ func TestIgnoreNilOrPseudoStatsOfPartitionedTable(t *testing.T) { sysProcTracker := dom.SysProcTracker() r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier()) defer r.Close() - require.False(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.False(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) require.Equal(t, 0, r.Len(), "No jobs are added because table stats are nil") } @@ -113,7 +177,10 @@ func TestIgnoreNilOrPseudoStatsOfNonPartitionedTable(t *testing.T) { sysProcTracker := dom.SysProcTracker() r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier()) defer r.Close() - require.False(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.False(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) require.Equal(t, 0, r.Len(), "No jobs are added because table stats are nil") } @@ -158,7 +225,10 @@ func TestIgnoreTinyTable(t *testing.T) { sysProcTracker := dom.SysProcTracker() r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier()) defer r.Close() - require.True(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) require.Equal(t, 0, r.Len(), "Only t1 is added to the job queue, because t2 is a tiny table(not enough data)") } @@ -194,7 +264,10 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) { r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier()) defer r.Close() // Analyze t1 first. - require.True(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) r.WaitAutoAnalyzeFinishedForTest() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) @@ -212,7 +285,10 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) { tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2) require.Equal(t, int64(6), tblStats2.ModifyCount) // Do one more round. - require.True(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) r.WaitAutoAnalyzeFinishedForTest() // t2 is analyzed. pid2 = tbl2.Meta().GetPartitionInfo().Definitions[1].ID @@ -257,7 +333,10 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) { r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier()) defer r.Close() // Analyze tables concurrently. - require.True(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) r.WaitAutoAnalyzeFinishedForTest() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) @@ -284,7 +363,10 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) { require.Equal(t, int64(4), tblStats3.ModifyCount) // Do one more round to analyze t3. - require.True(t, r.AnalyzeHighestPriorityTables()) + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) r.WaitAutoAnalyzeFinishedForTest() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) @@ -322,7 +404,10 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) { r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier()) defer r.Close() - r.AnalyzeHighestPriorityTables() + require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { + require.True(t, r.AnalyzeHighestPriorityTables(sctx)) + return nil + })) r.WaitAutoAnalyzeFinishedForTest() is := dom.InfoSchema() diff --git a/pkg/statistics/handle/updatetest/update_test.go b/pkg/statistics/handle/updatetest/update_test.go index 4fbe344b3b46f..2b3766d6d47b6 100644 --- a/pkg/statistics/handle/updatetest/update_test.go +++ b/pkg/statistics/handle/updatetest/update_test.go @@ -1284,6 +1284,8 @@ func TestAutoAnalyzePartitionTableAfterAddingIndex(t *testing.T) { tblInfo := tbl.Meta() idxInfo := tblInfo.Indices[0] require.Nil(t, h.GetTableStats(tblInfo).GetIdx(idxInfo.ID)) - require.True(t, h.HandleAutoAnalyze()) + require.Eventually(t, func() bool { + return h.HandleAutoAnalyze() + }, 3*time.Second, time.Millisecond*100) require.NotNil(t, h.GetTableStats(tblInfo).GetIdx(idxInfo.ID)) }