Skip to content

Commit

Permalink
[improve] [client] Merge lookup requests for the same topic (#21232)
Browse files Browse the repository at this point in the history
Motivation: Multiple consumers and producers can be maintained by the same Pulsar Client. In some cases, multiple consumers or producers might attempt to connect to the same topic. To optimize the process, it is recommended to perform the topic lookup only once for each topic.

Modifications:
- Merge lookup requests for the same topic.
- Merge get partitioned metadata request for the same partitioned topic.
  • Loading branch information
poorbarcode authored Sep 22, 2023
1 parent d6c3fa4 commit be4ab66
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -72,6 +73,9 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
Expand All @@ -94,6 +98,7 @@
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -826,6 +831,104 @@ public void testSkipSplitBundleIfOnlyOneBroker() throws Exception {
}
}

@Test
public void testMergeGetPartitionedMetadataRequests() throws Exception {
// Assert the lookup service is a "BinaryProtoLookupService".
final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
final LookupService lookupService = pulsarClientImpl.getLookup();
assertTrue(lookupService instanceof BinaryProtoLookupService);

final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final int topicPartitions = 10;
admin.topics().createPartitionedTopic(tpName, topicPartitions);

// Verify the request is works after merge the requests.
List<CompletableFuture<PartitionedTopicMetadata>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName)));
}
for (CompletableFuture<PartitionedTopicMetadata> future : futures) {
assertEquals(future.join().partitions, topicPartitions);
}

// cleanup.
admin.topics().deletePartitionedTopic(tpName);
}

@Test
public void testMergeLookupRequests() throws Exception {
// Assert the lookup service is a "BinaryProtoLookupService".
final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
final LookupService lookupService = pulsarClientImpl.getLookup();
assertTrue(lookupService instanceof BinaryProtoLookupService);

final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);

// Create 1 producer and 100 consumers.
List<Producer<String>> producers = new ArrayList<>();
List<Consumer<String>> consumers = new ArrayList<>();
for (int i = 0; i < 20; i++) {
producers.add(pulsarClient.newProducer(Schema.STRING).topic(tpName).create());
}
for (int i = 0; i < 20; i++) {
consumers.add(pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName("s" + i).subscribe());
}

// Verify the lookup count will be smaller than before improve.
int lookupCountBeforeUnload = calculateLookupRequestCount();
admin.namespaces().unload(TopicName.get(tpName).getNamespace());
Awaitility.await().untilAsserted(() -> {
for (Producer p : producers) {
assertEquals(WhiteboxImpl.getInternalState(p, "state").toString(), "Ready");
}
for (Consumer c : consumers) {
assertEquals(WhiteboxImpl.getInternalState(c, "state").toString(), "Ready");
}
});
int lookupCountAfterUnload = calculateLookupRequestCount();
log.info("lookup count before unload: {}, after unload: {}", lookupCountBeforeUnload, lookupCountAfterUnload);
assertTrue(lookupCountAfterUnload < lookupCountBeforeUnload * 2,
"the lookup count should be smaller than before improve");

// Verify the producers and consumers is still works.
List<String> messagesSent = new ArrayList<>();
int index = 0;
for (Producer producer: producers) {
String message = Integer.valueOf(index++).toString();
producer.send(message);
messagesSent.add(message);
}
HashSet<String> messagesReceived = new HashSet<>();
for (Consumer<String> consumer : consumers) {
while (true) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
messagesReceived.add(msg.getValue());
}
}
assertEquals(messagesReceived.size(), producers.size());

// cleanup.
for (Producer producer: producers) {
producer.close();
}
for (Consumer consumer : consumers) {
consumer.close();
}
admin.topics().delete(tpName);
}

private int calculateLookupRequestCount() throws Exception {
int failures = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures_total")
.intValue();
int answers = CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers_total")
.intValue();
return failures + answers;
}

@Test(timeOut = 10000)
public void testPartitionedMetadataWithDeprecatedVersion() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
Expand All @@ -56,6 +58,12 @@ public class BinaryProtoLookupService implements LookupService {
private final String listenerName;
private final int maxLookupRedirects;

private final ConcurrentHashMap<TopicName, CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
lookupInProgress = new ConcurrentHashMap<>();

private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
partitionedMetadataInProgress = new ConcurrentHashMap<>();

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
Expand Down Expand Up @@ -92,15 +100,43 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return lookupInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> newFuture =
findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
newFutureCreated.setValue(newFuture);
return newFuture;
});
} finally {
if (newFutureCreated.getValue() != null) {
newFutureCreated.getValue().whenComplete((v, ex) -> {
lookupInProgress.remove(topicName, newFutureCreated.getValue());
});
}
}
}

/**
* calls broker binaryProto-lookup api to get metadata of partitioned-topic.
*
*/
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
return getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
newFutureCreated.setValue(newFuture);
return newFuture;
});
} finally {
if (newFutureCreated.getValue() != null) {
newFutureCreated.getValue().whenComplete((v, ex) -> {
partitionedMetadataInProgress.remove(topicName, newFutureCreated.getValue());
});
}
}
}

private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
Expand Down

0 comments on commit be4ab66

Please sign in to comment.