lightning: sample files and pre-allocate rowID before restoring chunk #34288

Merged · 29 commits · Jun 16, 2022
Changes from 19 commits
Commits
411ba4e
feat: sample files and pre-alloc id
buchuitoudegou May 5, 2022
d2b4ac7
fix: region_test
buchuitoudegou May 6, 2022
6c8574e
revert: dyn alloc row-id
buchuitoudegou May 6, 2022
aae8c91
fix: resolve conflict
buchuitoudegou May 6, 2022
f0dff5d
Merge branch 'master' of https://github.com/pingcap/tidb into fix-aut…
buchuitoudegou May 6, 2022
ab9db32
fix: ut
buchuitoudegou May 7, 2022
6638d9c
try fix it
buchuitoudegou May 7, 2022
2fca896
feat: realloc id when run out
buchuitoudegou May 11, 2022
9251a9c
fix: parallel import
buchuitoudegou May 23, 2022
ca863ea
fix: parallel import
buchuitoudegou May 24, 2022
6a5fba1
fix lint
buchuitoudegou May 24, 2022
d62c98e
fix it
buchuitoudegou May 24, 2022
a29c3bd
fix comment
buchuitoudegou May 27, 2022
4b5b415
add comment for reallocRowID
buchuitoudegou May 27, 2022
640ec4b
Merge branch 'master' of https://github.com/pingcap/tidb into fix-aut…
buchuitoudegou May 30, 2022
b36b531
fix lint
buchuitoudegou May 30, 2022
4df0f47
refine comment
buchuitoudegou May 30, 2022
e2b32c7
Merge branch 'master' into fix-auto-incr
buchuitoudegou May 31, 2022
c338f47
fix: comment
buchuitoudegou Jun 6, 2022
6f7a44f
fix: return newRowIDBase & fix comments
buchuitoudegou Jun 13, 2022
3c83880
rowID overflow check
buchuitoudegou Jun 13, 2022
de368c8
Merge branch 'master' into fix-auto-incr
buchuitoudegou Jun 13, 2022
53ef112
Merge branch 'master' of https://github.com/pingcap/tidb into fix-aut…
buchuitoudegou Jun 14, 2022
c31335b
fix: compile error
buchuitoudegou Jun 14, 2022
6246695
fix ut
buchuitoudegou Jun 14, 2022
3bfd82c
fix it
buchuitoudegou Jun 14, 2022
5a839c9
add log & remove reduntant test
buchuitoudegou Jun 16, 2022
839b857
Merge branch 'master' into fix-auto-incr
ti-chi-bot Jun 16, 2022
92f2e52
Merge branch 'master' into fix-auto-incr
ti-chi-bot Jun 16, 2022
8 changes: 8 additions & 0 deletions Makefile
@@ -327,6 +327,14 @@ build_for_br_integration_test:
) || (make failpoint-disable && exit 1)
@make failpoint-disable

build_for_lightning_test:
@make failpoint-enable
$(GOTEST) -c -cover -covermode=count \
-coverpkg=github.com/pingcap/tidb/br/... \
-o $(LIGHTNING_BIN).test \
github.com/pingcap/tidb/br/cmd/tidb-lightning
@make failpoint-disable

br_unit_test: export ARGS=$$($(BR_PACKAGES))
br_unit_test:
@make failpoint-enable
58 changes: 57 additions & 1 deletion br/pkg/lightning/backend/local/engine.go
@@ -33,6 +33,7 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
@@ -1000,6 +1001,21 @@ type Writer struct {
batchSize int64

lastMetaSeq int32
prevRowID int64 // row ID of the last KV written; only used by appendRowsSorted to detect row-ID leaps
}

func (w *Writer) flushAndNewWriter() error {
err := w.flush(context.Background())
if err != nil {
return errors.Trace(err)
}
newWriter, err := w.createSSTWriter()
if err != nil {
return errors.Trace(err)
}
w.writer = newWriter
return nil
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -1010,6 +1026,17 @@
}
w.writer = writer
}
if len(kvs) == 0 {
return nil
}
if w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 {
// a row-ID leap usually means the row IDs were re-allocated;
// KVs after the leap must be written to a different SST file
err := w.flushAndNewWriter()
if err != nil {
return err
}
}

keyAdapter := w.engine.keyAdapter
totalKeySize := 0
@@ -1034,7 +1061,33 @@
}
kvs = newKvs
}
return w.writer.writeKVs(kvs)
tempKvs := []common.KvPair{kvs[0]}
w.prevRowID = kvs[0].RowID
for i := 1; i < len(kvs); i++ {
if kvs[i].RowID > kvs[i-1].RowID+1 {
// row-ID leap: flush the current run into its own SST and start a new one
err := w.writer.writeKVs(tempKvs)
if err != nil {
return err
}
err = w.flushAndNewWriter()
if err != nil {
return err
}
tempKvs = []common.KvPair{kvs[i]}
} else {
tempKvs = append(tempKvs, kvs[i])
}
if i == len(kvs)-1 {
w.prevRowID = kvs[i].RowID
}
}
if len(tempKvs) > 0 {
return w.writer.writeKVs(tempKvs)
}
return nil
}
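
The run-splitting logic above is easier to see in isolation. Here is a minimal, self-contained sketch of the same idea (our illustration, not the PR's code; kvPair and splitByRowIDLeap are hypothetical names):

package main

import "fmt"

type kvPair struct {
	RowID int64
}

// splitByRowIDLeap cuts a batch sorted by row ID into runs of consecutive
// row IDs; a leap between adjacent pairs starts a new run. In the PR, each
// new run triggers flushAndNewWriter, so runs never share an SST file.
func splitByRowIDLeap(kvs []kvPair) [][]kvPair {
	if len(kvs) == 0 {
		return nil
	}
	runs := [][]kvPair{{kvs[0]}}
	for i := 1; i < len(kvs); i++ {
		if kvs[i].RowID > kvs[i-1].RowID+1 {
			runs = append(runs, nil) // row-ID leap: start a new run
		}
		runs[len(runs)-1] = append(runs[len(runs)-1], kvs[i])
	}
	return runs
}

func main() {
	kvs := []kvPair{{1}, {2}, {5}, {6}, {9}}
	fmt.Println(len(splitByRowIDLeap(kvs))) // 3 runs: [1 2], [5 6], [9]
}

Keeping re-allocated row-ID ranges in separate SSTs preserves the writer's sorted-write invariant even when a chunk re-allocates its row IDs mid-import.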

func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
@@ -1101,6 +1154,9 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
}

func (w *Writer) flush(ctx context.Context) error {
failpoint.Inject("MockFlushWriter", func() {
failpoint.Return(nil)
})
w.Lock()
defer w.Unlock()
if w.batchCount == 0 {
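The MockFlushWriter hook in flush above follows the standard github.com/pingcap/failpoint pattern used throughout these changes. A rough, self-contained sketch of that pattern (the failpoint name and path below are made up for this sketch; only the Inject/Return/Enable/Disable calls mirror the real library):

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func flush() error {
	// Inject is a no-op marker until `make failpoint-enable` (failpoint-ctl)
	// rewrites it into real control flow; after the rewrite, failpoint.Return
	// becomes an actual early return from flush.
	failpoint.Inject("MockFlush", func() {
		failpoint.Return(nil)
	})
	return fmt.Errorf("real flush ran")
}

func main() {
	// Tests activate a failpoint by its full package path, as TestAutoSplitSST
	// below does with ".../backend/local/MockFlushWriter".
	if err := failpoint.Enable("main/MockFlush", "return(true)"); err != nil {
		panic(err)
	}
	defer failpoint.Disable("main/MockFlush") //nolint:errcheck
	fmt.Println(flush()) // <nil> once the rewrite and Enable are both in effect
}

This is also why the Makefile change adds build_for_lightning_test: the test binary has to be built with failpoints enabled for hooks like MockFlushWriter to take effect.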
110 changes: 110 additions & 0 deletions br/pkg/lightning/backend/local/engine_test.go
@@ -28,7 +28,9 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/common"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
@@ -83,3 +85,111 @@
},
}), errorEngineClosed)
}

func TestAutoSplitSST(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter"))
}()
var err error
dir := os.TempDir()
w := &Writer{
engine: &Engine{
sstDir: dir,
keyAdapter: noopKeyAdapter{},
},
isKVSorted: true,
isWriteBatchSorted: true,
}
w.engine.closed.Store(false)
w.writer, err = w.createSSTWriter()
require.Nil(t, err)
kvs := []common.KvPair{
{
Key: []byte("1"),
Val: []byte("val1"),
RowID: 1,
},
{
Key: []byte("2"),
Val: []byte("val1"),
RowID: 2,
},
}
prevWriter := w.writer
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.True(t, prevWriter == w.writer)
kvs = []common.KvPair{
{
Key: []byte("10"),
Val: []byte("val10"),
RowID: 10,
},
{
Key: []byte("11"),
Val: []byte("val11"),
RowID: 11,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // row-ID leap: the writer should have been flushed and recreated
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("12"),
Val: []byte("val12"),
RowID: 10,
},
{
Key: []byte("13"),
Val: []byte("val13"),
RowID: 11,
},
{
Key: []byte("15"),
Val: []byte("val15"),
RowID: 15,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // row-ID leap: the writer should have been flushed and recreated
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("16"),
Val: []byte("val16"),
RowID: 16,
},
{
Key: []byte("17"),
Val: []byte("val17"),
RowID: 17,
},
{
Key: []byte("19"),
Val: []byte("val19"),
RowID: 19,
},
{
Key: []byte("20"),
Val: []byte("val20"),
RowID: 20,
},
{
Key: []byte("22"),
Val: []byte("val22"),
RowID: 22,
},
{
Key: []byte("23"),
Val: []byte("val23"),
RowID: 22,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // row-ID leap: the writer should have been flushed and recreated
}
63 changes: 63 additions & 0 deletions br/pkg/lightning/mydump/region.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
@@ -271,6 +272,11 @@ func makeSourceFileRegion(
if !isCsvFile {
divisor += 2
}
sizePerRow, err := GetSampledAvgRowSize(&fi, cfg, ioWorkers, store)
if err == nil && sizePerRow != 0 {
divisor = sizePerRow
} else if err != nil {
log.L().Warn("fail to sample file", zap.String("path", fi.FileMeta.Path), zap.Error(err))
}
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
@@ -292,6 +298,10 @@
RowIDMax: fi.FileMeta.FileSize / divisor,
},
}
failpoint.Inject("MockInaccurateRowID", func() {
// pretend only 5 row IDs were allocated for a file that actually contains 10 rows
tableRegion.Chunk.RowIDMax = 5
})

if tableRegion.Size() > tableRegionSizeWarningThreshold {
log.L().Warn(
@@ -302,6 +312,56 @@
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
}

func GetSampledAvgRowSize(
fileInfo *FileInfo,
cfg *config.Config,
ioWorkers *worker.Pool,
store storage.ExternalStorage,
) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
reader, err := store.Open(ctx, fileInfo.FileMeta.Path)
if err != nil {
return 0, err
}
var parser Parser
switch fileInfo.FileMeta.Type {
case SourceTypeCSV:
hasHeader := cfg.Mydumper.CSV.Header
charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
if err != nil {
return 0, err
}
parser, err = NewCSVParser(&cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor)
if err != nil {
return 0, err
}
case SourceTypeSQL:
parser = NewChunkParser(cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers)
default:
err = errors.New("unknown source type")
return 0, errors.Annotatef(err, "source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path)
}
totalBytes := 0
totalRows := 0
defaultSampleRows := 10 // TODO: make this configurable
for i := 0; i < defaultSampleRows; i++ {
err = parser.ReadRow()
if err != nil && errors.Cause(err) == io.EOF {
break
} else if err != nil {
return 0, err
}
totalBytes += parser.LastRow().Length
totalRows++
}
if totalRows > 0 {
return int64(totalBytes) / int64(totalRows), nil
}
return 0, nil
}
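
As a back-of-the-envelope illustration of what sampling changes (numbers made up for this sketch, not taken from the PR): the sampled bytes-per-row replaces the fixed divisor in makeSourceFileRegion, so the chunk's RowIDMax estimate becomes FileSize / sizePerRow.

package main

import "fmt"

func main() {
	const fileSize int64 = 256 << 20 // a 256 MiB source file
	sampledBytes := int64(520)       // bytes read across the sampled rows
	sampledRows := int64(10)         // defaultSampleRows above

	sizePerRow := sampledBytes / sampledRows // 52 bytes per row
	rowIDMax := fileSize / sizePerRow        // 5162220 row IDs pre-allocated
	fmt.Println(sizePerRow, rowIDMax)
}

When the estimate still undershoots (rows wider than the sample suggests), the importer runs out of pre-allocated row IDs and re-allocates a fresh range, which produces the row-ID leap that appendRowsSorted now detects.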

// because parquet files can't be seeked efficiently, there is no benefit in splitting them.
// parquet files are column-oriented, so the offset here is the number of rows read.
func makeParquetFileRegion(
@@ -381,6 +441,9 @@ func SplitLargeFile(
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
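// a non-empty byte range must be allocated at least one row ID, even when
// it is smaller than the estimated bytes-per-row divisor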
if curRowsCnt == 0 && endOffset != startOffset {
curRowsCnt = 1
}
rowIDMax := prevRowIdxMax + curRowsCnt
if endOffset != dataFile.FileMeta.FileSize {
r, err := store.Open(ctx, dataFile.FileMeta.Path)