Skip to content

Commit

Permalink
[improve] Use Google re2/j library for user provided regexes (#22829)
Browse files Browse the repository at this point in the history
Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
  • Loading branch information
merlimat and lhotari authored Jun 4, 2024
1 parent be5eb91 commit 30069db
Show file tree
Hide file tree
Showing 18 changed files with 51 additions and 31 deletions.
1 change: 1 addition & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ The Apache Software License, Version 2.0
* Apache Avro
- avro-1.11.3.jar
- avro-protobuf-1.11.3.jar
* RE2j -- re2j-1.7.jar

BSD 3-clause "New" or "Revised" License
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ flexible messaging model and an intuitive client API.</description>
<opentelemetry.instrumentation.alpha.version>${opentelemetry.instrumentation.version}-alpha</opentelemetry.instrumentation.alpha.version>
<opentelemetry.semconv.version>1.25.0-alpha</opentelemetry.semconv.version>
<picocli.version>4.7.5</picocli.version>
<re2j.version>1.7</re2j.version>
<failsafe.version>3.3.2</failsafe.version>

<!-- test dependencies -->
Expand Down Expand Up @@ -656,6 +657,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
<version>${re2j.version}</version>
</dependency>

<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.re2j.Pattern;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -59,7 +60,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import com.google.re2j.Pattern;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.TopicResources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import com.google.re2j.Pattern;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand All @@ -43,7 +44,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.regex.Pattern;

public class TopicListServiceTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import com.google.re2j.Pattern;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.metadata.api.NotificationType;
import static org.mockito.Mockito.mock;
Expand All @@ -29,7 +30,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

public class TopicListWatcherTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));

// 4. verify consumer
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

Expand Down Expand Up @@ -310,7 +310,7 @@ public void testBinaryProtoSubscribeAllTopicOfNamespace() throws Exception {
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

Expand Down Expand Up @@ -393,7 +393,7 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

Expand Down Expand Up @@ -490,7 +490,7 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

Expand Down Expand Up @@ -566,7 +566,7 @@ public void testStartEmptyPatternConsumer() throws Exception {
});

// 3. verify consumer get methods, to get 5 number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 5);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 5);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
Expand Down Expand Up @@ -595,7 +595,7 @@ public void testStartEmptyPatternConsumer() throws Exception {

// 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3.
Awaitility.await().untilAsserted(() -> {
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
Expand Down Expand Up @@ -667,7 +667,7 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchi

// 2. verify consumer get methods. There is no need to trigger discovery, because the broker will push the
// changes to update(CommandWatchTopicUpdate).
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
Awaitility.await().untilAsserted(() -> {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4);
Expand Down Expand Up @@ -728,7 +728,7 @@ public void testPreciseRegexpSubscribe(boolean partitioned, boolean createTopicA
}

// 2. verify consumer can subscribe the topic.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
Awaitility.await().untilAsserted(() -> {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
Expand Down Expand Up @@ -786,7 +786,7 @@ public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned)

// 2. verify consumer can subscribe the topic.
// Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, we set the test timeout to a triple value.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 1);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 1);
Expand Down Expand Up @@ -883,7 +883,7 @@ public void testAutoSubscribePatternConsumer() throws Exception {
assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

// 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
Expand Down Expand Up @@ -999,7 +999,7 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {
assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

// 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
Expand Down Expand Up @@ -1092,7 +1092,7 @@ public void testTopicDeletion() throws Exception {
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;

// 4. verify consumer get methods
assertSame(consumerImpl.getPattern(), pattern);
assertSame(consumerImpl.getPattern().pattern(), pattern.pattern());
assertEquals(consumerImpl.getPartitionedTopics().size(), 0);

producer1.send("msg-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.re2j.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
Expand All @@ -32,7 +33,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.re2j.Pattern;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
Expand Down Expand Up @@ -577,6 +578,7 @@ private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerCo
Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
TopicName destination = TopicName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();
Pattern pattern = Pattern.compile(conf.getTopicsPattern().pattern());

CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null)
Expand All @@ -592,10 +594,10 @@ private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerCo

List<String> topicsList = getTopicsResult.getTopics();
if (!getTopicsResult.isFiltered()) {
topicsList = TopicList.filterTopics(getTopicsResult.getTopics(), conf.getTopicsPattern());
topicsList = TopicList.filterTopics(getTopicsResult.getTopics(), pattern);
}
conf.getTopicNames().addAll(topicsList);
ConsumerBase<T> consumer = new PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(),
ConsumerBase<T> consumer = new PatternMultiTopicsConsumerImpl<>(pattern,
getTopicsResult.getTopicsHash(),
PulsarClientImpl.this,
conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
*/
package org.apache.pulsar.client.impl;

import com.google.re2j.Pattern;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Sets;
import com.google.re2j.Pattern;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -32,7 +33,6 @@
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class PatternMultiTopicsConsumerImplTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import com.google.re2j.Pattern;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
Expand All @@ -41,7 +42,6 @@
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;

public class TopicListWatcherTest {

Expand Down
5 changes: 5 additions & 0 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pulsar;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.re2j.Matcher;
import com.google.re2j.Pattern;

public class PulsarVersion {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package org.apache.pulsar.common.topics;

import com.google.common.hash.Hashing;
import com.google.re2j.Pattern;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.common.naming.SystemTopicNames;
Expand All @@ -47,6 +47,7 @@ public static List<String> filterTopics(List<String> original, String regex) {
}
public static List<String> filterTopics(List<String> original, Pattern topicsPattern) {


final Pattern shortenedTopicsPattern = Pattern.compile(removeTopicDomainScheme(topicsPattern.toString()));

return original.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package org.apache.pulsar.common.topics;

import com.google.common.collect.Lists;
import com.google.re2j.Pattern;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.testng.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.testclient;

import com.google.common.util.concurrent.RateLimiter;
import com.google.re2j.Pattern;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
Expand Down Expand Up @@ -269,11 +270,14 @@ private void handle(final byte command, final DataInputStream inputStream, final
tradeConf.size = inputStream.readInt();
tradeConf.rate = inputStream.readDouble();
// See if a topic belongs to this tenant and group using this regex.
final String groupRegex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
final Pattern groupRegex =
Pattern.compile(".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*");

for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String topic = entry.getKey();
final TradeUnit unit = entry.getValue();
if (topic.matches(groupRegex)) {

if (groupRegex.matcher(topic).matches()) {
unit.change(tradeConf);
}
}
Expand All @@ -282,11 +286,11 @@ private void handle(final byte command, final DataInputStream inputStream, final
// Stop all topics belonging to a group.
decodeGroupOptions(tradeConf, inputStream);
// See if a topic belongs to this tenant and group using this regex.
final String regex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
final Pattern regex = Pattern.compile(".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*");
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String topic = entry.getKey();
final TradeUnit unit = entry.getValue();
if (topic.matches(regex)) {
if (regex.matcher(topic).matches()) {
unit.stop.set(true);
}
}
Expand Down

0 comments on commit 30069db

Please sign in to comment.