Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Improve CPU resources usege of TopicName Cache #23052

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheMaxCapacity=100000

# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when
# there are too many topics are in use.
maxSecondsToClearTopicNameCache=7200

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,21 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean backlogQuotaCheckEnabled = true;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache"
+ " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName."
)
private int topicNameCacheMaxCapacity = 100_000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache"
+ " frequently when there are too many topics are in use."
)
private int maxSecondsToClearTopicNameCache = 3600 * 2;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,16 @@ public void start() throws Exception {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
this.startClearInvalidateTopicNameCacheTask();
}

protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -56,6 +59,8 @@ protected void doInitConf() throws Exception {
super.doInitConf();
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTopicNameCacheMaxCapacity(5000);
conf.setMaxSecondsToClearTopicNameCache(5);
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
Expand Down Expand Up @@ -187,6 +192,34 @@ public void testDynamicBrokerPort() throws Exception {
assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
}

@Test
public void testTopicCacheConfiguration() throws Exception {
cleanup();
setup();
assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000);
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5);

List<TopicName> topicNameCached = new ArrayList<>();
for (int i = 0; i < 20; i++) {
topicNameCached.add(TopicName.get("public/default/tp_" + i));
}

// Verify: the cache does not clear since it is not larger than max capacity.
Thread.sleep(10 * 1000);
for (int i = 0; i < 20; i++) {
assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i));
}

// Update max capacity.
admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10");

// Verify: the cache were cleared.
Thread.sleep(10 * 1000);
for (int i = 0; i < 20; i++) {
assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i));
nodece marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Test
public void testBacklogAndRetentionCheck() throws PulsarServerException {
ServiceConfiguration config = new ServiceConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,7 @@ public void testInitialize() throws Exception {
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void testInit() throws Exception {
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
Expand Down Expand Up @@ -375,4 +377,15 @@ public void testAllowAutoTopicCreationType() throws Exception {
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED);
}

@Test
public void testTopicNameCacheConfiguration() throws Exception {
ServiceConfiguration conf;
final Properties properties = new Properties();
properties.setProperty("maxSecondsToClearTopicNameCache", "2");
properties.setProperty("topicNameCacheMaxCapacity", "100");
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true
transactionPendingAckBatchedWriteMaxRecords=44
transactionPendingAckBatchedWriteMaxSize=55
transactionPendingAckBatchedWriteMaxDelayInMillis=66
topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true
topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@
package org.apache.pulsar.common.naming;

import com.google.common.base.Splitter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.re2j.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;

Expand All @@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId {

private final int partitionIndex;

private static final LoadingCache<String, TopicName> cache = CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, TopicName>() {
@Override
public TopicName load(String name) throws Exception {
return new TopicName(name);
}
});
private static final ConcurrentHashMap<String, TopicName> cache = new ConcurrentHashMap<>();
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved

public static void clearIfReachedMaxCapacity(int maxCapacity) {
if (maxCapacity < 0) {
// Unlimited cache.
return;
}
if (cache.size() > maxCapacity) {
cache.clear();
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
}
}

public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
Expand All @@ -79,11 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String
}

public static TopicName get(String topic) {
try {
return cache.get(topic);
} catch (ExecutionException | UncheckedExecutionException e) {
throw (RuntimeException) e.getCause();
TopicName tp = cache.get(topic);
if (tp != null) {
return tp;
}
return cache.computeIfAbsent(topic, k -> new TopicName(k));
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
}

public static TopicName getPartitionedTopicName(String topic) {
Expand Down
Loading