Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for topic deletions with regex consumers #5230

Merged
merged 5 commits into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ public TopicBusyException(String msg) {
}
}

public static class TopicNotFoundException extends BrokerServiceException {
public TopicNotFoundException(String msg) {
super(msg);
}
}

public static class SubscriptionBusyException extends BrokerServiceException {
public SubscriptionBusyException(String msg) {
super(msg);
Expand Down Expand Up @@ -180,6 +186,8 @@ private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean che
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
} else if (t instanceof TopicNotFoundException) {
return PulsarApi.ServerError.TopicNotFound;
} else if (t instanceof IncompatibleSchemaException
|| t instanceof InvalidSchemaDataException) {
// for backward compatible with old clients, invalid schema data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -611,6 +612,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final InitialPosition initialPosition = subscribe.getInitialPosition();
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
final boolean forceTopicCreation = subscribe.getForceTopicCreation();

CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
Expand Down Expand Up @@ -676,8 +678,18 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}
}

service.getOrCreateTopic(topicName.toString())
.thenCompose(topic -> {
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.pulsar().getConfig().isAllowAutoTopicCreation();

service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException("Topic does not exist"));
}

Topic topic = optTopic.get();

if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(isCompatible -> {
Expand Down Expand Up @@ -728,6 +740,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage(), exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
Schema.BYTES, null
Schema.BYTES, null,
true
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -77,6 +79,7 @@ public void testAutoTopicCreationDisable() throws Exception{
final String subscriptionName = "test-topic-sub";
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Lists;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
Expand All @@ -41,6 +42,7 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -760,4 +762,51 @@ public void testAutoUnbubscribePatternConsumer() throws Exception {
producer2.close();
producer3.close();
}

@Test()
public void testTopicDeletion() throws Exception {
String baseTopicName = "persistent://my-property/my-ns/pattern-topic-" + System.currentTimeMillis();
Pattern pattern = Pattern.compile(baseTopicName + ".*");

// Create 2 topics
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-1")
.create();
Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-2")
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(1)
.subscriptionName("sub")
.subscribe();

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;

// 4. verify consumer get methods
assertSame(consumerImpl.getPattern(), pattern);
assertEquals(consumerImpl.getTopics().size(), 2);

producer1.send("msg-1");

producer1.close();

Message<String> message = consumer.receive();
assertEquals(message.getValue(), "msg-1");
consumer.acknowledge(message);

// Force delete the topic while the regex consumer is connected
admin.topics().delete(baseTopicName + "-1", true);

producer2.send("msg-2");

message = consumer.receive();
assertEquals(message.getValue(), "msg-2");
consumer.acknowledge(message);

assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception {
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 2);

// 8. re-subscribe topic3
CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
subFuture.get();

// 9. producer publish messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,22 @@ public IncompatibleSchemaException(String msg) {
}
}

/**
* Topic does not exist and cannot be created.
*/
public static class TopicDoesNotExistException extends PulsarClientException {
/**
* Constructs an {@code TopicDoesNotExistException} with the specified detail message.
*
* @param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public TopicDoesNotExistException(String msg) {
super(msg);
}
}

/**
* Lookup exception thrown by Pulsar client.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
*/
package org.apache.pulsar.client.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
Expand All @@ -29,13 +37,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.commons.io.HexDump;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -54,14 +64,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;

/**
* pulsar-client consume command implementation.
*
Expand Down Expand Up @@ -91,7 +93,10 @@ public class CmdConsume {
@Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at which to consume, "
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;


@Parameter(names = { "--regex" }, description = "Indicate thetopic name is a regex pattern")
private boolean isRegex = false;

private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
Expand Down Expand Up @@ -144,7 +149,7 @@ public int run() throws PulsarClientException, IOException {
throw (new ParameterException("Number of messages should be zero or positive."));

String topic = this.mainOptions.get(0);

if(this.serviceURL.startsWith("ws")) {
return consumeFromWebSocket(topic);
}else {
Expand All @@ -158,8 +163,17 @@ private int consume(String topic) {

try {
PulsarClient client = clientBuilder.build();
Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType).subscribe();
ConsumerBuilder<byte[]> builder = client.newConsumer()
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType);

if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
} else {
builder.topic(topic);
}

Consumer<byte[]> consumer = builder.subscribe();

RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
Expand Down Expand Up @@ -197,13 +211,13 @@ private int consumeFromWebSocket(String topic) {
int returnCode = 0;

TopicName topicName = TopicName.get(topic);

String wsTopic = String.format(
"%s/%s/" + (StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/")
+ "%s/%s/%s?subscriptionType=%s",
topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName(),
subscriptionName, subscriptionType.toString());

String consumerBaseUri = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/consumer/" + wsTopic;
URI consumerUri = URI.create(consumerBaseUri);

Expand Down Expand Up @@ -252,7 +266,7 @@ private int consumeFromWebSocket(String topic) {
LOG.debug("No message to consume after waiting for 5 seconds.");
} else {
try {
System.out.println(Base64.getDecoder().decode(msg));
System.out.println(Base64.getDecoder().decode(msg));
}catch(Exception e) {
System.out.println(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String
return new PulsarClientException.TopicTerminatedException(errorMsg);
case IncompatibleSchema:
return new PulsarClientException.IncompatibleSchemaException(errorMsg);
case TopicNotFound:
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
Expand Down
Loading