Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Fix compatibility issues for PIP-344 #23136

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
Expand Down Expand Up @@ -1486,16 +1487,30 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
brokerUrl = lookupData.getBrokerUrl();
}
return pulsarClient.getLookup(brokerUrl)
.getPartitionedTopicMetadata(topicName, false)
.getPartitionedTopicMetadata(topicName, false, false)
.thenApply(metadata -> true)
.exceptionallyCompose(ex -> {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof PulsarClientException.NotFoundException
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
return CompletableFuture.completedFuture(false);
} else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){
if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) {
// Since the feature PIP-344 isn't supported, restore the behavior to previous
// behavior before https://github.com/apache/pulsar/pull/22838 changes.
log.info("{} Checking the existence of a non-persistent non-partitioned topic "
+ "was performed using the behavior prior to PIP-344 changes, "
+ "because the broker does not support the PIP-344 feature "
+ "'supports_get_partitioned_metadata_without_auto_creation'.",
topic);
return CompletableFuture.completedFuture(false);
} else {
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
} else {
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,32 @@
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
Expand Down Expand Up @@ -219,4 +231,70 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
paramMetadataAutoCreationEnabled, isUsingHttpLookup);
}

@DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
public Object[][] autoCreationParamsAllForNonPersistentTopic(){
return new Object[][]{
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
{true, true, true},
{true, true, false},
{true, false, true},
{true, false, false},
{false, true, true},
{false, true, false},
{false, false, true},
{false, false, false}
};
}

@Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic")
public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation,
boolean paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup) throws Exception {
modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3);

// Initialize the connections of internal Pulsar Client.
PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));

// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}
// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback
// to "getPartitionsForTopic(topic, true)" behavior.
int lookupPermitsBefore = getLookupRequestPermits();

// Verify: we will not get an un-support error.
PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
for (PulsarClientImpl client : clientArray) {
final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException
|| unwrapEx instanceof PulsarClientException.NotFoundException);
assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from"
+ " the broker"));
}
}

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -225,6 +229,50 @@
}
}

@Test(dataProvider = "topicDomains")
public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception {
modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
// Initialize connections.
String pulsarUrl = pulsar1.getBrokerServiceUrl();
PulsarClientImpl[] clients = getClientsToTest(false);
for (PulsarClientImpl client : clients) {
client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
}
// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : clients) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}

// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback to
// "getPartitionsForTopic(topic)" behavior.
int lookupPermitsBefore = getLookupRequestPermits();
for (PulsarClientImpl client : clients) {
// Verify: the behavior of topic creation.
final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
client.getPartitionedTopicMetadata(tp, false, true).join();
Optional<PartitionedTopicMetadata> metadata1 = pulsar1.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join();
assertTrue(metadata1.isPresent());
assertEquals(metadata1.get().partitions, 3);

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});

// Cleanup.
admin1.topics().deletePartitionedTopic(tp, false);
}
}

@DataProvider(name = "autoCreationParamsAll")
public Object[][] autoCreationParamsAll(){
return new Object[][]{
Expand Down Expand Up @@ -265,7 +313,7 @@
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();

Check failure on line 316 in pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Brokers - Broker Group 3

GetPartitionMetadataTest.testGetMetadataIfNonPartitionedTopicExists

org.apache.pulsar.client.api.PulsarClientException$FeatureNotSupportedException: The feature of getting partitions without auto-creation is not supported by the broker. Please upgrade the broker to version that supports PIP-344 to resolve this issue.
assertEquals(response.partitions, 0);
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
assertFalse(partitionedTopics.contains(topicNameStr));
Expand Down Expand Up @@ -298,7 +346,7 @@
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 3);
verifyNonPartitionedTopicNeverCreated(topicNameStr);

Expand Down Expand Up @@ -332,7 +380,7 @@
// Case-1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 3);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -347,7 +395,7 @@
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -380,7 +428,7 @@
// Case 1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -392,7 +440,7 @@
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -443,7 +491,7 @@
final TopicName topicName = TopicName.get(topicNameStr);
// Verify: the result of get partitioned topic metadata.
try {
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
fail("Expect a not found exception");
} catch (Exception e) {
Expand Down Expand Up @@ -496,7 +544,7 @@
// Verify: the result of get partitioned topic metadata.
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
// we want to skip the "lookup" phase, because it is blocked by the HTTP API
LookupService mockLookup = mock(LookupService.class);
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer(
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public void testTransactionBufferLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down Expand Up @@ -253,7 +254,8 @@ public void testPendingAckLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ public void testMergeGetPartitionedMetadataRequests() throws Exception {
// Verify the request is works after merge the requests.
List<CompletableFuture<PartitionedTopicMetadata>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false));
futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false, false));
}
for (CompletableFuture<PartitionedTopicMetadata> future : futures) {
assertEquals(future.join().partitions, topicPartitions);
Expand Down
Loading
Loading