Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into lit-refactor-part4
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed Apr 3, 2023
2 parents ba76d6a + 5f87e88 commit b67374e
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 40 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,10 @@ bazel_addindextest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/addindextest/...

bazel_loaddatatest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/loaddatatest/...

bazel_lint: bazel_prepare
bazel build //... --//build:with_nogo_flag=true

Expand Down
1 change: 1 addition & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
embed = [":dispatcher"],
flaky = True,
race = "on",
shard_count = 6,
deps = [
"//disttask/framework/proto",
"//disttask/framework/storage",
Expand Down
1 change: 1 addition & 0 deletions planner/core/plan_cache_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
buf := new(strings.Builder)
buf.Reset()
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf)
restoreCtx.Flags ^= format.RestoreKeyWordUppercase
return restoreCtx
}}
)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/plan_cache_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestParameterize(t *testing.T) {
},
{
"select a+1, sum(b) from t where a<10 group by a+1",
"SELECT `a`+1,SUM(`b`) FROM `t` WHERE `a`<? GROUP BY `a`+1",
"SELECT `a`+1,sum(`b`) FROM `t` WHERE `a`<? GROUP BY `a`+1",
[]interface{}{int64(10)},
"SELECT `a`+1,SUM(`b`) FROM `t` WHERE `a`<10 GROUP BY `a`+1",
},
Expand Down
44 changes: 39 additions & 5 deletions planner/core/plan_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,6 @@ func TestNonPreparedPlanCacheReason(t *testing.T) {
tk.MustExec(`explain format = 'plan_cache' select * from t where a=1`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

tk.MustExec(`explain format = 'plan_cache' select * from t t1, t t2`)
tk.MustQuery(`show warnings`).Check(testkit.Rows(`Warning 1105 skip non-prepared plan-cache: queries that access multiple tables are not supported`))

tk.MustExec(`explain format = 'plan_cache' select * from (select * from t) tx`)
tk.MustQuery(`show warnings`).Check(testkit.Rows(`Warning 1105 skip non-prepared plan-cache: queries that have sub-queries are not supported`))

Expand Down Expand Up @@ -1622,7 +1619,6 @@ func TestNonPreparedPlanExplainWarning(t *testing.T) {
"select /*+ use_index(t1, idx_b) */ * from t1 where a > 1 and b < 2", // hint
"select a, sum(b) as c from t1 where a > 1 and b < 2 group by a having sum(b) > 1", // having
"select * from t1 limit 1", // limit
"select * from t1, t2", // join
"select * from (select * from t1) t", // sub-query
"insert into t1 values(1, 1)", // insert
"insert into t1(a, b) select a, b from t1", // insert into select
Expand Down Expand Up @@ -1656,7 +1652,6 @@ func TestNonPreparedPlanExplainWarning(t *testing.T) {
"skip non-prepared plan-cache: queries that have hints, aggregation, window-function, order-by, limit and lock are not supported",
"skip non-prepared plan-cache: queries that have hints, aggregation, window-function, order-by, limit and lock are not supported",
"skip non-prepared plan-cache: queries that have hints, aggregation, window-function, order-by, limit and lock are not supported",
"skip non-prepared plan-cache: queries that access multiple tables are not supported",
"skip non-prepared plan-cache: queries that have sub-queries are not supported",
"skip non-prepared plan-cache: not a select statement",
"skip non-prepared plan-cache: not a select statement",
Expand Down Expand Up @@ -1771,6 +1766,45 @@ func TestNonPreparedPlanCachePanic(t *testing.T) {
}
}

func TestNonPreparedPlanCacheJoin(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`set tidb_enable_non_prepared_plan_cache=1`)
tk.MustExec("create table t1 (a int, b int, c int)")
tk.MustExec("create table t2 (a int, b int, c int)")
tk.MustExec("create table t3 (a int, b int, c int)")

supported := []string{
"select * from t1, t2 where t1.a=t2.a and t1.b<10",
"select * from t1, t2",
"select * from t1, t2 where t1.a<t2.a and t2.c=10",
"select * from t1 tx, t1 ty",
"select * from t1 tx, t1 ty where tx.a=ty.a",
"select * from t1 inner join t2",
"select * from t1 inner join t2 on t1.a=t2.a",
"select * from t1 inner join t2 on t1.a=t2.a and t2.c<10",
"select * from t1 left join t2 on t1.a=t2.a",
"select * from t1 left join t2 on t1.a=t2.a and t2.c<10",
}
unsupported := []string{
"select * from t1, t2, t3", // 3-way join
"select * from t1, t2, t1 tx", // 3-way join
"select * from t1, (select * from t2) t2", // subquery
}

for _, sql := range supported {
tk.MustExec(sql)
tk.MustExec(sql)
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}
for _, sql := range unsupported {
tk.MustExec(sql)
tk.MustExec(sql)
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}
}

func TestNonPreparedPlanCacheAgg(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
87 changes: 63 additions & 24 deletions planner/core/plan_cacheable_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,24 +234,16 @@ func NonPreparedPlanCacheableWithCtx(sctx sessionctx.Context, node ast.Node, is
return false, "queries that have sub-queries are not supported"
}
tableRefs := from.TableRefs
if tableRefs.Right != nil {
// We don't support the join for the non-prepared plan cache now.
return false, "queries that access multiple tables are not supported"
}

var tableNode *ast.TableName
switch x := tableRefs.Left.(type) {
case *ast.TableSource:
tbl, isTableName := x.Source.(*ast.TableName)
if !isTableName {
return false, "queries that have sub-queries are not supported"
}
tableNode = tbl
// match table names, currently only support 2 tables(2-way join) at most.
tableNames, ok, reason := extractTableNames(tableRefs, nil)
if !ok {
return false, reason
}

// allocate and init the checker
checker := nonPrepCacheCheckerPool.Get().(*nonPreparedPlanCacheableChecker)
checker.reset(sctx, is, tableNode)
checker.reset(sctx, is, tableNames)

node.Accept(checker)
cacheable, reason := checker.cacheable, checker.reason
Expand All @@ -262,6 +254,46 @@ func NonPreparedPlanCacheableWithCtx(sctx sessionctx.Context, node ast.Node, is
return cacheable, reason
}

// extractTableNames extracts table names from the input node.
// Currently support 2 tables(2-way join) at most.
func extractTableNames(node ast.ResultSetNode, names []*ast.TableName) ([]*ast.TableName, bool, string) {
var ok bool
var reason string
switch x := node.(type) {
case *ast.TableSource:
name, isName := x.Source.(*ast.TableName)
if isName {
names = append(names, name)
} else {
if x.Source != nil {
names, ok, reason = extractTableNames(x.Source, names)
if !ok {
return nil, ok, reason
}
}
}
case *ast.Join:
if x.Left != nil {
names, ok, reason = extractTableNames(x.Left, names)
if !ok {
return nil, ok, reason
}
}
if x.Right != nil {
names, ok, reason = extractTableNames(x.Right, names)
if !ok {
return nil, ok, reason
}
}
default:
return names, false, "queries that have sub-queries are not supported"
}
if len(names) > 2 {
return names, false, "queries that have more than 2 tables are not supported"
}
return names, true, ""
}

// nonPreparedPlanCacheableChecker checks whether a query's plan can be cached for non-prepared plan cache.
// NOTE: we can add more rules in the future.
type nonPreparedPlanCacheableChecker struct {
Expand All @@ -270,17 +302,18 @@ type nonPreparedPlanCacheableChecker struct {
reason string // reason why this statement cannot hit the cache
schema infoschema.InfoSchema

tableNode *ast.TableName
tableNodes []*ast.TableName // only support 2-way joins currently

constCnt int // the number of constants/parameters in this query
filterCnt int // the number of filters in the current node
}

func (checker *nonPreparedPlanCacheableChecker) reset(sctx sessionctx.Context, schema infoschema.InfoSchema, tableNode *ast.TableName) {
func (checker *nonPreparedPlanCacheableChecker) reset(sctx sessionctx.Context, schema infoschema.InfoSchema, tableNodes []*ast.TableName) {
checker.sctx = sctx
checker.cacheable = true
checker.schema = schema
checker.reason = ""
checker.tableNode = tableNode
checker.tableNodes = tableNodes
checker.constCnt = 0
checker.filterCnt = 0
}
Expand All @@ -292,19 +325,24 @@ func (checker *nonPreparedPlanCacheableChecker) Enter(in ast.Node) (out ast.Node
}

switch node := in.(type) {
case *ast.SelectStmt, *ast.FieldList, *ast.SelectField, *ast.TableRefsClause, *ast.Join, *ast.BetweenExpr,
case *ast.SelectStmt, *ast.FieldList, *ast.SelectField, *ast.TableRefsClause, *ast.Join, *ast.BetweenExpr, *ast.OnCondition,
*ast.TableSource, *ast.ColumnNameExpr, *ast.PatternInExpr, *ast.BinaryOperationExpr, *ast.ByItem, *ast.AggregateFuncExpr:
return in, !checker.cacheable // skip child if un-cacheable
case *ast.ColumnName:
if checker.filterCnt > 0 {
// this column is appearing some filters, e.g. `col = 1`
colType, found := getColType(checker.schema, checker.tableNode, node)
if !found {
checker.cacheable = false
checker.reason = "some column is not found in table schema"
} else if colType == mysql.TypeJSON || colType == mysql.TypeEnum || colType == mysql.TypeSet || colType == mysql.TypeBit {
checker.cacheable = false
checker.reason = "query has some filters with JSON, Enum, Set or Bit columns"
for _, tableNode := range checker.tableNodes {
if tableNode == nil {
continue
}
colType, found := getColType(checker.schema, tableNode, node)
if !found {
checker.cacheable = false
checker.reason = "some column is not found in table schema"
} else if colType == mysql.TypeJSON || colType == mysql.TypeEnum || colType == mysql.TypeSet || colType == mysql.TypeBit {
checker.cacheable = false
checker.reason = "query has some filters with JSON, Enum, Set or Bit columns"
}
}
}
return in, !checker.cacheable
Expand Down Expand Up @@ -394,6 +432,7 @@ func (checker *nonPreparedPlanCacheableChecker) Enter(in ast.Node) (out ast.Node
}
return in, !checker.cacheable
}

checker.cacheable = false // unexpected cases
checker.reason = "query has some unsupported Node"
return in, !checker.cacheable
Expand Down
9 changes: 8 additions & 1 deletion planner/core/plan_cacheable_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ func TestNonPreparedPlanCacheable(t *testing.T) {
"select * from test.t where d>now()", // now
"select a+1 from test.t where a<13",
"select mod(a, 10) from test.t where a<13",

// 2-way joins
"select * from test.t inner join test.t3 on test.t.a=test.t3.a",
"select * from test.t inner join test.t3 on test.t.a=test.t3.a where test.t.a<10",
"select * from test.t, test.t3",
"select * from test.t, test.t3 where test.t.a=test.t3.a",
"select * from test.t, test.t3 where test.t.a=test.t3.a and test.t.b=t3.b",
"select * from test.t, test.t3 where test.t.a=test.t3.a and test.t.a<10",
}

unsupported := []string{
Expand All @@ -312,7 +320,6 @@ func TestNonPreparedPlanCacheable(t *testing.T) {
"select a, sum(b) as c from test.t1 where a > 1 and b < 2 group by a having sum(b) > 1", // having
"select * from test.t1 limit 1", // limit
"select * from test.t1 order by a", // order by
"select * from test.t1, test.t2", // join
"select * from (select * from test.t1) t", // sub-query
"insert into test.t1 values(1, 1)", // insert
"insert into t1(a, b) select a, b from test.t1", // insert into select
Expand Down
12 changes: 4 additions & 8 deletions resourcemanager/pool/spool/spool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,11 @@ func TestPoolTuneScaleUpAndDown(t *testing.T) {
for i := 0; i < 6; i++ {
c <- struct{}{}
}
time.Sleep(200 * time.Millisecond)
require.Equal(t, int32(2), p.Running())
require.Eventually(t, func() bool { return p.Running() == 2 }, 1*time.Second, 200*time.Millisecond)
for i := 0; i < 2; i++ {
c <- struct{}{}
}
time.Sleep(100 * time.Millisecond)
require.Equal(t, int32(0), p.Running())
require.Eventually(t, func() bool { return p.Running() == 0 }, 1*time.Second, 200*time.Millisecond)

// test with RunWithConcurrency
var cnt atomic.Int32
Expand All @@ -125,12 +123,10 @@ func TestPoolTuneScaleUpAndDown(t *testing.T) {
for i := 0; i < 10; i++ {
fnChan <- workerFn
}
time.Sleep(100 * time.Millisecond)
require.Equal(t, int32(10), cnt.Load())
require.Eventually(t, func() bool { return cnt.Load() == 10 }, 1*time.Second, 200*time.Millisecond)
require.Equal(t, int32(2), p.Running())
close(fnChan)
time.Sleep(100 * time.Microsecond)
require.Equal(t, int32(0), p.Running())
require.Eventually(t, func() bool { return p.Running() == 0 }, 1*time.Second, 200*time.Millisecond)
p.ReleaseAndWait()
}

Expand Down
21 changes: 21 additions & 0 deletions tests/realtikvtest/loaddatatest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "loaddatatest_test",
timeout = "short",
srcs = [
"main_test.go",
"util_test.go",
],
flaky = True,
race = "on",
deps = [
"//config",
"//kv",
"//testkit",
"//tests/realtikvtest",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_stretchr_testify//suite",
"@org_uber_go_goleak//:goleak",
],
)
38 changes: 38 additions & 0 deletions tests/realtikvtest/loaddatatest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loaddatatest

import (
"testing"

"github.com/pingcap/tidb/config"
"go.uber.org/goleak"
)

func init() {
// need a real PD
config.UpdateGlobal(func(conf *config.Config) {
conf.Path = "127.0.0.1:2379"
})
}

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
}
goleak.VerifyTestMain(m, opts...)
}
Loading

0 comments on commit b67374e

Please sign in to comment.