Skip to content

Commit

Permalink
Fix UT
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Jul 19, 2022
1 parent fe9d287 commit 559d42d
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,8 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
Assert.assertNotEquals(ninthPercentileValue, 0);
}

// Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 42;
// Empirically, there appears to be a 31-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 31;

private static final int PUBLISH_INTERVAL_SECS = 10;
private static final int NUM_PRODUCERS = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,8 @@ public void testBatchMessage() throws Exception {
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
Assert.assertNull(message.getProperty(BATCH_HEADER));
Assert.assertNull(message.getProperty(BATCH_SIZE_HEADER));

// 2. test for getMessagesById
message = (MessageImpl) admin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());
Expand All @@ -249,9 +248,8 @@ public void testBatchMessage() throws Exception {
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
Assert.assertNull(message.getProperty(BATCH_HEADER));
Assert.assertNull(message.getProperty(BATCH_SIZE_HEADER));

// 3. test for examineMessage
message = (MessageImpl) admin.topics().examineMessage(topic, "earliest", 1);
Expand All @@ -261,9 +259,8 @@ public void testBatchMessage() throws Exception {
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
Assert.assertNull(message.getProperty(BATCH_HEADER));
Assert.assertNull(message.getProperty(BATCH_SIZE_HEADER));
}

@Test(timeOut = 20000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -361,8 +361,7 @@ public void testKeyBasedBatchingOrder() throws Exception {
for (int i = 0; i < 5; i++) {
// Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
assertTrue(messageId instanceof BatchMessageIdImpl);
final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
assertEquals(messageIdImpl.getLedgerId(), -1L);
assertEquals(messageIdImpl.getEntryId(), -1L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,6 @@ public void testHasMessageAvailableWithBatch() throws Exception {
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
assertTrue(messageId instanceof BatchMessageIdImpl);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
reader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ public void testDisableBatching() throws Exception {
Assert.assertNotNull(msg);
if (i < numberOfMessages) {
Assert.assertEquals(new String(msg.getData()), "batched");
Assert.assertTrue(msg.getMessageId() instanceof BatchMessageIdImpl);
} else {
Assert.assertEquals(new String(msg.getData()), "non-batched");
Assert.assertFalse(msg.getMessageId() instanceof BatchMessageIdImpl);
Expand Down

0 comments on commit 559d42d

Please sign in to comment.