diff --git a/Makefile b/Makefile index 6e00960f7dbda..3254d88aab401 100644 --- a/Makefile +++ b/Makefile @@ -410,20 +410,20 @@ bazel_test: failpoint-enable bazel_ci_prepare bazel_coverage_test: failpoint-enable bazel_ci_prepare - bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \ - --build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \ + bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage $(BAZEL_CMD_CONFIG) --local_ram_resources=30720 --jobs=25 \ + --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \ -- //... -//cmd/... -//tests/graceshutdown/... \ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/... - bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \ - --build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \ + bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage $(BAZEL_CMD_CONFIG) --local_ram_resources=30720 --jobs=25 \ + --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \ -- //... -//cmd/... -//tests/graceshutdown/... \ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/... bazel_build: bazel_ci_prepare mkdir -p bin - bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \ + bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc build $(BAZEL_CMD_CONFIG) --local_ram_resources=61440 --jobs=25 \ //... --//build:with_nogo_flag=true - bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \ + bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc build $(BAZEL_CMD_CONFIG) \ //cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index d14f12066c0f4..7038bd4d9255d 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -479,14 +479,15 @@ type PostRestore struct { type CSVConfig struct { // Separator, Delimiter and Terminator should all be in utf8mb4 encoding. 
- Separator string `toml:"separator" json:"separator"` - Delimiter string `toml:"delimiter" json:"delimiter"` - Terminator string `toml:"terminator" json:"terminator"` - Null string `toml:"null" json:"null"` - Header bool `toml:"header" json:"header"` - TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"` - NotNull bool `toml:"not-null" json:"not-null"` - BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"` + Separator string `toml:"separator" json:"separator"` + Delimiter string `toml:"delimiter" json:"delimiter"` + Terminator string `toml:"terminator" json:"terminator"` + Null string `toml:"null" json:"null"` + Header bool `toml:"header" json:"header"` + HeaderSchemaMatch bool `toml:"header-schema-match" json:"header-schema-match"` + TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"` + NotNull bool `toml:"not-null" json:"not-null"` + BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"` // hide these options for lightning configuration file, they can only be used by LOAD DATA // https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling StartingBy string `toml:"-" json:"-"` @@ -743,13 +744,14 @@ func NewConfig() *Config { Mydumper: MydumperRuntime{ ReadBlockSize: ReadBlockSize, CSV: CSVConfig{ - Separator: ",", - Delimiter: `"`, - Header: true, - NotNull: false, - Null: `\N`, - BackslashEscape: true, - TrimLastSep: false, + Separator: ",", + Delimiter: `"`, + Header: true, + HeaderSchemaMatch: true, + NotNull: false, + Null: `\N`, + BackslashEscape: true, + TrimLastSep: false, }, StrictFormat: false, MaxRegionSize: MaxRegionSize, diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go index 26fb65a493183..d9cd033d70861 100644 --- a/br/pkg/lightning/mydump/csv_parser.go +++ b/br/pkg/lightning/mydump/csv_parser.go @@ -607,6 +607,9 @@ func (parser *CSVParser) ReadColumns() error { if err != nil { return errors.Trace(err) } + if !parser.cfg.HeaderSchemaMatch { + return nil + } parser.columns = make([]string, 0, len(columns)) for _, colName := range columns { colName, _, err = parser.unescapeString(colName) diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go index adb057679b3a4..8980ce221fe75 100644 --- a/br/pkg/lightning/mydump/csv_parser_test.go +++ b/br/pkg/lightning/mydump/csv_parser_test.go @@ -481,12 +481,13 @@ func TestSyntaxErrorCSV(t *testing.T) { func TestTSV(t *testing.T) { cfg := config.CSVConfig{ - Separator: "\t", - Delimiter: "", - BackslashEscape: false, - NotNull: false, - Null: "", - Header: true, + Separator: "\t", + Delimiter: "", + BackslashEscape: false, + NotNull: false, + Null: "", + Header: true, + HeaderSchemaMatch: true, } parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`a b c d e f @@ -577,6 +578,7 @@ func TestCsvWithWhiteSpaceLine(t *testing.T) { require.Nil(t, parser.Close()) cfg.Header = true + cfg.HeaderSchemaMatch = true data = " \r\na,b,c\r\n0,,abc\r\n" parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkers, true, nil) require.NoError(t, err) @@ -609,6 +611,7 @@ func TestEmpty(t *testing.T) { // Try again with headers. 
cfg.Header = true + cfg.HeaderSchemaMatch = true parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkers, true, nil) require.NoError(t, err) @@ -1292,3 +1295,72 @@ func BenchmarkReadRowUsingEncodingCSV(b *testing.B) { } require.Equal(b, b.N, rowsCount) } + +func TestHeaderSchemaMatch(t *testing.T) { + cfg := config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + }, + } + + inputData := `id,val1,val2,val3 +1,111,aaa,1.0 +2,222,bbb,2.0 +3,333,ccc,3.0 +4,444,ddd,4.0` + + parsedDataPart := [][]types.Datum{ + {types.NewStringDatum("1"), types.NewStringDatum("111"), types.NewStringDatum("aaa"), types.NewStringDatum("1.0")}, + {types.NewStringDatum("2"), types.NewStringDatum("222"), types.NewStringDatum("bbb"), types.NewStringDatum("2.0")}, + {types.NewStringDatum("3"), types.NewStringDatum("333"), types.NewStringDatum("ccc"), types.NewStringDatum("3.0")}, + {types.NewStringDatum("4"), types.NewStringDatum("444"), types.NewStringDatum("ddd"), types.NewStringDatum("4.0")}, + } + + type testCase struct { + Header bool + HeaderSchemaMatch bool + ExpectedData [][]types.Datum + ExpectedColumns []string + } + + for _, tc := range []testCase{ + { + Header: true, + HeaderSchemaMatch: true, + ExpectedData: parsedDataPart, + ExpectedColumns: []string{"id", "val1", "val2", "val3"}, + }, + { + Header: true, + HeaderSchemaMatch: false, + ExpectedData: parsedDataPart, + ExpectedColumns: nil, + }, + { + Header: false, + HeaderSchemaMatch: true, + ExpectedData: append([][]types.Datum{ + {types.NewStringDatum("id"), types.NewStringDatum("val1"), types.NewStringDatum("val2"), types.NewStringDatum("val3")}, + }, parsedDataPart...), + ExpectedColumns: nil, + }, + } { + comment := fmt.Sprintf("header = %v, header-schema-match = %v", tc.Header, tc.HeaderSchemaMatch) + cfg.CSV.Header = tc.Header + cfg.CSV.HeaderSchemaMatch = tc.HeaderSchemaMatch + charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) + assert.NoError(t, err) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(inputData), int64(config.ReadBlockSize), ioWorkers, tc.Header, charsetConvertor) + assert.NoError(t, err) + for i, row := range tc.ExpectedData { + comment := fmt.Sprintf("row = %d, header = %v, header-schema-match = %v", i+1, tc.Header, tc.HeaderSchemaMatch) + e := parser.ReadRow() + assert.NoErrorf(t, e, "row = %d, error = %s", i+1, errors.ErrorStack(e)) + assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment) + assert.Equal(t, row, parser.LastRow().Row, comment) + } + assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, comment) + assert.Equal(t, tc.ExpectedColumns, parser.Columns(), comment) + } +} diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index aba71f666be2e..d57a2c4742c73 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -404,7 +404,9 @@ func SplitLargeFile( if err = parser.ReadColumns(); err != nil { return 0, nil, nil, err } - columns = parser.Columns() + if cfg.Mydumper.CSV.HeaderSchemaMatch { + columns = parser.Columns() + } startOffset, _ = parser.Pos() endOffset = startOffset + maxRegionSize if endOffset > dataFile.FileMeta.FileSize { diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index 5aa2b3a85b752..362ff8603c7f9 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go 
@@ -174,13 +174,14 @@ func TestMakeSourceFileRegion(t *testing.T) { ReadBlockSize: config.ReadBlockSize, MaxRegionSize: 1, CSV: config.CSVConfig{ - Separator: ",", - Delimiter: "", - Header: true, - TrimLastSep: false, - NotNull: false, - Null: "NULL", - BackslashEscape: true, + Separator: ",", + Delimiter: "", + Header: true, + HeaderSchemaMatch: true, + TrimLastSep: false, + NotNull: false, + Null: "NULL", + BackslashEscape: true, }, StrictFormat: true, Filter: []string{"*.*"}, @@ -230,13 +231,14 @@ func TestCompressedMakeSourceFileRegion(t *testing.T) { ReadBlockSize: config.ReadBlockSize, MaxRegionSize: 1, CSV: config.CSVConfig{ - Separator: ",", - Delimiter: "", - Header: true, - TrimLastSep: false, - NotNull: false, - Null: "NULL", - BackslashEscape: true, + Separator: ",", + Delimiter: "", + Header: true, + HeaderSchemaMatch: true, + TrimLastSep: false, + NotNull: false, + Null: "NULL", + BackslashEscape: true, }, StrictFormat: true, Filter: []string{"*.*"}, @@ -284,13 +286,14 @@ func TestSplitLargeFile(t *testing.T) { Mydumper: config.MydumperRuntime{ ReadBlockSize: config.ReadBlockSize, CSV: config.CSVConfig{ - Separator: ",", - Delimiter: "", - Header: true, - TrimLastSep: false, - NotNull: false, - Null: "NULL", - BackslashEscape: true, + Separator: ",", + Delimiter: "", + Header: true, + HeaderSchemaMatch: true, + TrimLastSep: false, + NotNull: false, + Null: "NULL", + BackslashEscape: true, }, StrictFormat: true, Filter: []string{"*.*"}, @@ -342,13 +345,14 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) { Mydumper: config.MydumperRuntime{ ReadBlockSize: config.ReadBlockSize, CSV: config.CSVConfig{ - Separator: ",", - Delimiter: "", - Header: true, - TrimLastSep: false, - NotNull: false, - Null: "NULL", - BackslashEscape: true, + Separator: ",", + Delimiter: "", + Header: true, + HeaderSchemaMatch: true, + TrimLastSep: false, + NotNull: false, + Null: "NULL", + BackslashEscape: true, }, StrictFormat: true, Filter: []string{"*.*"}, @@ -447,13 +451,14 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) { Mydumper: config.MydumperRuntime{ ReadBlockSize: config.ReadBlockSize, CSV: config.CSVConfig{ - Separator: ",", - Delimiter: "", - Header: true, - TrimLastSep: false, - NotNull: false, - Null: "NULL", - BackslashEscape: true, + Separator: ",", + Delimiter: "", + Header: true, + HeaderSchemaMatch: true, + TrimLastSep: false, + NotNull: false, + Null: "NULL", + BackslashEscape: true, }, StrictFormat: true, Filter: []string{"*.*"}, diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go index 5cfaeabc804d9..0f6d87892e329 100644 --- a/br/pkg/lightning/restore/table_restore_test.go +++ b/br/pkg/lightning/restore/table_restore_test.go @@ -306,6 +306,7 @@ func (s *tableRestoreSuite) TestPopulateChunks() { // set csv header to true, this will cause check columns fail s.cfg.Mydumper.CSV.Header = true + s.cfg.Mydumper.CSV.HeaderSchemaMatch = true s.cfg.Mydumper.StrictFormat = true regionSize := s.cfg.Mydumper.MaxRegionSize s.cfg.Mydumper.MaxRegionSize = 5 @@ -455,6 +456,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { cfg.Mydumper.MaxRegionSize = 40 cfg.Mydumper.CSV.Header = true + cfg.Mydumper.CSV.HeaderSchemaMatch = true cfg.Mydumper.StrictFormat = true rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store} @@ -2135,13 +2137,14 @@ func (s *tableRestoreSuite) TestSchemaIsValid() { Mydumper: config.MydumperRuntime{ ReadBlockSize: config.ReadBlockSize, CSV: 
config.CSVConfig{ - Separator: ",", - Delimiter: `"`, - Header: ca.hasHeader, - NotNull: false, - Null: `\N`, - BackslashEscape: true, - TrimLastSep: false, + Separator: ",", + Delimiter: `"`, + Header: ca.hasHeader, + HeaderSchemaMatch: true, + NotNull: false, + Null: `\N`, + BackslashEscape: true, + TrimLastSep: false, }, IgnoreColumns: ca.ignoreColumns, }, @@ -2170,13 +2173,14 @@ func (s *tableRestoreSuite) TestGBKEncodedSchemaIsValid() { DataCharacterSet: "gb18030", DataInvalidCharReplace: string(utf8.RuneError), CSV: config.CSVConfig{ - Separator: ",", - Delimiter: `"`, - Header: true, - NotNull: false, - Null: `\N`, - BackslashEscape: true, - TrimLastSep: false, + Separator: ",", + Delimiter: `"`, + Header: true, + HeaderSchemaMatch: true, + NotNull: false, + Null: `\N`, + BackslashEscape: true, + TrimLastSep: false, }, IgnoreColumns: nil, }, diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 2d0ae75408ebb..c818c479e1fca 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/backup" @@ -1736,6 +1737,15 @@ func (rc *Client) PreCheckTableTiFlashReplica( tables []*metautil.Table, recorder *tiflashrec.TiFlashRecorder, ) error { + // For TiDB 6.6, we do not support recover TiFlash replica while enabling API V2. + // TODO(iosmanthus): remove this after TiFlash support API V2. + if rc.GetDomain().Store().GetCodec().GetAPIVersion() == kvrpcpb.APIVersion_V2 { + log.Warn("TiFlash does not support API V2, reset replica count to 0") + for _, table := range tables { + table.Info.TiFlashReplica = nil + } + return nil + } tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx) if err != nil { return err diff --git a/br/pkg/storage/parse_test.go b/br/pkg/storage/parse_test.go index b9a5d75d29322..3d86ccef1c38d 100644 --- a/br/pkg/storage/parse_test.go +++ b/br/pkg/storage/parse_test.go @@ -69,7 +69,7 @@ func TestCreateStorage(t *testing.T) { require.Equal(t, "TestKey", s3.SseKmsKeyId) // special character in access keys - s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw`, nil) + s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw&session-token=FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=`, nil) require.NoError(t, err) s3 = s.GetS3() require.NotNil(t, s3) @@ -77,6 +77,7 @@ func TestCreateStorage(t *testing.T) { require.Equal(t, "prefix/path", s3.Prefix) require.Equal(t, "NXN7IPIOSAAKDEEOLMAF", s3.AccessKey) require.Equal(t, "nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw", s3.SecretAccessKey) + require.Equal(t, "FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=", s3.SessionToken) require.True(t, s3.ForcePathStyle) // parse role ARN and external ID 
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index a239de8ad794c..aa052ebccc9a5 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -140,6 +140,7 @@ type S3BackendOptions struct { ACL string `json:"acl" toml:"acl"` AccessKey string `json:"access-key" toml:"access-key"` SecretAccessKey string `json:"secret-access-key" toml:"secret-access-key"` + SessionToken string `json:"session-token" toml:"session-token"` Provider string `json:"provider" toml:"provider"` ForcePathStyle bool `json:"force-path-style" toml:"force-path-style"` UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"` @@ -184,6 +185,7 @@ func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error { s3.Acl = options.ACL s3.AccessKey = options.AccessKey s3.SecretAccessKey = options.SecretAccessKey + s3.SessionToken = options.SessionToken s3.ForcePathStyle = options.ForcePathStyle s3.RoleArn = options.RoleARN s3.ExternalId = options.ExternalID @@ -262,7 +264,7 @@ func NewS3StorageForTest(svc s3iface.S3API, options *backuppb.S3) *S3Storage { // auto access without ak / sk. func autoNewCred(qs *backuppb.S3) (cred *credentials.Credentials, err error) { if qs.AccessKey != "" && qs.SecretAccessKey != "" { - return credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, ""), nil + return credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, qs.SessionToken), nil } endpoint := qs.Endpoint // if endpoint is empty,return no error and run default(aws) follow. @@ -330,6 +332,7 @@ func NewS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3St // Clear the credentials if exists so that they will not be sent to TiKV backend.AccessKey = "" backend.SecretAccessKey = "" + backend.SessionToken = "" } else if ses.Config.Credentials != nil { if qs.AccessKey == "" || qs.SecretAccessKey == "" { v, cerr := ses.Config.Credentials.Get() @@ -338,6 +341,7 @@ func NewS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3St } backend.AccessKey = v.AccessKeyID backend.SecretAccessKey = v.SecretAccessKey + backend.SessionToken = v.SessionToken } } diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index 3600a757ef0c4..817e8f46f7f7d 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -144,6 +144,7 @@ func TestApplyUpdate(t *testing.T) { if test.setEnv { require.NoError(t, os.Setenv("AWS_ACCESS_KEY_ID", "ab")) require.NoError(t, os.Setenv("AWS_SECRET_ACCESS_KEY", "cd")) + require.NoError(t, os.Setenv("AWS_SESSION_TOKEN", "ef")) } u, err := ParseBackend("s3://bucket/prefix/", &BackendOptions{S3: test.options}) require.NoError(t, err) @@ -260,11 +261,13 @@ func TestApplyUpdate(t *testing.T) { Region: "us-west-2", AccessKey: "ab", SecretAccessKey: "cd", + SessionToken: "ef", }, s3: &backuppb.S3{ Region: "us-west-2", AccessKey: "ab", SecretAccessKey: "cd", + SessionToken: "ef", Bucket: "bucket", Prefix: "prefix", }, @@ -354,6 +357,7 @@ func TestS3Storage(t *testing.T) { Endpoint: s.URL, AccessKey: "ab", SecretAccessKey: "cd", + SessionToken: "ef", Bucket: "bucket", Prefix: "prefix", ForcePathStyle: true, @@ -1112,10 +1116,12 @@ func TestWalkDirWithEmptyPrefix(t *testing.T) { func TestSendCreds(t *testing.T) { accessKey := "ab" secretAccessKey := "cd" + sessionToken := "ef" backendOpt := BackendOptions{ S3: S3BackendOptions{ AccessKey: accessKey, SecretAccessKey: secretAccessKey, + SessionToken: sessionToken, }, } backend, err := ParseBackend("s3://bucket/prefix/", &backendOpt) @@ -1128,12 +1134,15 @@ func 
TestSendCreds(t *testing.T) { sentAccessKey := backend.GetS3().AccessKey require.Equal(t, accessKey, sentAccessKey) sentSecretAccessKey := backend.GetS3().SecretAccessKey - require.Equal(t, sentSecretAccessKey, sentSecretAccessKey) + require.Equal(t, secretAccessKey, sentSecretAccessKey) + sentSessionToken := backend.GetS3().SessionToken + require.Equal(t, sessionToken, sentSessionToken) backendOpt = BackendOptions{ S3: S3BackendOptions{ AccessKey: accessKey, SecretAccessKey: secretAccessKey, + SessionToken: sessionToken, }, } backend, err = ParseBackend("s3://bucket/prefix/", &backendOpt) @@ -1147,6 +1156,8 @@ func TestSendCreds(t *testing.T) { require.Equal(t, "", sentAccessKey) sentSecretAccessKey = backend.GetS3().SecretAccessKey require.Equal(t, "", sentSecretAccessKey) + sentSessionToken = backend.GetS3().SessionToken + require.Equal(t, "", sentSessionToken) } func TestObjectLock(t *testing.T) { diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index 3c281563439de..0275b61592a1c 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "@com_github_golang_protobuf//proto", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/logbackuppb", "@com_github_pingcap_kvproto//pkg/metapb", @@ -78,6 +79,7 @@ go_test( "//tablecodec", "//util/codec", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/logbackuppb", diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index b29cbd6956ae2..33c0e0898b66f 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -262,14 +262,16 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) { return case e, ok := <-ch: if !ok { + log.Info("[log backup advancer] Task watcher exits due to stream ends.") return } - log.Info("meet task event", zap.Stringer("event", &e)) + log.Info("[log backup advancer] Meet task event", zap.Stringer("event", &e)) if err := c.onTaskEvent(ctx, e); err != nil { if errors.Cause(e.Err) != context.Canceled { log.Error("listen task meet error, would reopen.", logutil.ShortError(err)) time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) }) } + log.Info("[log backup advancer] Task watcher exits due to some error.", logutil.ShortError(err)) return } } diff --git a/br/pkg/streamhelper/advancer_cliext.go b/br/pkg/streamhelper/advancer_cliext.go index 059475e62b2b2..f3d7e1f279d5f 100644 --- a/br/pkg/streamhelper/advancer_cliext.go +++ b/br/pkg/streamhelper/advancer_cliext.go @@ -7,13 +7,16 @@ import ( "context" "encoding/binary" "fmt" + "io" "strings" "github.com/golang/protobuf/proto" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/kv" clientv3 "go.etcd.io/etcd/client/v3" @@ -94,6 +97,9 @@ func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (Ta func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) { result := make([]TaskEvent, 0, len(resp.Events)) + if err := resp.Err(); 
err != nil { + return nil, err + } for _, event := range resp.Events { te, err := t.toTaskEvent(ctx, event) if err != nil { @@ -110,6 +116,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE handleResponse := func(resp clientv3.WatchResponse) bool { events, err := t.eventFromWatch(ctx, resp) if err != nil { + log.Warn("[log backup advancer] Meet error during receiving the task event.", logutil.ShortError(err)) ch <- errorEvent(err) return false } @@ -118,33 +125,44 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE } return true } + collectRemaining := func() { + log.Info("[log backup advancer] Start collecting remaining events in the channel.", zap.Int("remained", len(c))) + defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.") + for { + select { + case resp, ok := <-c: + if !ok { + return + } + if !handleResponse(resp) { + return + } + default: + return + } + } + } go func() { defer close(ch) for { select { case resp, ok := <-c: + failpoint.Inject("advancer_close_channel", func() { + // We cannot really close the channel, just simulating it. + ok = false + }) if !ok { + ch <- errorEvent(io.EOF) return } if !handleResponse(resp) { return } case <-ctx.Done(): - // drain the remain event from channel. - for { - select { - case resp, ok := <-c: - if !ok { - return - } - if !handleResponse(resp) { - return - } - default: - return - } - } + collectRemaining() + ch <- errorEvent(ctx.Err()) + return } } }() diff --git a/br/pkg/streamhelper/integration_test.go b/br/pkg/streamhelper/integration_test.go index 81572f6b7890d..4b989dc4ab2ba 100644 --- a/br/pkg/streamhelper/integration_test.go +++ b/br/pkg/streamhelper/integration_test.go @@ -7,12 +7,14 @@ import ( "context" "encoding/binary" "fmt" + "io" "net" "net/url" "path" "testing" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" @@ -143,6 +145,7 @@ func TestIntegration(t *testing.T) { t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) + t.Run("TestStreamClose", func(t *testing.T) { testStreamClose(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) } func TestChecking(t *testing.T) { @@ -295,6 +298,7 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) { taskInfo2 := simpleTask(taskName2, 4) require.NoError(t, metaCli.PutTask(ctx, taskInfo2)) require.NoError(t, metaCli.DeleteTask(ctx, taskName2)) + first := <-ch require.Equal(t, first.Type, streamhelper.EventAdd) require.Equal(t, first.Name, taskName) @@ -310,8 +314,44 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) { require.Equal(t, forth.Type, streamhelper.EventDel) require.Equal(t, forth.Name, taskName2) cancel() - _, ok := <-ch - require.False(t, ok) + fifth, ok := <-ch + require.True(t, ok) + require.Equal(t, fifth.Type, streamhelper.EventErr) + require.Error(t, fifth.Err, context.Canceled) + item, ok := <-ch + require.False(t, ok, "%v", item) +} + +func testStreamClose(t *testing.T, metaCli streamhelper.AdvancerExt) { + ctx := context.Background() + taskName := "close_simple" + taskInfo := 
simpleTask(taskName, 4) + + require.NoError(t, metaCli.PutTask(ctx, taskInfo)) + ch := make(chan streamhelper.TaskEvent, 1024) + require.NoError(t, metaCli.Begin(ctx, ch)) + require.NoError(t, metaCli.DeleteTask(ctx, taskName)) + first := <-ch + require.Equal(t, first.Type, streamhelper.EventAdd) + require.Equal(t, first.Name, taskName) + require.ElementsMatch(t, first.Ranges, simpleRanges(4)) + second := <-ch + require.Equal(t, second.Type, streamhelper.EventDel, "%s", second) + require.Equal(t, second.Name, taskName, "%s", second) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel", "return")) + defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel") + // We need to fill the channel with some events so that we can simulate the closed channel. + taskName2 := "close_simple2" + taskInfo2 := simpleTask(taskName2, 4) + require.NoError(t, metaCli.PutTask(ctx, taskInfo2)) + require.NoError(t, metaCli.DeleteTask(ctx, taskName2)) + + third := <-ch + require.Equal(t, third.Type, streamhelper.EventErr) + require.Error(t, third.Err, io.EOF) + item, ok := <-ch + require.False(t, ok, "%#v", item) } func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) { diff --git a/br/tests/lightning_config_skip_csv_header/data/mytest.testtbl-schema.sql b/br/tests/lightning_config_skip_csv_header/data/mytest.testtbl-schema.sql new file mode 100644 index 0000000000000..93582d5178139 --- /dev/null +++ b/br/tests/lightning_config_skip_csv_header/data/mytest.testtbl-schema.sql @@ -0,0 +1,5 @@ +CREATE TABLE testtbl ( + id INTEGER PRIMARY KEY, + val1 VARCHAR(40) NOT NULL, + INDEX `idx_val1` (`val1`) +); diff --git a/br/tests/lightning_config_skip_csv_header/data/mytest.testtbl.csv b/br/tests/lightning_config_skip_csv_header/data/mytest.testtbl.csv new file mode 100644 index 0000000000000..5958ab0c80cb2 --- /dev/null +++ b/br/tests/lightning_config_skip_csv_header/data/mytest.testtbl.csv @@ -0,0 +1,6 @@ +aaa,bbb +1,"aaa01" +2,"aaa01" +3,"aaa02" +4,"aaa02" +5,"aaa05" diff --git a/br/tests/lightning_config_skip_csv_header/err_config.toml b/br/tests/lightning_config_skip_csv_header/err_config.toml new file mode 100644 index 0000000000000..95493db0dff44 --- /dev/null +++ b/br/tests/lightning_config_skip_csv_header/err_config.toml @@ -0,0 +1,9 @@ +[lightning] +check-requirements=true + +[mydumper.csv] +header = true +header-schema-match = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_skip_csv_header/err_default_config.toml b/br/tests/lightning_config_skip_csv_header/err_default_config.toml new file mode 100644 index 0000000000000..a7b17c7276d92 --- /dev/null +++ b/br/tests/lightning_config_skip_csv_header/err_default_config.toml @@ -0,0 +1,8 @@ +[lightning] +check-requirements=true + +[mydumper.csv] +header = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_skip_csv_header/normal_config.toml b/br/tests/lightning_config_skip_csv_header/normal_config.toml new file mode 100644 index 0000000000000..190e635cfc4e9 --- /dev/null +++ b/br/tests/lightning_config_skip_csv_header/normal_config.toml @@ -0,0 +1,9 @@ +[lightning] +check-requirements=true + +[mydumper.csv] +header = true +header-schema-match = false + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_skip_csv_header/run.sh b/br/tests/lightning_config_skip_csv_header/run.sh new file mode 100755 index 0000000000000..80ad201e2d323 ---
/dev/null +++ b/br/tests/lightning_config_skip_csv_header/run.sh @@ -0,0 +1,59 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +mydir=$(dirname "${BASH_SOURCE[0]}") + +data_file="${mydir}/data/mytest.testtbl.csv" + +total_row_count=$( sed '1d' "${data_file}" | wc -l | xargs echo ) + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' + +console_output_file="/tmp/${TEST_NAME}.out" + +echo "Use config that causes errors" +run_lightning --backend tidb --config "${mydir}/err_config.toml" 2>&1 | tee "${console_output_file}" +if [[ ${PIPESTATUS[0]} -eq 0 ]]; then + echo "The lightning import doesn't fail as expected" >&2 + exit 1 +fi + +grep -q "Lightning:Restore:ErrUnknownColumns" "${console_output_file}" + +# import a second time +echo "Use default config that causes errors" +run_lightning --backend tidb --config "${mydir}/err_default_config.toml" 2>&1 | tee "${console_output_file}" +if [[ ${PIPESTATUS[0]} -eq 0 ]]; then + echo "The lightning import doesn't fail as expected" >&2 + exit 1 +fi + +grep -q "Lightning:Restore:ErrUnknownColumns" "${console_output_file}" + +# import a third time + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' + +echo "Use config that can successfully import the data" +run_lightning --backend tidb --config "${mydir}/normal_config.toml" + +run_sql 'SELECT * FROM mytest.testtbl' +run_sql 'SELECT COUNT(*) FROM mytest.testtbl' +check_contains "COUNT(*): ${total_row_count}" +run_sql 'SELECT COUNT(*) FROM mytest.testtbl WHERE id > 0' +check_contains "COUNT(*): ${total_row_count}" diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index 2e95e345c534d..080f2ccbb1584 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -226,6 +226,7 @@ go_test( "//errno", "//executor", "//infoschema", + "//keyspace", "//kv", "//meta", "//meta/autoid", diff --git a/ddl/attributes_sql_test.go b/ddl/attributes_sql_test.go index c9497f2381f52..4d5300d600470 100644 --- a/ddl/attributes_sql_test.go +++ b/ddl/attributes_sql_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/store/gcworker" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/gcutil" @@ -273,7 +274,7 @@ PARTITION BY RANGE (c) ( func TestFlashbackTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -331,7 +332,7 @@ PARTITION BY RANGE (c) ( func TestDropTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err :=
infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -384,7 +385,7 @@ PARTITION BY RANGE (c) ( func TestCreateWithSameName(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -448,7 +449,7 @@ PARTITION BY RANGE (c) ( func TestPartition(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -508,7 +509,7 @@ PARTITION BY RANGE (c) ( func TestDropSchema(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -534,7 +535,7 @@ PARTITION BY RANGE (c) ( func TestDefaultKeyword(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 177cd4ebba7d2..86db2daa270ec 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -7617,14 +7617,11 @@ func checkIgnorePlacementDDL(ctx sessionctx.Context) bool { // AddResourceGroup implements the DDL interface, creates a resource group. 
func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) { - groupInfo := &model.ResourceGroupInfo{ResourceGroupSettings: &model.ResourceGroupSettings{}} groupName := stmt.ResourceGroupName - groupInfo.Name = groupName - for _, opt := range stmt.ResourceGroupOptionList { - err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue, opt.BoolValue) - if err != nil { - return err - } + groupInfo := &model.ResourceGroupInfo{Name: groupName, ResourceGroupSettings: &model.ResourceGroupSettings{}} + groupInfo, err = buildResourceGroup(groupInfo, stmt.ResourceGroupOptionList) + if err != nil { + return err } if _, ok := d.GetInfoSchemaWithInterceptor(ctx).ResourceGroupByName(groupName); ok { @@ -7713,6 +7710,7 @@ func buildResourceGroup(oldGroup *model.ResourceGroupInfo, options []*ast.Resour return nil, err } } + groupInfo.ResourceGroupSettings.Adjust() return groupInfo, nil } diff --git a/ddl/main_test.go b/ddl/main_test.go index 84a713dc59bb3..b1bbd8516fa94 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/testkit/testsetup" "github.com/tikv/client-go/v2/tikv" @@ -54,7 +55,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/ddl/resource_group_test.go b/ddl/resource_group_test.go index fcc547ec42f8d..162a3ac233a05 100644 --- a/ddl/resource_group_test.go +++ b/ddl/resource_group_test.go @@ -87,7 +87,7 @@ func TestResourceGroupBasic(t *testing.T) { re.Equal(uint64(2000), g.RURate) re.Equal(int64(-1), g.BurstLimit) - tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 0 YES")) + tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 YES")) tk.MustExec("drop resource group x") g = testResourceGroupNameFromIS(t, tk.Session(), "x") @@ -104,7 +104,7 @@ func TestResourceGroupBasic(t *testing.T) { require.Equal(t, "y", groupInfo.Name.L) require.Equal(t, groupID, groupInfo.ID) require.Equal(t, uint64(4000), groupInfo.RURate) - require.Equal(t, int64(0), groupInfo.BurstLimit) + require.Equal(t, int64(4000), groupInfo.BurstLimit) } g = testResourceGroupNameFromIS(t, tk.Session(), "y") checkFunc(g) @@ -126,25 +126,29 @@ func TestResourceGroupBasic(t *testing.T) { tk.MustContainErrMsg("create resource group x ru_per_sec=1000 ru_per_sec=200, ru_per_sec=300", "Dupliated options specified") tk.MustGetErrCode("create resource group x burstable, burstable", mysql.ErrParse) tk.MustContainErrMsg("create resource group x burstable, burstable", "Dupliated options specified") + tk.MustGetErrCode("create resource group x ru_per_sec=1000, burstable, burstable", mysql.ErrParse) + tk.MustContainErrMsg("create resource group x ru_per_sec=1000, burstable, burstable", "Dupliated options specified") + tk.MustGetErrCode("create resource group x burstable, ru_per_sec=1000, burstable", mysql.ErrParse) + 
tk.MustContainErrMsg("create resource group x burstable, ru_per_sec=1000, burstable", "Dupliated options specified") groups, err := infosync.ListResourceGroups(context.TODO()) require.Equal(t, 0, len(groups)) require.NoError(t, err) // Check information schema table information_schema.resource_groups tk.MustExec("create resource group x RU_PER_SEC=1000") - tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 1000 0 NO")) + tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 1000 NO")) tk.MustExec("alter resource group x RU_PER_SEC=2000 BURSTABLE") - tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 0 YES")) + tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 YES")) tk.MustExec("alter resource group x BURSTABLE RU_PER_SEC=3000") - tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 3000 0 YES")) + tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 3000 YES")) tk.MustQuery("show create resource group x").Check(testkit.Rows("x CREATE RESOURCE GROUP `x` RU_PER_SEC=3000 BURSTABLE")) tk.MustExec("create resource group y BURSTABLE RU_PER_SEC=2000") - tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 2000 0 YES")) + tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 2000 YES")) tk.MustQuery("show create resource group y").Check(testkit.Rows("y CREATE RESOURCE GROUP `y` RU_PER_SEC=2000 BURSTABLE")) tk.MustExec("alter resource group y RU_PER_SEC=4000 BURSTABLE") - tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 4000 0 YES")) + tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 4000 YES")) tk.MustQuery("show create resource group y").Check(testkit.Rows("y CREATE RESOURCE GROUP `y` RU_PER_SEC=4000 BURSTABLE")) tk.MustQuery("select count(*) from information_schema.resource_groups").Check(testkit.Rows("2")) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index f3988ea5f7c4d..d7a2447cf35b6 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -124,7 +124,7 @@ func TestSelectResultRuntimeStats(t *testing.T) { stmtStats.RegisterStats(1, s1) stmtStats.RegisterStats(1, &s2) stats := stmtStats.GetRootStats(1) - expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 2ms}" + expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 2ms}" require.Equal(t, expect, stats.String()) // Test for idempotence. 
require.Equal(t, expect, stats.String()) @@ -135,7 +135,7 @@ func TestSelectResultRuntimeStats(t *testing.T) { } stmtStats.RegisterStats(2, s1) stats = stmtStats.GetRootStats(2) - expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}" + expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 1ms}" require.Equal(t, expect, stats.String()) // Test for idempotence. require.Equal(t, expect, stats.String()) diff --git a/distsql/select_result.go b/distsql/select_result.go index 6d1f6308e4120..394298e8fa3b0 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -53,6 +53,12 @@ var ( errQueryInterrupted = dbterror.ClassExecutor.NewStd(errno.ErrQueryInterrupted) ) +var ( + telemetryBatchedQueryTaskCnt = metrics.TelemetryBatchedQueryTaskCnt + telemetryStoreBatchedCnt = metrics.TelemetryStoreBatchedCnt + telemetryStoreBatchedFallbackCnt = metrics.TelemetryStoreBatchedFallbackCnt +) + var ( _ SelectResult = (*selectResult)(nil) _ SelectResult = (*serialSelectResults)(nil) @@ -157,7 +163,7 @@ func (r *selectResult) fetchResp(ctx context.Context) error { if r.stats != nil { // Ignore internal sql. if !r.ctx.GetSessionVars().InRestrictedSQL && len(r.stats.copRespTime) > 0 { - ratio := float64(r.stats.CoprCacheHitNum) / float64(len(r.stats.copRespTime)) + ratio := r.stats.calcCacheHit() if ratio >= 1 { telemetry.CurrentCoprCacheHitRatioGTE100Count.Inc() } @@ -364,6 +370,11 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr rpcStat: tikv.NewRegionRequestRuntimeStats(), distSQLConcurrency: r.distSQLConcurrency, } + if ci, ok := r.resp.(copr.CopInfo); ok { + conc, extraConc := ci.GetConcurrency() + r.stats.distSQLConcurrency = conc + r.stats.extraConcurrency = extraConc + } } r.stats.mergeCopRuntimeStats(copStats, respTime) @@ -455,26 +466,42 @@ func (r *selectResult) Close() error { r.memConsume(-respSize) } if r.stats != nil { - defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats) + defer func() { + if ci, ok := r.resp.(copr.CopInfo); ok { + r.stats.buildTaskDuration = ci.GetBuildTaskElapsed() + batched, fallback := ci.GetStoreBatchInfo() + if batched != 0 || fallback != 0 { + r.stats.storeBatchedNum, r.stats.storeBatchedFallbackNum = batched, fallback + telemetryStoreBatchedCnt.Add(float64(r.stats.storeBatchedNum)) + telemetryStoreBatchedFallbackCnt.Add(float64(r.stats.storeBatchedFallbackNum)) + telemetryBatchedQueryTaskCnt.Add(float64(len(r.stats.copRespTime))) + } + } + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats) + }() } return r.resp.Close() } -// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats. +// CopRuntimeStats is an interface uses to check whether the result has cop runtime stats. type CopRuntimeStats interface { // GetCopRuntimeStats gets the cop runtime stats information. 
GetCopRuntimeStats() *copr.CopRuntimeStats } type selectResultRuntimeStats struct { - copRespTime []time.Duration - procKeys []int64 - backoffSleep map[string]time.Duration - totalProcessTime time.Duration - totalWaitTime time.Duration - rpcStat tikv.RegionRequestRuntimeStats - distSQLConcurrency int - CoprCacheHitNum int64 + copRespTime []time.Duration + procKeys []int64 + backoffSleep map[string]time.Duration + totalProcessTime time.Duration + totalWaitTime time.Duration + rpcStat tikv.RegionRequestRuntimeStats + distSQLConcurrency int + extraConcurrency int + CoprCacheHitNum int64 + storeBatchedNum uint64 + storeBatchedFallbackNum uint64 + buildTaskDuration time.Duration } func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) { @@ -495,12 +522,16 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats { newRs := selectResultRuntimeStats{ - copRespTime: make([]time.Duration, 0, len(s.copRespTime)), - procKeys: make([]int64, 0, len(s.procKeys)), - backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), - rpcStat: tikv.NewRegionRequestRuntimeStats(), - distSQLConcurrency: s.distSQLConcurrency, - CoprCacheHitNum: s.CoprCacheHitNum, + copRespTime: make([]time.Duration, 0, len(s.copRespTime)), + procKeys: make([]int64, 0, len(s.procKeys)), + backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), + rpcStat: tikv.NewRegionRequestRuntimeStats(), + distSQLConcurrency: s.distSQLConcurrency, + extraConcurrency: s.extraConcurrency, + CoprCacheHitNum: s.CoprCacheHitNum, + storeBatchedNum: s.storeBatchedNum, + storeBatchedFallbackNum: s.storeBatchedFallbackNum, + buildTaskDuration: s.buildTaskDuration, } newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...) newRs.procKeys = append(newRs.procKeys, s.procKeys...) 
@@ -528,6 +559,15 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) { s.totalWaitTime += other.totalWaitTime s.rpcStat.Merge(other.rpcStat) s.CoprCacheHitNum += other.CoprCacheHitNum + if other.distSQLConcurrency > s.distSQLConcurrency { + s.distSQLConcurrency = other.distSQLConcurrency + } + if other.extraConcurrency > s.extraConcurrency { + s.extraConcurrency = other.extraConcurrency + } + s.storeBatchedNum += other.storeBatchedNum + s.storeBatchedFallbackNum += other.storeBatchedFallbackNum + s.buildTaskDuration += other.buildTaskDuration } func (s *selectResultRuntimeStats) String() string { @@ -579,14 +619,30 @@ func (s *selectResultRuntimeStats) String() string { } if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 { buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v", - strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64))) + strconv.FormatFloat(s.calcCacheHit(), 'f', 2, 64))) } else { buf.WriteString(", copr_cache: disabled") } + if s.buildTaskDuration > 0 { + buf.WriteString(", build_task_duration: ") + buf.WriteString(execdetails.FormatDuration(s.buildTaskDuration)) + } if s.distSQLConcurrency > 0 { - buf.WriteString(", distsql_concurrency: ") + buf.WriteString(", max_distsql_concurrency: ") buf.WriteString(strconv.FormatInt(int64(s.distSQLConcurrency), 10)) } + if s.extraConcurrency > 0 { + buf.WriteString(", max_extra_concurrency: ") + buf.WriteString(strconv.FormatInt(int64(s.extraConcurrency), 10)) + } + if s.storeBatchedNum > 0 { + buf.WriteString(", store_batch_num: ") + buf.WriteString(strconv.FormatInt(int64(s.storeBatchedNum), 10)) + } + if s.storeBatchedFallbackNum > 0 { + buf.WriteString(", store_batch_fallback_num: ") + buf.WriteString(strconv.FormatInt(int64(s.storeBatchedFallbackNum), 10)) + } buf.WriteString("}") } @@ -615,3 +671,15 @@ func (s *selectResultRuntimeStats) String() string { func (*selectResultRuntimeStats) Tp() int { return execdetails.TpSelectResultRuntimeStats } + +func (s *selectResultRuntimeStats) calcCacheHit() float64 { + hit := s.CoprCacheHitNum + tot := len(s.copRespTime) + if s.storeBatchedNum > 0 { + tot += int(s.storeBatchedNum) + } + if tot == 0 { + return 0 + } + return float64(hit) / float64(tot) +} diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index ccbf75dd48ee6..ccb70230b9829 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -113,6 +113,7 @@ go_test( "//ddl", "//domain/infosync", "//errno", + "//keyspace", "//kv", "//metrics", "//parser/ast", diff --git a/domain/db_test.go b/domain/db_test.go index 428b63b6de05b..02f716a27b3a8 100644 --- a/domain/db_test.go +++ b/domain/db_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" @@ -73,7 +74,7 @@ func TestNormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" @@ -107,7 +108,7 @@ func TestAbnormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) 
require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" diff --git a/domain/domain.go b/domain/domain.go index 07b69fb365f4f..b4a7a2770afd3 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1069,7 +1069,9 @@ func (do *Domain) Init( // step 1: prepare the info/schema syncer which domain reload needed. pdCli := do.GetPDClient() skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard - do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, skipRegisterToDashboard) + do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, + do.etcdClient, do.unprefixedEtcdCli, pdCli, do.Store().GetCodec(), + skipRegisterToDashboard) if err != nil { return err } diff --git a/domain/infosync/BUILD.bazel b/domain/infosync/BUILD.bazel index 0952dfc300490..f050873ee128f 100644 --- a/domain/infosync/BUILD.bazel +++ b/domain/infosync/BUILD.bazel @@ -41,10 +41,12 @@ go_library( "@com_github_gorilla_mux//:mux", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//concurrency", @@ -62,6 +64,7 @@ go_test( deps = [ "//ddl/placement", "//ddl/util", + "//keyspace", "//parser/model", "//testkit/testsetup", "//util", diff --git a/domain/infosync/info.go b/domain/infosync/info.go index b45216daa8a9f..1e82ce308ac9b 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -55,6 +55,7 @@ import ( "github.com/pingcap/tidb/util/pdapi" "github.com/pingcap/tidb/util/versioninfo" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -192,6 +193,7 @@ func GlobalInfoSyncerInit( serverIDGetter func() uint64, etcdCli, unprefixedEtcdCli *clientv3.Client, pdCli pd.Client, + codec tikv.Codec, skipRegisterToDashBoard bool, ) (*InfoSyncer, error) { is := &InfoSyncer{ @@ -208,7 +210,7 @@ func GlobalInfoSyncerInit( is.labelRuleManager = initLabelRuleManager(etcdCli) is.placementManager = initPlacementManager(etcdCli) is.scheduleManager = initScheduleManager(etcdCli) - is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli) + is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec) is.resourceGroupManager = initResourceGroupManager(pdCli) setGlobalInfoSyncer(is) return is, nil @@ -261,13 +263,13 @@ func initResourceGroupManager(pdCli pd.Client) pd.ResourceManagerClient { return pdCli } -func initTiFlashReplicaManager(etcdCli *clientv3.Client) TiFlashReplicaManager { +func initTiFlashReplicaManager(etcdCli *clientv3.Client, codec tikv.Codec) TiFlashReplicaManager { if etcdCli == nil { m := mockTiFlashReplicaManagerCtx{tiflashProgressCache: make(map[int64]float64)} return &m } logutil.BgLogger().Warn("init TiFlashReplicaManager", zap.Strings("pd addrs", 
etcdCli.Endpoints())) - return &TiFlashReplicaManagerCtx{etcdCli: etcdCli, tiflashProgressCache: make(map[int64]float64)} + return &TiFlashReplicaManagerCtx{etcdCli: etcdCli, tiflashProgressCache: make(map[int64]float64), codec: codec} } func initScheduleManager(etcdCli *clientv3.Client) ScheduleManager { diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index b7af5ed167837..ee97406eef01a 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit/testsetup" util2 "github.com/pingcap/tidb/util" @@ -67,7 +68,7 @@ func TestTopology(t *testing.T) { require.NoError(t, err) }() - info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, false) + info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, keyspace.CodecV1, false) require.NoError(t, err) err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) @@ -152,7 +153,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) { } func TestPutBundlesRetry(t *testing.T) { - _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, false) + _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, false) require.NoError(t, err) bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"}) @@ -216,7 +217,7 @@ func TestPutBundlesRetry(t *testing.T) { func TestTiFlashManager(t *testing.T) { ctx := context.Background() - _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, false) + _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, keyspace.CodecV1, false) tiflash := NewMockTiFlash() SetMockTiFlash(tiflash) diff --git a/domain/infosync/tiflash_manager.go b/domain/infosync/tiflash_manager.go index d5cc46f95db95..6e47d881215d4 100644 --- a/domain/infosync/tiflash_manager.go +++ b/domain/infosync/tiflash_manager.go @@ -32,12 +32,15 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/pdapi" + "github.com/tikv/client-go/v2/tikv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -77,6 +80,7 @@ type TiFlashReplicaManagerCtx struct { etcdCli *clientv3.Client sync.RWMutex // protect tiflashProgressCache tiflashProgressCache map[int64]float64 + codec tikv.Codec } // Close is called to close TiFlashReplicaManagerCtx. @@ -230,6 +234,11 @@ func (m *TiFlashReplicaManagerCtx) SetTiFlashGroupConfig(ctx context.Context) er // SetPlacementRule is a helper function to set placement rule. func (m *TiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error { + // TiDB 6.6 doesn't support tiflash multi-tenancy yet. + // TODO(iosmanthus): remove this check after TiDB supports tiflash multi-tenancy. 
+ if m.codec.GetAPIVersion() == kvrpcpb.APIVersion_V2 { + return errors.Trace(dbterror.ErrNotSupportedYet.GenWithStackByArgs("set TiFlash replica count while enabling API V2")) + } if err := m.SetTiFlashGroupConfig(ctx); err != nil { return err } diff --git a/executor/adapter.go b/executor/adapter.go index 33b321f60382e..f9fba0fea5d25 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -207,6 +207,7 @@ type TelemetryInfo struct { PartitionTelemetry *PartitionTelemetryInfo AccountLockTelemetry *AccountLockTelemetryInfo UseIndexMerge bool + UseTableLookUp bool } // PartitionTelemetryInfo records table partition telemetry information during execution. diff --git a/executor/builder.go b/executor/builder.go index 12cdeeaaa44ef..1005b51a764c6 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3863,6 +3863,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor { + if b.Ti != nil { + b.Ti.UseTableLookUp = true + } is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil { b.err = err @@ -4000,6 +4003,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { if b.Ti != nil { b.Ti.UseIndexMerge = true + b.Ti.UseTableLookUp = true } ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil { @@ -4445,6 +4449,9 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader, lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { + if builder.Ti != nil { + builder.Ti.UseTableLookUp = true + } e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v) if err != nil { return nil, err diff --git a/executor/distsql.go b/executor/distsql.go index 3b9a6a7d4b288..a96954b9fba6f 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -76,7 +76,9 @@ type lookupTableTask struct { idxRows *chunk.Chunk cursor int - doneCh chan error + // after the cop task is built, buildDone will be set to the current instant, for Next wait duration statistic. + buildDoneTime time.Time + doneCh chan error // indexOrder map is used to save the original index order for the handles. // Without this map, the original index order might be lost. 
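The lookupTableTask.buildDoneTime field added just above, together with the getResultTask hunk that follows, splits the time an IndexLookUp Next() call spends waiting into three phases: waiting for the index side to deliver a task, waiting for that task's table reader to be built, and waiting for the table lookup response. Below is a minimal, self-contained sketch of that accounting, not the executor code itself; the waitStats type and account function are invented for illustration (the real fields are NextWaitIndexScan, NextWaitTableLookUpBuild and NextWaitTableLookUpResp).

package main

import (
	"fmt"
	"time"
)

// waitStats is an invented stand-in for the three new wait-duration fields.
type waitStats struct {
	waitIndexScan        time.Duration // blocked until the index side delivered a task
	waitTableLookupBuild time.Duration // task arrived but its table reader was still being built
	waitTableLookupResp  time.Duration // waiting for the table lookup rows themselves
}

// account splits one Next() wait into the three phases, mirroring the logic in the hunk below.
func account(start, indexFetched, buildDone, respDone time.Time, s *waitStats) {
	s.waitIndexScan += indexFetched.Sub(start)
	if buildDone.After(indexFetched) {
		s.waitTableLookupBuild += buildDone.Sub(indexFetched)
		indexFetched = buildDone
	}
	s.waitTableLookupResp += respDone.Sub(indexFetched)
}

func main() {
	base := time.Now()
	var s waitStats
	account(base, base.Add(10*time.Millisecond), base.Add(15*time.Millisecond), base.Add(40*time.Millisecond), &s)
	fmt.Println(s.waitIndexScan, s.waitTableLookupBuild, s.waitTableLookupResp) // 10ms 5ms 25ms
}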
@@ -790,13 +792,32 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) { if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) { return e.resultCurr, nil } + var ( + enableStats = e.stats != nil + start time.Time + indexFetchedInstant time.Time + ) + if enableStats { + start = time.Now() + } task, ok := <-e.resultCh if !ok { return nil, nil } + if enableStats { + indexFetchedInstant = time.Now() + } if err := <-task.doneCh; err != nil { return nil, err } + if enableStats { + e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start) + if task.buildDoneTime.After(indexFetchedInstant) { + e.stats.NextWaitTableLookUpBuild += task.buildDoneTime.Sub(indexFetchedInstant) + indexFetchedInstant = task.buildDoneTime + } + e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant) + } // Release the memory usage of last task before we handle a new task. if e.resultCurr != nil { @@ -1119,6 +1140,10 @@ type IndexLookUpRunTimeStats struct { TableRowScan int64 TableTaskNum int64 Concurrency int + // Record the `Next` call affected wait duration details. + NextWaitIndexScan time.Duration + NextWaitTableLookUpBuild time.Duration + NextWaitTableLookUpResp time.Duration } func (e *IndexLookUpRunTimeStats) String() string { @@ -1142,6 +1167,15 @@ func (e *IndexLookUpRunTimeStats) String() string { } buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency)) } + if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 { + if buf.Len() > 0 { + buf.WriteByte(',') + fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}", + execdetails.FormatDuration(e.NextWaitIndexScan), + execdetails.FormatDuration(e.NextWaitTableLookUpBuild), + execdetails.FormatDuration(e.NextWaitTableLookUpResp)) + } + } return buf.String() } @@ -1162,6 +1196,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) { e.TaskWait += tmp.TaskWait e.TableRowScan += tmp.TableRowScan e.TableTaskNum += tmp.TableTaskNum + e.NextWaitIndexScan += tmp.NextWaitIndexScan + e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild + e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp } // Tp implements the RuntimeStats interface. @@ -1300,6 +1337,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum { // Then we hold the returning rows and finish this task. 
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error { tableReader, err := w.idxLookup.buildTableReader(ctx, task) + task.buildDoneTime = time.Now() if err != nil { logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) return err diff --git a/executor/distsql_test.go b/executor/distsql_test.go index 65889a10d0377..50c4a311a1eb9 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) { func TestIndexLookUpStats(t *testing.T) { stats := &executor.IndexLookUpRunTimeStats{ - FetchHandleTotal: int64(5 * time.Second), - FetchHandle: int64(2 * time.Second), - TaskWait: int64(2 * time.Second), - TableRowScan: int64(2 * time.Second), - TableTaskNum: 2, - Concurrency: 1, + FetchHandleTotal: int64(5 * time.Second), + FetchHandle: int64(2 * time.Second), + TaskWait: int64(2 * time.Second), + TableRowScan: int64(2 * time.Second), + TableTaskNum: 2, + Concurrency: 1, + NextWaitIndexScan: time.Second, + NextWaitTableLookUpBuild: 2 * time.Second, + NextWaitTableLookUpResp: 3 * time.Second, } - require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String()) + require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+ + ", table_task: {total_time: 2s, num: 2, concurrency: 1}"+ + ", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String()) require.Equal(t, stats.Clone().String(), stats.String()) stats.Merge(stats.Clone()) - require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String()) + require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+ + ", table_task: {total_time: 4s, num: 4, concurrency: 1}"+ + ", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String()) } func TestIndexLookUpGetResultChunk(t *testing.T) { diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index a016d5a459df1..d3e94d5b77353 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -3406,7 +3406,6 @@ func (e *memtableRetriever) setDataFromResourceGroups() error { row := types.MakeDatums( group.Name, group.RUSettings.RU.Settings.FillRate, - uint64(group.RUSettings.RU.Tokens), burstable, ) rows = append(rows, row) @@ -3416,7 +3415,6 @@ func (e *memtableRetriever) setDataFromResourceGroups() error { group.Name, nil, nil, - nil, ) rows = append(rows, row) } diff --git a/infoschema/tables.go b/infoschema/tables.go index 3cbf9ee1b464f..7a2631b4abe13 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1595,7 +1595,6 @@ var tableMemoryUsageOpsHistoryCols = []columnInfo{ var tableResourceGroupsCols = []columnInfo{ {name: "NAME", tp: mysql.TypeVarchar, size: resourcegroup.MaxGroupNameLength, flag: mysql.NotNullFlag}, {name: "RU_PER_SEC", tp: mysql.TypeLonglong, size: 21}, - {name: "RU_TOKENS", tp: mysql.TypeLonglong, size: 21}, {name: "BURSTABLE", tp: mysql.TypeVarchar, size: 3}, } diff --git a/metrics/telemetry.go b/metrics/telemetry.go index 591823f9952d9..db4c80714c0ea 100644 --- a/metrics/telemetry.go +++ b/metrics/telemetry.go @@ -169,6 +169,34 @@ var ( Name: "compact_partition_usage", Help: "Counter of compact table partition", }) + TelemetryStoreBatchedQueryCnt = prometheus.NewCounter( + 
prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "store_batched_query", + Help: "Counter of queries which use store batched coprocessor tasks", + }) + TelemetryBatchedQueryTaskCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "batched_query_task", + Help: "Counter of coprocessor tasks in batched queries", + }) + TelemetryStoreBatchedCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "store_batched", + Help: "Counter of store batched coprocessor tasks", + }) + TelemetryStoreBatchedFallbackCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "store_batched_fallback", + Help: "Counter of store batched fallback coprocessor tasks", + }) ) // readCounter reads the value of a prometheus.Counter. @@ -422,3 +450,37 @@ func GetIndexMergeCounter() IndexMergeUsageCounter { IndexMergeUsed: readCounter(TelemetryIndexMergeUsage), } } + +// StoreBatchCoprCounter records the usage of store batched coprocessor requests. +type StoreBatchCoprCounter struct { + // BatchSize is the global value of `tidb_store_batch_size` + BatchSize int `json:"batch_size"` + // BatchedQuery is the counter of queries that use this feature. + BatchedQuery int64 `json:"query"` + // BatchedQueryTask is the counter of total tasks in queries above. + BatchedQueryTask int64 `json:"tasks"` + // BatchedCount is the counter of successfully batched tasks. + BatchedCount int64 `json:"batched"` + // BatchedFallbackCount is the counter of batched tasks that fell back because of region misses. + BatchedFallbackCount int64 `json:"batched_fallback"` +} + +// Sub returns the difference of two counters. +func (n StoreBatchCoprCounter) Sub(rhs StoreBatchCoprCounter) StoreBatchCoprCounter { + return StoreBatchCoprCounter{ + BatchedQuery: n.BatchedQuery - rhs.BatchedQuery, + BatchedQueryTask: n.BatchedQueryTask - rhs.BatchedQueryTask, + BatchedCount: n.BatchedCount - rhs.BatchedCount, + BatchedFallbackCount: n.BatchedFallbackCount - rhs.BatchedFallbackCount, + } +} + +// GetStoreBatchCoprCounter gets the store batched coprocessor usage counter. +func GetStoreBatchCoprCounter() StoreBatchCoprCounter { + return StoreBatchCoprCounter{ + BatchedQuery: readCounter(TelemetryStoreBatchedQueryCnt), + BatchedQueryTask: readCounter(TelemetryBatchedQueryTaskCnt), + BatchedCount: readCounter(TelemetryStoreBatchedCnt), + BatchedFallbackCount: readCounter(TelemetryStoreBatchedFallbackCnt), + } +} diff --git a/parser/model/model.go b/parser/model/model.go index d530bea00ac9c..25afd90b4a47c 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1875,6 +1875,14 @@ func (p *ResourceGroupSettings) String() string { return sb.String() } +// Adjust adjusts the resource group settings. +func (p *ResourceGroupSettings) Adjust() { + // Currently we only support the ru_per_sec syntax, so BurstLimit (capacity) is always the same as ru_per_sec. + if p.BurstLimit == 0 { + p.BurstLimit = int64(p.RURate) + } +} + // Clone clones the resource group settings.
func (p *ResourceGroupSettings) Clone() *ResourceGroupSettings { cloned := *p diff --git a/parser/parser.go b/parser/parser.go index 816226256ecf0..303aa5b3158de 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -11970,19 +11970,21 @@ yynewstate: } case 10: { - parser.yyVAL.item = append(yyS[yypt-1].item.([]*ast.ResourceGroupOption), yyS[yypt-0].item.(*ast.ResourceGroupOption)) - if yyS[yypt-1].item.([]*ast.ResourceGroupOption)[0].Tp == yyS[yypt-0].item.(*ast.ResourceGroupOption).Tp { + if yyS[yypt-1].item.([]*ast.ResourceGroupOption)[0].Tp == yyS[yypt-0].item.(*ast.ResourceGroupOption).Tp || + (len(yyS[yypt-1].item.([]*ast.ResourceGroupOption)) > 1 && yyS[yypt-1].item.([]*ast.ResourceGroupOption)[1].Tp == yyS[yypt-0].item.(*ast.ResourceGroupOption).Tp) { yylex.AppendError(yylex.Errorf("Dupliated options specified")) return 1 } + parser.yyVAL.item = append(yyS[yypt-1].item.([]*ast.ResourceGroupOption), yyS[yypt-0].item.(*ast.ResourceGroupOption)) } case 11: { - parser.yyVAL.item = append(yyS[yypt-2].item.([]*ast.ResourceGroupOption), yyS[yypt-0].item.(*ast.ResourceGroupOption)) - if yyS[yypt-2].item.([]*ast.ResourceGroupOption)[0].Tp == yyS[yypt-0].item.(*ast.ResourceGroupOption).Tp { + if yyS[yypt-2].item.([]*ast.ResourceGroupOption)[0].Tp == yyS[yypt-0].item.(*ast.ResourceGroupOption).Tp || + (len(yyS[yypt-2].item.([]*ast.ResourceGroupOption)) > 1 && yyS[yypt-2].item.([]*ast.ResourceGroupOption)[1].Tp == yyS[yypt-0].item.(*ast.ResourceGroupOption).Tp) { yylex.AppendError(yylex.Errorf("Dupliated options specified")) return 1 } + parser.yyVAL.item = append(yyS[yypt-2].item.([]*ast.ResourceGroupOption), yyS[yypt-0].item.(*ast.ResourceGroupOption)) } case 12: { diff --git a/parser/parser.y b/parser/parser.y index 31c32ca7f4d34..83d91e8803349 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -1599,19 +1599,21 @@ ResourceGroupOptionList: } | ResourceGroupOptionList DirectResourceGroupOption { - $$ = append($1.([]*ast.ResourceGroupOption), $2.(*ast.ResourceGroupOption)) - if $1.([]*ast.ResourceGroupOption)[0].Tp == $2.(*ast.ResourceGroupOption).Tp { + if $1.([]*ast.ResourceGroupOption)[0].Tp == $2.(*ast.ResourceGroupOption).Tp || + (len($1.([]*ast.ResourceGroupOption)) > 1 && $1.([]*ast.ResourceGroupOption)[1].Tp == $2.(*ast.ResourceGroupOption).Tp) { yylex.AppendError(yylex.Errorf("Dupliated options specified")) return 1 } + $$ = append($1.([]*ast.ResourceGroupOption), $2.(*ast.ResourceGroupOption)) } | ResourceGroupOptionList ',' DirectResourceGroupOption { + if $1.([]*ast.ResourceGroupOption)[0].Tp == $3.(*ast.ResourceGroupOption).Tp || + (len($1.([]*ast.ResourceGroupOption)) > 1 && $1.([]*ast.ResourceGroupOption)[1].Tp == $3.(*ast.ResourceGroupOption).Tp) { + yylex.AppendError(yylex.Errorf("Dupliated options specified")) + return 1 + } $$ = append($1.([]*ast.ResourceGroupOption), $3.(*ast.ResourceGroupOption)) - if $1.([]*ast.ResourceGroupOption)[0].Tp == $3.(*ast.ResourceGroupOption).Tp { - yylex.AppendError(yylex.Errorf("Dupliated options specified")) - return 1 - } } DirectResourceGroupOption: diff --git a/server/BUILD.bazel b/server/BUILD.bazel index c0477b9248e5c..2f7de500099cf 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -162,6 +162,7 @@ go_test( "//expression", "//extension", "//infoschema", + "//keyspace", "//kv", "//meta", "//metrics", diff --git a/server/stat_test.go b/server/stat_test.go index 2cea08933c2c0..dfa2228467911 100644 --- a/server/stat_test.go +++ b/server/stat_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" 
"github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/stretchr/testify/require" @@ -46,7 +47,7 @@ func TestUptime(t *testing.T) { }() require.NoError(t, err) - _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) + _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), keyspace.CodecV1, true) require.NoError(t, err) tidbdrv := NewTiDBDriver(store) diff --git a/session/session.go b/session/session.go index 2d84f1c0fa858..95822c17811a0 100644 --- a/session/session.go +++ b/session/session.go @@ -153,7 +153,8 @@ var ( telemetryUnlockUserUsage = metrics.TelemetryAccountLockCnt.WithLabelValues("unlockUser") telemetryCreateOrAlterUserUsage = metrics.TelemetryAccountLockCnt.WithLabelValues("createOrAlterUser") - telemetryIndexMerge = metrics.TelemetryIndexMergeUsage + telemetryIndexMerge = metrics.TelemetryIndexMergeUsage + telemetryStoreBatchedUsage = metrics.TelemetryStoreBatchedQueryCnt ) // Session context, it is consistent with the lifecycle of a client connection. @@ -4047,6 +4048,10 @@ func (s *session) updateTelemetryMetric(es *executor.ExecStmt) { telemetryUnlockUserUsage.Add(float64(ti.AccountLockTelemetry.UnlockUser)) telemetryCreateOrAlterUserUsage.Add(float64(ti.AccountLockTelemetry.CreateOrAlterUser)) } + + if ti.UseTableLookUp && s.sessionVars.StoreBatchSize > 0 { + telemetryStoreBatchedUsage.Inc() + } } // GetBuiltinFunctionUsage returns the replica of counting of builtin function usage diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2149e2387f236..3090ab630f3b7 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1168,7 +1168,7 @@ const ( DefTiDBTTLDeleteRateLimit = 0 DefPasswordReuseHistory = 0 DefPasswordReuseTime = 0 - DefTiDBStoreBatchSize = 0 + DefTiDBStoreBatchSize = 4 DefTiDBHistoricalStatsDuration = 7 * 24 * time.Hour DefTiDBEnableHistoricalStatsForCapture = false DefTiDBTTLJobScheduleWindowStartTime = "00:00 +0000" diff --git a/statistics/BUILD.bazel b/statistics/BUILD.bazel index 8dccd523fc887..7186245a79bda 100644 --- a/statistics/BUILD.bazel +++ b/statistics/BUILD.bazel @@ -82,6 +82,7 @@ go_test( data = glob(["testdata/**"]), embed = [":statistics"], flaky = True, + shard_count = 50, deps = [ "//config", "//domain", diff --git a/store/copr/copr_test/coprocessor_test.go b/store/copr/copr_test/coprocessor_test.go index 7931fb8432675..30247b8694a72 100644 --- a/store/copr/copr_test/coprocessor_test.go +++ b/store/copr/copr_test/coprocessor_test.go @@ -166,4 +166,19 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) { require.Nil(t, errRes) tasks = it.GetTasks() require.Equal(t, len(tasks), 4) + + // only small tasks will be batched. 
+ ranges = copr.BuildKeyRanges("a", "b", "h", "i", "o", "p") + req = &kv.Request{ + Tp: kv.ReqTypeDAG, + KeyRanges: kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 33, 32}), + Concurrency: 15, + StoreBatchSize: 3, + } + it, errRes = copClient.BuildCopIterator(ctx, req, vars, opt) + require.Nil(t, errRes) + tasks = it.GetTasks() + require.Equal(t, len(tasks), 2) + require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1) + require.Equal(t, len(tasks[1].ToPBBatchTasks()), 0) } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index e1c5fb0b91d00..06ab6ab61efd1 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -146,11 +146,13 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars err error ) tryRowHint := optRowHint(req) + elapsed := time.Duration(0) buildOpt := &buildCopTaskOpt{ req: req, cache: c.store.GetRegionCache(), eventCb: eventCb, respChan: req.KeepOrder, + elapsed: &elapsed, } buildTaskFunc := func(ranges []kv.KeyRange, hints []int) error { keyRanges := NewKeyRanges(ranges) @@ -186,14 +188,15 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars return nil, copErrorResponse{err} } it := &copIterator{ - store: c.store, - req: req, - concurrency: req.Concurrency, - finishCh: make(chan struct{}), - vars: vars, - memTracker: req.MemTracker, - replicaReadSeed: c.replicaReadSeed, - rpcCancel: tikv.NewRPCanceller(), + store: c.store, + req: req, + concurrency: req.Concurrency, + finishCh: make(chan struct{}), + vars: vars, + memTracker: req.MemTracker, + replicaReadSeed: c.replicaReadSeed, + rpcCancel: tikv.NewRPCanceller(), + buildTaskElapsed: *buildOpt.elapsed, } it.tasks = tasks if it.concurrency > len(tasks) { @@ -284,13 +287,14 @@ func (r *copTask) ToPBBatchTasks() []*coprocessor.StoreBatchTask { } pbTasks := make([]*coprocessor.StoreBatchTask, 0, len(r.batchTaskList)) for _, task := range r.batchTaskList { - pbTasks = append(pbTasks, &coprocessor.StoreBatchTask{ + storeBatchTask := &coprocessor.StoreBatchTask{ RegionId: task.region.GetRegionId(), RegionEpoch: task.region.GetRegionEpoch(), Peer: task.peer, Ranges: task.region.GetRanges(), TaskId: task.task.taskID, - }) + } + pbTasks = append(pbTasks, storeBatchTask) } return pbTasks } @@ -304,6 +308,7 @@ type buildCopTaskOpt struct { eventCb trxevents.EventCallback respChan bool rowHints []int + elapsed *time.Duration } func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*copTask, error) { @@ -315,7 +320,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c } rangesLen := ranges.Len() // something went wrong, disable hints to avoid out of range index. 
- if hints != nil && len(hints) != rangesLen { + if len(hints) != rangesLen { hints = nil } @@ -341,7 +346,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c } var builder taskBuilder - if req.StoreBatchSize > 0 { + if req.StoreBatchSize > 0 && hints != nil { builder = newBatchTaskBuilder(bo, req, cache) } else { builder = newLegacyTaskBuilder(len(locs)) @@ -428,6 +433,9 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c if elapsed > time.Millisecond { defer tracing.StartRegion(bo.GetCtx(), "copr.buildCopTasks").End() } + if opt.elapsed != nil { + *opt.elapsed = *opt.elapsed + elapsed + } metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum())) return tasks, nil } @@ -498,7 +506,8 @@ func (b *batchStoreTaskBuilder) handle(task *copTask) (err error) { b.tasks = append(b.tasks, task) } }() - if b.limit <= 0 { + // only batch small tasks for memory control. + if b.limit <= 0 || !isSmallTask(task) { return nil } batchedTask, err := b.cache.BuildBatchTask(b.bo, task, b.req.ReplicaRead) @@ -576,7 +585,9 @@ func isSmallTask(task *copTask) bool { // strictly, only RowCountHint == -1 stands for unknown task rows, // but when RowCountHint == 0, it may be caused by initialized value, // to avoid the future bugs, let the tasks with RowCountHint == 0 be non-small tasks. - return task.RowCountHint > 0 && task.RowCountHint <= CopSmallTaskRow + return task.RowCountHint > 0 && + (len(task.batchTaskList) == 0 && task.RowCountHint <= CopSmallTaskRow) || + (len(task.batchTaskList) > 0 && task.RowCountHint <= 2*CopSmallTaskRow) } // smallTaskConcurrency counts the small tasks of tasks, @@ -604,6 +615,16 @@ func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) { return res, extraConc } +// CopInfo is used to expose functions of copIterator. +type CopInfo interface { + // GetConcurrency returns the concurrency and small task concurrency. + GetConcurrency() (int, int) + // GetStoreBatchInfo returns the batched and fallback num. + GetStoreBatchInfo() (uint64, uint64) + // GetBuildTaskElapsed returns the duration of building task. + GetBuildTaskElapsed() time.Duration +} + type copIterator struct { store *Store req *kv.Request @@ -641,6 +662,10 @@ type copIterator struct { actionOnExceed *rateLimitAction pagingTaskIdx uint32 + + buildTaskElapsed time.Duration + storeBatchedNum atomic.Uint64 + storeBatchedFallbackNum atomic.Uint64 } // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan. @@ -660,6 +685,9 @@ type copIteratorWorker struct { enableCollectExecutionInfo bool pagingTaskIdx *uint32 + + storeBatchedNum *atomic.Uint64 + storeBatchedFallbackNum *atomic.Uint64 } // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit. @@ -792,6 +820,8 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC replicaReadSeed: it.replicaReadSeed, enableCollectExecutionInfo: enableCollectExecutionInfo, pagingTaskIdx: &it.pagingTaskIdx, + storeBatchedNum: &it.storeBatchedNum, + storeBatchedFallbackNum: &it.storeBatchedFallbackNum, } go worker.run(ctx) } @@ -890,6 +920,16 @@ func (it *copIterator) GetConcurrency() (int, int) { return it.concurrency, it.smallTaskConcurrency } +// GetStoreBatchInfo returns the batched and fallback num. 
+func (it *copIterator) GetStoreBatchInfo() (uint64, uint64) { + return it.storeBatchedNum.Load(), it.storeBatchedFallbackNum.Load() +} + +// GetBuildTaskElapsed returns the duration of building task. +func (it *copIterator) GetBuildTaskElapsed() time.Duration { + return it.buildTaskElapsed +} + // GetSendRate returns the rate-limit object. func (it *copIterator) GetSendRate() *util.RateLimit { return it.sendRate @@ -1086,30 +1126,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch Tasks: task.ToPBBatchTasks(), } - var cacheKey []byte - var cacheValue *coprCacheValue - - // If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since - // computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key. - if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) { - cKey, err := coprCacheBuildKey(&copReq) - if err == nil { - cacheKey = cKey - cValue := worker.store.coprCache.Get(cKey) - copReq.IsCacheEnabled = true - - if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs { - // Append cache version to the request to skip Coprocessor computation if possible - // when request result is cached - copReq.CacheIfMatchVersion = cValue.RegionDataVersion - cacheValue = cValue - } else { - copReq.CacheIfMatchVersion = 0 - } - } else { - logutil.BgLogger().Warn("Failed to build copr cache key", zap.Error(err)) - } - } + cacheKey, cacheValue := worker.buildCacheKey(task, &copReq) req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{ IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel), @@ -1279,13 +1296,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R if err != nil { return remains, err } - return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.GetBatchResponses(), task, ch) + return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp.GetBatchResponses(), task, ch) } if lockErr := resp.pbResp.GetLocked(); lockErr != nil { if err := worker.handleLockErr(bo, lockErr, task); err != nil { return nil, err } - return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch) + return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.GetBatchResponses(), task, ch) } if otherErr := resp.pbResp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) @@ -1316,74 +1333,23 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R } worker.handleCollectExecutionInfo(bo, rpcCtx, resp) resp.respTime = costTime - if resp.pbResp.IsCacheHit { - coprCacheCounterHit.Add(1) - if cacheValue == nil { - return nil, errors.New("Internal error: received illegal TiKV response") - } - // Cache hit and is valid: use cached data as response data and we don't update the cache. 
- data := make([]byte, len(cacheValue.Data)) - copy(data, cacheValue.Data) - resp.pbResp.Data = data - if worker.req.Paging.Enable { - var start, end []byte - if cacheValue.PageStart != nil { - start = make([]byte, len(cacheValue.PageStart)) - copy(start, cacheValue.PageStart) - } - if cacheValue.PageEnd != nil { - end = make([]byte, len(cacheValue.PageEnd)) - copy(end, cacheValue.PageEnd) - } - // When paging protocol is used, the response key range is part of the cache data. - if start != nil || end != nil { - resp.pbResp.Range = &coprocessor.KeyRange{ - Start: start, - End: end, - } - } else { - resp.pbResp.Range = nil - } - } - resp.detail.CoprCacheHit = true - } else { - coprCacheCounterMiss.Add(1) - // Cache not hit or cache hit but not valid: update the cache if the response can be cached. - if cacheKey != nil && resp.pbResp.CanBeCached && resp.pbResp.CacheLastVersion > 0 { - if resp.detail != nil { - if worker.store.coprCache.CheckResponseAdmission(resp.pbResp.Data.Size(), resp.detail.TimeDetail.ProcessTime, task.pagingTaskIdx) { - data := make([]byte, len(resp.pbResp.Data)) - copy(data, resp.pbResp.Data) - - newCacheValue := coprCacheValue{ - Data: data, - TimeStamp: worker.req.StartTs, - RegionID: task.region.GetID(), - RegionDataVersion: resp.pbResp.CacheLastVersion, - } - // When paging protocol is used, the response key range is part of the cache data. - if r := resp.pbResp.GetRange(); r != nil { - newCacheValue.PageStart = append([]byte{}, r.GetStart()...) - newCacheValue.PageEnd = append([]byte{}, r.GetEnd()...) - } - worker.store.coprCache.Set(cacheKey, &newCacheValue) - } - } - } + if err := worker.handleCopCache(task, resp, cacheKey, cacheValue); err != nil { + return nil, err } + batchResps := resp.pbResp.BatchResponses worker.sendToRespCh(resp, ch, true) - return worker.handleBatchCopResponse(bo, batchResps, task.batchTaskList, ch) + return worker.handleBatchCopResponse(bo, rpcCtx, batchResps, task.batchTaskList, ch) } -func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { if len(task.batchTaskList) == 0 { return remains, nil } batchedTasks := task.batchTaskList task.batchTaskList = nil - batchedRemains, err := worker.handleBatchCopResponse(bo, batchResp, batchedTasks, ch) + batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, batchResp, batchedTasks, ch) if err != nil { return nil, err } @@ -1392,11 +1358,19 @@ func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains // handle the batched cop response. // tasks will be changed, so the input tasks should not be used after calling this function. 
-func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, batchResps []*coprocessor.StoreBatchTaskResponse, + tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) (remainTasks []*copTask, err error) { if len(tasks) == 0 { return nil, nil } - var remainTasks []*copTask + batchedNum := len(tasks) + defer func() { + if err != nil { + return + } + worker.storeBatchedNum.Add(uint64(batchedNum - len(remainTasks))) + worker.storeBatchedFallbackNum.Add(uint64(len(remainTasks))) + }() appendRemainTasks := func(tasks ...*copTask) { if remainTasks == nil { // allocate size fo remain length @@ -1404,6 +1378,13 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp } remainTasks = append(remainTasks, tasks...) } + // need Addr for recording details. + var dummyRPCCtx *tikv.RPCContext + if rpcCtx != nil { + dummyRPCCtx = &tikv.RPCContext{ + Addr: rpcCtx.Addr, + } + } for _, batchResp := range batchResps { taskID := batchResp.GetTaskId() batchedTask, ok := tasks[taskID] @@ -1413,7 +1394,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp delete(tasks, taskID) resp := &copResponse{ pbResp: &coprocessor.Response{ - Data: batchResp.Data, + Data: batchResp.Data, + ExecDetailsV2: batchResp.ExecDetailsV2, }, } task := batchedTask.task @@ -1468,8 +1450,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp } return nil, errors.Trace(err) } - // TODO: check OOM - worker.sendToRespCh(resp, ch, false) + worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp) + worker.sendToRespCh(resp, ch, true) } for _, t := range tasks { task := t.task @@ -1525,6 +1507,90 @@ func (worker *copIteratorWorker) handleLockErr(bo *Backoffer, lockErr *kvrpcpb.L return nil } +func (worker *copIteratorWorker) buildCacheKey(task *copTask, copReq *coprocessor.Request) (cacheKey []byte, cacheValue *coprCacheValue) { + // If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since + // computing is not the main cost. Ignore requests with many ranges directly to avoid slowly building the cache key. + if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) { + cKey, err := coprCacheBuildKey(copReq) + if err == nil { + cacheKey = cKey + cValue := worker.store.coprCache.Get(cKey) + copReq.IsCacheEnabled = true + + if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs { + // Append cache version to the request to skip Coprocessor computation if possible + // when request result is cached + copReq.CacheIfMatchVersion = cValue.RegionDataVersion + cacheValue = cValue + } else { + copReq.CacheIfMatchVersion = 0 + } + } else { + logutil.BgLogger().Warn("Failed to build copr cache key", zap.Error(err)) + } + } + return +} + +func (worker *copIteratorWorker) handleCopCache(task *copTask, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue) error { + if resp.pbResp.IsCacheHit { + if cacheValue == nil { + return errors.New("Internal error: received illegal TiKV response") + } + coprCacheCounterHit.Add(1) + // Cache hit and is valid: use cached data as response data and we don't update the cache. 
+ data := make([]byte, len(cacheValue.Data)) + copy(data, cacheValue.Data) + resp.pbResp.Data = data + if worker.req.Paging.Enable { + var start, end []byte + if cacheValue.PageStart != nil { + start = make([]byte, len(cacheValue.PageStart)) + copy(start, cacheValue.PageStart) + } + if cacheValue.PageEnd != nil { + end = make([]byte, len(cacheValue.PageEnd)) + copy(end, cacheValue.PageEnd) + } + // When paging protocol is used, the response key range is part of the cache data. + if start != nil || end != nil { + resp.pbResp.Range = &coprocessor.KeyRange{ + Start: start, + End: end, + } + } else { + resp.pbResp.Range = nil + } + } + resp.detail.CoprCacheHit = true + return nil + } + coprCacheCounterMiss.Add(1) + // Cache not hit or cache hit but not valid: update the cache if the response can be cached. + if cacheKey != nil && resp.pbResp.CanBeCached && resp.pbResp.CacheLastVersion > 0 { + if resp.detail != nil { + if worker.store.coprCache.CheckResponseAdmission(resp.pbResp.Data.Size(), resp.detail.TimeDetail.ProcessTime, task.pagingTaskIdx) { + data := make([]byte, len(resp.pbResp.Data)) + copy(data, resp.pbResp.Data) + + newCacheValue := coprCacheValue{ + Data: data, + TimeStamp: worker.req.StartTs, + RegionID: task.region.GetID(), + RegionDataVersion: resp.pbResp.CacheLastVersion, + } + // When paging protocol is used, the response key range is part of the cache data. + if r := resp.pbResp.GetRange(); r != nil { + newCacheValue.PageStart = append([]byte{}, r.GetStart()...) + newCacheValue.PageEnd = append([]byte{}, r.GetEnd()...) + } + worker.store.coprCache.Set(cacheKey, &newCacheValue) + } + } + } + return nil +} + func (worker *copIteratorWorker) getLockResolverDetails() *util.ResolveLockDetail { if !worker.enableCollectExecutionInfo { return nil diff --git a/telemetry/BUILD.bazel b/telemetry/BUILD.bazel index a6c79f7de596f..56376f0031109 100644 --- a/telemetry/BUILD.bazel +++ b/telemetry/BUILD.bazel @@ -77,6 +77,7 @@ go_test( "//testkit", "//testkit/testsetup", "@com_github_jeffail_gabs_v2//:gabs", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", diff --git a/telemetry/data.go b/telemetry/data.go index 0008b7fc8b88b..2a3ecffddaa1e 100644 --- a/telemetry/data.go +++ b/telemetry/data.go @@ -69,6 +69,8 @@ func postReportTelemetryData() { PostSavepointCount() postReportLazyPessimisticUniqueCheckSetCount() postReportDDLUsage() + postReportIndexMergeUsage() + postStoreBatchUsage() } // PostReportTelemetryDataForTest is for test. 
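The postStoreBatchUsage hook registered above and the feature-usage diff that follows report deltas rather than raw totals: the current counter reading is diffed against a snapshot taken at the previous report, and the snapshot is refreshed after each report. A minimal sketch of that pattern, with an invented Counter type standing in for StoreBatchCoprCounter:

package main

import "fmt"

// Counter stands in for a monotonically increasing usage counter read from metrics.
type Counter struct{ Queries, Tasks int64 }

// Sub returns the difference between two snapshots.
func (c Counter) Sub(rhs Counter) Counter {
	return Counter{Queries: c.Queries - rhs.Queries, Tasks: c.Tasks - rhs.Tasks}
}

var (
	current      Counter // what the live counters read right now
	lastReported Counter // snapshot taken when the previous report was posted
)

// getUsage reports only what happened since the last report.
func getUsage() Counter { return current.Sub(lastReported) }

// postReport refreshes the snapshot once a report has been sent.
func postReport() { lastReported = current }

func main() {
	current = Counter{Queries: 3, Tasks: 7}
	fmt.Println(getUsage()) // {3 7}
	postReport()
	current = Counter{Queries: 5, Tasks: 9}
	fmt.Println(getUsage()) // {2 2}
}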
diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 1fe696870c291..1a186cc3451d2 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -17,6 +17,7 @@ package telemetry import ( "context" "errors" + "strconv" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" @@ -62,6 +63,7 @@ type featureUsage struct { IndexMergeUsageCounter *m.IndexMergeUsageCounter `json:"indexMergeUsageCounter"` ResourceControlUsage *resourceControlUsage `json:"resourceControl"` TTLUsage *ttlUsageCounter `json:"ttlUsage"` + StoreBatchCoprUsage *m.StoreBatchCoprCounter `json:"storeBatchCopr"` } type placementPolicyUsage struct { @@ -121,6 +123,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag usage.TTLUsage = getTTLUsageInfo(ctx, sctx) + usage.StoreBatchCoprUsage = getStoreBatchUsage(sctx) + return &usage, nil } @@ -264,6 +268,7 @@ var initialSavepointStmtCounter int64 var initialLazyPessimisticUniqueCheckSetCount int64 var initialDDLUsageCounter m.DDLUsageCounter var initialIndexMergeCounter m.IndexMergeUsageCounter +var initialStoreBatchCoprCounter m.StoreBatchCoprCounter // getTxnUsageInfo gets the usage info of transaction related features. It's exported for tests. func getTxnUsageInfo(ctx sessionctx.Context) *TxnUsage { @@ -432,3 +437,18 @@ func getIndexMergeUsageInfo() *m.IndexMergeUsageCounter { diff := curr.Sub(initialIndexMergeCounter) return &diff } + +func getStoreBatchUsage(ctx sessionctx.Context) *m.StoreBatchCoprCounter { + curr := m.GetStoreBatchCoprCounter() + diff := curr.Sub(initialStoreBatchCoprCounter) + if val, err := ctx.GetSessionVars().GetGlobalSystemVar(context.Background(), variable.TiDBStoreBatchSize); err == nil { + if batchSize, err := strconv.Atoi(val); err == nil { + diff.BatchSize = batchSize + } + } + return &diff +} + +func postStoreBatchUsage() { + initialStoreBatchCoprCounter = m.GetStoreBatchCoprCounter() +} diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index c8932cd27e35b..f8e5701f0e69a 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" _ "github.com/pingcap/tidb/autoid_service" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -777,3 +778,69 @@ func TestTTLTelemetry(t *testing.T) { checkTableHistWithDeleteRows(1, 1, 0, 0, 0) checkTableHistWithDelay(0, 1, 1, 0, 1) } + +func TestStoreBatchCopr(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + init, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, init.StoreBatchCoprUsage.BatchSize, 4) + + tk.MustExec("drop table if exists tele_batch_t") + tk.MustExec("create table tele_batch_t (id int primary key, c int, k int, index i(k))") + tk.MustExec("select * from tele_batch_t force index(i) where k between 1 and 10 and k % 2 != 0") + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 4) + diff := usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(1)) + require.Equal(t, diff.BatchedQueryTask, int64(0)) + require.Equal(t, diff.BatchedCount, int64(0)) + require.Equal(t, diff.BatchedFallbackCount, int64(0)) + + tk.MustExec("insert into tele_batch_t values(1, 1, 1), (2, 2, 2), (3, 3, 3), (5, 5, 5), (7, 7, 7)") 
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/setRangesPerTask", "return(1)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/setRangesPerTask")) + }() + tk.MustQuery("select * from tele_batch_t force index(i) where k between 1 and 3 and k % 2 != 0").Sort(). + Check(testkit.Rows("1 1 1", "3 3 3")) + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 4) + diff = usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(2)) + require.Equal(t, diff.BatchedQueryTask, int64(2)) + require.Equal(t, diff.BatchedCount, int64(1)) + require.Equal(t, diff.BatchedFallbackCount, int64(0)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/batchCopRegionError", "return")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/batchCopRegionError")) + }() + tk.MustQuery("select * from tele_batch_t force index(i) where k between 1 and 3 and k % 2 != 0").Sort(). + Check(testkit.Rows("1 1 1", "3 3 3")) + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 4) + diff = usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(3)) + require.Equal(t, diff.BatchedQueryTask, int64(4)) + require.Equal(t, diff.BatchedCount, int64(1)) + require.Equal(t, diff.BatchedFallbackCount, int64(1)) + + tk.MustExec("set global tidb_store_batch_size = 0") + tk.MustExec("set session tidb_store_batch_size = 0") + tk.MustQuery("select * from tele_batch_t force index(i) where k between 1 and 3 and k % 2 != 0").Sort(). + Check(testkit.Rows("1 1 1", "3 3 3")) + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.StoreBatchCoprUsage.BatchSize, 0) + diff = usage.StoreBatchCoprUsage.Sub(*init.StoreBatchCoprUsage) + require.Equal(t, diff.BatchedQuery, int64(3)) + require.Equal(t, diff.BatchedQueryTask, int64(4)) + require.Equal(t, diff.BatchedCount, int64(1)) + require.Equal(t, diff.BatchedFallbackCount, int64(1)) +}
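The BatchedCount and BatchedFallbackCount values asserted in the test above come from two atomic counters on copIterator that every worker increments through shared pointers; in the store/copr/coprocessor.go hunks earlier, handleBatchCopResponse adds the number of successfully batched tasks and the number of region-miss fallbacks in a deferred update, and GetStoreBatchInfo exposes the totals. A minimal sketch of that wiring, assuming simplified stand-in types rather than the real copIterator and copIteratorWorker:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// iterator and worker are simplified stand-ins for copIterator and copIteratorWorker.
type iterator struct {
	batchedNum  atomic.Uint64
	fallbackNum atomic.Uint64
}

type worker struct {
	batchedNum  *atomic.Uint64 // shared with the owning iterator
	fallbackNum *atomic.Uint64
}

// handleBatch records how many of a response's batched tasks succeeded and how many fell back.
func (w *worker) handleBatch(batched, remain int) {
	w.batchedNum.Add(uint64(batched - remain))
	w.fallbackNum.Add(uint64(remain))
}

func main() {
	it := &iterator{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w := &worker{batchedNum: &it.batchedNum, fallbackNum: &it.fallbackNum}
			w.handleBatch(3, 1) // three batched tasks, one region-miss fallback
		}()
	}
	wg.Wait()
	// GetStoreBatchInfo-style read: 8 batched, 4 fallback.
	fmt.Println(it.batchedNum.Load(), it.fallbackNum.Load())
}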