Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/admin: support admin check index on partition table (#17183) #17392

Merged
merged 2 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ func (s *testSuite1) TestAdminCheckIndexRange(c *C) {
result.Check(testkit.Rows("-1 hi 4", "2 cd 2"))
}

func (s *testSuite5) TestAdminCheckIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
check := func() {
tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (5, 5), (10, 10), (11, 11), (NULL, NULL)")
tk.MustExec("admin check index admin_test c1")
tk.MustExec("admin check index admin_test c2")
}
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, index (c1), unique key(c2))")
check()

// Test for hash partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, index (c1), unique key(c2)) partition by hash(c2) partitions 5;")
check()

// Test for range partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec(`create table admin_test (c1 int, c2 int, c3 int default 1, index (c1), unique key(c2)) PARTITION BY RANGE ( c2 ) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (MAXVALUE))`)
check()
}

func (s *testSuite5) TestAdminRecoverIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
23 changes: 1 addition & 22 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildChange(v)
case *plannercore.CheckTable:
return b.buildCheckTable(v)
case *plannercore.CheckIndex:
return b.buildCheckIndex(v)
case *plannercore.RecoverIndex:
return b.buildRecoverIndex(v)
case *plannercore.CleanupIndex:
Expand Down Expand Up @@ -329,26 +327,6 @@ func (b *executorBuilder) buildShowSlow(v *plannercore.ShowSlow) Executor {
return e
}

func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor {
readerExec, err := buildNoRangeIndexLookUpReader(b, v.IndexLookUpReader)
if err != nil {
b.err = err
return nil
}

buildIndexLookUpChecker(b, v.IndexLookUpReader, readerExec)

e := &CheckIndexExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dbName: v.DBName,
tableName: readerExec.table.Meta().Name.L,
idxName: v.IdxName,
is: b.is,
src: readerExec,
}
return e
}

// buildIndexLookUpChecker builds check information to IndexLookUpReader.
func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.PhysicalIndexLookUpReader,
readerExec *IndexLookUpExecutor) {
Expand Down Expand Up @@ -401,6 +379,7 @@ func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
srcs: readerExecs,
exitCh: make(chan struct{}),
retCh: make(chan error, len(readerExecs)),
checkIndex: v.CheckIndex,
}
return e
}
Expand Down
63 changes: 6 additions & 57 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (

var (
_ Executor = &baseExecutor{}
_ Executor = &CheckIndexExec{}
_ Executor = &CheckTableExec{}
_ Executor = &HashAggExec{}
_ Executor = &HashJoinExec{}
Expand Down Expand Up @@ -598,6 +597,7 @@ type CheckTableExec struct {
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
checkIndex bool
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -685,6 +685,10 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
if err != nil {
// For admin check index statement, for speed up and compatibility, doesn't do below checks.
if e.checkIndex {
return errors.Trace(err)
}
if greater == admin.IdxCntGreater {
err = e.checkTableIndexHandle(ctx, e.indexInfos[idxOffset])
} else if greater == admin.TblCntGreater {
Expand All @@ -707,7 +711,7 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
}
}, e.handlePanic)
}(i)
Expand Down Expand Up @@ -751,61 +755,6 @@ func (e *CheckTableExec) checkTableRecord(idxOffset int) error {
return nil
}

// CheckIndexExec represents the executor of checking an index.
// It is built from the "admin check index" statement, and it checks
// the consistency of the index data with the records of the table.
type CheckIndexExec struct {
baseExecutor

dbName string
tableName string
idxName string
src *IndexLookUpExecutor
done bool
is infoschema.InfoSchema
}

// Open implements the Executor Open interface.
func (e *CheckIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
if err := e.src.Open(ctx); err != nil {
return err
}
e.done = false
return nil
}

// Close implements the Executor Close interface.
func (e *CheckIndexExec) Close() error {
return e.src.Close()
}

// Next implements the Executor Next interface.
func (e *CheckIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.done {
return nil
}
defer func() { e.done = true }()

_, _, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tableName, []string{e.idxName})
if err != nil {
return err
}
chk := newFirstChunk(e.src)
for {
err := Next(ctx, e.src, chk)
if err != nil {
return err
}
if chk.NumRows() == 0 {
break
}
}
return nil
}

// ShowSlowExec represents the executor of showing the slow queries.
// It is build from the "admin show slow" statement:
// admin show slow top [internal | all] N
Expand Down
10 changes: 1 addition & 9 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type CheckTable struct {
Table table.Table
IndexInfos []*model.IndexInfo
IndexLookUpReaders []*PhysicalIndexLookUpReader
CheckIndex bool
}

// RecoverIndex is used for backfilling corrupted index data.
Expand All @@ -98,15 +99,6 @@ type CleanupIndex struct {
IndexName string
}

// CheckIndex is used for checking index data, built from the 'admin check index' statement.
type CheckIndex struct {
baseSchemaProducer

IndexLookUpReader *PhysicalIndexLookUpReader
DBName string
IdxName string
}

