lightning: revert new policy of allocating rowid and refine it later #35817

Merged · 3 commits · Jun 29, 2022
8 changes: 0 additions & 8 deletions Makefile
@@ -327,14 +327,6 @@ build_for_br_integration_test:
) || (make failpoint-disable && exit 1)
@make failpoint-disable

build_for_lightning_test:
@make failpoint-enable
$(GOTEST) -c -cover -covermode=count \
-coverpkg=github.com/pingcap/tidb/br/... \
-o $(LIGHTNING_BIN).test \
github.com/pingcap/tidb/br/cmd/tidb-lightning
@make failpoint-disable

br_unit_test: export ARGS=$$($(BR_PACKAGES))
br_unit_test:
@make failpoint-enable
48 changes: 0 additions & 48 deletions br/pkg/lightning/backend/kv/sql2kv.go
@@ -450,49 +450,6 @@ func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool {
isUnsigned := mysql.HasUnsignedFlag(meta.GetFlag())
switch meta.GetType() {
// MEDIUM INT
case mysql.TypeInt24:
if !isUnsigned {
return rowID > mysql.MaxInt24
}
return rowID > mysql.MaxUint24
// INT
case mysql.TypeLong:
if !isUnsigned {
return rowID > math.MaxInt32
}
return rowID > math.MaxUint32
// SMALLINT
case mysql.TypeShort:
if !isUnsigned {
return rowID > math.MaxInt16
}
return rowID > math.MaxUint16
// TINYINT
case mysql.TypeTiny:
if !isUnsigned {
return rowID > math.MaxInt8
}
return rowID > math.MaxUint8
// FLOAT
case mysql.TypeFloat:
if !isUnsigned {
return float32(rowID) > math.MaxFloat32
}
return float64(rowID) > math.MaxFloat32*2
// DOUBLE
case mysql.TypeDouble:
if !isUnsigned {
return float64(rowID) > math.MaxFloat64
}
// impossible for rowID exceeding MaxFloat64
}
return false
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
@@ -520,11 +477,6 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// rowID is going to auto-filled the omitted column,
// which should be checked before restore
if isRowIDOverflow(col.ToInfo(), rowID) {
return value, errors.Errorf("PK %d is out of range", rowID)
}
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
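The check deleted above rejected an auto-increment row ID that could not fit the target column's integer type before the cast ran. Below is a minimal standalone sketch of that bounds check; the names and constants (colType, rowIDOverflows, maxInt24) are illustrative stand-ins, not TiDB's model.ColumnInfo / mysql flag APIs.

```go
package main

import (
	"fmt"
	"math"
)

// colType is an illustrative stand-in for the column's MySQL integer type.
type colType int

const (
	typeTiny  colType = iota // TINYINT
	typeShort                // SMALLINT
	typeInt24                // MEDIUMINT
	typeLong                 // INT
)

// MEDIUMINT bounds, which the standard library does not define.
const (
	maxInt24  = 1<<23 - 1
	maxUint24 = 1<<24 - 1
)

// rowIDOverflows reports whether an int64 row ID exceeds the range of the
// destination integer column, taking the UNSIGNED flag into account.
func rowIDOverflows(t colType, unsigned bool, rowID int64) bool {
	switch t {
	case typeTiny:
		if unsigned {
			return rowID > math.MaxUint8
		}
		return rowID > math.MaxInt8
	case typeShort:
		if unsigned {
			return rowID > math.MaxUint16
		}
		return rowID > math.MaxInt16
	case typeInt24:
		if unsigned {
			return rowID > maxUint24
		}
		return rowID > maxInt24
	case typeLong:
		if unsigned {
			return rowID > math.MaxUint32
		}
		return rowID > math.MaxInt32
	}
	return false
}

func main() {
	fmt.Println(rowIDOverflows(typeTiny, false, 200))  // true: 200 > 127
	fmt.Println(rowIDOverflows(typeLong, true, 1<<20)) // false: fits in UNSIGNED INT
}
```

After the revert, the same overflow is still surfaced by table.CastValue, as the remaining comment in getActualDatum notes.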
51 changes: 1 addition & 50 deletions br/pkg/lightning/backend/local/engine.go
@@ -33,7 +33,6 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
@@ -1003,21 +1002,6 @@ type Writer struct {
batchSize int64

lastMetaSeq int32
prevRowID int64 // only used for appendRowsSorted
}

func (w *Writer) flushAndNewWriter() error {
var err error
err = w.flush(context.Background())
if err != nil {
return errors.Trace(err)
}
newWriter, err := w.createSSTWriter()
if err != nil {
return errors.Trace(err)
}
w.writer = newWriter
return nil
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -1028,17 +1012,6 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
}
w.writer = writer
}
if len(kvs) == 0 {
return nil
}
if w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 {
// rowID leap. probably re-alloc id
// should write to different sst
err := w.flushAndNewWriter()
if err != nil {
return err
}
}

keyAdapter := w.engine.keyAdapter
totalKeySize := 0
@@ -1063,26 +1036,7 @@ }
}
kvs = newKvs
}
startIdx := 0
w.prevRowID = kvs[len(kvs)-1].RowID
for i := 1; i < len(kvs); i++ {
if kvs[i].RowID > kvs[i-1].RowID+1 {
// leap id
err := w.writer.writeKVs(kvs[startIdx:i])
if err != nil {
return err
}
err = w.flushAndNewWriter()
if err != nil {
return err
}
startIdx = i
}
}
if startIdx < len(kvs) {
return w.writer.writeKVs(kvs[startIdx:])
}
return nil
return w.writer.writeKVs(kvs)
}

func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
@@ -1149,9 +1103,6 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
}

func (w *Writer) flush(ctx context.Context) error {
failpoint.Inject("MockFlushWriter", func() {
failpoint.Return(nil)
})
w.Lock()
defer w.Unlock()
if w.batchCount == 0 {
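The deleted logic tracked the previous row ID and, whenever consecutive IDs leaped, flushed the current SST writer and created a fresh one, so each run of consecutive row IDs landed in its own SST file. Below is a minimal standalone sketch of that run-splitting step; kvPair and splitOnRowIDGaps are assumed names, not the backend's common.KvPair and Writer.

```go
package main

import "fmt"

// kvPair is a trimmed-down stand-in for common.KvPair.
type kvPair struct {
	Key   []byte
	RowID int64
}

// splitOnRowIDGaps partitions a sorted batch at every point where the row ID
// jumps by more than one (e.g. after a row-ID re-allocation); each run would
// have been written by its own SST writer.
func splitOnRowIDGaps(kvs []kvPair) [][]kvPair {
	if len(kvs) == 0 {
		return nil
	}
	var runs [][]kvPair
	start := 0
	for i := 1; i < len(kvs); i++ {
		if kvs[i].RowID > kvs[i-1].RowID+1 {
			runs = append(runs, kvs[start:i])
			start = i
		}
	}
	return append(runs, kvs[start:])
}

func main() {
	kvs := []kvPair{
		{Key: []byte("1"), RowID: 1}, {Key: []byte("2"), RowID: 2},
		{Key: []byte("10"), RowID: 10}, {Key: []byte("11"), RowID: 11},
	}
	for _, run := range splitOnRowIDGaps(kvs) {
		fmt.Println(len(run)) // prints 2 then 2: the leap from 2 to 10 starts a new run
	}
}
```

After the revert, appendRowsSorted writes the whole batch to the single current writer via w.writer.writeKVs(kvs).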
111 changes: 0 additions & 111 deletions br/pkg/lightning/backend/local/engine_test.go
@@ -29,9 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/stretchr/testify/require"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/common"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
@@ -87,112 +85,3 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
},
}), errorEngineClosed)
}

func TestAutoSplitSST(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter"))
}()
var err error
dir := os.TempDir()
w := &Writer{
engine: &Engine{
sstDir: dir,
keyAdapter: noopKeyAdapter{},
logger: log.L(),
},
isKVSorted: true,
isWriteBatchSorted: true,
}
w.engine.closed.Store(false)
w.writer, err = w.createSSTWriter()
require.Nil(t, err)
kvs := []common.KvPair{
{
Key: []byte("1"),
Val: []byte("val1"),
RowID: 1,
},
{
Key: []byte("2"),
Val: []byte("val1"),
RowID: 2,
},
}
prevWriter := w.writer
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.True(t, prevWriter == w.writer)
kvs = []common.KvPair{
{
Key: []byte("10"),
Val: []byte("val10"),
RowID: 10,
},
{
Key: []byte("11"),
Val: []byte("val11"),
RowID: 11,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("12"),
Val: []byte("val12"),
RowID: 10,
},
{
Key: []byte("13"),
Val: []byte("val13"),
RowID: 11,
},
{
Key: []byte("15"),
Val: []byte("val15"),
RowID: 15,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("16"),
Val: []byte("val16"),
RowID: 16,
},
{
Key: []byte("17"),
Val: []byte("val17"),
RowID: 17,
},
{
Key: []byte("19"),
Val: []byte("val19"),
RowID: 19,
},
{
Key: []byte("20"),
Val: []byte("val20"),
RowID: 20,
},
{
Key: []byte("22"),
Val: []byte("val22"),
RowID: 22,
},
{
Key: []byte("23"),
Val: []byte("val23"),
RowID: 22,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
}
63 changes: 0 additions & 63 deletions br/pkg/lightning/mydump/region.go
@@ -22,7 +22,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
@@ -272,12 +271,6 @@ func makeSourceFileRegion(
if !isCsvFile {
divisor += 2
}
sizePerRow, err := GetSampledAvgRowSize(&fi, cfg, ioWorkers, store)
if err == nil && sizePerRow != 0 {
log.FromContext(ctx).Warn("fail to sample file", zap.String("path", fi.FileMeta.Path), zap.Error(err))
divisor = sizePerRow
}
log.FromContext(ctx).Debug("avg row size", zap.String("path", fi.FileMeta.Path), zap.Int64("size per row", sizePerRow))
// If a csv file is overlarge, we need to split it into multiple regions.
// Note: We can only split a csv file whose format is strict.
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
@@ -299,10 +292,6 @@ func makeSourceFileRegion(
RowIDMax: fi.FileMeta.FileSize / divisor,
},
}
failpoint.Inject("MockInaccurateRowID", func() {
// only allocates 5 rows but contains 10 rows
tableRegion.Chunk.RowIDMax = 5
})

if tableRegion.Size() > tableRegionSizeWarningThreshold {
log.FromContext(ctx).Warn(
@@ -313,55 +302,6 @@ func makeSourceFileRegion(
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
}

func GetSampledAvgRowSize(
fileInfo *FileInfo,
cfg *config.Config,
ioWorkers *worker.Pool,
store storage.ExternalStorage,
) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
reader, err := store.Open(ctx, fileInfo.FileMeta.Path)
if err != nil {
return 0, err
}
var parser Parser
switch fileInfo.FileMeta.Type {
case SourceTypeCSV:
hasHeader := cfg.Mydumper.CSV.Header
charsetConvertor, err := NewCharsetConvertor(cfg.Mydumper.DataCharacterSet, cfg.Mydumper.DataInvalidCharReplace)
if err != nil {
return 0, err
}
parser, err = NewCSVParser(ctx, &cfg.Mydumper.CSV, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers, hasHeader, charsetConvertor)
if err != nil {
return 0, err
}
case SourceTypeSQL:
parser = NewChunkParser(ctx, cfg.TiDB.SQLMode, reader, int64(cfg.Mydumper.ReadBlockSize), ioWorkers)
default:
return 0, errors.Errorf("source file %s is none of csv, sql, or parquet file", fileInfo.FileMeta.Path)
}
totalBytes := 0
totalRows := 0
defaultSampleRows := 10 // todo: may be configurable
for i := 0; i < defaultSampleRows; i++ {
err = parser.ReadRow()
if err != nil && errors.Cause(err) == io.EOF {
break
} else if err != nil {
return 0, err
}
totalBytes += parser.LastRow().Length
totalRows++
}
if totalRows > 0 {
return int64(totalBytes) / int64(totalRows), nil
} else {
return 0, nil
}
}

// because parquet files can't seek efficiently, there is no benefit in split.
// parquet file are column orient, so the offset is read line number
func makeParquetFileRegion(
@@ -441,9 +381,6 @@ func SplitLargeFile(
}
for {
curRowsCnt := (endOffset - startOffset) / divisor
if curRowsCnt == 0 && endOffset != startOffset {
curRowsCnt = 1
}
rowIDMax := prevRowIdxMax + curRowsCnt
if endOffset != dataFile.FileMeta.FileSize {
r, err := store.Open(ctx, dataFile.FileMeta.Path)
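The deleted GetSampledAvgRowSize read up to ten rows from each source file and averaged their encoded lengths; when sampling succeeded, the result replaced the fixed divisor used to derive RowIDMax from the file size. Below is a minimal standalone sketch of that sampling idea, assuming plain CSV input and standard-library parsing; sampledAvgRowSize is an illustrative name, not Lightning's Parser API.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// sampledAvgRowSize reads up to sampleRows records from r and returns the
// average serialized row length in bytes, or 0 if the input is empty.
func sampledAvgRowSize(r io.Reader, sampleRows int) (int64, error) {
	cr := csv.NewReader(r)
	totalBytes, totalRows := 0, 0
	for i := 0; i < sampleRows; i++ {
		record, err := cr.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return 0, err
		}
		// Approximate the on-disk length: fields plus separators and newline.
		totalBytes += len(strings.Join(record, ",")) + 1
		totalRows++
	}
	if totalRows == 0 {
		return 0, nil
	}
	return int64(totalBytes) / int64(totalRows), nil
}

func main() {
	data := "1,alice\n2,bob\n3,carol\n"
	avg, err := sampledAvgRowSize(strings.NewReader(data), 10)
	if err != nil {
		panic(err)
	}
	// Estimated row count for the file = file size / sampled average row size.
	fmt.Println(avg, int64(len(data))/avg)
}
```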