From be4ab66bb8c5f141df6ed93de961a0216e0a4253 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 22 Sep 2023 16:26:04 +0800 Subject: [PATCH] [improve] [client] Merge lookup requests for the same topic (#21232) Motivation: Multiple consumers and producers can be maintained by the same Pulsar Client. In some cases, multiple consumers or producers might attempt to connect to the same topic. To optimize the process, it is recommended to perform the topic lookup only once for each topic. Modifications: - Merge lookup requests for the same topic. - Merge get partitioned metadata request for the same partitioned topic. --- .../client/api/BrokerServiceLookupTest.java | 103 ++++++++++++++++++ .../client/impl/BinaryProtoLookupService.java | 40 ++++++- 2 files changed, 141 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index accdd2a335f39..6becc9cb57806 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import io.prometheus.client.CollectorRegistry; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; @@ -72,6 +73,9 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.impl.BinaryProtoLookupService; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -94,6 +98,7 @@ import org.asynchttpclient.Response; import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -826,6 +831,104 @@ public void testSkipSplitBundleIfOnlyOneBroker() throws Exception { } } + @Test + public void testMergeGetPartitionedMetadataRequests() throws Exception { + // Assert the lookup service is a "BinaryProtoLookupService". + final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; + final LookupService lookupService = pulsarClientImpl.getLookup(); + assertTrue(lookupService instanceof BinaryProtoLookupService); + + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final int topicPartitions = 10; + admin.topics().createPartitionedTopic(tpName, topicPartitions); + + // Verify the request is works after merge the requests. + List> futures = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName))); + } + for (CompletableFuture future : futures) { + assertEquals(future.join().partitions, topicPartitions); + } + + // cleanup. + admin.topics().deletePartitionedTopic(tpName); + } + + @Test + public void testMergeLookupRequests() throws Exception { + // Assert the lookup service is a "BinaryProtoLookupService". + final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; + final LookupService lookupService = pulsarClientImpl.getLookup(); + assertTrue(lookupService instanceof BinaryProtoLookupService); + + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(tpName); + + // Create 1 producer and 100 consumers. + List> producers = new ArrayList<>(); + List> consumers = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + producers.add(pulsarClient.newProducer(Schema.STRING).topic(tpName).create()); + } + for (int i = 0; i < 20; i++) { + consumers.add(pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName("s" + i).subscribe()); + } + + // Verify the lookup count will be smaller than before improve. + int lookupCountBeforeUnload = calculateLookupRequestCount(); + admin.namespaces().unload(TopicName.get(tpName).getNamespace()); + Awaitility.await().untilAsserted(() -> { + for (Producer p : producers) { + assertEquals(WhiteboxImpl.getInternalState(p, "state").toString(), "Ready"); + } + for (Consumer c : consumers) { + assertEquals(WhiteboxImpl.getInternalState(c, "state").toString(), "Ready"); + } + }); + int lookupCountAfterUnload = calculateLookupRequestCount(); + log.info("lookup count before unload: {}, after unload: {}", lookupCountBeforeUnload, lookupCountAfterUnload); + assertTrue(lookupCountAfterUnload < lookupCountBeforeUnload * 2, + "the lookup count should be smaller than before improve"); + + // Verify the producers and consumers is still works. + List messagesSent = new ArrayList<>(); + int index = 0; + for (Producer producer: producers) { + String message = Integer.valueOf(index++).toString(); + producer.send(message); + messagesSent.add(message); + } + HashSet messagesReceived = new HashSet<>(); + for (Consumer consumer : consumers) { + while (true) { + Message msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null) { + break; + } + messagesReceived.add(msg.getValue()); + } + } + assertEquals(messagesReceived.size(), producers.size()); + + // cleanup. + for (Producer producer: producers) { + producer.close(); + } + for (Consumer consumer : consumers) { + consumer.close(); + } + admin.topics().delete(tpName); + } + + private int calculateLookupRequestCount() throws Exception { + int failures = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures_total") + .intValue(); + int answers = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers_total") + .intValue(); + return failures + answers; + } + @Test(timeOut = 10000) public void testPartitionedMetadataWithDeprecatedVersion() throws Exception { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index d5ce9213211dd..8ceb8e44975c8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -26,10 +26,12 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SchemaSerializationException; @@ -56,6 +58,12 @@ public class BinaryProtoLookupService implements LookupService { private final String listenerName; private final int maxLookupRedirects; + private final ConcurrentHashMap>> + lookupInProgress = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap> + partitionedMetadataInProgress = new ConcurrentHashMap<>(); + public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, @@ -92,7 +100,21 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException { * @return broker-socket-address that serves given topic */ public CompletableFuture> getBroker(TopicName topicName) { - return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0); + final MutableObject newFutureCreated = new MutableObject<>(); + try { + return lookupInProgress.computeIfAbsent(topicName, tpName -> { + CompletableFuture> newFuture = + findBroker(serviceNameResolver.resolveHost(), false, topicName, 0); + newFutureCreated.setValue(newFuture); + return newFuture; + }); + } finally { + if (newFutureCreated.getValue() != null) { + newFutureCreated.getValue().whenComplete((v, ex) -> { + lookupInProgress.remove(topicName, newFutureCreated.getValue()); + }); + } + } } /** @@ -100,7 +122,21 @@ public CompletableFuture> getBroker(T * */ public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { - return getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName); + final MutableObject newFutureCreated = new MutableObject<>(); + try { + return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> { + CompletableFuture newFuture = + getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName); + newFutureCreated.setValue(newFuture); + return newFuture; + }); + } finally { + if (newFutureCreated.getValue() != null) { + newFutureCreated.getValue().whenComplete((v, ex) -> { + partitionedMetadataInProgress.remove(topicName, newFutureCreated.getValue()); + }); + } + } } private CompletableFuture> findBroker(InetSocketAddress socketAddress,