diff --git a/conf/broker.conf b/conf/broker.conf index b715c4e515bc8..2b934f8d3dd7f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false # Factory class-name to create topic with custom workflow topicFactoryClassName= +# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache +# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName. +topicNameCacheCaxCapacity=100000 + +# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when +# there are too many topics are in use. +maxSecondsToClearTopicNameCache=7200 + # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3e9a4c77ef270..93e324fc5f4f1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -597,16 +597,17 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( dynamic = true, category = CATEGORY_POLICIES, - doc = "Max capacity of the topic name cache" + doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache" + + " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName." ) private int topicNameCacheCaxCapacity = 100_000; @FieldContext( category = CATEGORY_POLICIES, - doc = "A Specifies the minimum number of minutes that the system stays in the memory, to avoid clear cache" - + " frequently when there are too many topics are in use" + doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache" + + " frequently when there are too many topics are in use." ) - private int maxMinutesToClearTopicNameCache = 120; + private int maxSecondsToClearTopicNameCache = 3600 * 2; @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2082f6b8f8627..9e5ca1e5927eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -629,12 +629,12 @@ public void start() throws Exception { } protected void startClearInvalidateTopicNameCacheTask() { - final int maxMinutesToClearTopicNameCache = pulsar.getConfiguration().getMaxMinutesToClearTopicNameCache(); - statsUpdater.scheduleAtFixedRate( - () -> TopicName.clearIfFull(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()), - maxMinutesToClearTopicNameCache, - maxMinutesToClearTopicNameCache, - TimeUnit.MINUTES); + final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache(); + inactivityMonitor.scheduleAtFixedRate( + () -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()), + maxSecondsToClearTopicNameCache, + maxSecondsToClearTopicNameCache, + TimeUnit.SECONDS); } protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index daa4393db55fd..4b42b2c93a6b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -24,11 +24,14 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertSame; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.testng.annotations.AfterMethod; @@ -56,6 +59,8 @@ protected void doInitConf() throws Exception { super.doInitConf(); conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); + conf.setTopicNameCacheCaxCapacity(5000); + conf.setMaxSecondsToClearTopicNameCache(5); if (useStaticPorts) { conf.setBrokerServicePortTls(Optional.of(6651)); conf.setBrokerServicePort(Optional.of(6660)); @@ -187,6 +192,34 @@ public void testDynamicBrokerPort() throws Exception { assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get()); } + @Test + public void testTopicCacheConfiguration() throws Exception { + cleanup(); + setup(); + assertEquals(conf.getTopicNameCacheCaxCapacity(), 5000); + assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5); + + List topicNameCached = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + topicNameCached.add(TopicName.get("public/default/tp_" + i)); + } + + // Verify: the cache does not clear since it is not larger than max capacity. + Thread.sleep(10 * 1000); + for (int i = 0; i < 20; i++) { + assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); + } + + // Update max capacity. + admin.brokers().updateDynamicConfiguration("topicNameCacheCaxCapacity", "10"); + + // Verify: the cache were cleared. + Thread.sleep(10 * 1000); + for (int i = 0; i < 20; i++) { + assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i)); + } + } + @Test public void testBacklogAndRetentionCheck() throws PulsarServerException { ServiceConfiguration config = new ServiceConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index b99f8d5338f60..0234501ebe4e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -63,5 +63,7 @@ public void testInitialize() throws Exception { assertEquals(standalone.getConfig().getAdvertisedListeners(), "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651"); assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true); + assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(standalone.getConfig().getTopicNameCacheCaxCapacity(), 200); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index ebeaffc48e4b9..9aaa27949d448 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -74,6 +74,8 @@ public void testInit() throws Exception { assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05); assertEquals(config.getHttpMaxRequestHeaderSize(), 1234); assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); + assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); + assertEquals(config.getTopicNameCacheCaxCapacity(), 200); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } @@ -375,4 +377,15 @@ public void testAllowAutoTopicCreationType() throws Exception { conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED); } + + @Test + public void testTopicNameCacheConfiguration() throws Exception { + ServiceConfiguration conf; + final Properties properties = new Properties(); + properties.setProperty("maxSecondsToClearTopicNameCache", "2"); + properties.setProperty("topicNameCacheCaxCapacity", "100"); + conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2); + assertEquals(conf.getTopicNameCacheCaxCapacity(), 100); + } } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index ddda30d0a4bd9..340c8143c6dd4 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true transactionPendingAckBatchedWriteMaxRecords=44 transactionPendingAckBatchedWriteMaxSize=55 transactionPendingAckBatchedWriteMaxDelayInMillis=66 +topicNameCacheCaxCapacity=200 +maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index 812c8dc9748f9..4deef66d44b89 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true +topicNameCacheCaxCapacity=200 +maxSecondsToClearTopicNameCache=1 diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java index 8739ed1dd2993..a804e7b6506ad 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java @@ -40,7 +40,7 @@ public class NamespaceName implements ServiceUnitId { private final String localName; private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader() { + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { @Override public NamespaceName load(String name) throws Exception { return new NamespaceName(name); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 59c9c236e6352..dd24c9a971210 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -51,7 +51,11 @@ public class TopicName implements ServiceUnitId { private static final ConcurrentHashMap cache = new ConcurrentHashMap<>(); - public static void clearIfFull(int maxCapacity) { + public static void clearIfReachedMaxCapacity(int maxCapacity) { + if (maxCapacity < 0) { + // Unlimited cache. + return; + } if (cache.size() > maxCapacity) { cache.clear(); } @@ -74,9 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String } public static TopicName get(String topic) { - return cache.computeIfAbsent(topic, k -> { - return new TopicName(k); - }); + TopicName tp = cache.get(topic); + if (tp != null) { + return tp; + } + return cache.computeIfAbsent(topic, k -> new TopicName(k)); } public static TopicName getPartitionedTopicName(String topic) {