Skip to content

Commit

Permalink
improve test
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Aug 9, 2024
1 parent f15cc51 commit 202b943
Showing 1 changed file with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ public Object[][] getTopicPartition() {
return new Object[][] {{ 0 }, { 3 }};
}

@DataProvider(name = "topics")
public Object[][] getTopics() {
return new Object[][] {{ List.of("persistent://my-property/my-ns/my-topic") },
{ List.of("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }};
}

@Test
public void testProducerInterceptor() throws Exception {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
Expand Down Expand Up @@ -617,8 +623,8 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForNegativeAcksSend(List<String> topics) throws PulsarClientException, InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand All @@ -645,6 +651,7 @@ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId message

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}

Expand All @@ -655,15 +662,15 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
};

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionType(SubscriptionType.Failover)
.intercept(interceptor)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscriptionName("my-subscription")
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

for (int i = 0; i < totalNumOfMessages; i++) {
Expand All @@ -687,8 +694,9 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) throws PulsarClientException,
InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand Down Expand Up @@ -719,16 +727,17 @@ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> message

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionName("foo")
.intercept(interceptor)
.ackTimeout(2, TimeUnit.SECONDS)
Expand Down

0 comments on commit 202b943

Please sign in to comment.