Skip to content

Commit

Permalink
rebase master
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jul 21, 2024
1 parent ccc20c1 commit 61a7595
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 15 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheCaxCapacity=100000

# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when
# there are too many topics are in use.
maxSecondsToClearTopicNameCache=7200

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,16 +597,17 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Max capacity of the topic name cache"
doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache"
+ " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName."
)
private int topicNameCacheCaxCapacity = 100_000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "A Specifies the minimum number of minutes that the system stays in the memory, to avoid clear cache"
+ " frequently when there are too many topics are in use"
doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache"
+ " frequently when there are too many topics are in use."
)
private int maxMinutesToClearTopicNameCache = 120;
private int maxSecondsToClearTopicNameCache = 3600 * 2;

@FieldContext(
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,12 +629,12 @@ public void start() throws Exception {
}

protected void startClearInvalidateTopicNameCacheTask() {
final int maxMinutesToClearTopicNameCache = pulsar.getConfiguration().getMaxMinutesToClearTopicNameCache();
statsUpdater.scheduleAtFixedRate(
() -> TopicName.clearIfFull(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()),
maxMinutesToClearTopicNameCache,
maxMinutesToClearTopicNameCache,
TimeUnit.MINUTES);
final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheCaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -56,6 +59,8 @@ protected void doInitConf() throws Exception {
super.doInitConf();
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTopicNameCacheCaxCapacity(5000);
conf.setMaxSecondsToClearTopicNameCache(5);
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
Expand Down Expand Up @@ -187,6 +192,34 @@ public void testDynamicBrokerPort() throws Exception {
assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
}

@Test
public void testTopicCacheConfiguration() throws Exception {
cleanup();
setup();
assertEquals(conf.getTopicNameCacheCaxCapacity(), 5000);
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5);

List<TopicName> topicNameCached = new ArrayList<>();
for (int i = 0; i < 20; i++) {
topicNameCached.add(TopicName.get("public/default/tp_" + i));
}

// Verify: the cache does not clear since it is not larger than max capacity.
Thread.sleep(10 * 1000);
for (int i = 0; i < 20; i++) {
assertTrue(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i));
}

// Update max capacity.
admin.brokers().updateDynamicConfiguration("topicNameCacheCaxCapacity", "10");

// Verify: the cache were cleared.
Thread.sleep(10 * 1000);
for (int i = 0; i < 20; i++) {
assertFalse(topicNameCached.get(i) == TopicName.get("public/default/tp_" + i));
}
}

@Test
public void testBacklogAndRetentionCheck() throws PulsarServerException {
ServiceConfiguration config = new ServiceConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,7 @@ public void testInitialize() throws Exception {
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheCaxCapacity(), 200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void testInit() throws Exception {
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(config.getTopicNameCacheCaxCapacity(), 200);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
Expand Down Expand Up @@ -375,4 +377,15 @@ public void testAllowAutoTopicCreationType() throws Exception {
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getAllowAutoTopicCreationType(), TopicType.NON_PARTITIONED);
}

@Test
public void testTopicNameCacheConfiguration() throws Exception {
ServiceConfiguration conf;
final Properties properties = new Properties();
properties.setProperty("maxSecondsToClearTopicNameCache", "2");
properties.setProperty("topicNameCacheCaxCapacity", "100");
conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
assertEquals(conf.getTopicNameCacheCaxCapacity(), 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true
transactionPendingAckBatchedWriteMaxRecords=44
transactionPendingAckBatchedWriteMaxSize=55
transactionPendingAckBatchedWriteMaxDelayInMillis=66
topicNameCacheCaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true
topicNameCacheCaxCapacity=200
maxSecondsToClearTopicNameCache=1
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class NamespaceName implements ServiceUnitId {
private final String localName;

private static final LoadingCache<String, NamespaceName> cache = CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader<String, NamespaceName>() {
.expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, NamespaceName>() {
@Override
public NamespaceName load(String name) throws Exception {
return new NamespaceName(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ public class TopicName implements ServiceUnitId {

private static final ConcurrentHashMap<String, TopicName> cache = new ConcurrentHashMap<>();

public static void clearIfFull(int maxCapacity) {
public static void clearIfReachedMaxCapacity(int maxCapacity) {
if (maxCapacity < 0) {
// Unlimited cache.
return;
}
if (cache.size() > maxCapacity) {
cache.clear();
}
Expand All @@ -74,9 +78,11 @@ public static TopicName get(String domain, String tenant, String cluster, String
}

public static TopicName get(String topic) {
return cache.computeIfAbsent(topic, k -> {
return new TopicName(k);
});
TopicName tp = cache.get(topic);
if (tp != null) {
return tp;
}
return cache.computeIfAbsent(topic, k -> new TopicName(k));
}

public static TopicName getPartitionedTopicName(String topic) {
Expand Down

0 comments on commit 61a7595

Please sign in to comment.