diff --git a/metricbeat/module/mongodb/collstats/_meta/data.json b/metricbeat/module/mongodb/collstats/_meta/data.json index 6f8ef320b6a..ff74e5f9d66 100644 --- a/metricbeat/module/mongodb/collstats/_meta/data.json +++ b/metricbeat/module/mongodb/collstats/_meta/data.json @@ -1,77 +1,82 @@ { - "@timestamp": "2016-05-23T08:05:34.853Z", - "beat": { - "hostname": "beathost", - "name": "beathost" - }, - "metricset": { - "host": "localhost", - "module": "mongodb", - "name": "collstats", - "rtt": 44269 - }, - "mongodb": { - "collstats": { - "db": "admin", - "collection": "system.users", - "name": "admin.system.users", - "total": { - "time": { - "us": 54756221 - }, - "count": 3159951 - }, - "lock": { - "read": { - "time": { - "us": 54747284 - }, - "count": 3159944 - }, - "write": { - "time": { - "us": 8937 - }, - "count": 7 + "@timestamp": "2017-10-12T08:05:34.853Z", + "agent": { + "hostname": "host.example.com", + "name": "host.example.com" + }, + "event": { + "dataset": "mongodb.collstats", + "duration": 115000, + "module": "mongodb" + }, + "metricset": { + "name": "collstats" + }, + "mongodb": { + "collstats": { + "collection": "startup_log", + "commands": { + "count": 0, + "time": { + "us": 0 + } + }, + "db": "local", + "getmore": { + "count": 0, + "time": { + "us": 0 + } + }, + "insert": { + "count": 0, + "time": { + "us": 0 + } + }, + "lock": { + "read": { + "count": 74, + "time": { + "us": 443 + } + }, + "write": { + "count": 1, + "time": { + "us": 8 + } + } + }, + "name": "local.startup_log", + "queries": { + "count": 0, + "time": { + "us": 0 + } + }, + "remove": { + "count": 0, + "time": { + "us": 0 + } + }, + "total": { + "count": 75, + "time": { + "us": 451 + } + }, + "update": { + "count": 0, + "time": { + "us": 0 + } + } } - }, - "queries": { - "time": { - "us": 2310 - }, - "count": 15 - }, - "getmore": { - "time": { - "us": 0 - }, - "count": 0 - }, - "insert": { - "time": { - "us": 8937 - }, - "count": 7 - }, - "update": { - "time": { - "us": 0 - }, - "count": 0 - }, - "remove": { - "time": { - "us": 0 - }, - "count": 0 - }, - "commands": { - "time": { - "us": 50743698 - }, - "count": 45793 - } + }, + "service": { + "address": "172.26.0.2:27017", + "type": "mongodb" } - }, - "type": "metricsets" -} +} \ No newline at end of file diff --git a/metricbeat/module/mongodb/collstats/collstats.go b/metricbeat/module/mongodb/collstats/collstats.go index d3ed7b2da0a..3f78a35352f 100644 --- a/metricbeat/module/mongodb/collstats/collstats.go +++ b/metricbeat/module/mongodb/collstats/collstats.go @@ -18,7 +18,7 @@ package collstats import ( - "errors" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/metricbeat/module/mongodb" ) -var debugf = logp.MakeDebug("mongodb.collstats") +var logger = logp.NewLogger("mongodb.collstats") func init() { mb.Registry.MustAddMetricSet("mongodb", "collstats", New, @@ -54,17 +54,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ms}, nil } -// Fetch methods implements the data gathering and data conversion to the right format -// It returns the event which is then forward to the output. In case of an error, a -// descriptive error must be returned. -func (m *MetricSet) Fetch() ([]common.MapStr, error) { - // events is the list of events collected from each of the collections. - var events []common.MapStr - +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) { // instantiate direct connections to each of the configured Mongo hosts mongoSession, err := mongodb.NewDirectSession(m.DialInfo) if err != nil { - return nil, err + logger.Error(err) + reporter.Error(err) + return } defer mongoSession.Close() @@ -72,21 +71,25 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { err = mongoSession.Run("top", &result) if err != nil { - logp.Err("Error retrieving collection totals from Mongo instance") - return events, err + err = errors.Wrap(err, "Error retrieving collection totals from Mongo instance") + logger.Error(err) + reporter.Error(err) + return } if _, ok := result["totals"]; !ok { err = errors.New("Error accessing collection totals in returned data") - logp.Err(err.Error()) - return events, err + logger.Error(err) + reporter.Error(err) + return } totals, ok := result["totals"].(common.MapStr) if !ok { err = errors.New("Collection totals are not a map") - logp.Err(err.Error()) - return events, err + logger.Error(err) + reporter.Error(err) + return } for group, info := range totals { @@ -97,18 +100,21 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { infoMap, ok := info.(common.MapStr) if !ok { err = errors.New("Unexpected data returned by mongodb") - logp.Err(err.Error()) + logger.Error(err) + reporter.Error(err) continue } event, err := eventMapping(group, infoMap) if err != nil { - logp.Err("Mapping of the event data filed") + err = errors.Wrap(err, "Mapping of the event data filed") + logger.Error(err) + reporter.Error(err) continue } - events = append(events, event) + reporter.Event(mb.Event{MetricSetFields: event}) } - return events, nil + return } diff --git a/metricbeat/module/mongodb/collstats/collstats_integration_test.go b/metricbeat/module/mongodb/collstats/collstats_integration_test.go index 0de42bd027f..dc828330f5a 100644 --- a/metricbeat/module/mongodb/collstats/collstats_integration_test.go +++ b/metricbeat/module/mongodb/collstats/collstats_integration_test.go @@ -32,20 +32,22 @@ import ( func TestFetch(t *testing.T) { compose.EnsureUp(t, "mongodb") - f := mbtest.NewEventsFetcher(t, getConfig()) - events, err := f.Fetch() - if !assert.NoError(t, err) { - t.FailNow() + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } + assert.NotEmpty(t, events) for _, event := range events { t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + metricsetFields := event.MetricSetFields // Check a few event Fields - db := event["db"].(string) + db := metricsetFields["db"].(string) assert.NotEqual(t, db, "") - collection := event["collection"].(string) + collection := metricsetFields["collection"].(string) assert.NotEqual(t, collection, "") } } @@ -53,9 +55,14 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { compose.EnsureUp(t, "mongodb") - f := mbtest.NewEventsFetcher(t, getConfig()) - err := mbtest.WriteEvents(f, t) - if err != nil { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + + if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { t.Fatal("write", err) } }