diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 8f9c0dd060f3a..46762c844db6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -254,16 +254,10 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole } DrainingHashEntry removed = drainingHashes.remove(stickyHash); // update the consumer specific stats - consumerDrainingHashesStatsMap.compute(new ConsumerIdentityWrapper(consumer), - (key, consumerDrainingHashesStats) -> { - if (consumerDrainingHashesStats != null && consumerDrainingHashesStats.clearHash(stickyHash)) { - // remove the consumer specific stats if all hashes are cleared - return null; - } - return consumerDrainingHashesStats; - }); - if (log.isDebugEnabled()) { - log.debug("consumerDrainingHashesStatsMap size: {}", consumerDrainingHashesStatsMap.size()); + ConsumerDrainingHashesStats drainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (drainingHashesStats != null) { + drainingHashesStats.clearHash(stickyHash); } if (!closing && removed.isBlocking()) { if (batchLevel > 0) {