Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Remove deprecated streaming #32765

Merged
merged 9 commits into from
Mar 10, 2022
9 changes: 3 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ type Config struct {
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
// TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled
// If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
// Deprecated
EnableStreaming bool `toml:"-" json:"-"`
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"`
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
Expand Down Expand Up @@ -630,7 +628,6 @@ var defaultConf = Config{
TempStoragePath: tempStorageDirName,
OOMAction: OOMActionCancel,
MemQuotaQuery: 1 << 30,
EnableStreaming: false,
EnableBatchDML: false,
CheckMb4ValueInUTF8: *NewAtomicBool(true),
MaxIndexLength: 3072,
Expand Down
1 change: 0 additions & 1 deletion config/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ var (
"Log.QueryLogMaxLen": {},
"Log.ExpensiveThreshold": {},
"CheckMb4ValueInUTF8": {},
"EnableStreaming": {},
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
"TxnLocalLatches.Capacity": {},
"CompatibleKillQuery": {},
"TreatOldVersionUTF8AsUTF8MB4": {},
Expand Down
7 changes: 1 addition & 6 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
hook.(func(*kv.Request))(kvReq)
}

if !sctx.GetSessionVars().EnableStreaming {
kvReq.Streaming = false
}
kvReq.Streaming = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like that Streaming field for Request can be removed as well? Anyway even if so, it doesn't have to be done in this PR.

enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
eventCb := func(event trxevents.TransactionEvent) {
Expand Down Expand Up @@ -219,9 +217,6 @@ func canUseChunkRPC(ctx sessionctx.Context) bool {
if !ctx.GetSessionVars().EnableChunkRPC {
return false
}
if ctx.GetSessionVars().EnableStreaming {
return false
}
if !checkAlignment() {
return false
}
Expand Down
71 changes: 0 additions & 71 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,6 @@ func TestSelectResultRuntimeStats(t *testing.T) {
require.Equal(t, expect, s1.String())
}

func TestSelectStreaming(t *testing.T) {
response, colTypes := createSelectStreaming(t, 1, 2)
// Test Next.
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err := response.Next(context.TODO(), chk)
require.NoError(t, err)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
break
}
}
require.Equal(t, 2, numAllRows)
require.NoError(t, response.Close())
}

func TestSelectStreamingWithNextRaw(t *testing.T) {
response, _ := createSelectStreaming(t, 1, 2)
data, err := response.NextRaw(context.TODO())
require.NoError(t, err)
require.Len(t, data, 16)
}

func TestSelectStreamingChunkSize(t *testing.T) {
response, colTypes := createSelectStreaming(t, 100, 1000000)
testChunkSize(t, response, colTypes)
require.NoError(t, response.Close())
}

func TestAnalyze(t *testing.T) {
sctx := newMockSessionContext()
sctx.GetSessionVars().EnableChunkRPC = false
Expand Down Expand Up @@ -472,44 +442,3 @@ func createSelectNormal(t *testing.T, batch, totalRows int, planIDs []int, sctx

return result, colTypes
}

func createSelectStreaming(t *testing.T, batch, totalRows int) (*streamResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetStreaming(true).
Build()
require.NoError(t, err)

// 4 int64 types.
colTypes := []*types.FieldType{
{
Tp: mysql.TypeLonglong,
Flen: mysql.MaxIntWidth,
Decimal: 0,
Flag: mysql.BinaryFlag,
Charset: charset.CharsetBin,
Collate: charset.CollationBin,
},
}
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])

sctx := newMockSessionContext()
sctx.GetSessionVars().EnableStreaming = true

response, err := Select(context.TODO(), sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
require.NoError(t, err)
result, ok := response.(*streamResult)
require.True(t, ok)
require.Len(t, colTypes, result.rowLen)

resp, ok := result.resp.(*mockResponse)
require.True(t, ok)
resp.total = totalRows
resp.batch = batch
return result, colTypes
}
11 changes: 0 additions & 11 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,17 +1012,6 @@ func TestAnalyzeIncremental(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_analyze_version = 1")
tk.Session().GetSessionVars().EnableStreaming = false
testAnalyzeIncremental(tk, t, dom)
}

func TestAnalyzeIncrementalStreaming(t *testing.T) {
t.Skip("unistore hasn't support streaming yet.")
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.Session().GetSessionVars().EnableStreaming = true
testAnalyzeIncremental(tk, t, dom)
}

Expand Down
13 changes: 0 additions & 13 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4106,19 +4106,6 @@ func (s *testSuite) TestLimit(c *C) {
))
}

