diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index 80adc79e6fee8..bbac688d9224c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -34,7 +34,6 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; -import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; @@ -47,12 +46,10 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.CommandFlow; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; -import org.testng.AssertJUnit; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -545,34 +542,18 @@ public void testReaderInitAtDeletedPosition() throws Exception { .getStats(topicName, true, true, true).getSubscriptions().get("s1"); log.info("backlog size: {}", subscriptionStats.getMsgBacklog()); assertEquals(subscriptionStats.getMsgBacklog(), 0); - PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName); - ManagedLedgerInternalStats.CursorStats cursorStats = internalStats.cursors.get("s1"); + ManagedLedgerInternalStats.CursorStats cursorStats = + admin.topics().getInternalStats(topicName).cursors.get("s1"); String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":"); - ImmutablePositionImpl actMarkDeletedPos = - new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); - ImmutablePositionImpl expectedMarkDeletedPos = - new ImmutablePositionImpl(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); - log.info("LAC: {}", internalStats.lastConfirmedEntry); + Position actMarkDeletedPos = + PositionFactory.create(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); + Position expectedMarkDeletedPos = + PositionFactory.create(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); log.info("Expected mark deleted position: {}", expectedMarkDeletedPos); log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition); - AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); + assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); }); - admin.topics().createSubscription(topicName, "s2", MessageId.earliest); - admin.topics().createSubscription(topicName, "s3", MessageId.latest); - PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName); - ManagedLedgerInternalStats.CursorStats cursorStats2 = internalStats.cursors.get("s2"); - String[] ledgerIdAndEntryId2 = cursorStats2.markDeletePosition.split(":"); - ImmutablePositionImpl actMarkDeletedPos2 = - new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId2[0]), Long.valueOf(ledgerIdAndEntryId2[1])); - ManagedLedgerInternalStats.CursorStats cursorStats3 = internalStats.cursors.get("s3"); - String[] ledgerIdAndEntryId3 = cursorStats3.markDeletePosition.split(":"); - ImmutablePositionImpl actMarkDeletedPos3 = - new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId3[0]), Long.valueOf(ledgerIdAndEntryId3[1])); - log.info("LAC: {}", internalStats.lastConfirmedEntry); - log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2); - log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3); - pulsar.getBrokerService().getTopic(topicName, false).join().get(); // cleanup. reader.close(); producer.close();