Skip to content

Commit

Permalink
statistics: add a priority queue API (pingcap#57385)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Nov 19, 2024
1 parent 8382fdb commit cfa52d0
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/server/handler/optimizor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ go_test(
"statistics_handler_test.go",
],
flaky = True,
shard_count = 6,
shard_count = 7,
deps = [
":optimizor",
"//pkg/config",
Expand All @@ -56,6 +56,7 @@ go_test(
"//pkg/server/internal/testutil",
"//pkg/server/internal/util",
"//pkg/session",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/store/mockstore/unistore",
"//pkg/testkit",
Expand Down
23 changes: 23 additions & 0 deletions pkg/server/handler/optimizor/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,26 @@ func getSnapshotTableInfo(dom *domain.Domain, snapshot uint64, dbName, tblName s
}
return is.TableByName(context.Background(), model.NewCIStr(dbName), model.NewCIStr(tblName))
}

// StatsPriorityQueueHandler is the handler for dumping the stats priority queue snapshot.
type StatsPriorityQueueHandler struct {
do *domain.Domain
}

// NewStatsPriorityQueueHandler creates a new StatsPriorityQueueHandler.
func NewStatsPriorityQueueHandler(do *domain.Domain) *StatsPriorityQueueHandler {
return &StatsPriorityQueueHandler{do: do}
}

// ServeHTTP dumps the stats priority queue snapshot to json.
func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")

h := sh.do.StatsHandle()
tables, err := h.GetPriorityQueueSnapshot()
if err != nil {
handler.WriteError(w, err)
} else {
handler.WriteData(w, tables)
}
}
56 changes: 56 additions & 0 deletions pkg/server/handler/optimizor/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/server/internal/testutil"
"github.com/pingcap/tidb/pkg/server/internal/util"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
util2 "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -289,3 +290,58 @@ func checkData(t *testing.T, path string, client *testserverclient.TestServerCli
require.Equal(t, int64(4), count)
require.NoError(t, rows.Close())
}

func TestStatsPriorityQueueAPI(t *testing.T) {
store := testkit.CreateMockStore(t)
driver := server2.NewTiDBDriver(store)
client := testserverclient.NewTestServerClient()
cfg := util.NewTestConfig()
cfg.Port = client.Port
cfg.Status.StatusPort = client.StatusPort
cfg.Status.ReportStatus = true
cfg.Socket = fmt.Sprintf("/tmp/tidb-mock-%d.sock", time.Now().UnixNano())

server, err := server2.NewServer(cfg, driver)
require.NoError(t, err)
defer server.Close()

dom, err := session.GetDomain(store)
require.NoError(t, err)
server.SetDomain(dom)
go func() {
err := server.Run(nil)
require.NoError(t, err)
}()
<-server2.RunInGoTestChan
client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
client.WaitUntilServerOnline()

router := mux.NewRouter()
handler := optimizor.NewStatsPriorityQueueHandler(dom)
router.Handle("/stats/priority-queue", handler)

resp, err := client.FetchStatus("/stats/priority-queue")
require.NoError(t, err)
defer resp.Body.Close()

js, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "priority queue not initialized", string(js))

// Init the queue.
handle := dom.StatsHandle()
require.False(t, handle.HandleAutoAnalyze())

resp, err = client.FetchStatus("/stats/priority-queue")
require.NoError(t, err)
defer resp.Body.Close()

js, err = io.ReadAll(resp.Body)
require.NoError(t, err)
var snapshot types.PriorityQueueSnapshot
err = json.Unmarshal(js, &snapshot)
require.NoError(t, err)
require.Empty(t, snapshot.CurrentJobs)
require.Empty(t, snapshot.MustRetryTables)
}
16 changes: 16 additions & 0 deletions pkg/server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (s *Server) startHTTPServer() {
Name("StatsDump")
router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()).
Name("StatsHistoryDump")
router.Handle("/stats/priority-queue", s.newStatsPriorityQueueHandler()).
Name("StatsPriorityQueue")