func (s *testSuite) TestCoprocessorStreamingWarning(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a double)")
tk.MustExec("insert into t value(1.2)")
tk.MustExec("set @@session.tidb_enable_streaming = 1")

result := tk.MustQuery("select * from t where a/0 > 1")
result.Check(testkit.Rows())
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1365|Division by 0"))
}

func (s *testSuite3) TestYearTypeDeleteIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 0 additions & 3 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,6 @@ func TestValidateSetVar(t *testing.T) {
tk.MustExec("set @@tidb_pprof_sql_cpu=0;")
tk.MustQuery("select @@tidb_pprof_sql_cpu;").Check(testkit.Rows("0"))

tk.MustExec("set @@tidb_enable_streaming=1;")
tk.MustQuery("select @@tidb_enable_streaming;").Check(testkit.Rows("1"))

err = tk.ExecToErr("set @@tidb_batch_delete=3;")
require.True(t, terror.ErrorEqual(err, variable.ErrWrongValueForVar), fmt.Sprintf("err %v", err))

Expand Down
3 changes: 0 additions & 3 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,9 +1580,6 @@ func TestShowVar(t *testing.T) {
// Test Hidden tx_read_ts
res = tk.MustQuery("show variables like '%tx_read_ts'")
require.Len(t, res.Rows(), 0)
// Test Hidden tidb_enable_streaming
res = tk.MustQuery("show variables like '%tidb_enable_streaming%';")
require.Len(t, res.Rows(), 0)
}

func TestIssue19507(t *testing.T) {
Expand Down
62 changes: 29 additions & 33 deletions infoschema/cluster_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,39 +230,35 @@ func TestSelectClusterTable(t *testing.T) {
slowLogFileName := "tidb-slow.log"
prepareSlowLogfile(t, slowLogFileName)
defer func() { require.NoError(t, os.Remove(slowLogFileName)) }()
for i := 0; i < 2; i++ {
tk.MustExec("use information_schema")
tk.MustExec(fmt.Sprintf("set @@tidb_enable_streaming=%d", i))
tk.MustExec("set @@global.tidb_enable_stmt_summary=1")
tk.MustExec("set time_zone = '+08:00';")
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2"))
tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testutil.RowsWithSep("|", "2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1"))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 0 ", "")))
tk.MustQuery("select query_time, conn_id from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1", "1"))
tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest order by digest").Check(testkit.Rows("124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc 1", "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1"))
tk.MustQuery(`select length(query) as l,time from information_schema.cluster_slow_query where time > "2019-02-12 19:33:56" order by abs(l) desc limit 10;`).Check(testkit.Rows("21 2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` where time > now() group by digest").Check(testkit.Rows())
re := tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
// Test for TiDB issue 14915.
re = tk.MustQuery("select sum(exec_count*avg_mem) from cluster_statements_summary_history group by schema_name,digest,digest_text;")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustExec("set @@global.tidb_enable_stmt_summary=0")
re = tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
}

tk.MustExec("use information_schema")
tk.MustExec("set @@global.tidb_enable_stmt_summary=1")
tk.MustExec("set time_zone = '+08:00';")
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY`").Check(testkit.Rows("2"))
tk.MustQuery("select time from `CLUSTER_SLOW_QUERY` where time='2019-02-12 19:33:56.571953'").Check(testutil.RowsWithSep("|", "2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_PROCESSLIST`").Check(testkit.Rows("1"))
tk.MustQuery("select * from `CLUSTER_PROCESSLIST`").Check(testkit.Rows(fmt.Sprintf(":10080 1 root 127.0.0.1 <nil> Query 9223372036 %s <nil> 0 0 ", "")))
tk.MustQuery("select query_time, conn_id from `CLUSTER_SLOW_QUERY` order by time limit 1").Check(testkit.Rows("4.895492 6"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` group by digest").Check(testkit.Rows("1", "1"))
tk.MustQuery("select digest, count(*) from `CLUSTER_SLOW_QUERY` group by digest order by digest").Check(testkit.Rows("124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc 1", "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 1"))
tk.MustQuery(`select length(query) as l,time from information_schema.cluster_slow_query where time > "2019-02-12 19:33:56" order by abs(l) desc limit 10;`).Check(testkit.Rows("21 2019-02-12 19:33:56.571953"))
tk.MustQuery("select count(*) from `CLUSTER_SLOW_QUERY` where time > now() group by digest").Check(testkit.Rows())
re := tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
// Test for TiDB issue 14915.
re = tk.MustQuery("select sum(exec_count*avg_mem) from cluster_statements_summary_history group by schema_name,digest,digest_text;")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Greater(t, len(re.Rows()), 0)
tk.MustExec("set @@global.tidb_enable_stmt_summary=0")
re = tk.MustQuery("select * from `CLUSTER_statements_summary`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
tk.MustQuery("select * from `CLUSTER_statements_summary_history`")
require.NotNil(t, re)
require.Equal(t, 0, len(re.Rows()))
}

func SubTestSelectClusterTablePrivilege(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/removed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var removedSysVars = map[string]string{
TiDBEnableGlobalTemporaryTable: "temporary table support is now always enabled",
TiDBSlowLogMasking: "use tidb_redact_log instead",
PlacementChecks: "placement_checks is removed and use tidb_placement_mode instead",
TiDBEnableStreaming: "streaming is no longer supported",
}

// IsRemovedSysVar returns true if the sysvar has been removed
Expand Down
12 changes: 0 additions & 12 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,10 +753,6 @@ type SessionVars struct {
// WaitSplitRegionTimeout defines the split region timeout.
WaitSplitRegionTimeout uint64

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool

// EnableChunkRPC indicates whether the coprocessor request can use chunk API.
EnableChunkRPC bool

Expand Down Expand Up @@ -1288,14 +1284,6 @@ func NewSessionVars() *SessionVars {
MaxChunkSize: DefMaxChunkSize,
}
vars.DMLBatchSize = DefDMLBatchSize
var enableStreaming string
if config.GetGlobalConfig().EnableStreaming {
enableStreaming = "1"
} else {
enableStreaming = "0"
}
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop
vars.allowMPPExecution = DefTiDBAllowMPPExecution
vars.HashExchangeWithNewCollation = DefTiDBHashExchangeWithNewCollation
Expand Down
5 changes: 0 additions & 5 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,6 @@ var defaultSysVars = []*SysVar{
appendDeprecationWarning(vars, TiDBMemQuotaIndexLookupJoin, TiDBMemQuotaQuery)
return normalizedValue, nil
}},
// Deprecated: tidb_enable_streaming
{Scope: ScopeSession, Name: TiDBEnableStreaming, Value: Off, Type: TypeBool, skipInit: true, Hidden: true, SetSession: func(s *SessionVars, val string) error {
s.EnableStreaming = TiDBOptOn(val)
return nil
}},
{Scope: ScopeSession, Name: TiDBEnableChunkRPC, Value: On, Type: TypeBool, skipInit: true, SetSession: func(s *SessionVars, val string) error {
s.EnableChunkRPC = TiDBOptOn(val)
return nil
Expand Down
13 changes: 0 additions & 13 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,6 @@ func TestVarsutil(t *testing.T) {
require.NoError(t, err)
require.Equal(t, config.HideConfig(string(bVal)), val)

err = SetSessionSystemVar(v, TiDBEnableStreaming, "1")
require.NoError(t, err)
val, err = GetSessionOrGlobalSystemVar(v, TiDBEnableStreaming)
require.NoError(t, err)
require.Equal(t, "ON", val)
require.True(t, v.EnableStreaming)
err = SetSessionSystemVar(v, TiDBEnableStreaming, "0")
require.NoError(t, err)
val, err = GetSessionOrGlobalSystemVar(v, TiDBEnableStreaming)
require.NoError(t, err)
require.Equal(t, "OFF", val)
require.False(t, v.EnableStreaming)

require.Equal(t, DefTiDBOptimizerSelectivityLevel, v.OptimizerSelectivityLevel)
err = SetSessionSystemVar(v, TiDBOptimizerSelectivityLevel, "1")
require.NoError(t, err)
Expand Down
Loading