From 9a16c9586b089d5b5738f5730ed5cd12c56da44d Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Tue, 14 Jan 2025 12:21:20 +0200
Subject: [PATCH 1/9] Attempt to reproduce the issue

---
 .../apache/pulsar/broker/BrokerTestUtil.java  |  17 +-
 .../client/api/KeySharedSubscriptionTest.java | 163 +++++++++++++++++-
 2 files changed, 177 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index 6a41e86f8934e..8364cae53b223 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -224,7 +224,22 @@ public static String getJsonResourceAsString(String uri) {
     public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
                                            Duration quietTimeout,
                                            Consumer<T>... consumers) {
-        FutureUtil.waitForAll(Arrays.stream(consumers)
+        receiveMessages(messageHandler, quietTimeout, Arrays.stream(consumers));
+    }
+
+    /**
+     * Receive messages concurrently from multiple consumers and handle them using the provided message handler.
+     * The message handler should return true if it wants to continue receiving more messages, false otherwise.
+     *
+     * @param messageHandler the message handler
+     * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
+     * @param consumers the consumers to receive messages from
+     * @param <T> the message value type
+     */
+    public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
+                                           Duration quietTimeout,
+                                           Stream<Consumer<T>> consumers) {
+        FutureUtil.waitForAll(consumers
                 .map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join();
     }
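A note on the helper above: the Stream-based overload keeps receiving from all consumers concurrently and only returns once no message has arrived on any of them for the whole quiet timeout. A minimal usage sketch, assuming two already-subscribed Integer consumers (the variable names are illustrative, not part of the patch):

    // Drain two consumers until they stay quiet for five seconds.
    Set<Integer> seen = ConcurrentHashMap.newKeySet();
    BrokerTestUtil.receiveMessages((consumer, msg) -> {
        seen.add(msg.getValue());          // record the payload
        consumer.acknowledgeAsync(msg);    // ack so redelivery does not keep the loop alive
        return true;                       // true means "keep receiving"
    }, Duration.ofSeconds(5), Stream.of(consumer1, consumer2));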
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 92257c1df53f8..0dabf29f3f8ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -55,6 +55,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -93,7 +94,6 @@
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
-import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
@@ -104,7 +104,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     private final KeySharedImplementationType implementationType;
 
     // Comment out the next line (Factory annotation) to run tests manually in IntelliJ, one-by-one
-    @Factory
+    //@Factory
     public static Object[] createTestInstances() {
         return KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionTest::new);
     }
@@ -2359,4 +2359,163 @@ public void testOrderingAfterReconnects(KeySharedImplementationType impl) throws
             logTopicStats(topic);
         }
     }
+
+    @Test(dataProvider = "currentImplementationType")
+    public void testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementationType impl) throws Exception {
+        @Cleanup("interrupt")
+        Thread updateRatesThread = new Thread(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                pulsar.getBrokerService().updateRates();
+            }
+        });
+        updateRatesThread.start();
+
+        String topic = newUniqueName("testDeliveryOfRemainingMessages");
+        int numberOfKeys = 100;
+        long pauseTime = 100L;
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, false);
+
+        // create a consumer and close it to create a subscription
+        pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe()
+                .close();
+
+        Set<Integer> remainingMessageValues = Collections.synchronizedSet(new HashSet<>());
+        List<Pair<Consumer<Integer>, Message<Integer>>> unackedMessages = new ArrayList<>();
+        AtomicBoolean c2MessagesShouldBeUnacked = new AtomicBoolean(true);
+        Set<String> keysForC2 = new HashSet<>();
+
+        Map<String, Pair<Position, String>> keyPositions = new HashMap<>();
+        BiFunction<Consumer<Integer>, Message<Integer>, Boolean> messageHandler = (consumer, msg) -> {
+            synchronized (this) {
+                String key = msg.getKey();
+                if (c2MessagesShouldBeUnacked.get() && keysForC2.contains(key)) {
+                    unackedMessages.add(Pair.of(consumer, msg));
+                    return true;
+                }
+                consumer.acknowledgeAsync(msg);
+                MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
+                Position currentPosition = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId());
+                Pair<Position, String> prevPair = keyPositions.get(key);
+                if (prevPair != null && prevPair.getLeft().compareTo(currentPosition) > 0) {
+                    log.error("key: {} value: {} prev: {}/{} current: {}/{}", key, msg.getValue(), prevPair.getLeft(),
+                            prevPair.getRight(), currentPosition, consumer.getConsumerName());
+                    fail("out of order");
+                }
+                keyPositions.put(key, Pair.of(currentPosition, consumer.getConsumerName()));
+                boolean removed = remainingMessageValues.remove(msg.getValue());
+                if (!removed) {
+                    // duplicates are possible during reconnects, this is not an error
+                    log.warn("Duplicate message: {} value: {}", msg.getMessageId(), msg.getValue());
+                }
+                return true;
+            }
+        };
+
+        // Adding a new consumer.
+        @Cleanup
+        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c1")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c2")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c3")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        StickyKeyConsumerSelector selector = getSelector(topic, SUBSCRIPTION_NAME);
+
+        // find keys that will be assigned to c2
+        for (int i = 0; i < numberOfKeys; i++) {
+            String key = String.valueOf(i);
+            byte[] keyBytes = key.getBytes(UTF_8);
+            int hash = selector.makeStickyKeyHash(keyBytes);
+            if (selector.select(hash).consumerName().equals("c2")) {
+                keysForC2.add(key);
+            }
+        }
+
+        // close c2
+        c2.close();
+        Thread.sleep(pauseTime);
+
+        // produce messages with random keys
+        for (int i = 0; i < 1000; i++) {
+            String key = String.valueOf(random.nextInt(numberOfKeys));
+            //log.info("Producing message with key: {} value: {}", key, i);
+            producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+            remainingMessageValues.add(i);
+        }
+
+        // consume the messages
+        receiveMessages(messageHandler, Duration.ofSeconds(1), c1, c3);
+
+        c2MessagesShouldBeUnacked.set(false);
+
+        // reconnect c2
+        c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c2")
+                .subscriptionName(SUBSCRIPTION_NAME)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .startPaused(true)
+                .subscribe();
+
+        Thread.sleep(2 * pauseTime);
+
+        // produce messages with c2 keys
+        List<String> keysForC2List = new ArrayList<>(keysForC2);
+        for (int i = 1000; i < 1100; i++) {
+            String key = keysForC2List.get(random.nextInt(keysForC2List.size()));
+            //log.info("Producing message with key: {} value: {}", key, i);
+            producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+            remainingMessageValues.add(i);
+        }
+
+        Thread.sleep(2 * pauseTime);
+
+        // ack the unacked messages to unblock c2 keys
+        unackedMessages.forEach(pair -> {
+            messageHandler.apply(pair.getLeft(), pair.getRight());
+        });
+
+        Thread.sleep(50 * pauseTime);
+
+        // resume c2
+        c2.resume();
+
+        // consume the messages
+        receiveMessages(messageHandler, Duration.ofSeconds(2), c1, c2, c3);
+
+        try {
+            assertEquals(remainingMessageValues, Collections.emptySet());
+        } finally {
+            logTopicStats(topic);
+        }
+    }
 }
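An aside on what the test above is provoking: it hammers BrokerService.updateRates() from a dedicated thread while consumers acknowledge, close, and reconnect, because the underlying hang (the issue referenced in a later commit comment) is a classic lock-ordering deadlock between the stats-collection path and the consumer path. A self-contained illustration of that failure mode — the monitor names are hypothetical stand-ins, not broker code:

    public class LockOrderDeadlockSketch {
        private static final Object statsMonitor = new Object();    // stands in for the stats/updateRates path
        private static final Object trackerMonitor = new Object();  // stands in for the tracker's monitor

        public static void main(String[] args) {
            Thread statsThread = new Thread(() -> {
                synchronized (statsMonitor) {          // 1st lock: stats
                    pause();
                    synchronized (trackerMonitor) { }  // 2nd lock: tracker -> blocks forever
                }
            }, "stats");
            Thread consumerThread = new Thread(() -> {
                synchronized (trackerMonitor) {        // 1st lock: tracker
                    pause();
                    synchronized (statsMonitor) { }    // 2nd lock: stats -> blocks forever
                }
            }, "consumer");
            statsThread.start();
            consumerThread.start();                    // the two threads now wait on each other
        }

        private static void pause() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }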
From b8c4e8c6b7d7056a263ee40225f126a643716aee Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Wed, 15 Jan 2025 21:57:02 +0200
Subject: [PATCH 2/9] Add 1ms delay between updateRates calls

---
 .../pulsar/client/api/KeySharedSubscriptionTest.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 0dabf29f3f8ed..1a86982d5a9a3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2366,8 +2366,13 @@ public void testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementati
         Thread updateRatesThread = new Thread(() -> {
             while (!Thread.currentThread().isInterrupted()) {
                 pulsar.getBrokerService().updateRates();
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
             }
-        });
+        }, "update-rates-thread");
         updateRatesThread.start();
 
         String topic = newUniqueName("testDeliveryOfRemainingMessages");

From 4cdcd631c84e6f55292b5046577b59d6bc2343f6 Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Wed, 15 Jan 2025 21:57:27 +0200
Subject: [PATCH 3/9] Switch to use read-write locks in DrainingHashesTracker
 to prevent deadlocks

---
 .../broker/service/DrainingHashesTracker.java | 301 +++++++++++-------
 1 file changed, 191 insertions(+), 110 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 46762c844db6c..bfd6308969c07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -26,6 +26,8 @@
 import java.util.Map;
 import java.util.PrimitiveIterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.policies.data.DrainingHash;
@@ -34,7 +36,7 @@
 import org.roaringbitmap.RoaringBitmap;
 
 /**
- * A thread-safe map to store draining hashes in the consumer.
+ * A thread-safe map to store draining hashes in the consumer using read-write locks for improved concurrency.
  */
 @Slf4j
 public class DrainingHashesTracker {
@@ -42,6 +44,7 @@ public class DrainingHashesTracker {
     private final UnblockingHandler unblockingHandler;
     // optimize the memory consumption of the map by using primitive int keys
     private final Int2ObjectOpenHashMap<DrainingHashEntry> drainingHashes = new Int2ObjectOpenHashMap<>();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     int batchLevel;
     boolean unblockedWhileBatching;
     private final Map<ConsumerIdentityWrapper, ConsumerDrainingHashesStats> consumerDrainingHashesStatsMap =
@@ -52,9 +55,14 @@ public class DrainingHashesTracker {
      */
     @ToString
     public static class DrainingHashEntry {
+        private static final AtomicIntegerFieldUpdater<DrainingHashEntry> REF_COUNT_UPDATER =
+                AtomicIntegerFieldUpdater.newUpdater(DrainingHashEntry.class, "refCount");
+        private static final AtomicIntegerFieldUpdater<DrainingHashEntry> BLOCKED_COUNT_UPDATER =
+                AtomicIntegerFieldUpdater.newUpdater(DrainingHashEntry.class, "blockedCount");
+
         private final Consumer consumer;
-        private int refCount;
-        private int blockedCount;
+        private volatile int refCount;
+        private volatile int blockedCount;
 
         /**
          * Constructs a new DrainingHashEntry with the specified Consumer.
@@ -81,7 +89,7 @@ public Consumer getConsumer() {
          * Increments the reference count.
          */
         void incrementRefCount() {
-            refCount++;
+            REF_COUNT_UPDATER.incrementAndGet(this);
         }
 
         /**
@@ -90,14 +98,14 @@ void incrementRefCount() {
          * @return true if the reference count is zero, false otherwise
          */
         boolean decrementRefCount() {
-            return --refCount == 0;
+            return REF_COUNT_UPDATER.decrementAndGet(this) == 0;
         }
 
         /**
          * Increments the blocked count.
         */
        void incrementBlockedCount() {
-            blockedCount++;
+            BLOCKED_COUNT_UPDATER.incrementAndGet(this);
        }
 
        /**
@@ -108,51 +116,85 @@ void incrementBlockedCount() {
         boolean isBlocking() {
             return blockedCount > 0;
         }
+
+        /**
+         * Gets the current reference count.
+         *
+         * @return the current reference count
+         */
+        int getRefCount() {
+            return refCount;
+        }
+
+        /**
+         * Gets the current blocked count.
+         *
+         * @return the current blocked count
+         */
+        int getBlockedCount() {
+            return blockedCount;
+        }
     }
 
     private class ConsumerDrainingHashesStats {
         private final RoaringBitmap drainingHashes = new RoaringBitmap();
-        long drainingHashesClearedTotal;
+        private long drainingHashesClearedTotal;
+        private final ReentrantReadWriteLock statsLock = new ReentrantReadWriteLock();
 
-        public synchronized void addHash(int stickyHash) {
-            drainingHashes.add(stickyHash);
+        public void addHash(int stickyHash) {
+            statsLock.writeLock().lock();
+            try {
+                drainingHashes.add(stickyHash);
+            } finally {
+                statsLock.writeLock().unlock();
+            }
         }
 
-        public synchronized boolean clearHash(int hash) {
-            drainingHashes.remove(hash);
-            drainingHashesClearedTotal++;
-            boolean empty = drainingHashes.isEmpty();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Cleared hash {} in stats. empty={} totalCleared={} hashes={}",
-                        dispatcherName, hash, empty, drainingHashesClearedTotal, drainingHashes.getCardinality());
+        public boolean clearHash(int hash) {
+            statsLock.writeLock().lock();
+            try {
+                drainingHashes.remove(hash);
+                drainingHashesClearedTotal++;
+                boolean empty = drainingHashes.isEmpty();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Cleared hash {} in stats. empty={} totalCleared={} hashes={}",
+                            dispatcherName, hash, empty, drainingHashesClearedTotal, drainingHashes.getCardinality());
+                }
+                return empty;
+            } finally {
+                statsLock.writeLock().unlock();
             }
-            return empty;
         }
 
-        public synchronized void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) {
-            int drainingHashesUnackedMessages = 0;
-            List<DrainingHash> drainingHashesStats = new ArrayList<>();
-            PrimitiveIterator.OfInt hashIterator = drainingHashes.stream().iterator();
-            while (hashIterator.hasNext()) {
-                int hash = hashIterator.nextInt();
-                DrainingHashEntry entry = getEntry(hash);
-                if (entry == null) {
-                    log.warn("[{}] Draining hash {} not found in the tracker for consumer {}", dispatcherName, hash,
-                            consumer);
-                    continue;
+        public void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) {
+            statsLock.readLock().lock();
+            try {
+                int drainingHashesUnackedMessages = 0;
+                List<DrainingHash> drainingHashesStats = new ArrayList<>();
+                PrimitiveIterator.OfInt hashIterator = drainingHashes.stream().iterator();
+                while (hashIterator.hasNext()) {
+                    int hash = hashIterator.nextInt();
+                    DrainingHashEntry entry = getEntry(hash);
+                    if (entry == null) {
+                        log.warn("[{}] Draining hash {} not found in the tracker for consumer {}", dispatcherName, hash,
+                                consumer);
+                        continue;
+                    }
+                    int unackedMessages = entry.getRefCount();
+                    DrainingHashImpl drainingHash = new DrainingHashImpl();
+                    drainingHash.hash = hash;
+                    drainingHash.unackMsgs = unackedMessages;
+                    drainingHash.blockedAttempts = entry.getBlockedCount();
+                    drainingHashesStats.add(drainingHash);
+                    drainingHashesUnackedMessages += unackedMessages;
                 }
-                int unackedMessages = entry.refCount;
-                DrainingHashImpl drainingHash = new DrainingHashImpl();
-                drainingHash.hash = hash;
-                drainingHash.unackMsgs = unackedMessages;
-                drainingHash.blockedAttempts = entry.blockedCount;
-                drainingHashesStats.add(drainingHash);
-                drainingHashesUnackedMessages += unackedMessages;
+                consumerStats.drainingHashesCount = drainingHashesStats.size();
+                consumerStats.drainingHashesClearedTotal = drainingHashesClearedTotal;
+                consumerStats.drainingHashesUnackedMessages = drainingHashesUnackedMessages;
+                consumerStats.drainingHashes = drainingHashesStats;
+            } finally {
+                statsLock.readLock().unlock();
             }
-            consumerStats.drainingHashesCount = drainingHashesStats.size();
-            consumerStats.drainingHashesClearedTotal = drainingHashesClearedTotal;
-            consumerStats.drainingHashesUnackedMessages = drainingHashesUnackedMessages;
-            consumerStats.drainingHashes = drainingHashesStats;
         }
     }
 
@@ -179,50 +221,66 @@ public DrainingHashesTracker(String dispatcherName, UnblockingHandler unblocking
      * @param consumer the consumer
     * @param stickyHash the sticky hash
     */
-    public synchronized void addEntry(Consumer consumer, int stickyHash) {
+    public void addEntry(Consumer consumer, int stickyHash) {
         if (stickyHash == 0) {
             throw new IllegalArgumentException("Sticky hash cannot be 0");
         }
-        DrainingHashEntry entry = drainingHashes.get(stickyHash);
-        if (entry == null) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Adding and incrementing draining hash {} for consumer id:{} name:{}", dispatcherName,
-                        stickyHash, consumer.consumerId(), consumer.consumerName());
-            }
-            entry = new DrainingHashEntry(consumer);
-            drainingHashes.put(stickyHash, entry);
-            // update the consumer specific stats
-            consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer),
-                    k -> new ConsumerDrainingHashesStats()).addHash(stickyHash);
-        } else if (entry.getConsumer() != consumer) {
-            throw new IllegalStateException(
-                    "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
-                            + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
-                            + ".");
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Draining hash {} incrementing {} consumer id:{} name:{}", dispatcherName, stickyHash,
-                        entry.refCount + 1, consumer.consumerId(), consumer.consumerName());
+
+        lock.writeLock().lock();
+        try {
+            DrainingHashEntry entry = drainingHashes.get(stickyHash);
+            if (entry == null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Adding and incrementing draining hash {} for consumer id:{} name:{}",
+                            dispatcherName, stickyHash, consumer.consumerId(), consumer.consumerName());
+                }
+                entry = new DrainingHashEntry(consumer);
+                drainingHashes.put(stickyHash, entry);
+                // update the consumer specific stats
+                consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer),
+                        k -> new ConsumerDrainingHashesStats()).addHash(stickyHash);
+            } else if (entry.getConsumer() != consumer) {
+                throw new IllegalStateException(
+                        "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
+                                + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
+                                + ".");
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Draining hash {} incrementing {} consumer id:{} name:{}", dispatcherName,
+                            stickyHash, entry.getRefCount() + 1, consumer.consumerId(), consumer.consumerName());
+                }
             }
+            entry.incrementRefCount();
+        } finally {
+            lock.writeLock().unlock();
         }
-        entry.incrementRefCount();
     }
 
     /**
      * Start a batch operation. There could be multiple nested batch operations.
      * The unblocking of sticky key hashes will be done only when the last batch operation ends.
      */
-    public synchronized void startBatch() {
-        batchLevel++;
+    public void startBatch() {
+        lock.writeLock().lock();
+        try {
+            batchLevel++;
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     /**
      * End a batch operation.
      */
-    public synchronized void endBatch() {
-        if (--batchLevel == 0 && unblockedWhileBatching) {
-            unblockedWhileBatching = false;
-            unblockingHandler.stickyKeyHashUnblocked(-1);
+    public void endBatch() {
+        lock.writeLock().lock();
+        try {
+            if (--batchLevel == 0 && unblockedWhileBatching) {
+                unblockedWhileBatching = false;
+                unblockingHandler.stickyKeyHashUnblocked(-1);
+            }
+        } finally {
+            lock.writeLock().unlock();
        }
     }
 
@@ -231,46 +289,52 @@ public synchronized void endBatch() {
      *
      * @param consumer the consumer
      * @param stickyHash the sticky hash
-     * @param closing
+     * @param closing whether the consumer is closing
      */
-    public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) {
+    public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) {
         if (stickyHash == 0) {
             return;
         }
-        DrainingHashEntry entry = drainingHashes.get(stickyHash);
-        if (entry == null) {
-            return;
-        }
-        if (entry.getConsumer() != consumer) {
-            throw new IllegalStateException(
-                    "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
-                            + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
-                            + ".");
-        }
-        if (entry.decrementRefCount()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
-                        consumer.consumerId(), consumer.consumerName());
+
+        lock.writeLock().lock();
+        try {
+            DrainingHashEntry entry = drainingHashes.get(stickyHash);
+            if (entry == null) {
+                return;
             }
-            DrainingHashEntry removed = drainingHashes.remove(stickyHash);
-            // update the consumer specific stats
-            ConsumerDrainingHashesStats drainingHashesStats =
-                    consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
-            if (drainingHashesStats != null) {
-                drainingHashesStats.clearHash(stickyHash);
+            if (entry.getConsumer() != consumer) {
+                throw new IllegalStateException(
+                        "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
+                                + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
+                                + ".");
            }
-            if (!closing && removed.isBlocking()) {
-                if (batchLevel > 0) {
-                    unblockedWhileBatching = true;
-                } else {
-                    unblockingHandler.stickyKeyHashUnblocked(stickyHash);
+            if (entry.decrementRefCount()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
+                            consumer.consumerId(), consumer.consumerName());
+                }
+                DrainingHashEntry removed = drainingHashes.remove(stickyHash);
+                // update the consumer specific stats
+                ConsumerDrainingHashesStats drainingHashesStats =
+                        consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
+                if (drainingHashesStats != null) {
+                    drainingHashesStats.clearHash(stickyHash);
+                }
+                if (!closing && removed.isBlocking()) {
+                    if (batchLevel > 0) {
+                        unblockedWhileBatching = true;
+                    } else {
+                        unblockingHandler.stickyKeyHashUnblocked(stickyHash);
+                    }
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName,
+                            stickyHash, entry.getRefCount(), consumer.consumerId(), consumer.consumerName());
                }
             }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName, stickyHash,
-                        entry.refCount, consumer.consumerId(), consumer.consumerName());
-            }
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
@@ -281,12 +345,12 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole
      * @param stickyKeyHash the sticky key hash
      * @return true if the sticky key hash should be blocked, false otherwise
      */
-    public synchronized boolean shouldBlockStickyKeyHash(Consumer consumer, int stickyKeyHash) {
+    public boolean shouldBlockStickyKeyHash(Consumer consumer, int stickyKeyHash) {
         if (stickyKeyHash == STICKY_KEY_HASH_NOT_SET) {
             log.warn("[{}] Sticky key hash is not set. Allowing dispatching", dispatcherName);
             return false;
         }
-        DrainingHashEntry entry = drainingHashes.get(stickyKeyHash);
+        DrainingHashEntry entry = getEntry(stickyKeyHash);
         // if the entry is not found, the hash is not draining. Don't block the hash.
         if (entry == null) {
             return false;
@@ -294,10 +358,14 @@ public synchronized boolean shouldBlockStickyKeyHash(Consumer consumer, int stic
         // hash has been reassigned to the original consumer, remove the entry
         // and don't block the hash
         if (entry.getConsumer() == consumer) {
-            log.info("[{}] Hash {} has been reassigned consumer {}. "
-                            + "The draining hash entry with refCount={} will be removed.",
-                    dispatcherName, stickyKeyHash, entry.getConsumer(), entry.refCount);
-            drainingHashes.remove(stickyKeyHash, entry);
+            log.info("[{}] Hash {} has been reassigned consumer {}. The draining hash entry with refCount={} will "
+                    + "be removed.", dispatcherName, stickyKeyHash, entry.getConsumer(), entry.getRefCount());
+            lock.writeLock().lock();
+            try {
+                drainingHashes.remove(stickyKeyHash, entry);
+            } finally {
+                lock.writeLock().unlock();
+            }
             return false;
         }
         // increment the blocked count which is used to determine if the hash is blocking
@@ -313,16 +381,29 @@ public synchronized boolean shouldBlockStickyKeyHash(Consumer consumer, int stic
      * @param stickyKeyHash the sticky key hash
     * @return the draining hash entry, or null if not found
     */
-    public synchronized DrainingHashEntry getEntry(int stickyKeyHash) {
-        return stickyKeyHash != 0 ? drainingHashes.get(stickyKeyHash) : null;
+    public DrainingHashEntry getEntry(int stickyKeyHash) {
+        if (stickyKeyHash == 0) {
+            return null;
+        }
+        lock.readLock().lock();
+        try {
+            return drainingHashes.get(stickyKeyHash);
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     /**
     * Clear all entries in the draining hashes tracker.
     */
-    public synchronized void clear() {
-        drainingHashes.clear();
-        consumerDrainingHashesStatsMap.clear();
+    public void clear() {
+        lock.writeLock().lock();
+        try {
+            drainingHashes.clear();
+            consumerDrainingHashesStatsMap.clear();
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     /**
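The commit above replaces coarse synchronized methods with a ReentrantReadWriteLock so that read-mostly callers (such as stats collection) no longer serialize against each other. The idiom, reduced to a toy class for illustration (this is not the tracker itself):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ReadWriteGuardedMap {
        private final Map<Integer, Integer> map = new HashMap<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        Integer get(int key) {
            lock.readLock().lock();      // many readers can hold the read lock at once
            try {
                return map.get(key);
            } finally {
                lock.readLock().unlock();
            }
        }

        void put(int key, int value) {
            lock.writeLock().lock();     // writers are exclusive against readers and writers
            try {
                map.put(key, value);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

One caveat that shapes the code in this patch: ReentrantReadWriteLock does not allow upgrading a held read lock to a write lock (the upgrade attempt blocks forever). That is why shouldBlockStickyKeyHash first calls getEntry, which takes and releases the read lock, and only afterwards takes the write lock to remove the entry.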
From c1eca612a619390d400c754e07b17255a025e5e0 Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Wed, 15 Jan 2025 22:45:48 +0200
Subject: [PATCH 4/9] Improve test

---
 .../pulsar/client/api/KeySharedSubscriptionTest.java | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 1a86982d5a9a3..22b5d1fdfc3d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2362,15 +2362,12 @@ public void testOrderingAfterReconnects(KeySharedImplementationType impl) throws
 
     @Test(dataProvider = "currentImplementationType")
     public void testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementationType impl) throws Exception {
+        conf.setUnblockStuckSubscriptionEnabled(false);
         @Cleanup("interrupt")
         Thread updateRatesThread = new Thread(() -> {
             while (!Thread.currentThread().isInterrupted()) {
                 pulsar.getBrokerService().updateRates();
-                try {
-                    Thread.sleep(1);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
+                Thread.yield();
             }
         }, "update-rates-thread");
         updateRatesThread.start();

From 79dc5bf4897447556a6b9c48545e233509f90977 Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Wed, 15 Jan 2025 22:53:15 +0200
Subject: [PATCH 5/9] Add counter

---
 .../apache/pulsar/client/api/KeySharedSubscriptionTest.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 22b5d1fdfc3d0..c41b909ba8d3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2365,9 +2365,13 @@ public void testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementati
         conf.setUnblockStuckSubscriptionEnabled(false);
         @Cleanup("interrupt")
         Thread updateRatesThread = new Thread(() -> {
-            while (!Thread.currentThread().isInterrupted()) {
+            int count = 0;
+            while (!Thread.currentThread().isInterrupted() && count++ < 1_000_000) {
                 pulsar.getBrokerService().updateRates();
                 Thread.yield();
+                if (count % 10000 == 0) {
+                    log.info("updateRatesThread count: {}", count);
+                }
             }
         }, "update-rates-thread");
         updateRatesThread.start();

From 150a5aa673b5e6dc4f49d1a97684819ea3aca1a5 Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Wed, 15 Jan 2025 23:45:31 +0200
Subject: [PATCH 6/9] Reduce duration of locks in DrainingHashesTracker to
 prevent deadlocks

---
 .../broker/service/DrainingHashesTracker.java | 94 ++++++++++++-------
 1 file changed, 58 insertions(+), 36 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index bfd6308969c07..38b0a196022b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -226,9 +226,11 @@ public void addEntry(Consumer consumer, int stickyHash) {
             throw new IllegalArgumentException("Sticky hash cannot be 0");
         }
 
+        DrainingHashEntry entry;
+        ConsumerDrainingHashesStats addedStats = null;
         lock.writeLock().lock();
         try {
-            DrainingHashEntry entry = drainingHashes.get(stickyHash);
+            entry = drainingHashes.get(stickyHash);
             if (entry == null) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Adding and incrementing draining hash {} for consumer id:{} name:{}",
@@ -236,9 +238,9 @@ public void addEntry(Consumer consumer, int stickyHash) {
                 }
                 entry = new DrainingHashEntry(consumer);
                 drainingHashes.put(stickyHash, entry);
-                // update the consumer specific stats
-                consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer),
-                        k -> new ConsumerDrainingHashesStats()).addHash(stickyHash);
+                // add the consumer specific stats
+                addedStats = consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer),
+                        k -> new ConsumerDrainingHashesStats());
             } else if (entry.getConsumer() != consumer) {
                 throw new IllegalStateException(
                         "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
@@ -250,10 +252,14 @@ public void addEntry(Consumer consumer, int stickyHash) {
                             stickyHash, entry.getRefCount() + 1, consumer.consumerId(), consumer.consumerName());
                 }
             }
-            entry.incrementRefCount();
         } finally {
             lock.writeLock().unlock();
         }
+        if (addedStats != null) {
+            // add hash to added stats
+            addedStats.addHash(stickyHash);
+        }
+        entry.incrementRefCount();
     }
 
     /**
@@ -273,15 +279,20 @@ public void startBatch() {
      * End a batch operation.
      */
     public void endBatch() {
+        boolean unblock = false;
         lock.writeLock().lock();
         try {
             if (--batchLevel == 0 && unblockedWhileBatching) {
                 unblockedWhileBatching = false;
-                unblockingHandler.stickyKeyHashUnblocked(-1);
+                unblock = true;
             }
         } finally {
             lock.writeLock().unlock();
         }
+        // unblock outside the lock
+        if (unblock) {
+            unblockingHandler.stickyKeyHashUnblocked(-1);
+        }
     }
 
     /**
@@ -296,45 +307,56 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) {
             return;
         }
 
-        lock.writeLock().lock();
-        try {
-            DrainingHashEntry entry = drainingHashes.get(stickyHash);
-            if (entry == null) {
-                return;
+        DrainingHashEntry entry = getEntry(stickyHash);
+        if (entry == null) {
+            return;
+        }
+        if (entry.getConsumer() != consumer) {
+            throw new IllegalStateException(
+                    "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
+                            + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
+                            + ".");
+        }
+        if (entry.decrementRefCount()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
+                        consumer.consumerId(), consumer.consumerName());
             }
-            if (entry.getConsumer() != consumer) {
-                throw new IllegalStateException(
-                        "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
-                                + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
-                                + ".");
+            DrainingHashEntry removed;
+            lock.writeLock().lock();
+            try {
+                removed = drainingHashes.remove(stickyHash);
+            } finally {
+                lock.writeLock().unlock();
             }
-            if (entry.decrementRefCount()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
-                            consumer.consumerId(), consumer.consumerName());
-                }
-                DrainingHashEntry removed = drainingHashes.remove(stickyHash);
-                // update the consumer specific stats
-                ConsumerDrainingHashesStats drainingHashesStats =
-                        consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
-                if (drainingHashesStats != null) {
-                    drainingHashesStats.clearHash(stickyHash);
+            // update the consumer specific stats
+            ConsumerDrainingHashesStats drainingHashesStats =
+                    consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
+            if (drainingHashesStats != null) {
+                drainingHashesStats.clearHash(stickyHash);
+            }
+            boolean unblock = false;
+            lock.writeLock().lock();
+            try {
                 if (!closing && removed.isBlocking()) {
                     if (batchLevel > 0) {
                         unblockedWhileBatching = true;
                     } else {
-                        unblockingHandler.stickyKeyHashUnblocked(stickyHash);
+                        unblock = true;
                     }
                 }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName,
-                            stickyHash, entry.getRefCount(), consumer.consumerId(), consumer.consumerName());
-                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+            // unblock the hash outside the lock
+            if (unblock) {
+                unblockingHandler.stickyKeyHashUnblocked(stickyHash);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName,
+                        stickyHash, entry.getRefCount(), consumer.consumerId(), consumer.consumerName());
             }
-        } finally {
-            lock.writeLock().unlock();
         }
     }
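Patch 6's core move, in miniature: decide what to do while holding the write lock, record the decision in a local variable, and only invoke the callback after the lock has been released, so a callee that takes its own locks cannot close a lock cycle. A minimal sketch with a hypothetical listener interface (not the broker's UnblockingHandler):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class Notifier {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private boolean pending;

        interface Listener {
            void unblocked();
        }

        void finish(Listener listener) {
            boolean notifyListener = false;
            lock.writeLock().lock();
            try {
                if (pending) {           // decide under the lock...
                    pending = false;
                    notifyListener = true;
                }
            } finally {
                lock.writeLock().unlock();
            }
            if (notifyListener) {        // ...but call out only after releasing it,
                listener.unblocked();    // so the listener may take its own locks safely
            }
        }
    }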
From 3adc0acc9bdc8158f86dd0a3e2ef87b9a8221617 Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Wed, 15 Jan 2025 23:54:10 +0200
Subject: [PATCH 7/9] Improve test

---
 .../pulsar/client/api/KeySharedSubscriptionTest.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index c41b909ba8d3d..b7f11b3764150 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -94,6 +94,7 @@
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
@@ -104,7 +105,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     private final KeySharedImplementationType implementationType;
 
     // Comment out the next line (Factory annotation) to run tests manually in IntelliJ, one-by-one
-    //@Factory
+    @Factory
     public static Object[] createTestInstances() {
         return KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionTest::new);
     }
@@ -2362,11 +2363,16 @@ public void testOrderingAfterReconnects(KeySharedImplementationType impl) throws
 
     @Test(dataProvider = "currentImplementationType")
     public void testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementationType impl) throws Exception {
+        // keep the unblock-stuck-subscription flag disabled; it defaults to false, but this test class
+        // enables it in the setup method
         conf.setUnblockStuckSubscriptionEnabled(false);
+
+        // this is the way the deadlock issue https://github.com/apache/pulsar/issues/23848 was reproduced
         @Cleanup("interrupt")
         Thread updateRatesThread = new Thread(() -> {
             int count = 0;
-            while (!Thread.currentThread().isInterrupted() && count++ < 1_000_000) {
+            // the deadlock issue typically reproduced in fewer than 100000 iterations
+            while (!Thread.currentThread().isInterrupted() && count++ < 200_000) {
                 pulsar.getBrokerService().updateRates();
                 Thread.yield();
                 if (count % 10000 == 0) {
@@ -2376,7 +2382,7 @@ public void testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementati
         }, "update-rates-thread");
         updateRatesThread.start();
 
-        String topic = newUniqueName("testDeliveryOfRemainingMessages");
+        String topic = newUniqueName("testDeliveryOfRemainingMessagesWithoutDeadlock");
         int numberOfKeys = 100;
         long pauseTime = 100L;

From 2e3dcf64b24b84fdc9d979cc748e3e578ca03404 Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Thu, 16 Jan 2025 09:08:38 +0200
Subject: [PATCH 8/9] Address review comments and improve clarity of the code

---
 .../broker/service/DrainingHashesTracker.java | 60 +++++++++++--------
 1 file changed, 34 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 38b0a196022b5..73bf867e14ec3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -36,7 +36,12 @@
 import org.roaringbitmap.RoaringBitmap;
 
 /**
- * A thread-safe map to store draining hashes in the consumer using read-write locks for improved concurrency.
+ * A thread-safe map to store draining hashes in the consumer.
+ * The implementation uses read-write locks to ensure thread-safe access. The high-level strategy to prevent
+ * deadlocks is to perform side-effects (calls to other collaborators which could hold their own exclusive locks)
+ * outside of the write lock. Early versions of this class had a problem where deadlocks could occur when
+ * consumer operations happened at the same time as another thread requested topic stats, which include
+ * the draining hashes state. This problem is avoided with the current implementation.
 */
 @Slf4j
 public class DrainingHashesTracker {
@@ -227,7 +232,7 @@ public void addEntry(Consumer consumer, int stickyHash) {
         }
 
         DrainingHashEntry entry;
-        ConsumerDrainingHashesStats addedStats = null;
+        ConsumerDrainingHashesStats addedStatsForNewEntry = null;
         lock.writeLock().lock();
         try {
             entry = drainingHashes.get(stickyHash);
@@ -239,8 +244,8 @@ public void addEntry(Consumer consumer, int stickyHash) {
                 entry = new DrainingHashEntry(consumer);
                 drainingHashes.put(stickyHash, entry);
                 // add the consumer specific stats
-                addedStats = consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer),
-                        k -> new ConsumerDrainingHashesStats());
+                addedStatsForNewEntry = consumerDrainingHashesStatsMap
+                        .computeIfAbsent(new ConsumerIdentityWrapper(consumer), k -> new ConsumerDrainingHashesStats());
             } else if (entry.getConsumer() != consumer) {
                 throw new IllegalStateException(
@@ -255,11 +260,14 @@ public void addEntry(Consumer consumer, int stickyHash) {
         } finally {
             lock.writeLock().unlock();
         }
-        if (addedStats != null) {
-            // add hash to added stats
-            addedStats.addHash(stickyHash);
+        // increment the reference count of the entry (applies to both new and existing entries)
+        entry.incrementRefCount();
+
+        // perform side-effects outside of the lock to reduce the chances of deadlocks
+        if (addedStatsForNewEntry != null) {
+            // add hash to added stats
+            addedStatsForNewEntry.addHash(stickyHash);
         }
-        entry.incrementRefCount();
     }
 
     /**
@@ -279,18 +287,18 @@ public void startBatch() {
      * End a batch operation.
     */
     public void endBatch() {
-        boolean unblock = false;
+        boolean notifyUnblocking = false;
         lock.writeLock().lock();
         try {
             if (--batchLevel == 0 && unblockedWhileBatching) {
                 unblockedWhileBatching = false;
-                unblock = true;
+                notifyUnblocking = true;
             }
         } finally {
             lock.writeLock().unlock();
         }
-        // unblock outside the lock
-        if (unblock) {
+        // notify unblocking of the hash outside the lock
+        if (notifyUnblocking) {
             unblockingHandler.stickyKeyHashUnblocked(-1);
         }
     }
@@ -322,34 +330,34 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) {
                 log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
                         consumer.consumerId(), consumer.consumerName());
             }
+            DrainingHashEntry removed;
+            boolean notifyUnblocking = false;
             lock.writeLock().lock();
             try {
                 removed = drainingHashes.remove(stickyHash);
-            } finally {
-                lock.writeLock().unlock();
-            }
-            // update the consumer specific stats
-            ConsumerDrainingHashesStats drainingHashesStats =
-                    consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
-            if (drainingHashesStats != null) {
-                drainingHashesStats.clearHash(stickyHash);
-            }
-            boolean unblock = false;
-            lock.writeLock().lock();
-            try {
                 if (!closing && removed.isBlocking()) {
                     if (batchLevel > 0) {
                         unblockedWhileBatching = true;
                     } else {
-                        unblock = true;
+                        notifyUnblocking = true;
                     }
                 }
             } finally {
                 lock.writeLock().unlock();
             }
-            // unblock the hash outside the lock
-            if (unblock) {
+
+            // perform side-effects outside of the lock to reduce the chances of deadlocks
+
+            // update the consumer specific stats
+            ConsumerDrainingHashesStats drainingHashesStats =
+                    consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
+            if (drainingHashesStats != null) {
+                drainingHashesStats.clearHash(stickyHash);
+            }
+
+            // notify unblocking of the hash outside the lock
+            if (notifyUnblocking) {
                 unblockingHandler.stickyKeyHashUnblocked(stickyHash);
             }
         } else {
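Moving entry.incrementRefCount() outside the write lock, as the hunk above does, is only safe because patch 3 made the counters volatile and routed updates through AtomicIntegerFieldUpdater. The mechanics, reduced to a toy entry class: a single static updater over a volatile int field gives the same atomicity as AtomicInteger while avoiding one AtomicInteger object allocation per entry (the class and method names here are illustrative):

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

    class RefCountedEntry {
        // one static updater shared by all instances; the field itself is a plain volatile int
        private static final AtomicIntegerFieldUpdater<RefCountedEntry> REF_COUNT =
                AtomicIntegerFieldUpdater.newUpdater(RefCountedEntry.class, "refCount");
        private volatile int refCount;

        void retain() {
            REF_COUNT.incrementAndGet(this);
        }

        boolean release() {
            // true when the last reference is gone
            return REF_COUNT.decrementAndGet(this) == 0;
        }
    }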
From 8289e320c119eeb7449ab616957864d048c179d4 Mon Sep 17 00:00:00 2001
From: Lari Hotari
Date: Thu, 16 Jan 2025 09:10:00 +0200
Subject: [PATCH 9/9] trim the RoaringBitmap instance when it's empty to
 release memory

---
 .../apache/pulsar/broker/service/DrainingHashesTracker.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 73bf867e14ec3..9bc5c5f1e44ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -165,6 +165,10 @@ public boolean clearHash(int hash) {
                     log.debug("[{}] Cleared hash {} in stats. empty={} totalCleared={} hashes={}",
                             dispatcherName, hash, empty, drainingHashesClearedTotal, drainingHashes.getCardinality());
                 }
+                if (empty) {
+                    // reduce memory usage by trimming the bitmap when the RoaringBitmap instance is empty
+                    drainingHashes.trim();
+                }
                 return empty;
             } finally {
                 statsLock.writeLock().unlock();
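For reference, RoaringBitmap.trim() recovers capacity that is allocated but no longer used by the bitmap's internal containers. A standalone sketch of the lifecycle this commit optimizes — the loop bounds are arbitrary and only serve to grow the bitmap before emptying it:

    import org.roaringbitmap.RoaringBitmap;

    public class TrimSketch {
        public static void main(String[] args) {
            RoaringBitmap bitmap = new RoaringBitmap();
            for (int i = 0; i < 100_000; i++) {
                bitmap.add(i);
            }
            for (int i = 0; i < 100_000; i++) {
                bitmap.remove(i);
            }
            // The bitmap is now empty, but its internal arrays may still be sized
            // for the earlier peak; trim() gives that capacity back to the GC.
            bitmap.trim();
            System.out.println("empty=" + bitmap.isEmpty()); // prints empty=true
        }
    }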