Skip to content

Commit

Permalink
[fix][client] Fix DLQ producer name conflicts when there are same nam…
Browse files Browse the repository at this point in the history
…e consumers (#23577)
  • Loading branch information
geniusjoe authored Nov 25, 2024
1 parent e8657e2 commit c6561e1
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.NameUtil;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
Expand Down Expand Up @@ -1084,8 +1084,8 @@ public void testHandleConsumerAfterClientChannelInactive() throws Exception {
final long consumerId = 1;
final MutableInt requestId = new MutableInt(1);
final String sName = successSubName;
final String cName1 = ConsumerName.generateRandomName();
final String cName2 = ConsumerName.generateRandomName();
final String cName1 = NameUtil.generateRandomName();
final String cName2 = NameUtil.generateRandomName();
resetChannel();
setChannelConnected();

Expand Down Expand Up @@ -1126,8 +1126,8 @@ public void test2ndSubFailedIfDisabledConCheck()
final long consumerId = 1;
final MutableInt requestId = new MutableInt(1);
final String sName = successSubName;
final String cName1 = ConsumerName.generateRandomName();
final String cName2 = ConsumerName.generateRandomName();
final String cName1 = NameUtil.generateRandomName();
final String cName2 = NameUtil.generateRandomName();
// Disabled connection check.
pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(-1);
resetChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
Expand Down Expand Up @@ -261,7 +262,12 @@ public void testDeadLetterTopicWithProducerName() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final String subscription = "my-subscription";
final String consumerName = "my-consumer";
String deadLetterProducerName = String.format("%s-%s-%s-DLQ", topic, subscription, consumerName);
Pattern deadLetterProducerNamePattern =
Pattern.compile("^persistent://my-property/my-ns/dead-letter-topic"
+ "-my-subscription"
+ "-my-consumer"
+ "-[a-zA-Z0-9]{5}"
+ "-DLQ$");

final int maxRedeliveryCount = 1;

Expand Down Expand Up @@ -308,8 +314,9 @@ public void testDeadLetterTopicWithProducerName() throws Exception {
int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
assertEquals(message.getProducerName(), deadLetterProducerName);
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
assertTrue(deadLetterProducerNamePattern.matcher(message.getProducerName()).matches());
log.info("dead letter consumer received message : {} {}, dead letter producer name : {}",
message.getMessageId(), new String(message.getData()), message.getProducerName());
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);
Expand All @@ -318,6 +325,81 @@ public void testDeadLetterTopicWithProducerName() throws Exception {
consumer.close();
}


@Test(timeOut = 30000)
public void testMultipleSameNameConsumersToDeadLetterTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/same-name-consumers-dead-letter-topic";
final int maxRedeliveryCount = 1;
final int messageCount = 10;
final int consumerCount = 3;

//1 start 3 parallel consumers
List<Consumer<String>> consumers = new ArrayList<>();
final AtomicInteger totalReceived = new AtomicInteger(0);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
for (int i = 0; i < consumerCount; i++) {
executor.execute(() -> {
try {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("my-subscription-DuplicatedMessage")
.subscriptionType(SubscriptionType.Shared)
.consumerName("my-consumer")
.ackTimeout(1001, TimeUnit.MILLISECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic(topic + "-DLQ").build())
.negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
totalReceived.getAndIncrement();
//never ack
})
.subscribe();
consumers.add(consumer);
} catch (PulsarClientException e) {
fail();
}
});
}

//2 send messages
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
for (int i = 0; i < messageCount; i++) {
producer.send(String.format("Message [%d]", i));
}

//3 start a DLQ consumer
Consumer<String> deadLetterConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic + "-DLQ")
.subscriptionName("my-subscription-DuplicatedMessage-DLQ")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
int totalInDeadLetter = 0;
while (true) {
Message<String> message = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
}

//4 The number of messages that consumers can consume should be equal to messageCount * (maxRedeliveryCount + 1)
assertEquals(totalReceived.get(), messageCount * (maxRedeliveryCount + 1));

//5 The message in DLQ should be equal to messageCount
assertEquals(totalInDeadLetter, messageCount);

//6 clean up
producer.close();
deadLetterConsumer.close();
for (Consumer<String> consumer : consumers) {
consumer.close();
}
}

@DataProvider(name = "produceLargeMessages")
public Object[][] produceLargeMessages() {
return new Object[][] { { false }, { true } };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.NameUtil;
import org.apache.pulsar.client.util.NoOpLock;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
Expand Down Expand Up @@ -132,7 +132,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = conf.getSubscriptionName();
this.conf = conf;
this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
this.consumerName = conf.getConsumerName() == null ? NameUtil.generateRandomName() : conf.getConsumerName();
this.subscribeFuture = subscribeFuture;
this.listener = conf.getMessageListener();
this.consumerEventListener = conf.getConsumerEventListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.NameUtil;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
Expand Down Expand Up @@ -2265,8 +2266,8 @@ private void initDeadLetterProducerIfNeeded() {
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription,
this.consumerName))
.producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription,
this.consumerName, NameUtil.generateRandomName()))
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.NameUtil;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -113,15 +113,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, executorProvider,
this(client, DUMMY_TOPIC_NAME_PREFIX + NameUtil.generateRandomName(), conf, executorProvider,
subscribeFuture, schema, interceptors, createTopicIfDoesNotExist);
}

MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId,
long startMessageRollbackDurationInSec) {
this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, executorProvider,
this(client, DUMMY_TOPIC_NAME_PREFIX + NameUtil.generateRandomName(), conf, executorProvider,
subscribeFuture, schema, interceptors, createTopicIfDoesNotExist, startMessageId,
startMessageRollbackDurationInSec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.UUID;
import org.apache.commons.codec.digest.DigestUtils;

public class ConsumerName {
public class NameUtil {
public static String generateRandomName() {
return DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 5);
}
Expand Down

0 comments on commit c6561e1

Please sign in to comment.