Improve event normalization performance #22781
On another thought, should skipping normalization also be configurable at the pipeline level instead of only at the current Beat/global level? i.e. adding another fields Please let me know your thoughts on this. If you think it's acceptable, I can file a separate PR for it or add it to this one. Thanks! Another edit: I tried running Filebeat with my own settings to skip normalization. It worked fine for a customized input, but failed for a normal
Pinging @elastic/integrations-services (Team:Services)
@@ -59,8 +61,7 @@ func NewGenericEventConverter(keepNull bool) *GenericEventConverter {
 // Nil values in maps are dropped during the conversion. Any unsupported types
 // that are found in the MapStr are dropped and warnings are logged.
 func (e *GenericEventConverter) Convert(m MapStr) MapStr {
-	keys := make([]string, 0, 10)
-	event, errs := e.normalizeMap(m, keys...)
+	event, errs := e.normalizeMap(m, e.keys[:0]...)
If the keys have been constructed dynamically, e.g. with fmt.Sprintf, then we should allow them to be gc'ed, meaning that we also need an e.keys = e.keys[:0] on function exit.
The keys array is used for error reporting only, which is not supposed to happen all that often. Instead of constructing and passing down the keys array, we might want to handle this in the error path only, e.g.:
type keyError struct {
	path    []string
	message string
	cause   error
}

func (e *keyError) Unwrap() error { return e.cause }

func (e *keyError) Error() string {
	if len(e.path) == 0 {
		return fmt.Sprintf("%v: %v", e.message, e.cause)
	}
	return fmt.Sprintf("key=%v: %v: %v", joinKeys(e.path), e.message, e.cause)
}

func newError(msg string) *keyError { return &keyError{message: msg} }
func newErrorf(msg string, args ...interface{}) *keyError { return newError(fmt.Sprintf(msg, args...)) }
func wrapErr(cause error, msg string) *keyError { return &keyError{message: msg, cause: cause} }
func wrapErrf(cause error, msg string, args ...interface{}) *keyError { return wrapErr(cause, fmt.Sprintf(msg, args...)) }
Efficiently constructing the path can be tricky, as we have to construct 'path' when passing the error up. Either we do not care (it's an edge case anyway), or we build it in reverse order and just "reverse" the array in the top-level Convert method. E.g.:
func (e *GenericEventConverter) Convert(m MapStr) MapStr {
	event, errs := e.normalizeMap(m)
	if len(errs) > 0 {
		for _, err := range errs {
			reverseKeys(err.path)
		}
		e.log.Warnf(...)
	}
	return event
}

func (e *keyError) addKey(k string) {
	e.path = append(e.path, k)
}

func (e *GenericEventConverter) normalizeMap(m MapStr) (MapStr, []*keyError) {
	...
	for key, value := range m {
		v, valueErrs := e.normalizeValue(value)
		if len(valueErrs) > 0 {
			for _, err := range valueErrs {
				err.addKey(key)
			}
			errs = append(errs, valueErrs...)
		}
	}
	...
}

func (e *GenericEventConverter) normalizeValue(value interface{}) (interface{}, []*keyError) {
	...
}
It is a little more code, but a proper error type seems to help reduce allocations. To reduce the number of allocations in the error path, one could pass an int to keep track of the level, e.g.:
func newError(level int, message string) *keyError {
	return &keyError{
		path:    make([]string, 0, level),
		message: message,
	}
}

func (e *GenericEventConverter) normalizeMap(m MapStr, level int) (MapStr, []*keyError) {
	...
	for key, value := range m {
		v, valueErrs := e.normalizeValue(value, level+1)
		if len(valueErrs) > 0 {
			for _, err := range valueErrs {
				err.addKey(key)
			}
			errs = append(errs, valueErrs...)
		}
	}
	...
}
This replaces the "always track an array" with an int.
All in all, the change to keyError trades good-path overhead for more complexity in the error path. But errors here are an edge case and should be addressed by the Beats input author who produced an event that is not compatible with libbeat.
@@ -44,13 +44,13 @@ type processorFn struct {

func newGeneralizeProcessor(keepNull bool) *processorFn {
	logger := logp.NewLogger("publisher_processing")
	g := common.NewGenericEventConverter(keepNull)
g.keys might be shared between multiple goroutines without a mutex when moving the constructor here. GenericEventConverter must be immutable in order to be shared between multiple instances. With the introduction of keys as a buffer, this is not the case anymore.
Even the returned function can be called from multiple goroutines (fortunately protected by a mutex, I think).
Yes, after that g.keys is not safe anymore under concurrent execution. I thought GenericEventConverter is created for each pipeline and not shared by multiple instances (inputs, I suppose?).
It may be improper to make keys a shared buffer. I think the gain from this change comes mostly from NewGenericEventConverter now being called once in newGeneralizeProcessor rather than every time processFn is invoked, as NewGenericEventConverter also creates a new logger.
Hi @urso, I would like to revert the changes to g.keys and keep this line as the only change (apart from the testing code :) ). Calling NewGenericEventConverter only once to save the cost of "newing" the logger seems reasonable, as g is not shared and lives only in the scope of that processorFn.
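The construct-once idea can be sketched as follows. The names are assumptions for illustration: converter and newConverter stand in for GenericEventConverter and its constructor, and the constructed counter exists only to make the number of constructor calls visible.

```go
package main

import "fmt"

type event = map[string]interface{}

// converter stands in for GenericEventConverter; building one is assumed to
// be comparatively costly because the real constructor also creates a logger.
type converter struct{ keepNull bool }

// constructed counts constructor calls, for illustration only.
var constructed int

func newConverter(keepNull bool) *converter {
	constructed++
	return &converter{keepNull: keepNull}
}

// Convert writes nothing to the converter, so reusing one instance is safe.
func (c *converter) Convert(m event) event {
	out := make(event, len(m))
	for k, v := range m {
		if v == nil && !c.keepNull {
			continue // nil values are dropped unless keepNull is set
		}
		out[k] = v
	}
	return out
}

// perCall rebuilds the converter on every invocation of the returned function.
func perCall(keepNull bool) func(event) event {
	return func(m event) event {
		return newConverter(keepNull).Convert(m)
	}
}

// once builds the converter a single time and captures it in the closure.
func once(keepNull bool) func(event) event {
	g := newConverter(keepNull)
	return func(m event) event {
		return g.Convert(m)
	}
}

func main() {
	in := event{"a": "b", "dropped": nil}

	constructed = 0
	f := perCall(false)
	for i := 0; i < 1000; i++ {
		f(in)
	}
	fmt.Println("per call:", constructed) // per call: 1000

	constructed = 0
	g := once(false)
	for i := 0; i < 1000; i++ {
		g(in)
	}
	fmt.Println("once:", constructed) // once: 1
}
```

This only works as long as the captured converter carries no per-call mutable state, which is why it pairs with reverting the g.keys buffer.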
Feel free to open another PR and mention this one from the new one (I will easily find the new PR). We can also discuss alternative implementations in separate PRs and finally close the ones we don't like.
Sounds good. :) Filed #22974
fields := common.MapStr{"a": "b"}
for i := 0; i < b.N; i++ {
	f := fields.Clone()
Clone is expensive as well. Consider using b.XTimer() in order to focus on prog.Run only.
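Reading b.XTimer() as shorthand for the b.StopTimer() and b.StartTimer() pair from the standard testing package, the suggestion could look roughly like the sketch below. cloneFields and process are hypothetical stand-ins for fields.Clone() and prog.Run, and main fixes the iteration count through the test.benchtime flag only so that the program finishes quickly outside of go test.

```go
package main

import (
	"flag"
	"fmt"
	"testing"
)

// cloneFields and process are hypothetical stand-ins for fields.Clone()
// and prog.Run from the benchmark under review.
func cloneFields(m map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func process(m map[string]interface{}) int {
	m["processed"] = true
	return len(m)
}

func benchmarkProcess(b *testing.B) {
	fields := map[string]interface{}{"a": "b"}
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		b.StopTimer() // pause: the copy is setup, not the code under test
		f := cloneFields(fields)
		b.StartTimer()
		process(f)
	}
}

func main() {
	// Outside "go test", pin the iteration count so the run stays short.
	testing.Init()
	if err := flag.Set("test.benchtime", "1000x"); err != nil {
		panic(err)
	}
	fmt.Println(testing.Benchmark(benchmarkProcess))
}
```

Pausing and resuming the timer on every iteration has a cost of its own, so for very cheap operations a time-based benchmark run may take noticeably longer; preparing all copies before the loop and calling b.ResetTimer() once is a possible alternative.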
sure I will explore this a little and get back here
prog, err := s.Create(beat.ProcessingConfig{}, false)
require.NoError(b, err)

fields := common.MapStr{"a": "b"}
Events can have different complexity, which the runtime clearly depends upon. How about:
cases := map[string]common.MapStr{
	"event with single field": {"a": "b"},
}
for name, event := range cases {
	b.Run(name, func(b *testing.B) {
		...
	})
}
This allows us to easily add more cases now or in the future.
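A filled-in version of this table-driven layout might look like the following sketch. countFields is a hypothetical stand-in for the step being measured, the second case is invented to show how a more complex event would be added, and main again pins test.benchtime so the program runs quickly outside of go test.

```go
package main

import (
	"flag"
	"fmt"
	"testing"
)

// countFields is a hypothetical stand-in for the processing step under test;
// its cost grows with the size and nesting of the event.
func countFields(m map[string]interface{}) int {
	n := 0
	for _, v := range m {
		n++
		if sub, ok := v.(map[string]interface{}); ok {
			n += countFields(sub)
		}
	}
	return n
}

// cases maps a sub-benchmark name to the event it runs against.
var cases = map[string]map[string]interface{}{
	"event with single field": {"a": "b"},
	"event with nested fields": {
		"a": map[string]interface{}{"b": map[string]interface{}{"c": 1}},
		"d": "e",
	},
}

func benchmarkCountFields(b *testing.B) {
	for name, event := range cases {
		// b.Run executes the sub-benchmark before returning, one per case.
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			for i := 0; i < b.N; i++ {
				countFields(event)
			}
		})
	}
}

func main() {
	// Outside "go test", pin the iteration count so the run stays short.
	testing.Init()
	if err := flag.Set("test.benchtime", "1000x"); err != nil {
		panic(err)
	}
	fmt.Println(testing.Benchmark(benchmarkCountFields))
}
```

Under go test, each case is reported as its own sub-benchmark and can be selected with the -bench pattern, so adding a new event shape is a one-line change to the table.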
makes sense. will do.
The event normalization step is about validating the event and ensuring that events can be used correctly by the different processors. The step also ensures (via copying) that there are no shared references that can change while the event is being published. Event creation and publishing are async. Inputs or processors that share metadata or modify common fields can lead to crashes otherwise. As you noticed, it is not always safe to disable normalization. This is why the setting is at the Beat level and not per input (some Beats modify events depending on your settings). Beats that disable normalization do so because they do the "copy" themselves, for example by building the event as a Go structure first and then having a final "conversion" step that copies the full event into a
Improving performance as you attempted here is a HUGE improvement. All inputs and users will profit from this. From your mini-benchmark it looks like there is loads to gain here.
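The shared-reference hazard mentioned above can be shown with a toy example. deepCopy is a simplified, hypothetical stand-in for the copying that normalization performs (it only handles nested maps), and the event shapes are made up.

```go
package main

import "fmt"

// deepCopy is a simplified stand-in for the copying done during
// normalization; it handles nested maps only.
func deepCopy(m map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(m))
	for k, v := range m {
		if sub, ok := v.(map[string]interface{}); ok {
			out[k] = deepCopy(sub)
			continue
		}
		out[k] = v
	}
	return out
}

func main() {
	// Metadata that an input attaches to every event it produces.
	shared := map[string]interface{}{"host": "a"}

	aliased := map[string]interface{}{"meta": shared}
	copied := deepCopy(aliased)

	// The input changes its metadata while the events are still queued.
	shared["host"] = "b"

	fmt.Println(aliased["meta"].(map[string]interface{})["host"]) // b
	fmt.Println(copied["meta"].(map[string]interface{})["host"])  // a
}
```

The event that skipped the copy changes after it was handed over, while the copied one stays as it was when created. With real goroutines the same aliasing becomes a concurrent map access rather than just a surprising value.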
Thanks a lot for the clarification! I guess it's a trade-off between safety and performance. It makes sense that an event could be modified during its publish phase and that this may surprise the output component, e.g. when a Beat has multiple input sources that each behave on their own, and this would eventually affect all of them.
Closing this one in favor of: #22974
What does this PR do?
Improve event normalization performance
Why is it important?
generalize/normalize is widely used in beats
Checklist
I have commented my code, particularly in hard-to-understand areas
I have made corresponding changes to the documentation
I have made corresponding change to the default configuration files
CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.
Author's Checklist
How to test this PR locally
Benchmark
Related issues
Use cases
Screenshots
Logs
Before
After