Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check ES for CCR feature availability #17073

Merged
merged 11 commits into from
Mar 18, 2020
Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add dashboard for `redisenterprise` module. {pull}16752[16752]
- Dynamically choose a method for the system/service metricset to support older linux distros. {pull}16902[16902]
- Reduce memory usage in `elasticsearch/index` metricset. {issue}16503[16503] {pull}16538[16538]
- Check if CCR feature is available on Elasticsearch cluster before attempting to call CCR APIs from `elasticsearch/ccr` metricset. {issue}16511[16511] {pull}17073[17073]

*Packetbeat*

Expand Down
9 changes: 7 additions & 2 deletions metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
This is the `ccr` metricset of the Elasticsearch module. It interrogates the
Cross Cluster Replication Stats API endpoint to fetch information about shards
in the Elasticsearch cluster that are participating in cross-cluster replication.
Cross-Cluster Replication Stats API endpoint to fetch metrics about cross-cluster
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a sentence fragment. Maybe say something like:

"This metricset uses the Cross-Cluster Replication Stats API endpoint to fetch...."

replication from the Elasticsearch clusters that are participating in cross-cluster
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, but I'm trying to clean up the docs by using the correct attribute to resolve product names. So you should use {es} instead of Elasticsearch throughout this topic.

replication.

If the Elasticserach cluster does not have the cross-cluster replication feature
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest pruning a few words here and saying: "...does not have cross-cluster replication enabled..."

enabled, this metricset will not collect any metrics. A DEBUG log message about this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, some pruning: "...will not collect metrics."

will be emitted in the Metricbeat log.
10 changes: 10 additions & 0 deletions metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func (m *MetricSet) checkCCRAvailability(currentElasticsearchVersion *common.Ver
return
}

xpack, err := elasticsearch.GetXPack(m.HTTP, m.GetServiceURI())
if err != nil {
return "", errors.Wrap(err, "error determining xpack features")
}

if !xpack.Features.CCR.Enabled {
message = "the CCR feature is not enabled on your Elasticsearch cluster."
return
}

isAvailable := elastic.IsFeatureAvailable(currentElasticsearchVersion, elasticsearch.CCRStatsAPIAvailableVersion)

if !isAvailable {
Expand Down
116 changes: 116 additions & 0 deletions metricbeat/module/elasticsearch/ccr/ccr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package ccr

import (
"net/http"
"net/http/httptest"
"strconv"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"

mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
)

func startESServer(esVersion, license string, ccrEnabled bool) *httptest.Server {

nodesLocalHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"nodes": { "foobar": {}}}`))
}
clusterStateMasterHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"master_node": "foobar"}`))
}
rootHandler := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
}
w.Write([]byte(`{"version": { "number": "` + esVersion + `" } }`))
}
licenseHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{ "license": { "type": "` + license + `" } }`))
}
xpackHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{ "features": { "ccr": { "enabled": ` + strconv.FormatBool(ccrEnabled) + `}}}`))
}
ccrStatsHandler := func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "this should never have been called", 418)
}

mux := http.NewServeMux()
mux.Handle("/_nodes/_local/nodes", http.HandlerFunc(nodesLocalHandler))
mux.Handle("/_cluster/state/master_node", http.HandlerFunc(clusterStateMasterHandler))
mux.Handle("/", http.HandlerFunc(rootHandler))
mux.Handle("/_license", http.HandlerFunc(licenseHandler)) // for 7.0 and above
mux.Handle("/_xpack/license", http.HandlerFunc(licenseHandler)) // for before 7.0
mux.Handle("/_xpack", http.HandlerFunc(xpackHandler))
mux.Handle("/_ccr/stats", http.HandlerFunc(ccrStatsHandler))

return httptest.NewServer(mux)
}

func TestCCRNotAvailable(t *testing.T) {
tests := map[string]struct {
esVersion string
license string
ccrEnabled bool
}{
"old_version": {
"6.4.0",
"platinum",
true,
},
"low_license": {
"7.6.0",
"basic",
true,
},
"feature_unavailable": {
"7.6.0",
"platinum",
false,
},
}

// Disable license caching for these tests
elasticsearch.LicenseCacheEnabled = false
defer func() { elasticsearch.LicenseCacheEnabled = true }()

for name, test := range tests {
t.Run(name, func(t *testing.T) {
server := startESServer(test.esVersion, test.license, test.ccrEnabled)
defer server.Close()

ms := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL))
events, errs := mbtest.ReportingFetchV2Error(ms)

require.Empty(t, errs)
require.Empty(t, events)
})
}
}

func getConfig(host string) map[string]interface{} {
return map[string]interface{}{
"module": elasticsearch.ModuleName,
"metricsets": []string{"ccr"},
"hosts": []string{host},
}
}
34 changes: 32 additions & 2 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,27 @@ func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
return stackUsage, err
}

type XPack struct {
Features struct {
CCR struct {
Enabled bool `json:"enabled"`
} `json:"CCR"`
} `json:"features"`
}

// GetXPack returns information about xpack features.
func GetXPack(http *helper.HTTP, resetURI string) (XPack, error) {
content, err := fetchPath(http, resetURI, "_xpack", "")

if err != nil {
return XPack{}, err
}

var xpack XPack
err = json.Unmarshal(content, &xpack)
return xpack, err
}

// IsMLockAllEnabled returns if the given Elasticsearch node has mlockall enabled
func IsMLockAllEnabled(http *helper.HTTP, resetURI, nodeID string) (bool, error) {
content, err := fetchPath(http, resetURI, "_nodes/"+nodeID, "filter_path=nodes.*.process.mlockall")
Expand Down Expand Up @@ -437,8 +458,13 @@ func MergeClusterSettings(clusterSettings common.MapStr) (common.MapStr, error)
return settings, nil
}

// Global cache for license information. Assumption is that license information changes infrequently.
var licenseCache = &_licenseCache{}
var (
// Global cache for license information. Assumption is that license information changes infrequently.
licenseCache = &_licenseCache{}

// LicenseCacheEnabled controls whether license caching is enabled or not. Intended for test use.
LicenseCacheEnabled = true
)

type _licenseCache struct {
sync.RWMutex
Expand All @@ -460,6 +486,10 @@ func (c *_licenseCache) get() *License {
}

func (c *_licenseCache) set(license *License, ttl time.Duration) {
if !LicenseCacheEnabled {
return
}

c.Lock()
defer c.Unlock()

Expand Down