Skip to content

Commit

Permalink
executor: Migrate dataForStatistics, dataForTikVRegionPeers, dataForT…
Browse files Browse the repository at this point in the history
…iDBHotRegions from package infoschema to executor (#15202)
  • Loading branch information
gauss1314 authored Mar 11, 2020
1 parent 248be5b commit 3e16e1f
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 229 deletions.
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
},
}
case strings.ToLower(infoschema.TableSchemata),
strings.ToLower(infoschema.TableStatistics),
strings.ToLower(infoschema.TableTiDBIndexes),
strings.ToLower(infoschema.TableViews),
strings.ToLower(infoschema.TableTables),
Expand All @@ -1419,6 +1420,8 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableUserPrivileges),
strings.ToLower(infoschema.TableMetricTables),
strings.ToLower(infoschema.TableCollationCharacterSetApplicability),
strings.ToLower(infoschema.TableTiKVRegionPeers),
strings.ToLower(infoschema.TableTiDBHotRegions),
strings.ToLower(infoschema.TableSessionVar),
strings.ToLower(infoschema.TableConstraints):
return &MemTableReaderExec{
Expand Down
213 changes: 213 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand All @@ -29,8 +30,12 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
)

Expand Down Expand Up @@ -59,6 +64,8 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
switch e.table.Name.O {
case infoschema.TableSchemata:
e.setDataFromSchemata(sctx, dbs)
case infoschema.TableStatistics:
e.setDataForStatistics(sctx, dbs)
case infoschema.TableTables:
err = e.setDataFromTables(sctx, dbs)
case infoschema.TablePartitions:
Expand All @@ -81,6 +88,10 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
e.dataForCollationCharacterSetApplicability()
case infoschema.TableUserPrivileges:
e.setDataFromUserPrivileges(sctx)
case infoschema.TableTiKVRegionPeers:
err = e.setDataForTikVRegionPeers(sctx)
case infoschema.TableTiDBHotRegions:
err = e.setDataForTiDBHotRegions(sctx)
case infoschema.TableConstraints:
e.setDataFromTableConstraints(sctx, dbs)
case infoschema.TableSessionVar:
Expand Down Expand Up @@ -270,6 +281,93 @@ func (e *memtableRetriever) setDataFromSchemata(ctx sessionctx.Context, schemas
e.rows = rows
}

func (e *memtableRetriever) setDataForStatistics(ctx sessionctx.Context, schemas []*model.DBInfo) {
checker := privilege.GetPrivilegeManager(ctx)
for _, schema := range schemas {
for _, table := range schema.Tables {
if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}
e.setDataForStatisticsInTable(schema, table)
}
}
}

func (e *memtableRetriever) setDataForStatisticsInTable(schema *model.DBInfo, table *model.TableInfo) {
var rows [][]types.Datum
if table.PKIsHandle {
for _, col := range table.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
"0", // NON_UNIQUE
schema.Name.O, // INDEX_SCHEMA
"PRIMARY", // INDEX_NAME
1, // SEQ_IN_INDEX
col.Name.O, // COLUMN_NAME
"A", // COLLATION
0, // CARDINALITY
nil, // SUB_PART
nil, // PACKED
"", // NULLABLE
"BTREE", // INDEX_TYPE
"", // COMMENT
"NULL", // Expression
"", // INDEX_COMMENT
)
rows = append(rows, record)
}
}
}
nameToCol := make(map[string]*model.ColumnInfo, len(table.Columns))
for _, c := range table.Columns {
nameToCol[c.Name.L] = c
}
for _, index := range table.Indices {
nonUnique := "1"
if index.Unique {
nonUnique = "0"
}
for i, key := range index.Columns {
col := nameToCol[key.Name.L]
nullable := "YES"
if mysql.HasNotNullFlag(col.Flag) {
nullable = ""
}
colName := col.Name.O
expression := "NULL"
tblCol := table.Columns[col.Offset]
if tblCol.Hidden {
colName = "NULL"
expression = fmt.Sprintf("(%s)", tblCol.GeneratedExprString)
}
record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
nonUnique, // NON_UNIQUE
schema.Name.O, // INDEX_SCHEMA
index.Name.O, // INDEX_NAME
i+1, // SEQ_IN_INDEX
colName, // COLUMN_NAME
"A", // COLLATION
0, // CARDINALITY
nil, // SUB_PART
nil, // PACKED
nullable, // NULLABLE
"BTREE", // INDEX_TYPE
"", // COMMENT
expression, // Expression
"", // INDEX_COMMENT
)
rows = append(rows, record)
}
}
e.rows = append(e.rows, rows...)
}

