From fb8f914221d76f99046c855b7fb342dac1195713 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 28 Mar 2019 09:33:04 -0700 Subject: [PATCH 01/12] WIP: Adding GetPipelinesStats function --- metricbeat/module/logstash/logstash.go | 37 ++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index f33d1396069..ce72275a35d 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -48,6 +48,18 @@ type PipelineState struct { ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 } +// PipelineStats represents the stats of a Logstash pipeline +type PipelineStats struct { + ID string `json:"id"` + Hash string `json:"hash"` + EphemeralID string `json:"ephemeral_id"` + Events map[string]interface{} `json:"events"` + Reloads map[string]interface{} `json:"reloads"` + Queue map[string]interface{} `json:"queue"` + Vertices []map[string]interface{} `json:"vertices"` + ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 +} + // NewMetricSet creates a metricset that can be used to build other metricsets // within the Logstash module. func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { @@ -91,6 +103,31 @@ func GetPipelines(http *helper.HTTP, resetURI string) ([]PipelineState, error) { return pipelines, nil } +// GetPipelinesStats returns the list of pipelines (and their stats) running on a Logstash node +func GetPipelinesStats(http *helper.HTTP, resetURI string) ([]PipelineStats, error) { + content, err := fetchPath(http, resetURI, "_node/stats/pipelines", "") + if err != nil { + return nil, errors.Wrap(err, "could not fetch node pipeline stats") + } + + pipelinesResponse := struct { + Pipelines map[string]PipelineStats `json:"pipelines"` + }{} + + err = json.Unmarshal(content, &pipelinesResponse) + if err != nil { + return nil, errors.Wrap(err, "could not parse node pipeline stats response") + } + + var pipelines []PipelineStats + for pipelineID, pipeline := range pipelinesResponse.Pipelines { + pipeline.ID = pipelineID + pipelines = append(pipelines, pipeline) + } + + return pipelines, nil +} + func fetchPath(http *helper.HTTP, resetURI, path string, query string) ([]byte, error) { defer http.SetURI(resetURI) From f028a93752407e6333d24a82ad8c8778ca1746df Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 28 Mar 2019 12:09:11 -0700 Subject: [PATCH 02/12] WIP: initial code for x-pack path --- .../module/logstash/node_stats/data_xpack.go | 141 ++++++++++++++++++ .../module/logstash/node_stats/node_stats.go | 11 +- 2 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 metricbeat/module/logstash/node_stats/data_xpack.go diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go new file mode 100644 index 00000000000..a66b4a15096 --- /dev/null +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -0,0 +1,141 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package node_stats + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/mb" +) + +type commonStats struct { + Events map[string]interface{} `json:"events"` + JVM map[string]interface{} `json:"jvm"` + OS map[string]interface{} `json:"os"` + Process map[string]interface{} `json:"process"` + Reloads map[string]interface{} `json:"reloads"` +} + +// NodeStats represents the stats of a Logstash node +type NodeStats struct { + commonStats + Pipelines map[string]PipelineStats `json:"pipelines"` +} + +// LogstashStats represents the logstash_stats sub-document indexed into .monitoring-logstash-* +type LogstashStats struct { + commonStats + Pipelines []PipelineStats `json:"pipelines` + Logstash map[string]interface{} `json:"logstash"` + Queue map[string]interface{} `json:"queue"` + Timestamp common.Time `json:"timestamp"` +} + +// PipelineStats represents the stats of a Logstash pipeline +type PipelineStats struct { + ID string `json:"id"` + Hash string `json:"hash"` + EphemeralID string `json:"ephemeral_id"` + Events map[string]interface{} `json:"events"` + Reloads map[string]interface{} `json:"reloads"` + Queue map[string]interface{} `json:"queue"` + Vertices []map[string]interface{} `json:"vertices"` + ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 +} + +func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { + var nodeStats NodeStats + err := json.Unmarshal(content, &nodeStats) + if err != nil { + return errors.Wrap(err, "could not parse node stats response") + } + + var pipelines []PipelineStats + for pipelineID, pipeline := range nodeStats.Pipelines { + pipeline.ID = pipelineID + pipelines = append(pipelines, pipeline) + } + + pipelines = getUserDefinedPipelines(pipelines) + clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines) + + for clusterUUID, clusterPipelines := range clusterToPipelinesMap { + timestamp := common.Time(time.Now()) + logstash := map[string]interface{}{} // TODO + queue := map[string]interface{}{} // TODO + + logstashStats := LogstashStats{nodeStats.commonStats, clusterPipelines, logstash, queue, timestamp} + + // TODO: massage logstashStats.Process + // TODO: massage logstashStats.OS + + event := mb.Event{} + event.RootFields = common.MapStr{ + "timestamp": timestamp, + "interval_ms": m.Module().Config().Period / time.Millisecond, + "type": "logstash_stats", + "logstash_stats": logstashStats, + } + + if clusterUUID != "" { + event.RootFields["cluster_uuid"] = clusterUUID + } + + event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Logstash) + r.Event(event) + } + + return nil +} + +func makeClusterToPipelinesMap(pipelines []PipelineStats) map[string][]PipelineStats { + var clusterToPipelinesMap map[string][]PipelineStats + + for _, pipeline := range pipelines { + clusterUUIDs := pipeline.ClusterIDs + if clusterUUIDs == nil { + clusterUUIDs = []string{""} + } + + for _, clusterUUID := range clusterUUIDs { + clusterPipelines := clusterToPipelinesMap[clusterUUID] + if clusterPipelines == nil { + clusterPipelines = []PipelineStats{} + } + + clusterPipelines = append(clusterPipelines, pipeline) + } + } + + return clusterToPipelinesMap +} + +func getUserDefinedPipelines(pipelines []PipelineStats) []PipelineStats { + userDefinedPipelines := []PipelineStats{} + for _, pipeline := range pipelines { + if pipeline.ID[0] != '.' { + userDefinedPipelines = append(userDefinedPipelines, pipeline) + } + } + return userDefinedPipelines +} diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 5ac114f9a9e..8bd2f6d386a 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -75,5 +75,14 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return err } - return eventMapping(r, content) + if !m.MetricSet.XPack { + return eventMapping(r, content) + } + + err = eventMappingXPack(r, m, content) + if err != nil { + m.Logger().Error(err) + } + + return nil } From c7cbc8417f2dc823d466fcf4099aeef87eb68135 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 28 Mar 2019 12:36:15 -0700 Subject: [PATCH 03/12] Implementing a couple of TODOs --- .../module/logstash/node_stats/data_xpack.go | 49 ++++++++++++++++--- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index a66b4a15096..4970519d3ce 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -31,21 +31,38 @@ import ( type commonStats struct { Events map[string]interface{} `json:"events"` JVM map[string]interface{} `json:"jvm"` - OS map[string]interface{} `json:"os"` - Process map[string]interface{} `json:"process"` Reloads map[string]interface{} `json:"reloads"` } +type cpu struct { + Percent int `json:"percent,omitempty"` + LoadAverage map[string]interface{} `json:"load_average,omitempty"` + NumCPUs int `json:"num_cpus,omitempty"` +} + +type process struct { + OpenFileDescriptors int `json:"open_file_descriptors"` + MaxFileDescriptors int `json:"max_file_descriptors"` + CPU cpu `json:"cpu"` +} + +type os struct { + CPU cpu `json:"cpu"` +} + // NodeStats represents the stats of a Logstash node type NodeStats struct { commonStats + Process process `json:"process"` Pipelines map[string]PipelineStats `json:"pipelines"` } // LogstashStats represents the logstash_stats sub-document indexed into .monitoring-logstash-* type LogstashStats struct { commonStats - Pipelines []PipelineStats `json:"pipelines` + Process process `json:"process"` + OS os `json:"os"` + Pipelines []PipelineStats `json:"pipelines"` Logstash map[string]interface{} `json:"logstash"` Queue map[string]interface{} `json:"queue"` Timestamp common.Time `json:"timestamp"` @@ -81,13 +98,31 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { for clusterUUID, clusterPipelines := range clusterToPipelinesMap { timestamp := common.Time(time.Now()) + proc := process{ + nodeStats.Process.OpenFileDescriptors, + nodeStats.Process.MaxFileDescriptors, + cpu{ + Percent: nodeStats.Process.CPU.Percent, + }, + } + o := os{ + cpu{ + LoadAverage: nodeStats.Process.CPU.LoadAverage, + NumCPUs: nodeStats.Process.CPU.NumCPUs, + }, + } logstash := map[string]interface{}{} // TODO queue := map[string]interface{}{} // TODO - logstashStats := LogstashStats{nodeStats.commonStats, clusterPipelines, logstash, queue, timestamp} - - // TODO: massage logstashStats.Process - // TODO: massage logstashStats.OS + logstashStats := LogstashStats{ + nodeStats.commonStats, + proc, + o, + clusterPipelines, + logstash, + queue, + timestamp, + } event := mb.Event{} event.RootFields = common.MapStr{ From 228db5366ea758e884107b960cf91960016497ed Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 28 Mar 2019 13:44:06 -0700 Subject: [PATCH 04/12] Reference issues in TODOs --- metricbeat/module/logstash/node_stats/data_xpack.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 4970519d3ce..de3bea59044 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -111,8 +111,8 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { NumCPUs: nodeStats.Process.CPU.NumCPUs, }, } - logstash := map[string]interface{}{} // TODO - queue := map[string]interface{}{} // TODO + logstash := map[string]interface{}{} // TODO; see https://github.com/elastic/logstash/issues/10121 + queue := map[string]interface{}{} // TODO: see https://github.com/elastic/logstash/issues/10610 logstashStats := LogstashStats{ nodeStats.commonStats, From 7cd5e1793150aef2aa9d72e8d179b1ea10582324 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 29 Mar 2019 04:14:43 -0700 Subject: [PATCH 05/12] Getting logstash basic node info --- metricbeat/module/logstash/logstash.go | 31 ++++++++++++ .../module/logstash/node_stats/data_xpack.go | 47 ++++++++++++------- .../module/logstash/node_stats/node_stats.go | 11 ++++- 3 files changed, 69 insertions(+), 20 deletions(-) diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index ce72275a35d..ce1cbf385e1 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" ) @@ -60,6 +61,20 @@ type PipelineStats struct { ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 } +// Info represents the basic info of a Logstash node +type Info struct { + ID string `json:"id,omitempty"` + UUID string `json:"uuid"` + EphemeralID string `json:"ephemeral_id"` + Name string `json:"name"` + Host string `json:"host"` + Version *common.Version `json:"version"` + Snapshot bool `json:"snapshot"` + Status string `json:"status"` + HTTPAddress string `json:"http_address"` + Pipeline map[string]interface{} `json:"pipeline"` // TODO: https://github.com/elastic/logstash/issues/10121#issuecomment-477960900 +} + // NewMetricSet creates a metricset that can be used to build other metricsets // within the Logstash module. func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { @@ -128,6 +143,22 @@ func GetPipelinesStats(http *helper.HTTP, resetURI string) ([]PipelineStats, err return pipelines, nil } +// GetInfo returns the basic info for a Logstash node +func GetInfo(http *helper.HTTP, resetURI string) (*Info, error) { + content, err := fetchPath(http, resetURI, "/", "") + if err != nil { + return nil, errors.Wrap(err, "could not fetch node basic info") + } + + info := &Info{} + err = json.Unmarshal(content, info) + if err != nil { + return nil, errors.Wrap(err, "could not parse node basic info response") + } + + return info, nil +} + func fetchPath(http *helper.HTTP, resetURI, path string, query string) ([]byte, error) { defer http.SetURI(resetURI) diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index de3bea59044..5d4c615c82c 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -21,6 +21,8 @@ import ( "encoding/json" "time" + "github.com/elastic/beats/metricbeat/module/logstash" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -80,13 +82,39 @@ type PipelineStats struct { ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 } -func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { +func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte, info *logstash.Info) error { var nodeStats NodeStats err := json.Unmarshal(content, &nodeStats) if err != nil { return errors.Wrap(err, "could not parse node stats response") } + timestamp := common.Time(time.Now()) + + // Massage Logstash node basic info + info.UUID = info.ID + info.ID = "" + logstash := map[string]interface{}{ + "logstash": info, + } + + proc := process{ + nodeStats.Process.OpenFileDescriptors, + nodeStats.Process.MaxFileDescriptors, + cpu{ + Percent: nodeStats.Process.CPU.Percent, + }, + } + + o := os{ + cpu{ + LoadAverage: nodeStats.Process.CPU.LoadAverage, + NumCPUs: nodeStats.Process.CPU.NumCPUs, + }, + } + + queue := map[string]interface{}{} // TODO: see https://github.com/elastic/logstash/issues/10610 + var pipelines []PipelineStats for pipelineID, pipeline := range nodeStats.Pipelines { pipeline.ID = pipelineID @@ -97,23 +125,6 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines) for clusterUUID, clusterPipelines := range clusterToPipelinesMap { - timestamp := common.Time(time.Now()) - proc := process{ - nodeStats.Process.OpenFileDescriptors, - nodeStats.Process.MaxFileDescriptors, - cpu{ - Percent: nodeStats.Process.CPU.Percent, - }, - } - o := os{ - cpu{ - LoadAverage: nodeStats.Process.CPU.LoadAverage, - NumCPUs: nodeStats.Process.CPU.NumCPUs, - }, - } - logstash := map[string]interface{}{} // TODO; see https://github.com/elastic/logstash/issues/10121 - queue := map[string]interface{}{} // TODO: see https://github.com/elastic/logstash/issues/10610 - logstashStats := LogstashStats{ nodeStats.commonStats, proc, diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 8bd2f6d386a..4e26548faa1 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -35,11 +35,13 @@ func init() { ) } +const nodeStatsPath = "_node/stats" + var ( hostParser = parse.URLHostParserBuilder{ DefaultScheme: "http", PathConfigKey: "path", - DefaultPath: "_node/stats", + DefaultPath: nodeStatsPath, }.Build() ) @@ -79,7 +81,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return eventMapping(r, content) } - err = eventMappingXPack(r, m, content) + info, err := logstash.GetInfo(m.http, m.HostData().SanitizedURI+nodeStatsPath) + if err != nil { + m.Logger().Error(err) + } + + err = eventMappingXPack(r, m, content, info) if err != nil { m.Logger().Error(err) } From af4300e05bbee48f02ea02203ac795501aa0f856 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 29 Mar 2019 04:18:56 -0700 Subject: [PATCH 06/12] Get basic node info from GET _node/stats call --- metricbeat/module/logstash/logstash.go | 31 ------------------- .../module/logstash/node_stats/data_xpack.go | 24 ++++++++++---- .../module/logstash/node_stats/node_stats.go | 11 +++---- 3 files changed, 22 insertions(+), 44 deletions(-) diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index ce1cbf385e1..ce72275a35d 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -23,7 +23,6 @@ import ( "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" ) @@ -61,20 +60,6 @@ type PipelineStats struct { ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 } -// Info represents the basic info of a Logstash node -type Info struct { - ID string `json:"id,omitempty"` - UUID string `json:"uuid"` - EphemeralID string `json:"ephemeral_id"` - Name string `json:"name"` - Host string `json:"host"` - Version *common.Version `json:"version"` - Snapshot bool `json:"snapshot"` - Status string `json:"status"` - HTTPAddress string `json:"http_address"` - Pipeline map[string]interface{} `json:"pipeline"` // TODO: https://github.com/elastic/logstash/issues/10121#issuecomment-477960900 -} - // NewMetricSet creates a metricset that can be used to build other metricsets // within the Logstash module. func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { @@ -143,22 +128,6 @@ func GetPipelinesStats(http *helper.HTTP, resetURI string) ([]PipelineStats, err return pipelines, nil } -// GetInfo returns the basic info for a Logstash node -func GetInfo(http *helper.HTTP, resetURI string) (*Info, error) { - content, err := fetchPath(http, resetURI, "/", "") - if err != nil { - return nil, errors.Wrap(err, "could not fetch node basic info") - } - - info := &Info{} - err = json.Unmarshal(content, info) - if err != nil { - return nil, errors.Wrap(err, "could not parse node basic info response") - } - - return info, nil -} - func fetchPath(http *helper.HTTP, resetURI, path string, query string) ([]byte, error) { defer http.SetURI(resetURI) diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 5d4c615c82c..1888dee18c3 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -21,8 +21,6 @@ import ( "encoding/json" "time" - "github.com/elastic/beats/metricbeat/module/logstash" - "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -52,8 +50,22 @@ type os struct { CPU cpu `json:"cpu"` } +type nodeInfo struct { + ID string `json:"id,omitempty"` + UUID string `json:"uuid"` + EphemeralID string `json:"ephemeral_id"` + Name string `json:"name"` + Host string `json:"host"` + Version *common.Version `json:"version"` + Snapshot bool `json:"snapshot"` + Status string `json:"status"` + HTTPAddress string `json:"http_address"` + Pipeline map[string]interface{} `json:"pipeline"` // TODO: https://github.com/elastic/logstash/issues/10121#issuecomment-477960900 +} + // NodeStats represents the stats of a Logstash node type NodeStats struct { + nodeInfo commonStats Process process `json:"process"` Pipelines map[string]PipelineStats `json:"pipelines"` @@ -82,7 +94,7 @@ type PipelineStats struct { ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 } -func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte, info *logstash.Info) error { +func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { var nodeStats NodeStats err := json.Unmarshal(content, &nodeStats) if err != nil { @@ -92,10 +104,10 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte, info *logs timestamp := common.Time(time.Now()) // Massage Logstash node basic info - info.UUID = info.ID - info.ID = "" + nodeStats.nodeInfo.UUID = nodeStats.nodeInfo.ID + nodeStats.nodeInfo.ID = "" logstash := map[string]interface{}{ - "logstash": info, + "logstash": nodeStats.nodeInfo, } proc := process{ diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 4e26548faa1..25b0beb86e0 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -35,7 +35,9 @@ func init() { ) } -const nodeStatsPath = "_node/stats" +const ( + nodeStatsPath = "/_node/stats" +) var ( hostParser = parse.URLHostParserBuilder{ @@ -81,12 +83,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return eventMapping(r, content) } - info, err := logstash.GetInfo(m.http, m.HostData().SanitizedURI+nodeStatsPath) - if err != nil { - m.Logger().Error(err) - } - - err = eventMappingXPack(r, m, content, info) + err = eventMappingXPack(r, m, content) if err != nil { m.Logger().Error(err) } From 4de94b6e9696174b34bba4ad15c04ac0fa5166a3 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 1 May 2019 05:12:50 -0700 Subject: [PATCH 07/12] Inlining single-use const --- metricbeat/module/logstash/node_stats/node_stats.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 25b0beb86e0..8bd2f6d386a 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -35,15 +35,11 @@ func init() { ) } -const ( - nodeStatsPath = "/_node/stats" -) - var ( hostParser = parse.URLHostParserBuilder{ DefaultScheme: "http", PathConfigKey: "path", - DefaultPath: nodeStatsPath, + DefaultPath: "_node/stats", }.Build() ) From 0f65bfdf14fcd4bc2606f6af61353fe69ca65bc2 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 10 Jun 2019 12:03:42 -0700 Subject: [PATCH 08/12] Updating code per LS API changes --- metricbeat/module/logstash/logstash.go | 2 +- .../module/logstash/node_stats/data_xpack.go | 22 ++++++++++++++----- .../module/logstash/node_stats/node_stats.go | 21 ++++++++++++++---- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index ce72275a35d..bb90e66b799 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -105,7 +105,7 @@ func GetPipelines(http *helper.HTTP, resetURI string) ([]PipelineState, error) { // GetPipelinesStats returns the list of pipelines (and their stats) running on a Logstash node func GetPipelinesStats(http *helper.HTTP, resetURI string) ([]PipelineStats, error) { - content, err := fetchPath(http, resetURI, "_node/stats/pipelines", "") + content, err := fetchPath(http, resetURI, "_node/stats", "vertices=true") if err != nil { return nil, errors.Wrap(err, "could not fetch node pipeline stats") } diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 1888dee18c3..01ca390c437 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -91,7 +91,6 @@ type PipelineStats struct { Reloads map[string]interface{} `json:"reloads"` Queue map[string]interface{} `json:"queue"` Vertices []map[string]interface{} `json:"vertices"` - ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 } func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { @@ -168,20 +167,31 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { func makeClusterToPipelinesMap(pipelines []PipelineStats) map[string][]PipelineStats { var clusterToPipelinesMap map[string][]PipelineStats + clusterToPipelinesMap = make(map[string][]PipelineStats) for _, pipeline := range pipelines { - clusterUUIDs := pipeline.ClusterIDs - if clusterUUIDs == nil { - clusterUUIDs = []string{""} + var clusterUUIDs []string + for _, vertex := range pipeline.Vertices { + c, ok := vertex["cluster_uuid"] + if !ok { + continue + } + + clusterUUID, ok := c.(string) + if !ok { + continue + } + + clusterUUIDs = append(clusterUUIDs, clusterUUID) } for _, clusterUUID := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { - clusterPipelines = []PipelineStats{} + clusterToPipelinesMap[clusterUUID] = []PipelineStats{} } - clusterPipelines = append(clusterPipelines, pipeline) + clusterToPipelinesMap[clusterUUID] = append(clusterPipelines, pipeline) } } diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 8bd2f6d386a..ea959ff5ffb 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -35,18 +35,22 @@ func init() { ) } +const ( + nodeStatsPath = "_node/stats" +) + var ( hostParser = parse.URLHostParserBuilder{ DefaultScheme: "http", PathConfigKey: "path", - DefaultPath: "_node/stats", + DefaultPath: nodeStatsPath, }.Build() ) // MetricSet type defines all fields of the MetricSet type MetricSet struct { *logstash.MetricSet - http *helper.HTTP + *helper.HTTP } // New create a new instance of the MetricSet @@ -60,6 +64,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + if ms.XPack { + http.SetURI(http.GetURI() + "?vertices=true") + } + return &MetricSet{ ms, http, @@ -70,12 +79,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) error { - content, err := m.http.FetchContent() + content, err := m.HTTP.FetchContent() if err != nil { + if m.XPack { + m.Logger().Error(err) + return nil + } return err } - if !m.MetricSet.XPack { + if !m.XPack { return eventMapping(r, content) } From d04e66e4a0b1caf1693a2b42bbb969316ee736c0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 10 Jun 2019 13:38:59 -0700 Subject: [PATCH 09/12] Resolving remaining TODOs --- metricbeat/module/logstash/node_stats/data_xpack.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 01ca390c437..41a585de33c 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -32,12 +32,15 @@ type commonStats struct { Events map[string]interface{} `json:"events"` JVM map[string]interface{} `json:"jvm"` Reloads map[string]interface{} `json:"reloads"` + Queue struct { + EventsCount int `json:"events_count"` + } `json:"queue"` } type cpu struct { - Percent int `json:"percent,omitempty"` - LoadAverage map[string]interface{} `json:"load_average,omitempty"` - NumCPUs int `json:"num_cpus,omitempty"` + Percent int `json:"percent"` + LoadAverage map[string]interface{} `json:"load_average"` + NumCPUs int `json:"num_cpus"` } type process struct { @@ -78,7 +81,6 @@ type LogstashStats struct { OS os `json:"os"` Pipelines []PipelineStats `json:"pipelines"` Logstash map[string]interface{} `json:"logstash"` - Queue map[string]interface{} `json:"queue"` Timestamp common.Time `json:"timestamp"` } @@ -124,8 +126,6 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { }, } - queue := map[string]interface{}{} // TODO: see https://github.com/elastic/logstash/issues/10610 - var pipelines []PipelineStats for pipelineID, pipeline := range nodeStats.Pipelines { pipeline.ID = pipelineID @@ -142,7 +142,6 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { o, clusterPipelines, logstash, - queue, timestamp, } From 72f78df45456b1b2970e00d09f0ba786645de43e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 10 Jun 2019 13:42:51 -0700 Subject: [PATCH 10/12] Fixing version serialization --- metricbeat/module/logstash/node_stats/data_xpack.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 41a585de33c..ad9e9d72717 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -59,7 +59,7 @@ type nodeInfo struct { EphemeralID string `json:"ephemeral_id"` Name string `json:"name"` Host string `json:"host"` - Version *common.Version `json:"version"` + Version string `json:"version"` Snapshot bool `json:"snapshot"` Status string `json:"status"` HTTPAddress string `json:"http_address"` From c3e405c34516898c5d6162fb2ce0681d4e6debb4 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 10 Jun 2019 17:22:07 -0700 Subject: [PATCH 11/12] Fixing logstash double nesting --- .../module/logstash/node_stats/data_xpack.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index ad9e9d72717..a95d579040b 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -77,11 +77,11 @@ type NodeStats struct { // LogstashStats represents the logstash_stats sub-document indexed into .monitoring-logstash-* type LogstashStats struct { commonStats - Process process `json:"process"` - OS os `json:"os"` - Pipelines []PipelineStats `json:"pipelines"` - Logstash map[string]interface{} `json:"logstash"` - Timestamp common.Time `json:"timestamp"` + Process process `json:"process"` + OS os `json:"os"` + Pipelines []PipelineStats `json:"pipelines"` + Logstash nodeInfo `json:"logstash"` + Timestamp common.Time `json:"timestamp"` } // PipelineStats represents the stats of a Logstash pipeline @@ -107,9 +107,6 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { // Massage Logstash node basic info nodeStats.nodeInfo.UUID = nodeStats.nodeInfo.ID nodeStats.nodeInfo.ID = "" - logstash := map[string]interface{}{ - "logstash": nodeStats.nodeInfo, - } proc := process{ nodeStats.Process.OpenFileDescriptors, @@ -141,7 +138,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { proc, o, clusterPipelines, - logstash, + nodeStats.nodeInfo, timestamp, } From 0977c1a2e417ba7c2fc7321f63e02599090492c4 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 25 Jun 2019 09:23:04 -0400 Subject: [PATCH 12/12] Removing TODOs as they have been implemented now --- metricbeat/module/logstash/logstash.go | 4 ++-- metricbeat/module/logstash/node_stats/data_xpack.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index bb90e66b799..60d13927585 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -45,7 +45,7 @@ type PipelineState struct { Representation map[string]interface{} `json:"representation"` BatchSize int `json:"batch_size"` Workers int `json:"workers"` - ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 + ClusterIDs []string `json:"cluster_uuids,omitempty"` } // PipelineStats represents the stats of a Logstash pipeline @@ -57,7 +57,7 @@ type PipelineStats struct { Reloads map[string]interface{} `json:"reloads"` Queue map[string]interface{} `json:"queue"` Vertices []map[string]interface{} `json:"vertices"` - ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602 + ClusterIDs []string `json:"cluster_uuids,omitempty"` } // NewMetricSet creates a metricset that can be used to build other metricsets diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index a95d579040b..8da19338b88 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -63,7 +63,7 @@ type nodeInfo struct { Snapshot bool `json:"snapshot"` Status string `json:"status"` HTTPAddress string `json:"http_address"` - Pipeline map[string]interface{} `json:"pipeline"` // TODO: https://github.com/elastic/logstash/issues/10121#issuecomment-477960900 + Pipeline map[string]interface{} `json:"pipeline"` } // NodeStats represents the stats of a Logstash node