Skip to content

Commit

Permalink
merkleDB -- add inner heap type to syncWorkHeap (#1582)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Laine committed Jun 9, 2023
1 parent 6dad1d4 commit 54d1022
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 133 deletions.
77 changes: 38 additions & 39 deletions x/sync/syncworkheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@ import (
"github.com/google/btree"
)

var _ heap.Interface = (*syncWorkHeap)(nil)
var _ heap.Interface = (*innerHeap)(nil)

type heapItem struct {
workItem *syncWorkItem
heapIndex int
}

type innerHeap []*heapItem

// A priority queue of syncWorkItems.
// Note that work item ranges never overlap.
// Supports range merging and priority updating.
// Not safe for concurrent use.
type syncWorkHeap struct {
// Max heap of items by priority.
// i.e. heap.Pop returns highest priority item.
priorityHeap []*heapItem
innerHeap innerHeap
// The heap items sorted by range start.
// A nil start is considered to be the smallest.
sortedItems *btree.BTreeG[*heapItem]
Expand All @@ -35,7 +37,6 @@ type syncWorkHeap struct {

func newSyncWorkHeap() *syncWorkHeap {
return &syncWorkHeap{
priorityHeap: make([]*heapItem, 0),
sortedItems: btree.NewG(
2,
func(a, b *heapItem) bool {
Expand All @@ -56,7 +57,10 @@ func (wh *syncWorkHeap) Insert(item *syncWorkItem) {
return
}

heap.Push(wh, &heapItem{workItem: item})
wrappedItem := &heapItem{workItem: item}

heap.Push(&wh.innerHeap, wrappedItem)
wh.sortedItems.ReplaceOrInsert(wrappedItem)
}

// Pops and returns a work item from the heap.
Expand All @@ -65,7 +69,9 @@ func (wh *syncWorkHeap) GetWork() *syncWorkItem {
if wh.closed || wh.Len() == 0 {
return nil
}
return heap.Pop(wh).(*heapItem).workItem
item := heap.Pop(&wh.innerHeap).(*heapItem)
wh.sortedItems.Delete(item)
return item.workItem
}

// Insert the item into the heap, merging it with existing items
Expand Down Expand Up @@ -97,7 +103,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
// merged into [beforeItem.start, item.end]
beforeItem.workItem.end = item.end
beforeItem.workItem.priority = math.Max(item.priority, beforeItem.workItem.priority)
heap.Fix(wh, beforeItem.heapIndex)
heap.Fix(&wh.innerHeap, beforeItem.heapIndex)
mergedBefore = beforeItem
}
return false
Expand All @@ -113,7 +119,7 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
// [item.start, afterItem.end].
afterItem.workItem.start = item.start
afterItem.workItem.priority = math.Max(item.priority, afterItem.workItem.priority)
heap.Fix(wh, afterItem.heapIndex)
heap.Fix(&wh.innerHeap, afterItem.heapIndex)
mergedAfter = afterItem
}
return false
Expand All @@ -128,61 +134,54 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) {
wh.remove(mergedAfter)
// update the priority
mergedBefore.workItem.priority = math.Max(mergedBefore.workItem.priority, mergedAfter.workItem.priority)
heap.Fix(wh, mergedBefore.heapIndex)
heap.Fix(&wh.innerHeap, mergedBefore.heapIndex)
}

// nothing was merged, so add new item to the heap
if mergedBefore == nil && mergedAfter == nil {
// We didn't merge [item] with an existing one; put it in the heap.
heap.Push(wh, &heapItem{workItem: item})
wh.Insert(item)
}
}

// Deletes [item] from the heap.
func (wh *syncWorkHeap) remove(item *heapItem) {
oldIndex := item.heapIndex
newLength := len(wh.priorityHeap) - 1
heap.Remove(&wh.innerHeap, item.heapIndex)

// swap with last item, delete item, then fix heap if required
wh.Swap(newLength, item.heapIndex)
wh.priorityHeap[newLength] = nil
wh.priorityHeap = wh.priorityHeap[:newLength]

// the item was already the last item, so nothing needs to be fixed
if oldIndex != newLength {
heap.Fix(wh, oldIndex)
}
wh.sortedItems.Delete(item)
}

func (wh *syncWorkHeap) Len() int {
return wh.innerHeap.Len()
}

// below this line are the implementations required for heap.Interface

func (wh *syncWorkHeap) Len() int {
return len(wh.priorityHeap)
func (h innerHeap) Len() int {
return len(h)
}

func (wh *syncWorkHeap) Less(i int, j int) bool {
return wh.priorityHeap[i].workItem.priority > wh.priorityHeap[j].workItem.priority
func (h innerHeap) Less(i int, j int) bool {
return h[i].workItem.priority > h[j].workItem.priority
}

func (wh *syncWorkHeap) Swap(i int, j int) {
wh.priorityHeap[i], wh.priorityHeap[j] = wh.priorityHeap[j], wh.priorityHeap[i]
wh.priorityHeap[i].heapIndex = i
wh.priorityHeap[j].heapIndex = j
func (h innerHeap) Swap(i int, j int) {
h[i], h[j] = h[j], h[i]
h[i].heapIndex = i
h[j].heapIndex = j
}

func (wh *syncWorkHeap) Pop() interface{} {
newLength := len(wh.priorityHeap) - 1
value := wh.priorityHeap[newLength]
wh.priorityHeap[newLength] = nil
wh.priorityHeap = wh.priorityHeap[:newLength]
wh.sortedItems.Delete(value)
return value
func (h *innerHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
old[n-1] = nil
*h = old[0 : n-1]
return item
}

func (wh *syncWorkHeap) Push(x interface{}) {
func (h *innerHeap) Push(x interface{}) {
item := x.(*heapItem)
item.heapIndex = len(wh.priorityHeap)
wh.priorityHeap = append(wh.priorityHeap, item)
wh.sortedItems.ReplaceOrInsert(item)
item.heapIndex = len(*h)
*h = append(*h, item)
}
Loading

0 comments on commit 54d1022

Please sign in to comment.