Skip to content

Commit

Permalink
also fix testReaderIsAbleToSeekWithTimeOnMiddleOfTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Aug 18, 2022
1 parent c4ae029 commit 54febc8
Showing 1 changed file with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1350,30 +1350,36 @@ public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws Exceptio
public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
final int numOfMessage = 10;
final int halfMessages = numOfMessage / 2;

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName).create();

long l = System.currentTimeMillis();
long halfTime = 0;
for (int i = 0; i < numOfMessage; i++) {
if (i == 6) {
halfTime = System.currentTimeMillis();
}
producer.send(String.format("msg num %d", i).getBytes());
Thread.sleep(100);
}

Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();
.startMessageId(MessageId.latest).create();

int plusTime = (halfMessages + 1) * 100;
reader.seek(l + plusTime);
reader.seek(halfTime);

Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
int i = 6;
while (true) {
Message<byte[]> message = reader.readNext(1, TimeUnit.SECONDS);
// make sure we are headed to the end of the topic
if (message == null) {
break;
}
String receivedMessage = new String(message.getData());
String expectedMessage = String.format("msg num %d", i);
String expectedMessage = String.format("msg num %d", i++);
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
assertEquals(messageSet.size(), 4);

reader.close();
producer.close();
Expand Down

0 comments on commit 54febc8

Please sign in to comment.