Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/lightning: add byte reader implementation #45724

Merged
merged 12 commits into from
Aug 4, 2023
26 changes: 26 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "external",
srcs = ["byte_reader.go"],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/storage",
"//util/logutil",
"//util/mathutil",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "external_test",
timeout = "short",
srcs = ["byte_reader_test.go"],
embed = [":external"],
flaky = True,
deps = [
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
162 changes: 162 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"io"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/zap"
)

type byteReader struct {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context
storageReader storage.ReadSeekCloser

buf []byte
bufOffset int

auxBuf []byte
cowSlice [][]byte // copy-on-write slices
isEOF bool
}

func openStoreReaderAndSeek(
ctx context.Context,
store storage.ExternalStorage,
name string,
initFileOffset uint64,
) (storage.ReadSeekCloser, error) {
storageReader, err := store.Open(ctx, name)
if err != nil {
return nil, err
}
_, err = storageReader.Seek(int64(initFileOffset), io.SeekStart)
if err != nil {
return nil, err
}
return storageReader, nil
}

func newByteReader(ctx context.Context, storageReader storage.ReadSeekCloser, bufSize int) *byteReader {
return &byteReader{
ctx: ctx,
storageReader: storageReader,
buf: make([]byte, bufSize),
bufOffset: 0,
}
}

type slice interface {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
get() []byte
}

type commonSlice []byte

func (c commonSlice) get() []byte {
return c
}

type lazySlice struct {
cowIdx int
byteReader *byteReader
}

func (s lazySlice) get() []byte {
return s.byteReader.cowSlice[s.cowIdx]
}

// sliceNext reads the next n bytes from the reader and returns a buffer slice containing those bytes.
func (r *byteReader) sliceNext(n int) (slice, error) {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
b := r.next(n)
readLen := len(b)
if readLen == n {
return mkLazySlice(r, b), nil
}
// If the reader has fewer than n bytes remaining in current buffer,
// `auxBuf` is used as a container instead.
if cap(r.auxBuf) < n {
r.auxBuf = make([]byte, n)
}
r.auxBuf = r.auxBuf[:n]
copy(r.auxBuf, b)
for readLen < n {
r.cloneSlices()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
err := r.reload()
if err != nil {
return nil, err
}
b = r.next(n - readLen)
copy(r.auxBuf[readLen:], b)
readLen += len(b)
}
return commonSlice(r.auxBuf), nil
}

func mkLazySlice(r *byteReader, s []byte) lazySlice {
r.cowSlice = append(r.cowSlice, s)
return lazySlice{
cowIdx: len(r.cowSlice) - 1,
byteReader: r,
}
}

func (r *byteReader) reset() {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not important: we can also set elements to nil before truncate the retPointers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe also add more reset() in unit tests

r.cowSlice = r.cowSlice[:0]
}

func (r *byteReader) cloneSlices() {
for i := range r.cowSlice {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
r.cowSlice[i] = append([]byte(nil), r.cowSlice[i]...)
}
}

func (r *byteReader) eof() bool {
return r.isEOF && len(r.buf) == r.bufOffset
}

func (r *byteReader) next(n int) []byte {
end := mathutil.Min(r.bufOffset+n, len(r.buf))
tangenta marked this conversation as resolved.
Show resolved Hide resolved
ret := r.buf[r.bufOffset:end]
r.bufOffset += len(ret)
return ret
}

func (r *byteReader) reload() error {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
nBytes, err := io.ReadFull(r.storageReader, r.buf[0:])
if err == io.EOF {
logutil.Logger(r.ctx).Error("unexpected EOF")
tangenta marked this conversation as resolved.
Show resolved Hide resolved
r.isEOF = true
return err
} else if err != nil && err == io.ErrUnexpectedEOF {
tangenta marked this conversation as resolved.
Show resolved Hide resolved
r.isEOF = true
} else if err != nil {
logutil.Logger(r.ctx).Warn("other error during reading from external storage", zap.Error(err))
return err
}
r.bufOffset = 0
if nBytes < len(r.buf) {
// The last batch.
r.buf = r.buf[:nBytes]
}
return nil
}

func (r *byteReader) Close() error {
return r.storageReader.Close()
}
165 changes: 165 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"io"
"testing"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

