Skip to content

Commit

Permalink
4/17 part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Apr 18, 2024
1 parent b1db27e commit 888d764
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -70,7 +72,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
private final PulsarService pulsarService;
private final HashSet localCluster;
private final String clusterName;
private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

private final ConcurrentInitializer<NamespaceEventsSystemTopicFactory>
namespaceEventsSystemTopicFactoryLazyInitializer = new LazyInitializer<>() {
@Override
protected NamespaceEventsSystemTopicFactory initialize() {
try {
return new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
} catch (PulsarServerException e) {
log.error("Create namespace event system topic factory error.", e);
throw new RuntimeException(e);
}
}
};

@VisibleForTesting
final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -102,7 +116,7 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
});
})
.buildAsync((namespaceName, executor) -> {
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(namespaceName);
return systemTopicClient.newWriterAsync();
});
Expand Down Expand Up @@ -301,7 +315,7 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa
result.complete(null);
return result;
}
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newReaderAsync().thenAccept(r ->
fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, result));
Expand Down Expand Up @@ -373,7 +387,7 @@ protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemT
} catch (PulsarServerException ex) {
return FutureUtil.failedFuture(ex);
}
final SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
final SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(namespace);
return systemTopicClient.newReaderAsync();
}
Expand Down Expand Up @@ -561,7 +575,7 @@ private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
log.error("Failed to create system topic factory");
break;
}
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
-> writer.deleteAsync(getEventKey(topicName),
Expand Down Expand Up @@ -595,18 +609,18 @@ private boolean hasReplicateTo(Message<?> message) {
}

private void createSystemTopicFactoryIfNeeded() throws PulsarServerException {
if (namespaceEventsSystemTopicFactory == null) {
synchronized (this) {
if (namespaceEventsSystemTopicFactory == null) {
try {
namespaceEventsSystemTopicFactory =
new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
} catch (PulsarServerException e) {
log.error("Create namespace event system topic factory error.", e);
throw e;
}
}
}
try {
getNamespaceEventsSystemTopicFactory();
} catch (Exception e) {
throw new PulsarServerException(e);
}
}

private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() {
try {
return namespaceEventsSystemTopicFactoryLazyInitializer.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,12 @@ public void testDoNotReplicateSystemTopic() throws Exception {

private void initTransaction(int coordinatorSize, PulsarAdmin admin, String ServiceUrl,
PulsarService pulsarService) throws Exception {
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), coordinatorSize);

if (!admin.namespaces().getNamespaces(NamespaceName.SYSTEM_NAMESPACE.getTenant())
.contains(NamespaceName.SYSTEM_NAMESPACE.toString())) {
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), coordinatorSize);
}

pulsarService.getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.Sets;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand All @@ -45,6 +46,7 @@ protected void cleanup() throws Exception {

@Test
public void testMaxTenant() throws Exception {
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
conf.setMaxTenants(2);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
Expand Down

0 comments on commit 888d764

Please sign in to comment.