diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 58d7e71b65d84..ac37aca531af9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -607,13 +607,12 @@ public CompletableFuture closeAsync() { } } - closeLocalMetadataStore(); + asyncCloseFutures.add(closeLocalMetadataStore()); + if (configMetadataSynchronizer != null) { + asyncCloseFutures.add(configMetadataSynchronizer.closeAsync()); + } if (configurationMetadataStore != null && shouldShutdownConfigurationMetadataStore) { configurationMetadataStore.close(); - if (configMetadataSynchronizer != null) { - configMetadataSynchronizer.close(); - configMetadataSynchronizer = null; - } } if (transactionExecutorProvider != null) { @@ -1160,14 +1159,16 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro .build()); } - protected void closeLocalMetadataStore() throws Exception { + protected CompletableFuture closeLocalMetadataStore() throws Exception { if (localMetadataStore != null) { localMetadataStore.close(); } if (localMetadataSynchronizer != null) { - localMetadataSynchronizer.close(); + CompletableFuture closeSynchronizer = localMetadataSynchronizer.closeAsync(); localMetadataSynchronizer = null; + return closeSynchronizer; } + return CompletableFuture.completedFuture(null); } protected void startLeaderElectionService() { @@ -1928,4 +1929,69 @@ public CompletableFuture newTopicCompactionService(Strin return CompletableFuture.failedFuture(e); } } + + public void initConfigMetadataSynchronizerIfNeeded() { + mutex.lock(); + try { + final String newTopic = config.getConfigurationMetadataSyncEventTopic(); + final PulsarMetadataEventSynchronizer oldSynchronizer = configMetadataSynchronizer; + // Skip if not support. + if (!(configurationMetadataStore instanceof MetadataStoreExtended)) { + LOG.info( + "Skip to update Metadata Synchronizer because of the Configuration Metadata Store using[{}]" + + " does not support.", configurationMetadataStore.getClass().getName()); + return; + } + // Skip if no changes. + // case-1: both null. + // case-2: both topics are the same. + if ((oldSynchronizer == null && StringUtils.isBlank(newTopic))) { + LOG.info("Skip to update Metadata Synchronizer because the topic[null] does not changed."); + } + if (StringUtils.isNotBlank(newTopic) && oldSynchronizer != null) { + TopicName newTopicName = TopicName.get(newTopic); + TopicName oldTopicName = TopicName.get(oldSynchronizer.getTopicName()); + if (newTopicName.equals(oldTopicName)) { + LOG.info("Skip to update Metadata Synchronizer because the topic[{}] does not changed.", + oldTopicName); + } + } + // Update(null or not null). + // 1.set the new one. + // 2.close the old one. + // 3.async start the new one. + if (StringUtils.isBlank(newTopic)) { + configMetadataSynchronizer = null; + } else { + configMetadataSynchronizer = new PulsarMetadataEventSynchronizer(this, newTopic); + } + // close the old one and start the new one. + PulsarMetadataEventSynchronizer newSynchronizer = configMetadataSynchronizer; + MetadataStoreExtended metadataStoreExtended = (MetadataStoreExtended) configurationMetadataStore; + metadataStoreExtended.updateMetadataEventSynchronizer(newSynchronizer); + Runnable startNewSynchronizer = () -> { + if (newSynchronizer == null) { + return; + } + try { + newSynchronizer.start(); + } catch (Exception e) { + // It only occurs when get internal client fails. + LOG.error("Start Metadata Synchronizer with topic {} failed.", + newTopic, e); + } + }; + executor.submit(() -> { + if (oldSynchronizer != null) { + oldSynchronizer.closeAsync().whenComplete((ignore, ex) -> { + startNewSynchronizer.run(); + }); + } else { + startNewSynchronizer.run(); + } + }); + } finally { + mutex.unlock(); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c1b2b9e1da974..0cba16470319c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2806,6 +2806,11 @@ private void updateConfigurationAndRegisterListeners() { pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled((boolean) enabled); }); + // add listener to notify web service httpRequestsFailOnUnknownPropertiesEnabled changed. + registerConfigurationListener("configurationMetadataSyncEventTopic", enabled -> { + pulsar.initConfigMetadataSynchronizerIfNeeded(); + }); + // add more listeners here // (3) create dynamic-config if not exist. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java index 0383a0b755245..8b2ebf200537e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java @@ -19,11 +19,15 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -46,6 +50,7 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class); protected PulsarService pulsar; protected BrokerService brokerService; + @Getter protected String topicName; protected PulsarClientImpl client; protected volatile Producer producer; @@ -53,19 +58,32 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize private final CopyOnWriteArrayList>> listeners = new CopyOnWriteArrayList<>(); - private volatile boolean started = false; + static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state"); + @Getter + private volatile State state; public static final String SUBSCRIPTION_NAME = "metadata-syncer"; private static final int MAX_PRODUCER_PENDING_SIZE = 1000; protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); + private volatile CompletableFuture closeFuture; - public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) throws PulsarServerException { + public enum State { + Init, + Starting_Producer, + Starting_Consumer, + Started, + Closing, + Closed; + } + + public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) { this.pulsar = pulsar; this.brokerService = pulsar.getBrokerService(); this.topicName = topicName; + this.state = State.Init; if (!StringUtils.isNotBlank(topicName)) { log.info("Metadata synchronizer is disabled"); - return; } } @@ -74,10 +92,11 @@ public void start() throws PulsarServerException { log.info("metadata topic doesn't exist.. skipping metadata synchronizer init.."); return; } + log.info("Metadata event synchronizer is starting on topic {}", topicName); this.client = (PulsarClientImpl) pulsar.getClient(); - startProducer(); - startConsumer(); - log.info("Metadata event synchronizer started on topic {}", topicName); + if (STATE_UPDATER.compareAndSet(this, State.Init, State.Starting_Producer)) { + startProducer(); + } } @Override @@ -98,7 +117,7 @@ public String getClusterName() { } private void publishAsync(MetadataEvent event, CompletableFuture future) { - if (!started) { + if (!isProducerStarted()) { log.info("Producer is not started on {}, failed to publish {}", topicName, event); future.completeExceptionally(new IllegalStateException("producer is not started yet")); } @@ -114,62 +133,100 @@ private void publishAsync(MetadataEvent event, CompletableFuture future) { } private void startProducer() { + if (isClosingOrClosed()) { + log.info("[{}] Skip to start new producer because the synchronizer is closed", topicName); + } + if (producer != null) { + log.error("[{}] Failed to start the producer because the producer has been set, state: {}", + topicName, state); + return; + } log.info("[{}] Starting producer", topicName); client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(topicName) - .messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false) - .sendTimeout(0, TimeUnit.SECONDS) // - .maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> { + .messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false) + .sendTimeout(0, TimeUnit.SECONDS) // + .maxPendingMessages(MAX_PRODUCER_PENDING_SIZE).createAsync().thenAccept(prod -> { + backOff.reset(); + if (STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Starting_Consumer)) { producer = prod; - started = true; log.info("producer is created successfully {}", topicName); - }).exceptionally(ex -> { - long waitTimeMs = backOff.next(); - log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(), - waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); - return null; - }); + PulsarMetadataEventSynchronizer.this.startConsumer(); + } else { + State stateTransient = state; + log.info("[{}] Closing the new producer because the synchronizer state is {}", prod, + stateTransient); + CompletableFuture closeProducer = new CompletableFuture<>(); + closeResource(() -> prod.closeAsync(), closeProducer); + closeProducer.thenRun(() -> { + log.info("[{}] Closed the new producer because the synchronizer state is {}", prod, + stateTransient); + }); + } + }).exceptionally(ex -> { + long waitTimeMs = backOff.next(); + log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(), + waitTimeMs / 1000.0); + // BackOff before retrying + brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + return null; + }); } private void startConsumer() { + if (isClosingOrClosed()) { + log.info("[{}] Skip to start new consumer because the synchronizer is closed", topicName); + } if (consumer != null) { + log.error("[{}] Failed to start the consumer because the consumer has been set, state: {}", + topicName, state); return; } + log.info("[{}] Starting consumer", topicName); ConsumerBuilder consumerBuilder = client.newConsumer(Schema.AVRO(MetadataEvent.class)) - .topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60, TimeUnit.SECONDS) - .subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> { - log.info("Processing metadata event for {} with listeners {}", msg.getValue().getPath(), - listeners.size()); - try { - if (listeners.size() == 0) { - c.acknowledgeAsync(msg); - return; - - } - if (listeners.size() == 1) { - listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg)) - .exceptionally(ex -> { - log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName, - ex.getCause()); - return null; - }); - } else { - FutureUtil - .waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue())) - .collect(Collectors.toList())) - .thenApply(__ -> c.acknowledgeAsync(msg)).exceptionally(ex -> { - log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName); - return null; - }); - } - } catch (Exception e) { - log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName); + .topic(topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Failover).messageListener((c, msg) -> { + log.info("Processing metadata event for {} with listeners {}", msg.getValue().getPath(), + listeners.size()); + try { + if (listeners.size() == 0) { + c.acknowledgeAsync(msg); + return; + } - }); + if (listeners.size() == 1) { + listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg)) + .exceptionally(ex -> { + log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName, + ex.getCause()); + return null; + }); + } else { + FutureUtil + .waitForAll(listeners.stream().map(listener -> listener.apply(msg.getValue())) + .collect(Collectors.toList())) + .thenApply(__ -> c.acknowledgeAsync(msg)).exceptionally(ex -> { + log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName); + return null; + }); + } + } catch (Exception e) { + log.warn("Failed to synchronize {} for {}", msg.getMessageId(), topicName); + } + }); consumerBuilder.subscribeAsync().thenAccept(consumer -> { - log.info("successfully created consumer {}", topicName); - this.consumer = consumer; + backOff.reset(); + if (STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Started)) { + this.consumer = consumer; + log.info("successfully created consumer {}", topicName); + } else { + State stateTransient = state; + log.info("[{}] Closing the new consumer because the synchronizer state is {}", stateTransient); + CompletableFuture closeConsumer = new CompletableFuture<>(); + closeResource(() -> consumer.closeAsync(), closeConsumer); + closeConsumer.thenRun(() -> { + log.info("[{}] Closed the new consumer because the synchronizer state is {}", stateTransient); + }); + } }).exceptionally(ex -> { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create consumer ({}), retrying in {} s", topicName, ex.getMessage(), @@ -181,19 +238,81 @@ private void startConsumer() { } public boolean isStarted() { - return started; + return this.state == State.Started; + } + + public boolean isProducerStarted() { + return this.state.ordinal() > State.Starting_Producer.ordinal() + && this.state.ordinal() < State.Closing.ordinal(); + } + + public boolean isClosingOrClosed() { + return this.state == State.Closing || this.state == State.Closed; } @Override - public void close() { - started = false; - if (producer != null) { - producer.closeAsync(); - producer = null; + public synchronized CompletableFuture closeAsync() { + int tryChangeStateCounter = 0; + while (true) { + if (isClosingOrClosed()) { + return closeFuture; + } + if (STATE_UPDATER.compareAndSet(this, State.Init, State.Closing) + || STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Closing) + || STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Closing) + || STATE_UPDATER.compareAndSet(this, State.Started, State.Closing)) { + break; + } + // Just for avoid spinning loop which would cause 100% CPU consumption here. + if (++tryChangeStateCounter > 100) { + log.error("Unexpected error: the state can not be changed to closing {}, state: {}", topicName, state); + return CompletableFuture.failedFuture(new RuntimeException("Unexpected error," + + " the state can not be changed to closing")); + } } - if (consumer != null) { - consumer.closeAsync(); - consumer = null; + CompletableFuture closeProducer = new CompletableFuture<>(); + CompletableFuture closeConsumer = new CompletableFuture<>(); + if (producer == null) { + closeProducer.complete(null); + } else { + closeResource(() -> producer.closeAsync(), closeProducer); + } + if (consumer == null) { + closeConsumer.complete(null); + } else { + closeResource(() -> consumer.closeAsync(), closeConsumer); + } + + // Add logs. + closeProducer.thenRun(() -> log.info("Successfully close producer {}", topicName)); + closeConsumer.thenRun(() -> log.info("Successfully close consumer {}", topicName)); + + closeFuture = FutureUtil.waitForAll(Arrays.asList(closeProducer, closeConsumer)); + closeFuture.thenRun(() -> { + this.state = State.Closed; + log.info("Successfully close metadata store synchronizer {}", topicName); + }); + return closeFuture; + } + + private void closeResource(final Supplier> asyncCloseable, + final CompletableFuture future) { + if (asyncCloseable == null) { + future.complete(null); + return; } + asyncCloseable.get().whenComplete((ignore, ex) -> { + if (ex == null) { + backOff.reset(); + future.complete(null); + return; + } + // Retry. + long waitTimeMs = backOff.next(); + log.warn("[{}] Exception: '{}' occurred while trying to close the %s. Retrying again in {} s.", + topicName, ex.getMessage(), asyncCloseable.getClass().getSimpleName(), waitTimeMs / 1000.0, ex); + brokerService.executor().schedule(() -> closeResource(asyncCloseable, future), waitTimeMs, + TimeUnit.MILLISECONDS); + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java new file mode 100644 index 0000000000000..9b4dd5192e1ec --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.collect.Sets; +import java.net.URL; +import java.util.Collections; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.tests.TestRetrySupport; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; + +@Slf4j +public abstract class GeoReplicationWithConfigurationSyncTestBase extends TestRetrySupport { + + protected final String defaultTenant = "public"; + protected final String defaultNamespace = defaultTenant + "/default"; + + protected final String cluster1 = "r1"; + protected URL url1; + protected URL urlTls1; + protected ServiceConfiguration config1 = new ServiceConfiguration(); + protected ZookeeperServerTest brokerConfigZk1; + protected LocalBookkeeperEnsemble bkEnsemble1; + protected PulsarService pulsar1; + protected BrokerService ns1; + protected PulsarAdmin admin1; + protected PulsarClient client1; + + protected URL url2; + protected URL urlTls2; + protected final String cluster2 = "r2"; + protected ServiceConfiguration config2 = new ServiceConfiguration(); + protected ZookeeperServerTest brokerConfigZk2; + protected LocalBookkeeperEnsemble bkEnsemble2; + protected PulsarService pulsar2; + protected BrokerService ns2; + protected PulsarAdmin admin2; + protected PulsarClient client2; + + protected void startZKAndBK() throws Exception { + // Start ZK. + brokerConfigZk1 = new ZookeeperServerTest(0); + brokerConfigZk1.start(); + brokerConfigZk2 = new ZookeeperServerTest(0); + brokerConfigZk2.start(); + + // Start BK. + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1.start(); + bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble2.start(); + } + + protected void startBrokers() throws Exception { + // Start brokers. + setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1); + pulsar1 = new PulsarService(config1); + pulsar1.start(); + ns1 = pulsar1.getBrokerService(); + + url1 = new URL(pulsar1.getWebServiceAddress()); + urlTls1 = new URL(pulsar1.getWebServiceAddressTls()); + admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); + client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); + + // Start region 2 + setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2); + pulsar2 = new PulsarService(config2); + pulsar2.start(); + ns2 = pulsar2.getBrokerService(); + + url2 = new URL(pulsar2.getWebServiceAddress()); + urlTls2 = new URL(pulsar2.getWebServiceAddressTls()); + admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); + client2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); + } + + protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { + admin1.clusters().createCluster(cluster1, ClusterData.builder() + .serviceUrl(url1.toString()) + .serviceUrlTls(urlTls1.toString()) + .brokerServiceUrl(pulsar1.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()) + .brokerClientTlsEnabled(false) + .build()); + admin1.clusters().createCluster(cluster2, ClusterData.builder() + .serviceUrl(url2.toString()) + .serviceUrlTls(urlTls2.toString()) + .brokerServiceUrl(pulsar2.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()) + .brokerClientTlsEnabled(false) + .build()); + admin2.clusters().createCluster(cluster1, ClusterData.builder() + .serviceUrl(url1.toString()) + .serviceUrlTls(urlTls1.toString()) + .brokerServiceUrl(pulsar1.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()) + .brokerClientTlsEnabled(false) + .build()); + admin2.clusters().createCluster(cluster2, ClusterData.builder() + .serviceUrl(url2.toString()) + .serviceUrlTls(urlTls2.toString()) + .brokerServiceUrl(pulsar2.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()) + .brokerClientTlsEnabled(false) + .build()); + + admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(), + Sets.newHashSet(cluster1, cluster2))); + admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(), + Sets.newHashSet(cluster1, cluster2))); + + admin1.namespaces().createNamespace(defaultNamespace); + admin2.namespaces().createNamespace(defaultNamespace); + } + + @Override + protected void setup() throws Exception { + incrementSetupNumber(); + + log.info("--- Starting OneWayReplicatorTestBase::setup ---"); + + startZKAndBK(); + + startBrokers(); + + createDefaultTenantsAndClustersAndNamespace(); + + Thread.sleep(100); + log.info("--- OneWayReplicatorTestBase::setup completed ---"); + } + + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + config.setClusterName(clusterName); + config.setAdvertisedAddress("localhost"); + config.setWebServicePort(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort()); + config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort() + "/foo"); + config.setBrokerDeleteInactiveTopicsEnabled(false); + config.setBrokerDeleteInactiveTopicsFrequencySeconds(60); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + config.setBrokerServicePortTls(Optional.of(0)); + config.setBacklogQuotaCheckIntervalInSeconds(5); + config.setDefaultNumberOfNamespaceBundles(1); + config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + config.setEnableReplicatedSubscriptions(true); + config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + config.setLoadBalancerSheddingEnabled(false); + } + + @Override + protected void cleanup() throws Exception { + // shutdown. + markCurrentSetupNumberCleaned(); + log.info("--- Shutting down ---"); + + // Stop brokers. + if (client1 != null) { + client1.close(); + client1 = null; + } + if (client2 != null) { + client2.close(); + client2 = null; + } + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } + if (pulsar2 != null) { + pulsar2.close(); + pulsar2 = null; + } + if (pulsar1 != null) { + pulsar1.close(); + pulsar1 = null; + } + + // Stop ZK and BK. + if (bkEnsemble1 != null) { + bkEnsemble1.stop(); + bkEnsemble1 = null; + } + if (bkEnsemble2 != null) { + bkEnsemble2.stop(); + bkEnsemble2 = null; + } + if (brokerConfigZk1 != null) { + brokerConfigZk1.stop(); + brokerConfigZk1 = null; + } + if (brokerConfigZk2 != null) { + brokerConfigZk2.stop(); + brokerConfigZk2 = null; + } + + // Reset configs. + config1 = new ServiceConfiguration(); + config2 = new ServiceConfiguration(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java new file mode 100644 index 0000000000000..577725f96ed34 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SyncConfigStoreTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import java.util.Arrays; +import java.util.HashSet; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.metadata.api.MetadataEvent; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class SyncConfigStoreTest extends GeoReplicationWithConfigurationSyncTestBase { + + private static final String CONF_NAME_SYNC_EVENT_TOPIC = "configurationMetadataSyncEventTopic"; + private static final String SYNC_EVENT_TOPIC = TopicDomain.persistent.value() + "://" + SYSTEM_NAMESPACE + + "/__sync_config_meta"; + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + TenantInfoImpl tenantInfo = new TenantInfoImpl(); + tenantInfo.setAllowedClusters(new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin1.tenants().createTenant(TopicName.get(SYNC_EVENT_TOPIC).getTenant(), tenantInfo); + admin1.namespaces().createNamespace(TopicName.get(SYNC_EVENT_TOPIC).getNamespace()); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + } + + @Test + public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception { + // Verify the condition that supports synchronizer: the metadata store is a different one. + Awaitility.await().untilAsserted(() -> { + boolean shouldShutdownConfigurationMetadataStore = + WhiteboxImpl.getInternalState(pulsar1, "shouldShutdownConfigurationMetadataStore"); + assertTrue(shouldShutdownConfigurationMetadataStore); + }); + + // Verify the synchronizer will be created dynamically. + admin1.brokers().updateDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC, SYNC_EVENT_TOPIC); + Awaitility.await().untilAsserted(() -> { + assertEquals(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic(), SYNC_EVENT_TOPIC); + PulsarMetadataEventSynchronizer synchronizer = + WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer"); + assertNotNull(synchronizer); + assertEquals(synchronizer.getState(), PulsarMetadataEventSynchronizer.State.Started); + assertTrue(synchronizer.isStarted()); + }); + + PulsarMetadataEventSynchronizer synchronizerStarted = + WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer"); + Producer producerStarted = + WhiteboxImpl.getInternalState(synchronizerStarted, "producer"); + Consumer consumerStarted = + WhiteboxImpl.getInternalState(synchronizerStarted, "consumer"); + + // Verify the synchronizer will be closed dynamically. + admin1.brokers().deleteDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC); + Awaitility.await().untilAsserted(() -> { + // The synchronizer that was started will be closed. + assertEquals(synchronizerStarted.getState(), PulsarMetadataEventSynchronizer.State.Closed); + assertTrue(synchronizerStarted.isClosingOrClosed()); + assertFalse(producerStarted.isConnected()); + assertFalse(consumerStarted.isConnected()); + // The synchronizer in memory will be null. + assertNull(pulsar1.getConfig().getConfigurationMetadataSyncEventTopic()); + PulsarMetadataEventSynchronizer synchronizer = + WhiteboxImpl.getInternalState(pulsar1, "configMetadataSynchronizer"); + assertNull(synchronizer); + }); + } +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java index 9a735e0f15ab8..cababd0324627 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataEventSynchronizer.java @@ -49,5 +49,5 @@ public interface MetadataEventSynchronizer { /** * close synchronizer resources. */ - void close(); + CompletableFuture closeAsync(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java index e565ba30d3dfb..182c14ef601a4 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java @@ -84,6 +84,8 @@ default Optional getMetadataEventSynchronizer() { return Optional.empty(); } + default void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {} + /** * Handles a metadata synchronizer event. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 7a495f78771b1..3909a89cf5eb2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -82,8 +82,7 @@ public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadata String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length()); // Local means a private data set // update synchronizer and register sync listener - synchronizer = metadataStoreConfig.getSynchronizer(); - registerSyncListener(Optional.ofNullable(synchronizer)); + updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer()); if ("local".equals(name)) { map = new TreeMap<>(); sequentialIdGenerator = new AtomicLong(); @@ -233,6 +232,12 @@ public Optional getMetadataEventSynchronizer() { return Optional.ofNullable(synchronizer); } + @Override + public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) { + this.synchronizer = synchronizer; + registerSyncListener(Optional.ofNullable(synchronizer)); + } + @Override public void close() throws Exception { if (isClosed.compareAndSet(false, true)) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index be985129f2ad1..39f7edd5ceed5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -112,8 +112,7 @@ public static RocksdbMetadataStore get(String metadataStoreUri, MetadataStoreCon // Create a new store instance store = new RocksdbMetadataStore(metadataStoreUri, conf); // update synchronizer and register sync listener - store.synchronizer = conf.getSynchronizer(); - store.registerSyncListener(Optional.ofNullable(store.synchronizer)); + store.updateMetadataEventSynchronizer(conf.getSynchronizer()); instancesCache.put(metadataStoreUri, store); return store; } @@ -572,6 +571,12 @@ protected CompletableFuture storePut(String path, byte[] data, Optional getMetadataEventSynchronizer() { return Optional.ofNullable(synchronizer); } + + @Override + public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) { + this.synchronizer = synchronizer; + registerSyncListener(Optional.ofNullable(synchronizer)); + } } class RocksdbMetadataStoreProvider implements MetadataStoreProvider { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index a164e4c246066..5b45530d2e20e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -52,7 +52,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private final int maxDelayMillis; private final int maxOperations; private final int maxSize; - private final MetadataEventSynchronizer synchronizer; + private MetadataEventSynchronizer synchronizer; private final BatchMetadataStoreStats batchMetadataStoreStats; protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { @@ -75,8 +75,7 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { } // update synchronizer and register sync listener - synchronizer = conf.getSynchronizer(); - registerSyncListener(Optional.ofNullable(synchronizer)); + updateMetadataEventSynchronizer(conf.getSynchronizer()); this.batchMetadataStoreStats = new BatchMetadataStoreStats(metadataStoreName, executor); } @@ -161,6 +160,12 @@ public Optional getMetadataEventSynchronizer() { return Optional.ofNullable(synchronizer); } + @Override + public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) { + this.synchronizer = synchronizer; + registerSyncListener(Optional.ofNullable(synchronizer)); + } + private void enqueue(MessagePassingQueue queue, MetadataOp op) { if (enabled) { if (!queue.offer(op)) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index 728bc1175b9ba..f85e3d2dc7562 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -55,7 +55,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { private final AsyncOxiaClient client; private final String identity; - private final Optional synchronizer; + private Optional synchronizer; OxiaMetadataStore( @NonNull String serviceAddress, @@ -69,7 +69,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { if (!metadataStoreConfig.isBatchingEnabled()) { linger = 0; } - this.synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer()); + updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer()); identity = UUID.randomUUID().toString(); client = OxiaClientBuilder.create(serviceAddress) @@ -286,5 +286,11 @@ public Optional getMetadataEventSynchronizer() { return synchronizer; } + @Override + public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) { + this.synchronizer = Optional.ofNullable(synchronizer); + registerSyncListener(this.synchronizer); + } + private record PathWithPutResult(String path, PutResult result) {} } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java index 3fabe9647eb34..caca16ff538a4 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java @@ -206,8 +206,8 @@ public String getClusterName() { } @Override - public void close() { - // No-op + public CompletableFuture closeAsync() { + return CompletableFuture.completedFuture(null); } }