-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
query.go
157 lines (133 loc) · 3.94 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package query
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/mb"
)
// init adds this MetricSet to the central Metricbeat registry under the "sql"
// module with the name "query". New is supplied as the factory and ParseDSN as
// the host parser for the metricset.
func init() {
	const (
		moduleName    = "sql"
		metricSetName = "query"
	)

	mb.Registry.MustAddMetricSet(moduleName, metricSetName, New, mb.WithHostParser(ParseDSN))
}
// MetricSet holds the configuration needed to run a SQL query against a host.
// It implements the mb.MetricSet interface by embedding mb.BaseMetricSet,
// which provides all of the required mb.MetricSet interface methods except
// for Fetch.
type MetricSet struct {
mb.BaseMetricSet
// Driver is the driver name passed to sqlx.Open by Fetch; New fills it from
// the "driver" configuration setting.
Driver string
// Query is the SQL statement executed by Fetch; New fills it from the
// "sql_query" configuration setting.
Query string
}
// New builds a MetricSet from the module configuration. It emits the beta
// warning, unpacks the "driver" and "sql_query" settings (both validated as
// nonzero and required) and returns any unpacking error unchanged.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	cfgwarn.Beta("The sql query metricset is beta.")

	// Only the settings this metricset cares about are unpacked here.
	var settings struct {
		Driver string `config:"driver" validate:"nonzero,required"`
		Query  string `config:"sql_query" validate:"nonzero,required"`
	}

	err := base.Module().UnpackConfig(&settings)
	if err != nil {
		return nil, err
	}

	ms := &MetricSet{
		BaseMetricSet: base,
		Driver:        settings.Driver,
		Query:         settings.Query,
	}
	return ms, nil
}
// Fetch runs the configured query and reports one event per result row.
//
// On every call it opens a connection with the configured driver and the host
// URI, pings it, executes m.Query and closes both the rows and the connection
// before returning. Column names are lower-cased and used as metric keys. Each
// column value is converted to a string with getValue; values that parse as a
// float64 are published under sql.metrics.numeric, all others under
// sql.metrics.string.
//
// An error is returned when the connection cannot be opened or pinged, when
// the query fails, or when the column names cannot be read. Failures while
// scanning a single row or while iterating the result set are logged at debug
// level only: the affected row is skipped and Fetch still returns nil.
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
	db, err := sqlx.Open(m.Driver, m.HostData().URI)
	if err != nil {
		return errors.Wrap(err, "error opening connection")
	}
	defer db.Close()

	if err = db.Ping(); err != nil {
		return errors.Wrap(err, "error testing connection")
	}

	rows, err := db.Queryx(m.Query)
	if err != nil {
		return errors.Wrap(err, "error executing query")
	}
	defer rows.Close()

	// Extracted from
	// https://stackoverflow.com/questions/23507531/is-golangs-sql-package-incapable-of-ad-hoc-exploratory-queries/23507765#23507765
	cols, err := rows.Columns()
	if err != nil {
		return errors.Wrap(err, "error getting columns")
	}

	for i, name := range cols {
		cols[i] = strings.ToLower(name)
	}

	// The result shape is not known in advance, so every column is scanned
	// into its own *interface{} and converted afterwards.
	vals := make([]interface{}, len(cols))
	for i := range vals {
		vals[i] = new(interface{})
	}

	for rows.Next() {
		if err := rows.Scan(vals...); err != nil {
			// Best effort: skip the row that failed and keep reading.
			m.Logger().Debug(errors.Wrap(err, "error trying to scan rows"))
			continue
		}

		numericMetrics := common.MapStr{}
		stringMetrics := common.MapStr{}

		for i := range vals {
			value := getValue(vals[i].(*interface{}))

			if num, err := strconv.ParseFloat(value, 64); err == nil {
				numericMetrics[cols[i]] = num
			} else {
				stringMetrics[cols[i]] = value
			}
		}

		report.Event(mb.Event{
			RootFields: common.MapStr{
				"sql": common.MapStr{
					"driver": m.Driver,
					"query":  m.Query,
					"metrics": common.MapStr{
						"numeric": numericMetrics,
						"string":  stringMetrics,
					},
				},
			},
		})
	}

	// Wrap the iteration error itself. Wrapping the outer err here would
	// produce nil whenever the last scan succeeded, hiding the real failure.
	if err := rows.Err(); err != nil {
		m.Logger().Debug(errors.Wrap(err, "error trying to read rows"))
	}

	return nil
}
// getValue renders the scanned column value that pval points to as a string.
//
// A nil value becomes "NULL", booleans become "true" or "false", byte slices
// are converted to a string as-is, time.Time values are formatted with the
// layout "2006-01-02 15:04:05.999", and every other type is formatted with
// fmt.Sprint.
func getValue(pval *interface{}) string {
	const timestampLayout = "2006-01-02 15:04:05.999"

	raw := *pval
	if raw == nil {
		return "NULL"
	}

	if flag, isBool := raw.(bool); isBool {
		if !flag {
			return "false"
		}
		return "true"
	}

	if data, isBytes := raw.([]byte); isBytes {
		return string(data)
	}

	if ts, isTime := raw.(time.Time); isTime {
		return ts.Format(timestampLayout)
	}

	return fmt.Sprint(raw)
}