Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Aug 9, 2024
1 parent 2fb5a76 commit 8eaf5c6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -448,13 +448,19 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId

int keyCount = 0;
for (int i = 0; i < 2; i++) {
Message<String> received = consumer.receive();
Message<String> received;
if (i % 2 == 0) {
received = consumer.receive();
} else {
received = consumer.receiveAsync().join();
}
MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
if ("beforeConsumer".equals(keyValue.getKey())) {
keyCount++;
}
}
Assert.assertEquals(keyCount, i + 1);
consumer.acknowledge(received);
}
Assert.assertEquals(2, keyCount);
Expand All @@ -474,9 +480,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -855,91 +861,6 @@ public void onPartitionsChange(String topicName, int partitions) {
Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
}

@Test
public void testConsumerInterceptorWithMultiTopics() throws PulsarClientException {

AtomicInteger hitCount = new AtomicInteger();

ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
public void close() {

}

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
hitCount.incrementAndGet();
log.info("beforeConsume consumer: {}, messageId: {}", consumer.getTopic(), message.getMessageId());
return message;
}

@Override
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledge messageId: {}", messageId, cause);
}

@Override
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

List<String> topics = Arrays.asList("persistent://my-property/my-ns/my-topic",
"persistent://my-property/my-ns/my-topic1", "persistent://my-property/my-ns/my-topic2");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topics(topics)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(100)
.intercept(interceptor)
.subscriptionName("my-subscription")
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.create();

for (int i = 0; i < 50; i++) {
producer.newMessage().value("Hello Pulsar!").send();
}

Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic1")
.create();

for (int i = 0; i < 50; i++) {
producer2.newMessage().value("Hello Pulsar-2!").send();
}


for (int i = 0; i < 100; i++) {
Message<String> msg;
if (i % 2 == 0) {
msg = consumer.receive();
} else {
msg = consumer.receiveAsync().join();
}
Assert.assertEquals(hitCount.get(), i + 1);
log.info("Received message: {}, count: {}", msg.getMessageId(), hitCount.get());
consumer.acknowledge(msg);
}
producer.close();
producer2.close();
consumer.close();

Assert.assertEquals(100, hitCount.get());
}


private void produceAndConsume(int msgCount, Producer<byte[]> producer, Reader<byte[]> reader)
throws PulsarClientException {
for (int i = 0; i < msgCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,10 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
message = beforeConsume(message);
}
resumeReceivingFromPausedConsumersIfNeeded();
return beforeConsume(message);
return message;
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
Expand Down

0 comments on commit 8eaf5c6

Please sign in to comment.