Skip to content

Commit

Permalink
Move parts of metricset code to Elasticsearch module
Browse files Browse the repository at this point in the history
This is inspired by elastic#7074. More work is needed.
  • Loading branch information
ruflin committed May 15, 2018
1 parent f85dbf8 commit f3f9e0b
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 59 deletions.
31 changes: 10 additions & 21 deletions metricbeat/module/elasticsearch/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,43 @@ package index
import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
mb.Registry.MustAddMetricSet("elasticsearch", "index", New,
mb.WithHostParser(hostParser),
mb.WithHostParser(elasticsearch.HostParser),
)
}

var (
hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
PathConfigKey: "path",
// TODO: This currently gets index data for all indices. Make it configurable.
DefaultPath: "_stats",
}.Build()
const (
statsPath = "/_stats"
)

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
*elasticsearch.MetricSet
}

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Experimental("The elasticsearch index metricset is experimental")

http, err := helper.NewHTTP(base)
// TODO: This currently gets index data for all indices. Make it configurable.
ms, err := elasticsearch.NewMetricSet(base, statsPath)
if err != nil {
return nil, err
}

return &MetricSet{
base,
http,
}, nil
return &MetricSet{MetricSet: ms}, nil
}

// Fetch gathers stats for each index from the _stats API
func (m *MetricSet) Fetch(r mb.ReporterV2) {

isMaster, err := elasticsearch.IsMaster(m.http, m.HostData().SanitizedURI)
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+statsPath)
if err != nil {
r.Error(err)
return
Expand All @@ -62,13 +51,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

content, err := m.http.FetchContent()
content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
return
}

info, err := elasticsearch.GetInfo(m.http, m.HostData().SanitizedURI)
info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI)
if err != nil {
r.Error(err)
return
Expand Down
58 changes: 58 additions & 0 deletions metricbeat/module/elasticsearch/metricset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package elasticsearch

import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)

const (
defaultScheme = "http"
pathConfigKey = "management_path_prefix"
)

var (
// HostParser parses host urls for RabbitMQ management plugin
HostParser = parse.URLHostParserBuilder{
DefaultScheme: defaultScheme,
PathConfigKey: pathConfigKey,
}.Build()
)

// MetricSet can be used to build other metric sets that query RabbitMQ
// management plugin
type MetricSet struct {
mb.BaseMetricSet
*helper.HTTP
XPack bool
}

// NewMetricSet creates an metric set that can be used to build other metric
// sets that query RabbitMQ management plugin
func NewMetricSet(base mb.BaseMetricSet, subPath string) (*MetricSet, error) {
http, err := helper.NewHTTP(base)
if err != nil {
return nil, err
}
http.SetURI(http.GetURI() + subPath)

config := struct {
XPack bool `config:"xpack.enabled"`
}{
XPack: false,
}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

if config.XPack {
cfgwarn.Experimental("The experimental xpack.enabled flag in elasticsearch/node_stats metricset is enabled.")
}

return &MetricSet{
base,
http,
config.XPack,
}, nil
}
4 changes: 2 additions & 2 deletions metricbeat/module/elasticsearch/node_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
// master node will not be accurate anymore as often in these cases a proxy is in front
// of ES and it's not know if the request will be routed to the same node as before.
for nodeID, node := range nodesStruct.Nodes {
clusterID, err := elasticsearch.GetClusterID(m.http, m.HostData().SanitizedURI, nodeID)
clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI, nodeID)
if err != nil {
logp.Err("could not fetch cluster id: %s", err)
continue
}

isMaster, _ := elasticsearch.IsMaster(m.http, m.HostData().SanitizedURI)
isMaster, _ := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI)

event := mb.Event{}
// Build source_node object
Expand Down
44 changes: 8 additions & 36 deletions metricbeat/module/elasticsearch/node_stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,46 @@ package node_stats

import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
mb.Registry.MustAddMetricSet("elasticsearch", "node_stats", New,
mb.WithHostParser(hostParser),
mb.WithHostParser(elasticsearch.HostParser),
mb.DefaultMetricSet(),
mb.WithNamespace("elasticsearch.node.stats"),
)
}

var (
hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
PathConfigKey: "path",
// Get the stats from the local node
DefaultPath: "_nodes/_local/stats",
}.Build()
)

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
xpack bool
*elasticsearch.MetricSet
}

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The elasticsearch node_stats metricset is beta")

config := struct {
XPack bool `config:"xpack.enabled"`
}{
XPack: false,
}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

if config.XPack {
cfgwarn.Experimental("The experimental xpack.enabled flag in elasticsearch/node_stats metricset is enabled.")
}

http, err := helper.NewHTTP(base)
// Get the stats from the local node
ms, err := elasticsearch.NewMetricSet(base, "/_nodes/_local/stats")
if err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
http: http,
xpack: config.XPack,
}, nil
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
func (m *MetricSet) Fetch(r mb.ReporterV2) {
content, err := m.http.FetchContent()
content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
return
}

if m.xpack {
if m.MetricSet.XPack {
eventsMappingXPack(r, m, content)
} else {
eventsMapping(r, content)
Expand Down

0 comments on commit f3f9e0b

Please sign in to comment.