Skip to content

Commit

Permalink
cherry pick pingcap#34288 to release-6.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
buchuitoudegou authored and ti-srebot committed Jun 16, 2022
1 parent 36a9810 commit 172d1c8
Show file tree
Hide file tree
Showing 26 changed files with 939 additions and 50 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,14 @@ build_for_br_integration_test:
) || (make failpoint-disable && exit 1)
@make failpoint-disable

build_for_lightning_test:
@make failpoint-enable
$(GOTEST) -c -cover -covermode=count \
-coverpkg=github.com/pingcap/tidb/br/... \
-o $(LIGHTNING_BIN).test \
github.com/pingcap/tidb/br/cmd/tidb-lightning
@make failpoint-disable

br_unit_test: export ARGS=$$($(BR_PACKAGES))
br_unit_test:
@make failpoint-enable
Expand Down
113 changes: 113 additions & 0 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,119 @@ func (kvcodec *tableKVEncoder) Encode(
return kvPairs, nil
}

<<<<<<< HEAD
=======
func isTableAutoRandom(tblMeta *model.TableInfo) bool {
return tblMeta.PKIsHandle && tblMeta.ContainsAutoRandomBits()
}

func isAutoIncCol(colInfo *model.ColumnInfo) bool {
return mysql.HasAutoIncrementFlag(colInfo.GetFlag())
}

func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool {
isUnsigned := mysql.HasUnsignedFlag(meta.GetFlag())
switch meta.GetType() {
// MEDIUM INT
case mysql.TypeInt24:
if !isUnsigned {
return rowID > mysql.MaxInt24
}
return rowID > mysql.MaxUint24
// INT
case mysql.TypeLong:
if !isUnsigned {
return rowID > math.MaxInt32
}
return rowID > math.MaxUint32
// SMALLINT
case mysql.TypeShort:
if !isUnsigned {
return rowID > math.MaxInt16
}
return rowID > math.MaxUint16
// TINYINT
case mysql.TypeTiny:
if !isUnsigned {
return rowID > math.MaxInt8
}
return rowID > math.MaxUint8
// FLOAT
case mysql.TypeFloat:
if !isUnsigned {
return float32(rowID) > math.MaxFloat32
}
return float64(rowID) > math.MaxFloat32*2
// DOUBLE
case mysql.TypeDouble:
if !isUnsigned {
return float64(rowID) > math.MaxFloat64
}
// impossible for rowID exceeding MaxFloat64
}
return false
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
err error
)

tblMeta := kvcodec.tbl.Meta()
cols := kvcodec.tbl.Cols()

// Since this method is only called when iterating the columns in the `Encode()` method,
// we can assume that the `colIndex` always have a valid input
col := cols[colIndex]

isBadNullValue := false
if inputDatum != nil {
value, err = table.CastValue(kvcodec.se, *inputDatum, col.ToInfo(), false, false)
if err != nil {
return value, err
}
if err := col.CheckNotNull(&value); err == nil {
return value, nil // the most normal case
}
isBadNullValue = true
}
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// rowID is going to auto-filled the omitted column,
// which should be checked before restore
if isRowIDOverflow(col.ToInfo(), rowID) {
return value, errors.Errorf("PK %d is out of range", rowID)
}
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.GetFlag()) {
val = types.NewUintDatum(uint64(realRowID))
} else {
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
case isBadNullValue:
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx)
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
}
return value, err
}

>>>>>>> 674def3b7... lightning: sample files and pre-allocate rowID before restoring chunk (#34288)
// get record value for auto-increment field
//
// See: https://github.com/pingcap/tidb/blob/47f0f15b14ed54fc2222f3e304e29df7b05e6805/executor/insert_common.go#L781-L852
Expand Down
51 changes: 50 additions & 1 deletion br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand Down Expand Up @@ -993,6 +994,21 @@ type Writer struct {
batchSize int64

lastMetaSeq int32
prevRowID int64 // only used for appendRowsSorted
}

func (w *Writer) flushAndNewWriter() error {
var err error
err = w.flush(context.Background())
if err != nil {
return errors.Trace(err)
}
newWriter, err := w.createSSTWriter()
if err != nil {
return errors.Trace(err)
}
w.writer = newWriter
return nil
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
Expand All @@ -1003,6 +1019,17 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
}
w.writer = writer
}
if len(kvs) == 0 {
return nil
}
if w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 {
// rowID leap. probably re-alloc id
// should write to different sst
err := w.flushAndNewWriter()
if err != nil {
return err
}
}

keyAdapter := w.engine.keyAdapter
totalKeySize := 0
Expand All @@ -1027,7 +1054,26 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
}
kvs = newKvs
}
return w.writer.writeKVs(kvs)
startIdx := 0
w.prevRowID = kvs[len(kvs)-1].RowID
for i := 1; i < len(kvs); i++ {
if kvs[i].RowID > kvs[i-1].RowID+1 {
// leap id
err := w.writer.writeKVs(kvs[startIdx:i])
if err != nil {
return err
}
err = w.flushAndNewWriter()
if err != nil {
return err
}
startIdx = i
}
}
if startIdx < len(kvs) {
return w.writer.writeKVs(kvs[startIdx:])
}
return nil
}

func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
Expand Down Expand Up @@ -1094,6 +1140,9 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
}

func (w *Writer) flush(ctx context.Context) error {
failpoint.Inject("MockFlushWriter", func() {
failpoint.Return(nil)
})
w.Lock()
defer w.Unlock()
if w.batchCount == 0 {
Expand Down
Loading

0 comments on commit 172d1c8

Please sign in to comment.