diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index b8d13256d225a..482fa2cbd2300 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -49,4 +49,6 @@ default Optional getRateLimiter() { } boolean isConnected(); + + long getNumberOfEntriesInBacklog(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 973dec8980ea0..1b376492accc5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1579,13 +1579,23 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) { Optional clusterURL = getMigratedClusterUrl(service.getPulsar()); if (clusterURL.isPresent()) { - log.info("[{}] redirect migrated producer to topic {}: producerId={}, {}", remoteAddress, topicName, - producerId, ex.getCause().getMessage()); - commandSender.sendTopicMigrated(ResourceType.Producer, producerId, - clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls()); - closeProducer(producer); - return null; - + if (topic.isReplicationBacklogExist()) { + log.info("Topic {} is migrated but replication backlog exist: " + + "producerId = {}, producerName = {}, {}", topicName, + producerId, producerName, ex.getCause().getMessage()); + } else { + log.info("[{}] redirect migrated producer to topic {}: " + + "producerId={}, producerName = {}, {}", remoteAddress, + topicName, producerId, producerName, ex.getCause().getMessage()); + boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Producer, producerId, + clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls()); + if (!msgSent) { + log.info("client doesn't support topic migration handling {}-{}-{}", topic, + remoteAddress, producerId); + } + closeProducer(producer); + return null; + } } else { log.warn("[{}] failed producer because migration url not configured topic {}: producerId={}, {}", remoteAddress, topicName, producerId, ex.getCause().getMessage()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index e6a29368dbb85..7657d77e1299f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -234,6 +234,8 @@ CompletableFuture createSubscription(String subscriptionName, Init boolean isBrokerPublishRateExceeded(); + boolean isReplicationBacklogExist(); + void disableCnxAutoRead(); void enableCnxAutoRead(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 7ad231926189b..514db4219db98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -248,7 +248,7 @@ protected Position getReplicatorReadPosition() { } @Override - protected long getNumberOfEntriesInBacklog() { + public long getNumberOfEntriesInBacklog() { // No-op return 0; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index a0a8462a22753..317b8df6b9a82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -236,6 +236,11 @@ public void checkMessageDeduplicationInfo() { // No-op } + @Override + public boolean isReplicationBacklogExist() { + return false; + } + @Override public void removeProducer(Producer producer) { checkArgument(producer.getTopic() == this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index f38bcc71582a1..a556237f4342c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -165,8 +165,8 @@ protected Position getReplicatorReadPosition() { } @Override - protected long getNumberOfEntriesInBacklog() { - return cursor.getNumberOfEntriesInBacklog(false); + public long getNumberOfEntriesInBacklog() { + return cursor.getNumberOfEntriesInBacklog(true); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18a662c4b7a38..0374fc98212f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -575,6 +575,7 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) { @Override public synchronized void addFailed(ManagedLedgerException exception, Object ctx) { + PublishContext callback = (PublishContext) ctx; if (exception instanceof ManagedLedgerFencedException) { // If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen close(); @@ -587,7 +588,11 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) List> futures = new ArrayList<>(); // send migration url metadata to producers before disconnecting them if (isMigrated()) { - producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl())); + if (isReplicationBacklogExist()) { + log.info("Topic {} is migrated but replication backlog exists. Closing producers.", topic); + } else { + producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl())); + } } producers.forEach((__, producer) -> futures.add(producer.disconnect())); disconnectProducersFuture = FutureUtil.waitForAll(futures); @@ -599,8 +604,6 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) return null; }); - PublishContext callback = (PublishContext) ctx; - if (exception instanceof ManagedLedgerAlreadyClosedException) { if (log.isDebugEnabled()) { log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage()); @@ -2510,6 +2513,18 @@ public CompletableFuture checkClusterMigration() { } } + public boolean isReplicationBacklogExist() { + ConcurrentOpenHashMap replicators = getReplicators(); + if (replicators != null) { + for (Replicator replicator : replicators.values()) { + if (replicator.getNumberOfEntriesInBacklog() != 0) { + return true; + } + } + } + return false; + } + @Override public void checkGC() { if (!isDeleteWhileInactive()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index cd31f9150e619..3fe8c22b1c4de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -112,7 +112,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { protected URI lookupUrl; protected boolean isTcpLookup = false; - protected static final String configClusterName = "test"; + protected String configClusterName = "test"; protected boolean enableBrokerInterceptor = false; @@ -120,6 +120,12 @@ public MockedPulsarServiceBaseTest() { resetConfig(); } + protected void setupWithClusterName(String clusterName) throws Exception { + this.conf.setClusterName(clusterName); + this.configClusterName = clusterName; + this.internalSetup(); + } + protected PulsarService getPulsar() { return pulsar; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index f376ea4541737..df4f66c43d2b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -18,10 +18,12 @@ */ package org.apache.pulsar.broker.service; +import static java.lang.Thread.sleep; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.lang.reflect.Method; @@ -59,7 +61,8 @@ public class ClusterMigrationTest { protected String methodName; String namespace = "pulsar/migrationNs"; - TestBroker broker1, broker2; + TestBroker broker1, broker2, broker3, broker4; + URL url1; URL urlTls1; PulsarService pulsar1; @@ -71,6 +74,16 @@ public class ClusterMigrationTest { PulsarService pulsar2; PulsarAdmin admin2; + URL url3; + URL urlTls3; + PulsarService pulsar3; + PulsarAdmin admin3; + + URL url4; + URL urlTls4; + PulsarService pulsar4; + PulsarAdmin admin4; + @DataProvider(name = "TopicsubscriptionTypes") public Object[][] subscriptionTypes() { return new Object[][] { @@ -91,9 +104,10 @@ public void setup() throws Exception { log.info("--- Starting ReplicatorTestBase::setup ---"); - broker1 = new TestBroker(); - broker2 = new TestBroker(); - String clusterName = broker1.getClusterName(); + broker1 = new TestBroker("r1"); + broker2 = new TestBroker("r2"); + broker3 = new TestBroker("r3"); + broker4 = new TestBroker("r4"); pulsar1 = broker1.getPulsarService(); url1 = new URL(pulsar1.getWebServiceAddress()); @@ -105,32 +119,81 @@ public void setup() throws Exception { urlTls2 = new URL(pulsar2.getWebServiceAddressTls()); admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); - // Start region 3 + pulsar3 = broker3.getPulsarService(); + url3 = new URL(pulsar3.getWebServiceAddress()); + urlTls3 = new URL(pulsar3.getWebServiceAddressTls()); + admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build(); - // Provision the global namespace - admin1.clusters().createCluster(clusterName, + pulsar4 = broker4.getPulsarService(); + url4 = new URL(pulsar4.getWebServiceAddress()); + urlTls4 = new URL(pulsar4.getWebServiceAddressTls()); + admin4 = PulsarAdmin.builder().serviceHttpUrl(url4.toString()).build(); + + + admin1.clusters().createCluster("r1", ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString()) .brokerServiceUrl(pulsar1.getBrokerServiceUrl()) .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build()); - admin2.clusters().createCluster(clusterName, + admin3.clusters().createCluster("r1", + ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString()) + .brokerServiceUrl(pulsar1.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build()); + admin2.clusters().createCluster("r2", + ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString()) + .brokerServiceUrl(pulsar2.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build()); + admin4.clusters().createCluster("r2", ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString()) .brokerServiceUrl(pulsar2.getBrokerServiceUrl()) .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build()); + admin1.clusters().createCluster("r3", + ClusterData.builder().serviceUrl(url3.toString()).serviceUrlTls(urlTls3.toString()) + .brokerServiceUrl(pulsar3.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls()).build()); + + admin3.clusters().createCluster("r3", + ClusterData.builder().serviceUrl(url3.toString()).serviceUrlTls(urlTls3.toString()) + .brokerServiceUrl(pulsar3.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls()).build()); + + admin2.clusters().createCluster("r4", + ClusterData.builder().serviceUrl(url4.toString()).serviceUrlTls(urlTls4.toString()) + .brokerServiceUrl(pulsar4.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()).build()); + admin4.clusters().createCluster("r4", + ClusterData.builder().serviceUrl(url4.toString()).serviceUrlTls(urlTls4.toString()) + .brokerServiceUrl(pulsar4.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()).build()); + + // Setting r3 as replication cluster for r1 admin1.tenants().createTenant("pulsar", - new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet(clusterName))); - admin1.namespaces().createNamespace(namespace, Sets.newHashSet(clusterName)); - + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r3"))); + admin3.tenants().createTenant("pulsar", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r3"))); + admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", "r3")); + admin3.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r3")); + + // Setting r4 as replication cluster for r2 admin2.tenants().createTenant("pulsar", - new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet(clusterName))); - admin2.namespaces().createNamespace(namespace, Sets.newHashSet(clusterName)); - - assertEquals(admin1.clusters().getCluster(clusterName).getServiceUrl(), url1.toString()); - assertEquals(admin2.clusters().getCluster(clusterName).getServiceUrl(), url2.toString()); - assertEquals(admin1.clusters().getCluster(clusterName).getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl()); - assertEquals(admin2.clusters().getCluster(clusterName).getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()); - - Thread.sleep(100); + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r2", "r4"))); + admin4.tenants().createTenant("pulsar", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r2", "r4"))); + admin2.namespaces().createNamespace(namespace, Sets.newHashSet("r2", "r4")); + admin4.namespaces().createNamespace(namespace); + admin2.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r2", "r4")); + + assertEquals(admin1.clusters().getCluster("r1").getServiceUrl(), url1.toString()); + assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString()); + assertEquals(admin3.clusters().getCluster("r3").getServiceUrl(), url3.toString()); + assertEquals(admin4.clusters().getCluster("r4").getServiceUrl(), url4.toString()); + assertEquals(admin1.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl()); + assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()); + assertEquals(admin3.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl()); + assertEquals(admin4.clusters().getCluster("r4").getBrokerServiceUrl(), pulsar4.getBrokerServiceUrl()); + + sleep(100); log.info("--- ReplicatorTestBase::setup completed ---"); } @@ -140,6 +203,8 @@ protected void cleanup() throws Exception { log.info("--- Shutting down ---"); broker1.cleanup(); broker2.cleanup(); + broker3.cleanup(); + broker4.cleanup(); } @BeforeMethod(alwaysRun = true) @@ -154,20 +219,19 @@ public void beforeMethod(Method m) throws Exception { * (3) Migrate topic to cluster-2 * (4) Validate producer-1 is connected to cluster-2 * (5) create consumer1, drain backlog and migrate and reconnect to cluster-2 - * (6) Create new consumer2 with different subscription on cluster-1, + * (6) Create new consumer2 with different subscription on cluster-1, * which immediately migrate and reconnect to cluster-2 * (7) Create producer-2 directly to cluster-2 - * (8) Create producer-3 on cluster-1 which should be redirected to cluster-2 + * (8) Create producer-3 on cluster-1 which should be redirected to cluster-2 * (8) Publish messages using producer1, producer2, and producer3 * (9) Consume all messages by both consumer1 and consumer2 - * (10) Create Producer/consumer on non-migrated cluster and verify their connection with cluster-1 - * (11) Restart Broker-1 and connect producer/consumer on cluster-1 + * (10) Create Producer/consumer on non-migrated cluster and verify their connection with cluster-1 + * (11) Restart Broker-1 and connect producer/consumer on cluster-1 * @throws Exception */ @Test(dataProvider = "TopicsubscriptionTypes") public void testClusterMigration(boolean persistent, SubscriptionType subType) throws Exception { log.info("--- Starting ReplicatorTest::testClusterMigration ---"); - persistent = false; final String topicName = BrokerTestUtil .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); @@ -202,7 +266,7 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t assertFalse(topic2.getProducers().isEmpty()); ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); - admin1.clusters().updateClusterMigration(broker2.getClusterName(), true, migratedUrl); + admin1.clusters().updateClusterMigration("r1", true, migratedUrl); retryStrategically((test) -> { try { @@ -214,12 +278,16 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t return false; }, 10, 500); + topic1.checkClusterMigration().get(); + log.info("before sending message"); + sleep(1000); producer1.sendAsync("test1".getBytes()); // producer is disconnected from cluster-1 retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500); + log.info("before asserting"); assertTrue(topic1.getProducers().isEmpty()); // create 3rd producer on cluster-1 which should be redirected to cluster-2 @@ -297,15 +365,116 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t log.info("Successfully consumed messages by migrated consumers"); } + @Test(dataProvider = "TopicsubscriptionTypes") + public void testClusterMigrationWithReplicationBacklog(boolean persistent, SubscriptionType subType) throws Exception { + log.info("--- Starting ReplicatorTest::testClusterMigrationWithReplicationBacklog ---"); + persistent = true; + final String topicName = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + @Cleanup + PulsarClient client3 = PulsarClient.builder().serviceUrl(url3.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-1 producer/consumer + Producer producer1 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + + // cluster-3 consumer + Consumer consumer3 = client3.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); + retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(topic1.getProducers().isEmpty()); + assertFalse(topic1.getSubscriptions().isEmpty()); + + // build backlog + consumer1.close(); + retryStrategically((test) -> topic1.getReplicators().size() == 1, 10, 3000); + assertEquals(topic1.getReplicators().size(), 1); + + // stop service in the replication cluster to build replication backlog + broker3.cleanup(); + retryStrategically((test) -> broker3.getPulsarService() == null, 10, 1000); + assertNull(pulsar3.getBrokerService()); + + //publish messages into topic in "r1" cluster + int n = 5; + for (int i = 0; i < n; i++) { + producer1.send("test1".getBytes()); + } + retryStrategically((test) -> topic1.isReplicationBacklogExist(), 10, 1000); + assertTrue(topic1.isReplicationBacklogExist()); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-2 producer/consumer + Producer producer2 = client2.newProducer().topic(topicName).enableBatching(false) + .producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get(); + log.info("name of topic 2 - {}", topic2.getName()); + assertFalse(topic2.getProducers().isEmpty()); + + retryStrategically((test) -> topic2.getReplicators().size() == 1, 10, 2000); + log.info("replicators should be ready"); + ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + admin1.clusters().updateClusterMigration("r1", true, migratedUrl); + log.info("update cluster migration called"); + retryStrategically((test) -> { + try { + topic1.checkClusterMigration().get(); + return true; + } catch (Exception e) { + // ok + } + return false; + }, 10, 500); + + topic1.checkClusterMigration().get(); + + producer1.sendAsync("test1".getBytes()); + + // producer is disconnected from cluster-1 + retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500); + assertTrue(topic1.getProducers().isEmpty()); + + // verify that the disconnected producer is not redirected + // to replication cluster since there is replication backlog. + assertEquals(topic2.getProducers().size(), 1); + + // Restart the service in cluster "r3". + broker3.restart(); + retryStrategically((test) -> broker3.getPulsarService() != null, 10, 1000); + assertNotNull(broker3.getPulsarService()); + pulsar3 = broker3.getPulsarService(); + + // verify that the replication backlog drains once service in cluster "r3" is restarted. + retryStrategically((test) -> !topic1.isReplicationBacklogExist(), 10, 1000); + assertFalse(topic1.isReplicationBacklogExist()); + + // verify that the producer1 is now is now connected to migrated cluster "r2" since backlog is cleared. + retryStrategically((test) -> topic2.getProducers().size()==2, 10, 500); + assertEquals(topic2.getProducers().size(), 2); + } + static class TestBroker extends MockedPulsarServiceBaseTest { - public TestBroker() throws Exception { + private String clusterName; + + public TestBroker(String clusterName) throws Exception { + this.clusterName = clusterName; setup(); } @Override protected void setup() throws Exception { - super.internalSetup(); + super.setupWithClusterName(clusterName); } public PulsarService getPulsarService() { @@ -318,9 +487,9 @@ public String getClusterName() { @Override protected void cleanup() throws Exception { - internalCleanup(); + stopBroker(); } - + public void restart() throws Exception { restartBroker(); }