Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merkleDB -- add inner heap type to syncWorkHeap #1582

Merged
merged 13 commits into from
Jun 9, 2023
77 changes: 38 additions & 39 deletions x/sync/syncworkheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@ import (
"github.com/google/btree"
)

var _ heap.Interface = (*syncWorkHeap)(nil)
var _ heap.Interface = (*innerHeap)(nil)

type heapItem struct {
workItem *syncWorkItem
heapIndex int
}

type innerHeap []*heapItem

// A priority queue of syncWorkItems.
// Note that work item ranges never overlap.
// Supports range merging and priority updating.
// Not safe for concurrent use.
type syncWorkHeap struct {
// Max heap of items by priority.
// i.e. heap.Pop returns highest priority item.
priorityHeap []*heapItem
innerHeap innerHeap
// The heap items sorted by range start.
// A nil start is considered to be the smallest.
sortedItems *btree.BTreeG[*heapItem]
Expand All @@ -35,7 +37,6 @@ type syncWorkHeap struct {

func newSyncWorkHeap() *syncWorkHeap {
return &syncWorkHeap{
priorityHeap: make([]*heapItem, 0),
sortedItems: btree.NewG(
2,
func(a, b *heapItem) bool {
Expand All @@ -56,7 +57,10 @@ func (wh *syncWorkHeap) Insert(item *syncWorkItem) {
return
}

heap.Push(wh, &heapItem{workItem: item})
wrappedItem := &heapItem{workItem: item}

heap.Push(&wh.innerHeap, wrappedItem)
wh.sortedItems.ReplaceOrInsert(wrappedItem)
}

// Pops and returns a work item from the heap.
Expand All @@ -65,7 +69,9 @@ func (wh *syncWorkHeap) GetWork() *syncWorkItem {
if wh.closed || wh.Len() == 0 {
return nil
}
return heap.Pop(wh).(*heapItem).workItem
item := heap.Pop(&wh.innerHeap).(*heapItem)
wh.sortedItems.Delete(item)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

innerHeap.Pop doesn't call wh.sortedItems.Delete(item) so we do it here now

return item.workItem
}

// Insert the item into the heap, merging it with existing items
Expand Down Expand Up @@ -97,7 +103,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
// merged into [beforeItem.start, item.end]
beforeItem.workItem.end = item.end
beforeItem.workItem.priority = math.Max(item.priority, beforeItem.workItem.priority)
heap.Fix(wh, beforeItem.heapIndex)
heap.Fix(&wh.innerHeap, beforeItem.heapIndex)
mergedBefore = beforeItem
}
return false
Expand All @@ -113,7 +119,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
// [item.start, afterItem.end].
afterItem.workItem.start = item.start
afterItem.workItem.priority = math.Max(item.priority, afterItem.workItem.priority)
heap.Fix(wh, afterItem.heapIndex)
heap.Fix(&wh.innerHeap, afterItem.heapIndex)
mergedAfter = afterItem
}
return false
Expand All @@ -128,61 +134,54 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
wh.remove(mergedAfter)
// update the priority
mergedBefore.workItem.priority = math.Max(mergedBefore.workItem.priority, mergedAfter.workItem.priority)
heap.Fix(wh, mergedBefore.heapIndex)
heap.Fix(&wh.innerHeap, mergedBefore.heapIndex)
}

// nothing was merged, so add new item to the heap
if mergedBefore == nil && mergedAfter == nil {
// We didn't merge [item] with an existing one; put it in the heap.
heap.Push(wh, &heapItem{workItem: item})
wh.Insert(item)
}
}

// Deletes [item] from the heap.
func (wh *syncWorkHeap) remove(item *heapItem) {
oldIndex := item.heapIndex
newLength := len(wh.priorityHeap) - 1
heap.Remove(&wh.innerHeap, item.heapIndex)

// swap with last item, delete item, then fix heap if required
wh.Swap(newLength, item.heapIndex)
wh.priorityHeap[newLength] = nil
wh.priorityHeap = wh.priorityHeap[:newLength]

// the item was already the last item, so nothing needs to be fixed
if oldIndex != newLength {
heap.Fix(wh, oldIndex)
}
wh.sortedItems.Delete(item)
}

func (wh *syncWorkHeap) Len() int {
return wh.innerHeap.Len()
}

// below this line are the implementations required for heap.Interface

func (wh *syncWorkHeap) Len() int {
return len(wh.priorityHeap)
func (h innerHeap) Len() int {
return len(h)
}

func (wh *syncWorkHeap) Less(i int, j int) bool {
return wh.priorityHeap[i].workItem.priority > wh.priorityHeap[j].workItem.priority
func (h innerHeap) Less(i int, j int) bool {
return h[i].workItem.priority > h[j].workItem.priority
}

func (wh *syncWorkHeap) Swap(i int, j int) {
wh.priorityHeap[i], wh.priorityHeap[j] = wh.priorityHeap[j], wh.priorityHeap[i]
wh.priorityHeap[i].heapIndex = i
wh.priorityHeap[j].heapIndex = j
func (h innerHeap) Swap(i int, j int) {
h[i], h[j] = h[j], h[i]
h[i].heapIndex = i
h[j].heapIndex = j
}

func (wh *syncWorkHeap) Pop() interface{} {
newLength := len(wh.priorityHeap) - 1
value := wh.priorityHeap[newLength]
wh.priorityHeap[newLength] = nil
wh.priorityHeap = wh.priorityHeap[:newLength]
wh.sortedItems.Delete(value)
return value
func (h *innerHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
old[n-1] = nil
*h = old[0 : n-1]
return item
}

func (wh *syncWorkHeap) Push(x interface{}) {
func (h *innerHeap) Push(x interface{}) {
item := x.(*heapItem)
item.heapIndex = len(wh.priorityHeap)
wh.priorityHeap = append(wh.priorityHeap, item)
wh.sortedItems.ReplaceOrInsert(item)
item.heapIndex = len(*h)
*h = append(*h, item)
}
Loading