Skip to content

Commit

Permalink
*: refactor slow log format and parse slow query log to SLOW_QUERY
Browse files Browse the repository at this point in the history
…table. (#9290) (#10060)
  • Loading branch information
crazycs520 authored and zz-jason committed Apr 9, 2019
1 parent e4461a6 commit 011559c
Show file tree
Hide file tree
Showing 17 changed files with 562 additions and 62 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ var defaultConf = Config{
Level: "info",
Format: "text",
File: logutil.NewFileLogConfig(true, logutil.DefaultLogMaxSize),
SlowQueryFile: "tidb-slow.log",
SlowThreshold: logutil.DefaultSlowThreshold,
ExpensiveThreshold: 10000,
QueryLogMaxLen: logutil.DefaultQueryLogMaxLen,
Expand Down
2 changes: 1 addition & 1 deletion config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ format = "text"
disable-timestamp = false

# Stores slow query log into separated files.
slow-query-file = ""
slow-query-file = "tidb-slow.log"

# Queries with execution time greater than this value will be logged. (Milliseconds)
slow-threshold = 300
Expand Down
33 changes: 8 additions & 25 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,52 +394,35 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
sessVars := a.Ctx.GetSessionVars()
sql = QueryReplacer.Replace(sql) + sessVars.GetExecuteArgumentsInfo()

connID := sessVars.ConnectionID
currentDB := sessVars.CurrentDB
var tableIDs, indexIDs string
if len(sessVars.StmtCtx.TableIDs) > 0 {
tableIDs = strings.Replace(fmt.Sprintf("table_ids:%v ", a.Ctx.GetSessionVars().StmtCtx.TableIDs), " ", ",", -1)
tableIDs = strings.Replace(fmt.Sprintf("%v", a.Ctx.GetSessionVars().StmtCtx.TableIDs), " ", ",", -1)
}
if len(sessVars.StmtCtx.IndexIDs) > 0 {
indexIDs = strings.Replace(fmt.Sprintf("index_ids:%v ", a.Ctx.GetSessionVars().StmtCtx.IndexIDs), " ", ",", -1)
}
user := sessVars.User
var internal string
if sessVars.InRestrictedSQL {
internal = "[INTERNAL] "
indexIDs = strings.Replace(fmt.Sprintf("%v", a.Ctx.GetSessionVars().StmtCtx.IndexIDs), " ", ",", -1)
}
execDetail := sessVars.StmtCtx.GetExecDetails()
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
logutil.SlowQueryLogger.Debugf(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, sql))
} else {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
logutil.SlowQueryLogger.Warnf(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
var userString string
if user != nil {
userString = user.String()
}
if len(tableIDs) > 10 {
tableIDs = tableIDs[10 : len(tableIDs)-1] // Remove "table_ids:" and the last ","
}
if len(indexIDs) > 10 {
indexIDs = indexIDs[10 : len(indexIDs)-1] // Remove "index_ids:" and the last ","
if sessVars.User != nil {
userString = sessVars.User.String()
}
domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{
SQL: sql,
Start: a.StartTime,
Duration: costTime,
Detail: sessVars.StmtCtx.GetExecDetails(),
Succ: succ,
ConnID: connID,
ConnID: sessVars.ConnectionID,
TxnTS: txnTS,
User: userString,
DB: currentDB,
DB: sessVars.CurrentDB,
TableIDs: tableIDs,
IndexIDs: indexIDs,
Internal: sessVars.InRestrictedSQL,
Expand Down
2 changes: 1 addition & 1 deletion executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (s *testSuite) TestAggregation(c *C) {

result = tk.MustQuery("select count(*) from information_schema.columns")
// When adding new memory columns in information_schema, please update this variable.
columnCountOfAllInformationSchemaTables := "769"
columnCountOfAllInformationSchemaTables := "784"
result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables))

tk.MustExec("drop table if exists t1")
Expand Down
252 changes: 252 additions & 0 deletions infoschema/slow_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema

import (
"bufio"
"os"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
log "github.com/sirupsen/logrus"
)

var slowQueryCols = []columnInfo{
{variable.SlowLogTimeStr, mysql.TypeTimestamp, 26, 0, nil, nil},
{variable.SlowLogTxnStartTSStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{variable.SlowLogUserStr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogConnIDStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{variable.SlowLogQueryTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.ProcessTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.WaitTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.BackoffTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.RequestCountStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{execdetails.TotalKeysStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{execdetails.ProcessKeysStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{variable.SlowLogDBStr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogIndexIDsStr, mysql.TypeVarchar, 100, 0, nil, nil},
{variable.SlowLogIsInternalStr, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeVarchar, 4096, 0, nil, nil},
}

func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
return parseSlowLogFile(ctx.GetSessionVars().Location(), ctx.GetSessionVars().SlowQueryFile)
}

// parseSlowLogFile uses to parse slow log file.
// TODO: Support parse multiple log-files.
func parseSlowLogFile(tz *time.Location, filePath string) ([][]types.Datum, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if err = file.Close(); err != nil {
log.Error(err)
}
}()

return ParseSlowLog(tz, bufio.NewScanner(file))
}

// ParseSlowLog exports for testing.
// TODO: optimize for parse huge log-file.
func ParseSlowLog(tz *time.Location, scanner *bufio.Scanner) ([][]types.Datum, error) {
var rows [][]types.Datum
startFlag := false
var st *slowQueryTuple
var err error
for scanner.Scan() {
line := scanner.Text()
// Check slow log entry start flag.
if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
st = &slowQueryTuple{}
err = st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):])
if err != nil {
return rows, err
}
startFlag = true
continue
}

if startFlag {
// Parse slow log field.
if strings.Contains(line, variable.SlowLogPrefixStr) {
line = line[len(variable.SlowLogPrefixStr):]
fieldValues := strings.Split(line, " ")
for i := 0; i < len(fieldValues)-1; i += 2 {
field := fieldValues[i]
if strings.HasSuffix(field, ":") {
field = field[:len(field)-1]
}
err = st.setFieldValue(tz, field, fieldValues[i+1])
if err != nil {
return rows, err
}
}
} else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
// Get the sql string, and mark the start flag to false.
err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)))
if err != nil {
return rows, err
}
rows = append(rows, st.convertToDatumRow())
startFlag = false
}
}
}
if err := scanner.Err(); err != nil {
return nil, errors.AddStack(err)
}
return rows, nil
}

