Skip to content

Commit

Permalink
Merge branch 'master' into replica_read_backup
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Feb 8, 2023
2 parents 8835b91 + 1140b7c commit 3587c74
Show file tree
Hide file tree
Showing 56 changed files with 942 additions and 275 deletions.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,20 @@ bazel_test: failpoint-enable bazel_ci_prepare


bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage $(BAZEL_CMD_CONFIG) --local_ram_resources=30720 --jobs=25 \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage $(BAZEL_CMD_CONFIG) --local_ram_resources=30720 --jobs=25 \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...

bazel_build: bazel_ci_prepare
mkdir -p bin
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc build $(BAZEL_CMD_CONFIG) --local_ram_resources=61440 --jobs=25 \
//... --//build:with_nogo_flag=true
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc build $(BAZEL_CMD_CONFIG) \
//cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin
cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin
Expand Down
32 changes: 17 additions & 15 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,14 +479,15 @@ type PostRestore struct {

type CSVConfig struct {
// Separator, Delimiter and Terminator should all be in utf8mb4 encoding.
Separator string `toml:"separator" json:"separator"`
Delimiter string `toml:"delimiter" json:"delimiter"`
Terminator string `toml:"terminator" json:"terminator"`
Null string `toml:"null" json:"null"`
Header bool `toml:"header" json:"header"`
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
Separator string `toml:"separator" json:"separator"`
Delimiter string `toml:"delimiter" json:"delimiter"`
Terminator string `toml:"terminator" json:"terminator"`
Null string `toml:"null" json:"null"`
Header bool `toml:"header" json:"header"`
HeaderSchemaMatch bool `toml:"header-schema-match" json:"header-schema-match"`
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
Expand Down Expand Up @@ -743,13 +744,14 @@ func NewConfig() *Config {
Mydumper: MydumperRuntime{
ReadBlockSize: ReadBlockSize,
CSV: CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: true,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,9 @@ func (parser *CSVParser) ReadColumns() error {
if err != nil {
return errors.Trace(err)
}
if !parser.cfg.HeaderSchemaMatch {
return nil
}
parser.columns = make([]string, 0, len(columns))
for _, colName := range columns {
colName, _, err = parser.unescapeString(colName)
Expand Down
84 changes: 78 additions & 6 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,13 @@ func TestSyntaxErrorCSV(t *testing.T) {

func TestTSV(t *testing.T) {
cfg := config.CSVConfig{
Separator: "\t",
Delimiter: "",
BackslashEscape: false,
NotNull: false,
Null: "",
Header: true,
Separator: "\t",
Delimiter: "",
BackslashEscape: false,
NotNull: false,
Null: "",
Header: true,
HeaderSchemaMatch: true,
}

parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`a b c d e f
Expand Down Expand Up @@ -577,6 +578,7 @@ func TestCsvWithWhiteSpaceLine(t *testing.T) {
require.Nil(t, parser.Close())

cfg.Header = true
cfg.HeaderSchemaMatch = true
data = " \r\na,b,c\r\n0,,abc\r\n"
parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkers, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -609,6 +611,7 @@ func TestEmpty(t *testing.T) {
// Try again with headers.

cfg.Header = true
cfg.HeaderSchemaMatch = true

parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkers, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1292,3 +1295,72 @@ func BenchmarkReadRowUsingEncodingCSV(b *testing.B) {
}
require.Equal(b, b.N, rowsCount)
}

// TestHeaderSchemaMatch exercises the interaction of the Header and
// HeaderSchemaMatch CSV options: whether the first line is consumed as a
// header, whether it is recorded as the column list, and whether it leaks
// into the parsed data rows.
func TestHeaderSchemaMatch(t *testing.T) {
	var cfg config.MydumperRuntime
	cfg.CSV = config.CSVConfig{
		Separator: ",",
		Delimiter: `"`,
	}

	inputData := `id,val1,val2,val3
1,111,aaa,1.0
2,222,bbb,2.0
3,333,ccc,3.0
4,444,ddd,4.0`

	// Build the expected data rows (everything after the header line) from
	// plain strings to keep the fixture compact.
	rawDataPart := [][]string{
		{"1", "111", "aaa", "1.0"},
		{"2", "222", "bbb", "2.0"},
		{"3", "333", "ccc", "3.0"},
		{"4", "444", "ddd", "4.0"},
	}
	parsedDataPart := make([][]types.Datum, 0, len(rawDataPart))
	for _, raw := range rawDataPart {
		row := make([]types.Datum, 0, len(raw))
		for _, cell := range raw {
			row = append(row, types.NewStringDatum(cell))
		}
		parsedDataPart = append(parsedDataPart, row)
	}

	type testCase struct {
		Header            bool
		HeaderSchemaMatch bool
		ExpectedData      [][]types.Datum
		ExpectedColumns   []string
	}

	cases := []testCase{
		{
			// Header consumed and recorded as column names.
			Header:            true,
			HeaderSchemaMatch: true,
			ExpectedData:      parsedDataPart,
			ExpectedColumns:   []string{"id", "val1", "val2", "val3"},
		},
		{
			// Header consumed but NOT used as the schema: no columns recorded.
			Header:            true,
			HeaderSchemaMatch: false,
			ExpectedData:      parsedDataPart,
			ExpectedColumns:   nil,
		},
		{
			// No header: the first line is ordinary data; HeaderSchemaMatch
			// has no effect without Header.
			Header:            false,
			HeaderSchemaMatch: true,
			ExpectedData: append([][]types.Datum{
				{types.NewStringDatum("id"), types.NewStringDatum("val1"), types.NewStringDatum("val2"), types.NewStringDatum("val3")},
			}, parsedDataPart...),
			ExpectedColumns: nil,
		},
	}

	for _, tc := range cases {
		comment := fmt.Sprintf("header = %v, header-schema-match = %v", tc.Header, tc.HeaderSchemaMatch)
		cfg.CSV.Header = tc.Header
		cfg.CSV.HeaderSchemaMatch = tc.HeaderSchemaMatch
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(inputData), int64(config.ReadBlockSize), ioWorkers, tc.Header, charsetConvertor)
		assert.NoError(t, err)
		for rowIdx, wantRow := range tc.ExpectedData {
			comment := fmt.Sprintf("row = %d, header = %v, header-schema-match = %v", rowIdx+1, tc.Header, tc.HeaderSchemaMatch)
			e := parser.ReadRow()
			assert.NoErrorf(t, e, "row = %d, error = %s", rowIdx+1, errors.ErrorStack(e))
			assert.Equal(t, int64(rowIdx)+1, parser.LastRow().RowID, comment)
			assert.Equal(t, wantRow, parser.LastRow().Row, comment)
		}
		// After all expected rows the parser must be exhausted, and the
		// recorded column list must match the case's expectation.
		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, comment)
		assert.Equal(t, tc.ExpectedColumns, parser.Columns(), comment)
	}
}
4 changes: 3 additions & 1 deletion br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ func SplitLargeFile(
if err = parser.ReadColumns(); err != nil {
return 0, nil, nil, err
}
columns = parser.Columns()
if cfg.Mydumper.CSV.HeaderSchemaMatch {
columns = parser.Columns()
}
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
Expand Down
75 changes: 40 additions & 35 deletions br/pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ func TestMakeSourceFileRegion(t *testing.T) {
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -230,13 +231,14 @@ func TestCompressedMakeSourceFileRegion(t *testing.T) {
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -284,13 +286,14 @@ func TestSplitLargeFile(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -342,13 +345,14 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -447,13 +451,14 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down
32 changes: 18 additions & 14 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (s *tableRestoreSuite) TestPopulateChunks() {

// set csv header to true, this will cause check columns fail
s.cfg.Mydumper.CSV.Header = true
s.cfg.Mydumper.CSV.HeaderSchemaMatch = true
s.cfg.Mydumper.StrictFormat = true
regionSize := s.cfg.Mydumper.MaxRegionSize
s.cfg.Mydumper.MaxRegionSize = 5
Expand Down Expand Up @@ -455,6 +456,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.MaxRegionSize = 40

cfg.Mydumper.CSV.Header = true
cfg.Mydumper.CSV.HeaderSchemaMatch = true
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

Expand Down Expand Up @@ -2135,13 +2137,14 @@ func (s *tableRestoreSuite) TestSchemaIsValid() {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: ca.hasHeader,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: ca.hasHeader,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
IgnoreColumns: ca.ignoreColumns,
},
Expand Down Expand Up @@ -2170,13 +2173,14 @@ func (s *tableRestoreSuite) TestGBKEncodedSchemaIsValid() {
DataCharacterSet: "gb18030",
DataInvalidCharReplace: string(utf8.RuneError),
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: true,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
IgnoreColumns: nil,
},
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
Expand Down Expand Up @@ -1736,6 +1737,15 @@ func (rc *Client) PreCheckTableTiFlashReplica(
tables []*metautil.Table,
recorder *tiflashrec.TiFlashRecorder,
) error {
// For TiDB 6.6, we do not support recover TiFlash replica while enabling API V2.
// TODO(iosmanthus): remove this after TiFlash support API V2.
if rc.GetDomain().Store().GetCodec().GetAPIVersion() == kvrpcpb.APIVersion_V2 {
log.Warn("TiFlash does not support API V2, reset replica count to 0")
for _, table := range tables {
table.Info.TiFlashReplica = nil
}
return nil
}
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 3587c74

Please sign in to comment.