// CheckIndexRange is used for checking index data, output the index values that handle within begin and end.
type CheckIndexRange struct {
baseSchemaProducer
Expand Down
105 changes: 58 additions & 47 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,53 +880,15 @@ func (b *PlanBuilder) buildPrepare(x *ast.PrepareStmt) Plan {
return p
}

func (b *PlanBuilder) buildCheckIndex(ctx context.Context, dbName model.CIStr, as *ast.AdminStmt) (Plan, error) {
tblName := as.Tables[0]
tbl, err := b.is.TableByName(dbName, tblName.Name)
if err != nil {
return nil, err
}
tblInfo := tbl.Meta()

// get index information
var idx *model.IndexInfo
for _, index := range tblInfo.Indices {
if index.Name.L == strings.ToLower(as.Index) {
idx = index
break
}
}
if idx == nil {
return nil, errors.Errorf("index %s do not exist", as.Index)
}
if idx.State != model.StatePublic {
return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.State)
}

return b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idx)
}

func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan, error) {
var ret Plan
var err error
switch as.Tp {
case ast.AdminCheckTable:
case ast.AdminCheckTable, ast.AdminCheckIndex:
ret, err = b.buildAdminCheckTable(ctx, as)
if err != nil {
return ret, err
}
case ast.AdminCheckIndex:
dbName := as.Tables[0].Schema
readerPlan, err := b.buildCheckIndex(ctx, dbName, as)
if err != nil {
return ret, err
}

ret = &CheckIndex{
DBName: dbName.L,
IdxName: as.Index,
IndexLookUpReader: readerPlan.(*PhysicalIndexLookUpReader),
}
case ast.AdminRecoverIndex:
p := &RecoverIndex{Table: as.Tables[0], IndexName: as.Index}
p.setSchemaAndNames(buildRecoverIndexFields())
Expand Down Expand Up @@ -1163,12 +1125,12 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName
return rootT.p, nil
}

func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbName model.CIStr, tbl table.Table) ([]Plan, []*model.IndexInfo, error) {
func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbName model.CIStr, tbl table.Table, indices []table.Index) ([]Plan, []*model.IndexInfo, error) {
tblInfo := tbl.Meta()
// get index information
indexInfos := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices))
for _, idx := range tbl.Indices() {
for _, idx := range indices {
idxInfo := idx.Meta()
if idxInfo.State != model.StatePublic {
logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public",
Expand Down Expand Up @@ -1202,17 +1164,40 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
}

func (b *PlanBuilder) buildAdminCheckTable(ctx context.Context, as *ast.AdminStmt) (*CheckTable, error) {
tbl := as.Tables[0]
tblName := as.Tables[0]
tableInfo := as.Tables[0].TableInfo
table, ok := b.is.TableByID(tableInfo.ID)
tbl, ok := b.is.TableByID(tableInfo.ID)
if !ok {
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O)
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tblName.DBInfo.Name.O, tableInfo.Name.O)
}
p := &CheckTable{
DBName: tbl.Schema.O,
Table: table,
DBName: tblName.Schema.O,
Table: tbl,
}
var readerPlans []Plan
var indexInfos []*model.IndexInfo
var err error
if as.Tp == ast.AdminCheckIndex {
// get index information
var idx table.Index
idxName := strings.ToLower(as.Index)
for _, index := range tbl.Indices() {
if index.Meta().Name.L == idxName {
idx = index
break
}
}
if idx == nil {
return nil, errors.Errorf("index %s do not exist", as.Index)
}
if idx.Meta().State != model.StatePublic {
return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.Meta().State)
}
p.CheckIndex = true
readerPlans, indexInfos, err = b.buildPhysicalIndexLookUpReaders(ctx, tblName.Schema, tbl, []table.Index{idx})
} else {
readerPlans, indexInfos, err = b.buildPhysicalIndexLookUpReaders(ctx, tblName.Schema, tbl, tbl.Indices())
}
readerPlans, indexInfos, err := b.buildPhysicalIndexLookUpReaders(ctx, tbl.Schema, table)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -1225,6 +1210,32 @@ func (b *PlanBuilder) buildAdminCheckTable(ctx context.Context, as *ast.AdminStm
return p, nil
}

func (b *PlanBuilder) buildCheckIndex(ctx context.Context, dbName model.CIStr, as *ast.AdminStmt) (Plan, error) {
tblName := as.Tables[0]
tbl, err := b.is.TableByName(dbName, tblName.Name)
if err != nil {
return nil, err
}
tblInfo := tbl.Meta()

// get index information
var idx *model.IndexInfo
for _, index := range tblInfo.Indices {
if index.Name.L == strings.ToLower(as.Index) {
idx = index
break
}
}
if idx == nil {
return nil, errors.Errorf("index %s do not exist", as.Index)
}
if idx.State != model.StatePublic {
return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.State)
}

return b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idx)
}

func (b *PlanBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string) (*expression.Schema, types.NameSlice, error) {
schema := expression.NewSchema()
var names types.NameSlice
Expand Down