Skip to content

Commit

Permalink
add new cascades optimize push topn down projection
Browse files Browse the repository at this point in the history
  • Loading branch information
hsqlu committed Dec 4, 2019
2 parents 5077c3d + 5c4a9ee commit 9d19259
Show file tree
Hide file tree
Showing 194 changed files with 6,112 additions and 1,499 deletions.
4 changes: 4 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
/expression @pingcap/co-expression
/planner @pingcap/co-planner
/statistics @pingcap/co-planner
/util/ranger @pingcap/co-planner
/bindinfo @pingcap/co-planner
22 changes: 22 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,3 +528,25 @@ func (s *testSuite) TestAddEvolveTasks(c *C) {
status := rows[1][3].(string)
c.Assert(status == "using" || status == "rejected", IsTrue)
}

func (s *testSuite) TestBindingCache(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select * from t use index(idx)")
tk.MustExec("create database tmp")
tk.MustExec("use tmp")
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select * from t use index(idx)")

c.Assert(s.domain.BindHandle().Update(false), IsNil)
c.Assert(s.domain.BindHandle().Update(false), IsNil)
res := tk.MustQuery("show global bindings")
c.Assert(len(res.Rows()), Equals, 2)

tk.MustExec("drop global binding for select * from t")
c.Assert(s.domain.BindHandle().Update(false), IsNil)
c.Assert(len(s.domain.BindHandle().GetAllBindRecord()), Equals, 1)
}
7 changes: 4 additions & 3 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {

sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info"
if !fullLoad {
sql += " where update_time >= \"" + lastUpdateTime.String() + "\""
sql += " where update_time > \"" + lastUpdateTime.String() + "\""
}
// We need to apply the updates by order, wrong apply order of same original sql may cause inconsistent state.
sql += " order by update_time"
Expand Down Expand Up @@ -154,7 +154,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
lastUpdateTime = meta.Bindings[0].UpdateTime
}
if err != nil {
logutil.BgLogger().Error("update bindinfo failed", zap.Error(err))
logutil.BgLogger().Info("update bindinfo failed", zap.Error(err))
continue
}

Expand All @@ -163,7 +163,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
if len(newRecord.Bindings) > 0 {
newCache.setBindRecord(hash, newRecord)
} else {
newCache.removeDeletedBindRecord(hash, oldRecord)
newCache.removeDeletedBindRecord(hash, newRecord)
}
updateMetrics(metrics.ScopeGlobal, oldRecord, newCache.getBindRecord(hash, meta.OriginalSQL, meta.Db), true)
}
Expand Down Expand Up @@ -459,6 +459,7 @@ func (c cache) removeDeletedBindRecord(hash string, meta *BindRecord) {
}
}
}
c[hash] = metas
}