router.Handle("/plan_replayer/dump/{filename}", s.newPlanReplayerHandler()).Name("PlanReplayerDump")
router.Handle("/extract_task/dump", s.newExtractServeHandler()).Name("ExtractTaskDump")
Expand Down Expand Up @@ -621,3 +623,17 @@ func (s *Server) newStatsHistoryHandler() *optimizor.StatsHistoryHandler {
}
return optimizor.NewStatsHistoryHandler(do)
}

func (s *Server) newStatsPriorityQueueHandler() *optimizor.StatsPriorityQueueHandler {
store, ok := s.driver.(*TiDBDriver)
if !ok {
panic("Illegal driver")
}

do, err := session.GetDomain(store.store)
if err != nil {
panic("Failed to get domain")
}

return optimizor.NewStatsPriorityQueueHandler(do)
}
5 changes: 5 additions & 0 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID
return statistics.CheckAnalyzeVerOnTable(tbl, version)
}

// GetPriorityQueueSnapshot returns the stats priority queue snapshot.
func (sa *statsAnalyze) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) {
return sa.refresher.GetPriorityQueueSnapshot()
}

func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
defer func() {
if r := recover(); r != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,7 @@ func (j *TestJob) SetIndicators(indicators priorityqueue.Indicators) {
func (j *TestJob) HasNewlyAddedIndex() bool {
return false
}

func (j *TestJob) AsJSON() types.AnalysisJobJSON {
panic("unimplemented")
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,20 @@ func getPartitionNames(partitionIndexes map[string][]string) []string {
}
return names
}

// AsJSON converts the job to a JSON object.
func (j *DynamicPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
partitionIDs := make([]int64, 0, len(j.PartitionIDs))
for partition := range j.PartitionIDs {
partitionIDs = append(partitionIDs, partition)
}
return statstypes.AnalysisJobJSON{
Type: string(j.getAnalyzeType()),
TableID: j.GlobalTableID,
PartitionIDs: partitionIDs,
PartitionIndexIDs: j.PartitionIndexIDs,
Weight: j.Weight,
Indicators: asJSONIndicators(j.Indicators),
HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
}
}
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (t testHeapObject) RegisterSuccessHook(hook SuccessJobHook) {
func (t testHeapObject) RegisterFailureHook(hook FailureJobHook) {
panic("implement me")
}
func (t testHeapObject) AsJSON() statstypes.AnalysisJobJSON {
panic("implement me")
}
func (t testHeapObject) String() string {
panic("implement me")
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type AnalysisJob interface {
// RegisterFailureHook registers a failureHook function that will be called after the job is marked as failed.
RegisterFailureHook(hook FailureJobHook)

// AsJSON converts the job to a JSON object.
AsJSON() statstypes.AnalysisJobJSON

fmt.Stringer
}

Expand Down Expand Up @@ -186,3 +189,12 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool {
_, ok := job.(*DynamicPartitionedTableAnalysisJob)
return ok
}

// asJSONIndicators converts the indicators to a JSON object.
func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON {
return statstypes.IndicatorsJSON{
ChangePercentage: fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100),
TableSize: fmt.Sprintf("%.2f", indicators.TableSize),
LastAnalysisDuration: fmt.Sprintf("%v", indicators.LastAnalysisDuration),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,19 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st

return sql, params
}

// AsJSON converts the job to a JSON object.
func (j *NonPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
indexes := make([]int64, 0, len(j.IndexIDs))
for index := range j.IndexIDs {
indexes = append(indexes, index)
}
return statstypes.AnalysisJobJSON{
Type: string(j.getAnalyzeType()),
TableID: j.TableID,
IndexIDs: indexes,
Weight: j.Weight,
Indicators: asJSONIndicators(j.Indicators),
HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
}
}
33 changes: 33 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package priorityqueue

import (
"context"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -800,6 +801,38 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) {
return pq.syncFields.inner.len(), nil
}

// Snapshot returns a snapshot of all the jobs in the priority queue.
func (pq *AnalysisPriorityQueue) Snapshot() (
snapshot statstypes.PriorityQueueSnapshot,
err error,
) {
pq.syncFields.mu.RLock()
defer pq.syncFields.mu.RUnlock()
if !pq.syncFields.initialized {
return statstypes.PriorityQueueSnapshot{}, errors.New(notInitializedErrMsg)
}

currentJobs := pq.syncFields.inner.list()
mustRetryTables := make([]int64, 0, len(pq.syncFields.mustRetryJobs))
for tableID := range pq.syncFields.mustRetryJobs {
mustRetryTables = append(mustRetryTables, tableID)
}

jsonJobs := make([]statstypes.AnalysisJobJSON, len(currentJobs))
for i, job := range currentJobs {
jsonJobs[i] = job.AsJSON()
}
// Sort by the weight in descending order.
sort.Slice(jsonJobs, func(i, j int) bool {
return jsonJobs[i].Weight > jsonJobs[j].Weight
})

return statstypes.PriorityQueueSnapshot{
CurrentJobs: jsonJobs,
MustRetryTables: mustRetryTables,
}, nil
}

// Close closes the priority queue.
// Note: This function is thread-safe.
func (pq *AnalysisPriorityQueue) Close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,19 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex

return sql, params
}

// AsJSON converts the job to a JSON object.
func (j *StaticPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
indexes := make([]int64, 0, len(j.IndexIDs))
for index := range j.IndexIDs {
indexes = append(indexes, index)
}
return statstypes.AnalysisJobJSON{
Type: string(j.getAnalyzeType()),
TableID: j.StaticPartitionID,
IndexIDs: indexes,
Weight: j.Weight,
Indicators: asJSONIndicators(j.Indicators),
HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
}
}
5 changes: 5 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
return false
}

// GetPriorityQueueSnapshot returns the stats priority queue.
func (r *Refresher) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) {
return r.jobs.Snapshot()
}

func (r *Refresher) setAutoAnalysisTimeWindow(
parameters map[string]string,
) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (m *mockAnalysisJob) GetIndicators() priorityqueue.Indicators {
func (m *mockAnalysisJob) SetIndicators(indicators priorityqueue.Indicators) {
panic("not implemented")
}
func (m *mockAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
panic("not implemented")
}

func TestWorker(t *testing.T) {
_, dom := testkit.CreateMockStoreAndDomain(t)
Expand Down
30 changes: 30 additions & 0 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,33 @@ type StatsHistory interface {
RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error)
}

// PriorityQueueSnapshot is the snapshot of the stats priority queue.
type PriorityQueueSnapshot struct {
CurrentJobs []AnalysisJobJSON `json:"current_jobs"`
MustRetryTables []int64 `json:"must_retry_tables"`
}

// AnalysisJobJSON represents the JSON format of an AnalysisJob.
//
//nolint:fieldalignment
type AnalysisJobJSON struct {
Type string `json:"type"`
TableID int64 `json:"table_id"`
Weight float64 `json:"weight"`
PartitionIDs []int64 `json:"partition_ids"`
IndexIDs []int64 `json:"index_ids"`
PartitionIndexIDs map[int64][]int64 `json:"partition_index_ids"`
Indicators IndicatorsJSON `json:"indicators"`
HasNewlyAddedIndex bool `json:"has_newly_added_index"`
}

// IndicatorsJSON represents the JSON format of Indicators.
type IndicatorsJSON struct {
ChangePercentage string `json:"change_percentage"`
TableSize string `json:"table_size"`
LastAnalysisDuration string `json:"last_analysis_duration"`
}

// StatsAnalyze is used to handle auto-analyze and manage analyze jobs.
type StatsAnalyze interface {
owner.Listener
Expand Down Expand Up @@ -161,6 +188,9 @@ type StatsAnalyze interface {
// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool

// GetPriorityQueueSnapshot returns the stats priority queue.
GetPriorityQueueSnapshot() (PriorityQueueSnapshot, error)

// Close closes the analyze worker.
Close()
}
Expand Down

0 comments on commit cfa52d0

Please sign in to comment.