-
Notifications
You must be signed in to change notification settings - Fork 0
/
fileblock.go
124 lines (98 loc) · 2.52 KB
/
fileblock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package streedb
import (
"cmp"
"errors"
"fmt"
"slices"
"strings"
)
// FileblockCreator is the contract for components that build a fileblock out
// of a set of entries plus a metadata builder. How and where the block is
// created is left to the implementation.
type FileblockCreator[O cmp.Ordered] interface {
// NewFileblock creates a fileblock from the entries in es using builder for
// its metadata. It returns a non-nil error when creation fails.
NewFileblock(es *EntriesMap[O], builder *MetadataBuilder[O]) error
}
// FileblockListener receives notifications about fileblocks being created and
// removed. The code that fires these callbacks is not part of this file.
type FileblockListener[O cmp.Ordered] interface {
// OnFileblockCreated is called with the fileblock that was created.
OnFileblockCreated(*Fileblock[O])
// OnFileblockRemoved is called with the fileblock that was removed.
OnFileblockRemoved(*Fileblock[O])
}
// NewFileblock builds a Fileblock from the given configuration, metadata and
// filesystem.
//
// The metadata is copied by value into the new block, so meta must not be nil.
// cfg and filesystem are stored as given.
func NewFileblock[O cmp.Ordered](cfg *Config, meta *MetaFile[O], filesystem Filesystem[O]) *Fileblock[O] {
	block := new(Fileblock[O])
	block.MetaFile = *meta
	block.cfg = cfg
	block.filesystem = filesystem

	return block
}
// Fileblock couples the metadata of a block with the configuration and the
// filesystem it was created with.
type Fileblock[O cmp.Ordered] struct {
// MetaFile is embedded, so its fields (for example Uuid, PrimaryIdx, Rows,
// Min and Max, all read by functions in this file) are promoted onto
// Fileblock.
MetaFile[O]
// cfg is the configuration supplied to NewFileblock; Merge hands it to the
// metadata builder it creates.
cfg *Config
// filesystem is the backend that Load delegates to.
filesystem Filesystem[O]
}
// Load asks the block's filesystem for the entries of this block and returns
// them together with whatever error the filesystem reported.
func (l *Fileblock[O]) Load() (*EntriesMap[O], error) {
	entries, err := l.filesystem.Load(l)
	return entries, err
}
// Find reports whether the minimum of v falls inside the min/max range of at
// least one row group of this block, as decided by EntryFallsInsideMinMax.
//
// Only metadata is consulted; the block's entries are not loaded.
func (l *Fileblock[O]) Find(v Entry[O]) bool {
	groups := l.Rows
	for i := range groups {
		if group := groups[i]; EntryFallsInsideMinMax(group.Min, group.Max, v.Min()) {
			return true
		}
	}

	return false
}
// Metadata returns a pointer to the block's embedded MetaFile. The pointer
// aliases the block's own metadata, so changes made through it are visible on
// the block.
func (l *Fileblock[O]) Metadata() *MetaFile[O] {
	meta := &l.MetaFile
	return meta
}
// Close is a no-op: it releases nothing and always returns nil.
// NOTE(review): presumably present to satisfy a closer-style interface
// declared elsewhere; confirm before removing.
func (l *Fileblock[O]) Close() error {
return nil
}
// UUID returns the identifier stored in the block's metadata.
func (l *Fileblock[O]) UUID() string {
	return l.MetaFile.Uuid
}
// PrimaryIndex returns the primary index name stored in the block's metadata.
func (l *Fileblock[O]) PrimaryIndex() string {
	return l.MetaFile.PrimaryIdx
}
// SecondaryIndex returns the secondary index names of all rows of the block,
// sorted in ascending order and joined with commas. Duplicates are kept, and
// a block without rows yields the empty string.
func (l *Fileblock[O]) SecondaryIndex() string {
	rows := l.Rows

	names := make([]string, 0, len(rows))
	for i := range rows {
		names = append(names, rows[i].SecondaryIdx)
	}

	// Sorting makes the result independent of the order of the rows.
	slices.Sort(names)

	return strings.Join(names, ",")
}
// Equals reports whether other carries the same UUID as this block. No other
// field takes part in the comparison.
func (l *Fileblock[O]) Equals(other Comparable[O]) bool {
	own := l.UUID()
	theirs := other.UUID()

	return own == theirs
}
// LessThan reports whether the minimum of this block is strictly lower than
// the minimum of other.
//
// It returns false, instead of panicking, whenever the comparison cannot be
// made: this block has no minimum, other is not a *Fileblock[O], other is a
// nil *Fileblock[O], or other has no minimum.
func (l *Fileblock[O]) LessThan(other Comparable[O]) bool {
	if l.Min == nil {
		return false
	}

	f, ok := other.(*Fileblock[O])
	// A typed nil pointer still satisfies the type assertion, and a block may
	// legitimately lack a minimum, so both must be ruled out before the
	// dereference below.
	if !ok || f == nil || f.Min == nil {
		return false
	}

	return *l.Min < *f.Min
}
// Merge loads the entries of a and of every block in b, folds them into a
// single entries map and returns it together with a metadata builder that
// describes the merged block.
//
// The builder is created from a's configuration and seeded with a's level,
// primary index, minimum and maximum. Each block in b then widens it: the
// level becomes the highest level seen so far and every non-nil Min/Max is
// forwarded to the builder. A nil Min or Max is skipped rather than
// dereferenced.
//
// Entries are accumulated across iterations, so the result contains the
// entries of a and of all blocks in b. When b is empty the entries of a are
// returned exactly as loaded.
//
// An error is returned as soon as any block fails to load or a merge step
// fails; in that case the builder and the entries are both nil.
func Merge[O cmp.Ordered](a *Fileblock[O], b ...*Fileblock[O]) (*MetadataBuilder[O], *EntriesMap[O], error) {
	merged, err := a.Load()
	if err != nil {
		return nil, nil, errors.Join(fmt.Errorf("failed to load block '%s'", a.Metadata().DataFilepath), err)
	}

	builder := NewMetadataBuilder[O](a.cfg)

	// addBounds forwards the bounds of f to the builder, ignoring the ones that
	// are not set.
	addBounds := func(f *Fileblock[O]) {
		if f.Min != nil {
			builder.WithMin(*f.Min)
		}
		if f.Max != nil {
			builder.WithMax(*f.Max)
		}
	}

	// Seed the builder from a so that the result is complete even when b is
	// empty.
	level := a.Metadata().Level
	builder.WithLevel(level).WithPrimaryIndex(a.PrimaryIdx)
	addBounds(a)

	for _, c := range b {
		entries, err := c.Load()
		if err != nil {
			return nil, nil, errors.Join(fmt.Errorf("failed to load block '%s'", c.Metadata().DataFilepath), err)
		}

		// Merge into the running result, not into a's original entries, so
		// that no previously merged block is lost.
		merged, err = merged.Merge(entries)
		if err != nil {
			return nil, nil, errors.Join(errors.New("failed to merge entries"), err)
		}

		// Keep the highest level across all blocks seen so far.
		if lvl := c.Metadata().Level; lvl > level {
			level = lvl
		}
		builder.WithLevel(level)
		addBounds(c)
	}

	return builder, merged, nil
}