Skip to content

Commit

Permalink
[fix] [test] Revert the modification to NonDurableSubscriptionTest ca…
Browse files Browse the repository at this point in the history
…used by a mistake in the PR#23129
  • Loading branch information
poorbarcode committed Aug 14, 2024
1 parent d5ce1ce commit ca5f9ca
Showing 1 changed file with 7 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand All @@ -47,12 +46,10 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -545,34 +542,18 @@ public void testReaderInitAtDeletedPosition() throws Exception {
.getStats(topicName, true, true, true).getSubscriptions().get("s1");
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
ManagedLedgerInternalStats.CursorStats cursorStats = internalStats.cursors.get("s1");
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get("s1");
String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
ImmutablePositionImpl actMarkDeletedPos =
new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
ImmutablePositionImpl expectedMarkDeletedPos =
new ImmutablePositionImpl(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("LAC: {}", internalStats.lastConfirmedEntry);
Position actMarkDeletedPos =
PositionFactory.create(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
Position expectedMarkDeletedPos =
PositionFactory.create(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
});

admin.topics().createSubscription(topicName, "s2", MessageId.earliest);
admin.topics().createSubscription(topicName, "s3", MessageId.latest);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
ManagedLedgerInternalStats.CursorStats cursorStats2 = internalStats.cursors.get("s2");
String[] ledgerIdAndEntryId2 = cursorStats2.markDeletePosition.split(":");
ImmutablePositionImpl actMarkDeletedPos2 =
new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId2[0]), Long.valueOf(ledgerIdAndEntryId2[1]));
ManagedLedgerInternalStats.CursorStats cursorStats3 = internalStats.cursors.get("s3");
String[] ledgerIdAndEntryId3 = cursorStats3.markDeletePosition.split(":");
ImmutablePositionImpl actMarkDeletedPos3 =
new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId3[0]), Long.valueOf(ledgerIdAndEntryId3[1]));
log.info("LAC: {}", internalStats.lastConfirmedEntry);
log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2);
log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3);
pulsar.getBrokerService().getTopic(topicName, false).join().get();
// cleanup.
reader.close();
producer.close();
Expand Down

0 comments on commit ca5f9ca

Please sign in to comment.