Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report kibana_settings to X-Pack Monitoring (#7664) #7943

Merged
merged 1 commit into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions metricbeat/module/kibana/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
"github.com/elastic/beats/metricbeat/mb"
)

const (
// StatsAPIAvailableVersion is the version of Kibana since when the stats API is available
StatsAPIAvailableVersion = "6.4.0"

// SettingsAPIAvailableVersion is the version of Kibana since when the settings API is available
SettingsAPIAvailableVersion = "6.5.0"
)

// ReportErrorForMissingField reports and returns an error message for the given
// field being missing in API response received from Kibana
func ReportErrorForMissingField(field string, r mb.ReporterV2) error {
Expand Down Expand Up @@ -64,6 +72,30 @@ func GetVersion(http *helper.HTTP, currentPath string) (string, error) {
return versionStr, nil
}

func isKibanaAPIAvailable(currentKibanaVersion, apiAvailableInKibanaVersion string) (bool, error) {
currentVersion, err := common.NewVersion(currentKibanaVersion)
if err != nil {
return false, err
}

wantVersion, err := common.NewVersion(apiAvailableInKibanaVersion)
if err != nil {
return false, err
}

return !currentVersion.LessThan(wantVersion), nil
}

// IsStatsAPIAvailable returns whether the stats API is available in the given version of Kibana
func IsStatsAPIAvailable(currentKibanaVersion string) (bool, error) {
return isKibanaAPIAvailable(currentKibanaVersion, StatsAPIAvailableVersion)
}

// IsSettingsAPIAvailable returns whether the settings API is available in the given version of Kibana
func IsSettingsAPIAvailable(currentKibanaVersion string) (bool, error) {
return isKibanaAPIAvailable(currentKibanaVersion, SettingsAPIAvailableVersion)
}

func fetchPath(http *helper.HTTP, currentPath, newPath string) ([]byte, error) {
currentURI := http.GetURI()
defer http.SetURI(currentURI) // Reset after this request
Expand Down
18 changes: 18 additions & 0 deletions metricbeat/module/kibana/stats/_meta/test/settings.700.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"cluster_uuid":"u5ii0pnQRka_P0gimfmthg",
"settings":{
"xpack":{
"default_admin_email":"jane@doe.com"
},
"kibana":{
"uuid":"5b2de169-2785-441b-ae8c-186a1936b17d",
"name":"Janes-MBP-2",
"index":".kibana",
"host":"localhost",
"transport_address":"localhost:5601",
"version":"7.0.0-alpha1",
"snapshot":false,
"status":"green"
}
}
}
75 changes: 52 additions & 23 deletions metricbeat/module/kibana/stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
schemaXPackMonitoring = s.Schema{
schemaXPackMonitoringStats = s.Schema{
"concurrent_connections": c.Int("concurrent_connections"),
"os": c.Dict("os", s.Schema{
"load": c.Dict("load", s.Schema{
Expand Down Expand Up @@ -105,61 +105,90 @@ var (
}
)

func eventMappingXPack(r mb.ReporterV2, intervalMs int64, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
r.Error(err)
return err
type dataParser func(mb.ReporterV2, common.MapStr, time.Time) (string, string, common.MapStr, error)

func statsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) {
clusterUUID, ok := data["clusterUuid"].(string)
if !ok {
return "", "", nil, elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r)
}

kibanaStatsFields, err := schemaXPackMonitoring.Apply(data)
kibanaStatsFields, err := schemaXPackMonitoringStats.Apply(data)
if err != nil {
r.Error(err)
return err
return "", "", nil, err
}

process, ok := data["process"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("process", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process", elastic.Kibana, r)
}
memory, ok := process["memory"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r)
}

rss, ok := memory["resident_set_size_bytes"].(float64)
if !ok {
return elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r)
}
kibanaStatsFields.Put("process.memory.resident_set_size_in_bytes", int64(rss))

timestamp := time.Now()
kibanaStatsFields.Put("timestamp", timestamp)
kibanaStatsFields.Put("timestamp", now)

// Make usage field passthrough as-is
usage, ok := data["usage"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("usage", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("usage", elastic.Kibana, r)
}
kibanaStatsFields.Put("usage", usage)

clusterUUID, ok := data["clusterUuid"].(string)
return "kibana_stats", clusterUUID, kibanaStatsFields, nil
}

func settingsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) {
clusterUUID, ok := data["cluster_uuid"].(string)
if !ok {
return "", "", nil, elastic.ReportErrorForMissingField("cluster_uuid", elastic.Kibana, r)
}

kibanaSettingsFields, ok := data["settings"]
if !ok {
return elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("settings", elastic.Kibana, r)
}

return "kibana_settings", clusterUUID, kibanaSettingsFields.(map[string]interface{}), nil
}

func eventMappingXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte, dataParserFunc dataParser) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return err
}

t, clusterUUID, fields, err := dataParserFunc(r, data, now)
if err != nil {
return err
}

var event mb.Event
event.RootFields = common.MapStr{
"cluster_uuid": clusterUUID,
"timestamp": timestamp,
"timestamp": now,
"interval_ms": intervalMs,
"type": "kibana_stats",
"kibana_stats": kibanaStatsFields,
"type": t,
t: fields,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Kibana)
r.Event(event)

r.Event(event)
return nil
}

func eventMappingStatsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error {
return eventMappingXPack(r, intervalMs, now, content, statsDataParser)
}

func eventMappingSettingsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error {
return eventMappingXPack(r, intervalMs, now, content, settingsDataParser)
}
26 changes: 24 additions & 2 deletions metricbeat/module/kibana/stats/data_xpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import (
"io/ioutil"
"path/filepath"
"testing"
"time"

mbtest "github.com/elastic/beats/metricbeat/mb/testing"

"github.com/stretchr/testify/assert"
)

func TestEventMappingXPack(t *testing.T) {
func TestEventMappingStatsXPack(t *testing.T) {

files, err := filepath.Glob("./_meta/test/stats-legacy.*.json")
assert.NoError(t, err)
Expand All @@ -39,7 +40,28 @@ func TestEventMappingXPack(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventMappingXPack(reporter, 10000, input)
now := time.Now()

err = eventMappingStatsXPack(reporter, 10000, now, input)
assert.NoError(t, err, f)
assert.True(t, len(reporter.GetEvents()) >= 1, f)
assert.Equal(t, 0, len(reporter.GetErrors()), f)
}
}

func TestEventMappingSettingsXPack(t *testing.T) {

files, err := filepath.Glob("./_meta/test/settings.*.json")
assert.NoError(t, err)

for _, f := range files {
input, err := ioutil.ReadFile(f)
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
now := time.Now()

err = eventMappingSettingsXPack(reporter, 10000, now, input)
assert.NoError(t, err, f)
assert.True(t, len(reporter.GetEvents()) >= 1, f)
assert.Equal(t, 0, len(reporter.GetErrors()), f)
Expand Down
79 changes: 65 additions & 14 deletions metricbeat/module/kibana/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package stats

import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
Expand All @@ -37,8 +39,8 @@ func init() {
}

const (
statsPath = "api/stats"
kibanaStatsAPIAvailableVersion = "6.4.0"
statsPath = "api/stats"
settingsPath = "api/settings"
)

var (
Expand All @@ -52,7 +54,8 @@ var (
// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
statsHTTP *helper.HTTP
settingsHTTP *helper.HTTP
xPackEnabled bool
}

Expand All @@ -62,7 +65,7 @@ func isKibanaStatsAPIAvailable(kibanaVersion string) (bool, error) {
return false, err
}

wantVersion, err := common.NewVersion(kibanaStatsAPIAvailableVersion)
wantVersion, err := common.NewVersion(kibana.StatsAPIAvailableVersion)
if err != nil {
return false, err
}
Expand All @@ -79,36 +82,62 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, err
}

http, err := helper.NewHTTP(base)
statsHTTP, err := helper.NewHTTP(base)
if err != nil {
return nil, err
}

kibanaVersion, err := kibana.GetVersion(http, statsPath)
kibanaVersion, err := kibana.GetVersion(statsHTTP, statsPath)
if err != nil {
return nil, err
}

isAPIAvailable, err := isKibanaStatsAPIAvailable(kibanaVersion)
isStatsAPIAvailable, err := kibana.IsStatsAPIAvailable(kibanaVersion)
if err != nil {
return nil, err
}

if !isAPIAvailable {
if !isStatsAPIAvailable {
const errorMsg = "The kibana stats metricset is only supported with Kibana >= %v. You are currently running Kibana %v"
return nil, fmt.Errorf(errorMsg, kibanaStatsAPIAvailableVersion, kibanaVersion)
return nil, fmt.Errorf(errorMsg, kibana.StatsAPIAvailableVersion, kibanaVersion)
}

if config.XPackEnabled {
cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.")

// Use legacy API response so we can passthru usage as-is
http.SetURI(http.GetURI() + "&legacy=true")
statsHTTP.SetURI(statsHTTP.GetURI() + "&legacy=true")
}

var settingsHTTP *helper.HTTP
if config.XPackEnabled {
cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.")

isSettingsAPIAvailable, err := kibana.IsSettingsAPIAvailable(kibanaVersion)
if err != nil {
return nil, err
}

if !isSettingsAPIAvailable {
const errorMsg = "The kibana stats metricset with X-Pack enabled is only supported with Kibana >= %v. You are currently running Kibana %v"
return nil, fmt.Errorf(errorMsg, kibana.SettingsAPIAvailableVersion, kibanaVersion)
}

settingsHTTP, err = helper.NewHTTP(base)
if err != nil {
return nil, err
}

// HACK! We need to do this because there might be a basepath involved, so we
// only search/replace the actual API paths
settingsURI := strings.Replace(statsHTTP.GetURI(), statsPath, settingsPath, 1)
settingsHTTP.SetURI(settingsURI)
}

return &MetricSet{
base,
http,
statsHTTP,
settingsHTTP,
config.XPackEnabled,
}, nil
}
Expand All @@ -117,17 +146,39 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
content, err := m.http.FetchContent()
now := time.Now()

m.fetchStats(r, now)
if m.xPackEnabled {
m.fetchSettings(r, now)
}
}

func (m *MetricSet) fetchStats(r mb.ReporterV2, now time.Time) {
content, err := m.statsHTTP.FetchContent()
if err != nil {
r.Error(err)
return
}

if m.xPackEnabled {
intervalMs := m.Module().Config().Period.Nanoseconds() / 1000 / 1000
eventMappingXPack(r, intervalMs, content)
intervalMs := m.calculateIntervalMs()
eventMappingStatsXPack(r, intervalMs, now, content)
} else {
eventMapping(r, content)
}
}

func (m *MetricSet) fetchSettings(r mb.ReporterV2, now time.Time) {
content, err := m.settingsHTTP.FetchContent()
if err != nil {
return
}

intervalMs := m.calculateIntervalMs()
eventMappingSettingsXPack(r, intervalMs, now, content)
}

func (m *MetricSet) calculateIntervalMs() int64 {
return m.Module().Config().Period.Nanoseconds() / 1000 / 1000
}