Skip to content

Commit

Permalink
make setupWithClusterName method protected
Browse files Browse the repository at this point in the history
  • Loading branch information
vineeth1995 committed Apr 10, 2023
1 parent 8a0e290 commit a7d95f3
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public MockedPulsarServiceBaseTest() {
resetConfig();
}

public void setupWithClusterName(String clusterName) throws Exception {
protected void setupWithClusterName(String clusterName) throws Exception {
this.conf.setClusterName(clusterName);
this.configClusterName = clusterName;
this.internalSetup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class ClusterMigrationTest {
URL urlTls3;
PulsarService pulsar3;
PulsarAdmin admin3;

URL url4;
URL urlTls4;
PulsarService pulsar4;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void setup() throws Exception {
broker2 = new TestBroker("r2");
broker3 = new TestBroker("r3");
broker4 = new TestBroker("r4");

pulsar1 = broker1.getPulsarService();
url1 = new URL(pulsar1.getWebServiceAddress());
urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
Expand All @@ -129,7 +129,7 @@ public void setup() throws Exception {
urlTls4 = new URL(pulsar4.getWebServiceAddressTls());
admin4 = PulsarAdmin.builder().serviceHttpUrl(url4.toString()).build();


admin1.clusters().createCluster("r1",
ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
Expand Down Expand Up @@ -165,7 +165,7 @@ public void setup() throws Exception {
ClusterData.builder().serviceUrl(url4.toString()).serviceUrlTls(urlTls4.toString())
.brokerServiceUrl(pulsar4.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()).build());

// Setting r3 as replication cluster for r1
admin1.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r3")));
Expand All @@ -183,7 +183,7 @@ public void setup() throws Exception {
admin2.namespaces().createNamespace(namespace, Sets.newHashSet("r2", "r4"));
admin4.namespaces().createNamespace(namespace);
admin2.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r2", "r4"));

assertEquals(admin1.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
assertEquals(admin3.clusters().getCluster("r3").getServiceUrl(), url3.toString());
Expand Down Expand Up @@ -219,14 +219,14 @@ public void beforeMethod(Method m) throws Exception {
* (3) Migrate topic to cluster-2
* (4) Validate producer-1 is connected to cluster-2
* (5) create consumer1, drain backlog and migrate and reconnect to cluster-2
* (6) Create new consumer2 with different subscription on cluster-1,
* (6) Create new consumer2 with different subscription on cluster-1,
* which immediately migrate and reconnect to cluster-2
* (7) Create producer-2 directly to cluster-2
* (8) Create producer-3 on cluster-1 which should be redirected to cluster-2
* (8) Create producer-3 on cluster-1 which should be redirected to cluster-2
* (8) Publish messages using producer1, producer2, and producer3
* (9) Consume all messages by both consumer1 and consumer2
* (10) Create Producer/consumer on non-migrated cluster and verify their connection with cluster-1
* (11) Restart Broker-1 and connect producer/consumer on cluster-1
* (10) Create Producer/consumer on non-migrated cluster and verify their connection with cluster-1
* (11) Restart Broker-1 and connect producer/consumer on cluster-1
* @throws Exception
*/
@Test(dataProvider = "TopicsubscriptionTypes")
Expand Down Expand Up @@ -278,7 +278,7 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
return false;
}, 10, 500);


topic1.checkClusterMigration().get();

log.info("before sending message");
Expand Down Expand Up @@ -402,15 +402,15 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc
broker3.cleanup();
retryStrategically((test) -> broker3.getPulsarService() == null, 10, 1000);
assertNull(pulsar3.getBrokerService());
//publish messages into topic in "r1" cluster

//publish messages into topic in "r1" cluster
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}
retryStrategically((test) -> topic1.isReplicationBacklogExist(), 10, 1000);
assertTrue(topic1.isReplicationBacklogExist());

@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Expand All @@ -420,7 +420,7 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc
AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
log.info("name of topic 2 - {}", topic2.getName());
assertFalse(topic2.getProducers().isEmpty());

retryStrategically((test) -> topic2.getReplicators().size() == 1, 10, 2000);
log.info("replicators should be ready");
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
Expand All @@ -437,27 +437,27 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc
}, 10, 500);

topic1.checkClusterMigration().get();

producer1.sendAsync("test1".getBytes());

// producer is disconnected from cluster-1
retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
assertTrue(topic1.getProducers().isEmpty());

// verify that the disconnected producer is not redirected
// to replication cluster since there is replication backlog.
assertEquals(topic2.getProducers().size(), 1);

// Restart the service in cluster "r3".
broker3.restart();
retryStrategically((test) -> broker3.getPulsarService() != null, 10, 1000);
assertNotNull(broker3.getPulsarService());
pulsar3 = broker3.getPulsarService();

// verify that the replication backlog drains once service in cluster "r3" is restarted.
retryStrategically((test) -> !topic1.isReplicationBacklogExist(), 10, 1000);
assertFalse(topic1.isReplicationBacklogExist());

// verify that the producer1 is now is now connected to migrated cluster "r2" since backlog is cleared.
retryStrategically((test) -> topic2.getProducers().size()==2, 10, 500);
assertEquals(topic2.getProducers().size(), 2);
Expand All @@ -466,17 +466,17 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc
static class TestBroker extends MockedPulsarServiceBaseTest {

private String clusterName;

public TestBroker(String clusterName) throws Exception {
this.clusterName = clusterName;
setup();
}

@Override
protected void setup() throws Exception {
super.setupWithClusterName(clusterName);
}

public PulsarService getPulsarService() {
return pulsar;
}
Expand All @@ -489,7 +489,7 @@ public String getClusterName() {
protected void cleanup() throws Exception {
stopBroker();
}

public void restart() throws Exception {
restartBroker();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
public abstract class TestPulsarAuth extends MockedPulsarServiceBaseTest {
private SecretKey secretKey;
private final String SUPER_USER_ROLE = "admin";

Expand Down

0 comments on commit a7d95f3

Please sign in to comment.