-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Start to split filebeat/channel up #17655
Conversation
Pinging @elastic/integrations-services (Team:Services) |
filebeat/beater/filebeat.go
Outdated
@@ -235,8 +237,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { | |||
return err | |||
} | |||
|
|||
fb.pipeline = pipetool.WithPipelineEventCounter(b.Publisher, wgEvents) | |||
fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When doing this change I noticed wgEvents was propably not hooked up correctly. This PR might fix an issue or flaky tests with shutdown timeout not working correctly.
@@ -62,7 +62,7 @@ func newCrawler( | |||
|
|||
// Start starts the crawler with all inputs | |||
func (c *crawler) Start( | |||
pipeline beat.Pipeline, | |||
pipeline beat.PipelineConnector, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PipelineConnector is the 'smaller' interface. I hope to merge Pipeline and PipelineConnector interfaces in the future, such that Pipeline will not have any extra methods anymore.
filebeat/channel/runner.go
Outdated
|
||
type onCreateFactory struct { | ||
factory cfgfile.RunnerFactory | ||
edit onCreateWrapper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: call this create
instead of edit
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Jenkins, run tests |
jenkins run the tests please |
The Factory in filebeat/channel package wraps the publisher pipeline, in order to add some shutdown propagation support, but also to modify the beat.ClientConfig when an input is configured. The channel.Connector reads and applies common settings for all inputs, configures Guaranteed sending, and hooks up a global counter with each beat.Client, in order to track live events. This change reduces the responsibilities of the factory, by splitting them up. A package libbeat/publisher/pipetool is introduced to provide helper functions for modifying a beat.Pipeline before and after Connect. This centers support for more functionality on the beat.PipelineConnector and beat.Client interfaces, instead of incompatible types wrapping the pipeline. The Factory in filebeat/channel will not modify the input configuration or the beat.Client anymore. It is only required to add shutdown signaling to the pipeline (which we will remove in the future as well). The filebeat/channel provides a helper function for decorating a cfgfile.RunnerFactory, such that common configurations are still applied as before. Again, by centering around a small set of common interfaces, reduce the effort to integrate future input refactorings. All in all, the filebeat/channel factory still provides the same functionality as before (for now), but splits the different functionalities into 3 separate interfaces.
The way the counter support is setup is very specific for filebeat and how Filebeat hooks up a global ACKer, and the Registrar for storing file.State. Moving it to filebeat to not encourage someone to use a it.
💔 Build FailedExpand to view the summary
Build stats
Test stats 🧪
Test errorsExpand to view the tests failures
Steps errorsExpand to view the steps failures
Log outputExpand to view the last 100 lines of log output
|
(cherry picked from commit bc8123a)
(cherry picked from commit bc8123a)
What does this PR do?
The Factory in filebeat/channel package wraps the publisher pipeline, in order to add some shutdown propagation support, but also to modify the beat.ClientConfig when an input is configured. The channel.Connector reads and applies common settings for all inputs, configures Guaranteed sending, and hooks up a global counter with each beat.Client, in order to track live events.
This change reduces the responsibilities of the factory, by splitting them up. A package libbeat/publisher/pipetool is introduced to provide helper functions for modifying a beat.Pipeline before and after Connect. This centers support for more functionality on the beat.PipelineConnector and beat.Client interfaces, instead of incompatible types wrapping the pipeline.
The Factory in filebeat/channel will not modify the input configuration or the beat.Client anymore. It is only required to add shutdown signaling to the pipeline (which we will remove in the future as well). The filebeat/channel provides a helper function for decorating a cfgfile.RunnerFactory, such that common configurations are still applied as before. Again, by centering around a small set of common interfaces, reduce the effort to integrate future input refactorings.
All in all, the filebeat/channel factory still provides the same functionality as before (for now), but splits the different functionalities into 3 separate interfaces.
Why is it important?
Support ease of integration of new input API, based on RunnerFactory only.
Checklist
- [ ] I have made corresponding changes to the documentation- [ ] I have made corresponding change to the default configuration files- [ ] I have added tests that prove my fix is effective or that my feature worksCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Related issues