Skip to content

Commit

Permalink
planner: check view recursion when building source from view (#20398) (
Browse files Browse the repository at this point in the history
…#21000)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Dec 10, 2020
1 parent e0348fb commit c59a152
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 16 deletions.
13 changes: 13 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,19 @@ func (s *testSuite3) TestCreateView(c *C) {
c.Assert(err.Error(), Equals, "update view v_issue_16253 is not supported now.")
}

func (s *testSuite3) TestViewRecursion(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table if not exists t(a int)")
tk.MustExec("create definer='root'@'localhost' view recursive_view1 as select * from t")
tk.MustExec("create definer='root'@'localhost' view recursive_view2 as select * from recursive_view1")
tk.MustExec("drop table t")
tk.MustExec("rename table recursive_view2 to t")
_, err := tk.Exec("select * from recursive_view1")
c.Assert(terror.ErrorEqual(err, plannercore.ErrViewRecursive), IsTrue)
tk.MustExec("drop view recursive_view1, t")
}

func (s *testSuite3) TestIssue16250(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
1 change: 1 addition & 0 deletions planner/core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ var (
ErrWindowDuplicateName = terror.ClassOptimizer.New(codeWindowDuplicateName, mysql.MySQLErrName[mysql.ErrWindowDuplicateName])
ErrPartitionClauseOnNonpartitioned = terror.ClassOptimizer.New(codePartitionClauseOnNonpartitioned, mysql.MySQLErrName[mysql.ErrPartitionClauseOnNonpartitioned])
ErrNoSuchTable = terror.ClassOptimizer.New(mysql.ErrNoSuchTable, mysql.MySQLErrName[mysql.ErrNoSuchTable])
ErrViewRecursive = terror.ClassOptimizer.New(mysql.ErrViewRecursive, mysql.MySQLErrName[mysql.ErrViewRecursive])
errTooBigPrecision = terror.ClassExpression.New(mysql.ErrTooBigPrecision, mysql.MySQLErrName[mysql.ErrTooBigPrecision])
ErrDBaccessDenied = terror.ClassOptimizer.New(mysql.ErrDBaccessDenied, mysql.MySQLErrName[mysql.ErrDBaccessDenied])
ErrTableaccessDenied = terror.ClassOptimizer.New(mysql.ErrTableaccessDenied, mysql.MySQLErrName[mysql.ErrTableaccessDenied])
Expand Down
35 changes: 31 additions & 4 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/opcode"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand All @@ -48,6 +49,7 @@ import (
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/set"
)

const (
Expand Down Expand Up @@ -2313,9 +2315,6 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName) (L
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, dbName.L, tableInfo.Name.L, "", authErr)

if tableInfo.IsView() {
if b.capFlag&collectUnderlyingViewName != 0 {
b.underlyingViewNames.Insert(dbName.L + "." + tn.Name.L)
}
return b.BuildDataSourceFromView(ctx, dbName, tableInfo)
}

Expand Down Expand Up @@ -2431,8 +2430,33 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName) (L
return result, nil
}

// checkRecursiveView checks whether this view is recursively defined.
func (b *PlanBuilder) checkRecursiveView(dbName model.CIStr, tableName model.CIStr) (func(), error) {
viewFullName := dbName.L + "." + tableName.L
if b.buildingViewStack == nil {
b.buildingViewStack = set.NewStringSet()
}
// If this view has already been on the building stack, it means
// this view contains a recursive definition.
if b.buildingViewStack.Exist(viewFullName) {
return nil, ErrViewRecursive.GenWithStackByArgs(dbName.O, tableName.O)
}
// If the view is being renamed, we return the mysql compatible error message.
if b.capFlag&renameView != 0 && viewFullName == b.renamingViewName {
return nil, ErrNoSuchTable.GenWithStackByArgs(dbName.O, tableName.O)
}
b.buildingViewStack.Insert(viewFullName)
return func() { delete(b.buildingViewStack, viewFullName) }, nil
}

// BuildDataSourceFromView is used to build LogicalPlan from view
func (b *PlanBuilder) BuildDataSourceFromView(ctx context.Context, dbName model.CIStr, tableInfo *model.TableInfo) (LogicalPlan, error) {
deferFunc, err := b.checkRecursiveView(dbName, tableInfo.Name)
if err != nil {
return nil, err
}
defer deferFunc()

charset, collation := b.ctx.GetSessionVars().GetCharsetInfo()
viewParser := parser.New()
viewParser.EnableWindowFunc(b.ctx.GetSessionVars().EnableWindowFunction)
Expand All @@ -2444,7 +2468,10 @@ func (b *PlanBuilder) BuildDataSourceFromView(ctx context.Context, dbName model.
b.visitInfo = make([]visitInfo, 0)
selectLogicalPlan, err := b.Build(ctx, selectNode)
if err != nil {
err = ErrViewInvalid.GenWithStackByArgs(dbName.O, tableInfo.Name.O)
if terror.ErrorNotEqual(err, ErrViewRecursive) &&
terror.ErrorNotEqual(err, ErrNoSuchTable) {
err = ErrViewInvalid.GenWithStackByArgs(dbName.O, tableInfo.Name.O)
}
return nil, err
}

Expand Down
23 changes: 11 additions & 12 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ const (
// canExpandAST indicates whether the origin AST can be expanded during plan
// building. ONLY used for `CreateViewStmt` now.
canExpandAST
// collectUnderlyingViewName indicates whether to collect the underlying
// view names of a CreateViewStmt during plan building.
collectUnderlyingViewName
// renameView indicates a view is being renamed, so we cannot use the origin
// definition of that view.
renameView
)

// PlanBuilder builds Plan from an ast.Node.
Expand Down Expand Up @@ -207,8 +207,11 @@ type PlanBuilder struct {

// SelectLock need this information to locate the lock on partitions.
partitionedTable []table.PartitionedTable
// CreateView needs this information to check whether exists nested view.
underlyingViewNames set.StringSet

// buildingViewStack is used to check whether there is a recursive view.
buildingViewStack set.StringSet
// renamingViewName is the name of the view which is being renamed.
renamingViewName string
}

// GetVisitInfo gets the visitInfo of the PlanBuilder.
Expand Down Expand Up @@ -2225,20 +2228,16 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err
v.ReferTable.Name.L, "", authErr)
}
case *ast.CreateViewStmt:
b.capFlag |= canExpandAST
b.capFlag |= collectUnderlyingViewName
b.capFlag |= canExpandAST | renameView
b.renamingViewName = v.ViewName.Schema.L + "." + v.ViewName.Name.L
defer func() {
b.capFlag &= ^canExpandAST
b.capFlag &= ^collectUnderlyingViewName
b.capFlag &= ^renameView
}()
b.underlyingViewNames = set.NewStringSet()
plan, err := b.Build(ctx, v.Select)
if err != nil {
return nil, err
}
if b.underlyingViewNames.Exist(v.ViewName.Schema.L + "." + v.ViewName.Name.L) {
return nil, ErrNoSuchTable.GenWithStackByArgs(v.ViewName.Schema.O, v.ViewName.Name.O)
}
schema := plan.Schema()
if v.Cols == nil {
adjustOverlongViewColname(plan.(LogicalPlan))
Expand Down

0 comments on commit c59a152

Please sign in to comment.