diff --git a/pkg/server/handler/optimizor/BUILD.bazel b/pkg/server/handler/optimizor/BUILD.bazel
index cccf9b5498c63..97b7c232c5b49 100644
--- a/pkg/server/handler/optimizor/BUILD.bazel
+++ b/pkg/server/handler/optimizor/BUILD.bazel
@@ -43,7 +43,7 @@ go_test(
         "statistics_handler_test.go",
     ],
     flaky = True,
-    shard_count = 6,
+    shard_count = 7,
    deps = [
         ":optimizor",
         "//pkg/config",
@@ -56,6 +56,7 @@ go_test(
         "//pkg/server/internal/testutil",
         "//pkg/server/internal/util",
         "//pkg/session",
+        "//pkg/statistics/handle/types",
         "//pkg/statistics/handle/util",
         "//pkg/store/mockstore/unistore",
         "//pkg/testkit",
diff --git a/pkg/server/handler/optimizor/statistics_handler.go b/pkg/server/handler/optimizor/statistics_handler.go
index 897b91309ea46..8a0fe4264f4ef 100644
--- a/pkg/server/handler/optimizor/statistics_handler.go
+++ b/pkg/server/handler/optimizor/statistics_handler.go
@@ -144,3 +144,26 @@ func getSnapshotTableInfo(dom *domain.Domain, snapshot uint64, dbName, tblName s
 	}
 	return is.TableByName(context.Background(), model.NewCIStr(dbName), model.NewCIStr(tblName))
 }
+
+// StatsPriorityQueueHandler is the handler for dumping the stats priority queue snapshot.
+type StatsPriorityQueueHandler struct {
+	do *domain.Domain
+}
+
+// NewStatsPriorityQueueHandler creates a new StatsPriorityQueueHandler.
+func NewStatsPriorityQueueHandler(do *domain.Domain) *StatsPriorityQueueHandler {
+	return &StatsPriorityQueueHandler{do: do}
+}
+
+// ServeHTTP dumps the stats priority queue snapshot to json.
+func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
+	w.Header().Set("Content-Type", "application/json")
+
+	h := sh.do.StatsHandle()
+	tables, err := h.GetPriorityQueueSnapshot()
+	if err != nil {
+		handler.WriteError(w, err)
+	} else {
+		handler.WriteData(w, tables)
+	}
+}
diff --git a/pkg/server/handler/optimizor/statistics_handler_test.go b/pkg/server/handler/optimizor/statistics_handler_test.go
index d91fa92121cdb..2c54cb2a83146 100644
--- a/pkg/server/handler/optimizor/statistics_handler_test.go
+++ b/pkg/server/handler/optimizor/statistics_handler_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/pkg/server/internal/testutil"
 	"github.com/pingcap/tidb/pkg/server/internal/util"
 	"github.com/pingcap/tidb/pkg/session"
+	"github.com/pingcap/tidb/pkg/statistics/handle/types"
 	util2 "github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/stretchr/testify/require"
@@ -289,3 +290,58 @@ func checkData(t *testing.T, path string, client *testserverclient.TestServerCli
 	require.Equal(t, int64(4), count)
 	require.NoError(t, rows.Close())
 }
+
+func TestStatsPriorityQueueAPI(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	driver := server2.NewTiDBDriver(store)
+	client := testserverclient.NewTestServerClient()
+	cfg := util.NewTestConfig()
+	cfg.Port = client.Port
+	cfg.Status.StatusPort = client.StatusPort
+	cfg.Status.ReportStatus = true
+	cfg.Socket = fmt.Sprintf("/tmp/tidb-mock-%d.sock", time.Now().UnixNano())
+
+	server, err := server2.NewServer(cfg, driver)
+	require.NoError(t, err)
+	defer server.Close()
+
+	dom, err := session.GetDomain(store)
+	require.NoError(t, err)
+	server.SetDomain(dom)
+	go func() {
+		err := server.Run(nil)
+		require.NoError(t, err)
+	}()
+	<-server2.RunInGoTestChan
+	client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
+	client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
+	client.WaitUntilServerOnline()
+
+	router := mux.NewRouter()
+	handler := optimizor.NewStatsPriorityQueueHandler(dom)
+	router.Handle("/stats/priority-queue", handler)
+
+	resp, err := client.FetchStatus("/stats/priority-queue")
+	require.NoError(t, err)
+	defer resp.Body.Close()
+
+	js, err := io.ReadAll(resp.Body)
+	require.NoError(t, err)
+	require.Equal(t, "priority queue not initialized", string(js))
+
+	// Init the queue.
+	handle := dom.StatsHandle()
+	require.False(t, handle.HandleAutoAnalyze())
+
+	resp, err = client.FetchStatus("/stats/priority-queue")
+	require.NoError(t, err)
+	defer resp.Body.Close()
+
+	js, err = io.ReadAll(resp.Body)
+	require.NoError(t, err)
+	var snapshot types.PriorityQueueSnapshot
+	err = json.Unmarshal(js, &snapshot)
+	require.NoError(t, err)
+	require.Empty(t, snapshot.CurrentJobs)
+	require.Empty(t, snapshot.MustRetryTables)
+}
diff --git a/pkg/server/http_status.go b/pkg/server/http_status.go
index cfaa51d7cbf5f..ef861efd86498 100644
--- a/pkg/server/http_status.go
+++ b/pkg/server/http_status.go
@@ -217,6 +217,8 @@ func (s *Server) startHTTPServer() {
 		Name("StatsDump")
 	router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()).
 		Name("StatsHistoryDump")
+	router.Handle("/stats/priority-queue", s.newStatsPriorityQueueHandler()).
+		Name("StatsPriorityQueue")
 
 	router.Handle("/plan_replayer/dump/{filename}", s.newPlanReplayerHandler()).Name("PlanReplayerDump")
 	router.Handle("/extract_task/dump", s.newExtractServeHandler()).Name("ExtractTaskDump")
@@ -621,3 +623,17 @@ func (s *Server) newStatsHistoryHandler() *optimizor.StatsHistoryHandler {
 	}
 	return optimizor.NewStatsHistoryHandler(do)
 }
+
+func (s *Server) newStatsPriorityQueueHandler() *optimizor.StatsPriorityQueueHandler {
+	store, ok := s.driver.(*TiDBDriver)
+	if !ok {
+		panic("Illegal driver")
+	}
+
+	do, err := session.GetDomain(store.store)
+	if err != nil {
+		panic("Failed to get domain")
+	}
+
+	return optimizor.NewStatsPriorityQueueHandler(do)
+}
diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go
index b25ac73977643..d752ffc143094 100644
--- a/pkg/statistics/handle/autoanalyze/autoanalyze.go
+++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go
@@ -317,6 +317,11 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID
 	return statistics.CheckAnalyzeVerOnTable(tbl, version)
 }
 
+// GetPriorityQueueSnapshot returns the stats priority queue snapshot.
+func (sa *statsAnalyze) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) {
+	return sa.refresher.GetPriorityQueueSnapshot()
+}
+
 func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
 	defer func() {
 		if r := recover(); r != nil {
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go
index bef3ee8929ef9..870e13bee326e 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go
@@ -289,3 +289,7 @@ func (j *TestJob) SetIndicators(indicators priorityqueue.Indicators) {
 func (j *TestJob) HasNewlyAddedIndex() bool {
 	return false
 }
+
+func (j *TestJob) AsJSON() types.AnalysisJobJSON {
+	panic("unimplemented")
+}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go
index 0fa00e5626060..3a550309119c6 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go
@@ -364,3 +364,20 @@ func getPartitionNames(partitionIndexes map[string][]string) []string {
 	}
 	return names
 }
+
+// AsJSON converts the job to a JSON object.
+func (j *DynamicPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
+	partitionIDs := make([]int64, 0, len(j.PartitionIDs))
+	for partition := range j.PartitionIDs {
+		partitionIDs = append(partitionIDs, partition)
+	}
+	return statstypes.AnalysisJobJSON{
+		Type:               string(j.getAnalyzeType()),
+		TableID:            j.GlobalTableID,
+		PartitionIDs:       partitionIDs,
+		PartitionIndexIDs:  j.PartitionIndexIDs,
+		Weight:             j.Weight,
+		Indicators:         asJSONIndicators(j.Indicators),
+		HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
+	}
+}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go
index 53ecd941403e0..13590237bd6a3 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go
@@ -67,6 +67,9 @@ func (t testHeapObject) RegisterSuccessHook(hook SuccessJobHook) {
 func (t testHeapObject) RegisterFailureHook(hook FailureJobHook) {
 	panic("implement me")
 }
+func (t testHeapObject) AsJSON() statstypes.AnalysisJobJSON {
+	panic("implement me")
+}
 func (t testHeapObject) String() string {
 	panic("implement me")
 }
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go
index a75c8ce5fd50d..8a2cc918cb5a4 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go
@@ -91,6 +91,9 @@ type AnalysisJob interface {
 	// RegisterFailureHook registers a failureHook function that will be called after the job is marked as failed.
 	RegisterFailureHook(hook FailureJobHook)
 
+	// AsJSON converts the job to a JSON object.
+	AsJSON() statstypes.AnalysisJobJSON
+
 	fmt.Stringer
 }
 
@@ -186,3 +189,12 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool {
 	_, ok := job.(*DynamicPartitionedTableAnalysisJob)
 	return ok
 }
+
+// asJSONIndicators converts the indicators to a JSON object.
+func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON {
+	return statstypes.IndicatorsJSON{
+		ChangePercentage:     fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100),
+		TableSize:            fmt.Sprintf("%.2f", indicators.TableSize),
+		LastAnalysisDuration: fmt.Sprintf("%v", indicators.LastAnalysisDuration),
+	}
+}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go
index a8dff7f0921ad..ad4b27e5f80d3 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go
@@ -270,3 +270,19 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st
 
 	return sql, params
 }
+
+// AsJSON converts the job to a JSON object.
+func (j *NonPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
+	indexes := make([]int64, 0, len(j.IndexIDs))
+	for index := range j.IndexIDs {
+		indexes = append(indexes, index)
+	}
+	return statstypes.AnalysisJobJSON{
+		Type:               string(j.getAnalyzeType()),
+		TableID:            j.TableID,
+		IndexIDs:           indexes,
+		Weight:             j.Weight,
+		Indicators:         asJSONIndicators(j.Indicators),
+		HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
+	}
+}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
index 8d25714f283c5..a64ef48933669 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
@@ -16,6 +16,7 @@ package priorityqueue
 
 import (
 	"context"
+	"sort"
 	"sync"
 	"time"
 
@@ -800,6 +801,38 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) {
 	return pq.syncFields.inner.len(), nil
 }
 
+// Snapshot returns a snapshot of all the jobs in the priority queue.
+func (pq *AnalysisPriorityQueue) Snapshot() (
+	snapshot statstypes.PriorityQueueSnapshot,
+	err error,
+) {
+	pq.syncFields.mu.RLock()
+	defer pq.syncFields.mu.RUnlock()
+	if !pq.syncFields.initialized {
+		return statstypes.PriorityQueueSnapshot{}, errors.New(notInitializedErrMsg)
+	}
+
+	currentJobs := pq.syncFields.inner.list()
+	mustRetryTables := make([]int64, 0, len(pq.syncFields.mustRetryJobs))
+	for tableID := range pq.syncFields.mustRetryJobs {
+		mustRetryTables = append(mustRetryTables, tableID)
+	}
+
+	jsonJobs := make([]statstypes.AnalysisJobJSON, len(currentJobs))
+	for i, job := range currentJobs {
+		jsonJobs[i] = job.AsJSON()
+	}
+	// Sort by the weight in descending order.
+	sort.Slice(jsonJobs, func(i, j int) bool {
+		return jsonJobs[i].Weight > jsonJobs[j].Weight
+	})
+
+	return statstypes.PriorityQueueSnapshot{
+		CurrentJobs:     jsonJobs,
+		MustRetryTables: mustRetryTables,
+	}, nil
+}
+
 // Close closes the priority queue.
 // Note: This function is thread-safe.
 func (pq *AnalysisPriorityQueue) Close() {
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go
index 7de6df698cedc..7e1ec3983709d 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go
@@ -306,3 +306,19 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex
 
 	return sql, params
 }
+
+// AsJSON converts the job to a JSON object.
+func (j *StaticPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
+	indexes := make([]int64, 0, len(j.IndexIDs))
+	for index := range j.IndexIDs {
+		indexes = append(indexes, index)
+	}
+	return statstypes.AnalysisJobJSON{
+		Type:               string(j.getAnalyzeType()),
+		TableID:            j.StaticPartitionID,
+		IndexIDs:           indexes,
+		Weight:             j.Weight,
+		Indicators:         asJSONIndicators(j.Indicators),
+		HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
+	}
+}
diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
index 6d58c7c01be86..4bbb0d975a665 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go
+++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
@@ -188,6 +188,11 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
 	return false
 }
 
+// GetPriorityQueueSnapshot returns a snapshot of the stats priority queue.
+func (r *Refresher) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) {
+	return r.jobs.Snapshot()
+}
+
 func (r *Refresher) setAutoAnalysisTimeWindow(
 	parameters map[string]string,
 ) error {
diff --git a/pkg/statistics/handle/autoanalyze/refresher/worker_test.go b/pkg/statistics/handle/autoanalyze/refresher/worker_test.go
index 179b1f7f37789..40c4c87e28c21 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/worker_test.go
+++ b/pkg/statistics/handle/autoanalyze/refresher/worker_test.go
@@ -67,6 +67,9 @@ func (m *mockAnalysisJob) GetIndicators() priorityqueue.Indicators {
 func (m *mockAnalysisJob) SetIndicators(indicators priorityqueue.Indicators) {
 	panic("not implemented")
 }
+func (m *mockAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
+	panic("not implemented")
+}
 
 func TestWorker(t *testing.T) {
 	_, dom := testkit.CreateMockStoreAndDomain(t)
diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go
index c6112bb08bc14..3fd8a4711fa08 100644
--- a/pkg/statistics/handle/types/interfaces.go
+++ b/pkg/statistics/handle/types/interfaces.go
@@ -118,6 +118,33 @@ type StatsHistory interface {
 	RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error)
 }
 
+// PriorityQueueSnapshot is the snapshot of the stats priority queue.
+type PriorityQueueSnapshot struct {
+	CurrentJobs     []AnalysisJobJSON `json:"current_jobs"`
+	MustRetryTables []int64           `json:"must_retry_tables"`
+}
+
+// AnalysisJobJSON represents the JSON format of an AnalysisJob.
+//
+//nolint:fieldalignment
+type AnalysisJobJSON struct {
+	Type               string            `json:"type"`
+	TableID            int64             `json:"table_id"`
+	Weight             float64           `json:"weight"`
+	PartitionIDs       []int64           `json:"partition_ids"`
+	IndexIDs           []int64           `json:"index_ids"`
+	PartitionIndexIDs  map[int64][]int64 `json:"partition_index_ids"`
+	Indicators         IndicatorsJSON    `json:"indicators"`
+	HasNewlyAddedIndex bool              `json:"has_newly_added_index"`
+}
+
+// IndicatorsJSON represents the JSON format of Indicators.
+type IndicatorsJSON struct {
+	ChangePercentage     string `json:"change_percentage"`
+	TableSize            string `json:"table_size"`
+	LastAnalysisDuration string `json:"last_analysis_duration"`
+}
+
 // StatsAnalyze is used to handle auto-analyze and manage analyze jobs.
 type StatsAnalyze interface {
 	owner.Listener
@@ -161,6 +188,9 @@ type StatsAnalyze interface {
 	// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
 	CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool
 
+	// GetPriorityQueueSnapshot returns a snapshot of the stats priority queue.
+	GetPriorityQueueSnapshot() (PriorityQueueSnapshot, error)
+
 	// Close closes the analyze worker.
 	Close()
 }
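
Usage note (not part of the patch): the change exposes the auto-analyze priority queue as JSON on the status port at /stats/priority-queue. Below is a minimal out-of-tree client sketch for trying the endpoint. Assumptions: the default status address 127.0.0.1:10080, and hypothetical local mirror structs for the JSON shapes added in pkg/statistics/handle/types/interfaces.go; in-tree callers can decode into statstypes.PriorityQueueSnapshot directly.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// Local mirrors of the JSON shapes introduced in interfaces.go (illustrative only).
type indicatorsJSON struct {
	ChangePercentage     string `json:"change_percentage"`
	TableSize            string `json:"table_size"`
	LastAnalysisDuration string `json:"last_analysis_duration"`
}

type analysisJobJSON struct {
	Type               string            `json:"type"`
	TableID            int64             `json:"table_id"`
	Weight             float64           `json:"weight"`
	PartitionIDs       []int64           `json:"partition_ids"`
	IndexIDs           []int64           `json:"index_ids"`
	PartitionIndexIDs  map[int64][]int64 `json:"partition_index_ids"`
	Indicators         indicatorsJSON    `json:"indicators"`
	HasNewlyAddedIndex bool              `json:"has_newly_added_index"`
}

type priorityQueueSnapshot struct {
	CurrentJobs     []analysisJobJSON `json:"current_jobs"`
	MustRetryTables []int64           `json:"must_retry_tables"`
}

func main() {
	// Assumed status address; adjust to the instance's status host and port.
	resp, err := http.Get("http://127.0.0.1:10080/stats/priority-queue")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}

	var snapshot priorityQueueSnapshot
	if err := json.Unmarshal(body, &snapshot); err != nil {
		// Before the refresher has built the queue, the handler writes an error
		// ("priority queue not initialized") instead of a JSON document,
		// as exercised by TestStatsPriorityQueueAPI.
		fmt.Printf("non-JSON response: %s\n", body)
		return
	}

	// Jobs are already sorted by weight in descending order (see Snapshot in queue.go).
	for _, job := range snapshot.CurrentJobs {
		fmt.Printf("table %d: type=%s weight=%.2f change=%s\n",
			job.TableID, job.Type, job.Weight, job.Indicators.ChangePercentage)
	}
	fmt.Println("must-retry tables:", snapshot.MustRetryTables)
}

The same document can be fetched with curl against the status port once HandleAutoAnalyze has initialized the queue; the table IDs and weights it reports come straight from each job's AsJSON conversion.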