From 6e2e06c06743fde7cb4fbb06934c720d59f9959a Mon Sep 17 00:00:00 2001 From: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> Date: Fri, 9 Sep 2022 06:42:28 -0700 Subject: [PATCH] Remove the fleet reporter (#1130) * Remove the fleet reporter Remove the fleet-reporter so that checkins no longer deliver the event list. * add CHANGELOG fix tests --- CHANGELOG.next.asciidoc | 1 + .../gateway/fleet/fleet_gateway.go | 15 -- .../gateway/fleet/fleet_gateway_test.go | 54 +--- .../pkg/agent/application/managed_mode.go | 9 +- internal/pkg/agent/configuration/fleet.go | 19 +- internal/pkg/fleetapi/checkin_cmd.go | 7 +- internal/pkg/reporter/fleet/config/config.go | 19 -- internal/pkg/reporter/fleet/reporter.go | 175 ------------- internal/pkg/reporter/fleet/reporter_test.go | 241 ------------------ 9 files changed, 16 insertions(+), 524 deletions(-) delete mode 100644 internal/pkg/reporter/fleet/config/config.go delete mode 100644 internal/pkg/reporter/fleet/reporter.go delete mode 100644 internal/pkg/reporter/fleet/reporter_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f7793f7b3a9..48109ba3c0d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -114,6 +114,7 @@ - Agent updates will clean up unneeded artifacts. {issue}693[693] {issue}694[694] {pull}752[752] - Use the Elastic Agent configuration directory as the root of the `inputs.d` folder. {issues}663[663] - Fix a panic caused by a race condition when installing the Elastic Agent. {issues}806[806] +- Remove fleet event reporter and events from checkin body. {issue}993[993] ==== New features diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index f5c02d3356a..3cce7073e3a 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -56,10 +56,6 @@ type agentInfo interface { AgentID() string } -type fleetReporter interface { - Events() ([]fleetapi.SerializableEvent, func()) -} - type stateStore interface { Add(fleetapi.Action) AckToken() string @@ -85,7 +81,6 @@ type fleetGateway struct { backoff backoff.Backoff settings *fleetGatewaySettings agentInfo agentInfo - reporter fleetReporter done chan struct{} wg sync.WaitGroup acker store.FleetAcker @@ -104,7 +99,6 @@ func New( agentInfo agentInfo, client client.Sender, d pipeline.Dispatcher, - r fleetReporter, acker store.FleetAcker, statusController status.Controller, stateStore stateStore, @@ -120,7 +114,6 @@ func New( client, d, scheduler, - r, acker, statusController, stateStore, @@ -136,7 +129,6 @@ func newFleetGatewayWithScheduler( client client.Sender, d pipeline.Dispatcher, scheduler scheduler.Scheduler, - r fleetReporter, acker store.FleetAcker, statusController status.Controller, stateStore stateStore, @@ -162,7 +154,6 @@ func newFleetGatewayWithScheduler( settings.Backoff.Max, ), done: done, - reporter: r, acker: acker, statusReporter: statusController.RegisterComponent("gateway"), statusController: statusController, @@ -323,9 +314,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) { } func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) { - // get events - ee, ack := f.reporter.Events() - ecsMeta, err := info.Metadata() if err != nil { f.log.Error(errors.New("failed to load metadata", err)) @@ -341,7 +329,6 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client) req := &fleetapi.CheckinRequest{ AckToken: ackToken, - Events: ee, Metadata: ecsMeta, Status: f.statusController.StatusString(), } @@ -374,8 +361,6 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, } } - // ack events so they are dropped from queue - ack() return resp, nil } diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index 6ce62448276..b02507e61ad 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -18,7 +18,6 @@ import ( "testing" "time" - "github.com/pkg/errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -29,9 +28,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/core/state" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop" - repo "github.com/elastic/elastic-agent/internal/pkg/reporter" - fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet" - fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config" "github.com/elastic/elastic-agent/internal/pkg/scheduler" "github.com/elastic/elastic-agent/internal/pkg/testutils" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -137,7 +133,7 @@ func (m *mockQueue) Actions() []fleetapi.Action { return args.Get(0).([]fleetapi.Action) } -type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper, repo.Backend) +type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper) func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) { return func(t *testing.T) { @@ -146,8 +142,6 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat dispatcher := newTestingDispatcher() log, _ := logger.New("fleet_gateway", false) - rep := getReporter(agentInfo, log, t) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -167,7 +161,6 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat client, dispatcher, scheduler, - rep, noopacker.NewAcker(), &noopController{}, stateStore, @@ -176,7 +169,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat require.NoError(t, err) - fn(t, gateway, client, dispatcher, scheduler, rep) + fn(t, gateway, client, dispatcher, scheduler) } } @@ -214,7 +207,6 @@ func TestFleetGateway(t *testing.T) { client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, - rep repo.Backend, ) { waitFn := ackSeq( client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { @@ -240,7 +232,6 @@ func TestFleetGateway(t *testing.T) { client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, - rep repo.Backend, ) { waitFn := ackSeq( client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { @@ -305,7 +296,6 @@ func TestFleetGateway(t *testing.T) { client, dispatcher, scheduler, - getReporter(agentInfo, log, t), noopacker.NewAcker(), &noopController{}, stateStore, @@ -366,7 +356,6 @@ func TestFleetGateway(t *testing.T) { client, dispatcher, scheduler, - getReporter(agentInfo, log, t), noopacker.NewAcker(), &noopController{}, stateStore, @@ -432,7 +421,6 @@ func TestFleetGateway(t *testing.T) { client, dispatcher, scheduler, - getReporter(agentInfo, log, t), noopacker.NewAcker(), &noopController{}, stateStore, @@ -487,7 +475,6 @@ func TestFleetGateway(t *testing.T) { client, dispatcher, scheduler, - getReporter(agentInfo, log, t), noopacker.NewAcker(), &noopController{}, stateStore, @@ -544,7 +531,6 @@ func TestFleetGateway(t *testing.T) { client, dispatcher, scheduler, - getReporter(agentInfo, log, t), noopacker.NewAcker(), &noopController{}, stateStore, @@ -594,9 +580,7 @@ func TestFleetGateway(t *testing.T) { client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, - rep repo.Backend, ) { - _ = rep.Report(context.Background(), &testStateEvent{}) waitFn := ackSeq( client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { cr := &request{} @@ -609,8 +593,6 @@ func TestFleetGateway(t *testing.T) { t.Fatal(err) } - require.Equal(t, 1, len(cr.Events)) - resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) return resp, nil }), @@ -657,7 +639,6 @@ func TestFleetGateway(t *testing.T) { client, dispatcher, scheduler, - getReporter(agentInfo, log, t), noopacker.NewAcker(), &noopController{}, stateStore, @@ -712,8 +693,6 @@ func TestRetriesOnFailures(t *testing.T) { client := newTestingClient() dispatcher := newTestingDispatcher() log, _ := logger.New("fleet_gateway", false) - rep := getReporter(agentInfo, log, t) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -742,7 +721,6 @@ func TestRetriesOnFailures(t *testing.T) { client, dispatcher, scheduler, - rep, noopacker.NewAcker(), statusController, stateStore, @@ -757,8 +735,6 @@ func TestRetriesOnFailures(t *testing.T) { err = gateway.Start() require.NoError(t, err) - _ = rep.Report(context.Background(), &testStateEvent{}) - // Initial tick is done out of bound so we can block on channels. scheduler.Next() @@ -780,8 +756,6 @@ func TestRetriesOnFailures(t *testing.T) { t.Fatal(err) } - require.Equal(t, 1, len(cr.Events)) - resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) return resp, nil }), @@ -807,7 +781,6 @@ func TestRetriesOnFailures(t *testing.T) { client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, - rep repo.Backend, ) { fail := func(_ http.Header, _ io.Reader) (*http.Response, error) { return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil @@ -816,8 +789,6 @@ func TestRetriesOnFailures(t *testing.T) { err := gateway.Start() require.NoError(t, err) - _ = rep.Report(context.Background(), &testStateEvent{}) - // Initial tick is done out of bound so we can block on channels. scheduler.Next() @@ -830,27 +801,8 @@ func TestRetriesOnFailures(t *testing.T) { })) } -func getReporter(info agentInfo, log *logger.Logger, t *testing.T) *fleetreporter.Reporter { - fleetR, err := fleetreporter.NewReporter(info, log, fleetreporterConfig.DefaultConfig()) - if err != nil { - t.Fatal(errors.Wrap(err, "fail to create reporters")) - } - - return fleetR -} - type testAgentInfo struct{} func (testAgentInfo) AgentID() string { return "agent-secret" } -type testStateEvent struct{} - -func (testStateEvent) Type() string { return repo.EventTypeState } -func (testStateEvent) SubType() string { return repo.EventSubTypeInProgress } -func (testStateEvent) Time() time.Time { return time.Unix(0, 1) } -func (testStateEvent) Message() string { return "hello" } -func (testStateEvent) Payload() map[string]interface{} { return map[string]interface{}{"key": 1} } - -type request struct { - Events []interface{} `json:"events"` -} +type request struct{} diff --git a/internal/pkg/agent/application/managed_mode.go b/internal/pkg/agent/application/managed_mode.go index 08c43aeeca3..037cf74ad5c 100644 --- a/internal/pkg/agent/application/managed_mode.go +++ b/internal/pkg/agent/application/managed_mode.go @@ -44,7 +44,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" "github.com/elastic/elastic-agent/internal/pkg/queue" reporting "github.com/elastic/elastic-agent/internal/pkg/reporter" - fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet" logreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/log" "github.com/elastic/elastic-agent/internal/pkg/sorted" "github.com/elastic/elastic-agent/pkg/core/logger" @@ -124,12 +123,7 @@ func newManaged( } logR := logreporter.NewReporter(log) - fleetR, err := fleetreporter.NewReporter(agentInfo, log, cfg.Fleet.Reporting) - if err != nil { - return nil, errors.New(err, "fail to create reporters") - } - - combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR, fleetR) + combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR) monitor, err := monitoring.NewMonitor(cfg.Settings) if err != nil { return nil, errors.New(err, "failed to initialize monitoring") @@ -288,7 +282,6 @@ func newManaged( agentInfo, client, actionDispatcher, - fleetR, actionAcker, statusCtrl, stateStore, diff --git a/internal/pkg/agent/configuration/fleet.go b/internal/pkg/agent/configuration/fleet.go index 5bc9c115a63..0ae59c8f4e8 100644 --- a/internal/pkg/agent/configuration/fleet.go +++ b/internal/pkg/agent/configuration/fleet.go @@ -7,18 +7,16 @@ package configuration import ( "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/remote" - fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config" ) // FleetAgentConfig is the internal configuration of the agent after the enrollment is done, // this configuration is not exposed in anyway in the elastic-agent.yml and is only internal configuration. type FleetAgentConfig struct { - Enabled bool `config:"enabled" yaml:"enabled"` - AccessAPIKey string `config:"access_api_key" yaml:"access_api_key"` - Client remote.Config `config:",inline" yaml:",inline"` - Reporting *fleetreporterConfig.Config `config:"reporting" yaml:"reporting"` - Info *AgentInfo `config:"agent" yaml:"agent"` - Server *FleetServerConfig `config:"server" yaml:"server,omitempty"` + Enabled bool `config:"enabled" yaml:"enabled"` + AccessAPIKey string `config:"access_api_key" yaml:"access_api_key"` + Client remote.Config `config:",inline" yaml:",inline"` + Info *AgentInfo `config:"agent" yaml:"agent"` + Server *FleetServerConfig `config:"server" yaml:"server,omitempty"` } // Valid validates the required fields for accessing the API. @@ -44,9 +42,8 @@ func (e *FleetAgentConfig) Valid() error { // DefaultFleetAgentConfig creates a default configuration for fleet. func DefaultFleetAgentConfig() *FleetAgentConfig { return &FleetAgentConfig{ - Enabled: false, - Client: remote.DefaultClientConfig(), - Reporting: fleetreporterConfig.DefaultConfig(), - Info: &AgentInfo{}, + Enabled: false, + Client: remote.DefaultClientConfig(), + Info: &AgentInfo{}, } } diff --git a/internal/pkg/fleetapi/checkin_cmd.go b/internal/pkg/fleetapi/checkin_cmd.go index 47a76ea47e7..e225aababb9 100644 --- a/internal/pkg/fleetapi/checkin_cmd.go +++ b/internal/pkg/fleetapi/checkin_cmd.go @@ -22,10 +22,9 @@ const checkingPath = "/api/fleet/agents/%s/checkin" // CheckinRequest consists of multiple events reported to fleet ui. type CheckinRequest struct { - Status string `json:"status"` - AckToken string `json:"ack_token,omitempty"` - Events []SerializableEvent `json:"events"` - Metadata *info.ECSMeta `json:"local_metadata,omitempty"` + Status string `json:"status"` + AckToken string `json:"ack_token,omitempty"` + Metadata *info.ECSMeta `json:"local_metadata,omitempty"` } // SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin diff --git a/internal/pkg/reporter/fleet/config/config.go b/internal/pkg/reporter/fleet/config/config.go deleted file mode 100644 index 1e42b956ee8..00000000000 --- a/internal/pkg/reporter/fleet/config/config.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package config - -// Config is a configuration describing fleet connected parts -type Config struct { - Threshold int `yaml:"threshold" config:"threshold" validate:"min=1"` - ReportingCheckFrequency int `yaml:"check_frequency_sec" config:"check_frequency_sec" validate:"min=1"` -} - -// DefaultConfig initiates FleetManagementConfig with default values -func DefaultConfig() *Config { - return &Config{ - Threshold: 10000, - ReportingCheckFrequency: 30, - } -} diff --git a/internal/pkg/reporter/fleet/reporter.go b/internal/pkg/reporter/fleet/reporter.go deleted file mode 100644 index edf5008bc01..00000000000 --- a/internal/pkg/reporter/fleet/reporter.go +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package fleet - -import ( - "context" - "sync" - "time" - - "github.com/elastic/elastic-agent/internal/pkg/fleetapi" - "github.com/elastic/elastic-agent/internal/pkg/reporter" - "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config" - "github.com/elastic/elastic-agent/pkg/core/logger" -) - -type event struct { - AgentID string `json:"agent_id"` - EventType string `json:"type"` - TS fleetapi.Time `json:"timestamp"` - SubType string `json:"subtype"` - Msg string `json:"message"` - Payload map[string]interface{} `json:"payload,omitempty"` -} - -func (e *event) Type() string { - return e.EventType -} - -func (e *event) Timestamp() time.Time { - return time.Time(e.TS) -} - -func (e *event) Message() string { - return e.Msg -} - -// Reporter is a reporter without any effects, serves just as a showcase for further implementations. -type Reporter struct { - lastAck time.Time - info agentInfo - logger *logger.Logger - queue []fleetapi.SerializableEvent - threshold int - qlock sync.Mutex -} - -type agentInfo interface { - AgentID() string -} - -// NewReporter creates a new fleet reporter. -func NewReporter(agentInfo agentInfo, l *logger.Logger, c *config.Config) (*Reporter, error) { - r := &Reporter{ - info: agentInfo, - queue: make([]fleetapi.SerializableEvent, 0), - logger: l, - threshold: c.Threshold, - } - - return r, nil -} - -// Report enqueue event into reporter queue. -func (r *Reporter) Report(ctx context.Context, e reporter.Event) error { - r.qlock.Lock() - defer r.qlock.Unlock() - - r.queue = append(r.queue, &event{ - AgentID: r.info.AgentID(), - EventType: e.Type(), - TS: fleetapi.Time(e.Time()), - SubType: e.SubType(), - Msg: e.Message(), - Payload: e.Payload(), - }) - - if r.threshold > 0 && len(r.queue) > r.threshold { - // drop some low importance event if needed - r.dropEvent() - } - - return nil -} - -// Events returns a list of event from a queue and a ack function -// which clears those events once caller is done with processing. -func (r *Reporter) Events() ([]fleetapi.SerializableEvent, func()) { - r.qlock.Lock() - defer r.qlock.Unlock() - - cp := r.queueCopy() - - ackFn := func() { - // as time is monotonic and this is on single machine this should be ok. - r.clear(cp, time.Now()) - } - - return cp, ackFn -} - -func (r *Reporter) clear(items []fleetapi.SerializableEvent, ackTime time.Time) { - r.qlock.Lock() - defer r.qlock.Unlock() - - if ackTime.Sub(r.lastAck) <= 0 || - len(r.queue) == 0 || - items == nil || - len(items) == 0 { - return - } - - var dropIdx int - r.lastAck = ackTime - itemsLen := len(items) - -OUTER: - for idx := itemsLen - 1; idx >= 0; idx-- { - for i, v := range r.queue { - if v == items[idx] { - dropIdx = i - break OUTER - } - } - } - - r.queue = r.queue[dropIdx+1:] -} - -// Close stops all the background jobs reporter is running. -// Guards against panic of closing channel multiple times. -func (r *Reporter) Close() error { - return nil -} - -func (r *Reporter) queueCopy() []fleetapi.SerializableEvent { - size := len(r.queue) - batch := make([]fleetapi.SerializableEvent, size) - - copy(batch, r.queue) - return batch -} - -func (r *Reporter) dropEvent() { - if dropped := r.tryDropInfo(); !dropped { - r.dropFirst() - } -} - -// tryDropInfo returns true if info was found and dropped. -func (r *Reporter) tryDropInfo() bool { - for i, e := range r.queue { - if e.Type() != reporter.EventTypeError { - r.queue = append(r.queue[:i], r.queue[i+1:]...) - r.logger.Infof("fleet reporter dropped event because threshold[%d] was reached: %v", r.threshold, e) - return true - } - } - - return false -} - -func (r *Reporter) dropFirst() { - if len(r.queue) == 0 { - return - } - - first := r.queue[0] - r.logger.Infof("fleet reporter dropped event because threshold[%d] was reached: %v", r.threshold, first) - r.queue = r.queue[1:] -} - -// Check it is reporter.Backend. -var _ reporter.Backend = &Reporter{} diff --git a/internal/pkg/reporter/fleet/reporter_test.go b/internal/pkg/reporter/fleet/reporter_test.go deleted file mode 100644 index c5160168a98..00000000000 --- a/internal/pkg/reporter/fleet/reporter_test.go +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package fleet - -import ( - "context" - "testing" - "time" - - "github.com/elastic/elastic-agent/internal/pkg/fleetapi" - "github.com/elastic/elastic-agent/internal/pkg/reporter" - "github.com/elastic/elastic-agent/pkg/core/logger" -) - -func TestEventsHaveAgentID(t *testing.T) { - // setup client - threshold := 10 - r := newTestReporter(1*time.Second, threshold) - - // report events - firstBatchSize := 5 - ee := getEvents(firstBatchSize) - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check after delay for output - reportedEvents, _ := r.Events() - if reportedCount := len(reportedEvents); reportedCount != firstBatchSize { - t.Fatalf("expected %v events got %v", firstBatchSize, reportedCount) - } - - for _, e := range reportedEvents { - re, ok := e.(*event) - - if !ok { - t.Fatal("reported event is not an event") - } - - if re.AgentID != "agentID" { - t.Fatalf("reported event id incorrect, expected: 'agentID', got: '%v'", re.AgentID) - } - } - -} - -func TestReporting(t *testing.T) { - // setup client - threshold := 10 - r := newTestReporter(1*time.Second, threshold) - - // report events - firstBatchSize := 5 - ee := getEvents(firstBatchSize) - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check after delay for output - reportedEvents, ack := r.Events() - if reportedCount := len(reportedEvents); reportedCount != firstBatchSize { - t.Fatalf("expected %v events got %v", firstBatchSize, reportedCount) - } - - // reset reported events - ack() - - // report events > threshold - secondBatchSize := threshold + 1 - ee = getEvents(secondBatchSize) - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check events are dropped - reportedEvents, _ = r.Events() - if reportedCount := len(reportedEvents); reportedCount != threshold { - t.Fatalf("expected %v events got %v", secondBatchSize, reportedCount) - } -} - -func TestInfoDrop(t *testing.T) { - // setup client - threshold := 2 - r := newTestReporter(2*time.Second, threshold) - - // report 1 info and 1 error - ee := []reporter.Event{testStateEvent{}, testErrorEvent{}, testErrorEvent{}} - - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check after delay for output - reportedEvents, _ := r.Events() - if reportedCount := len(reportedEvents); reportedCount != 2 { - t.Fatalf("expected %v events got %v", 2, reportedCount) - } - - // check both are errors - if reportedEvents[0].Type() != reportedEvents[1].Type() || reportedEvents[0].Type() != reporter.EventTypeError { - t.Fatalf("expected ERROR events got [1]: '%v', [2]: '%v'", reportedEvents[0].Type(), reportedEvents[1].Type()) - } -} - -func TestOutOfOrderAck(t *testing.T) { - // setup client - threshold := 100 - r := newTestReporter(1*time.Second, threshold) - - // report events - firstBatchSize := 5 - ee := getEvents(firstBatchSize) - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check after delay for output - reportedEvents1, ack1 := r.Events() - if reportedCount := len(reportedEvents1); reportedCount != firstBatchSize { - t.Fatalf("expected %v events got %v", firstBatchSize, reportedCount) - } - - // report events > threshold - secondBatchSize := threshold + 1 - ee = getEvents(secondBatchSize) - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check all events are returned - reportedEvents2, ack2 := r.Events() - if reportedCount := len(reportedEvents2); reportedCount == firstBatchSize+secondBatchSize { - t.Fatalf("expected %v events got %v", secondBatchSize, reportedCount) - } - - // ack second batch - ack2() - - reportedEvents, _ := r.Events() - if reportedCount := len(reportedEvents); reportedCount != 0 { - t.Fatalf("expected all events are removed after second batch ack, got %v events", reportedCount) - } - - defer func() { - r := recover() - if r != nil { - t.Fatalf("expected ack is ignored but it paniced: %v", r) - } - }() - - ack1() - reportedEvents, _ = r.Events() - if reportedCount := len(reportedEvents); reportedCount != 0 { - t.Fatalf("expected all events are still removed after first batch ack, got %v events", reportedCount) - } -} - -func TestAfterDrop(t *testing.T) { - // setup client - threshold := 7 - r := newTestReporter(1*time.Second, threshold) - - // report events - firstBatchSize := 5 - ee := getEvents(firstBatchSize) - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check after delay for output - reportedEvents1, ack1 := r.Events() - if reportedCount := len(reportedEvents1); reportedCount != firstBatchSize { - t.Fatalf("expected %v events got %v", firstBatchSize, reportedCount) - } - - // report events > threshold - secondBatchSize := 5 - ee = getEvents(secondBatchSize) - for _, e := range ee { - r.Report(context.Background(), e) - } - - // check all events are returned - reportedEvents2, _ := r.Events() - if reportedCount := len(reportedEvents2); reportedCount != threshold { - t.Fatalf("expected %v events got %v", secondBatchSize, reportedCount) - } - - // remove first batch from queue - ack1() - - reportedEvents, _ := r.Events() - if reportedCount := len(reportedEvents); reportedCount != secondBatchSize { - t.Fatalf("expected all events from first batch are removed, got %v events", reportedCount) - } - -} - -func getEvents(count int) []reporter.Event { - ee := make([]reporter.Event, 0, count) - for i := 0; i < count; i++ { - ee = append(ee, testStateEvent{}) - } - - return ee -} - -func newTestReporter(frequency time.Duration, threshold int) *Reporter { - log, _ := logger.New("", false) - r := &Reporter{ - info: &testInfo{}, - queue: make([]fleetapi.SerializableEvent, 0), - logger: log, - threshold: threshold, - } - - return r -} - -type testInfo struct{} - -func (*testInfo) AgentID() string { return "agentID" } - -type testStateEvent struct{} - -func (testStateEvent) Type() string { return reporter.EventTypeState } -func (testStateEvent) SubType() string { return reporter.EventSubTypeInProgress } -func (testStateEvent) Time() time.Time { return time.Unix(0, 1) } -func (testStateEvent) Message() string { return "hello" } -func (testStateEvent) Payload() map[string]interface{} { return map[string]interface{}{"key": 1} } - -type testErrorEvent struct{} - -func (testErrorEvent) Type() string { return reporter.EventTypeError } -func (testErrorEvent) SubType() string { return "PATH" } -func (testErrorEvent) Time() time.Time { return time.Unix(0, 1) } -func (testErrorEvent) Message() string { return "hello" } -func (testErrorEvent) Payload() map[string]interface{} { return map[string]interface{}{"key": 1} }