type slowQueryTuple struct {
time time.Time
txnStartTs uint64
user string
connID uint64
queryTime float64
processTime float64
waitTime float64
backOffTime float64
requestCount uint64
totalKeys uint64
processKeys uint64
db string
indexNames string
isInternal bool
sql string
}

func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string) error {
switch field {
case variable.SlowLogTimeStr:
t, err := ParseTime(value)
if err != nil {
return err
}
if t.Location() != tz {
t = t.In(tz)
}
st.time = t
case variable.SlowLogTxnStartTSStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.txnStartTs = num
case variable.SlowLogUserStr:
st.user = value
case variable.SlowLogConnIDStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.connID = num
case variable.SlowLogQueryTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.queryTime = num
case execdetails.ProcessTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.processTime = num
case execdetails.WaitTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.waitTime = num
case execdetails.BackoffTimeStr:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return errors.AddStack(err)
}
st.backOffTime = num
case execdetails.RequestCountStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.requestCount = num
case execdetails.TotalKeysStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.totalKeys = num
case execdetails.ProcessKeysStr:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return errors.AddStack(err)
}
st.processKeys = num
case variable.SlowLogDBStr:
st.db = value
case variable.SlowLogIndexIDsStr:
st.indexNames = value
case variable.SlowLogIsInternalStr:
st.isInternal = value == "true"
case variable.SlowLogQuerySQLStr:
st.sql = value
}
return nil
}

func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
record := make([]types.Datum, 0, len(slowQueryCols))
record = append(record, types.NewTimeDatum(types.Time{
Time: types.FromGoTime(st.time),
Type: mysql.TypeDatetime,
Fsp: types.MaxFsp,
}))
record = append(record, types.NewUintDatum(st.txnStartTs))
record = append(record, types.NewStringDatum(st.user))
record = append(record, types.NewUintDatum(st.connID))
record = append(record, types.NewFloat64Datum(st.queryTime))
record = append(record, types.NewFloat64Datum(st.processTime))
record = append(record, types.NewFloat64Datum(st.waitTime))
record = append(record, types.NewFloat64Datum(st.backOffTime))
record = append(record, types.NewUintDatum(st.requestCount))
record = append(record, types.NewUintDatum(st.totalKeys))
record = append(record, types.NewUintDatum(st.processKeys))
record = append(record, types.NewStringDatum(st.db))
record = append(record, types.NewStringDatum(st.indexNames))
record = append(record, types.NewDatum(st.isInternal))
record = append(record, types.NewStringDatum(st.sql))
return record
}

// ParseTime exports for testing.
func ParseTime(s string) (time.Time, error) {
t, err := time.Parse(logutil.SlowLogTimeFormat, s)
if err != nil {
err = errors.Errorf("string \"%v\" doesn't has a prefix that matches format \"%v\", err: %v", s, logutil.SlowLogTimeFormat, err)
}
return t, err
}
Loading

0 comments on commit 011559c

Please sign in to comment.