Skip to content

Commit

Permalink
Check ES for CCR feature availability (#17073) (#17099)
Browse files Browse the repository at this point in the history
* Add TODO

* Check if CCR feature is available in ES

* Allow tests to disable license caching

* Add unit tests

* Separate message

* Adding license header

* Adding CHANGELOG entry

* Check enabled, not available

* Renaming var

* Adding documentation

* Doc feedback fixes
  • Loading branch information
ycombinator committed Mar 19, 2020
1 parent a89a5cd commit 6373f79
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Convert increments of 100 nanoseconds/ticks to milliseconds for WriteTime and ReadTime in diskio metricset (Windows) for consistency. {issue}14233[14233]
- Dynamically choose a method for the system/service metricset to support older linux distros. {pull}16902[16902]
- Reduce memory usage in `elasticsearch/index` metricset. {issue}16503[16503] {pull}16538[16538]
- Check if CCR feature is available on Elasticsearch cluster before attempting to call CCR APIs from `elasticsearch/ccr` metricset. {issue}16511[16511] {pull}17073[17073]

*Packetbeat*

Expand Down
11 changes: 8 additions & 3 deletions metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
This is the `ccr` metricset of the Elasticsearch module. It interrogates the
Cross Cluster Replication Stats API endpoint to fetch information about shards
in the Elasticsearch cluster that are participating in cross-cluster replication.
This is the `ccr` metricset of the {es} module. It uses the
Cross-Cluster Replication Stats API endpoint to fetch metrics about cross-cluster
replication from the {es} clusters that are participating in cross-cluster
replication.

If the {es} cluster does not have cross-cluster replication enabled, this metricset
will not collect metrics. A DEBUG log message about this will be emitted in the
Metricbeat log.
10 changes: 10 additions & 0 deletions metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func (m *MetricSet) checkCCRAvailability(currentElasticsearchVersion *common.Ver
return
}

xpack, err := elasticsearch.GetXPack(m.HTTP, m.GetServiceURI())
if err != nil {
return "", errors.Wrap(err, "error determining xpack features")
}

if !xpack.Features.CCR.Enabled {
message = "the CCR feature is not enabled on your Elasticsearch cluster."
return
}

isAvailable := elastic.IsFeatureAvailable(currentElasticsearchVersion, elasticsearch.CCRStatsAPIAvailableVersion)

if !isAvailable {
Expand Down
116 changes: 116 additions & 0 deletions metricbeat/module/elasticsearch/ccr/ccr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package ccr

import (
"net/http"
"net/http/httptest"
"strconv"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"

mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
)

func startESServer(esVersion, license string, ccrEnabled bool) *httptest.Server {

nodesLocalHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"nodes": { "foobar": {}}}`))
}
clusterStateMasterHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"master_node": "foobar"}`))
}
rootHandler := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
}
w.Write([]byte(`{"version": { "number": "` + esVersion + `" } }`))
}
licenseHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{ "license": { "type": "` + license + `" } }`))
}
xpackHandler := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{ "features": { "ccr": { "enabled": ` + strconv.FormatBool(ccrEnabled) + `}}}`))
}
ccrStatsHandler := func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "this should never have been called", 418)
}

mux := http.NewServeMux()
mux.Handle("/_nodes/_local/nodes", http.HandlerFunc(nodesLocalHandler))
mux.Handle("/_cluster/state/master_node", http.HandlerFunc(clusterStateMasterHandler))
mux.Handle("/", http.HandlerFunc(rootHandler))
mux.Handle("/_license", http.HandlerFunc(licenseHandler)) // for 7.0 and above
mux.Handle("/_xpack/license", http.HandlerFunc(licenseHandler)) // for before 7.0
mux.Handle("/_xpack", http.HandlerFunc(xpackHandler))
mux.Handle("/_ccr/stats", http.HandlerFunc(ccrStatsHandler))

return httptest.NewServer(mux)
}

func TestCCRNotAvailable(t *testing.T) {
tests := map[string]struct {
esVersion string
license string
ccrEnabled bool
}{
"old_version": {
"6.4.0",
"platinum",
true,
},
"low_license": {
"7.6.0",
"basic",
true,
},
"feature_unavailable": {
"7.6.0",
"platinum",
false,
},
}

// Disable license caching for these tests
elasticsearch.LicenseCacheEnabled = false
defer func() { elasticsearch.LicenseCacheEnabled = true }()

for name, test := range tests {
t.Run(name, func(t *testing.T) {
server := startESServer(test.esVersion, test.license, test.ccrEnabled)
defer server.Close()

ms := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL))
events, errs := mbtest.ReportingFetchV2Error(ms)

require.Empty(t, errs)
require.Empty(t, events)
})
}
}

func getConfig(host string) map[string]interface{} {
return map[string]interface{}{
"module": elasticsearch.ModuleName,
"metricsets": []string{"ccr"},
"hosts": []string{host},
}
}
34 changes: 32 additions & 2 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,27 @@ func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
return stackUsage, err
}

type XPack struct {
Features struct {
CCR struct {
Enabled bool `json:"enabled"`
} `json:"CCR"`
} `json:"features"`
}

// GetXPack returns information about xpack features.
func GetXPack(http *helper.HTTP, resetURI string) (XPack, error) {
content, err := fetchPath(http, resetURI, "_xpack", "")

if err != nil {
return XPack{}, err
}

var xpack XPack
err = json.Unmarshal(content, &xpack)
return xpack, err
}

// IsMLockAllEnabled returns if the given Elasticsearch node has mlockall enabled
func IsMLockAllEnabled(http *helper.HTTP, resetURI, nodeID string) (bool, error) {
content, err := fetchPath(http, resetURI, "_nodes/"+nodeID, "filter_path=nodes.*.process.mlockall")
Expand Down Expand Up @@ -437,8 +458,13 @@ func MergeClusterSettings(clusterSettings common.MapStr) (common.MapStr, error)
return settings, nil
}

// Global cache for license information. Assumption is that license information changes infrequently.
var licenseCache = &_licenseCache{}
var (
// Global cache for license information. Assumption is that license information changes infrequently.
licenseCache = &_licenseCache{}

// LicenseCacheEnabled controls whether license caching is enabled or not. Intended for test use.
LicenseCacheEnabled = true
)

type _licenseCache struct {
sync.RWMutex
Expand All @@ -460,6 +486,10 @@ func (c *_licenseCache) get() *License {
}

func (c *_licenseCache) set(license *License, ttl time.Duration) {
if !LicenseCacheEnabled {
return
}

c.Lock()
defer c.Unlock()

Expand Down

0 comments on commit 6373f79

Please sign in to comment.