func (c cache) setBindRecord(hash string, meta *BindRecord) {
Expand Down
20 changes: 17 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
zaplog "github.com/pingcap/log"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/util/logutil"
tracing "github.com/uber/jaeger-client-go/config"
"go.uber.org/atomic"
Expand All @@ -38,7 +39,7 @@ import (
const (
MaxLogFileSize = 4096 // MB
// DefTxnTotalSizeLimit is the default value of TxnTxnTotalSizeLimit.
DefTxnTotalSizeLimit = 100 * 1024 * 1024
DefTxnTotalSizeLimit = 1024 * 1024 * 1024
)

// Valid config maps
Expand Down Expand Up @@ -74,8 +75,8 @@ type Config struct {
TxnLocalLatches TxnLocalLatches `toml:"txn-local-latches" json:"txn-local-latches"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`

LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
ServerVersion string `toml:"server-version" json:"server-version"`
Log Log `toml:"log" json:"log"`
Security Security `toml:"security" json:"security"`
Status Status `toml:"status" json:"status"`
Expand All @@ -100,6 +101,9 @@ type Config struct {
DelayCleanTableLock uint64 `toml:"delay-clean-table-lock" json:"delay-clean-table-lock"`
SplitRegionMaxNum uint64 `toml:"split-region-max-num" json:"split-region-max-num"`
StmtSummary StmtSummary `toml:"stmt-summary" json:"stmt-summary"`
// RepairMode indicates that the TiDB is in the repair mode for table meta.
RepairMode bool `toml:"repair-mode" json:"repair-mode"`
RepairTableList []string `toml:"repair-table-list" json:"repair-table-list"`
}

// nullableBool defaults unset bool options to unset instead of false, which enables us to know if the user has set 2
Expand Down Expand Up @@ -440,11 +444,14 @@ var defaultConf = Config{
EnableTableLock: false,
DelayCleanTableLock: 0,
SplitRegionMaxNum: 1000,
RepairMode: false,
RepairTableList: []string{},
TxnLocalLatches: TxnLocalLatches{
Enabled: false,
Capacity: 2048000,
},
LowerCaseTableNames: 2,
ServerVersion: "",
Log: Log{
Level: "info",
Format: "text",
Expand Down Expand Up @@ -638,6 +645,9 @@ func (c *Config) Load(confFile string) error {
if c.TokenLimit == 0 {
c.TokenLimit = 1000
}
if len(c.ServerVersion) > 0 {
mysql.ServerVersion = c.ServerVersion
}
// If any items in confFile file are not mapped into the Config struct, issue
// an error and stop the server from starting.
undecoded := metaData.Undecoded()
Expand Down Expand Up @@ -700,6 +710,10 @@ func (c *Config) Valid() error {
if c.TiKVClient.GrpcConnectionCount == 0 {
return fmt.Errorf("grpc-connection-count should be greater than 0")
}

if c.Performance.TxnTotalSizeLimit > (10 << 30) {
return fmt.Errorf("txn-total-size-limit should be less than %d", 10<<30)
}
return nil
}

Expand Down
15 changes: 14 additions & 1 deletion config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ split-region-max-num = 1000
# In order to support "drop primary key" operation , this flag must be true and the table does not have the pkIsHandle flag.
alter-primary-key = false

# server-version is used to change the version string of TiDB in the following scenarios:
# 1. the server version returned by builtin-function `VERSION()`.
# 2. the server version filled in handshake packets of MySQL Connection Protocol, see https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake for more details.
# if server-version = "", the default value(original TiDB version string) is used.
server-version = ""

# repair mode is used to repair the broken table meta in TiKV in extreme cases.
repair-mode = false

# Repair table list is used to list the tables in repair mode with the format like ["db.table",].
# In repair mode, repairing table which is not in repair list will get wrong database or wrong table error.
repair-table-list = []

[log]
# Log level: debug, info, warn, error, fatal.
level = "info"
Expand Down Expand Up @@ -201,7 +214,7 @@ bind-info-lease = "3s"
# If using TiKV as the storage, the entry represents a key/value pair.
# WARNING: Do not set the value too large, otherwise it will make a very large impact on the TiKV cluster.
# Please adjust this configuration carefully.
txn-total-size-limit = 104857600
txn-total-size-limit = 1073741824

[proxy-protocol]
# PROXY protocol acceptable client networks.
Expand Down
25 changes: 24 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/BurntSushi/toml"
. "github.com/pingcap/check"
zaplog "github.com/pingcap/log"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/util/logutil"
tracing "github.com/uber/jaeger-client-go/config"
)
Expand Down Expand Up @@ -179,6 +180,8 @@ alter-primary-key = true
delay-clean-table-lock = 5
split-region-max-num=10000
enable-batch-dml = true
server-version = "test_version"
repair-mode = true
[performance]
txn-total-size-limit=2000
[tikv-client]
Expand All @@ -196,6 +199,8 @@ max-sql-length=1024

c.Assert(conf.Load(configFile), IsNil)

c.Assert(conf.ServerVersion, Equals, "test_version")
c.Assert(mysql.ServerVersion, Equals, conf.ServerVersion)
// Test that the original value will not be clear by load the config file that does not contain the option.
c.Assert(conf.Binlog.Enable, Equals, true)
c.Assert(conf.Binlog.Strategy, Equals, "hash")
Expand All @@ -215,6 +220,7 @@ max-sql-length=1024
c.Assert(conf.StmtSummary.MaxStmtCount, Equals, uint(1000))
c.Assert(conf.StmtSummary.MaxSQLLength, Equals, uint(1024))
c.Assert(conf.EnableBatchDML, Equals, true)
c.Assert(conf.RepairMode, Equals, true)
c.Assert(f.Close(), IsNil)
c.Assert(os.Remove(configFile), IsNil)

Expand All @@ -224,7 +230,7 @@ max-sql-length=1024
// Make sure the example config is the same as default config.
c.Assert(conf, DeepEquals, GetGlobalConfig())

// Test for lof config.
// Test for log config.
c.Assert(conf.Log.ToLogConfig(), DeepEquals, logutil.NewLogConfig("info", "text", "tidb-slow.log", conf.Log.File, false, func(config *zaplog.Config) { config.DisableErrorVerbose = conf.Log.getDisableErrorStack() }))

// Test for tracing config.
Expand Down Expand Up @@ -349,3 +355,20 @@ func (s *testConfigSuite) TestOOMActionValid(c *C) {
c.Assert(c1.Valid() == nil, Equals, tt.valid)
}
}

func (s *testConfigSuite) TestTxnTotalSizeLimitValid(c *C) {
conf := NewConfig()
tests := []struct {
limit uint64
valid bool
}{
{4 << 10, true},
{10 << 30, true},
{10<<30 + 1, false},
}

for _, tt := range tests {
conf.Performance.TxnTotalSizeLimit = tt.limit
c.Assert(conf.Valid() == nil, Equals, tt.valid)
}
}
4 changes: 2 additions & 2 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
hookErr = errors.Trace(err1)
return
}
newCol := table.FindCol(t.(*tables.Table).Columns, newColName)
newCol := table.FindCol(t.(*tables.TableCommon).Columns, newColName)
if newCol == nil {
return
}
Expand Down Expand Up @@ -891,7 +891,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
hookErr = errors.Trace(err1)
return
}
col := table.FindCol(t.(*tables.Table).Columns, colName)
col := table.FindCol(t.(*tables.TableCommon).Columns, colName)
if col == nil {
checkOK = true
return
Expand Down
80 changes: 74 additions & 6 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
Expand All @@ -37,14 +38,24 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
"go.uber.org/zap"
)

var _ = Suite(&testStateChangeSuite{})
var _ = SerialSuites(&serialTestStateChangeSuite{})

type serialTestStateChangeSuite struct {
testStateChangeSuiteBase
}

type testStateChangeSuite struct {
testStateChangeSuiteBase
}

type testStateChangeSuiteBase struct {
lease time.Duration
store kv.Storage
dom *domain.Domain
Expand All @@ -53,7 +64,7 @@ type testStateChangeSuite struct {
preSQL string
}

func (s *testStateChangeSuite) SetUpSuite(c *C) {
func (s *testStateChangeSuiteBase) SetUpSuite(c *C) {
s.lease = 200 * time.Millisecond
ddl.WaitTimeWhenErrorOccured = 1 * time.Microsecond
var err error
Expand All @@ -71,7 +82,7 @@ func (s *testStateChangeSuite) SetUpSuite(c *C) {
s.p = parser.New()
}

func (s *testStateChangeSuite) TearDownSuite(c *C) {
func (s *testStateChangeSuiteBase) TearDownSuite(c *C) {
s.se.Execute(context.Background(), "drop database if exists test_db_state")
s.se.Close()
s.dom.Close()
Expand Down Expand Up @@ -534,7 +545,7 @@ func (s *testStateChangeSuite) TestDeleteOnly(c *C) {
s.runTestInSchemaState(c, model.StateDeleteOnly, "", dropColumnSQL, sqls, nil)
}

func (s *testStateChangeSuite) runTestInSchemaState(c *C, state model.SchemaState, tableName, alterTableSQL string,
func (s *testStateChangeSuiteBase) runTestInSchemaState(c *C, state model.SchemaState, tableName, alterTableSQL string,
sqlWithErrs []sqlWithErr, expectQuery *expectQuery) {
_, err := s.se.Execute(context.Background(), `create table t (
c1 varchar(64),
Expand Down Expand Up @@ -592,7 +603,7 @@ func (s *testStateChangeSuite) runTestInSchemaState(c *C, state model.SchemaStat
}
}

func (s *testStateChangeSuite) execQuery(tk *testkit.TestKit, sql string, args ...interface{}) (*testkit.Result, error) {
func (s *testStateChangeSuiteBase) execQuery(tk *testkit.TestKit, sql string, args ...interface{}) (*testkit.Result, error) {
comment := Commentf("sql:%s, args:%v", sql, args)
rs, err := tk.Exec(sql, args...)
if err != nil {
Expand All @@ -611,7 +622,7 @@ func checkResult(result *testkit.Result, expected [][]interface{}) error {
return nil
}

func (s *testStateChangeSuite) CheckResult(tk *testkit.TestKit, sql string, args ...interface{}) (*testkit.Result, error) {
func (s *testStateChangeSuiteBase) CheckResult(tk *testkit.TestKit, sql string, args ...interface{}) (*testkit.Result, error) {
comment := Commentf("sql:%s, args:%v", sql, args)
rs, err := tk.Exec(sql, args...)
if err != nil {
Expand Down Expand Up @@ -829,7 +840,7 @@ func (s *testStateChangeSuite) TestParallelCreateAndRename(c *C) {

type checkRet func(c *C, err1, err2 error)

func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
Expand Down Expand Up @@ -1136,3 +1147,60 @@ func (s *testStateChangeSuite) TestParallelTruncateTableAndAddColumn(c *C) {
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

// TestParallelFlashbackTable tests parallel flashback table.
func (s *serialTestStateChangeSuite) TestParallelFlashbackTable(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`), IsNil)
defer func(originGC bool) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange"), IsNil)
if originGC {
ddl.EmulatorGCEnable()
} else {
ddl.EmulatorGCDisable()
}
}(ddl.IsEmulatorGCEnable())

// disable emulator GC.
// Disable emulator GC, otherwise, emulator GC will delete table record as soon as possible after executing drop table DDL.
ddl.EmulatorGCDisable()
gcTimeFormat := "20060102-15:04:05 -0700 MST"
timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat)
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk := testkit.NewTestKit(c, s.store)
// clear GC variables first.
tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )")
// set GC safe point
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
// set GC enable.
err := gcutil.EnableGC(tk.Se)
c.Assert(err, IsNil)

// prepare dropped table.
tk.MustExec("use test_db_state")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int);")
tk.MustExec("drop table if exists t")
// Test parallel flashback table.
ts := getDDLJobStartTime(tk, "test_db_state", "t")
sql1 := fmt.Sprintf("flashback table t until timestamp '%v' to t_flashback", ts)
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2, NotNil)
c.Assert(err2.Error(), Equals, "[schema:1050]Table 't_flashback' already exists")

}
s.testControlParallelExecSQL(c, sql1, sql1, f)
}

func getDDLJobStartTime(tk *testkit.TestKit, dbName, tblName string) string {
re := tk.MustQuery("admin show ddl jobs 100")
rows := re.Rows()
for _, row := range rows {
if row[1] == dbName && row[2] == tblName && (row[3] == "drop table" || row[3] == "truncate table") {
return row[8].(string)
}
}
return ""
}
Loading

0 comments on commit 9d19259

Please sign in to comment.