-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Composable ACKer #19632
Composable ACKer #19632
Conversation
💔 Tests FailedExpand to view the summary
Build stats
Test stats 🧪
Test errorsExpand to view the tests failures
Steps errorsExpand to view the steps failures
Log outputExpand to view the last 100 lines of log output
|
jenkins run the tests please |
Pinging @elastic/integrations-services (Team:Services) |
This change replaces the ACK handler functions with a single interface that makes it easier to combine ACK handlers. The global ACK handler is removed from the pipeline, requiring Beats to wrap and compose per input ACK handlers with their own ones.
libbeat/common/acker/acker.go
Outdated
|
||
// ConnectionOnly ensures that the given ACKer is only used for as long as the | ||
// pipeline Client is active. Once the Client is closed, the ACKer will drop | ||
// it's internal state and no more ACK events will be processed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// it's internal state and no more ACK events will be processed. | |
// its internal state and no more ACK events will be processed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have done basic Functionbeat tests, but it would be nice if someone else would test it during the "testing weeks" after the FF.
This change replaces the ACK handler functions with a single interface that makes it easier to combine ACK handlers. The global ACK handler is removed from the pipeline, requiring Beats to wrap and compose per input ACK handlers with their own ones. Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base. The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs. In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration. The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs. (cherry picked from commit bb89344)
This change replaces the ACK handler functions with a single interface that makes it easier to combine ACK handlers. The global ACK handler is removed from the pipeline, requiring Beats to wrap and compose per input ACK handlers with their own ones. Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base. The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs. In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration. The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs. (cherry picked from commit bb89344)
This change replaces the ACK handler functions with a single interface that makes it easier to combine ACK handlers. The global ACK handler is removed from the pipeline, requiring Beats to wrap and compose per input ACK handlers with their own ones. Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base. The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs. In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration. The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.
What does this PR do?
This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.
Review Notes
Although the PR is quite big, the main difference is that the
ACKCount
,ACKEvents
, andACKLastEvents
handlers have been replaced by a single interface (beat.ACKer
). The original ACKer implementations fromlibbeat/publisher/pipeline/acker.go
andlibbeat/publisher/pipeline/client_acker.go
have been movedlibbeat/common/acker
. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. Theacker.Combine
andacker.ConnectionOnly
are the only new additions to the code base.Why is it important?
tl;dr This change is required to integrate the v2 input API.
The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.
In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.
The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.
Checklist
- [ ] I have made corresponding changes to the documentation- [ ] I have made corresponding change to the default configuration filesCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Author's Checklist
How to test this PR locally
ACKer implementations should be handled mostly by unit tests. Still, this change can have great impact and some Beats should be tested to double check that we have not introduced any regressions here.
drop_event
with conditional. Check non-filtered events are publishedRelated issues
Dev Docs:
Calls to
pipeline.ConnectWith
that used to setup an ACK callback need to use the acker helpers. Previously the callbacks have been ignored if the input is shutdown, but not all events have been ACKed yet. For end-to-end ACKers that loose the connection to the source-system on shutdown this behavior can be preserved by usingack.ConnectionOnly(<acker>)
.Instead of directly passing callbacks, the callbacks should be wrapped using some of the utility functions in the libbeat/common/acker package:
ACKCount: func(n int) { ... }
withACKHandler: acker.Counting(func(n int) { ... })
.ACKEvents: func(private []interface{}) { ... }
withACKHandler: acker.EventPrivateReporter(func(_ int, private []interface{}) { ... })
.ACKLastEvent: func(private interface{}) { ... }
withACKHandler: acker.LastEventPrivateReporter(func(_ int, interface{}) { ... })
The
(beat.Pipeline).SetACKHandler
method has been removed.libbeat/common/acker
andlibbeat/publisher/pipetool
provide some helpers to modify and combine ACKers for all newbeat.Client
connections. For example this will use the global and local ACK handler for each event published.The
WithACKer
helper can be used arbitrarily often. ACKers are combined level by level viaacker.Combine
.