diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 904ea7f199a..49721794728 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -244,6 +244,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add overview dashboard for googlecloud compute metricset. {issue}16534[16534] {pull}16819[16819] - Add Prometheus remote write endpoint {pull}16609[16609] - Release STAN module as GA. {pull}16980[16980] +- Add query metricset for prometheus module. {pull}17104[17104] - Release ActiveMQ module as GA. {issue}17047[17047] {pull}17049[17049] - Release Zookeeper/connection module as GA. {issue}14281[14281] {pull}17043[17043] - Add support for CouchDB v2 {issue}16352[16352] {pull}16455[16455] diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index f888d580077..7099b3213c9 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -31605,6 +31605,22 @@ type: object -- +*`prometheus.query.*`*:: ++ +-- +Prometheus value resulted from PromQL + + +type: object + +-- + +[float] +=== query + +query metricset + + [float] === remote_write diff --git a/metricbeat/docs/modules/prometheus.asciidoc b/metricbeat/docs/modules/prometheus.asciidoc index 13065dff7c7..a6fc4b82bed 100644 --- a/metricbeat/docs/modules/prometheus.asciidoc +++ b/metricbeat/docs/modules/prometheus.asciidoc @@ -45,6 +45,7 @@ metricbeat.modules: #ssl.certificate_authorities: # - /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt + # Metrics sent by a Prometheus server using remote_write option - module: prometheus metricsets: ["remote_write"] @@ -54,6 +55,32 @@ metricbeat.modules: # Secure settings for the server using TLS/SSL: #ssl.certificate: "/etc/pki/server/cert.pem" #ssl.key: "/etc/pki/server/cert.key" + +# Metrics that will be collected using a PromQL +- module: prometheus + period: 10s + hosts: ["localhost:9090"] + metricsets: ["query"] + queries: + #- name: "instant_vector" + # path: "/api/v1/query" + # params: + # query: "sum(rate(prometheus_http_requests_total[1m]))" + #- name: "range_vector" + # path: "/api/v1/query_range" + # params: + # query: "up" + # start: "2019-12-20T00:00:00.000Z" + # end: "2019-12-21T00:00:00.000Z" + # step: 1h + #- name: "scalar" + # path: "/api/v1/query" + # params: + # query: "100" + #- name: "string" + # path: "/api/v1/query" + # params: + # query: "some_value" ---- This module supports TLS connections when using `ssl` config field, as described in <>. @@ -66,9 +93,13 @@ The following metricsets are available: * <> +* <> + * <> include::prometheus/collector.asciidoc[] +include::prometheus/query.asciidoc[] + include::prometheus/remote_write.asciidoc[] diff --git a/metricbeat/docs/modules/prometheus/query.asciidoc b/metricbeat/docs/modules/prometheus/query.asciidoc new file mode 100644 index 00000000000..72c0fde9278 --- /dev/null +++ b/metricbeat/docs/modules/prometheus/query.asciidoc @@ -0,0 +1,23 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-prometheus-query]] +=== Prometheus query metricset + +beta[] + +include::../../../module/prometheus/query/_meta/docs.asciidoc[] + + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/prometheus/query/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index b4c39a46fa3..4f680fb7c0f 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -205,7 +205,8 @@ This file is generated! See scripts/mage/docs_collector.go |<> |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | -.2+| .2+| |<> +.3+| .3+| |<> +|<> beta[] |<> beta[] |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | .4+| .4+| |<> diff --git a/metricbeat/include/list_common.go b/metricbeat/include/list_common.go index 62ce2cd478b..f15d6a4be16 100644 --- a/metricbeat/include/list_common.go +++ b/metricbeat/include/list_common.go @@ -125,6 +125,7 @@ import ( _ "github.com/elastic/beats/v7/metricbeat/module/postgresql/statement" _ "github.com/elastic/beats/v7/metricbeat/module/prometheus" _ "github.com/elastic/beats/v7/metricbeat/module/prometheus/collector" + _ "github.com/elastic/beats/v7/metricbeat/module/prometheus/query" _ "github.com/elastic/beats/v7/metricbeat/module/prometheus/remote_write" _ "github.com/elastic/beats/v7/metricbeat/module/rabbitmq" _ "github.com/elastic/beats/v7/metricbeat/module/rabbitmq/connection" diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index dc008249b70..4db72883ad5 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -717,6 +717,7 @@ metricbeat.modules: #ssl.certificate_authorities: # - /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt + # Metrics sent by a Prometheus server using remote_write option - module: prometheus metricsets: ["remote_write"] @@ -727,6 +728,32 @@ metricbeat.modules: #ssl.certificate: "/etc/pki/server/cert.pem" #ssl.key: "/etc/pki/server/cert.key" +# Metrics that will be collected using a PromQL +- module: prometheus + period: 10s + hosts: ["localhost:9090"] + metricsets: ["query"] + queries: + #- name: "instant_vector" + # path: "/api/v1/query" + # params: + # query: "sum(rate(prometheus_http_requests_total[1m]))" + #- name: "range_vector" + # path: "/api/v1/query_range" + # params: + # query: "up" + # start: "2019-12-20T00:00:00.000Z" + # end: "2019-12-21T00:00:00.000Z" + # step: 1h + #- name: "scalar" + # path: "/api/v1/query" + # params: + # query: "100" + #- name: "string" + # path: "/api/v1/query" + # params: + # query: "some_value" + #------------------------------- RabbitMQ Module ------------------------------- - module: rabbitmq metricsets: ["node", "queue", "connection"] diff --git a/metricbeat/module/prometheus/_meta/config.yml b/metricbeat/module/prometheus/_meta/config.yml index 439cfac9db5..aa855dd2dec 100644 --- a/metricbeat/module/prometheus/_meta/config.yml +++ b/metricbeat/module/prometheus/_meta/config.yml @@ -15,6 +15,7 @@ #ssl.certificate_authorities: # - /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt + # Metrics sent by a Prometheus server using remote_write option - module: prometheus metricsets: ["remote_write"] @@ -24,3 +25,29 @@ # Secure settings for the server using TLS/SSL: #ssl.certificate: "/etc/pki/server/cert.pem" #ssl.key: "/etc/pki/server/cert.key" + +# Metrics that will be collected using a PromQL +- module: prometheus + period: 10s + hosts: ["localhost:9090"] + metricsets: ["query"] + queries: + #- name: "instant_vector" + # path: "/api/v1/query" + # params: + # query: "sum(rate(prometheus_http_requests_total[1m]))" + #- name: "range_vector" + # path: "/api/v1/query_range" + # params: + # query: "up" + # start: "2019-12-20T00:00:00.000Z" + # end: "2019-12-21T00:00:00.000Z" + # step: 1h + #- name: "scalar" + # path: "/api/v1/query" + # params: + # query: "100" + #- name: "string" + # path: "/api/v1/query" + # params: + # query: "some_value" diff --git a/metricbeat/module/prometheus/_meta/fields.yml b/metricbeat/module/prometheus/_meta/fields.yml index ceea6b49dd2..eaeb1070386 100644 --- a/metricbeat/module/prometheus/_meta/fields.yml +++ b/metricbeat/module/prometheus/_meta/fields.yml @@ -21,3 +21,9 @@ object_type_mapping_type: "*" description: > Prometheus metric + - name: query.* + type: object + object_type: double + object_type_mapping_type: "*" + description: > + Prometheus value resulted from PromQL diff --git a/metricbeat/module/prometheus/_meta/prometheus.yml b/metricbeat/module/prometheus/_meta/prometheus.yml index fece2586616..06707841f8d 100644 --- a/metricbeat/module/prometheus/_meta/prometheus.yml +++ b/metricbeat/module/prometheus/_meta/prometheus.yml @@ -1,6 +1,6 @@ # my global config global: - scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute. + scrape_interval: 1s # Set the scrape interval to every 1 second. Default is every 1 minute. evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute. # scrape_timeout is set to the global default (10s). diff --git a/metricbeat/module/prometheus/fields.go b/metricbeat/module/prometheus/fields.go index d6422afffe9..d8b0eb230f4 100644 --- a/metricbeat/module/prometheus/fields.go +++ b/metricbeat/module/prometheus/fields.go @@ -32,5 +32,5 @@ func init() { // AssetPrometheus returns asset data. // This is the base64 encoded gzipped contents of ../metricbeat/module/prometheus. func AssetPrometheus() string { - return "eJyckk2O2zAMhfc+xYO6C5IcwIteoQW6LApDtmlbjf5AMg1y+8KxnVGSAQYzXL4nkh8pHnCia43MKZBOdJYKUKeeapifd9FUQE/SscvqUqzxvQKAX2pVIB3bTD0GTgEWb1mg2Ofkoh4rQKbE2nQpDm6sMVgvVAFMnqxQjdHOb0jVxVFq/DYi3uxhJtVs/lTA4Mj3Ut/6HhBtoCfqOfSa51qcznlVyrQ5vuEH98RwAhdyYrVRMRHTHt625AUX5z2C1W7C4Fh0D50ITKKwTOjTufV0r7ehLMnH3d3YYFL7lzot5EVoFvdE10vivrDfWfMWxWYDKbtu7foCs7ifp3ma7cFtgs3ZxXF9anbmi9AF7ePvP87AFJJSc2Gn9DJG+cMf9F/q4FZnW8xyqgWZEP8jrsqslawltYW+ndP/AAAA//89EfHZ" + return "eJzMkkGO2zAMRfc+xYe7G2TmAF70BAXaosuiCBT7O1ZHllSSniC3LxzHGU0yQJF2Uy75RfLxi4945rFBljTSBk5aAeYtsEH95ZKsK6CjtuKz+RQbfKwA4Js5U2grLrNDL2mEw2sVGLucfLSnCtAhiW3bFHu/b9C7oKwAYaBTNti7+Q3NfNxrg++1aqg3qAezXP+ogN4zdNqc5j4iupFX1HPYMc+9JE35nCnL5viAz9JR4BV+zEnMRcNA4QbB7RgUBx8CRmftgN6L2gY2EEI1OCG6NO0CL/1WlKX46eEirDBp95OtFeklsV3UZx4PSbpCfsfmNQpnR5r49jz1BmZR76e52u2Nuh1dzj7uz0/rh/ovoW9of02U4//G+uLCdPr1Kdh627P89VPB//Z639nqZqfyNP8Ac2qwfiVLHy5jdzRX5K9vfUURjsm4PYg3/gvR0genPivYqzNn45TyQrmD9ncAAAD//1baTA8=" } diff --git a/metricbeat/module/prometheus/query/_meta/data.json b/metricbeat/module/prometheus/query/_meta/data.json new file mode 100644 index 00000000000..6f03bd4ce7e --- /dev/null +++ b/metricbeat/module/prometheus/query/_meta/data.json @@ -0,0 +1,27 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "event": { + "dataset": "prometheus.query", + "duration": 115000, + "module": "prometheus" + }, + "metricset": { + "name": "query", + "period": 10000 + }, + "prometheus": { + "labels": { + "__name__": "go_threads", + "instance": "localhost:9090", + "job": "prometheus" + }, + "query": { + "dataType": "vector", + "go_threads": 26 + } + }, + "service": { + "address": "localhost:32768", + "type": "prometheus" + } +} \ No newline at end of file diff --git a/metricbeat/module/prometheus/query/_meta/docs.asciidoc b/metricbeat/module/prometheus/query/_meta/docs.asciidoc new file mode 100644 index 00000000000..60025f0d96d --- /dev/null +++ b/metricbeat/module/prometheus/query/_meta/docs.asciidoc @@ -0,0 +1,62 @@ +This is the `query` metricset to query from https://prometheus.io/docs/prometheus/latest/querying/api/#expression-queries[querying API of Promtheus]. + + +[float] +=== Configuration + +[float] +==== Instant queries + +The following configuration performs an instant query for `up` metric at a single point in time: +[source,yaml] +------------------------------------------------------------------------------------- +- module: prometheus + period: 10s + hosts: ["localhost:9090"] + metricsets: ["query"] + queries: + - name: 'up' + path: '/api/v1/query' + params: + query: "up" +------------------------------------------------------------------------------------- + + +More complex PromQL expressions can also be used like the following one which calculates the per-second rate of HTTP +requests as measured over the last 5 minutes. +[source,yaml] +------------------------------------------------------------------------------------- +- module: prometheus + period: 10s + hosts: ["localhost:9090"] + metricsets: ["query"] + queries: + - name: "rate_http_requests_total" + path: "/api/v1/query" + params: + query: "rate(prometheus_http_requests_total[5m])" +------------------------------------------------------------------------------------- + + + + +[float] +==== Range queries + + +The following example evaluates the expression `up` over a 30-second range with a query resolution of 15 seconds: +[source,yaml] +------------------------------------------------------------------------------------- +- module: prometheus + period: 10s + metricsets: ["query"] + hosts: ["node:9100"] + queries: + - name: "up_master" + path: "/api/v1/query_range" + params: + query: "up{node='master01'}" + start: "2019-12-20T23:30:30.000Z" + end: "2019-12-21T23:31:00.000Z" + step: 15s +------------------------------------------------------------------------------------- diff --git a/metricbeat/module/prometheus/query/_meta/fields.yml b/metricbeat/module/prometheus/query/_meta/fields.yml new file mode 100644 index 00000000000..acbc35db449 --- /dev/null +++ b/metricbeat/module/prometheus/query/_meta/fields.yml @@ -0,0 +1,6 @@ +- name: query + type: group + description: > + query metricset + release: beta + fields: diff --git a/metricbeat/module/prometheus/query/_meta/test/querymetrics_instant_vector.json b/metricbeat/module/prometheus/query/_meta/test/querymetrics_instant_vector.json new file mode 100644 index 00000000000..1ffdee97232 --- /dev/null +++ b/metricbeat/module/prometheus/query/_meta/test/querymetrics_instant_vector.json @@ -0,0 +1,24 @@ +{ + "status" : "success", + "data" : { + "resultType" : "vector", + "result" : [ + { + "metric" : { + "__name__" : "up", + "job" : "prometheus", + "instance" : "localhost:9090" + }, + "value": [ 1435781451.781, "1.0" ] + }, + { + "metric" : { + "__name__" : "up", + "job" : "node", + "instance" : "localhost:9100" + }, + "value" : [ 1435781451.781, "1.19" ] + } + ] + } +} diff --git a/metricbeat/module/prometheus/query/_meta/test/querymetrics_range_vector.json b/metricbeat/module/prometheus/query/_meta/test/querymetrics_range_vector.json new file mode 100644 index 00000000000..7256a92c5f7 --- /dev/null +++ b/metricbeat/module/prometheus/query/_meta/test/querymetrics_range_vector.json @@ -0,0 +1,32 @@ +{ + "status" : "success", + "data" : { + "resultType" : "matrix", + "result" : [ + { + "metric" : { + "__name__" : "up", + "job" : "prometheus", + "instance" : "localhost:9090" + }, + "values" : [ + [ 1435781430.781, "1" ], + [ 1435781445.781, "1" ], + [ 1435781460.781, "1" ] + ] + }, + { + "metric" : { + "__name__" : "up", + "job" : "node", + "instance" : "localhost:9091" + }, + "values" : [ + [ 1435781430.781, "0" ], + [ 1435781445.781, "0" ], + [ 1435781460.781, "1" ] + ] + } + ] + } +} diff --git a/metricbeat/module/prometheus/query/_meta/test/querymetrics_scalar.json b/metricbeat/module/prometheus/query/_meta/test/querymetrics_scalar.json new file mode 100644 index 00000000000..c7255fe9455 --- /dev/null +++ b/metricbeat/module/prometheus/query/_meta/test/querymetrics_scalar.json @@ -0,0 +1,10 @@ +{ + "status":"success", + "data":{ + "resultType":"scalar", + "result":[ + 1584628364.185, + "100.4" + ] + } +} diff --git a/metricbeat/module/prometheus/query/_meta/test/querymetrics_string.json b/metricbeat/module/prometheus/query/_meta/test/querymetrics_string.json new file mode 100644 index 00000000000..9dc16d2a17f --- /dev/null +++ b/metricbeat/module/prometheus/query/_meta/test/querymetrics_string.json @@ -0,0 +1,10 @@ +{ + "status":"success", + "data":{ + "resultType":"string", + "result":[ + 1584628642.569, + "apis" + ] + } +} diff --git a/metricbeat/module/prometheus/query/config.go b/metricbeat/module/prometheus/query/config.go new file mode 100644 index 00000000000..afb30f1c80d --- /dev/null +++ b/metricbeat/module/prometheus/query/config.go @@ -0,0 +1,59 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package query + +import ( + "errors" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// Config defines the "query" metricset's configuration +type Config struct { + Queries []QueryConfig `config:"queries"` + DefaultQuery QueryConfig `config:"default_path"` +} + +// QueryConfig is used to make an API request. +type QueryConfig struct { + Path string `config:"path"` + Params common.MapStr `config:"params"` + Name string `config:"name"` +} + +func defaultConfig() Config { + return Config{ + DefaultQuery: QueryConfig{ + Path: "/api/v1/query", + Name: "default", + }, + } +} + +// Validate for Prometheus "query" metricset config +func (p *QueryConfig) Validate() error { + if p.Name == "" { + return errors.New("`name` can not be empty in path configuration") + } + + if p.Path == "" { + return errors.New("`path` can not be empty in path configuration") + } + + return nil +} diff --git a/metricbeat/module/prometheus/query/data.go b/metricbeat/module/prometheus/query/data.go new file mode 100644 index 00000000000..b89df6b4305 --- /dev/null +++ b/metricbeat/module/prometheus/query/data.go @@ -0,0 +1,327 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package query + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/metricbeat/mb" +) + +// Response stores the very basic response information to only keep the Status and the ResultType. +type Response struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + } `json:"data"` +} + +// ArrayResponse is for "scalar", "string" type. +// example: {"status":"success","data":{"resultType":"string","result":[1584628642.569,"100"]}} +type ArrayResponse struct { + Status string `json:"status"` + Data arrayData `json:"data"` +} +type arrayData struct { + ResultType string `json:"resultType"` + Results []interface{} `json:"result"` +} + +// InstantVectorResponse is for "vector" type from Prometheus Query API Request +// instantVectorResult format: +// [ +// { +// "metric": { "": "", ... }, +// "value": [ , "" ] +// }, +// ... +//] +type InstantVectorResponse struct { + Status string `json:"status"` + Data instantVectorData `json:"data"` +} +type instantVectorData struct { + ResultType string `json:"resultType"` + Results []instantVectorResult `json:"result"` +} +type instantVectorResult struct { + Metric map[string]string `json:"metric"` + Vector []interface{} `json:"value"` +} + +// InstantVectorResponse is for "vector" type from Prometheus Query API Request +// rangeVectorResult format: +// [ +// { +// "metric": { "": "", ... }, +// "values": [ [ , "" ], ... ] +// }, +// ... +//] +type RangeVectorResponse struct { + Status string `json:"status"` + Data rangeVectorData `json:"data"` +} +type rangeVectorData struct { + ResultType string `json:"resultType"` + Results []rangeVectorResult `json:"result"` +} +type rangeVectorResult struct { + Metric map[string]string `json:"metric"` + Vectors [][]interface{} `json:"values"` +} + +func parseResponse(body []byte, pathConfig QueryConfig) ([]mb.Event, error) { + var events []mb.Event + + resultType, err := getResultType(body) + if err != nil { + return events, err + } + + switch resultType { + case "scalar", "string": + event, err := getEventFromScalarOrString(body, resultType, pathConfig.Name) + if err != nil { + return events, err + } + events = append(events, event) + case "vector": + evnts, err := getEventsFromVector(body, pathConfig.Name) + if err != nil { + return events, err + } + events = append(events, evnts...) + case "matrix": + evnts, err := getEventsFromMatrix(body, pathConfig.Name) + if err != nil { + return events, err + } + events = append(events, evnts...) + default: + msg := fmt.Sprintf("Unknown resultType '%v'", resultType) + return events, errors.New(msg) + } + return events, nil +} + +func getEventsFromMatrix(body []byte, queryName string) ([]mb.Event, error) { + events := []mb.Event{} + resultType := "matrix" + convertedMap, err := convertJSONToRangeVectorResponse(body) + if err != nil { + return events, err + } + results := convertedMap.Data.Results + for _, result := range results { + for _, vector := range result.Vectors { + if vector != nil { + timestamp, err := getTimestampFromVector(vector) + if err != nil { + return []mb.Event{}, err + } + val, err := getValueFromVector(vector) + if err != nil { + return []mb.Event{}, err + } + if math.IsNaN(val) || math.IsInf(val, 0) { + continue + } + events = append(events, mb.Event{ + Timestamp: getTimestamp(timestamp), + ModuleFields: common.MapStr{"labels": result.Metric}, + MetricSetFields: common.MapStr{ + "dataType": resultType, + queryName: val, + }, + }) + } else { + return []mb.Event{}, errors.New("Could not parse results") + } + } + } + return events, nil +} + +func getEventsFromVector(body []byte, queryName string) ([]mb.Event, error) { + events := []mb.Event{} + resultType := "vector" + convertedMap, err := convertJSONToInstantVectorResponse(body) + if err != nil { + return events, err + } + results := convertedMap.Data.Results + for _, result := range results { + if result.Vector != nil { + timestamp, err := getTimestampFromVector(result.Vector) + if err != nil { + return []mb.Event{}, err + } + val, err := getValueFromVector(result.Vector) + if err != nil { + return []mb.Event{}, err + } + if math.IsNaN(val) || math.IsInf(val, 0) { + continue + } + events = append(events, mb.Event{ + Timestamp: getTimestamp(timestamp), + ModuleFields: common.MapStr{"labels": result.Metric}, + MetricSetFields: common.MapStr{ + "dataType": resultType, + queryName: val, + }, + }) + } else { + return []mb.Event{}, errors.New("Could not parse results") + } + } + return events, nil +} + +func getEventFromScalarOrString(body []byte, resultType string, queryName string) (mb.Event, error) { + convertedArray, err := convertJSONToArrayResponse(body) + if err != nil { + return mb.Event{}, err + } + if convertedArray.Data.Results != nil { + timestamp, err := getTimestampFromVector(convertedArray.Data.Results) + if err != nil { + return mb.Event{}, err + } + if resultType == "scalar" { + val, err := getValueFromVector(convertedArray.Data.Results) + if err != nil { + return mb.Event{}, err + } + if math.IsNaN(val) || math.IsInf(val, 0) { + return mb.Event{}, nil + } + return mb.Event{ + Timestamp: getTimestamp(timestamp), + MetricSetFields: common.MapStr{ + "dataType": resultType, + queryName: val, + }, + }, nil + } else if resultType == "string" { + value, ok := convertedArray.Data.Results[1].(string) + if !ok { + msg := fmt.Sprintf("Could not parse value of result: %v", convertedArray.Data.Results) + return mb.Event{}, errors.New(msg) + } + return mb.Event{ + Timestamp: getTimestamp(timestamp), + ModuleFields: common.MapStr{"labels": common.MapStr{queryName: value}}, + MetricSetFields: common.MapStr{ + "dataType": resultType, + queryName: 1, + }, + }, nil + } + } + return mb.Event{}, errors.New("Could not parse results") +} + +func getTimestampFromVector(vector []interface{}) (float64, error) { + // Example input: [ , "" ] + if len(vector) != 2 { + return 0, errors.New("Could not parse results") + } + timestamp, ok := vector[0].(float64) + if !ok { + msg := fmt.Sprintf("Could not parse timestamp of result: %v", vector) + return 0, errors.New(msg) + } + return timestamp, nil +} + +func getValueFromVector(vector []interface{}) (float64, error) { + // Example input: [ , "" ] + if len(vector) != 2 { + return 0, errors.New("Could not parse results") + } + value, ok := vector[1].(string) + if !ok { + msg := fmt.Sprintf("Could not parse value of result: %v", vector) + return 0, errors.New(msg) + } + val, err := strconv.ParseFloat(value, 64) + if err != nil { + msg := fmt.Sprintf("Could not parse value of result: %v", vector) + return 0, errors.New(msg) + } + return val, nil +} + +func getResultType(body []byte) (string, error) { + response := Response{} + if err := json.Unmarshal(body, &response); err != nil { + return "", errors.Wrap(err, "Failed to parse api response") + } + if response.Status == "error" { + return "", errors.Errorf("Failed to query") + } + return response.Data.ResultType, nil +} + +func convertJSONToArrayResponse(body []byte) (ArrayResponse, error) { + arrayBody := ArrayResponse{} + if err := json.Unmarshal(body, &arrayBody); err != nil { + return arrayBody, errors.Wrap(err, "Failed to parse api response") + } + if arrayBody.Status == "error" { + return arrayBody, errors.Errorf("Failed to query") + } + return arrayBody, nil +} + +func convertJSONToRangeVectorResponse(body []byte) (RangeVectorResponse, error) { + mapBody := RangeVectorResponse{} + if err := json.Unmarshal(body, &mapBody); err != nil { + return RangeVectorResponse{}, errors.Wrap(err, "Failed to parse api response") + } + if mapBody.Status == "error" { + return mapBody, errors.Errorf("Failed to query") + } + return mapBody, nil +} + +func convertJSONToInstantVectorResponse(body []byte) (InstantVectorResponse, error) { + mapBody := InstantVectorResponse{} + if err := json.Unmarshal(body, &mapBody); err != nil { + return InstantVectorResponse{}, errors.Wrap(err, "Failed to parse api response") + } + if mapBody.Status == "error" { + return mapBody, errors.Errorf("Failed to query") + } + return mapBody, nil +} + +func getTimestamp(num float64) time.Time { + sec := int64(num) + ns := int64((num - float64(sec)) * 1000) + return time.Unix(sec, ns) +} diff --git a/metricbeat/module/prometheus/query/query.go b/metricbeat/module/prometheus/query/query.go new file mode 100644 index 00000000000..3f419753a72 --- /dev/null +++ b/metricbeat/module/prometheus/query/query.go @@ -0,0 +1,114 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package query + +import ( + "io/ioutil" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/metricbeat/helper" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" +) + +const ( + defaultScheme = "http" +) + +var ( + hostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + }.Build() +) + +func init() { + mb.Registry.MustAddMetricSet("prometheus", "query", New, + mb.WithHostParser(hostParser), + ) +} + +// MetricSet type defines all fields of the MetricSet for Prometheus Query +type MetricSet struct { + mb.BaseMetricSet + http *helper.HTTP + queries []QueryConfig + baseURL string +} + +// New create a new instance of the MetricSet +// Part of new is also setting up the configuration by processing additional +// configuration entries if needed. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + config := defaultConfig() + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + http, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + return &MetricSet{ + BaseMetricSet: base, + http: http, + queries: config.Queries, + baseURL: http.GetURI(), + }, nil +} + +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { + for _, pathConfig := range m.queries { + url := m.getURL(pathConfig.Path, pathConfig.Params) + m.http.SetURI(url) + response, err := m.http.FetchResponse() + if err != nil { + reporter.Error(errors.Wrapf(err, "unable to fetch data from prometheus endpoint: %v", url)) + continue + } + defer func() { + if err := response.Body.Close(); err != nil { + m.Logger().Debug("error closing http body") + } + }() + + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + + events, parseErr := parseResponse(body, pathConfig) + if parseErr != nil { + reporter.Error(errors.Wrapf(parseErr, "error parsing response from: %v", url)) + continue + } + for _, e := range events { + reporter.Event(e) + } + } + return nil +} + +func (m *MetricSet) getURL(path string, queryMap common.MapStr) string { + queryStr := mb.QueryParams(queryMap).String() + return m.baseURL + path + "?" + queryStr +} diff --git a/metricbeat/module/prometheus/query/query_integration_test.go b/metricbeat/module/prometheus/query/query_integration_test.go new file mode 100644 index 00000000000..a54c9f1daf8 --- /dev/null +++ b/metricbeat/module/prometheus/query/query_integration_test.go @@ -0,0 +1,97 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package query + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/tests/compose" + "github.com/elastic/beats/v7/metricbeat/mb" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + service := compose.EnsureUp(t, "prometheus") + + config := map[string]interface{}{ + "module": "prometheus", + "metricsets": []string{"query"}, + "hosts": []string{service.Host()}, + "queries": []common.MapStr{ + common.MapStr{ + "name": "go_threads", + "path": "/api/v1/query", + "params": common.MapStr{ + "query": "go_threads", + }, + }, + }, + } + ms := mbtest.NewReportingMetricSetV2Error(t, config) + var err error + for retries := 0; retries < 3; retries++ { + err = mbtest.WriteEventsReporterV2Error(ms, t, "") + if err == nil { + return + } + time.Sleep(10 * time.Second) + } + t.Fatal("write", err) +} + +func TestQueryFetch(t *testing.T) { + service := compose.EnsureUp(t, "prometheus") + + config := map[string]interface{}{ + "module": "prometheus", + "metricsets": []string{"query"}, + "hosts": []string{service.Host()}, + "queries": []common.MapStr{ + common.MapStr{ + "name": "go_info", + "path": "/api/v1/query", + "params": common.MapStr{ + "query": "go_info", + }, + }, + }, + } + f := mbtest.NewReportingMetricSetV2Error(t, config) + + var events []mb.Event + var errors []error + for retries := 0; retries < 3; retries++ { + events, errors = mbtest.ReportingFetchV2Error(f) + if len(events) > 0 { + break + } + time.Sleep(10 * time.Second) + } + if len(errors) > 0 { + t.Fatalf("Expected 0 errors, had %d. %v\n", len(errors), errors) + } + assert.NotEmpty(t, events) + event := events[0].MetricSetFields + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) +} diff --git a/metricbeat/module/prometheus/query/query_test.go b/metricbeat/module/prometheus/query/query_test.go new file mode 100644 index 00000000000..73a7b885f50 --- /dev/null +++ b/metricbeat/module/prometheus/query/query_test.go @@ -0,0 +1,216 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package query + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/elastic/beats/v7/libbeat/common" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +func TestQueryFetchEventContentInstantVector(t *testing.T) { + absPath, _ := filepath.Abs("./_meta/test/") + + // test with response format like: + //[ + // { + // "metric": { "": "", ... }, + // "value": [ , "" ] + // }, + // ... + //] + response, _ := ioutil.ReadFile(absPath + "/querymetrics_instant_vector.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "prometheus", + "metricsets": []string{"query"}, + "hosts": []string{server.URL}, + // queries do not have an actual role here since all http responses are mocked + "queries": []common.MapStr{ + common.MapStr{ + "name": "up", + "path": "/api/v1/query", + "params": common.MapStr{ + "query": "up", + }, + }, + }, + } + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + metricSet.Fetch(reporter) + + events := reporter.GetEvents() + if len(events) != 2 { + t.Fatalf("Expected 2 events, had %d. %v\n", len(events), events) + } + for _, event := range events { + e := mbtest.StandardizeEvent(metricSet, event) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) + } +} + +func TestQueryFetchEventContentRangeVector(t *testing.T) { + absPath, _ := filepath.Abs("./_meta/test/") + + // test with response format like: + //[ + // { + // "metric": { "": "", ... }, + // "values": [ [ , "" ], ... ] + // }, + // ... + //] + response, _ := ioutil.ReadFile(absPath + "/querymetrics_range_vector.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "prometheus", + "metricsets": []string{"query"}, + "hosts": []string{server.URL}, + // queries do not have an actual role here since all http responses are mocked + "queries": []common.MapStr{ + common.MapStr{ + "name": "up_range", + "path": "/api/v1/query", + "params": common.MapStr{ + "query": "up", + "start": "2019-12-20T23:30:30.000Z", + "end": "2019-12-21T23:31:00.000Z", + "step": "15s", + }, + }, + }, + } + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + metricSet.Fetch(reporter) + + events := reporter.GetEvents() + if len(events) != 6 { + t.Fatalf("Expected 6 events, had %d. %v\n", len(events), events) + } + for _, event := range events { + e := mbtest.StandardizeEvent(metricSet, event) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) + } +} + +func TestQueryFetchEventContentScalar(t *testing.T) { + absPath, _ := filepath.Abs("./_meta/test/") + + // test with response format like: + //[ , "" ] + response, _ := ioutil.ReadFile(absPath + "/querymetrics_scalar.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "prometheus", + "metricsets": []string{"query"}, + "hosts": []string{server.URL}, + // queries do not have an actual role here since all http responses are mocked + "queries": []common.MapStr{ + common.MapStr{ + "name": "scalar", + "path": "/api/v1/query", + "params": common.MapStr{ + "query": "100", + }, + }, + }, + } + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + metricSet.Fetch(reporter) + + events := reporter.GetEvents() + if len(events) != 1 { + t.Fatalf("Expected 1 events, had %d. %v\n", len(events), events) + } + for _, event := range events { + e := mbtest.StandardizeEvent(metricSet, event) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) + } +} + +func TestQueryFetchEventContentString(t *testing.T) { + absPath, _ := filepath.Abs("./_meta/test/") + + // test with response format like: + //[ , "" ] + response, _ := ioutil.ReadFile(absPath + "/querymetrics_string.json") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "prometheus", + "metricsets": []string{"query"}, + "hosts": []string{server.URL}, + // queries do not have an actual role here since all http responses are mocked + "queries": []common.MapStr{ + common.MapStr{ + "name": "string", + "path": "/api/v1/query", + "params": common.MapStr{ + "query": "some", + }, + }, + }, + } + reporter := &mbtest.CapturingReporterV2{} + + metricSet := mbtest.NewReportingMetricSetV2Error(t, config) + metricSet.Fetch(reporter) + + events := reporter.GetEvents() + if len(events) != 1 { + t.Fatalf("Expected 1 events, had %d. %v\n", len(events), events) + } + for _, event := range events { + e := mbtest.StandardizeEvent(metricSet, event) + t.Logf("%s/%s event: %+v", metricSet.Module().Name(), metricSet.Name(), e.Fields.StringToPrint()) + } +} diff --git a/metricbeat/module/prometheus/test_prometheus.py b/metricbeat/module/prometheus/test_prometheus.py index 1d119b0b7b3..e58b1138b99 100644 --- a/metricbeat/module/prometheus/test_prometheus.py +++ b/metricbeat/module/prometheus/test_prometheus.py @@ -37,6 +37,36 @@ def test_stats(self): self.assert_fields_are_documented(evt) + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") + def test_query(self): + """ + prometheus query test + """ + self.render_config_template(modules=[{ + "name": "prometheus", + "metricsets": ["query"], + "hosts": self.get_hosts(), + "period": "5s", + "extras": { + "queries": [{ + "path": "/api/v1/query", + 'name': 'go_info', + 'params': {'query': 'go_info'} + }] + } + }]) + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() > 0, 60) + proc.check_kill_and_wait() + self.assert_no_logged_warnings() + + output = self.read_output_json() + evt = output[0] + + self.assertCountEqual(self.de_dot(PROMETHEUS_FIELDS), evt.keys(), evt) + + self.assert_fields_are_documented(evt) + class TestRemoteWrite(metricbeat.BaseTest): diff --git a/metricbeat/modules.d/prometheus.yml.disabled b/metricbeat/modules.d/prometheus.yml.disabled index 065c7df407e..34804913598 100644 --- a/metricbeat/modules.d/prometheus.yml.disabled +++ b/metricbeat/modules.d/prometheus.yml.disabled @@ -18,6 +18,7 @@ #ssl.certificate_authorities: # - /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt + # Metrics sent by a Prometheus server using remote_write option - module: prometheus metricsets: ["remote_write"] @@ -27,3 +28,29 @@ # Secure settings for the server using TLS/SSL: #ssl.certificate: "/etc/pki/server/cert.pem" #ssl.key: "/etc/pki/server/cert.key" + +# Metrics that will be collected using a PromQL +- module: prometheus + period: 10s + hosts: ["localhost:9090"] + metricsets: ["query"] + queries: + #- name: "instant_vector" + # path: "/api/v1/query" + # params: + # query: "sum(rate(prometheus_http_requests_total[1m]))" + #- name: "range_vector" + # path: "/api/v1/query_range" + # params: + # query: "up" + # start: "2019-12-20T00:00:00.000Z" + # end: "2019-12-21T00:00:00.000Z" + # step: 1h + #- name: "scalar" + # path: "/api/v1/query" + # params: + # query: "100" + #- name: "string" + # path: "/api/v1/query" + # params: + # query: "some_value" diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 8c9cd35d3c2..fadfba75cf9 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1009,6 +1009,7 @@ metricbeat.modules: #ssl.certificate_authorities: # - /var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt + # Metrics sent by a Prometheus server using remote_write option - module: prometheus metricsets: ["remote_write"] @@ -1019,6 +1020,32 @@ metricbeat.modules: #ssl.certificate: "/etc/pki/server/cert.pem" #ssl.key: "/etc/pki/server/cert.key" +# Metrics that will be collected using a PromQL +- module: prometheus + period: 10s + hosts: ["localhost:9090"] + metricsets: ["query"] + queries: + #- name: "instant_vector" + # path: "/api/v1/query" + # params: + # query: "sum(rate(prometheus_http_requests_total[1m]))" + #- name: "range_vector" + # path: "/api/v1/query_range" + # params: + # query: "up" + # start: "2019-12-20T00:00:00.000Z" + # end: "2019-12-21T00:00:00.000Z" + # step: 1h + #- name: "scalar" + # path: "/api/v1/query" + # params: + # query: "100" + #- name: "string" + # path: "/api/v1/query" + # params: + # query: "some_value" + #------------------------------- RabbitMQ Module ------------------------------- - module: rabbitmq metricsets: ["node", "queue", "connection"]