Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: port RangeSplitter #46447

Merged
merged 14 commits into from
Sep 5, 2023
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"file.go",
"iter.go",
"kv_reader.go",
"split.go",
"stat_reader.go",
"util.go",
"writer.go",
Expand Down Expand Up @@ -46,12 +47,13 @@ go_test(
"engine_test.go",
"file_test.go",
"iter_test.go",
"split_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 22,
shard_count = 28,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
46 changes: 32 additions & 14 deletions br/pkg/lightning/backend/external/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ import (
"encoding/binary"
)

// rangeProperty stores properties of a range of key-value pairs written to a
// data file:
// - firstKey: the first key added to the range.
// - lastKey: the last key added to the range.
// - offset: the start offset of the range in the file.
// - size: the total length of the keys and values in the range (length
// prefixes written to the file are not counted).
// - keys: the number of keys in the range.
type rangeProperty struct {
key []byte
offset uint64
size uint64
keys uint64
firstKey []byte
lastKey []byte
offset uint64
size uint64
keys uint64
}

// decodeMultiProps is only used for test.
Expand All @@ -40,22 +46,31 @@ func decodeMultiProps(data []byte) []*rangeProperty {

// decodeProp decodes a single rangeProperty from data. All integers are read
// as big-endian, in this order:
//
//	firstKeyLen (4 bytes) | firstKey | lastKeyLen (4 bytes) | lastKey |
//	size (8 bytes) | keys (8 bytes) | offset (8 bytes)
//
// The returned firstKey and lastKey are sub-slices of data rather than copies,
// so they share its backing array. No length validation is done here: data
// shorter than its length prefixes imply results in a slice-bounds panic.
func decodeProp(data []byte) *rangeProperty {
rp := &rangeProperty{}
keyLen := binary.BigEndian.Uint32(data[0:4])
rp.key = data[4 : 4+keyLen]
rp.size = binary.BigEndian.Uint64(data[4+keyLen : 12+keyLen])
rp.keys = binary.BigEndian.Uint64(data[12+keyLen : 20+keyLen])
rp.offset = binary.BigEndian.Uint64(data[20+keyLen : 28+keyLen])
// n is the read cursor into data; each field is read at n, then n is
// advanced past it.
n := 0
keyLen := int(binary.BigEndian.Uint32(data[n : n+4]))
n += 4
rp.firstKey = data[n : n+keyLen]
n += keyLen
// keyLen is reused for the length prefix of lastKey.
keyLen = int(binary.BigEndian.Uint32(data[n : n+4]))
n += 4
rp.lastKey = data[n : n+keyLen]
n += keyLen
rp.size = binary.BigEndian.Uint64(data[n : n+8])
n += 8
rp.keys = binary.BigEndian.Uint64(data[n : n+8])
n += 8
rp.offset = binary.BigEndian.Uint64(data[n : n+8])
return rp
}

// keyLen + p.size + p.keys + p.offset
const propertyLengthExceptKey = 4 + 8 + 8 + 8
// propertyLengthExceptKeys is the encoded length of a rangeProperty without
// the bytes of firstKey and lastKey themselves:
// keyLen * 2 + p.size + p.keys + p.offset
const propertyLengthExceptKeys = 4*2 + 8 + 8 + 8

func encodeMultiProps(buf []byte, props []*rangeProperty) []byte {
var propLen [4]byte
for _, p := range props {
binary.BigEndian.PutUint32(propLen[:],
uint32(propertyLengthExceptKey+len(p.key)))
uint32(propertyLengthExceptKeys+len(p.firstKey)+len(p.lastKey)))
buf = append(buf, propLen[:4]...)
buf = encodeProp(buf, p)
}
Expand All @@ -64,9 +79,12 @@ func encodeMultiProps(buf []byte, props []*rangeProperty) []byte {

func encodeProp(buf []byte, r *rangeProperty) []byte {
var b [8]byte
binary.BigEndian.PutUint32(b[:], uint32(len(r.key)))
binary.BigEndian.PutUint32(b[:], uint32(len(r.firstKey)))
buf = append(buf, b[:4]...)
buf = append(buf, r.key...)
buf = append(buf, r.firstKey...)
binary.BigEndian.PutUint32(b[:], uint32(len(r.lastKey)))
buf = append(buf, b[:4]...)
buf = append(buf, r.lastKey...)
binary.BigEndian.PutUint64(b[:], r.size)
buf = append(buf, b[:]...)
binary.BigEndian.PutUint64(b[:], r.keys)
Expand Down
18 changes: 13 additions & 5 deletions br/pkg/lightning/backend/external/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ import (

func TestRangePropertyCodec(t *testing.T) {
prop := &rangeProperty{
key: []byte("key"),
offset: 1,
size: 2,
keys: 3,
firstKey: []byte("key"),
lastKey: []byte("key2"),
offset: 1,
size: 2,
keys: 3,
}
buf := encodeProp(nil, prop)
prop2 := decodeProp(buf)
require.EqualValues(t, prop, prop2)

p1, p2, p3 := &rangeProperty{}, &rangeProperty{}, &rangeProperty{}
for i, p := range []*rangeProperty{p1, p2, p3} {
p.key = []byte(fmt.Sprintf("key%d", i))
p.firstKey = []byte(fmt.Sprintf("key%d", i))
p.lastKey = []byte(fmt.Sprintf("key%d9", i))
p.offset = uint64(10 * i)
p.size = uint64(20 * i)
p.keys = uint64(30 * i)
Expand All @@ -43,3 +45,9 @@ func TestRangePropertyCodec(t *testing.T) {
props := decodeMultiProps(buf)
require.EqualValues(t, []*rangeProperty{p1, p2, p3}, props)
}

func TestPropertyLengthExceptKeys(t *testing.T) {
zero := &rangeProperty{}
bs := encodeProp(nil, zero)
require.EqualValues(t, propertyLengthExceptKeys, len(bs))
}
25 changes: 16 additions & 9 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,36 +57,43 @@ func NewKeyValueStore(
// appended to the rangePropertiesCollector with current status.
// `key` must be in strictly ascending order for invocations of a KeyValueStore.
func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
kvLen := len(key) + len(value) + 16
var b [8]byte
var (
b [8]byte
kvLen = 0
)

// data layout: keyLen + key + valueLen + value
_, err := s.dataWriter.Write(
n, err := s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(key))),
)
if err != nil {
return err
}
_, err = s.dataWriter.Write(s.ctx, key)
kvLen += n
n, err = s.dataWriter.Write(s.ctx, key)
if err != nil {
return err
}
_, err = s.dataWriter.Write(
kvLen += n
n, err = s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(value))),
)
if err != nil {
return err
}
_, err = s.dataWriter.Write(s.ctx, value)
kvLen += n
n, err = s.dataWriter.Write(s.ctx, value)
if err != nil {
return err
}
kvLen += n

if len(s.rc.currProp.key) == 0 {
s.rc.currProp.key = key
if len(s.rc.currProp.firstKey) == 0 {
s.rc.currProp.firstKey = key
}
s.rc.currProp.lastKey = key

s.offset += uint64(kvLen)
s.rc.currProp.size += uint64(len(key) + len(value))
Expand All @@ -97,7 +104,7 @@ func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
newProp := *s.rc.currProp
s.rc.props = append(s.rc.props, &newProp)

s.rc.currProp.key = nil
s.rc.currProp.firstKey = nil
s.rc.currProp.offset = s.offset
s.rc.currProp.keys = 0
s.rc.currProp.size = 0
Expand Down
36 changes: 20 additions & 16 deletions br/pkg/lightning/backend/external/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected := &rangeProperty{
key: k1,
offset: 0,
size: uint64(len(k1) + len(v1) + len(k2) + len(v2)),
keys: 2,
firstKey: k1,
lastKey: k2,
offset: 0,
size: uint64(len(k1) + len(v1) + len(k2) + len(v2)),
keys: 2,
}
require.Equal(t, expected, rc.props[0])
encoded = rc.encode()
Expand All @@ -74,10 +75,11 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
kvStore.Close()
expected = &rangeProperty{
key: k3,
offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16),
size: uint64(len(k3) + len(v3)),
keys: 1,
firstKey: k3,
lastKey: k3,
offset: uint64(len(k1) + len(v1) + 16 + len(k2) + len(v2) + 16),
size: uint64(len(k3) + len(v3)),
keys: 1,
}
require.Len(t, rc.props, 2)
require.Equal(t, expected, rc.props[1])
Expand All @@ -95,21 +97,23 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected = &rangeProperty{
key: k1,
offset: 0,
size: uint64(len(k1) + len(v1)),
keys: 1,
firstKey: k1,
lastKey: k1,
offset: 0,
size: uint64(len(k1) + len(v1)),
keys: 1,
}
require.Equal(t, expected, rc.props[0])

err = kvStore.AddKeyValue(k2, v2)
require.NoError(t, err)
require.Len(t, rc.props, 2)
expected = &rangeProperty{
key: k2,
offset: uint64(len(k1) + len(v1) + 16),
size: uint64(len(k2) + len(v2)),
keys: 1,
firstKey: k2,
lastKey: k2,
offset: uint64(len(k1) + len(v1) + 16),
size: uint64(len(k2) + len(v2)),
keys: 1,
}
require.Equal(t, expected, rc.props[1])
kvStore.Close()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (i *MergeKVIter) Close() error {
}

// sortKey returns the first key of the range described by p.
// NOTE(review): presumably this is the key the merge iterator orders
// properties by — confirm against the callers in this file.
func (p rangeProperty) sortKey() []byte {
return p.key
return p.firstKey
}

type statReaderProxy struct {
Expand Down
Loading
Loading