KAFKA-12554: Refactor Log layer #10280
Conversation
Can we do one PR for renaming?
@ijuma In this PR the intention was not to rename. Thoughts?
OK, thanks for the explanation. Btw, why do we call one of the logs that?
@ijuma The purpose of the name, and a few more things, are explained in the doc attached in the description. I'd suggest having a look at the doc.
@kowshik I can't comment on the doc, that's why I commented here. :) I didn't see any reason there for calling it that.
@ijuma I've opened up the doc for comments. I've also updated it to use the new name.
@kowshik : Thanks for the PR. A few comments below.
@@ -1816,8 +1292,12 @@ class Log(@volatile private var _dir: File,
   */
  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                reason: SegmentDeletionReason): Int = {
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment], logEndOffset: Long): Boolean = {
      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(logEndOffset) &&
Hmm, why do we need to wrap predicate with an additional condition?
This is to accommodate the hwm check that was previously happening in Log#deletableSegments in this line. The deletableSegments method has now moved to LocalLog, but we can't do the hwm check inside LocalLog since the hwm is still owned by Log. So we piggyback on the predicate here to additionally attach the hwm check.
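For illustration, here is a minimal sketch of what this wrapping looks like, reusing the names from the diff above (predicate, highWatermark, localLog); it is a paraphrase of the change rather than the exact patch:

```scala
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                              reason: SegmentDeletionReason): Int = {
  // The HWM check that used to live in Log#deletableSegments is folded into the predicate,
  // because LocalLog (which now owns deletableSegments) has no access to the high watermark.
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment], logEndOffset: Long): Boolean = {
    val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(logEndOffset)
    highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)
  }
  // ... shouldDelete is then passed down to LocalLog in place of the raw predicate ...
  ???
}
```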
@junrao Just a heads up: I'm working on the changes for the following in separate PRs; these are related to refactoring the recovery logic (KAFKA-12553):
It seems better if we merge those into trunk ahead of the current PR.
@junrao This PR is ready for another round of review. I've rebased the PR onto the latest AK trunk, iterated on the implementation a bit more, and added new unit tests for LocalLog.
@kowshik : Thanks for the updated PR. A few more comments below.
  producerStateManager.takeSnapshot()
},
postRollAction = (newSegment: LogSegment, deletedSegment: Option[LogSegment]) => {
  deletedSegment.foreach(segment => deleteProducerSnapshotAsync(Seq(segment)))
This seems to have exposed an existing bug. During roll, deletedSegment will be non-empty if there is an existing segment of 0 size with the newOffsetToRoll. However, since we take a producer snapshot on newOffsetToRoll before calling postRollAction, we will be deleting the same snapshot we just created.
In this case, I think we don't need to delete producerSnapshot for deletedSegment.
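A hypothetical sketch of the fix being discussed, reusing the names from the diff above (postRollAction, deletedSegment, deleteProducerSnapshotAsync); the base-offset comparison is an assumption about how the guard could look, not the committed change:

```scala
postRollAction = (newSegment: LogSegment, deletedSegment: Option[LogSegment]) => {
  deletedSegment.foreach { segment =>
    // A producer snapshot at newOffsetToRoll was taken just before postRollAction runs, so if
    // the replaced zero-size segment sits at the same base offset, deleting its snapshot would
    // delete the snapshot we just created. Skip it in that case.
    if (segment.baseOffset != newSegment.baseOffset)
      deleteProducerSnapshotAsync(Seq(segment))
  }
}
```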
This is a great catch. I agree with you. While I can address it in this PR, should we create a separate JIRA for it?
We could fix this in a separate jira too.
I've created a JIRA to track this: https://issues.apache.org/jira/browse/KAFKA-12876.
Done.
rebuildProducerState(endOffset, producerStateManager)
lock synchronized {
  rebuildProducerState(endOffset, producerStateManager)
}
This change has a couple of issues.
(1) updateHighWatermark() now only updates the offset, but not the corresponding offset metadata. The offset metadata is needed for serving fetch requests. Recomputing it requires an index lookup and log scan, which can be expensive. So, we need to preserve the offset metadata during truncate() and truncateFully().
(2) I think updateHighWatermark() needs to be called within the lock. updateHighWatermark() reads the local log's logEndOffset, so we don't want the logEndOffset to change while updateHighWatermark() is called.
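A rough sketch of what addressing both points could look like (the names rebuildProducerState, updateHighWatermark, lock and localLog.logEndOffsetMetadata appear in the diff and discussion; the exact placement inside truncate()/truncateFullyAndStartAt() is assumed):

```scala
lock synchronized {
  // (2) Hold the lock so the log end offset cannot move while the high watermark is updated.
  rebuildProducerState(endOffset, producerStateManager)
  // (1) Pass the full LogOffsetMetadata (message offset + segment base offset + position in
  // segment), so serving fetch requests does not later require an index lookup and log scan
  // to recompute the high watermark's position.
  updateHighWatermark(localLog.logEndOffsetMetadata)
}
```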
Sounds good. I'll fix this.
Done.
  params.logIdentifier)
deleteProducerSnapshotsAsync(result.deletedSegments, params)
This is unnecessary since during splitting, the old segment is replaced with a new segment with the same base offset. So, result.deletedSegments is always empty.
Sounds good. Great catch. It appears straightforward to just skip deleting the snapshot here; I can leave a comment explaining why.
@junrao I thought about this again. Correct me if I'm wrong, but it appears we may be altering existing behavior if we go down this route. Should we do it in a separate PR to isolate the change?
Yes, that's fine.
I have created a jira to track this improvement. https://issues.apache.org/jira/browse/KAFKA-12923
@junrao Thanks for the review! I've addressed your comments in e201295e03e0ea8a7102983888d1a7afc66d384a, and have also rebased this PR onto the most recent commit in trunk.
@kowshik : Thanks for the updated PR. A few more comments. Also, could you run all system tests for the PR?
  } else {
    segment
  }
}

/**
 * Roll the log over to a new active segment starting with the current logEndOffset.
 * Roll the local log over to a new active segment starting with the current logEndOffset.
This comment is not very accurate since we roll to expectedNextOffset or logEndOffset.
Sure, I'll fix it. Good catch.
Done in 8f14879.
}
Utils.delete(dir)
// File handlers will be closed if this log is deleted
isMemoryMappedBufferClosed = true
It seems that we should set isMemoryMappedBufferClosed in deleteAllSegments()?
That's a good point. I'll move it there.
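For instance, the move could look roughly like this (a sketch assuming deleteAllSegments() clears the segments map and returns the removed segments; not the actual patch):

```scala
private[log] def deleteAllSegments(): Iterable[LogSegment] = {
  val deleted = segments.values.toSeq   // assumed LogSegments accessor
  segments.clear()
  // File handlers will be closed once this log is deleted, so mark the memory-mapped
  // buffers as closed here rather than after Utils.delete(dir).
  isMemoryMappedBufferClosed = true
  deleted
}
```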
Done in 8f14879.
@@ -1812,37 +1577,36 @@ class Log(@volatile private var _dir: File,
    endOffset: Long
  ): Unit = {
    logStartOffset = startOffset
    nextOffsetMetadata = LogOffsetMetadata(endOffset, activeSegment.baseOffset, activeSegment.size)
    recoveryPoint = math.min(recoveryPoint, endOffset)
    localLog.updateLogEndOffset(endOffset)
We need to preserve the LogOffsetMetadata for endOffset and use it to call updateHighWatermark.
Sounds good. This can be updated to updateHighWatermark(localLog.logEndOffsetMetadata).
Done in 8f14879.
  initFileSize = config.initFileSize,
  preallocate = config.preallocate))
val deletedSegments = localLog.truncateFullyAndStartAt(newOffset)
deleteProducerSnapshots(deletedSegments, asyncDelete = true)
producerStateManager.truncateFullyAndStartAt() removes all producer snapshots. So, this is unnecessary.
Sounds good. I'll fix this.
Done in 8f14879.
@junrao @dhruvilshah3 I ran a perf test against a broker build with and without this PR. The test involved the following:
The tests have similar results, meaning that the performance with and without this PR looks similar. Here are the results:
log.segment.bytes=10MB
log.segment.bytes=100MB
log.segment.bytes=1GB
@kowshik : Thanks for the updated PR. LGTM. Are there any more tests that you plan to do?
@junrao Thanks for the review. I ran load tests on the changes from this PR. There weren't any new regressions (i.e. latency regressions or errors) that I noticed, except for an issue I found that looks unrelated to this PR; it's described in this jira: https://issues.apache.org/jira/browse/KAFKA-13070. The load test was run on a 6-broker cluster with 250GB SSD disks:
Mid-way through the test, I rolled the cluster under load to check how the cluster behaved. Overall things looked OK. There weren't any additional tests that I was planning to do.
Thanks @junrao for merging into trunk. Can we also push this to the 3.0 branch as we discussed earlier?
What is the reason for including a refactoring in 3.0 after the feature freeze?
In this PR, I've renamed kafka.log.Log to kafka.log.UnifiedLog. With the advent of KIP-405, going forward the existing Log class would present a unified view of local and tiered log segments, so we rename it to UnifiedLog. The motivation for this PR is also the same as outlined in this design document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit. This PR is a follow-up to #10280 where we had refactored the Log layer introducing a new kafka.log.LocalLog class. Note: the Log class name had to be hardcoded to ensure metrics are defined under the Log class (for backwards compatibility). Please refer to the newly introduced UnifiedLog.metricName() method. Reviewers: Cong Ding <cong@ccding.com>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
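To illustrate the backward-compatibility note about metrics, here is a hypothetical sketch of hardcoding the historical "Log" type in the metric name; the real UnifiedLog.metricName() helper differs in its details:

```scala
object UnifiedLogMetricsExample {
  // Keep emitting metrics under type=Log even though the class is now UnifiedLog, so that
  // existing dashboards and alerts keyed on kafka.log:type=Log,... keep working.
  private val MetricType = "Log"

  def metricMBeanName(name: String, topic: String, partition: Int): String =
    s"kafka.log:type=$MetricType,name=$name,topic=$topic,partition=$partition"
}
```

For example, metricMBeanName("Size", "my-topic", 0) yields kafka.log:type=Log,name=Size,topic=my-topic,partition=0.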
TL;DR:
This PR implements the details of the Log layer refactor, as outlined in this document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit. A few details may be different from the doc, but it is more or less the same.
STRATEGY:
In this PR, I've extracted a new class called LocalLog out of Log. Currently LocalLog is purely an implementation detail that's not exposed outside the Log class (except for tests). The object encapsulation is that each Log instance wraps around a LocalLog instance. This new LocalLog class attempts to encompass most of the responsibilities of the local log surrounding the segments map, which were otherwise present in Log previously. Note that not all local log responsibilities have been moved over to this new class (yet). The criteria I used was to preserve (for now), in the existing Log class, any logic that is mingled in a complex manner with the logStartOffset or the LeaderEpochCache or the ProducerStateManager.
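For intuition only, a heavily simplified sketch of this encapsulation (the names Log, LocalLog and logStartOffset come from the PR; the constructor shapes and fields shown here are assumptions, not the actual signatures):

```scala
import java.io.File

// Purely-local state and operations live in LocalLog...
class LocalLog(@volatile var dir: File,
               @volatile private var nextOffset: Long) {
  def logEndOffset: Long = nextOffset
  def updateLogEndOffset(endOffset: Long): Unit = nextOffset = endOffset
}

// ...while Log wraps a LocalLog instance and keeps logStartOffset (plus, in the real code,
// the leader epoch cache and the producer state manager).
class Log(private val localLog: LocalLog,
          @volatile var logStartOffset: Long) {
  def logEndOffset: Long = localLog.logEndOffset
}
```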
WINS:
The main win is that the new LocalLog class is now agnostic of the logStartOffset, which continues to be managed mainly by the Log class. Below is the local log functionality that has successfully moved over from Log to LocalLog:
- The LogSegments instance containing the local LogSegment objects.

Below is the main local log functionality that continues to remain in Log:
- Logic that is mingled in a complex manner with the logStartOffset or LeaderEpochCache or ProducerStateManager. This makes it hard to separate just the local logic out of it.
- Logic to fetchOffsetByTimestamp and logic to legacyFetchOffsetsBefore.
- Interactions with the LeaderEpochCache and ProducerStateManager.
PAINPOINTS:
- Certain internals had to be exposed from the Log class to make the migration feasible.
- Some of the newly added methods in LocalLog are crude in nature or signature, for example: def checkIfMemoryMappedBufferClosed, def markFlushed, def updateRecoveryPoint, def replaceSegments, etc.
- Certain code paths (ex: the def append logic) were hard to migrate because the logic was mingled with the leader epoch cache, producer state manager and log start offset.

TESTS:
- A new test suite LocalLogTest.scala has been provided containing tests specific to the LocalLog class.
- All other existing tests are expected to pass.
- System test run: trunk/6de37e536ac76ef13530d49dc7320110332cd1ee.
- kafkatest.tests.client.consumer_test rerun: trunk/6de37e536ac76ef13530d49dc7320110332cd1ee.