Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove EventFetcher and EventsFetcher interface #25093

Merged
merged 5 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]
- Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455]
- Remove `NumCPU` as clients should update the CPU count on the fly in case of config changes in a VM. {pull}23154[23154]
- Remove Metricbeat EventFetcher and EventsFetcher interface. Use the reporter interface instead. {pull}25093[25093]

==== Bugfixes

Expand Down
82 changes: 0 additions & 82 deletions metricbeat/helper/prometheus/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/metricbeat/mb/testing/flags"
Expand All @@ -45,87 +44,6 @@ type TestCases []struct {
ExpectedFile string
}

// TestMetricSetEventsFetcher goes over the given TestCases and ensures that source Prometheus metrics gets converted
// into the expected events when passed by the given metricset.
// If -data flag is passed, the expected JSON file will be updated with the result
func TestMetricSetEventsFetcher(t *testing.T, module, metricset string, cases TestCases) {
for _, test := range cases {
t.Logf("Testing %s file\n", test.MetricsFile)

file, err := os.Open(test.MetricsFile)
assert.NoError(t, err, "cannot open test file "+test.MetricsFile)

body, err := ioutil.ReadAll(file)
assert.NoError(t, err, "cannot read test file "+test.MetricsFile)

server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1")
w.Write([]byte(body))
}))

server.Start()
defer server.Close()

config := map[string]interface{}{
"module": module,
"metricsets": []string{metricset},
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
assert.Nil(t, err, "Errors while fetching metrics")

if *flags.DataFlag {
sort.SliceStable(events, func(i, j int) bool {
h1, _ := hashstructure.Hash(events[i], nil)
h2, _ := hashstructure.Hash(events[j], nil)
return h1 < h2
})
eventsJSON, _ := json.MarshalIndent(events, "", "\t")
err = ioutil.WriteFile(test.ExpectedFile, eventsJSON, 0644)
assert.NoError(t, err)
}

// Read expected events from reference file
expected, err := ioutil.ReadFile(test.ExpectedFile)
if err != nil {
t.Fatal(err)
}

var expectedEvents []common.MapStr
err = json.Unmarshal(expected, &expectedEvents)
if err != nil {
t.Fatal(err)
}

for _, event := range events {
// ensure the event is in expected list
found := -1
for i, expectedEvent := range expectedEvents {
if event.String() == expectedEvent.String() {
found = i
break
}
}
if found > -1 {
expectedEvents = append(expectedEvents[:found], expectedEvents[found+1:]...)
} else {
t.Errorf("Event was not expected: %+v", event)
}
}

if len(expectedEvents) > 0 {
t.Error("Some events were missing:")
for _, e := range expectedEvents {
t.Error(e)
}
t.Fatal()
}
}
}

