diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go
index f33d1396069..60d13927585 100644
--- a/metricbeat/module/logstash/logstash.go
+++ b/metricbeat/module/logstash/logstash.go
@@ -45,7 +45,19 @@ type PipelineState struct {
 	Representation map[string]interface{} `json:"representation"`
 	BatchSize      int                    `json:"batch_size"`
 	Workers        int                    `json:"workers"`
-	ClusterIDs     []string               `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602
+	ClusterIDs     []string               `json:"cluster_uuids,omitempty"`
+}
+
+// PipelineStats represents the stats of a Logstash pipeline
+type PipelineStats struct {
+	ID          string                   `json:"id"`
+	Hash        string                   `json:"hash"`
+	EphemeralID string                   `json:"ephemeral_id"`
+	Events      map[string]interface{}   `json:"events"`
+	Reloads     map[string]interface{}   `json:"reloads"`
+	Queue       map[string]interface{}   `json:"queue"`
+	Vertices    []map[string]interface{} `json:"vertices"`
+	ClusterIDs  []string                 `json:"cluster_uuids,omitempty"`
 }
 
 // NewMetricSet creates a metricset that can be used to build other metricsets
@@ -91,6 +103,31 @@ func GetPipelines(http *helper.HTTP, resetURI string) ([]PipelineState, error) {
 	return pipelines, nil
 }
 
+// GetPipelinesStats returns the list of pipelines (and their stats) running on a Logstash node
+func GetPipelinesStats(http *helper.HTTP, resetURI string) ([]PipelineStats, error) {
+	content, err := fetchPath(http, resetURI, "_node/stats", "vertices=true")
+	if err != nil {
+		return nil, errors.Wrap(err, "could not fetch node pipeline stats")
+	}
+
+	pipelinesResponse := struct {
+		Pipelines map[string]PipelineStats `json:"pipelines"`
+	}{}
+
+	err = json.Unmarshal(content, &pipelinesResponse)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse node pipeline stats response")
+	}
+
+	var pipelines []PipelineStats
+	for pipelineID, pipeline := range pipelinesResponse.Pipelines {
+		pipeline.ID = pipelineID
+		pipelines = append(pipelines, pipeline)
+	}
+
+	return pipelines, nil
+}
+
 func fetchPath(http *helper.HTTP, resetURI, path string, query string) ([]byte, error) {
 	defer http.SetURI(resetURI)
 
diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go
new file mode 100644
index 00000000000..8da19338b88
--- /dev/null
+++ b/metricbeat/module/logstash/node_stats/data_xpack.go
@@ -0,0 +1,205 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node_stats
+
+import (
+	"encoding/json"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/metricbeat/helper/elastic"
+	"github.com/elastic/beats/metricbeat/mb"
+)
+
+type commonStats struct {
+	Events  map[string]interface{} `json:"events"`
+	JVM     map[string]interface{} `json:"jvm"`
+	Reloads map[string]interface{} `json:"reloads"`
+	Queue   struct {
+		EventsCount int `json:"events_count"`
+	} `json:"queue"`
+}
+
+type cpu struct {
+	Percent     int                    `json:"percent"`
+	LoadAverage map[string]interface{} `json:"load_average"`
+	NumCPUs     int                    `json:"num_cpus"`
+}
+
+type process struct {
+	OpenFileDescriptors int `json:"open_file_descriptors"`
+	MaxFileDescriptors  int `json:"max_file_descriptors"`
+	CPU                 cpu `json:"cpu"`
+}
+
+type os struct {
+	CPU cpu `json:"cpu"`
+}
+
+type nodeInfo struct {
+	ID          string                 `json:"id,omitempty"`
+	UUID        string                 `json:"uuid"`
+	EphemeralID string                 `json:"ephemeral_id"`
+	Name        string                 `json:"name"`
+	Host        string                 `json:"host"`
+	Version     string                 `json:"version"`
+	Snapshot    bool                   `json:"snapshot"`
+	Status      string                 `json:"status"`
+	HTTPAddress string                 `json:"http_address"`
+	Pipeline    map[string]interface{} `json:"pipeline"`
+}
+
+// NodeStats represents the stats of a Logstash node
+type NodeStats struct {
+	nodeInfo
+	commonStats
+	Process   process                  `json:"process"`
+	Pipelines map[string]PipelineStats `json:"pipelines"`
+}
+
+// LogstashStats represents the logstash_stats sub-document indexed into .monitoring-logstash-*
+type LogstashStats struct {
+	commonStats
+	Process   process         `json:"process"`
+	OS        os              `json:"os"`
+	Pipelines []PipelineStats `json:"pipelines"`
+	Logstash  nodeInfo        `json:"logstash"`
+	Timestamp common.Time     `json:"timestamp"`
+}
+
+// PipelineStats represents the stats of a Logstash pipeline
+type PipelineStats struct {
+	ID          string                   `json:"id"`
+	Hash        string                   `json:"hash"`
+	EphemeralID string                   `json:"ephemeral_id"`
+	Events      map[string]interface{}   `json:"events"`
+	Reloads     map[string]interface{}   `json:"reloads"`
+	Queue       map[string]interface{}   `json:"queue"`
+	Vertices    []map[string]interface{} `json:"vertices"`
+}
+
+func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
+	var nodeStats NodeStats
+	err := json.Unmarshal(content, &nodeStats)
+	if err != nil {
+		return errors.Wrap(err, "could not parse node stats response")
+	}
+
+	timestamp := common.Time(time.Now())
+
+	// Massage Logstash node basic info
+	nodeStats.nodeInfo.UUID = nodeStats.nodeInfo.ID
+	nodeStats.nodeInfo.ID = ""
+
+	proc := process{
+		nodeStats.Process.OpenFileDescriptors,
+		nodeStats.Process.MaxFileDescriptors,
+		cpu{
+			Percent: nodeStats.Process.CPU.Percent,
+		},
+	}
+
+	o := os{
+		cpu{
+			LoadAverage: nodeStats.Process.CPU.LoadAverage,
+			NumCPUs:     nodeStats.Process.CPU.NumCPUs,
+		},
+	}
+
+	var pipelines []PipelineStats
+	for pipelineID, pipeline := range nodeStats.Pipelines {
+		pipeline.ID = pipelineID
+		pipelines = append(pipelines, pipeline)
+	}
+
+	pipelines = getUserDefinedPipelines(pipelines)
+	clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines)
+
+	for clusterUUID, clusterPipelines := range clusterToPipelinesMap {
+		logstashStats := LogstashStats{
+			nodeStats.commonStats,
+			proc,
+			o,
+			clusterPipelines,
+			nodeStats.nodeInfo,
+			timestamp,
+		}
+
+		event := mb.Event{}
+		event.RootFields = common.MapStr{
+			"timestamp":      timestamp,
+			"interval_ms":    m.Module().Config().Period / time.Millisecond,
+			"type":           "logstash_stats",
+			"logstash_stats": logstashStats,
+		}
+
+		if clusterUUID != "" {
event.RootFields["cluster_uuid"] = clusterUUID + } + + event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Logstash) + r.Event(event) + } + + return nil +} + +func makeClusterToPipelinesMap(pipelines []PipelineStats) map[string][]PipelineStats { + var clusterToPipelinesMap map[string][]PipelineStats + clusterToPipelinesMap = make(map[string][]PipelineStats) + + for _, pipeline := range pipelines { + var clusterUUIDs []string + for _, vertex := range pipeline.Vertices { + c, ok := vertex["cluster_uuid"] + if !ok { + continue + } + + clusterUUID, ok := c.(string) + if !ok { + continue + } + + clusterUUIDs = append(clusterUUIDs, clusterUUID) + } + + for _, clusterUUID := range clusterUUIDs { + clusterPipelines := clusterToPipelinesMap[clusterUUID] + if clusterPipelines == nil { + clusterToPipelinesMap[clusterUUID] = []PipelineStats{} + } + + clusterToPipelinesMap[clusterUUID] = append(clusterPipelines, pipeline) + } + } + + return clusterToPipelinesMap +} + +func getUserDefinedPipelines(pipelines []PipelineStats) []PipelineStats { + userDefinedPipelines := []PipelineStats{} + for _, pipeline := range pipelines { + if pipeline.ID[0] != '.' { + userDefinedPipelines = append(userDefinedPipelines, pipeline) + } + } + return userDefinedPipelines +} diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 5ac114f9a9e..ea959ff5ffb 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -35,18 +35,22 @@ func init() { ) } +const ( + nodeStatsPath = "_node/stats" +) + var ( hostParser = parse.URLHostParserBuilder{ DefaultScheme: "http", PathConfigKey: "path", - DefaultPath: "_node/stats", + DefaultPath: nodeStatsPath, }.Build() ) // MetricSet type defines all fields of the MetricSet type MetricSet struct { *logstash.MetricSet - http *helper.HTTP + *helper.HTTP } // New create a new instance of the MetricSet @@ -60,6 +64,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + if ms.XPack { + http.SetURI(http.GetURI() + "?vertices=true") + } + return &MetricSet{ ms, http, @@ -70,10 +79,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) error { - content, err := m.http.FetchContent() + content, err := m.HTTP.FetchContent() if err != nil { + if m.XPack { + m.Logger().Error(err) + return nil + } return err } - return eventMapping(r, content) + if !m.XPack { + return eventMapping(r, content) + } + + err = eventMappingXPack(r, m, content) + if err != nil { + m.Logger().Error(err) + } + + return nil }