From ec8f2341d805e1318cd3bd45d4149e75082de3d6 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 11 May 2018 15:25:35 +0200 Subject: [PATCH] Metricbeat: Add configuration for RabbitMQ management prefix path (#7074) Refactor RabbitMQ metricsets to reuse host parser and metricset builder so management prefix path can be configured as an only module setting. Check in tests the paths metricbeat is trying to access to collect the metrics. Previously it required to configure each metricset in its own module block with the path in the host. --- CHANGELOG.asciidoc | 2 + metricbeat/docs/modules/rabbitmq.asciidoc | 8 ++- metricbeat/helper/http.go | 9 +++ metricbeat/metricbeat.reference.yml | 4 ++ .../rabbitmq/_meta/config.reference.yml | 4 ++ .../module/rabbitmq/_meta/docs.asciidoc | 4 +- .../module/rabbitmq/connection/connection.go | 29 ++------- .../rabbitmq/connection/connection_test.go | 18 ++++-- .../module/rabbitmq/exchange/exchange.go | 46 +++----------- .../module/rabbitmq/exchange/exchange_test.go | 18 ++++-- metricbeat/module/rabbitmq/metricset.go | 29 +++++++++ metricbeat/module/rabbitmq/metricset_test.go | 63 +++++++++++++++++++ metricbeat/module/rabbitmq/node/node.go | 31 +++------ metricbeat/module/rabbitmq/node/node_test.go | 18 ++++-- metricbeat/module/rabbitmq/queue/queue.go | 31 +++------ .../module/rabbitmq/queue/queue_test.go | 18 ++++-- metricbeat/module/rabbitmq/url.go | 26 ++++++++ 17 files changed, 228 insertions(+), 130 deletions(-) create mode 100644 metricbeat/module/rabbitmq/metricset.go create mode 100644 metricbeat/module/rabbitmq/metricset_test.go create mode 100644 metricbeat/module/rabbitmq/url.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 837cb9caf88..1da7080e1a1 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Refactor docker CPU calculations to be more consistent with `docker stats`. {pull}6608[6608] - Update logstash.node_stats metricset to write data under `logstash.node.stats.*`. {pull}6714[6714] - Fixed typo in values for `state_container` `status.phase`, from `terminate` to `terminated`. {pull}6916[6916] +- RabbitMQ management plugin path is now configured at the module level instead of having to do it in each of the metricsets. New `management_path_prefix` option should be used now {pull}7074[7074] *Packetbeat* @@ -230,6 +231,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Add message rates to the RabbitMQ queue metricset {issue}6442[6442] {pull}6606[6606] - Add exchanges metricset to the RabbitMQ module {issue}6442[6442] {pull}6607[6607] - Add Elasticsearch index_summary metricset. {pull}6918[6918] +- Add config option `management_path_prefix` for RabbitMQ module to configure management plugin path prefix {issue}6875[6875] {pull}7074[7074] *Packetbeat* diff --git a/metricbeat/docs/modules/rabbitmq.asciidoc b/metricbeat/docs/modules/rabbitmq.asciidoc index 6d19884154b..bfd7f6dff0e 100644 --- a/metricbeat/docs/modules/rabbitmq.asciidoc +++ b/metricbeat/docs/modules/rabbitmq.asciidoc @@ -9,7 +9,9 @@ beta[] The RabbitMQ module uses http://www.rabbitmq.com/management.html[HTTP API] created by the management plugin to collect metrics. -The default metricsets are `connection`, `node` and `queue`. +The default metricsets are `connection`, `node`, `queue` and `exchange`. + +If `management.path_prefix` is set in RabbitMQ configuration, `management_path_prefix` has to be set to the same value in this module configuration. [float] @@ -27,6 +29,10 @@ metricbeat.modules: period: 10s hosts: ["localhost:15672"] + # Management path prefix, if `management.path_prefix` is set in RabbitMQ + # configuration, it has to be set to the same value. + #management_path_prefix: "" + #username: guest #password: guest ---- diff --git a/metricbeat/helper/http.go b/metricbeat/helper/http.go index e5fbf250ceb..966bbb853a1 100644 --- a/metricbeat/helper/http.go +++ b/metricbeat/helper/http.go @@ -95,18 +95,27 @@ func (h *HTTP) FetchResponse() (*http.Response, error) { return resp, nil } +// SetHeader sets HTTP headers to use in requests func (h *HTTP) SetHeader(key, value string) { h.headers[key] = value } +// SetMethod sets HTTP method to use in requests func (h *HTTP) SetMethod(method string) { h.method = method } +// GetURI gets the URI used in requests +func (h *HTTP) GetURI() string { + return h.uri +} + +// SetURI sets URI to use in requests func (h *HTTP) SetURI(uri string) { h.uri = uri } +// SetBody sets the body of the requests func (h *HTTP) SetBody(body []byte) { h.body = body } diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 453132c7d2a..0bb8f8bec92 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -511,6 +511,10 @@ metricbeat.modules: period: 10s hosts: ["localhost:15672"] + # Management path prefix, if `management.path_prefix` is set in RabbitMQ + # configuration, it has to be set to the same value. + #management_path_prefix: "" + #username: guest #password: guest diff --git a/metricbeat/module/rabbitmq/_meta/config.reference.yml b/metricbeat/module/rabbitmq/_meta/config.reference.yml index 62bf0879d1e..b96a0279cc6 100644 --- a/metricbeat/module/rabbitmq/_meta/config.reference.yml +++ b/metricbeat/module/rabbitmq/_meta/config.reference.yml @@ -4,5 +4,9 @@ period: 10s hosts: ["localhost:15672"] + # Management path prefix, if `management.path_prefix` is set in RabbitMQ + # configuration, it has to be set to the same value. + #management_path_prefix: "" + #username: guest #password: guest diff --git a/metricbeat/module/rabbitmq/_meta/docs.asciidoc b/metricbeat/module/rabbitmq/_meta/docs.asciidoc index f6fac935416..7d4677f809e 100644 --- a/metricbeat/module/rabbitmq/_meta/docs.asciidoc +++ b/metricbeat/module/rabbitmq/_meta/docs.asciidoc @@ -1,3 +1,5 @@ The RabbitMQ module uses http://www.rabbitmq.com/management.html[HTTP API] created by the management plugin to collect metrics. -The default metricsets are `connection`, `node` and `queue`. +The default metricsets are `connection`, `node`, `queue` and `exchange`. + +If `management.path_prefix` is set in RabbitMQ configuration, `management_path_prefix` has to be set to the same value in this module configuration. diff --git a/metricbeat/module/rabbitmq/connection/connection.go b/metricbeat/module/rabbitmq/connection/connection.go index 6050ea5d0ff..977a7aabe99 100644 --- a/metricbeat/module/rabbitmq/connection/connection.go +++ b/metricbeat/module/rabbitmq/connection/connection.go @@ -3,50 +3,31 @@ package connection import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" -) - -const ( - defaultScheme = "http" - defaultPath = "/api/connections" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) func init() { mb.Registry.MustAddMetricSet("rabbitmq", "connection", New, - mb.WithHostParser(hostParser), + mb.WithHostParser(rabbitmq.HostParser), mb.DefaultMetricSet(), ) } // MetricSet for fetching RabbitMQ connections. type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } // New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq connection metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.ConnectionsPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } // Fetch makes an HTTP request to fetch connections metrics from the connections endpoint. diff --git a/metricbeat/module/rabbitmq/connection/connection_test.go b/metricbeat/module/rabbitmq/connection/connection_test.go index b625496f110..222968bd420 100644 --- a/metricbeat/module/rabbitmq/connection/connection_test.go +++ b/metricbeat/module/rabbitmq/connection/connection_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/connection_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/connection_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/connections": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/exchange/exchange.go b/metricbeat/module/rabbitmq/exchange/exchange.go index aa895d400fd..b35cc97ebc1 100644 --- a/metricbeat/module/rabbitmq/exchange/exchange.go +++ b/metricbeat/module/rabbitmq/exchange/exchange.go @@ -3,57 +3,31 @@ package exchange import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) -const ( - defaultScheme = "http" - defaultPath = "/api/exchanges" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() -) - -// init registers the MetricSet with the central registry as soon as the program -// starts. The New function will be called later to instantiate an instance of -// the MetricSet for each host defined in the module's configuration. After the -// MetricSet has been created then Fetch will begin to be called periodically. func init() { - if err := mb.Registry.AddMetricSet("rabbitmq", "exchange", New, hostParser); err != nil { - panic(err) - } + mb.Registry.MustAddMetricSet("rabbitmq", "exchange", New, + mb.WithHostParser(rabbitmq.HostParser), + mb.DefaultMetricSet(), + ) } -// MetricSet holds any configuration or state information. It must implement -// the mb.MetricSet interface. And this is best achieved by embedding -// mb.BaseMetricSet because it implements all of the required mb.MetricSet -// interface methods except for Fetch. +// MetricSet for fetching RabbitMQ exchanges metrics. type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } -// New creates a new instance of the MetricSet. New is responsible for unpacking -// any MetricSet specific configuration options if there are any. +// New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq exchange metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.ExchangesPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } // Fetch methods implements the data gathering and data conversion to the right diff --git a/metricbeat/module/rabbitmq/exchange/exchange_test.go b/metricbeat/module/rabbitmq/exchange/exchange_test.go index 994710127be..8d955de7066 100644 --- a/metricbeat/module/rabbitmq/exchange/exchange_test.go +++ b/metricbeat/module/rabbitmq/exchange/exchange_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/exchange_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/exchange_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/exchanges": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/metricset.go b/metricbeat/module/rabbitmq/metricset.go new file mode 100644 index 00000000000..38b847da248 --- /dev/null +++ b/metricbeat/module/rabbitmq/metricset.go @@ -0,0 +1,29 @@ +package rabbitmq + +import ( + "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/mb" +) + +// MetricSet can be used to build other metric sets that query RabbitMQ +// management plugin +type MetricSet struct { + mb.BaseMetricSet + *helper.HTTP +} + +// NewMetricSet creates an metric set that can be used to build other metric +// sets that query RabbitMQ management plugin +func NewMetricSet(base mb.BaseMetricSet, subPath string) (*MetricSet, error) { + http, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + http.SetURI(http.GetURI() + subPath) + http.SetHeader("Accept", "application/json") + + return &MetricSet{ + base, + http, + }, nil +} diff --git a/metricbeat/module/rabbitmq/metricset_test.go b/metricbeat/module/rabbitmq/metricset_test.go new file mode 100644 index 00000000000..3b8cfead050 --- /dev/null +++ b/metricbeat/module/rabbitmq/metricset_test.go @@ -0,0 +1,63 @@ +package rabbitmq + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/mb" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + + "github.com/stretchr/testify/assert" +) + +func init() { + mb.Registry.MustAddMetricSet("rabbitmq", "test", newTestMetricSet, + mb.WithHostParser(HostParser), + ) +} + +type testMetricSet struct { + *MetricSet +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + ms, err := NewMetricSet(base, "/api/test") + if err != nil { + return nil, err + } + return &testMetricSet{ms}, nil +} + +// Fetch makes an HTTP request to fetch connections metrics from the connections endpoint. +func (m *testMetricSet) Fetch() ([]common.MapStr, error) { + _, err := m.HTTP.FetchContent() + return nil, err +} + +func TestManagementPathPrefix(t *testing.T) { + visited := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/management_prefix/api/test": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + visited = true + default: + w.WriteHeader(404) + } + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "rabbitmq", + "metricsets": []string{"test"}, + "hosts": []string{server.URL}, + pathConfigKey: "/management_prefix", + } + + f := mbtest.NewEventsFetcher(t, config) + f.Fetch() + assert.True(t, visited) +} diff --git a/metricbeat/module/rabbitmq/node/node.go b/metricbeat/module/rabbitmq/node/node.go index f24dc13f985..3301729e7d7 100644 --- a/metricbeat/module/rabbitmq/node/node.go +++ b/metricbeat/module/rabbitmq/node/node.go @@ -3,48 +3,31 @@ package node import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" -) - -const ( - defaultScheme = "http" - defaultPath = "/api/nodes" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) func init() { mb.Registry.MustAddMetricSet("rabbitmq", "node", New, - mb.WithHostParser(hostParser), + mb.WithHostParser(rabbitmq.HostParser), mb.DefaultMetricSet(), ) } +// MetricSet for fetching RabbitMQ node metrics type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } +// New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq node metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.NodesPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } func (m *MetricSet) Fetch() ([]common.MapStr, error) { diff --git a/metricbeat/module/rabbitmq/node/node_test.go b/metricbeat/module/rabbitmq/node/node_test.go index 7dc1021eba5..272b5751f9c 100644 --- a/metricbeat/module/rabbitmq/node/node_test.go +++ b/metricbeat/module/rabbitmq/node/node_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/node_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/node_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/nodes": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/queue/queue.go b/metricbeat/module/rabbitmq/queue/queue.go index 37582943692..75a4abfbf63 100644 --- a/metricbeat/module/rabbitmq/queue/queue.go +++ b/metricbeat/module/rabbitmq/queue/queue.go @@ -3,48 +3,31 @@ package queue import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" -) - -const ( - defaultScheme = "http" - defaultPath = "/api/queues" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) func init() { mb.Registry.MustAddMetricSet("rabbitmq", "queue", New, - mb.WithHostParser(hostParser), + mb.WithHostParser(rabbitmq.HostParser), mb.DefaultMetricSet(), ) } +// MetricSet for fetching RabbitMQ queues metrics. type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } +// New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq queue metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.QueuesPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } func (m *MetricSet) Fetch() ([]common.MapStr, error) { diff --git a/metricbeat/module/rabbitmq/queue/queue_test.go b/metricbeat/module/rabbitmq/queue/queue_test.go index 78250eeaec8..e27ca72ed81 100644 --- a/metricbeat/module/rabbitmq/queue/queue_test.go +++ b/metricbeat/module/rabbitmq/queue/queue_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/queue_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/queue_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/queues": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/url.go b/metricbeat/module/rabbitmq/url.go new file mode 100644 index 00000000000..0c22a7ed249 --- /dev/null +++ b/metricbeat/module/rabbitmq/url.go @@ -0,0 +1,26 @@ +package rabbitmq + +import ( + "github.com/elastic/beats/metricbeat/mb/parse" +) + +// Subpaths to management plugin endpoints +const ( + ConnectionsPath = "/api/connections" + ExchangesPath = "/api/exchanges" + NodesPath = "/api/nodes" + QueuesPath = "/api/queues" +) + +const ( + defaultScheme = "http" + pathConfigKey = "management_path_prefix" +) + +var ( + // HostParser parses host urls for RabbitMQ management plugin + HostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + PathConfigKey: pathConfigKey, + }.Build() +)