From 8eaf5c6590e4dc93f5600afb25cc765f5edeca6f Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 9 Aug 2024 13:11:13 +0800 Subject: [PATCH] Address comments --- .../pulsar/client/api/InterceptorsTest.java | 101 ++---------------- .../client/impl/MultiTopicsConsumerImpl.java | 3 +- 2 files changed, 13 insertions(+), 91 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index 8e674a6a4d5f2..1449f94fb4ea1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -402,9 +402,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -448,13 +448,19 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId int keyCount = 0; for (int i = 0; i < 2; i++) { - Message received = consumer.receive(); + Message received; + if (i % 2 == 0) { + received = consumer.receive(); + } else { + received = consumer.receiveAsync().join(); + } MessageImpl msg = (MessageImpl) ((TopicMessageImpl) received).getMessage(); for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { if ("beforeConsumer".equals(keyValue.getKey())) { keyCount++; } } + Assert.assertEquals(keyCount, i + 1); consumer.acknowledge(received); } Assert.assertEquals(2, keyCount); @@ -474,9 +480,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -855,91 +861,6 @@ public void onPartitionsChange(String topicName, int partitions) { Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS)); } - @Test - public void testConsumerInterceptorWithMultiTopics() throws PulsarClientException { - - AtomicInteger hitCount = new AtomicInteger(); - - ConsumerInterceptor interceptor = new ConsumerInterceptor() { - @Override - public void close() { - - } - - @Override - public Message beforeConsume(Consumer consumer, Message message) { - hitCount.incrementAndGet(); - log.info("beforeConsume consumer: {}, messageId: {}", consumer.getTopic(), message.getMessageId()); - return message; - } - - @Override - public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { - log.info("onAcknowledge messageId: {}", messageId, cause); - } - - @Override - public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { - log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause); - } - - @Override - public void onNegativeAcksSend(Consumer consumer, Set messageIds) { - - } - - @Override - public void onAckTimeoutSend(Consumer consumer, Set messageIds) { - - } - }; - - List topics = Arrays.asList("persistent://my-property/my-ns/my-topic", - "persistent://my-property/my-ns/my-topic1", "persistent://my-property/my-ns/my-topic2"); - Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topics(topics) - .subscriptionType(SubscriptionType.Shared) - .receiverQueueSize(100) - .intercept(interceptor) - .subscriptionName("my-subscription") - .subscribe(); - - Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") - .create(); - - for (int i = 0; i < 50; i++) { - producer.newMessage().value("Hello Pulsar!").send(); - } - - Producer producer2 = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic1") - .create(); - - for (int i = 0; i < 50; i++) { - producer2.newMessage().value("Hello Pulsar-2!").send(); - } - - - for (int i = 0; i < 100; i++) { - Message msg; - if (i % 2 == 0) { - msg = consumer.receive(); - } else { - msg = consumer.receiveAsync().join(); - } - Assert.assertEquals(hitCount.get(), i + 1); - log.info("Received message: {}, count: {}", msg.getMessageId(), hitCount.get()); - consumer.acknowledge(msg); - } - producer.close(); - producer2.close(); - consumer.close(); - - Assert.assertEquals(100, hitCount.get()); - } - - private void produceAndConsume(int msgCount, Producer producer, Reader reader) throws PulsarClientException { for (int i = 0; i < msgCount; i++) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index bffff89c2244f..fe54607166369 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -392,9 +392,10 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount()); + message = beforeConsume(message); } resumeReceivingFromPausedConsumersIfNeeded(); - return beforeConsume(message); + return message; } catch (Exception e) { ExceptionHandler.handleInterruptedException(e); throw PulsarClientException.unwrap(e);