Skip to content

Commit

Permalink
KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … (
Browse files Browse the repository at this point in the history
apache#14457)

When a fetch response has no record for a partition, validBytes is 0. We shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala since there is no record and it is Optional.empty. We should use currentFetchState.lastFetchedEpoch instead.

Reviewers: Divij Vaidya <diviv@amazon.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
  • Loading branch information
chernyih authored and AnatolyPopov committed Feb 16, 2024
1 parent 0b211f3 commit e6861da
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,11 @@ abstract class AbstractFetcherThread(name: String,

// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) {
val lastFetchedEpoch =
if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching,
logAppendInfo.lastLeaderEpoch.asScala)
currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
if (validBytes > 0) fetcherStats.byteRate.mark(validBytes)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,9 +749,10 @@ class ReplicaFetcherThreadTest {
val log: UnifiedLog = mock(classOf[UnifiedLog])
val partition: Partition = mock(classOf[Partition])
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
val lastFetchedEpoch = 2

when(log.highWatermark).thenReturn(0)
when(log.latestEpoch).thenReturn(Some(0))
when(log.latestEpoch).thenReturn(Some(lastFetchedEpoch))
when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0)))
when(log.logEndOffset).thenReturn(0)
when(log.maybeUpdateHighWatermark(0)).thenReturn(None)
Expand Down Expand Up @@ -835,6 +836,7 @@ class ReplicaFetcherThreadTest {

// Lag is set to Some(0).
assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag))
assertEquals(Some(lastFetchedEpoch), thread.fetchState(t1p0).flatMap(_.lastFetchedEpoch))
}

@Test
Expand Down

0 comments on commit e6861da

Please sign in to comment.