Skip to content

Commit

Permalink
Address comment from @Hangleton
Browse files Browse the repository at this point in the history
  • Loading branch information
kowshik committed Feb 22, 2023
1 parent 3c40f6a commit 484eef1
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ abstract class AbstractFetcherThread(name: String,
private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,
topicId: Option[Uuid],
currentLeaderEpoch: Int,
truncateAndBuild: => (Int, Long) => Long,
truncateAndBuild: => OffsetAndEpoch => Long,
fetchFromLocalLogStartOffset: Boolean = true): PartitionFetchState = {
val replicaEndOffset = logEndOffset(topicPartition)

Expand Down Expand Up @@ -719,7 +719,7 @@ abstract class AbstractFetcherThread(name: String,
// Only truncate log when current leader's log start offset (local log start offset if >= 3.4 version incaseof
// OffsetMovedToTieredStorage error) is greater than follower's log end offset.
// truncateAndBuild returns offset value from which it needs to start fetching.
truncateAndBuild(offsetAndEpoch.leaderEpoch, leaderStartOffset)
truncateAndBuild(offsetAndEpoch)
} else {
replicaEndOffset
}
Expand All @@ -736,7 +736,8 @@ abstract class AbstractFetcherThread(name: String,
*/
private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
(_, leaderLogStartOffset) => {
offsetAndEpoch => {
val leaderLogStartOffset = offsetAndEpoch.offset
truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
leaderLogStartOffset
},
Expand Down Expand Up @@ -807,7 +808,10 @@ abstract class AbstractFetcherThread(name: String,
leaderLogStartOffset: Long): Boolean = {
try {
val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
(offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))
offsetAndEpoch => {
val leaderLocalLogStartOffset = offsetAndEpoch.offset
buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetAndEpoch.leaderEpoch(), leaderLogStartOffset)
})

partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
debug(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
Expand Down

0 comments on commit 484eef1

Please sign in to comment.