// mockExtStore is only used for test.
type mockExtStore struct {
src []byte
idx uint64
}

func (s *mockExtStore) Read(p []byte) (n int, err error) {
// Read from src to p.
if s.idx >= uint64(len(s.src)) {
return 0, io.EOF
}
n = copy(p, s.src[s.idx:])
s.idx += uint64(n)
return n, nil
}

func (s *mockExtStore) Seek(_ int64, _ int) (int64, error) {
return 0, errors.Errorf("unsupported operation")
}

func (s *mockExtStore) Close() error {
return nil
}

func TestByteReader(t *testing.T) {
// Test basic next() usage.
br := newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, br.reload())
x := br.next(1)
require.Equal(t, 1, len(x))
require.Equal(t, byte('a'), x[0])
x = br.next(2)
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[0])
require.Equal(t, byte('c'), x[1])
require.NoError(t, br.reload())
require.False(t, br.eof())
require.Error(t, br.reload())
require.False(t, br.eof()) // Data in buffer is not consumed.
br.next(2)
require.True(t, br.eof())
require.NoError(t, br.Close())

// Test basic sliceNext() usage.
br = newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, br.reload())
y, err := br.sliceNext(2)
require.NoError(t, err)
x = y.get()
require.Equal(t, 2, len(x))
require.Equal(t, byte('a'), x[0])
require.Equal(t, byte('b'), x[1])
require.NoError(t, br.Close())

br = newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, br.reload())
y, err = br.sliceNext(5) // Read all the data.
require.NoError(t, err)
x = y.get()
require.Equal(t, 5, len(x))
require.Equal(t, byte('e'), x[4])
require.NoError(t, br.Close())

br = newByteReader(context.Background(), &mockExtStore{src: []byte("abcde")}, 3)
require.NoError(t, br.reload())
_, err = br.sliceNext(7) // EOF
require.Error(t, err)

ms := &mockExtStore{src: []byte("abcdef")}
br = newByteReader(context.Background(), ms, 2)
require.NoError(t, br.reload())
y, err = br.sliceNext(3)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
x = y.get()
require.Equal(t, 3, len(x))
require.Equal(t, byte('c'), x[2])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("abcdef")}
br = newByteReader(context.Background(), ms, 2)
require.NoError(t, br.reload())
y, err = br.sliceNext(2)
require.NoError(t, err)
// Pollute mockExtStore to verify if the slice is not affected.
for i, b := range []byte{'x', 'y', 'z'} {
ms.src[i] = b
}
x = y.get()
require.Equal(t, 2, len(x))
require.Equal(t, byte('b'), x[1])
require.NoError(t, br.Close())
}

func TestByteReaderClone(t *testing.T) {
ms := &mockExtStore{src: []byte("0123456789")}
br := newByteReader(context.Background(), ms, 4)
require.NoError(t, br.reload())
y1, err := br.sliceNext(2)
require.NoError(t, err)
y2, err := br.sliceNext(1)
require.NoError(t, err)
x1, x2 := y1.get(), y2.get()
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
require.NoError(t, br.reload()) // Perform a reload to overwrite buffer.
x1, x2 = y1.get(), y2.get()
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('4'), x1[0]) // Verify if the buffer is overwritten.
require.Equal(t, byte('6'), x2[0])
require.NoError(t, br.Close())

ms = &mockExtStore{src: []byte("0123456789")}
br = newByteReader(context.Background(), ms, 4)
require.NoError(t, br.reload())
y1, err = br.sliceNext(2)
require.NoError(t, err)
y2, err = br.sliceNext(1)
require.NoError(t, err)
x1, x2 = y1.get(), y2.get()
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0])
require.Equal(t, byte('2'), x2[0])
br.cloneSlices()
require.NoError(t, br.reload()) // Perform a reload to overwrite buffer.
x1, x2 = y1.get(), y2.get()
require.Len(t, x1, 2)
require.Len(t, x2, 1)
require.Equal(t, byte('0'), x1[0]) // Verify if the buffer is NOT overwritten.
require.Equal(t, byte('2'), x2[0])
br.reset()
require.NoError(t, br.Close())
}