func (e *memtableRetriever) setDataFromTables(ctx sessionctx.Context, schemas []*model.DBInfo) error {
tableRowsMap, colLengthMap, err := tableStatsCache.get(ctx)
if err != nil {
Expand Down Expand Up @@ -780,6 +878,121 @@ func keyColumnUsageInTable(schema *model.DBInfo, table *model.TableInfo) [][]typ
return rows
}

func (e *memtableRetriever) setDataForTikVRegionPeers(ctx sessionctx.Context) error {
tikvStore, ok := ctx.GetStore().(tikv.Storage)
if !ok {
return errors.New("Information about TiKV region status can be gotten only when the storage is TiKV")
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
regionsInfo, err := tikvHelper.GetRegionsInfo()
if err != nil {
return err
}
for _, region := range regionsInfo.Regions {
e.setNewTiKVRegionPeersCols(&region)
}
return nil
}

func (e *memtableRetriever) setNewTiKVRegionPeersCols(region *helper.RegionInfo) {
records := make([][]types.Datum, 0, len(region.Peers))
pendingPeerIDSet := set.NewInt64Set()
for _, peer := range region.PendingPeers {
pendingPeerIDSet.Insert(peer.ID)
}
downPeerMap := make(map[int64]int64, len(region.DownPeers))
for _, peerStat := range region.DownPeers {
downPeerMap[peerStat.ID] = peerStat.DownSec
}
for _, peer := range region.Peers {
row := make([]types.Datum, len(infoschema.TableTiKVRegionPeersCols))
row[0].SetInt64(region.ID)
row[1].SetInt64(peer.ID)
row[2].SetInt64(peer.StoreID)
if peer.IsLearner {
row[3].SetInt64(1)
} else {
row[3].SetInt64(0)
}
if peer.ID == region.Leader.ID {
row[4].SetInt64(1)
} else {
row[4].SetInt64(0)
}
if pendingPeerIDSet.Exist(peer.ID) {
row[5].SetString(pendingPeer, mysql.DefaultCollationName)
} else if downSec, ok := downPeerMap[peer.ID]; ok {
row[5].SetString(downPeer, mysql.DefaultCollationName)
row[6].SetInt64(downSec)
} else {
row[5].SetString(normalPeer, mysql.DefaultCollationName)
}
records = append(records, row)
}
e.rows = append(e.rows, records...)
}

const (
normalPeer = "NORMAL"
pendingPeer = "PENDING"
downPeer = "DOWN"
)

func (e *memtableRetriever) setDataForTiDBHotRegions(ctx sessionctx.Context) error {
tikvStore, ok := ctx.GetStore().(tikv.Storage)
if !ok {
return errors.New("Information about hot region can be gotten only when the storage is TiKV")
}
allSchemas := ctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema).AllSchemas()
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
metrics, err := tikvHelper.ScrapeHotInfo(pdapi.HotRead, allSchemas)
if err != nil {
return err
}
e.setDataForHotRegionByMetrics(metrics, "read")
metrics, err = tikvHelper.ScrapeHotInfo(pdapi.HotWrite, allSchemas)
if err != nil {
return err
}
e.setDataForHotRegionByMetrics(metrics, "write")
return nil
}

func (e *memtableRetriever) setDataForHotRegionByMetrics(metrics []helper.HotTableIndex, tp string) {
rows := make([][]types.Datum, 0, len(metrics))
for _, tblIndex := range metrics {
row := make([]types.Datum, len(infoschema.TableTiDBHotRegionsCols))
if tblIndex.IndexName != "" {
row[1].SetInt64(tblIndex.IndexID)
row[4].SetString(tblIndex.IndexName, mysql.DefaultCollationName)
} else {
row[1].SetNull()
row[4].SetNull()
}
row[0].SetInt64(tblIndex.TableID)
row[2].SetString(tblIndex.DbName, mysql.DefaultCollationName)
row[3].SetString(tblIndex.TableName, mysql.DefaultCollationName)
row[5].SetUint64(tblIndex.RegionID)
row[6].SetString(tp, mysql.DefaultCollationName)
if tblIndex.RegionMetric == nil {
row[7].SetNull()
row[8].SetNull()
} else {
row[7].SetInt64(int64(tblIndex.RegionMetric.MaxHotDegree))
row[8].SetInt64(int64(tblIndex.RegionMetric.Count))
}
row[9].SetUint64(tblIndex.RegionMetric.FlowBytes)
rows = append(rows, row)
}
e.rows = append(e.rows, rows...)
}

// setDataFromTableConstraints constructs data for table information_schema.constraints.See https://dev.mysql.com/doc/refman/5.7/en/table-constraints-table.html
func (e *memtableRetriever) setDataFromTableConstraints(ctx sessionctx.Context, schemas []*model.DBInfo) {
checker := privilege.GetPrivilegeManager(ctx)
Expand Down
Loading

0 comments on commit 3e16e1f

Please sign in to comment.