Skip to content

Commit

Permalink
Merge branch 'master' into optimistic_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Jun 20, 2022
2 parents 13cb7d6 + 9a77892 commit 7f56a89
Show file tree
Hide file tree
Showing 203 changed files with 15,947 additions and 8,075 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolu
build:ci --remote_cache=http://172.16.4.3:8084/tidb
test:ci --verbose_failures
test:ci --test_timeout=180
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --test_env=GO_TEST_WRAP_TESTV=1 --test_verbose_timeout_warnings
test:ci --remote_cache=http://172.16.4.3:8084/tidb
test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600
2 changes: 1 addition & 1 deletion .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ header:
- '**/*.result'
- '**/*.example'
- '**/*.patch'
- 'DEPS.bzl'
- '**/*.bzl'
- '.codecov.yml'
- 'Jenkinsfile'
- '.editorconfig'
Expand Down
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1914,8 +1914,8 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sum = "h1:+46isFI9fR9R+nJVDMI55tCC/TCwp+bvVA4HLGEv1rY=",
version = "v0.0.0-20220314125451-bfb5c2c55188",
sum = "h1:L4nZwfYSrIsWPAZR8zMwHaNQJy0Rjy3Od6Smj5mlOms=",
version = "v0.0.0-20220602075447-4847c5d68e73",
)
go_repository(
name = "com_github_pkg_browser",
Expand Down Expand Up @@ -2244,8 +2244,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:KhOkCnqpxh/B2gGZdXSUyKgNRZaPzYsCIWGjNdrFmOA=",
version = "v2.0.1-0.20220531081749-2807409d4968",
sum = "h1:N5ivsNkDQDgimY0ZVqMnWqXjEnxy5uFChoB4wPIKpPI=",
version = "v2.0.1-0.20220613112734-be31f33ba03b",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
14 changes: 11 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ buildsucc:

all: dev server benchkv

parser:
@echo "remove this command later, when our CI script doesn't call it"

dev: checklist check explaintest gogenerate br_unit_test test_part_parser_dev ut
@>&2 echo "Great, all tests passed."

Expand Down Expand Up @@ -96,6 +93,9 @@ test_part_parser: parser_yacc test_part_parser_dev

test_part_parser_dev: parser_fmt parser_unit_test

parser:
@cd parser && make parser

parser_yacc:
@cd parser && mv parser.go parser.go.committed && make parser && diff -u parser.go.committed parser.go && rm parser.go.committed

Expand Down Expand Up @@ -327,6 +327,14 @@ build_for_br_integration_test:
) || (make failpoint-disable && exit 1)
@make failpoint-disable

build_for_lightning_test:
@make failpoint-enable
$(GOTEST) -c -cover -covermode=count \
-coverpkg=github.com/pingcap/tidb/br/... \
-o $(LIGHTNING_BIN).test \
github.com/pingcap/tidb/br/cmd/tidb-lightning
@make failpoint-disable

br_unit_test: export ARGS=$$($(BR_PACKAGES))
br_unit_test:
@make failpoint-enable
Expand Down
10 changes: 8 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ http_archive(
)

load("@io_bazel_rules_go//go:deps.bzl", "go_register_toolchains", "go_rules_dependencies")

load("//:DEPS.bzl", "go_deps")
load("//build:lint.bzl", "nogo_deps")

# gazelle:repository_macro DEPS.bzl%go_deps
go_deps()

nogo_deps()

load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies", "go_repository")

go_rules_dependencies()

go_register_toolchains(version = "1.18.3")
go_register_toolchains(
nogo = "@//build:tidb_nogo",
version = "1.18.3",
)

gazelle_dependencies()

