From 5d23bf063643e1cfde5e40eac998d04fd20ddc10 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 2 Mar 2022 18:15:15 -0700 Subject: [PATCH 1/3] sessionctx/variable: remove streaming option --- sessionctx/variable/sysvar.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a9a4b6f963b4a..bd1b4d6f62f61 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -584,11 +584,6 @@ var defaultSysVars = []*SysVar{ appendDeprecationWarning(vars, TiDBMemQuotaIndexLookupJoin, TiDBMemQuotaQuery) return normalizedValue, nil }}, - // Deprecated: tidb_enable_streaming - {Scope: ScopeSession, Name: TiDBEnableStreaming, Value: Off, Type: TypeBool, skipInit: true, Hidden: true, SetSession: func(s *SessionVars, val string) error { - s.EnableStreaming = TiDBOptOn(val) - return nil - }}, {Scope: ScopeSession, Name: TiDBEnableChunkRPC, Value: On, Type: TypeBool, skipInit: true, SetSession: func(s *SessionVars, val string) error { s.EnableChunkRPC = TiDBOptOn(val) return nil From 08f85fa4b38d5be472a698827e824caada6d36a5 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 2 Mar 2022 18:47:40 -0700 Subject: [PATCH 2/3] *: Removing deprecated streaming option --- config/config.go | 9 ++-- config/config_util.go | 1 - distsql/distsql.go | 7 +-- distsql/distsql_test.go | 71 ---------------------------- executor/analyze_test.go | 11 ----- sessionctx/variable/removed.go | 1 + sessionctx/variable/session.go | 12 ----- sessionctx/variable/varsutil_test.go | 13 ----- store/driver/sql_fail_test.go | 67 -------------------------- 9 files changed, 5 insertions(+), 187 deletions(-) diff --git a/config/config.go b/config/config.go index b75a69128e3c2..f43aae33e3479 100644 --- a/config/config.go +++ b/config/config.go @@ -109,11 +109,9 @@ type Config struct { MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"` // TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled // If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error - TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes - // Deprecated - EnableStreaming bool `toml:"-" json:"-"` - EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"` - TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"` + TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes + EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"` + TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"` // Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html. // TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive. LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"` @@ -630,7 +628,6 @@ var defaultConf = Config{ TempStoragePath: tempStorageDirName, OOMAction: OOMActionCancel, MemQuotaQuery: 1 << 30, - EnableStreaming: false, EnableBatchDML: false, CheckMb4ValueInUTF8: *NewAtomicBool(true), MaxIndexLength: 3072, diff --git a/config/config_util.go b/config/config_util.go index f8ae5221c2617..c192a54fd7dbc 100644 --- a/config/config_util.go +++ b/config/config_util.go @@ -53,7 +53,6 @@ var ( "Log.QueryLogMaxLen": {}, "Log.ExpensiveThreshold": {}, "CheckMb4ValueInUTF8": {}, - "EnableStreaming": {}, "TxnLocalLatches.Capacity": {}, "CompatibleKillQuery": {}, "TreatOldVersionUTF8AsUTF8MB4": {}, diff --git a/distsql/distsql.go b/distsql/distsql.go index b9ec41f1f718b..06104f0e368f9 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -78,9 +78,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie hook.(func(*kv.Request))(kvReq) } - if !sctx.GetSessionVars().EnableStreaming { - kvReq.Streaming = false - } + kvReq.Streaming = false enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL eventCb := func(event trxevents.TransactionEvent) { @@ -219,9 +217,6 @@ func canUseChunkRPC(ctx sessionctx.Context) bool { if !ctx.GetSessionVars().EnableChunkRPC { return false } - if ctx.GetSessionVars().EnableStreaming { - return false - } if !checkAlignment() { return false } diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 6559ef8a0917e..15d9919cca0ba 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -152,36 +152,6 @@ func TestSelectResultRuntimeStats(t *testing.T) { require.Equal(t, expect, s1.String()) } -func TestSelectStreaming(t *testing.T) { - response, colTypes := createSelectStreaming(t, 1, 2) - // Test Next. - chk := chunk.New(colTypes, 32, 32) - numAllRows := 0 - for { - err := response.Next(context.TODO(), chk) - require.NoError(t, err) - numAllRows += chk.NumRows() - if chk.NumRows() == 0 { - break - } - } - require.Equal(t, 2, numAllRows) - require.NoError(t, response.Close()) -} - -func TestSelectStreamingWithNextRaw(t *testing.T) { - response, _ := createSelectStreaming(t, 1, 2) - data, err := response.NextRaw(context.TODO()) - require.NoError(t, err) - require.Len(t, data, 16) -} - -func TestSelectStreamingChunkSize(t *testing.T) { - response, colTypes := createSelectStreaming(t, 100, 1000000) - testChunkSize(t, response, colTypes) - require.NoError(t, response.Close()) -} - func TestAnalyze(t *testing.T) { sctx := newMockSessionContext() sctx.GetSessionVars().EnableChunkRPC = false @@ -472,44 +442,3 @@ func createSelectNormal(t *testing.T, batch, totalRows int, planIDs []int, sctx return result, colTypes } - -func createSelectStreaming(t *testing.T, batch, totalRows int) (*streamResult, []*types.FieldType) { - request, err := (&RequestBuilder{}).SetKeyRanges(nil). - SetDAGRequest(&tipb.DAGRequest{}). - SetDesc(false). - SetKeepOrder(false). - SetFromSessionVars(variable.NewSessionVars()). - SetStreaming(true). - Build() - require.NoError(t, err) - - // 4 int64 types. - colTypes := []*types.FieldType{ - { - Tp: mysql.TypeLonglong, - Flen: mysql.MaxIntWidth, - Decimal: 0, - Flag: mysql.BinaryFlag, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - }, - } - colTypes = append(colTypes, colTypes[0]) - colTypes = append(colTypes, colTypes[0]) - colTypes = append(colTypes, colTypes[0]) - - sctx := newMockSessionContext() - sctx.GetSessionVars().EnableStreaming = true - - response, err := Select(context.TODO(), sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false)) - require.NoError(t, err) - result, ok := response.(*streamResult) - require.True(t, ok) - require.Len(t, colTypes, result.rowLen) - - resp, ok := result.resp.(*mockResponse) - require.True(t, ok) - resp.total = totalRows - resp.batch = batch - return result, colTypes -} diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 0be814d76a068..49fb056963dac 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -1012,17 +1012,6 @@ func TestAnalyzeIncremental(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("set @@tidb_analyze_version = 1") - tk.Session().GetSessionVars().EnableStreaming = false - testAnalyzeIncremental(tk, t, dom) -} - -func TestAnalyzeIncrementalStreaming(t *testing.T) { - t.Skip("unistore hasn't support streaming yet.") - store, dom, clean := testkit.CreateMockStoreAndDomain(t) - defer clean() - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.Session().GetSessionVars().EnableStreaming = true testAnalyzeIncremental(tk, t, dom) } diff --git a/sessionctx/variable/removed.go b/sessionctx/variable/removed.go index 7659aef18f6d2..61c3c208db198 100644 --- a/sessionctx/variable/removed.go +++ b/sessionctx/variable/removed.go @@ -26,6 +26,7 @@ var removedSysVars = map[string]string{ TiDBEnableGlobalTemporaryTable: "temporary table support is now always enabled", TiDBSlowLogMasking: "use tidb_redact_log instead", PlacementChecks: "placement_checks is removed and use tidb_placement_mode instead", + TiDBEnableStreaming: "streaming is no longer supported", } // IsRemovedSysVar returns true if the sysvar has been removed diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index f1111bbdbe433..b4c38e952cb91 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -753,10 +753,6 @@ type SessionVars struct { // WaitSplitRegionTimeout defines the split region timeout. WaitSplitRegionTimeout uint64 - // EnableStreaming indicates whether the coprocessor request can use streaming API. - // TODO: remove this after tidb-server configuration "enable-streaming' removed. - EnableStreaming bool - // EnableChunkRPC indicates whether the coprocessor request can use chunk API. EnableChunkRPC bool @@ -1288,14 +1284,6 @@ func NewSessionVars() *SessionVars { MaxChunkSize: DefMaxChunkSize, } vars.DMLBatchSize = DefDMLBatchSize - var enableStreaming string - if config.GetGlobalConfig().EnableStreaming { - enableStreaming = "1" - } else { - enableStreaming = "0" - } - terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming)) - vars.AllowBatchCop = DefTiDBAllowBatchCop vars.allowMPPExecution = DefTiDBAllowMPPExecution vars.HashExchangeWithNewCollation = DefTiDBHashExchangeWithNewCollation diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index bac034ca48e62..1080a9f073948 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -252,19 +252,6 @@ func TestVarsutil(t *testing.T) { require.NoError(t, err) require.Equal(t, config.HideConfig(string(bVal)), val) - err = SetSessionSystemVar(v, TiDBEnableStreaming, "1") - require.NoError(t, err) - val, err = GetSessionOrGlobalSystemVar(v, TiDBEnableStreaming) - require.NoError(t, err) - require.Equal(t, "ON", val) - require.True(t, v.EnableStreaming) - err = SetSessionSystemVar(v, TiDBEnableStreaming, "0") - require.NoError(t, err) - val, err = GetSessionOrGlobalSystemVar(v, TiDBEnableStreaming) - require.NoError(t, err) - require.Equal(t, "OFF", val) - require.False(t, v.EnableStreaming) - require.Equal(t, DefTiDBOptimizerSelectivityLevel, v.OptimizerSelectivityLevel) err = SetSessionSystemVar(v, TiDBOptimizerSelectivityLevel, "1") require.NoError(t, err) diff --git a/store/driver/sql_fail_test.go b/store/driver/sql_fail_test.go index ef5fc5a1a439c..35111d6096e2d 100644 --- a/store/driver/sql_fail_test.go +++ b/store/driver/sql_fail_test.go @@ -16,17 +16,13 @@ package driver import ( "context" - "fmt" "testing" "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" - "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/tikv" ) func TestFailBusyServerCop(t *testing.T) { @@ -60,66 +56,3 @@ func TestFailBusyServerCop(t *testing.T) { }) wg.Wait() } - -func TestCoprocessorStreamRecvTimeout(t *testing.T) { - store, _, clean := createTestStore(t) - defer clean() - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table cop_stream_timeout (id int)") - for i := 0; i < 200; i++ { - tk.MustExec(fmt.Sprintf("insert into cop_stream_timeout values (%d)", i)) - } - tk.Session().GetSessionVars().EnableStreaming = true - - tests := []struct { - name string - timeout time.Duration - }{ - {"timeout", tikv.ReadTimeoutMedium + 100*time.Second}, - {"no timeout", time.Millisecond}, - } - - for _, test := range tests { - timeout := test.timeout - t.Run(test.name, func(t *testing.T) { - enable := true - visited := make(chan int, 1) - isTimeout := false - ctx := context.WithValue(context.Background(), mock.HookKeyForTest("mockTiKVStreamRecvHook"), func(ctx context.Context) { - if !enable { - return - } - visited <- 1 - - select { - case <-ctx.Done(): - case <-time.After(timeout): - isTimeout = true - } - enable = false - }) - - res, err := tk.Session().Execute(ctx, "select * from cop_stream_timeout") - require.NoError(t, err) - - req := res[0].NewChunk(nil) - for i := 0; ; i++ { - err := res[0].Next(ctx, req) - require.NoError(t, err) - if req.NumRows() == 0 { - break - } - req.Reset() - } - select { - case <-visited: - // run with mock tikv - require.False(t, isTimeout) - default: - // run with real tikv - } - }) - } -} From ef98e3d96472e1d1cbe9d85e563a5c3992603c59 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 2 Mar 2022 19:36:47 -0700 Subject: [PATCH 3/3] fix missed tests --- executor/executor_test.go | 13 ------- executor/set_test.go | 3 -- executor/show_test.go | 3 -- infoschema/cluster_tables_test.go | 62 +++++++++++++++---------------- 4 files changed, 29 insertions(+), 52 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 2bbc5653d14d4..ea6d04a744715 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4106,19 +4106,6 @@ func (s *testSuite) TestLimit(c *C) { )) } -func (s *testSuite) TestCoprocessorStreamingWarning(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a double)") - tk.MustExec("insert into t value(1.2)") - tk.MustExec("set @@session.tidb_enable_streaming = 1") - - result := tk.MustQuery("select * from t where a/0 > 1") - result.Check(testkit.Rows()) - tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1365|Division by 0")) -} - func (s *testSuite3) TestYearTypeDeleteIndex(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/set_test.go b/executor/set_test.go index c64b6d8cb8a8e..e7beb2b810d91 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -801,9 +801,6 @@ func TestValidateSetVar(t *testing.T) { tk.MustExec("set @@tidb_pprof_sql_cpu=0;") tk.MustQuery("select @@tidb_pprof_sql_cpu;").Check(testkit.Rows("0")) - tk.MustExec("set @@tidb_enable_streaming=1;") - tk.MustQuery("select @@tidb_enable_streaming;").Check(testkit.Rows("1")) - err = tk.ExecToErr("set @@tidb_batch_delete=3;") require.True(t, terror.ErrorEqual(err, variable.ErrWrongValueForVar), fmt.Sprintf("err %v", err)) diff --git a/executor/show_test.go b/executor/show_test.go index d3171873518bf..5d61cb322ffff 100644 --- a/executor/show_test.go +++ b/executor/show_test.go @@ -1580,9 +1580,6 @@ func TestShowVar(t *testing.T) { // Test Hidden tx_read_ts res = tk.MustQuery("show variables like '%tx_read_ts'") require.Len(t, res.Rows(), 0) - // Test Hidden tidb_enable_streaming - res = tk.MustQuery("show variables like '%tidb_enable_streaming%';") - require.Len(t, res.Rows(), 0) } func TestIssue19507(t *testing.T) { diff --git a/infoschema/cluster_tables_test.go b/infoschema/cluster_tables_test.go index 258f35141c0f7..091e6cfe8e06c 100644 --- a/infoschema/cluster_tables_test.go +++ b/infoschema/cluster_tables_test.go @@ -230,39 +230,35 @@ func TestSelectClusterTable(t *testing.T) { slowLogFileName := "tidb-slow.log" prepareSlowLogfile(t, slowLogFileName) defer func() { require.NoError(t, os.Remove(slowLogFileName)) }() - for i := 0; i < 2; i++ { - tk.MustExec("use information_schema") - tk.MustExec(fmt.Sprintf("set @@tidb_enable_streaming=%d", i)) - tk.MustExec("set @@global.tidb_enable_stmt_summary=1") - tk.MustExec("set time_zone = '+08:00';") - tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2")) - tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testutil.RowsWithSep("|", "2019-02-12 19:33:56.571953")) - tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1")) - tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 Query 9223372036 %s 0 0 ", ""))) - tk.MustQuery("select query_time, conn_id from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6")) - tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1", "1")) - tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest order by digest").Check(testkit.Rows("124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc 1", "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1")) - tk.MustQuery(`select length(query) as l,time from information_schema.cluster_slow_query where time > "2019-02-12 19:33:56" order by abs(l) desc limit 10;`).Check(testkit.Rows("21 2019-02-12 19:33:56.571953")) - tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` where time > now() group by digest").Check(testkit.Rows()) - re := tk.MustQuery("select * from `CLUSTER_statements_summary`") - require.NotNil(t, re) - require.Greater(t, len(re.Rows()), 0) - // Test for TiDB issue 14915. - re = tk.MustQuery("select sum(exec_count*avg_mem) from cluster_statements_summary_history group by schema_name,digest,digest_text;") - require.NotNil(t, re) - require.Greater(t, len(re.Rows()), 0) - tk.MustQuery("select * from `CLUSTER_statements_summary_history`") - require.NotNil(t, re) - require.Greater(t, len(re.Rows()), 0) - tk.MustExec("set @@global.tidb_enable_stmt_summary=0") - re = tk.MustQuery("select * from `CLUSTER_statements_summary`") - require.NotNil(t, re) - require.Equal(t, 0, len(re.Rows())) - tk.MustQuery("select * from `CLUSTER_statements_summary_history`") - require.NotNil(t, re) - require.Equal(t, 0, len(re.Rows())) - } - + tk.MustExec("use information_schema") + tk.MustExec("set @@global.tidb_enable_stmt_summary=1") + tk.MustExec("set time_zone = '+08:00';") + tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2")) + tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testutil.RowsWithSep("|", "2019-02-12 19:33:56.571953")) + tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1")) + tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 Query 9223372036 %s 0 0 ", ""))) + tk.MustQuery("select query_time, conn_id from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6")) + tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1", "1")) + tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest order by digest").Check(testkit.Rows("124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc 1", "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1")) + tk.MustQuery(`select length(query) as l,time from information_schema.cluster_slow_query where time > "2019-02-12 19:33:56" order by abs(l) desc limit 10;`).Check(testkit.Rows("21 2019-02-12 19:33:56.571953")) + tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` where time > now() group by digest").Check(testkit.Rows()) + re := tk.MustQuery("select * from `CLUSTER_statements_summary`") + require.NotNil(t, re) + require.Greater(t, len(re.Rows()), 0) + // Test for TiDB issue 14915. + re = tk.MustQuery("select sum(exec_count*avg_mem) from cluster_statements_summary_history group by schema_name,digest,digest_text;") + require.NotNil(t, re) + require.Greater(t, len(re.Rows()), 0) + tk.MustQuery("select * from `CLUSTER_statements_summary_history`") + require.NotNil(t, re) + require.Greater(t, len(re.Rows()), 0) + tk.MustExec("set @@global.tidb_enable_stmt_summary=0") + re = tk.MustQuery("select * from `CLUSTER_statements_summary`") + require.NotNil(t, re) + require.Equal(t, 0, len(re.Rows())) + tk.MustQuery("select * from `CLUSTER_statements_summary_history`") + require.NotNil(t, re) + require.Equal(t, 0, len(re.Rows())) } func SubTestSelectClusterTablePrivilege(t *testing.T) {