Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-13-3/3: auto subscribe based on regex pattern topics changing #1175

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;

import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
Expand All @@ -40,6 +41,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -56,6 +58,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
Expand Down Expand Up @@ -967,6 +970,34 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
}
}

@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
final String property = commandGetTopicsOfNamespace.getProperty();
final String cluster = commandGetTopicsOfNamespace.getCluster();
final String localName = commandGetTopicsOfNamespace.getLocalName();

try {
List<String> topics = getBrokerService().pulsar()
.getNamespaceService()
.getListOfDestinations(property, cluster, localName);

if (log.isDebugEnabled()) {
log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}/{}/{}] by {}, size:{}",
remoteAddress, property, cluster, localName, requestId, topics.size());
}

ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId));
} catch (Exception e) {
log.warn("[{]] Error GetTopicsOfNamespace for namespace [//{}/{}/{}] by {}",
remoteAddress, property, cluster, localName, requestId);
ctx.writeAndFlush(
Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(new ServerMetadataException(e)),
e.getMessage()));
}
}

@Override
protected boolean isHandshakeCompleted() {
return state == State.Connected;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,11 @@ public void testUnackedCountWithRedeliveries() throws Exception {
producer.send(("hello-" + i).getBytes());
}

Set<MessageIdImpl> c1_receivedMessages = new HashSet<>();
Set<MessageId> c1_receivedMessages = new HashSet<>();

// C-1 gets all messages but doesn't ack
for (int i = 0; i < numMsgs; i++) {
c1_receivedMessages.add((MessageIdImpl) consumer1.receive().getMessageId());
c1_receivedMessages.add(consumer1.receive().getMessageId());
}

// C-2 will not get any message initially, since everything went to C-1 already
Expand Down

Large diffs are not rendered by default.

Loading