Add support for parsers in filestream input #24763
Conversation
Pinging @elastic/agent (Team:Agent)
💚 Build Succeeded
In general the approach looks good to me. I assume syslog and CRI-O will be added next?
Have you tried JSON in JSON or ndjson -> multiline, as is common in the case of containers?
Most of the time? Do you see any potential limitations that don't exist in comparison to the logs input?
JSON parsing is tricky. The main problem is that the JSON reader only parses the JSON fields and puts them under the `json` key.
I see. Not sure how much postprocessing will be required, but maybe we can introduce a generic helper function/method that can convert the message into an event. Maybe we also need to enhance the readers.
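The ndjson -> multiline chain asked about for containers would, as a hypothetical sketch, come down to parser ordering. The path and option names here are assumptions based on snippets elsewhere in this thread, not a verified configuration:

```yaml
- type: filestream
  paths:
    # illustrative container log path
    - /var/lib/docker/containers/*/*.log
  parsers:
    # Each physical line is a JSON object; keep the payload in "message".
    - ndjson:
        message_key: message
    # Then join continuation lines of the extracted payload.
    - multiline:
        type: pattern
        pattern: '^\s'
        match: after
```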
```go
	}
	fields["message"] = string(m.Content)
}
```
For the complete function body up until here I wonder if this could be a separate parser we just append to the list of parsers? In that case the `beat.Event` could be extracted from the message directly.
```go
for _, proc := range inp.msgPostProc {
	proc.PostProcess(&m)
}
```
Why do we need to build and apply the post processors separately? Couldn't we just ensure that the parser correctly modifies the message? Are there fields that must be set before the post processing is run?
Yes, the fields `log.offset` and `log.file.path` are set before the input merges the JSON fields with the existing fields.
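As an illustration (values are made up), the event already carries these fields at the point where the JSON merge runs:

```json
{
  "log": {
    "offset": 1024,
    "file": { "path": "/var/log/test.log" }
  },
  "message": "{\"id\": 5}"
}
```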
```go
@@ -359,16 +368,24 @@ func (inp *filestream) eventFromMessage(m reader.Message, path string) beat.Even
	},
}
```
I wonder if we can/should inject a processor at the beginning? E.g. if we have filebeat collect the output of another filebeat, shall we really overwrite the existing fields with the new ones we define here, or should we make an attempt to keep the original fields from the original log file intact if they are present?
```go
// specific language governing permissions and limitations
// under the License.

// +build integration
```
Wow, these are quite a few integration tests. Having a few integration tests is indeed important, as we combine quite a few components.
But as the focus is on the parsers, it would be nice to have more unit tests with parsers only, instead of full-fledged integration tests that operate on the disk.
I understand that most of these tests are copied from the existing system tests and it is better to keep them as is. But maybe we can create a follow up issue to clean the tests up a little.
```go
// The message key might have been modified by multiline
if len(pp.cfg.MessageKey) > 0 && len(msg.Content) > 0 {
	jsonFields[pp.cfg.MessageKey] = string(msg.Content)
}
```
What value does `MessageKey` have, and what do we store in here if we didn't run multiline?
`MessageKey` is only needed if `multiline` is configured before the JSON parser.
This pull request is now in conflicts. Could you fix it? 🙏
```go
var config readjson.Config
cfg := ns.Config()
cfg.Unpack(&config)
pp = append(pp, readjson.NewJSONPostProcessor(&config))
```
What happens to post processing if I have multiple JSON parsers? E.g. `json => multiline => json`?
In this case, multiple post processors are added. It means that e.g. if `keys_under_root` or `overwrite_keys` is enabled for both `json` parsers, the last parser "wins".
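The `json => multiline => json` chain in question could, as a hedged sketch, be configured like this (option names follow those used elsewhere in this thread and are not verified):

```yaml
parsers:
  # Outer JSON; keep the inner payload in "message" for multiline.
  - ndjson:
      message_key: message
  - multiline:
      type: count
      count_lines: 2
  # Inner JSON; this parser's post processor runs last.
  - ndjson:
      fields_under_root: true
```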
I'm more curious what happens to the event if one has `keys_under_root` disabled and one has it enabled.
E.g. let's assume I have the configuration:

```yaml
parsers:
  - ndjson.fields_under_root: true
  - ndjson.fields_under_root: false
```

with this event:

```json
{"log.level": "debug", "message": "{\"id\": 5}"}
```

then given the parsers I would assume my event will look like:

```json
{
  "log.level": "debug",
  "json": { "id": 5 }
}
```

But what will actually be produced is:

```json
{
  "log.level": "debug",
  "id": 5
}
```

Because the post processor removes `json` from the event, it is not the first one that wins, but the last one... in this case.
Looks good for now. But unfortunately we are not off the hook yet. In order to support docker JSON logs or syslog parsing in the future, we have to try to make the parsers more self-contained, without the need for post processing. Maybe we have to 'fork' the current readers and create new implementations for the filestream input. The current readers we have are bound to the logs input only, anyways.
This PR adds support for parsers in the `filestream` input. Example configuration that aggregates five lines into a single event and parses the JSON contents:

```yaml
- type: filestream
  enabled: true
  paths:
    - test.log
  parsers:
    - multiline:
        type: count
        count_lines: 5
        skip_newline: true
    - ndjson:
        fields_under_root: true
```

(cherry picked from commit 30331bc)