Expand Down
48 changes: 48 additions & 0 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,49 @@ func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool {
isUnsigned := mysql.HasUnsignedFlag(meta.GetFlag())
switch meta.GetType() {
// MEDIUM INT
case mysql.TypeInt24:
if !isUnsigned {
return rowID > mysql.MaxInt24
}
return rowID > mysql.MaxUint24
// INT
case mysql.TypeLong:
if !isUnsigned {
return rowID > math.MaxInt32
}
return rowID > math.MaxUint32
// SMALLINT
case mysql.TypeShort:
if !isUnsigned {
return rowID > math.MaxInt16
}
return rowID > math.MaxUint16
// TINYINT
case mysql.TypeTiny:
if !isUnsigned {
return rowID > math.MaxInt8
}
return rowID > math.MaxUint8
// FLOAT
case mysql.TypeFloat:
if !isUnsigned {
return float32(rowID) > math.MaxFloat32
}
return float64(rowID) > math.MaxFloat32*2
// DOUBLE
case mysql.TypeDouble:
if !isUnsigned {
return float64(rowID) > math.MaxFloat64
}
// impossible for rowID exceeding MaxFloat64
}
return false
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
Expand Down Expand Up @@ -472,6 +515,11 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// rowID is going to auto-filled the omitted column,
// which should be checked before restore
if isRowIDOverflow(col.ToInfo(), rowID) {
return value, errors.Errorf("PK %d is out of range", rowID)
}
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
Expand Down
51 changes: 50 additions & 1 deletion br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand Down Expand Up @@ -1000,6 +1001,21 @@ type Writer struct {
batchSize int64

lastMetaSeq int32
prevRowID int64 // only used for appendRowsSorted
}

func (w *Writer) flushAndNewWriter() error {
var err error
err = w.flush(context.Background())
if err != nil {
return errors.Trace(err)
}
newWriter, err := w.createSSTWriter()
if err != nil {
return errors.Trace(err)
}
w.writer = newWriter
return nil
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
Expand All @@ -1010,6 +1026,17 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
}
w.writer = writer
}
if len(kvs) == 0 {
return nil
}
if w.prevRowID != 0 && kvs[0].RowID > w.prevRowID+1 {
// rowID leap. probably re-alloc id
// should write to different sst
err := w.flushAndNewWriter()
if err != nil {
return err
}
}

keyAdapter := w.engine.keyAdapter
totalKeySize := 0
Expand All @@ -1034,7 +1061,26 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
}
kvs = newKvs
}
return w.writer.writeKVs(kvs)
startIdx := 0
w.prevRowID = kvs[len(kvs)-1].RowID
for i := 1; i < len(kvs); i++ {
if kvs[i].RowID > kvs[i-1].RowID+1 {
// leap id
err := w.writer.writeKVs(kvs[startIdx:i])
if err != nil {
return err
}
err = w.flushAndNewWriter()
if err != nil {
return err
}
startIdx = i
}
}
if startIdx < len(kvs) {
return w.writer.writeKVs(kvs[startIdx:])
}
return nil
}

func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
Expand Down Expand Up @@ -1101,6 +1147,9 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
}

func (w *Writer) flush(ctx context.Context) error {
failpoint.Inject("MockFlushWriter", func() {
failpoint.Return(nil)
})
w.Lock()
defer w.Unlock()
if w.batchCount == 0 {
Expand Down
110 changes: 110 additions & 0 deletions br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/common"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
Expand Down Expand Up @@ -83,3 +85,111 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
},
}), errorEngineClosed)
}

func TestAutoSplitSST(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/MockFlushWriter"))
}()
var err error
dir := os.TempDir()
w := &Writer{
engine: &Engine{
sstDir: dir,
keyAdapter: noopKeyAdapter{},
},
isKVSorted: true,
isWriteBatchSorted: true,
}
w.engine.closed.Store(false)
w.writer, err = w.createSSTWriter()
require.Nil(t, err)
kvs := []common.KvPair{
{
Key: []byte("1"),
Val: []byte("val1"),
RowID: 1,
},
{
Key: []byte("2"),
Val: []byte("val1"),
RowID: 2,
},
}
prevWriter := w.writer
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.True(t, prevWriter == w.writer)
kvs = []common.KvPair{
{
Key: []byte("10"),
Val: []byte("val10"),
RowID: 10,
},
{
Key: []byte("11"),
Val: []byte("val11"),
RowID: 11,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("12"),
Val: []byte("val12"),
RowID: 10,
},
{
Key: []byte("13"),
Val: []byte("val13"),
RowID: 11,
},
{
Key: []byte("15"),
Val: []byte("val15"),
RowID: 15,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
prevWriter = w.writer
kvs = []common.KvPair{
{
Key: []byte("16"),
Val: []byte("val16"),
RowID: 16,
},
{
Key: []byte("17"),
Val: []byte("val17"),
RowID: 17,
},
{
Key: []byte("19"),
Val: []byte("val19"),
RowID: 19,
},
{
Key: []byte("20"),
Val: []byte("val20"),
RowID: 20,
},
{
Key: []byte("22"),
Val: []byte("val22"),
RowID: 22,
},
{
Key: []byte("23"),
Val: []byte("val23"),
RowID: 22,
},
}
err = w.appendRowsSorted(kvs)
require.Nil(t, err)
require.False(t, prevWriter == w.writer) // id leap, should flush and create
}
Loading

0 comments on commit 7f56a89

Please sign in to comment.