Extend logstash.node_stats metricset for logstash_stats stack monitoring data #11511

Merged · 12 commits · Jun 26, 2019
39 changes: 38 additions & 1 deletion metricbeat/module/logstash/logstash.go
@@ -45,7 +45,19 @@ type PipelineState struct {
Representation map[string]interface{} `json:"representation"`
BatchSize int `json:"batch_size"`
Workers int `json:"workers"`
- ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602
+ ClusterIDs []string `json:"cluster_uuids,omitempty"`
}

// PipelineStats represents the stats of a Logstash pipeline
type PipelineStats struct {
ID string `json:"id"`
Hash string `json:"hash"`
EphemeralID string `json:"ephemeral_id"`
Events map[string]interface{} `json:"events"`
Reloads map[string]interface{} `json:"reloads"`
Queue map[string]interface{} `json:"queue"`
Vertices []map[string]interface{} `json:"vertices"`
ClusterIDs []string `json:"cluster_uuids,omitempty"`
}

// NewMetricSet creates a metricset that can be used to build other metricsets
@@ -91,6 +103,31 @@ func GetPipelines(http *helper.HTTP, resetURI string) ([]PipelineState, error) {
return pipelines, nil
}

// GetPipelinesStats returns the list of pipelines (and their stats) running on a Logstash node
func GetPipelinesStats(http *helper.HTTP, resetURI string) ([]PipelineStats, error) {
content, err := fetchPath(http, resetURI, "_node/stats", "vertices=true")
if err != nil {
return nil, errors.Wrap(err, "could not fetch node pipeline stats")
}

pipelinesResponse := struct {
Pipelines map[string]PipelineStats `json:"pipelines"`
}{}

err = json.Unmarshal(content, &pipelinesResponse)
if err != nil {
return nil, errors.Wrap(err, "could not parse node pipeline stats response")
}

var pipelines []PipelineStats
for pipelineID, pipeline := range pipelinesResponse.Pipelines {
pipeline.ID = pipelineID
pipelines = append(pipelines, pipeline)
}

return pipelines, nil
}

func fetchPath(http *helper.HTTP, resetURI, path string, query string) ([]byte, error) {
defer http.SetURI(resetURI)

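For context, here is a minimal standalone sketch (not part of the PR) of how the `pipelines` object in a `_node/stats?vertices=true` response unmarshals, mirroring the map-key-to-ID handling in `GetPipelinesStats` above. The local struct and the JSON body are trimmed, illustrative stand-ins, not a verbatim Logstash response.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// pipelineStats mirrors the PipelineStats struct added in logstash.go,
// trimmed to the fields used in this sketch.
type pipelineStats struct {
	ID       string                   `json:"id"`
	Events   map[string]interface{}   `json:"events"`
	Vertices []map[string]interface{} `json:"vertices"`
}

func main() {
	// Illustrative, heavily trimmed _node/stats?vertices=true body (assumed shape).
	body := []byte(`{
		"pipelines": {
			"main": {
				"events": {"in": 100, "out": 100},
				"vertices": [{"id": "abc", "cluster_uuid": "cluster-1"}]
			}
		}
	}`)

	resp := struct {
		Pipelines map[string]pipelineStats `json:"pipelines"`
	}{}
	if err := json.Unmarshal(body, &resp); err != nil {
		log.Fatal(err)
	}

	// The pipeline ID is the map key, so it is copied onto each value,
	// just as GetPipelinesStats does before returning the slice.
	var pipelines []pipelineStats
	for id, p := range resp.Pipelines {
		p.ID = id
		pipelines = append(pipelines, p)
	}
	fmt.Printf("%+v\n", pipelines)
}
```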
205 changes: 205 additions & 0 deletions metricbeat/module/logstash/node_stats/data_xpack.go
@@ -0,0 +1,205 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package node_stats

Review comment: don't use an underscore in package name


import (
"encoding/json"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
)

type commonStats struct {
Events map[string]interface{} `json:"events"`
JVM map[string]interface{} `json:"jvm"`
Review comment: @felixbarny @axw Just found some more JVM metrics. I wonder if, long term, this also ties into our metrics discussion: elastic/ecs#474

Reloads map[string]interface{} `json:"reloads"`
Queue struct {
EventsCount int `json:"events_count"`
} `json:"queue"`
}

type cpu struct {
Percent int `json:"percent"`
LoadAverage map[string]interface{} `json:"load_average"`
NumCPUs int `json:"num_cpus"`
}

type process struct {
OpenFileDescriptors int `json:"open_file_descriptors"`
MaxFileDescriptors int `json:"max_file_descriptors"`
CPU cpu `json:"cpu"`
}

type os struct {
CPU cpu `json:"cpu"`
}

type nodeInfo struct {
ID string `json:"id,omitempty"`
UUID string `json:"uuid"`
EphemeralID string `json:"ephemeral_id"`
Name string `json:"name"`
Host string `json:"host"`
Version string `json:"version"`
Snapshot bool `json:"snapshot"`
Status string `json:"status"`
HTTPAddress string `json:"http_address"`
Pipeline map[string]interface{} `json:"pipeline"`
}

// NodeStats represents the stats of a Logstash node
type NodeStats struct {
nodeInfo
commonStats
Process process `json:"process"`
Pipelines map[string]PipelineStats `json:"pipelines"`
}

// LogstashStats represents the logstash_stats sub-document indexed into .monitoring-logstash-*
type LogstashStats struct {
commonStats
Process process `json:"process"`
OS os `json:"os"`
Pipelines []PipelineStats `json:"pipelines"`
Logstash nodeInfo `json:"logstash"`
Timestamp common.Time `json:"timestamp"`
}

// PipelineStats represents the stats of a Logstash pipeline
type PipelineStats struct {
ID string `json:"id"`
Hash string `json:"hash"`
EphemeralID string `json:"ephemeral_id"`
Events map[string]interface{} `json:"events"`
Reloads map[string]interface{} `json:"reloads"`
Queue map[string]interface{} `json:"queue"`
Vertices []map[string]interface{} `json:"vertices"`
}

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
var nodeStats NodeStats
err := json.Unmarshal(content, &nodeStats)
if err != nil {
return errors.Wrap(err, "could not parse node stats response")
}

timestamp := common.Time(time.Now())

// Massage Logstash node basic info
nodeStats.nodeInfo.UUID = nodeStats.nodeInfo.ID
nodeStats.nodeInfo.ID = ""

proc := process{
nodeStats.Process.OpenFileDescriptors,
nodeStats.Process.MaxFileDescriptors,
cpu{
Percent: nodeStats.Process.CPU.Percent,
},
}

o := os{
cpu{
LoadAverage: nodeStats.Process.CPU.LoadAverage,
NumCPUs: nodeStats.Process.CPU.NumCPUs,
},
}

var pipelines []PipelineStats
for pipelineID, pipeline := range nodeStats.Pipelines {
pipeline.ID = pipelineID
pipelines = append(pipelines, pipeline)
}

pipelines = getUserDefinedPipelines(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines)

for clusterUUID, clusterPipelines := range clusterToPipelinesMap {
logstashStats := LogstashStats{
nodeStats.commonStats,
proc,
o,
clusterPipelines,
nodeStats.nodeInfo,
timestamp,
}

event := mb.Event{}
event.RootFields = common.MapStr{
"timestamp": timestamp,
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "logstash_stats",
"logstash_stats": logstashStats,
}

if clusterUUID != "" {
event.RootFields["cluster_uuid"] = clusterUUID
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Logstash)
r.Event(event)
}

return nil
}

func makeClusterToPipelinesMap(pipelines []PipelineStats) map[string][]PipelineStats {
clusterToPipelinesMap := make(map[string][]PipelineStats)

for _, pipeline := range pipelines {
var clusterUUIDs []string
for _, vertex := range pipeline.Vertices {
c, ok := vertex["cluster_uuid"]
if !ok {
continue
}

clusterUUID, ok := c.(string)
if !ok {
continue
}

clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

for _, clusterUUID := range clusterUUIDs {
clusterPipelines := clusterToPipelinesMap[clusterUUID]
if clusterPipelines == nil {
clusterToPipelinesMap[clusterUUID] = []PipelineStats{}
}

clusterToPipelinesMap[clusterUUID] = append(clusterPipelines, pipeline)
}
}

return clusterToPipelinesMap
}

func getUserDefinedPipelines(pipelines []PipelineStats) []PipelineStats {
userDefinedPipelines := []PipelineStats{}
for _, pipeline := range pipelines {
if pipeline.ID[0] != '.' {
userDefinedPipelines = append(userDefinedPipelines, pipeline)
}
}
return userDefinedPipelines
}
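To make the grouping behaviour above concrete, here is a small self-contained sketch (not part of the PR) that mirrors `getUserDefinedPipelines` and `makeClusterToPipelinesMap` on toy data: pipelines whose IDs start with a dot (internal ones such as `.monitoring-logstash`) are dropped, and each remaining pipeline is attached to every `cluster_uuid` reported by its vertices.

```go
package main

import "fmt"

// pipelineStats is a trimmed local copy of PipelineStats for this sketch.
type pipelineStats struct {
	ID       string
	Vertices []map[string]interface{}
}

// userDefined mirrors getUserDefinedPipelines: internal pipelines start with
// a dot and are skipped. (An empty-ID guard is added here for safety.)
func userDefined(pipelines []pipelineStats) []pipelineStats {
	var out []pipelineStats
	for _, p := range pipelines {
		if len(p.ID) > 0 && p.ID[0] != '.' {
			out = append(out, p)
		}
	}
	return out
}

// byCluster mirrors makeClusterToPipelinesMap: a pipeline is listed under
// every cluster_uuid found in its vertices.
func byCluster(pipelines []pipelineStats) map[string][]pipelineStats {
	m := make(map[string][]pipelineStats)
	for _, p := range pipelines {
		for _, v := range p.Vertices {
			if uuid, ok := v["cluster_uuid"].(string); ok {
				m[uuid] = append(m[uuid], p)
			}
		}
	}
	return m
}

func main() {
	pipelines := []pipelineStats{
		{ID: "main", Vertices: []map[string]interface{}{{"cluster_uuid": "es-1"}}},
		{ID: ".monitoring-logstash"},
	}
	// Only "main" survives the filter, grouped under cluster "es-1".
	fmt.Println(byCluster(userDefined(pipelines)))
}
```

Note that, as written in the PR code, a pipeline whose vertices report no `cluster_uuid` lands in no group and therefore in none of the emitted `logstash_stats` documents.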
30 changes: 26 additions & 4 deletions metricbeat/module/logstash/node_stats/node_stats.go
@@ -35,18 +35,22 @@ func init() {
)
}

const (
nodeStatsPath = "_node/stats"
)

var (
hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
PathConfigKey: "path",
- DefaultPath: "_node/stats",
+ DefaultPath: nodeStatsPath,
}.Build()
)

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
*logstash.MetricSet
- http *helper.HTTP
+ *helper.HTTP
}

// New creates a new instance of the MetricSet
@@ -60,6 +64,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

if ms.XPack {
http.SetURI(http.GetURI() + "?vertices=true")
}

return &MetricSet{
ms,
http,
@@ -70,10 +79,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// It returns the event which is then forwarded to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
- content, err := m.http.FetchContent()
+ content, err := m.HTTP.FetchContent()
if err != nil {
if m.XPack {
m.Logger().Error(err)
return nil
}
return err
}

- return eventMapping(r, content)
if !m.XPack {
return eventMapping(r, content)
}

err = eventMappingXPack(r, m, content)
if err != nil {
m.Logger().Error(err)
}

return nil
}
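As a usage note, here is a tiny sketch (assumptions: a locally running Logstash with the default API port 9600 and the default metricset path) of the URI the X-Pack branch of `New` ends up polling; the flag is appended by plain string concatenation, which assumes the configured URI carries no query string of its own.

```go
package main

import "fmt"

func main() {
	// Mirrors the X-Pack branch in New(): "?vertices=true" is appended to the
	// default _node/stats URI so that per-vertex metrics (including each
	// vertex's cluster_uuid) are returned by Logstash.
	base := "http://localhost:9600/_node/stats" // assumed default host:port for the Logstash HTTP API
	fmt.Println(base + "?vertices=true")        // http://localhost:9600/_node/stats?vertices=true
}
```

Also worth noting from the Fetch change above: with the module's X-Pack mode enabled (`m.XPack`), fetch and mapping errors are logged and swallowed rather than returned, so a transient Logstash outage does not surface as a metricset failure.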