Skip to content

Commit

Permalink
lightning: sample files and pre-allocate rowID before restoring chunk (
Browse files Browse the repository at this point in the history
  • Loading branch information
buchuitoudegou authored Jun 16, 2022
1 parent 081bd10 commit 674def3
Show file tree
Hide file tree
Showing 26 changed files with 761 additions and 50 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,14 @@ build_for_br_integration_test:
) || (make failpoint-disable && exit 1)
@make failpoint-disable

build_for_lightning_test:
@make failpoint-enable
$(GOTEST) -c -cover -covermode=count \
-coverpkg=github.com/pingcap/tidb/br/... \
-o $(LIGHTNING_BIN).test \
github.com/pingcap/tidb/br/cmd/tidb-lightning
@make failpoint-disable

br_unit_test: export ARGS=$$($(BR_PACKAGES))
br_unit_test:
@make failpoint-enable
Expand Down
48 changes: 48 additions & 0 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,49 @@ func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool {
isUnsigned := mysql.HasUnsignedFlag(meta.GetFlag())
switch meta.GetType() {
// MEDIUM INT
case mysql.TypeInt24:
if !isUnsigned {
return rowID > mysql.MaxInt24
}
return rowID > mysql.MaxUint24
// INT
case mysql.TypeLong:
if !isUnsigned {
return rowID > math.MaxInt32
}
return rowID > math.MaxUint32
// SMALLINT
case mysql.TypeShort:
if !isUnsigned {
return rowID > math.MaxInt16
}
return rowID > math.MaxUint16
// TINYINT
case mysql.TypeTiny:
if !isUnsigned {
return rowID > math.MaxInt8
}
return rowID > math.MaxUint8
// FLOAT
case mysql.TypeFloat:
if !isUnsigned {
return float32(rowID) > math.MaxFloat32
}
return float64(rowID) > math.MaxFloat32*2
// DOUBLE
case mysql.TypeDouble:
if !isUnsigned {
return float64(rowID) > math.MaxFloat64
}
// impossible for rowID exceeding MaxFloat64
}
return false
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
Expand Down Expand Up @@ -472,6 +515,11 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// rowID is going to auto-filled the omitted column,
// which should be checked before restore
if isRowIDOverflow(col.ToInfo(), rowID) {
return value, errors.Errorf("PK %d is out of range", rowID)
}
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
Expand Down
51 changes: 50 additions & 1 deletion br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand Down Expand Up @@ -1000,6 +1001,21 @@ type Writer struct {
batchSize int64

lastMetaSeq int32
prevRowID int64 // only used for appendRowsSorted
}

func (w *Writer) flushAndNewWriter() error {
var err error
err = w.flush(context.Background())
if err != nil {
return errors.Trace(err)
}
newWriter, err := w.createSSTWriter()
if err != nil {
return errors.Trace(err)
}
w.writer = newWriter
return nil
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
Expand All @@ -1010,6 +1026,17 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
}
w.writer = writer
}
if len(kvs) == 0 {
return nil
}
if w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 {
// rowID leap. probably re-alloc id
// should write to different sst
err := w.flushAndNewWriter()
if err != nil {
return err
}
}

keyAdapter := w.engine.keyAdapter
totalKeySize := 0
Expand All @@ -1034,7 +1061,26 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
}
kvs = newKvs
}
return w.writer.writeKVs(kvs)
startIdx := 0
w.prevRowID = kvs[len(kvs)-1].RowID
for i := 1; i < len(kvs); i++ {
if kvs[i].RowID > kvs[i-1].RowID+1 {
// leap id
err := w.writer.writeKVs(kvs[startIdx:i])
if err != nil {
return err
}
err = w.flushAndNewWriter()
if err != nil {
return err
}
startIdx = i
}
}
if startIdx < len(kvs) {
return w.writer.writeKVs(kvs[startIdx:])
}
return nil
}

func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
Expand Down Expand Up @@ -1101,6 +1147,9 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
}

func (w *Writer) flush(ctx context.Context) error {
failpoint.Inject("MockFlushWriter", func() {
failpoint.Return(nil)
})
w.Lock()
defer w.Unlock()
if w.batchCount == 0 {
Expand Down
110 changes: 110 additions & 0 deletions br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/common"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
Expand Down Expand Up @@ -83,3 +85,111 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
},
}), errorEngineClosed)
}

