diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 837cb9caf88..1da7080e1a1 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Refactor docker CPU calculations to be more consistent with `docker stats`. {pull}6608[6608] - Update logstash.node_stats metricset to write data under `logstash.node.stats.*`. {pull}6714[6714] - Fixed typo in values for `state_container` `status.phase`, from `terminate` to `terminated`. {pull}6916[6916] +- RabbitMQ management plugin path is now configured at the module level instead of having to do it in each of the metricsets. New `management_path_prefix` option should be used now {pull}7074[7074] *Packetbeat* @@ -230,6 +231,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Add message rates to the RabbitMQ queue metricset {issue}6442[6442] {pull}6606[6606] - Add exchanges metricset to the RabbitMQ module {issue}6442[6442] {pull}6607[6607] - Add Elasticsearch index_summary metricset. {pull}6918[6918] +- Add config option `management_path_prefix` for RabbitMQ module to configure management plugin path prefix {issue}6875[6875] {pull}7074[7074] *Packetbeat* diff --git a/metricbeat/docs/modules/rabbitmq.asciidoc b/metricbeat/docs/modules/rabbitmq.asciidoc index 6d19884154b..bfd7f6dff0e 100644 --- a/metricbeat/docs/modules/rabbitmq.asciidoc +++ b/metricbeat/docs/modules/rabbitmq.asciidoc @@ -9,7 +9,9 @@ beta[] The RabbitMQ module uses http://www.rabbitmq.com/management.html[HTTP API] created by the management plugin to collect metrics. -The default metricsets are `connection`, `node` and `queue`. +The default metricsets are `connection`, `node`, `queue` and `exchange`. + +If `management.path_prefix` is set in RabbitMQ configuration, `management_path_prefix` has to be set to the same value in this module configuration. [float] @@ -27,6 +29,10 @@ metricbeat.modules: period: 10s hosts: ["localhost:15672"] + # Management path prefix, if `management.path_prefix` is set in RabbitMQ + # configuration, it has to be set to the same value. + #management_path_prefix: "" + #username: guest #password: guest ---- diff --git a/metricbeat/helper/http.go b/metricbeat/helper/http.go index e5fbf250ceb..966bbb853a1 100644 --- a/metricbeat/helper/http.go +++ b/metricbeat/helper/http.go @@ -95,18 +95,27 @@ func (h *HTTP) FetchResponse() (*http.Response, error) { return resp, nil } +// SetHeader sets HTTP headers to use in requests func (h *HTTP) SetHeader(key, value string) { h.headers[key] = value } +// SetMethod sets HTTP method to use in requests func (h *HTTP) SetMethod(method string) { h.method = method } +// GetURI gets the URI used in requests +func (h *HTTP) GetURI() string { + return h.uri +} + +// SetURI sets URI to use in requests func (h *HTTP) SetURI(uri string) { h.uri = uri } +// SetBody sets the body of the requests func (h *HTTP) SetBody(body []byte) { h.body = body } diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 453132c7d2a..0bb8f8bec92 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -511,6 +511,10 @@ metricbeat.modules: period: 10s hosts: ["localhost:15672"] + # Management path prefix, if `management.path_prefix` is set in RabbitMQ + # configuration, it has to be set to the same value. + #management_path_prefix: "" + #username: guest #password: guest diff --git a/metricbeat/module/rabbitmq/_meta/config.reference.yml b/metricbeat/module/rabbitmq/_meta/config.reference.yml index 62bf0879d1e..b96a0279cc6 100644 --- a/metricbeat/module/rabbitmq/_meta/config.reference.yml +++ b/metricbeat/module/rabbitmq/_meta/config.reference.yml @@ -4,5 +4,9 @@ period: 10s hosts: ["localhost:15672"] + # Management path prefix, if `management.path_prefix` is set in RabbitMQ + # configuration, it has to be set to the same value. + #management_path_prefix: "" + #username: guest #password: guest diff --git a/metricbeat/module/rabbitmq/_meta/docs.asciidoc b/metricbeat/module/rabbitmq/_meta/docs.asciidoc index f6fac935416..7d4677f809e 100644 --- a/metricbeat/module/rabbitmq/_meta/docs.asciidoc +++ b/metricbeat/module/rabbitmq/_meta/docs.asciidoc @@ -1,3 +1,5 @@ The RabbitMQ module uses http://www.rabbitmq.com/management.html[HTTP API] created by the management plugin to collect metrics. -The default metricsets are `connection`, `node` and `queue`. +The default metricsets are `connection`, `node`, `queue` and `exchange`. + +If `management.path_prefix` is set in RabbitMQ configuration, `management_path_prefix` has to be set to the same value in this module configuration. diff --git a/metricbeat/module/rabbitmq/connection/connection.go b/metricbeat/module/rabbitmq/connection/connection.go index 6050ea5d0ff..977a7aabe99 100644 --- a/metricbeat/module/rabbitmq/connection/connection.go +++ b/metricbeat/module/rabbitmq/connection/connection.go @@ -3,50 +3,31 @@ package connection import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" -) - -const ( - defaultScheme = "http" - defaultPath = "/api/connections" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) func init() { mb.Registry.MustAddMetricSet("rabbitmq", "connection", New, - mb.WithHostParser(hostParser), + mb.WithHostParser(rabbitmq.HostParser), mb.DefaultMetricSet(), ) } // MetricSet for fetching RabbitMQ connections. type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } // New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq connection metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.ConnectionsPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } // Fetch makes an HTTP request to fetch connections metrics from the connections endpoint. diff --git a/metricbeat/module/rabbitmq/connection/connection_test.go b/metricbeat/module/rabbitmq/connection/connection_test.go index b625496f110..222968bd420 100644 --- a/metricbeat/module/rabbitmq/connection/connection_test.go +++ b/metricbeat/module/rabbitmq/connection/connection_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/connection_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/connection_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/connections": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/exchange/exchange.go b/metricbeat/module/rabbitmq/exchange/exchange.go index aa895d400fd..b35cc97ebc1 100644 --- a/metricbeat/module/rabbitmq/exchange/exchange.go +++ b/metricbeat/module/rabbitmq/exchange/exchange.go @@ -3,57 +3,31 @@ package exchange import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) -const ( - defaultScheme = "http" - defaultPath = "/api/exchanges" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() -) - -// init registers the MetricSet with the central registry as soon as the program -// starts. The New function will be called later to instantiate an instance of -// the MetricSet for each host defined in the module's configuration. After the -// MetricSet has been created then Fetch will begin to be called periodically. func init() { - if err := mb.Registry.AddMetricSet("rabbitmq", "exchange", New, hostParser); err != nil { - panic(err) - } + mb.Registry.MustAddMetricSet("rabbitmq", "exchange", New, + mb.WithHostParser(rabbitmq.HostParser), + mb.DefaultMetricSet(), + ) } -// MetricSet holds any configuration or state information. It must implement -// the mb.MetricSet interface. And this is best achieved by embedding -// mb.BaseMetricSet because it implements all of the required mb.MetricSet -// interface methods except for Fetch. +// MetricSet for fetching RabbitMQ exchanges metrics. type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } -// New creates a new instance of the MetricSet. New is responsible for unpacking -// any MetricSet specific configuration options if there are any. +// New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq exchange metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.ExchangesPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } // Fetch methods implements the data gathering and data conversion to the right diff --git a/metricbeat/module/rabbitmq/exchange/exchange_test.go b/metricbeat/module/rabbitmq/exchange/exchange_test.go index 994710127be..8d955de7066 100644 --- a/metricbeat/module/rabbitmq/exchange/exchange_test.go +++ b/metricbeat/module/rabbitmq/exchange/exchange_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/exchange_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/exchange_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/exchanges": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/metricset.go b/metricbeat/module/rabbitmq/metricset.go new file mode 100644 index 00000000000..38b847da248 --- /dev/null +++ b/metricbeat/module/rabbitmq/metricset.go @@ -0,0 +1,29 @@ +package rabbitmq + +import ( + "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/mb" +) + +// MetricSet can be used to build other metric sets that query RabbitMQ +// management plugin +type MetricSet struct { + mb.BaseMetricSet + *helper.HTTP +} + +// NewMetricSet creates an metric set that can be used to build other metric +// sets that query RabbitMQ management plugin +func NewMetricSet(base mb.BaseMetricSet, subPath string) (*MetricSet, error) { + http, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + http.SetURI(http.GetURI() + subPath) + http.SetHeader("Accept", "application/json") + + return &MetricSet{ + base, + http, + }, nil +} diff --git a/metricbeat/module/rabbitmq/metricset_test.go b/metricbeat/module/rabbitmq/metricset_test.go new file mode 100644 index 00000000000..3b8cfead050 --- /dev/null +++ b/metricbeat/module/rabbitmq/metricset_test.go @@ -0,0 +1,63 @@ +package rabbitmq + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/mb" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + + "github.com/stretchr/testify/assert" +) + +func init() { + mb.Registry.MustAddMetricSet("rabbitmq", "test", newTestMetricSet, + mb.WithHostParser(HostParser), + ) +} + +type testMetricSet struct { + *MetricSet +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + ms, err := NewMetricSet(base, "/api/test") + if err != nil { + return nil, err + } + return &testMetricSet{ms}, nil +} + +// Fetch makes an HTTP request to fetch connections metrics from the connections endpoint. +func (m *testMetricSet) Fetch() ([]common.MapStr, error) { + _, err := m.HTTP.FetchContent() + return nil, err +} + +func TestManagementPathPrefix(t *testing.T) { + visited := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/management_prefix/api/test": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + visited = true + default: + w.WriteHeader(404) + } + })) + defer server.Close() + + config := map[string]interface{}{ + "module": "rabbitmq", + "metricsets": []string{"test"}, + "hosts": []string{server.URL}, + pathConfigKey: "/management_prefix", + } + + f := mbtest.NewEventsFetcher(t, config) + f.Fetch() + assert.True(t, visited) +} diff --git a/metricbeat/module/rabbitmq/node/node.go b/metricbeat/module/rabbitmq/node/node.go index f24dc13f985..3301729e7d7 100644 --- a/metricbeat/module/rabbitmq/node/node.go +++ b/metricbeat/module/rabbitmq/node/node.go @@ -3,48 +3,31 @@ package node import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" -) - -const ( - defaultScheme = "http" - defaultPath = "/api/nodes" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) func init() { mb.Registry.MustAddMetricSet("rabbitmq", "node", New, - mb.WithHostParser(hostParser), + mb.WithHostParser(rabbitmq.HostParser), mb.DefaultMetricSet(), ) } +// MetricSet for fetching RabbitMQ node metrics type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } +// New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq node metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.NodesPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } func (m *MetricSet) Fetch() ([]common.MapStr, error) { diff --git a/metricbeat/module/rabbitmq/node/node_test.go b/metricbeat/module/rabbitmq/node/node_test.go index 7dc1021eba5..272b5751f9c 100644 --- a/metricbeat/module/rabbitmq/node/node_test.go +++ b/metricbeat/module/rabbitmq/node/node_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/node_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/node_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/nodes": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/queue/queue.go b/metricbeat/module/rabbitmq/queue/queue.go index 37582943692..75a4abfbf63 100644 --- a/metricbeat/module/rabbitmq/queue/queue.go +++ b/metricbeat/module/rabbitmq/queue/queue.go @@ -3,48 +3,31 @@ package queue import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" -) - -const ( - defaultScheme = "http" - defaultPath = "/api/queues" -) - -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: defaultScheme, - DefaultPath: defaultPath, - }.Build() + "github.com/elastic/beats/metricbeat/module/rabbitmq" ) func init() { mb.Registry.MustAddMetricSet("rabbitmq", "queue", New, - mb.WithHostParser(hostParser), + mb.WithHostParser(rabbitmq.HostParser), mb.DefaultMetricSet(), ) } +// MetricSet for fetching RabbitMQ queues metrics. type MetricSet struct { - mb.BaseMetricSet - *helper.HTTP + *rabbitmq.MetricSet } +// New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The rabbitmq queue metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := rabbitmq.NewMetricSet(base, rabbitmq.QueuesPath) if err != nil { return nil, err } - http.SetHeader("Accept", "application/json") - - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{ms}, nil } func (m *MetricSet) Fetch() ([]common.MapStr, error) { diff --git a/metricbeat/module/rabbitmq/queue/queue_test.go b/metricbeat/module/rabbitmq/queue/queue_test.go index 78250eeaec8..e27ca72ed81 100644 --- a/metricbeat/module/rabbitmq/queue/queue_test.go +++ b/metricbeat/module/rabbitmq/queue/queue_test.go @@ -14,13 +14,21 @@ import ( ) func TestFetchEventContents(t *testing.T) { - absPath, err := filepath.Abs("../_meta/testdata/") + absPath, _ := filepath.Abs("../_meta/testdata/") - response, err := ioutil.ReadFile(absPath + "/queue_sample_response.json") + response, _ := ioutil.ReadFile(absPath + "/queue_sample_response.json") + notFound, _ := ioutil.ReadFile(absPath + "/notfound_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + switch r.URL.Path { + case "/api/queues": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + default: + w.WriteHeader(404) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(notFound)) + } })) defer server.Close() diff --git a/metricbeat/module/rabbitmq/url.go b/metricbeat/module/rabbitmq/url.go new file mode 100644 index 00000000000..0c22a7ed249 --- /dev/null +++ b/metricbeat/module/rabbitmq/url.go @@ -0,0 +1,26 @@ +package rabbitmq + +import ( + "github.com/elastic/beats/metricbeat/mb/parse" +) + +// Subpaths to management plugin endpoints +const ( + ConnectionsPath = "/api/connections" + ExchangesPath = "/api/exchanges" + NodesPath = "/api/nodes" + QueuesPath = "/api/queues" +) + +const ( + defaultScheme = "http" + pathConfigKey = "management_path_prefix" +) + +var ( + // HostParser parses host urls for RabbitMQ management plugin + HostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + PathConfigKey: pathConfigKey, + }.Build() +)