Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Fix compatibility issues for PIP-344 #23136

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck
.SupportsGetPartitionedMetadataWithoutAutoCreation;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -1486,16 +1488,30 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
brokerUrl = lookupData.getBrokerUrl();
}
return pulsarClient.getLookup(brokerUrl)
.getPartitionedTopicMetadata(topicName, false)
.getPartitionedTopicMetadata(topicName, false, false)
.thenApply(metadata -> true)
.exceptionallyCompose(ex -> {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof PulsarClientException.NotFoundException
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
return CompletableFuture.completedFuture(false);
} else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){
if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) {
// Since the feature PIP-344 isn't supported, restore the behavior to previous
// before before https://github.com/apache/pulsar/pull/22838 changes.
log.info("{} Checking the existence of a non-persistent non-partitioned topic "
+ "was performed using the behavior prior to PIP-344 changes, "
+ "because the broker does not support the PIP-344 feature "
+ "'supports_get_partitioned_metadata_without_auto_creation'.",
topic);
return CompletableFuture.completedFuture(false);
} else {
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
} else {
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,32 @@
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
Expand Down Expand Up @@ -219,4 +231,70 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
paramMetadataAutoCreationEnabled, isUsingHttpLookup);
}

@DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
public Object[][] autoCreationParamsAllForNonPersistentTopic(){
return new Object[][]{
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
{true, true, true},
{true, true, false},
{true, false, true},
{true, false, false},
{false, true, true},
{false, true, false},
{false, false, true},
{false, false, false}
};
}

@Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic")
public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation,
boolean paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup) throws Exception {
modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3);

// Initialize the connections of internal Pulsar Client.
PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));

// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}
// Verify: the method "getPartitionsForTopic(topic, false, true)" will be roll-backed
// to "getPartitionsForTopic(topic, true)".
int lookupPermitsBefore = getLookupRequestPermits();

// Verify: we will not get an un-support error.
PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
for (PulsarClientImpl client : clientArray) {
final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException
|| unwrapEx instanceof PulsarClientException.NotFoundException);
assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from"
+ " the broker"));
}
}

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -225,6 +229,50 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t
}
}

@Test(dataProvider = "topicDomains")
public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception {
modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
// Initialize connections.
String pulsarUrl = pulsar1.getBrokerServiceUrl();
PulsarClientImpl[] clients = getClientsToTest(false);
for (PulsarClientImpl client : clients) {
client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
}
// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : clients) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}

// Verify: the method "getPartitionsForTopic(topic, false, true)" will be roll-backed to
// "getPartitionsForTopic(topic)".
int lookupPermitsBefore = getLookupRequestPermits();
for (PulsarClientImpl client : clients) {
// Verify: the behavior of topic creation.
final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
client.getPartitionedTopicMetadata(tp, false, true).join();
Optional<PartitionedTopicMetadata> metadata1 = pulsar1.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join();
assertTrue(metadata1.isPresent());
assertEquals(metadata1.get().partitions, 3);

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});

// Cleanup.
admin1.topics().deletePartitionedTopic(tp, false);
}
}

@DataProvider(name = "autoCreationParamsAll")
public Object[][] autoCreationParamsAll(){
return new Object[][]{
Expand Down Expand Up @@ -265,7 +313,7 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 0);
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
assertFalse(partitionedTopics.contains(topicNameStr));
Expand Down Expand Up @@ -298,7 +346,7 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 3);
verifyNonPartitionedTopicNeverCreated(topicNameStr);

Expand Down Expand Up @@ -332,7 +380,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
// Case-1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 3);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -347,7 +395,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -380,7 +428,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
// Case 1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -392,7 +440,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -443,7 +491,7 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati
final TopicName topicName = TopicName.get(topicNameStr);
// Verify: the result of get partitioned topic metadata.
try {
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
fail("Expect a not found exception");
} catch (Exception e) {
Expand Down Expand Up @@ -496,7 +544,7 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
// Verify: the result of get partitioned topic metadata.
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
// we want to skip the "lookup" phase, because it is blocked by the HTTP API
LookupService mockLookup = mock(LookupService.class);
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer(
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public void testTransactionBufferLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down Expand Up @@ -253,7 +254,8 @@ public void testPendingAckLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ public void testMergeGetPartitionedMetadataRequests() throws Exception {
// Verify the request is works after merge the requests.
List<CompletableFuture<PartitionedTopicMetadata>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false));
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false, false));
}
for (CompletableFuture<PartitionedTopicMetadata> future : futures) {
assertEquals(future.join().partitions, topicPartitions);
Expand Down
Loading
Loading