Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][filestream] Enable status reporter for filestream input #40121

Merged
merged 6 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Enable StatusReporter for filestream input. {pull}40121[40121]
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved

*Heartbeat*

Expand Down
7 changes: 6 additions & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/debug"
"github.com/elastic/beats/v7/libbeat/reader/parser"
Expand Down Expand Up @@ -163,7 +164,11 @@ func (inp *filestream) Run(
})
defer streamCancel()

return inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics)
if err := inp.readFromSource(ctx, log, r, fs.newPath, state, publisher, metrics); err != nil {
ctx.UpdateStatus(status.Degraded, err.Error())
return err
}
return nil
}

func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
)
Expand Down Expand Up @@ -204,7 +205,7 @@ func startHarvester(
if errors.Is(err, ErrHarvesterAlreadyRunning) {
return nil
}

ctx.UpdateStatus(status.Degraded, err.Error())
return fmt.Errorf("error while adding new reader to the bookkeeper %w", err)
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -214,6 +215,7 @@ func startHarvester(
resource, err := lock(ctx, hg.store, srcID)
if err != nil {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, err.Error())
return fmt.Errorf("error while locking resource: %w", err)
}
defer releaseResource(resource)
Expand All @@ -223,6 +225,7 @@ func startHarvester(
})
if err != nil {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, err.Error())
return fmt.Errorf("error while connecting to output with pipeline: %w", err)
}
defer client.Close()
Expand All @@ -234,6 +237,7 @@ func startHarvester(
err = hg.harvester.Run(ctx, src, cursor, publisher, metrics)
if err != nil && !errors.Is(err, context.Canceled) {
hg.readers.remove(srcID)
ctx.UpdateStatus(status.Degraded, err.Error())
return fmt.Errorf("error while running harvester: %w", err)
}
// If the context was not cancelled it means that the Harvester is stopping because of
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/go-concert/ctxtool"
)

Expand Down Expand Up @@ -53,6 +54,7 @@ func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
) (err error) {
ctx.UpdateStatus(status.Starting, "")
groupStore := inp.manager.getRetainedStore()
defer groupStore.Release()

Expand Down Expand Up @@ -85,6 +87,10 @@ func (inp *managedInput) Run(
defer prospectorStore.Release()
sourceStore := newSourceStore(prospectorStore, inp.sourceIdentifier)

// Mark it as running for now.
// Any errors encountered by harverter will change state to Degraded
ctx.UpdateStatus(status.Running, "")

inp.prospector.Run(ctx, sourceStore, hg)

// Notify the manager the input has stopped, currently that is used to
Expand Down
Loading