-
Notifications
You must be signed in to change notification settings - Fork 2
/
merger.go
80 lines (65 loc) · 1.98 KB
/
merger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package mapreduce
import (
"container/heap"
)
type mappedDataList struct {
data []MappedData
compare KeyHandler
}
func (a mappedDataList) Len() int { return len(a.data) }
func (a mappedDataList) Swap(i, j int) { a.data[i], a.data[j] = a.data[j], a.data[i] }
func (a mappedDataList) Less(i, j int) bool { return a.compare.Less(a.data[i].Key, a.data[j].Key) }
type mappedDataMergeItem struct {
iterator IntermediateStorageIterator
datum MappedData
}
type mappedDataMerger struct {
items []mappedDataMergeItem
compare KeyHandler
inited bool
}
func (a *mappedDataMerger) Len() int { return len(a.items) }
func (a *mappedDataMerger) Less(i, j int) bool {
return a.compare.Less(a.items[i].datum.Key, a.items[j].datum.Key)
}
func (a *mappedDataMerger) Swap(i, j int) { a.items[i], a.items[j] = a.items[j], a.items[i] }
func (a *mappedDataMerger) Push(x interface{}) { a.items = append(a.items, x.(mappedDataMergeItem)) }
func (a *mappedDataMerger) Pop() interface{} {
x := a.items[len(a.items)-1]
a.items = a.items[0 : len(a.items)-1]
return x
}
func (merger *mappedDataMerger) next() (*MappedData, error) {
if len(merger.items) == 0 {
return nil, nil
}
if !merger.inited {
heap.Init(merger)
merger.inited = true
}
item := heap.Pop(merger).(mappedDataMergeItem)
if newItem, exists, err := item.iterator.Next(); err != nil {
return nil, err
} else if exists {
heap.Push(merger, mappedDataMergeItem{item.iterator, newItem})
}
return &item.datum, nil
}
func (merger *mappedDataMerger) empty() bool {
return len(merger.items) == 0
}
func newMerger(comparator KeyHandler) *mappedDataMerger {
return &mappedDataMerger{
items: make([]mappedDataMergeItem, 0),
compare: comparator,
}
}
func (merger *mappedDataMerger) addSource(iterator IntermediateStorageIterator) error {
firstItem, exists, err := iterator.Next()
if err != nil {
return err
} else if exists {
merger.items = append(merger.items, mappedDataMergeItem{iterator, firstItem})
}
return nil
}