Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: add metrics_summary_by_label table to query all detail metrics #14663

Merged
merged 9 commits into from
Feb 11, 2020
8 changes: 8 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
},
}
case strings.ToLower(infoschema.TableMetricSummaryByLabel):
return &ClusterReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &MetricSummaryByLabelRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
},
}
}
}
tb, _ := b.is.TableByID(v.Table.ID)
Expand Down
64 changes: 64 additions & 0 deletions executor/metric_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,67 @@ func (e *MetricSummaryRetriever) genMetricQuerySQLS(name, startTime, endTime str
}
return sqls
}

// MetricSummaryByLabelRetriever uses to read metric detail data.
type MetricSummaryByLabelRetriever struct {
dummyCloser
table *model.TableInfo
extractor *plannercore.MetricTableExtractor
retrieved bool
}

func (e *MetricSummaryByLabelRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.retrieved || e.extractor.SkipRequest {
return nil, nil
}
e.retrieved = true
totalRows := make([][]types.Datum, 0, len(infoschema.MetricTableMap))
quantiles := []float64{1, 0.999, 0.99, 0.90, 0.80}
tps := make([]*types.FieldType, 0, len(e.table.Columns))
for _, col := range e.table.Columns {
tps = append(tps, &col.FieldType)
}
startTime := e.extractor.StartTime.Format(plannercore.MetricTableTimeFormat)
endTime := e.extractor.EndTime.Format(plannercore.MetricTableTimeFormat)
for name, def := range infoschema.MetricTableMap {
sqls := e.genMetricQuerySQLS(name, startTime, endTime, quantiles, def)
for _, sql := range sqls {
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return nil, errors.Trace(err)
}
for _, row := range rows {
totalRows = append(totalRows, row.GetDatumRow(tps))
}
}
}
return totalRows, nil
}

func (e *MetricSummaryByLabelRetriever) genMetricQuerySQLS(name, startTime, endTime string, quantiles []float64, def infoschema.MetricTableDef) []string {
labels := ""
labelsColumn := `""`
if len(def.Labels) > 0 {
labels = "`" + strings.Join(def.Labels, "`, `") + "`"
labelsColumn = fmt.Sprintf("concat_ws(' = ', '%s', concat_ws(', ', %s))", strings.Join(def.Labels, ", "), labels)
}
if def.Quantile == 0 {
quantiles = []float64{0}
}
sqls := []string{}
for _, quantile := range quantiles {
var sql string
if quantile == 0 {
sql = fmt.Sprintf(`select "%[1]s", %[2]s as label,min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%[1]s where time > '%[3]s' and time < '%[4]s'`,
name, labelsColumn, startTime, endTime)
} else {
sql = fmt.Sprintf(`select "%[1]s_%[5]v", %[2]s as label,min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%[1]s where time > '%[3]s' and time < '%[4]s' and quantile=%[5]v`,
name, labelsColumn, startTime, endTime, quantile)
}
if len(def.Labels) > 0 {
sql += " group by " + labels
}
sqls = append(sqls, sql)
}
return sqls
}
15 changes: 14 additions & 1 deletion infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ const (
// TableInspectionResult is the string constant of inspection result table
TableInspectionResult = "INSPECTION_RESULT"
// TableMetricSummary is a summary table that contains all metrics.
TableMetricSummary = "METRIC_SUMMARY"
TableMetricSummary = "METRICS_SUMMARY"
// TableMetricSummaryByLabel is a metric table that contains all metrics that group by label info.
TableMetricSummaryByLabel = "METRICS_SUMMARY_BY_LABEL"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -163,6 +165,7 @@ var tableIDMap = map[string]int64{
TableClusterSystemInfo: autoid.InformationSchemaDBID + 50,
TableInspectionResult: autoid.InformationSchemaDBID + 51,
TableMetricSummary: autoid.InformationSchemaDBID + 52,
TableMetricSummaryByLabel: autoid.InformationSchemaDBID + 53,
}

type columnInfo struct {
Expand Down Expand Up @@ -1136,6 +1139,15 @@ var tableMetricSummaryCols = []columnInfo{
{"MIN_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"MAX_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
}
var tableMetricSummaryByLabelCols = []columnInfo{
{"METRIC_NAME", mysql.TypeVarchar, 64, 0, nil, nil},
{"LABEL", mysql.TypeVarchar, 64, 0, nil, nil},
{"TIME", mysql.TypeDatetime, -1, 0, nil, nil},
{"SUM_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"AVG_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"MIN_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
{"MAX_VALUE", mysql.TypeDouble, 22, 0, nil, nil},
}

func dataForSchemata(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum {
checker := privilege.GetPrivilegeManager(ctx)
Expand Down Expand Up @@ -2347,6 +2359,7 @@ var tableNameToColumns = map[string][]columnInfo{
TableClusterSystemInfo: tableClusterSystemInfoCols,
TableInspectionResult: tableInspectionResultCols,
TableMetricSummary: tableMetricSummaryCols,
TableMetricSummaryByLabel: tableMetricSummaryByLabelCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2793,7 +2793,7 @@ func (b *PlanBuilder) buildMemTable(ctx context.Context, dbName model.CIStr, tab
p.Extractor = &ClusterLogTableExtractor{}
case infoschema.TableInspectionResult:
p.Extractor = &InspectionResultTableExtractor{}
case infoschema.TableMetricSummary:
case infoschema.TableMetricSummary, infoschema.TableMetricSummaryByLabel:
p.Extractor = newMetricTableExtractor()
}
}
Expand Down
4 changes: 4 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,10 @@ func createSessionFunc(store kv.Storage) pools.Factory {
if err != nil {
return nil, errors.Trace(err)
}
err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxAllowedPacket, types.NewStringDatum("67108864"))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In PR #11137, concat_ws will need this variable, so I set this variable here for internal sql.

if err != nil {
return nil, errors.Trace(err)
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
return se, nil
Expand Down