-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12553: Refactor recovery logic to introduce LogLoader #10478
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass
c978b67
to
a236063
Compare
cc @dhruvilshah3 @junrao @satishd -- this PR is ready for a review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kowshik : Thanks for the PR. A few comments below.
@@ -2586,11 +1999,15 @@ object Log { | |||
logDirFailureChannel: LogDirFailureChannel, | |||
lastShutdownClean: Boolean = true, | |||
topicId: Option[Uuid], | |||
keepPartitionMetadataFile: Boolean): Log = { | |||
keepPartitionMetadataFile: Boolean = true): Log = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the default value needed? If so, it seems many tests are not taking advantage of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've removed it. It's not needed, I believe it showed up accidentally. My bad.
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) | ||
new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs, | ||
producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean, topicId, keepPartitionMetadataFile) | ||
val logLoader = new LogLoader(dir, topicPartition, config, scheduler, time, logDirFailureChannel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need LogLoader to be a class? Another option is to have LogLoader as an Object with a single public method load()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great point. I tried this, but few problems I ran into were:
- A test tries to intercept the recovery logic by overriding
def recoverLog
inLogLoader
. Ex:LogLoaderTest.testLogRecoveryIsCalledUponBrokerCrash
. - Some tests like to inject a custom
ProducerStateManager
, ex:LogLoaderTest.testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown
. - Some tests like to inject a custom
LogSegments
, ex:LogLoaderTest. testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat
.
I'm working on solving these issues, and will post a PR update soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've updated the PR converting LogLoader into an object.
@volatile var topicId: Option[Uuid], | ||
val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { | ||
val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the default value needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've removed it. It's not needed, I believe it showed up accidentally. My bad.
sealed trait SegmentDeletionReason { | ||
def logReason(log: Log, toDelete: List[LogSegment]): Unit | ||
abstract class SegmentDeletionReason { | ||
def logReason(logger: Logging, toDelete: List[LogSegment]): Unit = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this method is being used, but no subclass implements it. Are we losing logging because of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch. After an investigation, I realized this change is no longer required and I've reverted it now.
writeNormalSegment(nextOffset) | ||
} | ||
|
||
def allRecords(log: Log): List[Record] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. With def verifyRecordsInLog
eliminated, I was able to eliminate this one too. Related comment: #10478 (comment).
recordsFound.toList | ||
} | ||
|
||
def verifyRecordsInLog(log: Log, expectedRecords: List[Record]): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. It was unused and I've eliminated it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @kowshik for the PR, overall LGTM. Left a minor comment.
@junrao @satishd: Thanks for the review! I've responded to your comments and addressed them in 8f068caa78a9998c14a18f4583eaaa8a90cba71c, except for this comment - I'm still working on this and will update the PR soon. |
8f068ca
to
debcbd5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kowshik : Thanks for the updated PR. A few more comments.
* | ||
* @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that | ||
* overflow index offset; or when we find an unexpected | ||
* number of .log files with overflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this comment is no longer accurate. The only place that can throw LogSegmentOffsetOverflowException is from completeSwapOperations(). The rest of the places are all wrapped with retryOnOffsetOverflow().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've fixed it.
time = params.time, | ||
initFileSize = params.config.initFileSize)) | ||
} | ||
(params.recoveryPointCheckpoint, 0L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the nextOffset is 0, recoveryPoint needs to be 0 too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch. I've fixed it now.
* @param config The configuration settings for the log being loaded | ||
* @param scheduler The thread pool scheduler used for background actions | ||
* @param time The time instance used for checking the clock | ||
* @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle log |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, if we hit any IOException during loading, we just kill the broker. So, it seems that there is no need to pass in logDirFailureChannel during log loading.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the recovery path, the logDirFailureChannel
is used to asynchronously handle IOException
within the Log.deleteSegmentFiles()
calls. This was the existing behavior prior to this refactor as well. Should we change the existing behavior?https://github.com/apache/kafka/blob/193a9dfc44fa2c0df163a8518a804c4579d37e36/core/src/main/scala/kafka/log/Log.scala#L2377
* to reflect the contents of the loaded log. | ||
* | ||
* This method does not need to convert IOException to KafkaStorageException because it is only | ||
* called before all logs are loaded. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The loading code still converts IOException to KafkaStorageException through Log.deleteSegmentFiles().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree this comment was a bit misleading. I've improved it now to say the following, do you feel its better?
In the context of the calling thread, this function does not need to convert
IOException to KafkaStorageException because it is only called before all logs are loaded.
In the recovery path, we always asynchronously delete segments. The IOException
conversion happens only within the background work that's scheduled for segment deletions. https://github.com/apache/kafka/blob/193a9dfc44fa2c0df163a8518a804c4579d37e36/core/src/main/scala/kafka/log/Log.scala#L2377
@@ -222,25 +222,31 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { | |||
* | |||
* @param _dir The directory in which log segments are created. | |||
* @param config The log configuration settings | |||
* @param segments The log segments, these may be non-empty when recovered from disk |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The segments is always non-empty since we initialize it with an empty segment if no segments are present.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Fixed it.
@junrao: Thanks for the review! I have addressed the latest comments and also fixed the broken unit test in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kowshik : Thanks for the updated PR. LGTM. Could you rebase?
bdf0567
to
eaf8519
Compare
I checked the failed tests in CI this morning and none of them are related with this PR:
|
…ns (#10742) When #10478 was merged, we accidentally lost the identifier/prefix string that we used to previously log to stderr from some of the functions in the Log class. In this PR, I have reinstated the identifier/prefix logging in these functions, so that the debuggability is restored. Reviewers: Luke Chen <showuon@gmail.com>, Cong Ding <cong@ccding.com>, Jun Rao <junrao@gmail.com>
In this PR, I have refactored the recovery logic code introducing a new class
kafka.log.LogLoader
responsible for all activities related with recovery of log segments from disk. With this change, the recovery logic has been moved out of theLog
class and into the newLogLoader
class.Advantages:
This refactor has the following advantages over the existing code:
Log
instantiation. Some parts of the recovery logic are fairly independent from the rest of theLog
class. By moving the independent private logic to a separateLogLoader
class, the existingLog
class has become more modular, and the constructor behavior is a lot simpler now. Therefore, this makes the code more maintainable.LeaderEpochFileCache
andProducerStateManager
instances, so as such the logic does not fit very well into the definition of a "local log". By extracting it out of theLog
class, in the future this will make it much easier to clearly define the separation of concerns betweenLocalLog
andUnifiedLog
.Note to reviewers:
Most of this PR is about moving code around (including tests) and sharing static functions wherever possible. In order to start reviewing this PR, it is best to start looking at the sequence of operations when a
Log
instance is created (seeLog.apply()
):LogLoader
loads/recovers the segments from disk and using the metadata it also suitably sets up the related components viz. log start offset, recovery point, next offset, leader epoch cache and producer state manager. The entry point for theLogLoader
is theLogLoader.load()
API. The implementation is pretty much identical to the previously existingLog
recovery logic.LogLoader
are passed to theLog
instance during construction. Some of these components are further mutable by theLog
instance during its lifecycle. The rest of theLog
class pretty much remains intact, except that some private methods have been converted to static functions to be shared with the recovery logic.Tests:
I'm relying on the newly added
LogLoaderTest.scala
for testing. I have moved 35 recovery related tests out ofLogTest.scala
and into the newLogLoaderTest.scala
test suite.