Add tests and support for truncated files in filestream input #24424
Conversation
Pinging @elastic/agent (Team:Agent)
❕ Build Aborted
truncatedTestLines := []byte("truncated first line\n")
env.mustWriteLinesToFile(testlogName, truncatedTestLines)
env.waitUntilEventCount(4)
This test makes me wonder: what happens if the file is bigger than before by the time the prospector finally discovers that it has been truncated?
This has been a common problem in the logs input (where only the harvester detected truncation). I presume that with the prospector restarting the harvester early, the chance of this happening is reduced.
Yes, it gets a bit better. However, if the output is blocked, filestream is still waiting to publish previous events, so the Harvester has to wait for the previous one to publish and stop before it can start reading from the beginning. I was thinking about moving the check for whether the file was truncated out-of-band, just like we check whether the file was removed. Maybe we could even introduce a new option, close.on_state_change.truncated, so users can choose whether they are willing to lose events in case of truncation to make sure the system does not get blocked.
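For illustration, such an out-of-band check could look roughly like the sketch below. The function name, the `readOffset` callback, and the polling interval are all assumptions, not the actual filestream API.

```go
package sketch

import (
	"context"
	"os"
	"time"
)

// checkTruncated polls the file size out-of-band, independently of the
// (possibly blocked) reader, and calls cancel when the file shrinks below the
// last read offset so a new harvester can start from the beginning.
func checkTruncated(ctx context.Context, path string, readOffset func() int64, cancel context.CancelFunc) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fi, err := os.Stat(path)
			if err != nil {
				continue // file removal is handled by a separate out-of-band check
			}
			if fi.Size() < readOffset() {
				cancel() // stop the blocked reader; a fresh harvester can restart at offset 0
				return
			}
		}
	}
}
```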
However, if the output is blocked, filestream is waiting to publish previous events. So the Harvester has to wait for the previous one to publish and stop before it can start reading from the beginning.
Hmm... One of the goals of the filestream input is to decouple control from the state of the output. Even if the output is blocked, the harvester must be reactive and shut down if it is told to do so. As annoying as it is, once a file is truncated there is no way for us to recover the content of the file if lines have not yet been read, or if Filebeat is restarted. On truncate we should ensure that the pending state updates are no longer written to the registry... they must get lost. This ensures that we can unlock the registry state immediately and start a new harvester that can begin overwriting the existing state right away.
The invalidation of pending registry state updates can be implemented in the ack handler, or in the update op itself (e.g. using a shared 'alive' atomic bool). For example, the ack handler for the harvester is set up here. Each harvester has its own beat.Client, including its private ACK handling. An atomic bool can be introduced to tell the acker to discard updateOps. The bool would be set to true if a file is truncated, meaning that all pending state is to be discarded.
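A minimal sketch of that discard flag, assuming a hypothetical `updateOp` type and ACK callback rather than the real beat.Client wiring:

```go
package sketch

import "sync/atomic"

// updateOp is a stand-in for a pending registry offset update.
type updateOp struct {
	offset int64
}

// truncateAwareACKer sits between the ACK callback and the registry.
// Flipping discard makes any in-flight updateOps a no-op.
type truncateAwareACKer struct {
	discard atomic.Bool
	apply   func(updateOp) // persists the offset to the registry
}

// onACK would be wired into the harvester's private ACK handler.
func (a *truncateAwareACKer) onACK(ops []updateOp) {
	if a.discard.Load() {
		return // file was truncated: pending offsets are stale, drop them
	}
	for _, op := range ops {
		a.apply(op)
	}
}

// invalidate is called when truncation is detected for the file.
func (a *truncateAwareACKer) invalidate() {
	a.discard.Store(true)
}
```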
OK, it can be solved quite easily by running defaultHarvesterGroup.Stop before the restart. My reasoning behind not doing so was to be able to flush the events that have already been picked up by Filebeat but not yet accepted by the output.
The ack handler is async and might even outlive the harvester or the harvester group. The ack handler lives for as long as there are pending events in the queue to be ACKed.
The events that have already been enqueued will still be shipped. They are not removed. At worst only the current event to be published will be discarded, because the output was blocking publishing. But as the truncate operation is racy anyway, I think that is an OK compromise.
Events for a single resource are always ACKed in order. That is, when just calling stop and then starting a new harvester, we will still see the offset in the registry increase with the old events before it is reset.
Just calling Stop would indeed be good enough. But as the offset is still updated, it is impossible for the prospector to reset the read offset to 0, as there might be pending events still updating the offset on disk. Unlinking the ack handler/updateOp in case of a truncate event allows us to drop pending updates, set the read offset in the persistent store to 0, and start a new harvester. Setting the read offset to 0 in the persisted store improves resiliency in case Filebeat is restarted. If Filebeat is not restarted, everything will be fine. Auto discovery adding/removing the filestream input for said files will have no impact either.
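A small sketch of that sequence, with hypothetical names for the store and the ACK invalidation hook (none of these are the actual filestream API):

```go
package sketch

// offsetStore is a stand-in for the registry-backed persistent store.
type offsetStore interface {
	ResetOffset(srcID string, offset int64)
}

// ackInvalidator is a stand-in for whatever unlinks the old harvester's
// pending ACK updates (for instance the discard flag from the sketch above).
type ackInvalidator interface {
	Invalidate()
}

// onTruncate illustrates the order of operations described above.
func onTruncate(acks ackInvalidator, store offsetStore, srcID string, startHarvester func()) {
	acks.Invalidate()           // in-flight ACKs must no longer advance the registry offset
	store.ResetOffset(srcID, 0) // persisting offset 0 survives a Filebeat restart
	startHarvester()            // the new harvester reads the truncated file from the beginning
}
```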
ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

for len(env.pipeline.clients) != 1 {
This line might get flagged at times if we run `go test -race`.
It gets flagged. I could try to refactor it to be able to wait for more clients without a deadlock.
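One possible way to make the wait race-free: have the mock pipeline guard its client list with a mutex and expose a polling helper. The field and method names below are illustrative, not the actual test environment.

```go
package sketch

import (
	"sync"
	"time"
)

// mockPipeline is a stand-in for the test pipeline used in the input tests.
type mockPipeline struct {
	mu      sync.Mutex
	clients []interface{} // connected clients, appended from another goroutine
}

// addClient would be called when the input under test connects.
func (p *mockPipeline) addClient(c interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.clients = append(p.clients, c)
}

// waitUntilClientCount polls the length under the lock instead of reading the
// slice directly from the test goroutine, so `go test -race` stays quiet.
func (p *mockPipeline) waitUntilClientCount(n int, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		p.mu.Lock()
		count := len(p.clients)
		p.mu.Unlock()
		if count == n {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}
```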
resource, err := lock(ctx, hg.store, sourceName)
resource, err := lock(ctx, hg.store, srcID)
Can we move the lock call before hg.readers.newContext? I hope this will also help with the error handling and the hot loop if we really have to wait for harvesters to be shut down (the worst case might be a k8s metadata event that basically asks us to restart all harvesters/inputs at once).
Instead of waiting in a hot loop for 5 seconds, we can have a timeout without spending CPU by creating a context like this:
lockCtx := ctx.Cancelation
if mustWait {
    lockCtx = context.WithTimeout(...)
}
resource, err := lock(lockCtx, hg.store, srcID)
The mutex used to protect the resource also has a TryLock and a LockTimeout method. E.g. if you don't want to block on startup while the resource is still taken, just use TryLock. Your 'magic' could also be pushed down to the lock helper.
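A slightly fleshed-out version of that pseudocode; the 5-second duration and the `lockFn` parameter are illustrative assumptions, not the actual harvester group code:

```go
package sketch

import (
	"context"
	"time"
)

// acquire shows the shape of the suggestion: reuse the input's cancellation
// context directly, and only wrap it in a timeout when we actually have to
// wait for a previous harvester to release the resource.
func acquire(cancelation context.Context, mustWait bool, lockFn func(context.Context) (interface{}, error)) (interface{}, error) {
	lockCtx := cancelation
	if mustWait {
		var cancel context.CancelFunc
		lockCtx, cancel = context.WithTimeout(cancelation, 5*time.Second)
		defer cancel()
	}
	return lockFn(lockCtx)
}
```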
I have removed the timeout for the waiting. Regardless, I will move the locking.
I am adding
I have rebased the PR so it only contains up-to-date code.
// If it is stopping itself, it must clean up the bookkeeper.
if ctx.Cancelation.Err() != context.Canceled {
	hg.readers.remove(srcID)
}
shouldn't we always remove the ID on shutdown?
Yes. But I had to separate it into several cases because calling hg.Stop, hg.readers.remove and hg.Start one after another led to a random order of execution. In some cases that made handling truncation undefined, because first the Prospector stopped the Harvester for a given Source. Stop already removes the ID from the bookkeeper. When the new Harvester was starting for the truncated file, the previous Harvester might still have been shutting down, and as hg.readers.remove was deferred, the newly started Harvester was sometimes cancelled by the previous one.
It is still not perfect. One open question is what happens if the context is cancelled for some reason other than by hg.readers.remove. So far I haven't found a case that would cause a problem with this method. Maybe when the Prospector is stopped I have to delete all running sources from the bookkeeper?
I see. It seems to work for now. Still, we should try to clean this up in a follow-up PR. Shutdown and group source ID handling is not fully intuitive and is a potential source of bugs in the future.
Ok. It definitely needs more work and we need a bit more time to come up with a clearer solution.
Also, I hope that when we introduce the copytruncate prospector, most of the complexity due to supporting truncation in the default prospector can be moved and refactored. The complexity comes from the fact that we cannot anticipate how the files are truncated and rotated. With specialized prospectors, we can handle special cases a bit better.
What does this PR do?
Add support for truncated files and migrate the related tests from `test_harvester.py`. It also adds more tests to cover the following cases:
- truncation is detected only by the `Prospector`
- truncation is detected first by the `Harvester`, then by the `Prospector`
- truncation is detected first by the `Prospector`, then by the `Harvester`
- the file gets truncated when the output is not able to accept events

Why is it important?
The support for stopping reading from truncated files was already implemented. However, the `filestream` input could not start reading the file from the beginning again.
A new file system event called `OpTruncate` is added. When the size of a file has shrunk compared to the last time the scanner encountered it, an `OpTruncate` event is emitted. When the prospector gets this event, the `HarvesterGroup` restarts the `Harvester` of the file. Restarting basically means that the new `Harvester` cancels the previous reader and starts a new one.

Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding changes to the default configuration files
- [ ] I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.
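A rough sketch of the size comparison described above, assuming the scanner keeps the previously observed size per path. Only `OpTruncate` is a name taken from the PR; the other types and names are illustrative.

```go
package sketch

import "os"

// Op mirrors the kind of file system event the prospector receives.
type Op int

const (
	OpWrite Op = iota
	OpTruncate
)

// sizeTracker remembers the last observed size per path so the next scan can
// tell growth from truncation.
type sizeTracker struct {
	lastSize map[string]int64
}

// scan compares the current size with the previously seen one and reports
// OpTruncate when the file shrank; the prospector would then restart the
// harvester so it reads from offset 0 again.
func (s *sizeTracker) scan(path string) (Op, error) {
	fi, err := os.Stat(path)
	if err != nil {
		return OpWrite, err
	}
	if s.lastSize == nil {
		s.lastSize = make(map[string]int64)
	}
	prev, seen := s.lastSize[path]
	s.lastSize[path] = fi.Size()
	if seen && fi.Size() < prev {
		return OpTruncate, nil
	}
	return OpWrite, nil
}
```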