Skip to content

Commit

Permalink
[fix][broker] Introduce the last sent position to fix message orderin…
Browse files Browse the repository at this point in the history
…g issues in Key_Shared (PIP-282) (#21953)
  • Loading branch information
equanz authored Jul 19, 2024
1 parent 59136a0 commit d7e8ea1
Show file tree
Hide file tree
Showing 13 changed files with 1,158 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3472,6 +3472,19 @@ public LongPairRangeSet<Position> getIndividuallyDeletedMessagesSet() {
return individualDeletedMessages;
}

public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
LongPairRangeSet.RangeProcessor<Position> processor) {
final Position mdp;
lock.readLock().lock();
try {
mdp = markDeletePosition;
individualDeletedMessages.forEach(processor);
} finally {
lock.readLock().unlock();
}
return mdp;
}

public boolean isMessageDeleted(Position position) {
lock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,7 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
* the position range
* @return the count of entries
*/
long getNumberOfEntries(Range<Position> range) {
public long getNumberOfEntries(Range<Position> range) {
Position fromPosition = range.lowerEndpoint();
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
Position toPosition = range.upperEndpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public class Consumer {

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private Position readPositionWhenJoining;
private Position lastSentPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
Expand Down Expand Up @@ -931,8 +931,8 @@ public ConsumerStatsImpl getStats() {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
if (lastSentPositionWhenJoining != null) {
stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString();
}
return stats;
}
Expand Down Expand Up @@ -1166,8 +1166,8 @@ public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}

public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) {
this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
}

public int getMaxUnackedMessages() {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1305,9 +1305,26 @@ public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
.getRecentlyJoinedConsumers();
if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
recentlyJoinedConsumers.forEach((k, v) -> {
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
// The dispatcher allows same name consumers
final StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("consumerName=").append(k.consumerName())
.append(", consumerId=").append(k.consumerId());
if (k.cnx() != null) {
stringBuilder.append(", address=").append(k.cnx().clientAddress());
}
subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString());
});
}
final String lastSentPosition = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
.getLastSentPosition();
if (lastSentPosition != null) {
subStats.lastSentPosition = lastSentPosition;
}
final String individuallySentPositions = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
.getIndividuallySentPositions();
if (individuallySentPositions != null) {
subStats.individuallySentPositions = individuallySentPositions;
}
}
subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.admin;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -56,6 +58,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
Expand All @@ -65,6 +68,7 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarServerException;
Expand All @@ -75,6 +79,8 @@
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.testcontext.SpyConfig;
import org.apache.pulsar.client.admin.GetStatsOptions;
Expand Down Expand Up @@ -139,7 +145,10 @@
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -3449,43 +3458,198 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception {
}

@Test
public void testGetReadPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString();
public void testGetLastSentPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

@Cleanup
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
consumer1.receive();
}

List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();
consumers.add(consumer);
}
@Cleanup
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getConsumers().size(), 2);
ConsumerStats consumerStats = subStats.getConsumers().get(0);
Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId() + 1).toString());
ConsumerStats consumerStats = subStats.getConsumers().stream()
.filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(),
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
}

@Test
public void testGetLastSentPosition() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPosition-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
final AtomicInteger counter = new AtomicInteger();
@Cleanup
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.messageListener((c, msg) -> {
try {
c.acknowledge(msg);
counter.getAndIncrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertNull(subStats.getLastSentPosition());

for (Consumer<byte[]> consumer : consumers) {
consumer.close();
final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
}

Awaitility.await().untilAsserted(() -> assertEquals(counter.get(), messages));

stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getLastSentPosition(), PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
}

@Test
public void testGetIndividuallySentPositions() throws Exception {
// The producer sends messages with two types of keys.
// The dispatcher sends keyA messages to consumer1.
// Consumer1 will not receive any messages. Its receiver queue size is 1.
// Consumer2 will receive and ack any messages immediately.

final String topic = "persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

final String consumer1Name = "c1";
final String consumer2Name = "c2";

@Cleanup
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.consumerName(consumer1Name)
.receiverQueueSize(1)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher();
final String keyA = "key-a";
final String keyB = "key-b";
final int hashA = Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());

final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
selectorField.setAccessible(true);
final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
selectorField.set(dispatcher, selector);

// the selector returns consumer1 if keyA
doAnswer((invocationOnMock -> {
final int hash = invocationOnMock.getArgument(0);

final String consumerName = hash == hashA ? consumer1Name : consumer2Name;
return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get();
})).when(selector).select(anyInt());

final AtomicInteger consumer2AckCounter = new AtomicInteger();
@Cleanup
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.consumerName(consumer2Name)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.messageListener((c, msg) -> {
try {
c.acknowledge(msg);
consumer2AckCounter.getAndIncrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.subscribe();

final LongPairRangeSet.LongPairConsumer<Position> positionRangeConverter = PositionFactory::create;
final LongPairRangeSet<Position> expectedIndividuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());

final Function<String, MessageIdImpl> sendFn = (key) -> {
try {
return (MessageIdImpl) producer.newMessage().key(key).value(("msg").getBytes()).send();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
};
final List<MessageIdImpl> messageIdList = new ArrayList<>();

// the dispatcher can send keyA message, but then consumer1's receiver queue will be full
messageIdList.add(sendFn.apply(keyA));

// the dispatcher can send messages other than keyA
messageIdList.add(sendFn.apply(keyA));
messageIdList.add(sendFn.apply(keyB));
messageIdList.add(sendFn.apply(keyA));
messageIdList.add(sendFn.apply(keyB));
messageIdList.add(sendFn.apply(keyB));

assertEquals(messageIdList.size(), 6);
Awaitility.await().untilAsserted(() -> assertEquals(consumer2AckCounter.get(), 3));

// set expected value
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(), messageIdList.get(1).getEntryId(),
messageIdList.get(2).getLedgerId(), messageIdList.get(2).getEntryId());
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(), messageIdList.get(3).getEntryId(),
messageIdList.get(5).getLedgerId(), messageIdList.get(5).getEntryId());

stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());
}

@Test
Expand Down
Loading

0 comments on commit d7e8ea1

Please sign in to comment.