Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.4](backport #1130) Remove the fleet reporter #1138

Merged
merged 2 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
- Fix data duplication for standalone agent on Kubernetes using the default manifest {issue-beats}31512[31512] {pull}742[742]
- Agent updates will clean up unneeded artifacts. {issue}693[693] {issue}694[694] {pull}752[752]
- Fix a panic caused by a race condition when installing the Elastic Agent. {issues}806[806]
- Remove fleet event reporter and events from checkin body. {issue}993[993]

==== New features

Expand Down
15 changes: 0 additions & 15 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ type agentInfo interface {
AgentID() string
}

type fleetReporter interface {
Events() ([]fleetapi.SerializableEvent, func())
}

type stateStore interface {
Add(fleetapi.Action)
AckToken() string
Expand All @@ -85,7 +81,6 @@ type fleetGateway struct {
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker store.FleetAcker
Expand All @@ -104,7 +99,6 @@ func New(
agentInfo agentInfo,
client client.Sender,
d pipeline.Dispatcher,
r fleetReporter,
acker store.FleetAcker,
statusController status.Controller,
stateStore stateStore,
Expand All @@ -120,7 +114,6 @@ func New(
client,
d,
scheduler,
r,
acker,
statusController,
stateStore,
Expand All @@ -136,7 +129,6 @@ func newFleetGatewayWithScheduler(
client client.Sender,
d pipeline.Dispatcher,
scheduler scheduler.Scheduler,
r fleetReporter,
acker store.FleetAcker,
statusController status.Controller,
stateStore stateStore,
Expand All @@ -162,7 +154,6 @@ func newFleetGatewayWithScheduler(
settings.Backoff.Max,
),
done: done,
reporter: r,
acker: acker,
statusReporter: statusController.RegisterComponent("gateway"),
statusController: statusController,
Expand Down Expand Up @@ -323,9 +314,6 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
// get events
ee, ack := f.reporter.Events()

ecsMeta, err := info.Metadata()
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
Expand All @@ -341,7 +329,6 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Events: ee,
Metadata: ecsMeta,
Status: f.statusController.StatusString(),
}
Expand Down Expand Up @@ -374,8 +361,6 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}
}

// ack events so they are dropped from queue
ack()
return resp, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand All @@ -29,9 +28,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
repo "github.com/elastic/elastic-agent/internal/pkg/reporter"
fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet"
fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -137,7 +133,7 @@ func (m *mockQueue) Actions() []fleetapi.Action {
return args.Get(0).([]fleetapi.Action)
}

type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper, repo.Backend)
type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper)

func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) {
return func(t *testing.T) {
Expand All @@ -146,8 +142,6 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
dispatcher := newTestingDispatcher()

log, _ := logger.New("fleet_gateway", false)
rep := getReporter(agentInfo, log, t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -167,7 +161,6 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat
client,
dispatcher,
scheduler,
rep,
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand All @@ -176,7 +169,7 @@ func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGat

require.NoError(t, err)

fn(t, gateway, client, dispatcher, scheduler, rep)
fn(t, gateway, client, dispatcher, scheduler)
}
}

Expand Down Expand Up @@ -214,7 +207,6 @@ func TestFleetGateway(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
Expand All @@ -240,7 +232,6 @@ func TestFleetGateway(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
Expand Down Expand Up @@ -305,7 +296,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -366,7 +356,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -432,7 +421,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -487,7 +475,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -544,7 +531,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -594,9 +580,7 @@ func TestFleetGateway(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
_ = rep.Report(context.Background(), &testStateEvent{})
waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
cr := &request{}
Expand All @@ -609,8 +593,6 @@ func TestFleetGateway(t *testing.T) {
t.Fatal(err)
}

require.Equal(t, 1, len(cr.Events))

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
Expand Down Expand Up @@ -657,7 +639,6 @@ func TestFleetGateway(t *testing.T) {
client,
dispatcher,
scheduler,
getReporter(agentInfo, log, t),
noopacker.NewAcker(),
&noopController{},
stateStore,
Expand Down Expand Up @@ -712,8 +693,6 @@ func TestRetriesOnFailures(t *testing.T) {
client := newTestingClient()
dispatcher := newTestingDispatcher()
log, _ := logger.New("fleet_gateway", false)
rep := getReporter(agentInfo, log, t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -742,7 +721,6 @@ func TestRetriesOnFailures(t *testing.T) {
client,
dispatcher,
scheduler,
rep,
noopacker.NewAcker(),
statusController,
stateStore,
Expand All @@ -757,8 +735,6 @@ func TestRetriesOnFailures(t *testing.T) {
err = gateway.Start()
require.NoError(t, err)

_ = rep.Report(context.Background(), &testStateEvent{})

// Initial tick is done out of bound so we can block on channels.
scheduler.Next()

Expand All @@ -780,8 +756,6 @@ func TestRetriesOnFailures(t *testing.T) {
t.Fatal(err)
}

require.Equal(t, 1, len(cr.Events))

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
Expand All @@ -807,7 +781,6 @@ func TestRetriesOnFailures(t *testing.T) {
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
Expand All @@ -816,8 +789,6 @@ func TestRetriesOnFailures(t *testing.T) {
err := gateway.Start()
require.NoError(t, err)

_ = rep.Report(context.Background(), &testStateEvent{})

// Initial tick is done out of bound so we can block on channels.
scheduler.Next()

Expand All @@ -830,27 +801,8 @@ func TestRetriesOnFailures(t *testing.T) {
}))
}

func getReporter(info agentInfo, log *logger.Logger, t *testing.T) *fleetreporter.Reporter {
fleetR, err := fleetreporter.NewReporter(info, log, fleetreporterConfig.DefaultConfig())
if err != nil {
t.Fatal(errors.Wrap(err, "fail to create reporters"))
}

return fleetR
}

type testAgentInfo struct{}

func (testAgentInfo) AgentID() string { return "agent-secret" }

type testStateEvent struct{}

func (testStateEvent) Type() string { return repo.EventTypeState }
func (testStateEvent) SubType() string { return repo.EventSubTypeInProgress }
func (testStateEvent) Time() time.Time { return time.Unix(0, 1) }
func (testStateEvent) Message() string { return "hello" }
func (testStateEvent) Payload() map[string]interface{} { return map[string]interface{}{"key": 1} }

type request struct {
Events []interface{} `json:"events"`
}
type request struct{}
9 changes: 1 addition & 8 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/queue"
reporting "github.com/elastic/elastic-agent/internal/pkg/reporter"
fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet"
logreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/log"
"github.com/elastic/elastic-agent/internal/pkg/sorted"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -124,12 +123,7 @@ func newManaged(
}

logR := logreporter.NewReporter(log)
fleetR, err := fleetreporter.NewReporter(agentInfo, log, cfg.Fleet.Reporting)
if err != nil {
return nil, errors.New(err, "fail to create reporters")
}

combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR, fleetR)
combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR)
monitor, err := monitoring.NewMonitor(cfg.Settings)
if err != nil {
return nil, errors.New(err, "failed to initialize monitoring")
Expand Down Expand Up @@ -288,7 +282,6 @@ func newManaged(
agentInfo,
client,
actionDispatcher,
fleetR,
actionAcker,
statusCtrl,
stateStore,
Expand Down
19 changes: 8 additions & 11 deletions internal/pkg/agent/configuration/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@ package configuration
import (
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/remote"
fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config"
)

// FleetAgentConfig is the internal configuration of the agent after the enrollment is done,
// this configuration is not exposed in anyway in the elastic-agent.yml and is only internal configuration.
type FleetAgentConfig struct {
Enabled bool `config:"enabled" yaml:"enabled"`
AccessAPIKey string `config:"access_api_key" yaml:"access_api_key"`
Client remote.Config `config:",inline" yaml:",inline"`
Reporting *fleetreporterConfig.Config `config:"reporting" yaml:"reporting"`
Info *AgentInfo `config:"agent" yaml:"agent"`
Server *FleetServerConfig `config:"server" yaml:"server,omitempty"`
Enabled bool `config:"enabled" yaml:"enabled"`
AccessAPIKey string `config:"access_api_key" yaml:"access_api_key"`
Client remote.Config `config:",inline" yaml:",inline"`
Info *AgentInfo `config:"agent" yaml:"agent"`
Server *FleetServerConfig `config:"server" yaml:"server,omitempty"`
}

// Valid validates the required fields for accessing the API.
Expand All @@ -44,9 +42,8 @@ func (e *FleetAgentConfig) Valid() error {
// DefaultFleetAgentConfig creates a default configuration for fleet.
func DefaultFleetAgentConfig() *FleetAgentConfig {
return &FleetAgentConfig{
Enabled: false,
Client: remote.DefaultClientConfig(),
Reporting: fleetreporterConfig.DefaultConfig(),
Info: &AgentInfo{},
Enabled: false,
Client: remote.DefaultClientConfig(),
Info: &AgentInfo{},
}
}
7 changes: 3 additions & 4 deletions internal/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ const checkingPath = "/api/fleet/agents/%s/checkin"

// CheckinRequest consists of multiple events reported to fleet ui.
type CheckinRequest struct {
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []SerializableEvent `json:"events"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
}

// SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin
Expand Down
Loading