Skip to content

Commit

Permalink
KAFKA-10251: increase timeout for consuming records (#10228)
Browse files Browse the repository at this point in the history
Bump the `pollUntilAtLeastNumRecords` timeout from 15s to 30s

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
  • Loading branch information
showuon authored Mar 2, 2021
1 parent a4ba732 commit 420162d
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable

class TransactionsBounceTest extends IntegrationTestHarness {
private val consumeRecordTimeout = 30000
private val producerBufferSize = 65536
private val serverMessageMaxBytes = producerBufferSize/2
private val numPartitions = 3
Expand Down Expand Up @@ -106,7 +107,7 @@ class TransactionsBounceTest extends IntegrationTestHarness {
while (numMessagesProcessed < numInputRecords) {
val toRead = Math.min(200, numInputRecords - numMessagesProcessed)
trace(s"$iteration: About to read $toRead messages, processed $numMessagesProcessed so far..")
val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead)
val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead, waitTimeMs = consumeRecordTimeout)
trace(s"Received ${records.size} messages, sending them transactionally to $outputTopic")

producer.beginTransaction()
Expand Down Expand Up @@ -134,7 +135,7 @@ class TransactionsBounceTest extends IntegrationTestHarness {

val verifyingConsumer = createConsumerAndSubscribe("randomGroup", List(outputTopic), readCommitted = true)
val recordsByPartition = new mutable.HashMap[TopicPartition, mutable.ListBuffer[Int]]()
TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).foreach { record =>
TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords, waitTimeMs = consumeRecordTimeout).foreach { record =>
val value = TestUtils.assertCommittedAndGetValue(record).toInt
val topicPartition = new TopicPartition(record.topic(), record.partition())
recordsByPartition.getOrElseUpdate(topicPartition, new mutable.ListBuffer[Int])
Expand Down

0 comments on commit 420162d

Please sign in to comment.