// TestMetricSet goes over the given TestCases and ensures that source Prometheus metrics gets converted into the expected
// events when passed by the given metricset.
// If -data flag is passed, the expected JSON file will be updated with the result
Expand Down
10 changes: 1 addition & 9 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,6 @@ func mustHaveModule(ms MetricSet, base BaseMetricSet) error {
// of them.
func mustImplementFetcher(ms MetricSet) error {
var ifcs []string
if _, ok := ms.(EventFetcher); ok {
ifcs = append(ifcs, "EventFetcher")
}

if _, ok := ms.(EventsFetcher); ok {
ifcs = append(ifcs, "EventsFetcher")
}

if _, ok := ms.(ReportingMetricSet); ok {
ifcs = append(ifcs, "ReportingMetricSet")
}
Expand Down Expand Up @@ -256,7 +248,7 @@ func mustImplementFetcher(ms MetricSet) error {
switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"producing interface ("+
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, ReportingMetricSetV2WithContext"+
"PushMetricSet, PushMetricSetV2, or PushMetricSetV2WithContext)",
ms.Module().Name(), ms.Name())
Expand Down
17 changes: 1 addition & 16 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ func (m *BaseModule) WithConfig(config common.Config) (*BaseModule, error) {
// MetricSet interfaces

// MetricSet is the common interface for all MetricSet implementations. In
// addition to this interface, all MetricSets must implement either
// EventFetcher or EventsFetcher (but not both).
// addition to this interface, all MetricSets must implement a fetcher interface.
type MetricSet interface {
ID() string // Unique ID identifying a running MetricSet.
Name() string // Name returns the name of the MetricSet.
Expand All @@ -154,20 +153,6 @@ type Closer interface {
Close() error
}

// EventFetcher is a MetricSet that returns a single event when collecting data.
// Use ReportingMetricSet for new MetricSet implementations.
type EventFetcher interface {
MetricSet
Fetch() (common.MapStr, error)
}

// EventsFetcher is a MetricSet that returns a multiple events when collecting
// data. Use ReportingMetricSet for new MetricSet implementations.
type EventsFetcher interface {
MetricSet
Fetch() ([]common.MapStr, error)
}

// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
Expand Down
40 changes: 5 additions & 35 deletions metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

// Reporting V2 MetricSet

type testModule struct {
BaseModule
hostParser func(string) (HostData, error)
Expand All @@ -39,25 +41,11 @@ func (m testModule) ParseHost(host string) (HostData, error) {
return m.hostParser(host)
}

// EventFetcher

type testMetricSet struct {
BaseMetricSet
}

func (m *testMetricSet) Fetch() (common.MapStr, error) {
return nil, nil
}

// EventsFetcher

type testMetricSetEventsFetcher struct {
BaseMetricSet
}

func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) {
return nil, nil
}
func (m *testMetricSet) Fetch(reporter ReporterV2) {}

// ReportingFetcher

Expand Down Expand Up @@ -259,25 +247,7 @@ func TestNewModulesMetricSetTypes(t *testing.T) {
return &testMetricSet{base}, nil
}

name := "EventFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
ms := newTestMetricSet(t, r, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventFetcher)
assert.True(t, ok, name+" not implemented")
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetEventsFetcher{base}, nil
}

name = "EventsFetcher"
name := "ReportingMetricSetV2"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}
Expand All @@ -287,7 +257,7 @@ func TestNewModulesMetricSetTypes(t *testing.T) {
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventsFetcher)
_, ok := ms.(ReportingMetricSetV2)
assert.True(t, ok, name+" not implemented")
})

Expand Down
10 changes: 5 additions & 5 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ExampleWrapper() {
// Build a configuration object.
config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"metricsets": []string{reportingFetcherName},
})
if err != nil {
fmt.Println("Error:", err)
Expand Down Expand Up @@ -91,17 +91,17 @@ func ExampleWrapper() {
// },
// "@timestamp": "2016-05-10T23:27:58.485Z",
// "event": {
// "dataset": "fake.eventfetcher",
// "dataset": "fake.reportingfetcher",
// "duration": 111,
// "module": "fake"
// },
// "fake": {
// "eventfetcher": {
// "reportingfetcher": {
// "metric": 1
// }
// },
// "metricset": {
// "name": "eventfetcher",
// "name": "reportingfetcher",
// "period": 10000
// },
// "service": {
Expand All @@ -120,7 +120,7 @@ func ExampleRunner() {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"metricsets": []string{reportingFetcherName},
})
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/module/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestRunner(t *testing.T) {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"metricsets": []string{reportingFetcherName},
})
if err != nil {
t.Fatal(err)
Expand Down
25 changes: 1 addition & 24 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
ms.Run(reporter.V2())
case mb.PushMetricSetV2WithContext:
ms.Run(&channelContext{done}, reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
case mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
msw.startPeriodicFetching(&channelContext{done}, reporter)
default:
// Earlier startup stages prevent this from happening.
Expand Down Expand Up @@ -241,10 +240,6 @@ func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter
// and log a stack track if one occurs.
func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
switch fetcher := msw.MetricSet.(type) {
case mb.EventFetcher:
msw.singleEventFetch(fetcher, reporter)
case mb.EventsFetcher:
msw.multiEventFetch(fetcher, reporter)
case mb.ReportingMetricSet:
reporter.StartFetchTimer()
fetcher.Fetch(reporter.V1())
Expand All @@ -270,24 +265,6 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
}
}

func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter reporter) {
reporter.StartFetchTimer()
event, err := fetcher.Fetch()
reporter.V1().ErrorWith(err, event)
}

func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) {
reporter.StartFetchTimer()
events, err := fetcher.Fetch()
if len(events) == 0 {
reporter.V1().ErrorWith(err, nil)
} else {
for _, event := range events {
reporter.V1().ErrorWith(err, event)
}
}
}

// close closes the underlying MetricSet if it implements the mb.Closer
// interface.
func (msw *metricSetWrapper) close() error {
Expand Down
Loading