diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a9b6439adf2..29625d0a376 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] - Tag events that come from a filestream in "take over" mode. {pull}39828[39828] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] +- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075] - Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121] - Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163] - Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258] diff --git a/filebeat/input/input.go b/filebeat/input/input.go index d52d63e0f85..2d8faa0b08f 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -48,12 +49,13 @@ type Input interface { // Runner encapsulate the lifecycle of the input type Runner struct { - config inputConfig - input Input - done chan struct{} - wg *sync.WaitGroup - Once bool - beatDone chan struct{} + config inputConfig + input Input + done chan struct{} + wg *sync.WaitGroup + Once bool + beatDone chan struct{} + statusReporter status.StatusReporter } // New instantiates a new Runner @@ -83,10 +85,11 @@ func New( } context := Context{ - States: states, - Done: input.done, - BeatDone: input.beatDone, - Meta: nil, + States: states, + Done: input.done, + BeatDone: input.beatDone, + Meta: nil, + GetStatusReporter: input.GetStatusReporter, } var ipt Input ipt, err = f(conf, connector, context) @@ -164,3 +167,11 @@ func (p *Runner) stop() { func (p *Runner) String() string { return fmt.Sprintf("input [type=%s]", p.config.Type) } + +func (p *Runner) SetStatusReporter(statusReporter status.StatusReporter) { + p.statusReporter = statusReporter +} + +func (p *Runner) GetStatusReporter() status.StatusReporter { + return p.statusReporter +} diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 4882735d36b..40c42ddeebd 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -78,6 +79,7 @@ type Input struct { meta map[string]string stopOnce sync.Once fileStateIdentifier file.StateIdentifier + getStatusReporter input.GetStatusReporter } // NewInput instantiates a new Log @@ -157,8 +159,11 @@ func NewInput( done: context.Done, meta: meta, fileStateIdentifier: identifier, + getStatusReporter: context.GetStatusReporter, } + p.updateStatus(status.Starting, "starting the log input") + // Create empty harvester to check if configs are fine // TODO: Do config validation instead _, err = p.createHarvester(logger, file.State{}, nil) @@ -224,6 +229,9 @@ func (p *Input) loadStates(states []file.State) error { // Run runs the input func (p *Input) Run() { + // Mark it Running for now. + // Any errors encountered in this loop will change state to degraded + p.updateStatus(status.Running, "") logger := p.logger logger.Debug("Start next scan") @@ -558,6 +566,7 @@ func (p *Input) scan() { continue } if err != nil { + p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err)) logger.Errorf(harvesterErrMsg, newState.Source, err) } } else { @@ -583,6 +592,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol logger.Debugf("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size()) err := p.startHarvester(logger, newState, oldState.Offset) if err != nil { + p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)) logger.Errorf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) } return @@ -593,6 +603,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol logger.Debugf("Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size()) err := p.startHarvester(logger, newState, 0) if err != nil { + p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)) logger.Errorf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) } @@ -833,3 +844,12 @@ func (p *Input) stopWhenDone() { p.Wait() } + +func (p *Input) updateStatus(status status.Status, msg string) { + if p.getStatusReporter == nil { + return + } + if reporter := p.getStatusReporter(); reporter != nil { + reporter.UpdateStatus(status, msg) + } +} diff --git a/filebeat/input/registry.go b/filebeat/input/registry.go index 829cf16866c..18e35fa2274 100644 --- a/filebeat/input/registry.go +++ b/filebeat/input/registry.go @@ -22,15 +22,19 @@ import ( "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) +type GetStatusReporter func() status.StatusReporter + type Context struct { - States []file.State - Done chan struct{} - BeatDone chan struct{} - Meta map[string]string + States []file.State + Done chan struct{} + BeatDone chan struct{} + Meta map[string]string + GetStatusReporter GetStatusReporter } // Factory is used to register functions creating new Input instances. diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go new file mode 100644 index 00000000000..f9e31703038 --- /dev/null +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -0,0 +1,205 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "fmt" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/beats/v7/libbeat/common/reload" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/filebeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func TestLogStatusReporter(t *testing.T) { + unitOneID := mock.NewID() + unitOutID := mock.NewID() + token := mock.NewID() + + tests.InitBeatsForTest(t, cmd.Filebeat()) + tmpDir := t.TempDir() + filename := fmt.Sprintf("test-%d", time.Now().Unix()) + outPath := filepath.Join(tmpDir, filename) + t.Logf("writing output to file %s", outPath) + err := os.Mkdir(outPath, 0775) + require.NoError(t, err) + + /* + * valid input stream, shouldn't raise any error. + */ + inputStream := getInputStream(unitOneID, filepath.Join(tmpDir, "*.log"), 2) + require.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test.log"), []byte("Line1\nLine2\nLine3\n"), 0777)) + /* + * try to open an irregular file. + * This should throw "Tried to open non regular file:" and status to degraded + */ + nullDeviceFile := "/dev/null" + if runtime.GOOS == "windows" { + nullDeviceFile = "NUL" + } + inputStreamIrregular := getInputStream(unitOneID, nullDeviceFile, 1) + + outputExpectedStream := proto.UnitExpected{ + Id: unitOutID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Type: "file", + Source: tests.RequireNewStruct(map[string]interface{}{ + "type": "file", + "enabled": true, + "path": outPath, + "filename": "beat-out", + "number_of_files": 7, + }), + }, + } + + observedStates := make(chan *proto.CheckinObserved) + expectedUnits := make(chan []*proto.UnitExpected) + done := make(chan struct{}) + // V2 mock server + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + select { + case observedStates <- observed: + return &proto.CheckinExpected{ + Units: <-expectedUnits, + } + case <-done: + return nil + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + require.NoError(t, server.Start()) + defer server.Stop() + + // start the client + client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{ + Name: "program", + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { + c := management.DefaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits) + }) + + go func() { + t.Logf("Running beats...") + err := cmd.Filebeat().Execute() + require.NoError(t, err) + }() + + scenarios := []struct { + expectedStatus proto.State + nextInputunit *proto.UnitExpected + }{ + { + proto.State_HEALTHY, + &inputStreamIrregular, + }, + { + proto.State_DEGRADED, + &inputStream, + }, + { + proto.State_HEALTHY, + &inputStream, + }, + // wait for one more checkin, just to be sure it's healthy + { + proto.State_HEALTHY, + &inputStream, + }, + } + + timer := time.NewTimer(2 * time.Minute) + id := 0 + for id < len(scenarios) { + select { + case observed := <-observedStates: + state := extractState(observed.GetUnits(), unitOneID) + expectedUnits <- []*proto.UnitExpected{ + scenarios[id].nextInputunit, + &outputExpectedStream, + } + if state != scenarios[id].expectedStatus { + continue + } + // always ensure that output is healthy + outputState := extractState(observed.GetUnits(), unitOutID) + require.Equal(t, outputState, proto.State_HEALTHY) + + timer.Reset(2 * time.Minute) + id++ + case <-timer.C: + t.Fatal("timeout waiting for checkin") + default: + } + } + require.Eventually(t, func() bool { + events := tests.ReadLogLines(t, outPath) + return events > 0 // wait until we see one output event + }, 15*time.Second, 1*time.Second) +} + +func extractState(units []*proto.UnitObserved, idx string) proto.State { + for _, unit := range units { + if unit.Id == idx { + return unit.GetState() + } + } + return -1 +} + +func getInputStream(id string, path string, stateIdx int) proto.UnitExpected { + return proto.UnitExpected{ + Id: id, + Type: proto.UnitType_INPUT, + ConfigStateIdx: uint64(stateIdx), + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Streams: []*proto.Stream{{ + Id: "filebeat/log-default-system", + Source: tests.RequireNewStruct(map[string]interface{}{ + "enabled": true, + "symlinks": true, + "type": "log", + "paths": []interface{}{path}, + "scan_frequency": "500ms", + }), + }}, + Type: "log", + Id: "log-input-test", + Name: "log-1", + Revision: 1, + }, + } +}