diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index 20407295ccb0e..8bd95bc93d930 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -32,8 +32,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -45,10 +45,12 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.CommandFlow; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; +import org.testng.AssertJUnit; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -541,18 +543,34 @@ public void testReaderInitAtDeletedPosition() throws Exception { .getStats(topicName, true, true, true).getSubscriptions().get("s1"); log.info("backlog size: {}", subscriptionStats.getMsgBacklog()); assertEquals(subscriptionStats.getMsgBacklog(), 0); - ManagedLedgerInternalStats.CursorStats cursorStats = - admin.topics().getInternalStats(topicName).cursors.get("s1"); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName); + ManagedLedgerInternalStats.CursorStats cursorStats = internalStats.cursors.get("s1"); String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":"); PositionImpl actMarkDeletedPos = PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); PositionImpl expectedMarkDeletedPos = PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); + log.info("LAC: {}", internalStats.lastConfirmedEntry); log.info("Expected mark deleted position: {}", expectedMarkDeletedPos); log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition); - assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); + AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); }); + admin.topics().createSubscription(topicName, "s2", MessageId.earliest); + admin.topics().createSubscription(topicName, "s3", MessageId.latest); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName); + ManagedLedgerInternalStats.CursorStats cursorStats2 = internalStats.cursors.get("s2"); + String[] ledgerIdAndEntryId2 = cursorStats2.markDeletePosition.split(":"); + PositionImpl actMarkDeletedPos2 = + PositionImpl.get(Long.valueOf(ledgerIdAndEntryId2[0]), Long.valueOf(ledgerIdAndEntryId2[1])); + ManagedLedgerInternalStats.CursorStats cursorStats3 = internalStats.cursors.get("s3"); + String[] ledgerIdAndEntryId3 = cursorStats3.markDeletePosition.split(":"); + PositionImpl actMarkDeletedPos3 = + PositionImpl.get(Long.valueOf(ledgerIdAndEntryId3[0]), Long.valueOf(ledgerIdAndEntryId3[1])); + log.info("LAC: {}", internalStats.lastConfirmedEntry); + log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2); + log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3); + pulsar.getBrokerService().getTopic(topicName, false).join().get(); // cleanup. reader.close(); producer.close();