Properly update offset in case of unparsable line #22685
Conversation
Signed-off-by: chrismark <chrismarkou92@gmail.com>
Pinging @elastic/integrations-platforms (Team:Platforms)
Signed-off-by: chrismark <chrismarkou92@gmail.com>
💚 Build Succeeded
💚 Flaky test report: Tests succeeded.
Good catch. This will need to be backported to 7.10 and 6.8 too.
filebeat/input/log/harvester.go (Outdated)

@@ -333,7 +333,8 @@ func (h *Harvester) Run() error {
 			logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
 		case reader.ErrLineUnparsable:
 			logp.Info("Skipping unparsable line in file: %v", h.state.Source)
-			//line unparsable, go to next line
+			//line unparsable, update offset and go to next line
+			h.state.Offset += int64(message.Bytes)
Should we also update the read offset metric, as is done a few lines below?
beats/filebeat/input/log/harvester.go, lines 360 to 361 in 015d379:

	// Update metics of harvester as event was sent
	h.metrics.readOffset.Set(state.Offset)
@urso it would be good to have your input here
Yes, these two should always be kept in sync.
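Not part of the PR itself, but a minimal runnable sketch of what keeping the two in sync could look like. `h.state.Offset`, `h.metrics.readOffset`, and `message.Bytes` are the real names from the diffs above; the trimmed-down types here are hypothetical stand-ins for the harvester's:

```go
package main

import "fmt"

type state struct{ Offset int64 }

type gauge struct{ v int64 }

func (g *gauge) Set(v int64) { g.v = v }

type harvester struct {
	state   state
	metrics struct{ readOffset gauge }
}

// skipUnparsable advances past a line the reader could not parse,
// updating the registry offset and the read-offset metric together,
// as suggested in the review above.
func (h *harvester) skipUnparsable(messageBytes int) {
	h.state.Offset += int64(messageBytes)    // registry offset
	h.metrics.readOffset.Set(h.state.Offset) // keep the metric in sync
}

func main() {
	h := &harvester{}
	h.state.Offset = 120 // offset before the broken line
	h.skipUnparsable(37) // the broken line consumed 37 bytes
	fmt.Println(h.state.Offset, h.metrics.readOffset.v) // 157 157
}
```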
Great work @ChrsMark! Would it be possible to add some unit tests in the harvester code?
The harvester's tests so far are not very detailed (https://github.com/exekias/beats/blob/7007d97c6aadc4621d58d9d3122dd8e0c5115bb5/filebeat/input/log/harvester_test.go#L127), so it would take some effort to add unit tests from scratch to cover this case. I think we can rely on system tests, wdyt?
Why doesn't Filebeat recover here? Do you have a link to an issue? (Not asking to clean this up now, but wondering if we have some debt to open an issue for.)
Signed-off-by: chrismark <chrismarkou92@gmail.com>
@urso Filebeat will recover and will be able to parse lines after the first failure; however, the offset will remain inconsistent, and hence every time harvesting is restarted the first parse will fail, since it starts at the wrong offset, which is not a valid CRI nor docker-json line boundary. We don't have an issue for this; I can open one if you want. We just found it with @exekias while investigating an SDH (see link above).
The issue happens like this: for some reason we end up at the wrong offset; we still need to investigate why that happens (maybe a truncate on a symlink?). The problem is that once you reach this state, the current harvester code won't heal from it. When the input reads the next line, it cannot parse it because it is at the wrong offset, but it still updates message.Bytes. When we handle this error we don't update the offset with the new Bytes value, so every further update to the offset is wrong, as it missed the update from the broken line. The problem happens again as soon as you restart the input.
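To make the drift concrete, here is a toy walkthrough with made-up line lengths (illustrative only, not taken from the SDH):

```go
package main

import "fmt"

func main() {
	lines := []int64{50, 37, 60} // byte length of each line; the second is unparsable

	// Buggy bookkeeping: the unparsable line's bytes are never added.
	var buggy int64
	for i, n := range lines {
		if i == 1 {
			continue // skip without updating the offset
		}
		buggy += n
	}

	// Fixed bookkeeping: skipped bytes still advance the offset.
	var fixed int64
	for _, n := range lines {
		fixed += n
	}

	// 110 vs 147: every offset after the broken line is short by 37 bytes,
	// so a restarted harvester resumes mid-line and fails to parse again.
	fmt.Println(buggy, fixed)
}
```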
I see. Actually the reader is required to only return valid messages. If we need to update the offset based on skipped contents, we may have to reconsider the return type of the Reader interface so that it also reports the number of bytes consumed in order to produce the message. The harvester cannot tell which offset is correct or not, as the harvester does not care about the contents. The issue does look like a truncate. Here the reader should make an attempt to check whether that is the case (very likely if this comes from a container) and send an ErrTruncate, which would force the harvester to consider the file to be new. If the reader thinks the file is just broken, it should make an attempt to find a safe point in the file to continue reading from, without having to notify the harvester at all.
I think we should get back to this issue; we have seen a few cases related to it. In general I agree this looks like a truncate, but the fact that we stop sending logs after this creates a really bad experience. I would rather get this in with the proposed changes as we keep investigating issues related to the different log rotation mechanisms.
So if I understand correctly, the preferred way to fix this is to change:

beats/libbeat/reader/readjson/docker_json.go, line 205 in 315a17e
That could be an option to improve things. I liked what @urso said about sending an ErrTruncate. The only problem I see with trying to detect truncate situations is that permanent errors in the format (or bugs on our side) would end up in an infinite input restart loop, which could be a bad idea. I guess we can countermeasure this with backoffs.
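A minimal sketch of the backoff countermeasure mentioned above, assuming a hypothetical restart loop (the real inputs have their own backoff machinery; this only illustrates the delay schedule):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	backoff := time.Second
	const max = 30 * time.Second

	for attempt := 1; attempt <= 5; attempt++ {
		// A real implementation would restart the input here and only
		// back off while the restarts keep failing; we print the schedule.
		fmt.Printf("attempt %d: waiting %s before restart\n", attempt, backoff)
		backoff *= 2 // exponential growth, capped below
		if backoff > max {
			backoff = max
		}
	}
}
```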
Truncation is most likely the cause, but not necessarily. If the input is configured with

In general I would prefer to fix the docker json parser by 'ignoring' the error. The parser reports how many bytes it has consumed in order to generate the current 'message'. The parser can ignore the failure by resetting its internal buffer while keeping the current byte count.

The new filestream input can detect file truncation asynchronously and restart the harvester in time, even if the harvester is blocked in the output and can't detect the truncation for that reason. This reduces the chance of missing logs when the file is truncated. Having the parser fixed will allow us to reuse the parser in the filestream input in the future and be able to handle possible errors due to truncation, or having the wrong offset for other unknown causes.

For reference:
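Not the actual patch, but a self-contained sketch of the parser behavior described above: on a parse failure the parser drops the buffered content while keeping the consumed byte count, so the next valid message's Bytes covers everything read since the last good message. The Message/Next shapes mimic libbeat's reader interface; the jsonParser type is hypothetical:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

type Message struct {
	Content []byte
	Bytes   int // bytes consumed to produce this message, including skipped lines
}

type jsonParser struct {
	scanner *bufio.Scanner
	skipped int // bytes consumed by unparsable lines since the last good message
}

func (p *jsonParser) Next() (Message, error) {
	for p.scanner.Scan() {
		line := p.scanner.Bytes()
		n := len(line) + 1 // account for the trailing newline
		var decoded map[string]interface{}
		if err := json.Unmarshal(line, &decoded); err != nil {
			p.skipped += n // ignore the failure, but keep the byte count
			continue
		}
		msg := Message{
			Content: append([]byte(nil), line...),
			Bytes:   p.skipped + n, // skipped bytes + the valid line
		}
		p.skipped = 0
		return msg, nil
	}
	// At EOF, still report any trailing skipped bytes.
	return Message{Bytes: p.skipped}, io.EOF
}

func main() {
	input := "not json after a truncate\n{\"log\":\"hello\\n\"}\n"
	p := &jsonParser{scanner: bufio.NewScanner(strings.NewReader(input))}
	msg, _ := p.Next()
	// Content comes from the valid line only; Bytes covers both lines.
	fmt.Printf("content=%s bytes=%d\n", msg.Content, msg.Bytes)
}
```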
Signed-off-by: chrismark <chrismarkou92@gmail.com>
		Content: []byte{},
		Bytes:   0,
	}, io.EOF
}
Can you add a test where the first line is broken (from a truncate) but the next one is valid? We should get the content from the valid one only, but Bytes should account for both.
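A sketch of the test being asked for, written against the hypothetical jsonParser from the earlier sketch rather than the real readjson reader:

```go
package main

import (
	"bufio"
	"strings"
	"testing"
)

func TestSkipsBrokenFirstLineButCountsItsBytes(t *testing.T) {
	broken := "garbage left behind by a truncate"
	valid := `{"log":"hello\n"}`
	p := &jsonParser{scanner: bufio.NewScanner(strings.NewReader(broken + "\n" + valid + "\n"))}

	msg, err := p.Next()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if string(msg.Content) != valid {
		t.Errorf("content should come from the valid line only, got %q", msg.Content)
	}
	// Bytes must cover the broken line, the valid line, and both newlines.
	if want := len(broken) + 1 + len(valid) + 1; msg.Bytes != want {
		t.Errorf("Bytes should account for both lines: got %d, want %d", msg.Bytes, want)
	}
}
```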
This turned out really simple in the end! I left a comment around testing.
Added.
(cherry picked from commit 655984e)
		case reader.ErrLineUnparsable:
			logp.Info("Skipping unparsable line in file: %v", h.state.Source)
			//line unparsable, go to next line
			continue
I love when issues get solved by removing code 🙂
* upstream/master: (91 commits)
  [Filebeat] Change okta.target to nested field (elastic#24636)
  Add RFC5424 format support for syslog input (elastic#23954)
  Fix links to Beats product pages (elastic#24821)
  [DOCS] Fix 'make setup' instructions for a new beat (elastic#24944)
  Remove duplicate decode_xml entry (elastic#24941)
  [libbeat] Add wineventlog schema to decode_xml processor (elastic#24726)
  [Elastic Agent] Add check for URL set when cert and cert key. (elastic#24904)
  feat: stage execution cache (elastic#24780)
  Fix error in Journalbeat commands (elastic#24880)
  Add baseline ECS 1.9.0 upgrade (elastic#24909)
  [Elastic Agent] Cloud container legacy apm files. (elastic#24896)
  [Elastic Agent]: Reduce allowed socket path length (elastic#24914)
  Add ability to destroy indices with wildcards in testing (elastic#24915)
  Add status subcommand to report status of running daemon. (elastic#24856)
  Fix types of fields GetHits and Ops in Metricbeat module for Couchbase (elastic#23287)
  Add support for Filestream input in elastic agent. (elastic#24820)
  Implement k8s secrets provider for Agent (elastic#24789)
  Sort processor list in docs (elastic#24874)
  Add support for SCRAM authentication in kafka metricbeat module (elastic#24810)
  Properly update offset in case of unparasable line (elastic#22685)
  ...
…unparasable line (elastic#24886)
What does this PR do?

This PR adds a fix for cases where the docker reader meets an unparsable line and skips it (introduced in #12268). In such cases we should properly update the offset by adding the skipped bytes so that it points to the right byte.

Why is it important?

Having a wrong offset in the registry will make the harvester start from the wrong offset after Filebeat restarts or when files are reopened, which will lead to another ErrLineUnparsable. The offset will never be healed from then on.

Testing notes
make python-env
source ./build/python-env/bin/activate
make filebeat.test
pytest tests/system/test_container.py