diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel b/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel deleted file mode 100644 index 9dde234245b42..0000000000000 --- a/pkg/statistics/handle/autoanalyze/internal/heap/BUILD.bazel +++ /dev/null @@ -1,26 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "heap", - srcs = ["heap.go"], - importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap", - visibility = ["//pkg/statistics/handle/autoanalyze:__subpackages__"], - deps = ["@com_github_pingcap_errors//:errors"], -) - -go_test( - name = "heap_test", - timeout = "short", - srcs = [ - "heap_test.go", - "main_test.go", - ], - embed = [":heap"], - flaky = True, - shard_count = 13, - deps = [ - "//pkg/testkit/testsetup", - "@com_github_stretchr_testify//require", - "@org_uber_go_goleak//:goleak", - ], -) diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go b/pkg/statistics/handle/autoanalyze/internal/heap/heap.go deleted file mode 100644 index 11665b876227f..0000000000000 --- a/pkg/statistics/handle/autoanalyze/internal/heap/heap.go +++ /dev/null @@ -1,262 +0,0 @@ -// Copyright 2017 The Kubernetes Authors. -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// Modifications: -// 1. Use the `errors` package from PingCAP. -// 2. Use generics to define the `heapData` struct. -// 3. Add a peak API. -// 4. 
Add an IsEmpty API. -// 5. Remove the thread-safe and blocking properties. -// 6. Add a Len API. - -package heap - -import ( - "container/heap" - - "github.com/pingcap/errors" -) - -// LessFunc is used to compare two objects in the heap. -type LessFunc[V any] func(V, V) bool - -// KeyFunc is used to generate a key for an object. -type KeyFunc[K comparable, V any] func(V) (K, error) - -type heapItem[K comparable, V any] struct { - obj V // The object which is stored in the heap. - index int // The index of the object's key in the Heap.queue. -} - -type itemKeyValue[K comparable, V any] struct { - key K - obj V -} - -// heapData is an internal struct that implements the standard heap interface -// and keeps the data stored in the heap. -type heapData[K comparable, V any] struct { - items map[K]*heapItem[K, V] - keyFunc KeyFunc[K, V] - lessFunc LessFunc[V] - queue []K -} - -var ( - _ = heap.Interface(&heapData[any, any]{}) // heapData is a standard heap -) - -// Less is a standard heap interface function. -func (h *heapData[K, V]) Less(i, j int) bool { - if i >= len(h.queue) || j >= len(h.queue) { - return false - } - itemi, ok := h.items[h.queue[i]] - if !ok { - return false - } - itemj, ok := h.items[h.queue[j]] - if !ok { - return false - } - return h.lessFunc(itemi.obj, itemj.obj) -} - -// Len is a standard heap interface function. -func (h *heapData[K, V]) Len() int { return len(h.queue) } - -// Swap is a standard heap interface function. -func (h *heapData[K, V]) Swap(i, j int) { - h.queue[i], h.queue[j] = h.queue[j], h.queue[i] - item := h.items[h.queue[i]] - item.index = i - item = h.items[h.queue[j]] - item.index = j -} - -// Push is a standard heap interface function. -func (h *heapData[K, V]) Push(kv any) { - keyValue := kv.(*itemKeyValue[K, V]) - n := len(h.queue) - h.items[keyValue.key] = &heapItem[K, V]{keyValue.obj, n} - h.queue = append(h.queue, keyValue.key) -} - -// Pop is a standard heap interface function. 
-func (h *heapData[K, V]) Pop() any { - key := h.queue[len(h.queue)-1] - h.queue = h.queue[:len(h.queue)-1] - item, ok := h.items[key] - if !ok { - return nil - } - delete(h.items, key) - return item.obj -} - -// Heap is a thread-safe producer/consumer queue that implements a heap data structure. -type Heap[K comparable, V any] struct { - data *heapData[K, V] -} - -// Add adds an object or updates it if it already exists. -func (h *Heap[K, V]) Add(obj V) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return errors.Errorf("key error: %v", err) - } - if _, exists := h.data.items[key]; exists { - h.data.items[key].obj = obj - heap.Fix(h.data, h.data.items[key].index) - } else { - h.addIfNotPresentLocked(key, obj) - } - return nil -} - -// BulkAdd adds a list of objects to the heap. -func (h *Heap[K, V]) BulkAdd(list []V) error { - for _, obj := range list { - key, err := h.data.keyFunc(obj) - if err != nil { - return errors.Errorf("key error: %v", err) - } - if _, exists := h.data.items[key]; exists { - h.data.items[key].obj = obj - heap.Fix(h.data, h.data.items[key].index) - } else { - h.addIfNotPresentLocked(key, obj) - } - } - return nil -} - -// AddIfNotPresent adds an object if it does not already exist. -func (h *Heap[K, V]) AddIfNotPresent(obj V) error { - id, err := h.data.keyFunc(obj) - if err != nil { - return errors.Errorf("key error: %v", err) - } - h.addIfNotPresentLocked(id, obj) - return nil -} - -func (h *Heap[K, V]) addIfNotPresentLocked(key K, obj V) { - if _, exists := h.data.items[key]; exists { - return - } - heap.Push(h.data, &itemKeyValue[K, V]{key, obj}) -} - -// Update is an alias for Add. -func (h *Heap[K, V]) Update(obj V) error { - return h.Add(obj) -} - -// Delete removes an object from the heap. 
-func (h *Heap[K, V]) Delete(obj V) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return errors.Errorf("key error: %v", err) - } - if item, ok := h.data.items[key]; ok { - heap.Remove(h.data, item.index) - return nil - } - return errors.New("object not found") -} - -// Peek returns the top object from the heap without removing it. -func (h *Heap[K, V]) Peek() (V, error) { - if len(h.data.queue) == 0 { - var zero V - return zero, errors.New("heap is empty") - } - return h.data.items[h.data.queue[0]].obj, nil -} - -// Pop removes the top object from the heap and returns it. -func (h *Heap[K, V]) Pop() (V, error) { - if len(h.data.queue) == 0 { - var zero V - return zero, errors.New("heap is empty") - } - obj := heap.Pop(h.data) - if obj == nil { - var zero V - return zero, errors.New("object was removed from heap data") - } - return obj.(V), nil -} - -// List returns a list of all objects in the heap. -func (h *Heap[K, V]) List() []V { - list := make([]V, 0, len(h.data.items)) - for _, item := range h.data.items { - list = append(list, item.obj) - } - return list -} - -// Len returns the number of objects in the heap. -func (h *Heap[K, V]) Len() int { - return h.data.Len() -} - -// ListKeys returns a list of all keys in the heap. -func (h *Heap[K, V]) ListKeys() []K { - list := make([]K, 0, len(h.data.items)) - for key := range h.data.items { - list = append(list, key) - } - return list -} - -// Get returns an object from the heap. -func (h *Heap[K, V]) Get(obj V) (V, bool, error) { - key, err := h.data.keyFunc(obj) - if err != nil { - var zero V - return zero, false, errors.Errorf("key error: %v", err) - } - return h.GetByKey(key) -} - -// GetByKey returns an object from the heap by key. -func (h *Heap[K, V]) GetByKey(key K) (V, bool, error) { - item, exists := h.data.items[key] - if !exists { - var zero V - return zero, false, nil - } - return item.obj, true, nil -} - -// IsEmpty returns true if the heap is empty. 
-func (h *Heap[K, V]) IsEmpty() bool { - return len(h.data.queue) == 0 -} - -// NewHeap returns a Heap which can be used to queue up items to process. -func NewHeap[K comparable, V any](keyFn KeyFunc[K, V], lessFn LessFunc[V]) *Heap[K, V] { - h := &Heap[K, V]{ - data: &heapData[K, V]{ - items: map[K]*heapItem[K, V]{}, - queue: []K{}, - keyFunc: keyFn, - lessFunc: lessFn, - }, - } - return h -} diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go b/pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go deleted file mode 100644 index 338e4ef078490..0000000000000 --- a/pkg/statistics/handle/autoanalyze/internal/heap/heap_test.go +++ /dev/null @@ -1,311 +0,0 @@ -// Copyright 2017 The Kubernetes Authors. -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// Modifications: -// 1. Use "github.com/stretchr/testify/require" to do assertions. -// 2. Test max heap instead of min heap. -// 3. Add a test for the peak API. -// 4. Add a test for the IsEmpty API. -// 5. Remove concurrency and thread-safety tests. -// 6. Add a test for the Len API. 
- -package heap - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func testHeapObjectKeyFunc(obj testHeapObject) (string, error) { - return obj.name, nil -} - -type testHeapObject struct { - name string - val int -} - -func mkHeapObj(name string, val int) testHeapObject { - return testHeapObject{name: name, val: val} -} - -// max heap -func compareInts(val1 testHeapObject, val2 testHeapObject) bool { - return val1.val > val2.val -} - -func TestHeap_Add(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("baz", 11)) - h.Add(mkHeapObj("zab", 30)) - h.Add(mkHeapObj("foo", 13)) // This updates "foo". - - item, err := h.Pop() - require.NoError(t, err) - require.Equal(t, 30, item.val) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 13, item.val) - - h.Delete(mkHeapObj("baz", 11)) // Nothing is deleted. - h.Add(mkHeapObj("foo", 14)) // foo is updated. - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 14, item.val) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 1, item.val) -} - -func TestHeap_BulkAdd(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - const amount = 500 - var l []testHeapObject - for i := 1; i <= amount; i++ { - l = append(l, mkHeapObj(string([]rune{'a', rune(i)}), i)) - } - h.BulkAdd(l) - prevNum := 501 - for i := 0; i < amount; i++ { - obj, err := h.Pop() - require.NoError(t, err) - num := obj.val - require.Less(t, num, prevNum, "Items should be in descending order") - prevNum = num - } -} - -func TestHeapEmptyPop(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - _, err := h.Pop() - require.EqualError(t, err, "heap is empty") -} - -func TestHeap_AddIfNotPresent(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - h.AddIfNotPresent(mkHeapObj("foo", 10)) - h.AddIfNotPresent(mkHeapObj("bar", 1)) - h.AddIfNotPresent(mkHeapObj("baz", 
11)) - h.AddIfNotPresent(mkHeapObj("zab", 30)) - h.AddIfNotPresent(mkHeapObj("foo", 13)) // This is not added. - - require.Len(t, h.data.items, 4) - require.Equal(t, 10, h.data.items["foo"].obj.val) - - item, err := h.Pop() - require.NoError(t, err) - require.Equal(t, 30, item.val) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 11, item.val) - - h.AddIfNotPresent(mkHeapObj("bar", 14)) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 10, item.val) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 1, item.val) -} - -func TestHeap_Delete(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) - - err := h.Delete(mkHeapObj("bal", 200)) - require.NoError(t, err) - - item, err := h.Pop() - require.NoError(t, err) - require.Equal(t, 11, item.val) - - h.Add(mkHeapObj("zab", 30)) - h.Add(mkHeapObj("faz", 30)) - l := h.data.Len() - - err = h.Delete(mkHeapObj("non-existent", 10)) - require.Error(t, err) - require.Equal(t, l, h.data.Len()) - - err = h.Delete(mkHeapObj("bar", 31)) - require.NoError(t, err) - - err = h.Delete(mkHeapObj("zab", 30)) - require.NoError(t, err) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 30, item.val) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 10, item.val) - - require.Equal(t, 0, h.data.Len()) -} - -func TestHeap_Update(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) - - h.Update(mkHeapObj("baz", 50)) - require.Equal(t, "baz", h.data.queue[0]) - require.Equal(t, 0, h.data.items["baz"].index) - - item, err := h.Pop() - require.NoError(t, err) - require.Equal(t, 50, item.val) - - h.Update(mkHeapObj("bar", 100)) - require.Equal(t, "bar", h.data.queue[0]) - require.Equal(t, 0, 
h.data.items["bar"].index) -} - -func TestHeap_Get(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) - - obj, exists, err := h.Get(mkHeapObj("baz", 0)) - require.NoError(t, err) - require.True(t, exists) - require.Equal(t, 11, obj.val) - - _, exists, err = h.Get(mkHeapObj("non-existing", 0)) - require.NoError(t, err) - require.False(t, exists) -} - -func TestHeap_GetByKey(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) - - obj, exists, err := h.GetByKey("baz") - require.NoError(t, err) - require.True(t, exists) - require.Equal(t, 11, obj.val) - - _, exists, err = h.GetByKey("non-existing") - require.NoError(t, err) - require.False(t, exists) -} - -func TestHeap_List(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - list := h.List() - require.Empty(t, list) - - items := map[string]int{ - "foo": 10, - "bar": 1, - "bal": 30, - "baz": 11, - "faz": 30, - } - for k, v := range items { - h.Add(mkHeapObj(k, v)) - } - list = h.List() - require.Len(t, list, len(items)) - for _, obj := range list { - heapObj := obj - v, ok := items[heapObj.name] - require.True(t, ok) - require.Equal(t, v, heapObj.val) - } -} - -func TestHeap_ListKeys(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - list := h.ListKeys() - require.Empty(t, list) - - items := map[string]int{ - "foo": 10, - "bar": 1, - "bal": 30, - "baz": 11, - "faz": 30, - } - for k, v := range items { - h.Add(mkHeapObj(k, v)) - } - list = h.ListKeys() - require.Len(t, list, len(items)) - for _, key := range list { - _, ok := items[key] - require.True(t, ok) - } -} - -func TestHeap_Peek(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - _, err := h.Peek() - require.EqualError(t, err, "heap is empty") - - 
h.Add(mkHeapObj("foo", 10)) - h.Add(mkHeapObj("bar", 1)) - h.Add(mkHeapObj("bal", 31)) - h.Add(mkHeapObj("baz", 11)) - - item, err := h.Peek() - require.NoError(t, err) - require.Equal(t, 31, item.val) - - item, err = h.Pop() - require.NoError(t, err) - require.Equal(t, 31, item.val) -} - -func TestHeap_IsEmpty(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - require.True(t, h.IsEmpty()) - - h.Add(mkHeapObj("foo", 10)) - require.False(t, h.IsEmpty()) - - h.Pop() - require.True(t, h.IsEmpty()) -} - -func TestHeap_Len(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) - require.Zero(t, h.Len()) - - h.Add(mkHeapObj("foo", 10)) - require.Equal(t, 1, h.Len()) - - h.Pop() - require.Zero(t, h.Len()) -} diff --git a/pkg/statistics/handle/autoanalyze/internal/heap/main_test.go b/pkg/statistics/handle/autoanalyze/internal/heap/main_test.go deleted file mode 100644 index c3e3b61659f27..0000000000000 --- a/pkg/statistics/handle/autoanalyze/internal/heap/main_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package heap - -import ( - "testing" - - "github.com/pingcap/tidb/pkg/testkit/testsetup" - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - opts := []goleak.Option{ - goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), - goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"), - goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), - goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), - goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - } - testsetup.SetupForCommonTest() - goleak.VerifyTestMain(m, opts...) -} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel index 0b00f0e29319c..6744b2ecce357 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "analysis_job_factory.go", "calculator.go", "dynamic_partitioned_table_analysis_job.go", + "heap.go", "interval.go", "job.go", "non_partitioned_table_analysis_job.go", @@ -24,7 +25,6 @@ go_library( "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/exec", - "//pkg/statistics/handle/autoanalyze/internal/heap", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/types", @@ -45,6 +45,7 @@ go_test( "analysis_job_factory_test.go", "calculator_test.go", "dynamic_partitioned_table_analysis_job_test.go", + "heap_test.go", "interval_test.go", "job_test.go", "main_test.go", @@ -53,10 +54,10 @@ go_test( "queue_test.go", "static_partitioned_table_analysis_job_test.go", ], + embed = [":priorityqueue"], flaky = True, - shard_count = 48, + shard_count = 50, deps = [ - ":priorityqueue", "//pkg/ddl/notifier", "//pkg/domain", "//pkg/domain/infosync", @@ -64,7 +65,9 @@ go_test( 
"//pkg/parser/model", "//pkg/session", "//pkg/sessionctx", + "//pkg/sessionctx/sysproctrack", "//pkg/statistics", + "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", "//pkg/store/mockstore", "//pkg/testkit", diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/heap.go b/pkg/statistics/handle/autoanalyze/priorityqueue/heap.go new file mode 100644 index 0000000000000..0055af44927a1 --- /dev/null +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/heap.go @@ -0,0 +1,202 @@ +// Copyright 2017 The Kubernetes Authors. +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Modifications: +// 1. Use the `errors` package from PingCAP. +// 2. Use AnalysisJob as the object type. +// 3. Add a peak API. +// 4. Add an IsEmpty API. +// 5. Remove the thread-safe and blocking properties. +// 6. Add a Len API. +// 7. Remove the BulkAdd API. + +package priorityqueue + +import ( + "container/heap" + + "github.com/pingcap/errors" +) + +type heapItem struct { + obj AnalysisJob + index int +} + +// heapData is an internal struct that implements the standard heap interface +// and keeps the data stored in the heap. +type heapData struct { + items map[int64]*heapItem + queue []int64 +} + +var ( + _ = heap.Interface(&heapData{}) // heapData is a standard heap +) + +// Less is a standard heap interface function. 
+func (h *heapData) Less(i, j int) bool {
+	if i >= len(h.queue) || j >= len(h.queue) {
+		return false
+	}
+	left, ok := h.items[h.queue[i]]
+	if !ok {
+		return false
+	}
+	right, ok := h.items[h.queue[j]]
+	if !ok {
+		return false
+	}
+	// The heavier job sorts first, which makes this a max-heap on weight.
+	return left.obj.GetWeight() > right.obj.GetWeight()
+}
+
+// Len is a standard heap interface function; it reports the number of queued table IDs.
+func (h *heapData) Len() int { return len(h.queue) }
+
+// Swap is a standard heap interface function; it exchanges the entries at i and j.
+func (h *heapData) Swap(i, j int) {
+	h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
+	// Each heapItem records its own position in queue so heap.Fix and
+	// heap.Remove can be called by key; refresh both after the swap.
+	h.items[h.queue[i]].index = i
+	h.items[h.queue[j]].index = j
+}
+
+// Push is a standard heap interface function; it appends x, which must be an AnalysisJob.
+func (h *heapData) Push(x any) {
+	obj := x.(AnalysisJob)
+	id := obj.GetTableID()
+	h.items[id] = &heapItem{obj: obj, index: len(h.queue)}
+	h.queue = append(h.queue, id)
+}
+
+// Pop is a standard heap interface function; it returns nil if the last key is missing from items.
+func (h *heapData) Pop() any {
+	last := len(h.queue) - 1
+	key := h.queue[last]
+	h.queue = h.queue[:last]
+	item, ok := h.items[key]
+	if !ok {
+		return nil
+	}
+	delete(h.items, key)
+	return item.obj
+}
+
+// ErrHeapIsEmpty is returned when the heap is empty.
+var ErrHeapIsEmpty = errors.New("heap is empty")
+
+// pqHeapImpl is a heap of AnalysisJobs keyed by table ID; it does no locking of its own.
+type pqHeapImpl struct {
+	data *heapData
+}
+
+// addOrUpdate adds an object or updates it if it already exists. The returned error is always nil.
+func (h *pqHeapImpl) addOrUpdate(obj AnalysisJob) error {
+	if item, exists := h.data.items[obj.GetTableID()]; exists {
+		item.obj = obj
+		heap.Fix(h.data, item.index)
+	} else {
+		heap.Push(h.data, obj)
+	}
+	return nil
+}
+
+// update is an alias for addOrUpdate.
+func (h *pqHeapImpl) update(obj AnalysisJob) error {
+	return h.addOrUpdate(obj)
+}
+
+// delete removes an object from the heap.
+func (h *pqHeapImpl) delete(obj AnalysisJob) error {
+	item, found := h.data.items[obj.GetTableID()]
+	if !found {
+		return errors.New("object not found")
+	}
+	heap.Remove(h.data, item.index)
+	return nil
+}
+
+// peek returns the job at the top of the heap and leaves the heap unchanged.
+func (h *pqHeapImpl) peek() (AnalysisJob, error) {
+	if h.isEmpty() {
+		return nil, ErrHeapIsEmpty
+	}
+	return h.data.items[h.data.queue[0]].obj, nil
+}
+
+// pop removes the job at the top of the heap and returns it.
+func (h *pqHeapImpl) pop() (AnalysisJob, error) {
+	if h.isEmpty() {
+		return nil, ErrHeapIsEmpty
+	}
+	// heapData.Pop yields nil when the popped key has no entry in items.
+	top := heap.Pop(h.data)
+	if top == nil {
+		return nil, errors.New("object was removed from heap data")
+	}
+	return top.(AnalysisJob), nil
+}
+
+// list returns every job currently stored in the heap.
+// The order is unspecified because it follows map iteration.
+func (h *pqHeapImpl) list() []AnalysisJob {
+	jobs := make([]AnalysisJob, 0, len(h.data.items))
+	for _, entry := range h.data.items {
+		jobs = append(jobs, entry.obj)
+	}
+	return jobs
+}
+
+// len returns the number of jobs in the heap.
+func (h *pqHeapImpl) len() int { return h.data.Len() }
+
+// ListKeys returns the table IDs of all jobs in the heap, in map iteration order.
+func (h *pqHeapImpl) ListKeys() []int64 {
+	keys := make([]int64, 0, len(h.data.items))
+	for tableID := range h.data.items {
+		keys = append(keys, tableID)
+	}
+	return keys
+}
+
+// Get looks up the stored job that shares obj's table ID; see getByKey for the results.
+func (h *pqHeapImpl) Get(obj AnalysisJob) (AnalysisJob, bool, error) {
+	return h.getByKey(obj.GetTableID())
+}
+
+// getByKey returns the job stored under key and whether it was present.
+// The error result is always nil.
+func (h *pqHeapImpl) getByKey(key int64) (AnalysisJob, bool, error) {
+	if item, ok := h.data.items[key]; ok {
+		return item.obj, true, nil
+	}
+	return nil, false, nil
+}
+
+// isEmpty reports whether the heap holds no jobs.
+func (h *pqHeapImpl) isEmpty() bool {
+	return h.data.Len() == 0
+}
+
+// newHeap returns a Heap which can be used to queue up items to process.
+func newHeap() *pqHeapImpl {
+	// items must be non-nil because heapData.Push writes into it.
+	return &pqHeapImpl{
+		data: &heapData{
+			items: make(map[int64]*heapItem),
+			queue: make([]int64, 0),
+		},
+	}
+}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go
new file mode 100644
index 0000000000000..440d631389f60
--- /dev/null
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go
@@ -0,0 +1,317 @@
+// Copyright 2017 The Kubernetes Authors.
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Modifications:
+// 1. Use "github.com/stretchr/testify/require" to do assertions.
+// 2. Test max heap instead of min heap.
+// 3. Add a test for the peek API.
+// 4. Add a test for the IsEmpty API.
+// 5. Remove concurrency and thread-safety tests.
+// 6. Add a test for the Len API.
+// 7. Remove the BulkAdd related tests.
+ +package priorityqueue + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/sysproctrack" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/stretchr/testify/require" +) + +type testHeapObject struct { + tableID int64 + val float64 +} + +func (t testHeapObject) IsValidToAnalyze(sctx sessionctx.Context) (bool, string) { + panic("implement me") +} +func (t testHeapObject) Analyze(statsHandle statstypes.StatsHandle, sysProcTracker sysproctrack.Tracker) error { + panic("implement me") +} +func (t testHeapObject) SetWeight(weight float64) { + panic("implement me") +} +func (t testHeapObject) GetWeight() float64 { + return t.val +} +func (t testHeapObject) HasNewlyAddedIndex() bool { + panic("implement me") +} +func (t testHeapObject) GetIndicators() Indicators { + panic("implement me") +} +func (t testHeapObject) SetIndicators(indicators Indicators) { + panic("implement me") +} +func (t testHeapObject) GetTableID() int64 { + return t.tableID +} +func (t testHeapObject) RegisterSuccessHook(hook JobHook) { + panic("implement me") +} +func (t testHeapObject) RegisterFailureHook(hook JobHook) { + panic("implement me") +} +func (t testHeapObject) String() string { + panic("implement me") +} +func mkHeapObj( + tableID int64, + val float64, +) testHeapObject { + return testHeapObject{ + tableID: tableID, + val: val, + } +} + +func TestHeap_AddOrUpdate(t *testing.T) { + h := newHeap() + err := h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(2, 1)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(3, 11)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(4, 30)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(1, 13)) // This updates object with tableID 1. 
+ require.NoError(t, err) + + item, err := h.pop() + require.NoError(t, err) + require.Equal(t, int64(4), item.GetTableID()) + + item, err = h.pop() + require.NoError(t, err) + require.Equal(t, int64(1), item.GetTableID()) + + err = h.delete(mkHeapObj(3, 11)) // Deletes object with tableID 3. + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(1, 14)) // Updates object with tableID 1. + require.NoError(t, err) + + item, err = h.pop() + require.NoError(t, err) + require.Equal(t, int64(1), item.GetTableID()) + + item, err = h.pop() + require.NoError(t, err) + require.Equal(t, int64(2), item.GetTableID()) +} + +func TestHeapEmptyPop(t *testing.T) { + h := newHeap() + _, err := h.pop() + require.EqualError(t, err, "heap is empty") +} + +func TestHeap_Delete(t *testing.T) { + h := newHeap() + err := h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(2, 1)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(3, 31)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(4, 11)) + require.NoError(t, err) + + err = h.delete(mkHeapObj(3, 31)) + require.NoError(t, err) + + item, err := h.pop() + require.NoError(t, err) + require.Equal(t, int64(4), item.GetTableID()) + + err = h.addOrUpdate(mkHeapObj(5, 30)) + require.NoError(t, err) + + err = h.delete(mkHeapObj(2, 1)) + require.NoError(t, err) + + item, err = h.pop() + require.NoError(t, err) + require.Equal(t, int64(5), item.GetTableID()) + + item, err = h.pop() + require.NoError(t, err) + require.Equal(t, int64(1), item.GetTableID()) + + require.Equal(t, 0, h.len()) +} + +func TestHeap_Update(t *testing.T) { + h := newHeap() + err := h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(2, 1)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(3, 31)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(4, 11)) + require.NoError(t, err) + + err = h.update(mkHeapObj(4, 50)) + require.NoError(t, err) + require.Equal(t, 
int64(4), h.data.queue[0]) + + item, err := h.pop() + require.NoError(t, err) + require.Equal(t, int64(4), item.GetTableID()) + + err = h.update(mkHeapObj(2, 100)) + require.NoError(t, err) + require.Equal(t, int64(2), h.data.queue[0]) +} + +func TestHeap_Get(t *testing.T) { + h := newHeap() + err := h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(2, 1)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(3, 31)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(4, 11)) + require.NoError(t, err) + + obj, exists, err := h.Get(mkHeapObj(4, 0)) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, int64(4), obj.GetTableID()) + + _, exists, err = h.Get(mkHeapObj(5, 0)) + require.NoError(t, err) + require.False(t, exists) +} + +func TestHeap_GetByKey(t *testing.T) { + h := newHeap() + err := h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(2, 1)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(3, 31)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(4, 11)) + require.NoError(t, err) + + obj, exists, err := h.getByKey(4) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, int64(4), obj.GetTableID()) + + _, exists, err = h.getByKey(5) + require.NoError(t, err) + require.False(t, exists) +} + +func TestHeap_List(t *testing.T) { + h := newHeap() + list := h.list() + require.Empty(t, list) + + items := map[int64]float64{ + 1: 10, + 2: 1, + 3: 30, + 4: 11, + 5: 30, + } + for k, v := range items { + h.addOrUpdate(mkHeapObj(k, v)) + } + list = h.list() + require.Len(t, list, len(items)) + for _, obj := range list { + require.Equal(t, items[obj.GetTableID()], obj.GetWeight()) + } +} + +func TestHeap_ListKeys(t *testing.T) { + h := newHeap() + list := h.ListKeys() + require.Empty(t, list) + + items := map[int64]float64{ + 1: 10, + 2: 1, + 3: 30, + 4: 11, + 5: 30, + } + for k, v := range items { + h.addOrUpdate(mkHeapObj(k, v)) + } 
+ list = h.ListKeys() + require.Len(t, list, len(items)) + for _, key := range list { + _, ok := items[key] + require.True(t, ok) + } +} + +func TestHeap_Peek(t *testing.T) { + h := newHeap() + _, err := h.peek() + require.EqualError(t, err, "heap is empty") + + err = h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(2, 1)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(3, 31)) + require.NoError(t, err) + err = h.addOrUpdate(mkHeapObj(4, 11)) + require.NoError(t, err) + + item, err := h.peek() + require.NoError(t, err) + require.Equal(t, int64(3), item.GetTableID()) + + item, err = h.pop() + require.NoError(t, err) + require.Equal(t, int64(3), item.GetTableID()) +} + +func TestHeap_IsEmpty(t *testing.T) { + h := newHeap() + require.True(t, h.isEmpty()) + + err := h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + require.False(t, h.isEmpty()) + + _, err = h.pop() + require.NoError(t, err) + require.True(t, h.isEmpty()) +} + +func TestHeap_Len(t *testing.T) { + h := newHeap() + require.Zero(t, h.len()) + + err := h.addOrUpdate(mkHeapObj(1, 10)) + require.NoError(t, err) + require.Equal(t, 1, h.len()) + + _, err = h.pop() + require.NoError(t, err) + require.Zero(t, h.len()) +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index 90faacc098d90..9d295374a7912 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec" - "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" statstypes 
"github.com/pingcap/tidb/pkg/statistics/handle/types" @@ -44,6 +43,28 @@ const ( failedJobRequeueInterval = time.Minute * 5 ) +// pqHeap is an interface that wraps the methods of a priority queue heap. +type pqHeap interface { + // getByKey returns the job by the given table ID. + getByKey(tableID int64) (AnalysisJob, bool, error) + // addOrUpdate adds a job to the heap or updates the job if it already exists. + addOrUpdate(job AnalysisJob) error + // update updates a job in the heap. + update(job AnalysisJob) error + // delete deletes a job from the heap. + delete(job AnalysisJob) error + // list returns all jobs in the heap. + list() []AnalysisJob + // pop pops the job with the highest priority from the heap. + pop() (AnalysisJob, error) + // peek peeks the job with the highest priority from the heap without removing it. + peek() (AnalysisJob, error) + // isEmpty returns true if the heap is empty. + isEmpty() bool + // len returns the number of jobs in the heap. + len() int +} + // AnalysisPriorityQueue is a priority queue for TableAnalysisJobs. // Testing shows that keeping all jobs in memory is feasible. // Memory usage for one million tables is approximately 300 to 500 MiB, which is acceptable. @@ -62,7 +83,7 @@ type AnalysisPriorityQueue struct { syncFields struct { // mu is used to protect the following fields. mu sync.RWMutex - inner *heap.Heap[int64, AnalysisJob] + inner pqHeap // runningJobs is a map to store the running jobs. Used to avoid duplicate jobs. runningJobs map[int64]struct{} // lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch. @@ -147,14 +168,7 @@ func (pq *AnalysisPriorityQueue) Rebuild() error { // rebuildWithoutLock rebuilds the priority queue without holding the lock. // Note: Please hold the lock before calling this function. 
func (pq *AnalysisPriorityQueue) rebuildWithoutLock() error { - keyFunc := func(job AnalysisJob) (int64, error) { - return job.GetTableID(), nil - } - // We want the job with the highest weight to be at the top of the priority queue. - lessFunc := func(a, b AnalysisJob) bool { - return a.GetWeight() > b.GetWeight() - } - pq.syncFields.inner = heap.NewHeap(keyFunc, lessFunc) + pq.syncFields.inner = newHeap() // We need to fetch the next check version with offset before fetching all tables and building analysis jobs. // Otherwise, we may miss some DML changes happened during the process because this operation takes time. @@ -390,7 +404,7 @@ func (pq *AnalysisPriorityQueue) processTableStats( // This means we will always enter the tryCreateJob branch for these partitions. // Since we store the stats meta for each partition and the parent table, there may be redundant calculations. // This is acceptable for now, but in the future, we may consider separating the analysis job for each partition. - job, ok, _ := pq.syncFields.inner.GetByKey(stats.PhysicalID) + job, ok, _ := pq.syncFields.inner.getByKey(stats.PhysicalID) if !ok { job = pq.tryCreateJob(is, stats, pruneMode, jobFactory, lockedTables) } else { @@ -401,7 +415,7 @@ func (pq *AnalysisPriorityQueue) processTableStats( // For dynamic partitioned tables, if the parent table is locked, we skip the whole table here as well. if _, ok := lockedTables[stats.PhysicalID]; ok { // Clean up the job if the table is locked. 
- err := pq.syncFields.inner.Delete(job) + err := pq.syncFields.inner.delete(job) if err != nil { statslogutil.StatsLogger().Error( "Failed to delete job from priority queue", @@ -625,12 +639,13 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() { defer func() { statslogutil.StatsLogger().Info("Last analysis duration refreshed", zap.Duration("duration", time.Since(start))) }() - jobs := pq.syncFields.inner.List() + jobs := pq.syncFields.inner.list() currentTs, err := statsutil.GetStartTS(sctx) if err != nil { return errors.Trace(err) } jobFactory := NewAnalysisJobFactory(sctx, 0, currentTs) + // TODO: We can directly rebuild the priority queue instead of updating the indicators of each job. for _, job := range jobs { indicators := job.GetIndicators() tableStats, ok := pq.statsHandle.Get(job.GetTableID()) @@ -640,7 +655,7 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() { zap.String("job", job.String()), ) // TODO: Remove this after handling the DDL event. 
- err := pq.syncFields.inner.Delete(job) + err := pq.syncFields.inner.delete(job) if err != nil { statslogutil.StatsLogger().Error("Failed to delete job from priority queue", zap.Error(err), @@ -651,7 +666,7 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() { indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats) job.SetIndicators(indicators) job.SetWeight(pq.calculator.CalculateWeight(job)) - if err := pq.syncFields.inner.Update(job); err != nil { + if err := pq.syncFields.inner.update(job); err != nil { statslogutil.StatsLogger().Error("Failed to add job to priority queue", zap.Error(err), zap.String("job", job.String()), @@ -721,7 +736,7 @@ func (pq *AnalysisPriorityQueue) pushWithoutLock(job AnalysisJob) error { if _, ok := pq.syncFields.failedJobs[job.GetTableID()]; ok { return nil } - return pq.syncFields.inner.Add(job) + return pq.syncFields.inner.addOrUpdate(job) } // Pop pops a job from the priority queue and marks it as running. @@ -733,7 +748,7 @@ func (pq *AnalysisPriorityQueue) Pop() (AnalysisJob, error) { return nil, errors.New(notInitializedErrMsg) } - job, err := pq.syncFields.inner.Pop() + job, err := pq.syncFields.inner.pop() if err != nil { return nil, errors.Trace(err) } @@ -762,7 +777,7 @@ func (pq *AnalysisPriorityQueue) Peek() (AnalysisJob, error) { return nil, errors.New(notInitializedErrMsg) } - return pq.syncFields.inner.Peek() + return pq.syncFields.inner.peek() } // IsEmpty checks whether the priority queue is empty. @@ -774,7 +789,7 @@ func (pq *AnalysisPriorityQueue) IsEmpty() (bool, error) { return false, errors.New(notInitializedErrMsg) } - return pq.syncFields.inner.IsEmpty(), nil + return pq.syncFields.inner.isEmpty(), nil } // Len returns the number of jobs in the priority queue. 
@@ -786,7 +801,7 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) { return 0, errors.New(notInitializedErrMsg) } - return pq.syncFields.inner.Len(), nil + return pq.syncFields.inner.len(), nil } // Close closes the priority queue. diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler.go index 40b477445570c..740bdcbc6f7ab 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler.go @@ -80,7 +80,7 @@ func (pq *AnalysisPriorityQueue) HandleDDLEvent(_ context.Context, sctx sessionc // getAndDeleteJob tries to get a job from the priority queue and delete it if it exists. func (pq *AnalysisPriorityQueue) getAndDeleteJob(tableID int64) error { - job, ok, err := pq.syncFields.inner.GetByKey(tableID) + job, ok, err := pq.syncFields.inner.getByKey(tableID) if err != nil { statslogutil.StatsLogger().Error( "Failed to get the job from priority queue", @@ -90,7 +90,7 @@ func (pq *AnalysisPriorityQueue) getAndDeleteJob(tableID int64) error { return errors.Trace(err) } if ok { - err := pq.syncFields.inner.Delete(job) + err := pq.syncFields.inner.delete(job) if err != nil { statslogutil.StatsLogger().Error( "Failed to delete table from priority queue", diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index 93279c2dba586..f79c3ae16b763 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -15,6 +15,7 @@ package refresher import ( + stderrors "errors" "time" "github.com/pingcap/errors" @@ -132,11 +133,15 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool { analyzedCount := 0 for analyzedCount < remainConcurrency { - // TODO: Change the API to avoid return error. 
job, err := r.jobs.Pop() if err != nil { // No more jobs to analyze. - break + if stderrors.Is(err, priorityqueue.ErrHeapIsEmpty) { + break + } + intest.Assert(false, "Failed to pop job from the queue", zap.Error(err)) + statslogutil.StatsLogger().Error("Failed to pop job from the queue", zap.Error(err)) + return false } if _, isRunning := currentRunningJobs[job.GetTableID()]; isRunning {