From 60ba703499fbafececdcdddb6672a7c60a1747f9 Mon Sep 17 00:00:00 2001 From: Pablo Mercado Date: Thu, 9 May 2019 17:59:23 +0200 Subject: [PATCH] [Metricbeat](Etcd-Leader)Followers wont report leader metrics (#12004) * manage leader metricset so that followers don't report errors nor events * add debug message when skipping leader events from non leader members --- CHANGELOG.next.asciidoc | 1 + .../etcd/_meta/test/leaderstats_empty.json | 0 .../etcd/_meta/test/leaderstats_follower.json | 1 + .../_meta/test/leaderstats_internalerror.json | 1 + metricbeat/module/etcd/leader/leader.go | 70 +++++++++-- metricbeat/module/etcd/leader/leader_test.go | 110 ++++++++++++++---- 6 files changed, 153 insertions(+), 30 deletions(-) create mode 100644 metricbeat/module/etcd/_meta/test/leaderstats_empty.json create mode 100644 metricbeat/module/etcd/_meta/test/leaderstats_follower.json create mode 100644 metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0549f3dd609..32f2e92a6b7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -193,6 +193,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add check on object name in the counter path if the instance name is missing {issue}6528[6528] {pull}11878[11878] - Add AWS cloudwatch metricset. {pull}11798[11798] {issue}11734[11734] - Add `regions` in aws module config to specify target regions for querying cloudwatch metrics. {issue}11932[11932] {pull}11956[11956] +- Keep `etcd` followers members from reporting `leader` metricset events {pull}12004[12004] *Packetbeat* diff --git a/metricbeat/module/etcd/_meta/test/leaderstats_empty.json b/metricbeat/module/etcd/_meta/test/leaderstats_empty.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/metricbeat/module/etcd/_meta/test/leaderstats_follower.json b/metricbeat/module/etcd/_meta/test/leaderstats_follower.json new file mode 100644 index 00000000000..c64ace3c384 --- /dev/null +++ b/metricbeat/module/etcd/_meta/test/leaderstats_follower.json @@ -0,0 +1 @@ +{"message":"not current leader"} \ No newline at end of file diff --git a/metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json b/metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json new file mode 100644 index 00000000000..af1c2edca60 --- /dev/null +++ b/metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json @@ -0,0 +1 @@ +{"message":"random error message"} \ No newline at end of file diff --git a/metricbeat/module/etcd/leader/leader.go b/metricbeat/module/etcd/leader/leader.go index 51c10007515..16a63555410 100644 --- a/metricbeat/module/etcd/leader/leader.go +++ b/metricbeat/module/etcd/leader/leader.go @@ -18,9 +18,16 @@ package leader import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -30,6 +37,12 @@ const ( defaultScheme = "http" defaultPath = "/v2/stats/leader" apiVersion = "2" + + // returned JSON management + msgElement = "message" + msgValueNonLeader = "not current leader" + + logSelector = "etcd.leader" ) var ( @@ -46,11 +59,15 @@ func init() { ) } +// MetricSet for etcd.leader type MetricSet struct { mb.BaseMetricSet - http *helper.HTTP + http *helper.HTTP + logger *logp.Logger + debugEnabled bool } +// New etcd.leader metricset object func New(base mb.BaseMetricSet) (mb.MetricSet, error) { config := struct{}{} if err := base.Module().UnpackConfig(&config); err != nil { @@ -64,6 +81,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ base, http, + logp.NewLogger(logSelector), + logp.IsDebug(logSelector), }, nil } @@ -71,14 +90,49 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { - content, err := m.http.FetchContent() + res, err := m.http.FetchResponse() + if err != nil { + return errors.Wrap(err, "error fetching response") + } + defer res.Body.Close() + + content, err := ioutil.ReadAll(res.Body) if err != nil { - return errors.Wrap(err, "error in http fetch") + return errors.Wrapf(err, "error reading body response") } - reporter.Event(mb.Event{ - MetricSetFields: eventMapping(content), - ModuleFields: common.MapStr{"api_version": apiVersion}, - }) - return nil + if res.StatusCode == http.StatusOK { + reporter.Event(mb.Event{ + MetricSetFields: eventMapping(content), + ModuleFields: common.MapStr{"api_version": apiVersion}, + }) + return nil + } + + // Errors might be reported as {"message":""} + // let's look for that structure + var jsonResponse map[string]interface{} + if err = json.Unmarshal(content, &jsonResponse); err == nil { + if retMessage := jsonResponse[msgElement]; retMessage != "" { + // there is an error message element, let's use it + + // If a 403 is returned and {"message":"not current leader"} + // do not consider this an error + // do not report events since this is not a leader + if res.StatusCode == http.StatusForbidden && + retMessage == msgValueNonLeader { + if m.debugEnabled { + m.logger.Debugf("skipping event for non leader member %q", m.Host()) + } + return nil + } + + return fmt.Errorf("fetching HTTP response returned status code %d: %s", + res.StatusCode, retMessage) + } + } + + // no message in the JSON payload, return standard error + return fmt.Errorf("fetching HTTP response returned status code %d", res.StatusCode) + } diff --git a/metricbeat/module/etcd/leader/leader_test.go b/metricbeat/module/etcd/leader/leader_test.go index 990c0bca43f..31dfa2e718e 100644 --- a/metricbeat/module/etcd/leader/leader_test.go +++ b/metricbeat/module/etcd/leader/leader_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "path/filepath" + "regexp" "github.com/stretchr/testify/assert" @@ -40,29 +41,94 @@ func TestEventMapping(t *testing.T) { } func TestFetchEventContent(t *testing.T) { - absPath, err := filepath.Abs("../_meta/test/") - assert.NoError(t, err) - response, err := ioutil.ReadFile(absPath + "/leaderstats.json") - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) - })) - defer server.Close() - - config := map[string]interface{}{ - "module": "etcd", - "metricsets": []string{"leader"}, - "hosts": []string{server.URL}, - } + const ( + module = "etcd" + metricset = "leader" + mockedFetchLocation = "../_meta/test/" + ) - f := mbtest.NewReportingMetricSetV2Error(t, config) - events, errs := mbtest.ReportingFetchV2Error(f) - if len(errs) > 0 { - t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) - } - assert.NotEmpty(t, events) + var testcases = []struct { + name string + mockedFetchFile string + httpCode int + + expectedFetchErrorRegexp string + expectedNumEvents int + }{ + { + name: "Leader member stats", + mockedFetchFile: "/leaderstats.json", + httpCode: http.StatusOK, + expectedNumEvents: 1, + }, + { + name: "Follower member", + mockedFetchFile: "/leaderstats_follower.json", + httpCode: http.StatusForbidden, + expectedNumEvents: 0, + }, + { + name: "Simulating credentials issue", + mockedFetchFile: "/leaderstats_empty.json", + httpCode: http.StatusForbidden, + expectedFetchErrorRegexp: "fetching HTTP response returned status code 403", + expectedNumEvents: 0, + }, + { + name: "Simulating failure message", + mockedFetchFile: "/leaderstats_internalerror.json", + httpCode: http.StatusInternalServerError, + expectedFetchErrorRegexp: "fetching HTTP response returned status code 500:.+", + expectedNumEvents: 0, + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + + absPath, err := filepath.Abs(mockedFetchLocation + tc.mockedFetchFile) + assert.NoError(t, err) + + response, err := ioutil.ReadFile(absPath) + assert.NoError(t, err) - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), events[0]) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.httpCode) + w.Header().Set("Content-Type", "application/json;") + w.Write([]byte(response)) + })) + defer server.Close() + + config := map[string]interface{}{ + "module": module, + "metricsets": []string{metricset}, + "hosts": []string{server.URL}, + } + + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) + + if tc.expectedFetchErrorRegexp != "" { + for _, err := range errs { + if match, _ := regexp.MatchString(tc.expectedFetchErrorRegexp, err.Error()); match { + // found expected fetch error, no need for further checks + return + } + } + t.Fatalf("Expected fetch error not found:\n Expected:%s\n Got: %+v", + tc.expectedFetchErrorRegexp, + errs) + } + + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + + assert.Equal(t, tc.expectedNumEvents, len(events)) + + for i := range events { + t.Logf("%s/%s event[%d]: %+v", f.Module().Name(), f.Name(), i, events[i]) + } + }) + } }