MINOR: Ensure LocalLog.flush() is immune to recoveryPoint change by different thread #11814
Issue:
Imagine a scenario where two threads, T1 and T2, are inside UnifiedLog.flush() concurrently:

KafkaScheduler thread T1 -> The periodic work calls LogManager.flushDirtyLogs(), which in turn calls UnifiedLog.flush(). For example, this can happen due to log.flush.scheduler.interval.ms.
KafkaScheduler thread T2 -> A UnifiedLog.flush() call is triggered asynchronously during a segment roll.

If thread T1 advances the recovery point beyond the flush offset of thread T2, this could trip the check within LogSegments.values() for thread T2 when it is called from LocalLog.flush(). The exception causes the KafkaScheduler thread to die, which is not desirable.

Fix:
We fix this by ensuring that LocalLog.flush() is immune to the case where the recoveryPoint advances beyond the flush offset.

Tests:
I tested this manually by introducing barriers in the code to simulate the race condition. This is a hard case to cover with an automated unit test, so I haven't added a new test case in this PR. I'm mostly relying on code review and on ensuring there are no regressions in existing tests.
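
For reviewers, here is a minimal sketch of the race and of one way to guard against it. It is a simplified model, not the actual Kafka code: the class FlushGuardSketch and its methods values(), unguardedFlush(), guardedFlush(), markFlushed() and recoveryPoint() are hypothetical names chosen for illustration. The sketch assumes the range lookup rejects a start offset greater than the end offset, as the issue describes for LogSegments.values(). The guard reads the shared recovery point once into a local variable and skips the flush when another thread has already moved it past the requested offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Simplified, hypothetical model of the race; these are not the Kafka classes.
final class FlushGuardSketch {
    // Base offset -> segment name, standing in for the log's segments.
    private final ConcurrentNavigableMap<Long, String> segments = new ConcurrentSkipListMap<>();
    // Shared state that any flushing thread may advance.
    private volatile long recoveryPoint = 0L;

    FlushGuardSketch(long... baseOffsets) {
        for (long base : baseOffsets) {
            segments.put(base, "segment-" + base);
        }
    }

    long recoveryPoint() {
        return recoveryPoint;
    }

    // Models the range check: a start offset past the end offset is rejected.
    List<String> values(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("Invalid range: from " + from + " > to " + to);
        }
        if (from == to) {
            return new ArrayList<>();
        }
        Long floor = segments.floorKey(from);
        long start = (floor == null) ? from : floor;
        return new ArrayList<>(segments.subMap(start, true, to, false).values());
    }

    // Advances the recovery point; it never moves backwards.
    synchronized void markFlushed(long offset) {
        if (offset > recoveryPoint) {
            recoveryPoint = offset;
        }
    }

    // Unguarded variant: throws if another thread already advanced recoveryPoint past offset.
    int unguardedFlush(long offset) {
        List<String> toFlush = values(recoveryPoint, offset);
        markFlushed(offset);
        return toFlush.size();
    }

    // Guarded variant: one read of the shared field, then a no-op if it is already past offset.
    int guardedFlush(long offset) {
        long current = recoveryPoint;
        if (current > offset) {
            return 0; // nothing left to flush up to this offset
        }
        List<String> toFlush = values(current, offset);
        markFlushed(offset);
        return toFlush.size();
    }

    public static void main(String[] args) {
        FlushGuardSketch log = new FlushGuardSketch(0L, 100L, 200L);
        log.markFlushed(250L); // plays the role of T1 finishing its flush first
        System.out.println("guarded flush handled " + log.guardedFlush(150L) + " segments");
        try {
            log.unguardedFlush(150L); // plays the role of T2 arriving late
        } catch (IllegalArgumentException e) {
            System.out.println("unguarded flush failed: " + e.getMessage());
        }
    }
}
```

In this model, a late caller of guardedFlush() returns without touching any segment, while unguardedFlush() propagates the exception that would kill the scheduler thread. Reading the field once matters: comparing and then re-reading recoveryPoint would reopen the same window between the check and the range lookup.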