func TestAutoSplitSST(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter"))
}()
var err error
dir := os.TempDir()
w := &Writer{
engine: &Engine{
sstDir: dir,
keyAdapter: noopKeyAdapter{},
},
isKVSorted: true,
isWriteBatchSorted: true,
}
w.engine.closed.Store(false)
w.writer, err = w.createSSTWriter()
require.Nil(t, err)
kvs := []common.KvPair{
{
Key: []byte("1"),
Val: []byte("val1"),
RowID: 1,
},
{
Key: []byte("2"),
Val: []byte("val1"),
RowID: 2,
},
}
prevWriter := w.writer
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.True(t, prevWriter == w.writer)
kvs = []common.KvPair{
{
Key: []byte("10"),
Val: []byte("val10"),
RowID: 10,
},
{
Key: []byte("11"),
Val: []byte("val11"),
RowID: 11,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("12"),
Val: []byte("val12"),
RowID: 10,
},
{
Key: []byte("13"),
Val: []byte("val13"),
RowID: 11,
},
{
Key: []byte("15"),
Val: []byte("val15"),
RowID: 15,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("16"),
Val: []byte("val16"),
RowID: 16,
},
{
Key: []byte("17"),
Val: []byte("val17"),
RowID: 17,
},
{
Key: []byte("19"),
Val: []byte("val19"),
RowID: 19,
},
{
Key: []byte("20"),
Val: []byte("val20"),
RowID: 20,
},
{
Key: []byte("22"),
Val: []byte("val22"),
RowID: 22,
},
{
Key: []byte("23"),
Val: []byte("val23"),
RowID: 22,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
}
63 changes: 63 additions & 0 deletions br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
Expand Down Expand Up @@ -271,6 +272,12 @@ func makeSourceFileRegion(
if !isCsvFile {
divisor += 2
}
sizePerRow, err := GetSampledAvgRowSize(&fi, cfg, ioWorkers, store)
if err == nil && sizePerRow != 0 {
log.L().Warn("fail to sample file", zap.String("path", fi.FileMeta.Path), zap.Error(err))
divisor = sizePerRow
}
log.L().Debug("avg row size", zap.String("path", fi.FileMeta.Path), zap.Int64("size per row", sizePerRow))
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
Expand All @@ -292,6 +299,10 @@ func makeSourceFileRegion(
RowIDMax: fi.FileMeta.FileSize / divisor,
},
}
failpoint.Inject("MockInaccurateRowID", func() {
// only allocates 5 rows but contains 10 rows
tableRegion.Chunk.RowIDMax = 5
})

if tableRegion.Size() > tableRegionSizeWarningThreshold {
log.L().Warn(
Expand All @@ -302,6 +313,55 @@ func makeSourceFileRegion(
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
}

func GetSampledAvgRowSize(
fileInfo *FileInfo,
cfg *config.Config,
ioWorkers *worker.Pool,
store storage.ExternalStorage,
) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
reader, err := store.Open(ctx, fileInfo.FileMeta.Path)
if err != nil {
return 0, err
}
var parser Parser
switch fileInfo.FileMeta.Type {
case SourceTypeCSV:
hasHeader := cfg.Mydumper.CSV.Header
charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
if err != nil {
return 0, err
}
parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor)
if err != nil {
return 0, err
}
case SourceTypeSQL:
parser = NewChunkParser(ctx, cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers)
default:
return 0, errors.Errorf("source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path)
}
totalBytes := 0
totalRows := 0
defaultSampleRows := 10 // todo: may be configurable
for i := 0; i < defaultSampleRows; i++ {
err = parser.ReadRow()
if err != nil && errors.Cause(err) == io.EOF {
break
} else if err != nil {
return 0, err
}
totalBytes += parser.LastRow().Length
totalRows++
}
if totalRows > 0 {
return int64(totalBytes) / int64(totalRows), nil
} else {
return 0, nil
}
}

// because parquet files can't seek efficiently, there is no benefit in split.
// parquet file are column orient, so the offset is read line number
func makeParquetFileRegion(
Expand Down Expand Up @@ -381,6 +441,9 @@ func SplitLargeFile(
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
if curRowsCnt == 0 && endOffset != startOffset {
curRowsCnt = 1
}
rowIDMax := prevRowIdxMax + curRowsCnt
if endOffset != dataFile.FileMeta.FileSize {
r, err := store.Open(ctx, dataFile.FileMeta.Path)
Expand Down
Loading

0 comments on commit 674def3

Please sign in to comment.