Skip to content

Commit

Permalink
[fix] [broker] Fix compatibility issues for PIP-344 (apache#23136)
Browse files Browse the repository at this point in the history
Co-authored-by: Lari Hotari <lhotari@apache.org>
  • Loading branch information
poorbarcode and lhotari authored Aug 10, 2024
1 parent 6f5c656 commit 702c0b3
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
Expand Down Expand Up @@ -1498,8 +1499,22 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
return CompletableFuture.completedFuture(false);
} else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){
if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) {
// Since the feature PIP-344 isn't supported, restore the behavior to previous
// behavior before https://github.com/apache/pulsar/pull/22838 changes.
log.info("{} Checking the existence of a non-persistent non-partitioned topic "
+ "was performed using the behavior prior to PIP-344 changes, "
+ "because the broker does not support the PIP-344 feature "
+ "'supports_get_partitioned_metadata_without_auto_creation'.",
topic);
return CompletableFuture.completedFuture(false);
} else {
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
} else {
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,32 @@
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
Expand Down Expand Up @@ -219,4 +231,80 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
paramMetadataAutoCreationEnabled, isUsingHttpLookup);
}

@DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
public Object[][] autoCreationParamsAllForNonPersistentTopic(){
return new Object[][]{
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
{true, true, true},
{true, true, false},
{true, false, true},
{true, false, false},
{false, true, true},
{false, true, false},
{false, false, true},
{false, false, false}
};
}

@Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic", priority = Integer.MAX_VALUE)
public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation,
boolean paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup) throws Exception {
modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3);

// Initialize the connections of internal Pulsar Client.
PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));

// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}
// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback
// to "getPartitionsForTopic(topic, true)" behavior.
int lookupPermitsBefore = getLookupRequestPermits();

// Verify: we will not get an un-support error.
PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
for (PulsarClientImpl client : clientArray) {
final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException
|| unwrapEx instanceof PulsarClientException.NotFoundException);
assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from"
+ " the broker"));
}
}

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});

// reset clients.
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -225,6 +229,60 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t
}
}

@Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE)
public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception {
modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
// Initialize connections.
String pulsarUrl = pulsar1.getBrokerServiceUrl();
PulsarClientImpl[] clients = getClientsToTest(false);
for (PulsarClientImpl client : clients) {
client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
}
// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : clients) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}

// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback to
// "getPartitionsForTopic(topic)" behavior.
int lookupPermitsBefore = getLookupRequestPermits();
for (PulsarClientImpl client : clients) {
// Verify: the behavior of topic creation.
final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
client.getPartitionedTopicMetadata(tp, false, true).join();
Optional<PartitionedTopicMetadata> metadata1 = pulsar1.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join();
assertTrue(metadata1.isPresent());
assertEquals(metadata1.get().partitions, 3);

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});

// Cleanup.
admin1.topics().deletePartitionedTopic(tp, false);
}

// reset clients.
for (PulsarClientImpl client : clients) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, true);
}
}
}

@DataProvider(name = "autoCreationParamsAll")
public Object[][] autoCreationParamsAll(){
return new Object[][]{
Expand Down Expand Up @@ -265,7 +323,7 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 0);
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
assertFalse(partitionedTopics.contains(topicNameStr));
Expand Down Expand Up @@ -298,7 +356,7 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 3);
verifyNonPartitionedTopicNeverCreated(topicNameStr);

Expand Down Expand Up @@ -332,7 +390,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
// Case-1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 3);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -347,7 +405,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -380,7 +438,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
// Case 1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -392,7 +450,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -443,7 +501,7 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati
final TopicName topicName = TopicName.get(topicNameStr);
// Verify: the result of get partitioned topic metadata.
try {
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
fail("Expect a not found exception");
} catch (Exception e) {
Expand Down Expand Up @@ -496,7 +554,7 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
// Verify: the result of get partitioned topic metadata.
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public void testTransactionBufferLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down Expand Up @@ -253,7 +254,8 @@ public void testPendingAckLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down
Loading

0 comments on commit 702c0b3

Please sign in to comment.