Skip to content

Commit

Permalink
statistics: add AnalysisPriorityQueueV2
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <29879298+Rustin170506@users.noreply.github.com>
  • Loading branch information
Rustin170506 committed Sep 10, 2024
1 parent 2d1ed88 commit 3b5bea5
Show file tree
Hide file tree
Showing 8 changed files with 902 additions and 35 deletions.
70 changes: 35 additions & 35 deletions pkg/statistics/handle/autoanalyze/internal/heap/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ const (
type LessFunc[T any] func(T, T) bool

// KeyFunc is used to generate a key for an object.
type KeyFunc[T any, K comparable] func(T) (K, error)
type KeyFunc[K comparable, T any] func(T) (K, error)

type heapItem[T any] struct {
type heapItem[K comparable, T any] struct {
obj T // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}

type itemKeyValue[T any, K comparable] struct {
obj T
type itemKeyValue[K comparable, T any] struct {
key K
obj T
}

// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData[T any, K comparable] struct {
items map[K]*heapItem[T]
keyFunc KeyFunc[T, K]
type heapData[K comparable, T any] struct {
items map[K]*heapItem[K, T]
keyFunc KeyFunc[K, T]
lessFunc LessFunc[T]
queue []K
}
Expand All @@ -60,7 +60,7 @@ var (
)

// Less is a standard heap interface function.
func (h *heapData[T, K]) Less(i, j int) bool {
func (h *heapData[K, T]) Less(i, j int) bool {
if i >= len(h.queue) || j >= len(h.queue) {
return false
}
Expand All @@ -76,10 +76,10 @@ func (h *heapData[T, K]) Less(i, j int) bool {
}

// Len is a standard heap interface function.
func (h *heapData[T, K]) Len() int { return len(h.queue) }
func (h *heapData[K, T]) Len() int { return len(h.queue) }

// Swap is a standard heap interface function.
func (h *heapData[T, K]) Swap(i, j int) {
func (h *heapData[K, T]) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
Expand All @@ -88,15 +88,15 @@ func (h *heapData[T, K]) Swap(i, j int) {
}

// Push is a standard heap interface function.
func (h *heapData[T, K]) Push(kv any) {
keyValue := kv.(*itemKeyValue[T, K])
func (h *heapData[K, T]) Push(kv any) {
keyValue := kv.(*itemKeyValue[K, T])
n := len(h.queue)
h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n}
h.items[keyValue.key] = &heapItem[K, T]{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}

// Pop is a standard heap interface function.
func (h *heapData[T, K]) Pop() any {
func (h *heapData[K, T]) Pop() any {
key := h.queue[len(h.queue)-1]
h.queue = h.queue[:len(h.queue)-1]
item, ok := h.items[key]
Expand All @@ -108,23 +108,23 @@ func (h *heapData[T, K]) Pop() any {
}

// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
type Heap[T any, K comparable] struct {
data *heapData[T, K]
type Heap[K comparable, T any] struct {
data *heapData[K, T]
cond sync.Cond
lock sync.RWMutex
closed bool
}

// Close closes the heap.
func (h *Heap[T, K]) Close() {
func (h *Heap[K, T]) Close() {
h.lock.Lock()
defer h.lock.Unlock()
h.closed = true
h.cond.Broadcast()
}

// Add adds an object or updates it if it already exists.
func (h *Heap[T, K]) Add(obj T) error {
func (h *Heap[K, T]) Add(obj T) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -145,7 +145,7 @@ func (h *Heap[T, K]) Add(obj T) error {
}

// BulkAdd adds a list of objects to the heap.
func (h *Heap[T, K]) BulkAdd(list []T) error {
func (h *Heap[K, T]) BulkAdd(list []T) error {
h.lock.Lock()
defer h.lock.Unlock()
if h.closed {
Expand All @@ -168,7 +168,7 @@ func (h *Heap[T, K]) BulkAdd(list []T) error {
}

// AddIfNotPresent adds an object if it does not already exist.
func (h *Heap[T, K]) AddIfNotPresent(obj T) error {
func (h *Heap[K, T]) AddIfNotPresent(obj T) error {
id, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -183,20 +183,20 @@ func (h *Heap[T, K]) AddIfNotPresent(obj T) error {
return nil
}

func (h *Heap[T, K]) addIfNotPresentLocked(key K, obj T) {
func (h *Heap[K, T]) addIfNotPresentLocked(key K, obj T) {
if _, exists := h.data.items[key]; exists {
return
}
heap.Push(h.data, &itemKeyValue[T, K]{obj, key})
heap.Push(h.data, &itemKeyValue[K, T]{key, obj})
}

// Update is an alias for Add.
func (h *Heap[T, K]) Update(obj T) error {
func (h *Heap[K, T]) Update(obj T) error {
return h.Add(obj)
}

// Delete removes an object from the heap.
func (h *Heap[T, K]) Delete(obj T) error {
func (h *Heap[K, T]) Delete(obj T) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return errors.Errorf("key error: %v", err)
Expand All @@ -211,7 +211,7 @@ func (h *Heap[T, K]) Delete(obj T) error {
}

// Peek returns the top object from the heap without removing it.
func (h *Heap[T, K]) Peek() (T, error) {
func (h *Heap[K, T]) Peek() (T, error) {
h.lock.RLock()
defer h.lock.RUnlock()
if len(h.data.queue) == 0 {
Expand All @@ -222,7 +222,7 @@ func (h *Heap[T, K]) Peek() (T, error) {
}

// Pop removes the top object from the heap and returns it.
func (h *Heap[T, K]) Pop() (T, error) {
func (h *Heap[K, T]) Pop() (T, error) {
h.lock.Lock()
defer h.lock.Unlock()
for len(h.data.queue) == 0 {
Expand All @@ -241,7 +241,7 @@ func (h *Heap[T, K]) Pop() (T, error) {
}

// List returns a list of all objects in the heap.
func (h *Heap[T, K]) List() []T {
func (h *Heap[K, T]) List() []T {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]T, 0, len(h.data.items))
Expand All @@ -252,7 +252,7 @@ func (h *Heap[T, K]) List() []T {
}

// ListKeys returns a list of all keys in the heap.
func (h *Heap[T, K]) ListKeys() []K {
func (h *Heap[K, T]) ListKeys() []K {
h.lock.RLock()
defer h.lock.RUnlock()
list := make([]K, 0, len(h.data.items))
Expand All @@ -263,7 +263,7 @@ func (h *Heap[T, K]) ListKeys() []K {
}

// Get returns an object from the heap.
func (h *Heap[T, K]) Get(obj T) (T, bool, error) {
func (h *Heap[K, T]) Get(obj T) (T, bool, error) {
key, err := h.data.keyFunc(obj)
if err != nil {
var zero T
Expand All @@ -273,7 +273,7 @@ func (h *Heap[T, K]) Get(obj T) (T, bool, error) {
}

// GetByKey returns an object from the heap by key.
func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) {
func (h *Heap[K, T]) GetByKey(key K) (T, bool, error) {
h.lock.RLock()
defer h.lock.RUnlock()
item, exists := h.data.items[key]
Expand All @@ -285,17 +285,17 @@ func (h *Heap[T, K]) GetByKey(key K) (T, bool, error) {
}

// IsClosed returns true if the heap is closed.
func (h *Heap[T, K]) IsClosed() bool {
func (h *Heap[K, T]) IsClosed() bool {
h.lock.RLock()
defer h.lock.RUnlock()
return h.closed
}

// NewHeap returns a Heap which can be used to queue up items to process.
func NewHeap[T any, K comparable](keyFn KeyFunc[T, K], lessFn LessFunc[T]) *Heap[T, K] {
h := &Heap[T, K]{
data: &heapData[T, K]{
items: map[K]*heapItem[T]{},
func NewHeap[K comparable, T any](keyFn KeyFunc[K, T], lessFn LessFunc[T]) *Heap[K, T] {
h := &Heap[K, T]{
data: &heapData[K, T]{
items: map[K]*heapItem[K, T]{},
queue: []K{},
keyFunc: keyFn,
lessFunc: lessFn,
Expand Down
12 changes: 12 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,36 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "priorityqueue",
srcs = [
"build.go",
"calculator.go",
"dynamic_partitioned_table_analysis_job.go",
"interval.go",
"job.go",
"non_partitioned_table_analysis_job.go",
"queue.go",
"queue2.go",
"static_partitioned_table_analysis_job.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/autoanalyze/internal/heap",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/logutil",
"//pkg/util/timeutil",
"@com_github_pkg_errors//:errors",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
],
)
Expand Down
Loading

0 comments on commit 3b5bea5

Please sign in to comment.