Skip to content

Commit

Permalink
Merge branch 'master' into show_pump_and_drainer_status
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Mar 2, 2019
2 parents 00cf6e8 + 560e8cf commit 68c6e5b
Show file tree
Hide file tree
Showing 18 changed files with 126 additions and 55 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (a *ExecStmt) IsReadOnly() bool {
func (a *ExecStmt) RebuildPlan() (int64, error) {
is := GetInfoSchema(a.Ctx)
a.InfoSchema = is
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, is, false); err != nil {
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, is, plannercore.InTxnRetry); err != nil {
return 0, errors.Trace(err)
}
p, err := planner.Optimize(a.Ctx, a.StmtNode, is)
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}

infoSchema := GetInfoSchema(c.Ctx)
if err := plannercore.Preprocess(c.Ctx, stmtNode, infoSchema, false); err != nil {
if err := plannercore.Preprocess(c.Ctx, stmtNode, infoSchema); err != nil {
return nil, errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1880,7 +1880,7 @@ func (s *testSuite) TestIsPointGet(c *C) {
for sqlStr, result := range tests {
stmtNode, err := s.ParseOneStmt(sqlStr, "", "")
c.Check(err, IsNil)
err = plannercore.Preprocess(ctx, stmtNode, infoSchema, false)
err = plannercore.Preprocess(ctx, stmtNode, infoSchema)
c.Check(err, IsNil)
p, err := planner.Optimize(ctx, stmtNode, infoSchema)
c.Check(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion executor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *testSuite2) TestStmtLabel(c *C) {
stmtNode, err := parser.New().ParseOneStmt(tt.sql, "", "")
c.Check(err, IsNil)
is := executor.GetInfoSchema(tk.Se)
err = plannercore.Preprocess(tk.Se.(sessionctx.Context), stmtNode, is, false)
err = plannercore.Preprocess(tk.Se.(sessionctx.Context), stmtNode, is)
c.Assert(err, IsNil)
_, err = planner.Optimize(tk.Se, stmtNode, is)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
return ErrPsManyParam
}

err = plannercore.Preprocess(e.ctx, stmt, e.is, true)
err = plannercore.Preprocess(e.ctx, stmt, e.is, plannercore.InPrepare)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3707,7 +3707,7 @@ func (s *testIntegrationSuite) TestFilterExtractFromDNF(c *C) {
c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr))
c.Assert(stmts, HasLen, 1)
is := domain.GetDomain(ctx).InfoSchema()
err = plannercore.Preprocess(ctx, stmts[0], is, false)
err = plannercore.Preprocess(ctx, stmts[0], is)
c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr))
p, err := plannercore.BuildLogicalPlan(ctx, stmts[0], is)
c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr))
Expand Down
2 changes: 1 addition & 1 deletion expression/typeinfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *testInferTypeSuite) TestInferType(c *C) {
c.Assert(err, IsNil)

is := domain.GetDomain(ctx).InfoSchema()
err = plannercore.Preprocess(ctx, stmt, is, false)
err = plannercore.Preprocess(ctx, stmt, is)
c.Assert(err, IsNil, comment)
p, err := plannercore.BuildLogicalPlan(ctx, stmt, is)
c.Assert(err, IsNil, comment)
Expand Down
10 changes: 5 additions & 5 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (s *testAnalyzeSuite) TestIndexRead(c *C) {
c.Assert(stmts, HasLen, 1)
stmt := stmts[0]
is := domain.GetDomain(ctx).InfoSchema()
err = core.Preprocess(ctx, stmt, is, false)
err = core.Preprocess(ctx, stmt, is)
c.Assert(err, IsNil)
p, err := planner.Optimize(ctx, stmt, is)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -427,7 +427,7 @@ func (s *testAnalyzeSuite) TestEmptyTable(c *C) {
c.Assert(stmts, HasLen, 1)
stmt := stmts[0]
is := domain.GetDomain(ctx).InfoSchema()
err = core.Preprocess(ctx, stmt, is, false)
err = core.Preprocess(ctx, stmt, is)
c.Assert(err, IsNil)
p, err := planner.Optimize(ctx, stmt, is)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -543,7 +543,7 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) {
c.Assert(stmts, HasLen, 1)
stmt := stmts[0]
is := domain.GetDomain(ctx).InfoSchema()
err = core.Preprocess(ctx, stmt, is, false)
err = core.Preprocess(ctx, stmt, is)
c.Assert(err, IsNil)
p, err := planner.Optimize(ctx, stmt, is)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -620,7 +620,7 @@ func (s *testAnalyzeSuite) TestPreparedNullParam(c *C) {
stmt := stmts[0]

is := domain.GetDomain(ctx).InfoSchema()
err = core.Preprocess(ctx, stmt, is, true)
err = core.Preprocess(ctx, stmt, is, core.InPrepare)
c.Assert(err, IsNil)
p, err := planner.Optimize(ctx, stmt, is)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -874,7 +874,7 @@ func BenchmarkOptimize(b *testing.B) {
c.Assert(stmts, HasLen, 1)
stmt := stmts[0]
is := domain.GetDomain(ctx).InfoSchema()
err = core.Preprocess(ctx, stmt, is, false)
err = core.Preprocess(ctx, stmt, is)
c.Assert(err, IsNil)

b.Run(tt.sql, func(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (e *Execute) OptimizePreparedPlan(ctx sessionctx.Context, is infoschema.Inf
if prepared.SchemaVersion != is.SchemaMetaVersion() {
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
err := Preprocess(ctx, prepared.Stmt, is, true)
err := Preprocess(ctx, prepared.Stmt, is, InPrepare)
if err != nil {
return ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
Expand Down
20 changes: 10 additions & 10 deletions planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (s *testPlanSuite) TestSubquery(c *C) {
stmt, err := s.ParseOneStmt(ca.sql, "", "")
c.Assert(err, IsNil, comment)

Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
p, err := BuildLogicalPlan(s.ctx, stmt, s.is)
c.Assert(err, IsNil)
if lp, ok := p.(LogicalPlan); ok {
Expand Down Expand Up @@ -867,7 +867,7 @@ func (s *testPlanSuite) TestPlanBuilder(c *C) {
c.Assert(err, IsNil, comment)

s.ctx.GetSessionVars().HashJoinConcurrency = 1
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
p, err := BuildLogicalPlan(s.ctx, stmt, s.is)
c.Assert(err, IsNil)
if lp, ok := p.(LogicalPlan); ok {
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func (s *testPlanSuite) TestValidate(c *C) {
comment := Commentf("for %s", sql)
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
_, err = BuildLogicalPlan(s.ctx, stmt, s.is)
if tt.err == nil {
c.Assert(err, IsNil, comment)
Expand Down Expand Up @@ -1639,7 +1639,7 @@ func (s *testPlanSuite) TestVisitInfo(c *C) {
comment := Commentf("for %s", tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
builder := &PlanBuilder{
colMapper: make(map[*ast.ColumnNameExpr]int),
ctx: MockContext(),
Expand Down Expand Up @@ -1757,7 +1757,7 @@ func (s *testPlanSuite) TestUnion(c *C) {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
builder := &PlanBuilder{
ctx: MockContext(),
is: s.is,
Expand Down Expand Up @@ -1889,7 +1889,7 @@ func (s *testPlanSuite) TestTopNPushDown(c *C) {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
builder := &PlanBuilder{
ctx: MockContext(),
is: s.is,
Expand Down Expand Up @@ -2000,7 +2000,7 @@ func (s *testPlanSuite) TestOuterJoinEliminator(c *C) {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
builder := &PlanBuilder{
ctx: MockContext(),
is: s.is,
Expand Down Expand Up @@ -2031,7 +2031,7 @@ func (s *testPlanSuite) TestSelectView(c *C) {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
builder := &PlanBuilder{
ctx: MockContext(),
is: s.is,
Expand Down Expand Up @@ -2201,7 +2201,7 @@ func (s *testPlanSuite) TestWindowFunction(c *C) {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
builder := &PlanBuilder{
ctx: MockContext(),
is: s.is,
Expand Down Expand Up @@ -2286,7 +2286,7 @@ func (s *testPlanSuite) TestSkylinePruning(c *C) {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
Preprocess(s.ctx, stmt, s.is, false)
Preprocess(s.ctx, stmt, s.is)
builder := &PlanBuilder{
ctx: MockContext(),
is: s.is,
Expand Down
2 changes: 1 addition & 1 deletion planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderBasePhysicalPlan(c *C) {
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)

core.Preprocess(se, stmt, s.is, false)
core.Preprocess(se, stmt, s.is)
p, err := planner.Optimize(se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, tt.best, Commentf("for %s", tt.sql))
Expand Down
80 changes: 56 additions & 24 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,46 +27,71 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/types/parser_driver"
)

// PreprocessOpt presents optional parameters to `Preprocess` method.
type PreprocessOpt func(*preprocessor)

// InPrepare is a PreprocessOpt that indicates preprocess is executing under prepare statement.
func InPrepare(p *preprocessor) {
p.flag |= inPrepare
}

// InTxnRetry is a PreprocessOpt that indicates preprocess is executing under transaction retry.
func InTxnRetry(p *preprocessor) {
p.flag |= inTxnRetry
}

// Preprocess resolves table names of the node, and checks some statements validation.
func Preprocess(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema, inPrepare bool) error {
v := preprocessor{is: is, ctx: ctx, inPrepare: inPrepare, tableAliasInJoin: make([]map[string]interface{}, 0)}
func Preprocess(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema, preprocessOpt ...PreprocessOpt) error {
v := preprocessor{is: is, ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0)}
for _, optFn := range preprocessOpt {
optFn(&v)
}
node.Accept(&v)
return errors.Trace(v.err)
}

type preprocessorFlag uint8

const (
// inPrepare is set when visiting in prepare statement.
inPrepare preprocessorFlag = 1 << iota
// inTxnRetry is set when visiting in transaction retry.
inTxnRetry
// inCreateOrDropTable is set when visiting create/drop table statement.
inCreateOrDropTable
// parentIsJoin is set when visiting node's parent is join.
parentIsJoin
)

// preprocessor is an ast.Visitor that preprocess
// ast Nodes parsed from parser.
type preprocessor struct {
is infoschema.InfoSchema
ctx sessionctx.Context
err error
inPrepare bool
// inCreateOrDropTable is true when visiting create/drop table statement.
inCreateOrDropTable bool
is infoschema.InfoSchema
ctx sessionctx.Context
err error
flag preprocessorFlag

// tableAliasInJoin is a stack that keeps the table alias names for joins.
// len(tableAliasInJoin) may bigger than 1 because the left/right child of join may be subquery that contains `JOIN`
tableAliasInJoin []map[string]interface{}

parentIsJoin bool
}

func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
switch node := in.(type) {
case *ast.CreateTableStmt:
p.inCreateOrDropTable = true
p.flag |= inCreateOrDropTable
p.checkCreateTableGrammar(node)
case *ast.CreateViewStmt:
p.inCreateOrDropTable = true
p.flag |= inCreateOrDropTable
p.checkCreateViewGrammar(node)
case *ast.DropTableStmt:
p.inCreateOrDropTable = true
p.flag |= inCreateOrDropTable
p.checkDropTableGrammar(node)
case *ast.RenameTableStmt:
p.inCreateOrDropTable = true
p.flag |= inCreateOrDropTable
p.checkRenameTableGrammar(node)
case *ast.CreateIndexStmt:
p.checkCreateIndexGrammar(node)
Expand All @@ -91,23 +116,23 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
// table not exists error. But admin restore is use to restore the dropped table. So skip children here.
return in, node.Tp == ast.AdminRestoreTable
default:
p.parentIsJoin = false
p.flag &= ^parentIsJoin
}
return in, p.err != nil
}

func (p *preprocessor) Leave(in ast.Node) (out ast.Node, ok bool) {
switch x := in.(type) {
case *ast.CreateTableStmt:
p.inCreateOrDropTable = false
p.flag &= ^inCreateOrDropTable
p.checkAutoIncrement(x)
p.checkContainDotColumn(x)
case *ast.CreateViewStmt:
p.inCreateOrDropTable = false
p.flag &= ^inCreateOrDropTable
case *ast.DropTableStmt, *ast.AlterTableStmt, *ast.RenameTableStmt:
p.inCreateOrDropTable = false
p.flag &= ^inCreateOrDropTable
case *driver.ParamMarkerExpr:
if !p.inPrepare {
if p.flag&inPrepare == 0 {
p.err = parser.ErrSyntax.GenWithStack("syntax error, unexpected '?'")
return
}
Expand Down Expand Up @@ -146,6 +171,13 @@ func (p *preprocessor) Leave(in ast.Node) (out ast.Node, ok bool) {
}
break
}

// no need sleep when retry transaction and avoid unexpect sleep caused by retry.
if p.flag&inTxnRetry > 0 && x.FnName.L == ast.Sleep {
if len(x.Args) == 1 {
x.Args[0] = ast.NewValueExpr(0)
}
}
}

return in, p.err == nil
Expand Down Expand Up @@ -359,7 +391,7 @@ func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) {
}

func (p *preprocessor) checkNonUniqTableAlias(stmt *ast.Join) {
if !p.parentIsJoin {
if p.flag&parentIsJoin == 0 {
p.tableAliasInJoin = append(p.tableAliasInJoin, make(map[string]interface{}))
}
tableAliases := p.tableAliasInJoin[len(p.tableAliasInJoin)-1]
Expand All @@ -371,7 +403,7 @@ func (p *preprocessor) checkNonUniqTableAlias(stmt *ast.Join) {
p.err = err
return
}
p.parentIsJoin = true
p.flag |= parentIsJoin
}

func isTableAliasDuplicate(node ast.ResultSetNode, tableAliases map[string]interface{}) error {
Expand Down Expand Up @@ -616,7 +648,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) {
}
tn.Schema = model.NewCIStr(currentDB)
}
if p.inCreateOrDropTable {
if p.flag&inCreateOrDropTable > 0 {
// The table may not exist in create table or drop table statement.
// Skip resolving the table to avoid error.
return
Expand Down Expand Up @@ -656,7 +688,7 @@ func (p *preprocessor) resolveShowStmt(node *ast.ShowStmt) {
func (p *preprocessor) resolveAlterTableStmt(node *ast.AlterTableStmt) {
for _, spec := range node.Specs {
if spec.Tp == ast.AlterTableRenameTable {
p.inCreateOrDropTable = true
p.flag |= inCreateOrDropTable
break
}
}
Expand Down
6 changes: 5 additions & 1 deletion planner/core/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ func (s *testValidatorSuite) TestValidator(c *C) {
c.Assert(err1, IsNil)
c.Assert(stmts, HasLen, 1)
stmt := stmts[0]
err = core.Preprocess(ctx, stmt, is, tt.inPrepare)
var opts []core.PreprocessOpt
if tt.inPrepare {
opts = append(opts, core.InPrepare)
}
err = core.Preprocess(ctx, stmt, is, opts...)
c.Assert(terror.ErrorEqual(err, tt.err), IsTrue, Commentf("sql: %s, err:%v", tt.sql, err))
}
}
Loading

0 comments on commit 68c6e5b

Please sign in to comment.