Skip to content

Commit

Permalink
[fix][test] Fix flaky test TopicReaderTest.testMultiReaderIsAbleToSee…
Browse files Browse the repository at this point in the history
…kWithTimeOnMiddleOfTopic
  • Loading branch information
nicoloboschi committed Aug 18, 2022
1 parent 7913fe5 commit 7007d2b
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1383,26 +1383,30 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
final String topicName = "persistent://my-property/my-ns/testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic" + System.currentTimeMillis();
final int numOfMessage = 10;
final int halfMessages = numOfMessage / 2;
admin.topics().createPartitionedTopic(topicName, 3);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
long halfTime = 0;
for (int i = 0; i < numOfMessage; i++) {
if (i == numOfMessage / 2) {
if (i == 6) {
halfTime = System.currentTimeMillis();
}
producer.send(String.format("msg num %d", i).getBytes());
}
Assert.assertTrue(halfTime != 0);
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.latest).create();

reader.seek(halfTime);
Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
while (true) {
Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
// make sure we are headed to the end of the topic
if (message == null) {
break;
}
String receivedMessage = new String(message.getData());
Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage);
}
assertEquals(messageSet.size(), 4);
reader.close();
producer.close();
}
Expand Down

0 comments on commit 7007d2b

Please sign in to comment.