Skip to content

Commit

Permalink
[improve] [client]Add new ServiceUrlProvider implementation: SameAuth…
Browse files Browse the repository at this point in the history
…ParamsAutoClusterFailover (#23129)
  • Loading branch information
poorbarcode authored and Technoboy- committed Aug 22, 2024
1 parent 799cb71 commit b0923ef
Showing 1 changed file with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -45,10 +45,12 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -541,18 +543,34 @@ public void testReaderInitAtDeletedPosition() throws Exception {
.getStats(topicName, true, true, true).getSubscriptions().get("s1");
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get("s1");
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
ManagedLedgerInternalStats.CursorStats cursorStats = internalStats.cursors.get("s1");
String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
PositionImpl actMarkDeletedPos =
PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
PositionImpl expectedMarkDeletedPos =
PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("LAC: {}", internalStats.lastConfirmedEntry);
log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
});

admin.topics().createSubscription(topicName, "s2", MessageId.earliest);
admin.topics().createSubscription(topicName, "s3", MessageId.latest);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
ManagedLedgerInternalStats.CursorStats cursorStats2 = internalStats.cursors.get("s2");
String[] ledgerIdAndEntryId2 = cursorStats2.markDeletePosition.split(":");
PositionImpl actMarkDeletedPos2 =
PositionImpl.get(Long.valueOf(ledgerIdAndEntryId2[0]), Long.valueOf(ledgerIdAndEntryId2[1]));
ManagedLedgerInternalStats.CursorStats cursorStats3 = internalStats.cursors.get("s3");
String[] ledgerIdAndEntryId3 = cursorStats3.markDeletePosition.split(":");
PositionImpl actMarkDeletedPos3 =
PositionImpl.get(Long.valueOf(ledgerIdAndEntryId3[0]), Long.valueOf(ledgerIdAndEntryId3[1]));
log.info("LAC: {}", internalStats.lastConfirmedEntry);
log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2);
log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3);
pulsar.getBrokerService().getTopic(topicName, false).join().get();
// cleanup.
reader.close();
producer.close();
Expand Down

0 comments on commit b0923ef

Please sign in to comment.