Skip to content

Commit

Permalink
MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes (
Browse files Browse the repository at this point in the history
#13272)

Reviewers: Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
  • Loading branch information
kowshik authored Feb 22, 2023
1 parent 069ce59 commit db6beb9
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
72 changes: 72 additions & 0 deletions core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.{Node, TopicPartition, Uuid}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
Expand Down Expand Up @@ -118,6 +120,76 @@ class LocalLeaderEndPointTest {
assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
}

@Test
def testFetchEpochEndOffsets(): Unit = {
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))

var result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
.setLeaderEpoch(0)))

var expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(0)
.setEndOffset(3L))

assertEquals(expected, result)

// Change leader epoch and end offset, and verify the behavior again.
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error))

result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
.setLeaderEpoch(4)))

expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(4)
.setEndOffset(6L))

assertEquals(expected, result)

// Check missing epoch: 3, we expect the API to return (leader_epoch=0, end_offset=3).
result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
.setLeaderEpoch(3)))

expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(0)
.setEndOffset(3L))

assertEquals(expected, result)

// Check missing epoch: 5, we expect the API to return (leader_epoch=-1, end_offset=-1)
result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
.setLeaderEpoch(5)))

expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(-1)
.setEndOffset(-1L))

assertEquals(expected, result)
}

private class CallbackResult[T] {
private var value: Option[T] = None
private var fun: Option[T => Unit] = None
Expand Down
19 changes: 19 additions & 0 deletions core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.Mockito.mock

import java.util
import scala.jdk.CollectionConverters._

class RemoteLeaderEndPointTest {

Expand Down Expand Up @@ -80,6 +82,23 @@ class RemoteLeaderEndPointTest {
assertEquals((6, localLogStartOffset), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
}

@Test
def testFetchEpochEndOffsets(): Unit = {
val expected = Map(
topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code)
.setLeaderEpoch(0)
.setEndOffset(logEndOffset))
blockingSend.setOffsetsForNextResponse(expected.asJava)
val result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
.setLeaderEpoch(currentLeaderEpoch)))

assertEquals(expected, result)
}

@Test
def testThrowsFencedLeaderEpochException(): Unit = {
blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
Expand Down

0 comments on commit db6beb9

Please sign in to comment.