Skip to content

Commit

Permalink
Fix known mysql type conversion issues (#6647)
Browse files Browse the repository at this point in the history
(cherry picked from commit d858d82)
  • Loading branch information
danielnelson committed Nov 12, 2019
1 parent c6f6c2a commit 29dff92
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 4 deletions.
31 changes: 27 additions & 4 deletions plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"sync"
"time"

"github.com/go-sql-driver/mysql"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/mysql/v1"

"github.com/go-sql-driver/mysql"
"github.com/influxdata/telegraf/plugins/inputs/mysql/v2"
)

type Mysql struct {
Expand All @@ -38,6 +38,8 @@ type Mysql struct {
GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"`
IntervalSlow string `toml:"interval_slow"`
MetricVersion int `toml:"metric_version"`

Log telegraf.Logger `toml:"-"`
tls.ClientConfig
lastT time.Time
initDone bool
Expand Down Expand Up @@ -549,14 +551,20 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
return err
}
key = strings.ToLower(key)

// parse mysql version and put into field and tag
if strings.Contains(key, "version") {
fields[key] = string(val)
tags[key] = string(val)
}
if value, ok := m.parseValue(val); ok {

value, err := m.parseGlobalVariables(key, val)
if err != nil {
m.Log.Debugf("Error parsing global variable %q: %v", key, err)
} else {
fields[key] = value
}

// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql_variables", fields, tags)
Expand All @@ -570,6 +578,18 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
return nil
}

func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
if m.MetricVersion < 2 {
v, ok := v1.ParseValue(value)
if ok {
return v, nil
}
return v, fmt.Errorf("could not parse value: %q", string(value))
} else {
return v2.ConvertGlobalVariables(key, value)
}
}

// gatherSlaveStatuses can be used to get replication analytics
// When the server is slave, then it returns only one row.
// If the multi-source replication is set, then everything works differently
Expand Down Expand Up @@ -743,7 +763,10 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
}
} else {
key = strings.ToLower(key)
if value, ok := m.parseValue(val); ok {
value, err := v2.ConvertGlobalStatus(key, val)
if err != nil {
m.Log.Debugf("Error parsing global status: %v", err)
} else {
fields[key] = value
}
}
Expand Down
103 changes: 103 additions & 0 deletions plugins/inputs/mysql/v2/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package v2

import (
"bytes"
"database/sql"
"fmt"
"strconv"
)

type ConversionFunc func(value sql.RawBytes) (interface{}, error)

func ParseInt(value sql.RawBytes) (interface{}, error) {
v, err := strconv.ParseInt(string(value), 10, 64)

// Ignore ErrRange. When this error is set the returned value is "the
// maximum magnitude integer of the appropriate bitSize and sign."
if err, ok := err.(*strconv.NumError); ok && err.Err == strconv.ErrRange {
return v, nil
}

return v, err
}

func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) {
if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) {
return int64(1), nil
}

return int64(0), nil
}

func ParseGTIDMode(value sql.RawBytes) (interface{}, error) {
// https://dev.mysql.com/doc/refman/8.0/en/replication-mode-change-online-concepts.html
v := string(value)
switch v {
case "OFF":
return int64(0), nil
case "ON":
return int64(1), nil
case "OFF_PERMISSIVE":
return int64(0), nil
case "ON_PERMISSIVE":
return int64(1), nil
default:
return nil, fmt.Errorf("unrecognized gtid_mode: %q", v)
}
}

func ParseValue(value sql.RawBytes) (interface{}, error) {
if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {
return 1, nil
}

if bytes.EqualFold(value, []byte("NO")) || bytes.Compare(value, []byte("OFF")) == 0 {
return 0, nil
}

if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
return val, nil
}
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
return val, nil
}

if len(string(value)) > 0 {
return string(value), nil
}

return nil, fmt.Errorf("unconvertible value: %q", string(value))
}

var GlobalStatusConversions = map[string]ConversionFunc{
"ssl_ctx_verify_depth": ParseInt,
"ssl_verify_depth": ParseInt,
}

var GlobalVariableConversions = map[string]ConversionFunc{
"gtid_mode": ParseGTIDMode,
}

func ConvertGlobalStatus(key string, value sql.RawBytes) (interface{}, error) {
if bytes.Equal(value, []byte("")) {
return nil, nil
}

if conv, ok := GlobalStatusConversions[key]; ok {
return conv(value)
}

return ParseValue(value)
}

func ConvertGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
if bytes.Equal(value, []byte("")) {
return nil, nil
}

if conv, ok := GlobalVariableConversions[key]; ok {
return conv(value)
}

return ParseValue(value)
}
86 changes: 86 additions & 0 deletions plugins/inputs/mysql/v2/convert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package v2

import (
"database/sql"
"testing"

"github.com/stretchr/testify/require"
)

func TestConvertGlobalStatus(t *testing.T) {
tests := []struct {
name string
key string
value sql.RawBytes
expected interface{}
expectedErr error
}{
{
name: "default",
key: "ssl_ctx_verify_depth",
value: []byte("0"),
expected: int64(0),
expectedErr: nil,
},
{
name: "overflow int64",
key: "ssl_ctx_verify_depth",
value: []byte("18446744073709551615"),
expected: int64(9223372036854775807),
expectedErr: nil,
},
{
name: "defined variable but unset",
key: "ssl_ctx_verify_depth",
value: []byte(""),
expected: nil,
expectedErr: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, err := ConvertGlobalStatus(tt.key, tt.value)
require.Equal(t, tt.expectedErr, err)
require.Equal(t, tt.expected, actual)
})
}
}

func TestCovertGlobalVariables(t *testing.T) {
tests := []struct {
name string
key string
value sql.RawBytes
expected interface{}
expectedErr error
}{
{
name: "boolean type mysql<=5.6",
key: "gtid_mode",
value: []byte("ON"),
expected: int64(1),
expectedErr: nil,
},
{
name: "enum type mysql>=5.7",
key: "gtid_mode",
value: []byte("ON_PERMISSIVE"),
expected: int64(1),
expectedErr: nil,
},
{
name: "defined variable but unset",
key: "ssl_ctx_verify_depth",
value: []byte(""),
expected: nil,
expectedErr: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, err := ConvertGlobalVariables(tt.key, tt.value)
require.Equal(t, tt.expectedErr, err)
require.Equal(t, tt.expected, actual)
})
}
}

0 comments on commit 29dff92

Please sign in to comment.